tutanota/src/api/worker/EventBusClient.ts

661 lines
22 KiB
TypeScript
Raw Normal View History

2021-12-23 14:03:23 +01:00
import {LoginFacadeImpl} from "./facades/LoginFacade"
import type {MailFacade} from "./facades/MailFacade"
import type {WorkerImpl} from "./WorkerImpl"
2021-02-03 17:13:38 +01:00
import {assertWorkerOrNode, getWebsocketOrigin, isAdminClient, isTest, Mode} from "../common/Env"
2017-08-15 13:54:22 +02:00
import {_TypeModel as MailTypeModel} from "../entities/tutanota/Mail"
import {
AccessBlockedError,
AccessDeactivatedError,
ConnectionError,
handleRestError,
NotAuthorizedError,
ServiceUnavailableError,
2021-12-23 14:03:23 +01:00
SessionExpiredError,
} from "../common/error/RestError"
2017-08-15 13:54:22 +02:00
import {EntityEventBatchTypeRef} from "../entities/sys/EntityEventBatch"
import {
assertNotNull,
binarySearch,
delay,
downcast,
identity,
lastThrow,
neverNull,
ofClass,
promiseMap,
randomIntFromInterval
} from "@tutao/tutanota-utils"
2017-08-15 13:54:22 +02:00
import {OutOfSyncError} from "../common/error/OutOfSyncError"
import type {Indexer} from "./search/Indexer"
2021-12-23 14:03:23 +01:00
import type {CloseEventBusOption} from "../common/TutanotaConstants"
import {CloseEventBusOption, GroupType, SECOND_MS} from "../common/TutanotaConstants"
import type {WebsocketEntityData} from "../entities/sys/WebsocketEntityData"
2019-01-17 15:42:16 +01:00
import {_TypeModel as WebsocketEntityDataTypeModel} from "../entities/sys/WebsocketEntityData"
import {CancelledError} from "../common/error/CancelledError"
import {_TypeModel as PhishingMarkerWebsocketDataTypeModel} from "../entities/tutanota/PhishingMarkerWebsocketData"
import type {EntityUpdate} from "../entities/sys/EntityUpdate"
import type {EntityRestInterface} from "./rest/EntityRestClient"
import {EntityClient} from "../common/EntityClient"
import type {QueuedBatch} from "./search/EventQueue"
import {EventQueue} from "./search/EventQueue"
2021-12-23 14:03:23 +01:00
import {
_TypeModel as WebsocketLeaderStatusTypeModel,
createWebsocketLeaderStatus
} from "../entities/sys/WebsocketLeaderStatus"
2020-11-17 16:29:19 +01:00
import {ProgressMonitorDelegate} from "./ProgressMonitorDelegate"
import type {IProgressMonitor} from "../common/utils/ProgressMonitor"
import {NoopProgressMonitor} from "../common/utils/ProgressMonitor"
2019-09-13 13:49:11 +02:00
import {
compareOldestFirst,
firstBiggerThanSecond,
GENERATED_MAX_ID,
GENERATED_MIN_ID,
getElementId,
getLetId,
isSameId
2021-12-23 14:03:23 +01:00
} from "../common/utils/EntityUtils"
2021-12-15 16:07:07 +01:00
import {InstanceMapper} from "./crypto/InstanceMapper"
2017-08-15 13:54:22 +02:00
assertWorkerOrNode()
2021-12-23 14:03:23 +01:00
export const enum EventBusState {
Automatic= "automatic",
// automatic reconnection is enabled
Suspended = "suspended",
// automatic reconnection is suspended but can be enabled again
Terminated = "terminated", // automatic reconnection is disabled and websocket is closed but can be opened again by calling connect explicit
}
2020-09-15 11:35:25 +02:00
// EntityEventBatches expire after 45 days. keep a time diff security of one day.
const ENTITY_EVENT_BATCH_EXPIRE_MS = 44 * 24 * 60 * 60 * 1000
const RETRY_AFTER_SERVICE_UNAVAILABLE_ERROR_MS = 30000
const NORMAL_SHUTDOWN_CLOSE_CODE = 1
const LARGE_RECONNECT_INTERVAL = [60, 120]
const SMALL_RECONNECT_INTERVAL = [5, 10]
const MEDIUM_RECONNECT_INTERVAL = [20, 40]
2017-08-15 13:54:22 +02:00
export class EventBusClient {
2021-12-23 14:03:23 +01:00
_MAX_EVENT_IDS_QUEUE_LENGTH: number
readonly _indexer: Indexer
readonly _cache: EntityRestInterface
readonly _entity: EntityClient
readonly _worker: WorkerImpl
readonly _mail: MailFacade
readonly _login: LoginFacadeImpl
_state: EventBusState
_socket: WebSocket | null
_immediateReconnect: boolean // if true tries to reconnect immediately after the websocket is closed
/**
* Map from group id to last event ids (max. _MAX_EVENT_IDS_QUEUE_LENGTH). We keep them to avoid processing the same event twice if
* it comes out of order from the server) and for requesting missed entity events on reconnect.
*
* We do not have to update these event ids if the groups of the user change because we always take the current users groups from the
* LoginFacade.
*/
2021-12-23 14:03:23 +01:00
_lastEntityEventIds: Map<Id, Array<Id>>
2020-12-11 14:45:42 +01:00
/**
* Last batch which was actually added to the queue. We need it to find out when the group is processed
*/
2021-12-23 14:03:23 +01:00
_lastAddedBatchForGroup: Map<Id, Id>
2020-12-11 14:45:42 +01:00
/**
* The last time we received an EntityEventBatch or checked for updates. we use this to find out if our data has expired.
* see ENTITY_EVENT_BATCH_EXPIRE_MS
*/
2021-12-23 14:03:23 +01:00
_lastUpdateTime: number
_lastAntiphishingMarkersId: Id | null
2017-08-15 13:54:22 +02:00
2020-12-10 17:18:19 +01:00
/* queue to process all events. */
2021-12-23 14:03:23 +01:00
readonly _eventQueue: EventQueue
2017-08-15 13:54:22 +02:00
2021-12-23 14:03:23 +01:00
/* queue that handles incoming websocket messages while. */
readonly _entityUpdateMessageQueue: EventQueue
_reconnectTimer: TimeoutID | null
_connectTimer: TimeoutID | null
/**
* Represents a currently retried executing due to a ServiceUnavailableError
*/
2021-12-23 14:03:23 +01:00
_serviceUnavailableRetry: Promise<void> | null
_failedConnectionAttempts: number = 0
_progressMonitor: IProgressMonitor
2021-12-15 16:07:07 +01:00
_instanceMapper: InstanceMapper
2021-12-23 14:03:23 +01:00
constructor(
worker: WorkerImpl,
indexer: Indexer,
cache: EntityRestInterface,
mail: MailFacade,
login: LoginFacadeImpl,
entityClient: EntityClient,
instanceMapper: InstanceMapper,
) {
this._indexer = indexer
this._cache = cache
this._entity = entityClient
2020-12-11 14:45:42 +01:00
this._worker = worker
this._mail = mail
this._login = login
2021-12-15 16:07:07 +01:00
this._instanceMapper = instanceMapper
this._state = EventBusState.Automatic
2020-12-11 14:45:42 +01:00
this._lastEntityEventIds = new Map()
this._lastAddedBatchForGroup = new Map()
this._socket = null
this._reconnectTimer = null
this._connectTimer = null
this._progressMonitor = new NoopProgressMonitor()
2021-12-23 14:03:23 +01:00
this._eventQueue = new EventQueue(true, modification => {
return this._processEventBatch(modification)
2021-12-23 14:03:23 +01:00
.catch(e => {
console.log("Error while processing event batches", e)
this._worker.sendError(e)
2020-12-09 16:07:56 +01:00
2021-12-23 14:03:23 +01:00
throw e
})
.then(() => {
// If we completed the event, it was added before
const lastForGroup = assertNotNull(this._lastAddedBatchForGroup.get(modification.groupId))
if (isSameId(modification.batchId, lastForGroup) || firstBiggerThanSecond(modification.batchId, lastForGroup)) {
this._progressMonitor && this._progressMonitor.workDone(1)
}
})
})
this._entityUpdateMessageQueue = new EventQueue(false, batch => {
2020-12-11 14:45:42 +01:00
this._addBatch(batch.batchId, batch.groupId, batch.events)
2021-12-23 14:03:23 +01:00
2020-12-11 14:45:42 +01:00
this._eventQueue.resume()
2021-12-23 14:03:23 +01:00
2020-12-11 14:45:42 +01:00
return Promise.resolve()
2020-12-10 17:18:19 +01:00
})
2020-12-09 16:07:56 +01:00
this._reset()
// we store the last 1000 event ids per group, so we know if an event was already processed.
// it is not sufficient to check the last event id because a smaller event id may arrive later
// than a bigger one if the requests are processed in parallel on the server
this._MAX_EVENT_IDS_QUEUE_LENGTH = 1000
}
_reset(): void {
this._immediateReconnect = false
2021-12-23 14:03:23 +01:00
2020-12-11 14:45:42 +01:00
this._lastEntityEventIds.clear()
2021-12-23 14:03:23 +01:00
2020-12-11 14:45:42 +01:00
this._lastAddedBatchForGroup.clear()
2021-12-23 14:03:23 +01:00
2020-12-09 16:07:56 +01:00
this._lastUpdateTime = 0
2021-12-23 14:03:23 +01:00
2020-12-09 16:07:56 +01:00
this._eventQueue.pause()
2021-12-23 14:03:23 +01:00
2020-12-09 16:07:56 +01:00
this._eventQueue.clear()
this._serviceUnavailableRetry = null
2017-12-20 16:48:36 +01:00
}
2017-08-15 13:54:22 +02:00
/**
* Opens a WebSocket connection to receive server events.
* @param reconnect Set to true if the connection has been opened before.
* @returns The event bus client object.
*/
connect(reconnect: boolean) {
if (env.mode === Mode.Test) {
return
}
2018-10-19 18:13:58 +02:00
2021-12-23 14:03:23 +01:00
console.log(new Date().toISOString(), "ws connect reconnect=", reconnect, "state:", this._state)
// make sure a retry will be cancelled by setting _serviceUnavailableRetry to null
this._serviceUnavailableRetry = null
2021-12-23 14:03:23 +01:00
this._worker.updateWebSocketState("connecting")
2020-12-07 17:02:35 +01:00
// Task for updating events are number of groups + 2. Use 2 as base for reconnect state.
if (this._progressMonitor) {
// Say that the old monitor is completed so that we don't calculate its amount as still to do.
this._progressMonitor.completed()
}
2021-12-23 14:03:23 +01:00
this._progressMonitor = reconnect ? new ProgressMonitorDelegate(this._eventGroups().length + 2, this._worker) : new NoopProgressMonitor()
this._progressMonitor.workDone(1)
2021-12-23 14:03:23 +01:00
this._state = EventBusState.Automatic
this._connectTimer = null
2018-07-31 17:07:41 +02:00
2019-01-17 15:42:16 +01:00
const authHeaders = this._login.createAuthHeaders()
2021-12-23 14:03:23 +01:00
2019-01-17 15:42:16 +01:00
// Native query building is not supported in old browser, mithril is not available in the worker
const authQuery =
2021-12-23 14:03:23 +01:00
"modelVersions=" +
WebsocketEntityDataTypeModel.version +
"." +
MailTypeModel.version +
"&clientVersion=" +
env.versionNumber +
"&userId=" +
this._login.getLoggedInUser()._id +
"&accessToken=" +
authHeaders.accessToken +
(this._lastAntiphishingMarkersId ? "&lastPhishingMarkersId=" + this._lastAntiphishingMarkersId : "")
let url = getWebsocketOrigin() + "/event?" + authQuery
this._unsubscribeFromOldWebsocket()
2021-12-23 14:03:23 +01:00
this._socket = new WebSocket(url)
2020-12-11 14:45:42 +01:00
this._socket.onopen = () => this._onOpen(reconnect)
2021-12-23 14:03:23 +01:00
this._socket.onclose = (event: CloseEvent) => this._close(event)
this._socket.onerror = (error: any) => this._error(error)
this._socket.onmessage = (message: MessageEvent) => this._message(message)
2017-08-15 13:54:22 +02:00
}
// Returning promise for tests
_onOpen(reconnect: boolean): Promise<void> {
2020-12-11 14:45:42 +01:00
this._failedConnectionAttempts = 0
2021-12-23 14:03:23 +01:00
console.log("ws open: ", new Date(), "state:", this._state)
2020-12-11 14:45:42 +01:00
// Indicate some progress right away
this._progressMonitor.workDone(1)
2021-12-23 14:03:23 +01:00
const p = this._initEntityEvents(reconnect)
2021-12-23 14:03:23 +01:00
2020-12-11 14:45:42 +01:00
this._worker.updateWebSocketState("connected")
2021-12-23 14:03:23 +01:00
return p
2020-12-11 14:45:42 +01:00
}
_initEntityEvents(reconnect: boolean): Promise<void> {
2020-12-10 17:18:19 +01:00
// pause processing entity update message while initializing event queue
this._entityUpdateMessageQueue.pause()
2021-12-23 14:03:23 +01:00
2020-12-10 17:18:19 +01:00
// pause event queue and add all missed entity events first
this._eventQueue.pause()
2021-12-23 14:03:23 +01:00
2020-12-11 14:45:42 +01:00
let existingConnection = reconnect && this._lastEntityEventIds.size > 0
let p = existingConnection ? this._loadMissedEntityEvents() : this._setLatestEntityEventIds()
2021-12-23 14:03:23 +01:00
return p
.then(() => {
this._entityUpdateMessageQueue.resume()
this._eventQueue.resume()
})
.catch(
ofClass(ConnectionError, e => {
console.log("not connected in connect(), close websocket", e)
this.close(CloseEventBusOption.Reconnect)
}),
)
.catch(
ofClass(CancelledError, e => {
// the processing was aborted due to a reconnect. do not reset any attributes because they might already be in use since reconnection
console.log("cancelled retry process entity events after reconnect")
}),
)
.catch(
ofClass(ServiceUnavailableError, e => {
// a ServiceUnavailableError is a temporary error and we have to retry to avoid data inconsistencies
// some EventBatches/missed events are processed already now
// for an existing connection we just keep the current state and continue loading missed events for the other groups
// for a new connection we reset the last entity event ids because otherwise this would not be completed in the next try
if (!existingConnection) {
// FIXME: why?
this._lastEntityEventIds.clear()
this._lastUpdateTime = 0
}
console.log("retry init entity events in 30s", e)
let promise = delay(RETRY_AFTER_SERVICE_UNAVAILABLE_ERROR_MS).then(() => {
// if we have a websocket reconnect we have to stop retrying
if (this._serviceUnavailableRetry === promise) {
console.log("retry initializing entity events")
return this._initEntityEvents(reconnect)
} else {
console.log("cancel initializing entity events")
}
})
this._serviceUnavailableRetry = promise
return promise
}),
)
.catch(e => {
this._entityUpdateMessageQueue.resume()
2021-12-23 14:03:23 +01:00
this._eventQueue.resume()
this._worker.sendError(e)
})
}
2017-08-15 13:54:22 +02:00
/**
2017-09-28 11:32:02 +02:00
* Sends a close event to the server and finally closes the connection.
2017-12-20 16:48:36 +01:00
* The state of this event bus client is reset and the client is terminated (does not automatically reconnect) except reconnect == true
2017-08-15 13:54:22 +02:00
*/
2021-12-23 14:03:23 +01:00
close(closeOption: CloseEventBusOption) {
console.log(new Date().toISOString(), "ws close closeOption: ", closeOption, "state:", this._state)
switch (closeOption) {
case CloseEventBusOption.Terminate:
this._terminate()
2021-12-23 14:03:23 +01:00
break
2021-12-23 14:03:23 +01:00
case CloseEventBusOption.Pause:
this._state = EventBusState.Suspended
2021-12-23 14:03:23 +01:00
this._worker.updateWebSocketState("connecting")
2021-12-23 14:03:23 +01:00
break
2021-12-23 14:03:23 +01:00
case CloseEventBusOption.Reconnect:
this._worker.updateWebSocketState("connecting")
2021-12-23 14:03:23 +01:00
break
}
this._socket?.close()
2017-08-15 13:54:22 +02:00
}
_unsubscribeFromOldWebsocket() {
if (this._socket) {
// Remove listeners. We don't want old socket to mess our state
this._socket.onopen = this._socket.onclose = this._socket.onerror = this._socket.onmessage = identity
}
}
2017-12-20 16:48:36 +01:00
_terminate(): void {
this._state = EventBusState.Terminated
2021-12-23 14:03:23 +01:00
2017-12-20 16:48:36 +01:00
this._reset()
2021-12-23 14:03:23 +01:00
this._worker.updateWebSocketState("terminated")
2017-12-20 16:48:36 +01:00
}
2017-08-15 13:54:22 +02:00
_error(error: any) {
2021-12-23 14:03:23 +01:00
console.log(new Date().toISOString(), "ws error: ", error, JSON.stringify(error), "state:", this._state)
2017-08-15 13:54:22 +02:00
}
_message(message: MessageEvent): Promise<void> {
//console.log("ws message: ", message.data);
2019-01-17 15:42:16 +01:00
const [type, value] = downcast(message.data).split(";")
2021-12-23 14:03:23 +01:00
2019-01-17 15:42:16 +01:00
if (type === "entityUpdate") {
// specify type of decrypted entity explicitly because decryptAndMapToInstance effectively returns `any`
2021-12-15 16:07:07 +01:00
return this._instanceMapper.decryptAndMapToInstance(WebsocketEntityDataTypeModel, JSON.parse(value), null).then((data: WebsocketEntityData) => {
2020-12-10 17:18:19 +01:00
this._entityUpdateMessageQueue.add(data.eventBatchId, data.eventBatchOwner, data.eventBatch)
})
2019-01-21 10:48:07 +01:00
} else if (type === "unreadCounterUpdate") {
this._worker.updateCounter(JSON.parse(value))
} else if (type === "phishingMarkers") {
2021-12-23 14:03:23 +01:00
return this._instanceMapper.decryptAndMapToInstance(PhishingMarkerWebsocketDataTypeModel, JSON.parse(value), null).then(data => {
this._lastAntiphishingMarkersId = data.lastId
2020-10-13 17:15:52 +02:00
2021-12-23 14:03:23 +01:00
this._mail.phishingMarkersUpdateReceived(data.markers)
})
} else if (type === "leaderStatus") {
return this._instanceMapper.decryptAndMapToInstance(WebsocketLeaderStatusTypeModel, JSON.parse(value), null).then(status => {
return this._login.setLeaderStatus(status)
})
} else {
console.log("ws message with unknown type", type)
2019-01-17 15:42:16 +01:00
}
2021-12-23 14:03:23 +01:00
2019-01-17 15:42:16 +01:00
return Promise.resolve()
2017-08-15 13:54:22 +02:00
}
_close(event: CloseEvent) {
this._failedConnectionAttempts++
2021-12-23 14:03:23 +01:00
console.log(new Date().toISOString(), "ws _close: ", event, "state:", this._state)
this._login.setLeaderStatus(
createWebsocketLeaderStatus({
leaderStatus: false,
}),
)
// Avoid running into penalties when trying to authenticate with an invalid session
// NotAuthenticatedException 401, AccessDeactivatedException 470, AccessBlocked 472
// do not catch session expired here because websocket will be reused when we authenticate again
const serverCode = event.code - 4000
2021-12-23 14:03:23 +01:00
if ([NotAuthorizedError.CODE, AccessDeactivatedError.CODE, AccessBlockedError.CODE].includes(serverCode)) {
2017-12-20 16:48:36 +01:00
this._terminate()
2021-12-23 14:03:23 +01:00
this._worker.sendError(handleRestError(serverCode, "web socket error"))
} else if (serverCode === SessionExpiredError.CODE) {
// session is expired. do not try to reconnect until the user creates a new session
this._state = EventBusState.Suspended
2021-12-23 14:03:23 +01:00
this._worker.updateWebSocketState("connecting")
} else if (this._state === EventBusState.Automatic && this._login.isLoggedIn()) {
this._worker.updateWebSocketState("connecting")
if (this._immediateReconnect) {
2017-08-15 13:54:22 +02:00
this._immediateReconnect = false
2021-12-23 14:03:23 +01:00
this.tryReconnect(false, false)
} else {
let reconnectionInterval: [number, number]
if (serverCode === NORMAL_SHUTDOWN_CLOSE_CODE) {
reconnectionInterval = LARGE_RECONNECT_INTERVAL
} else if (this._failedConnectionAttempts === 1) {
reconnectionInterval = SMALL_RECONNECT_INTERVAL
} else if (this._failedConnectionAttempts === 2) {
reconnectionInterval = MEDIUM_RECONNECT_INTERVAL
} else {
reconnectionInterval = LARGE_RECONNECT_INTERVAL
}
2021-12-23 14:03:23 +01:00
this.tryReconnect(false, false, SECOND_MS * randomIntFromInterval(reconnectionInterval[0], reconnectionInterval[1]))
2017-08-15 13:54:22 +02:00
}
}
}
2021-12-23 14:03:23 +01:00
tryReconnect(closeIfOpen: boolean, enableAutomaticState: boolean, delay: number | null = null) {
console.log("tryReconnect, closeIfOpen", closeIfOpen, "enableAutomaticState", enableAutomaticState, "delay", delay)
2021-12-23 14:03:23 +01:00
if (this._reconnectTimer) {
// prevent reconnect race-condition
clearTimeout(this._reconnectTimer)
this._reconnectTimer = null
}
if (!delay) {
this._reconnect(closeIfOpen, enableAutomaticState)
} else {
2021-12-23 14:03:23 +01:00
this._reconnectTimer = setTimeout(() => this._reconnect(closeIfOpen, enableAutomaticState), delay)
2017-08-15 13:54:22 +02:00
}
}
/**
* Tries to reconnect the websocket if it is not connected.
*/
_reconnect(closeIfOpen: boolean, enableAutomaticState: boolean) {
2021-12-23 14:03:23 +01:00
console.log(
new Date().toISOString(),
"ws _reconnect socket state (CONNECTING=0, OPEN=1, CLOSING=2, CLOSED=3): " + (this._socket ? this._socket.readyState : "null"),
"state:",
this._state,
"closeIfOpen",
closeIfOpen,
"enableAutomaticState",
enableAutomaticState,
)
if (this._state !== EventBusState.Terminated && enableAutomaticState) {
this._state = EventBusState.Automatic
}
2021-12-23 14:03:23 +01:00
2018-07-26 14:25:29 +02:00
if (closeIfOpen && this._socket && this._socket.readyState === WebSocket.OPEN) {
2017-08-15 13:54:22 +02:00
this._immediateReconnect = true
2021-12-23 14:03:23 +01:00
neverNull(this._socket).close()
} else if (
2021-12-23 14:03:23 +01:00
(this._socket == null || this._socket.readyState === WebSocket.CLOSED || this._socket.readyState === WebSocket.CLOSING) &&
this._state !== EventBusState.Terminated &&
this._login.isLoggedIn()
) {
// Don't try to connect right away because connection may not be actually there
// see #1165
if (this._connectTimer) {
clearTimeout(this._connectTimer)
}
2021-12-23 14:03:23 +01:00
this._connectTimer = setTimeout(() => this.connect(true), 100)
2017-08-15 13:54:22 +02:00
}
}
/**
* stores the latest event batch ids for each of the users groups or min id if there is no event batch yet.
* this is needed to know from where to start loading missed events after a reconnect
*/
_setLatestEntityEventIds(): Promise<void> {
// set all last event ids in one step to avoid that we have just set them for a few groups when a ServiceUnavailableError occurs
2020-12-11 14:45:42 +01:00
const lastIds: Map<Id, Array<Id>> = new Map()
2021-06-22 15:24:10 +02:00
return promiseMap(this._eventGroups(), groupId => {
return this._entity.loadRange(EntityEventBatchTypeRef, groupId, GENERATED_MAX_ID, 1, true).then(batches => {
2021-12-23 14:03:23 +01:00
lastIds.set(groupId, [batches.length === 1 ? getLetId(batches[0])[1] : GENERATED_MIN_ID])
2017-08-15 13:54:22 +02:00
})
}).then(() => {
this._lastEntityEventIds = lastIds
2020-09-15 16:37:42 +02:00
this._lastUpdateTime = Date.now()
2021-12-23 14:03:23 +01:00
2020-12-07 17:02:35 +01:00
this._eventQueue.resume()
2017-08-15 13:54:22 +02:00
})
}
_loadMissedEntityEvents(): Promise<void> {
if (this._login.isLoggedIn()) {
2020-09-15 16:37:42 +02:00
if (Date.now() > this._lastUpdateTime + ENTITY_EVENT_BATCH_EXPIRE_MS) {
2020-09-15 11:35:25 +02:00
// we did not check for updates for too long, so some missed EntityEventBatches can not be loaded any more
return this._worker.sendError(new OutOfSyncError("some missed EntityEventBatches cannot be loaded any more"))
2020-09-15 11:35:25 +02:00
} else {
2021-12-23 14:03:23 +01:00
return promiseMap(this._eventGroups(), groupId => {
return this._entity
2021-12-23 14:03:23 +01:00
.loadAll(EntityEventBatchTypeRef, groupId, this._getLastEventBatchIdOrMinIdForGroup(groupId))
.then(eventBatches => {
if (eventBatches.length === 0) {
// There won't be a callback from the queue to process the event so we mark this group as
// completed right away
this._progressMonitor.workDone(1)
} else {
for (const batch of eventBatches) {
this._addBatch(getElementId(batch), groupId, batch.events)
}
}
})
.catch(
ofClass(NotAuthorizedError, () => {
console.log("could not download entity updates => lost permission")
// We need to do this to mark group as "processed", otherwise progress bar will get stuck
this._progressMonitor.workDone(1)
}),
)
2020-09-15 11:35:25 +02:00
}).then(() => {
2020-09-15 16:37:42 +02:00
this._lastUpdateTime = Date.now()
2021-12-23 14:03:23 +01:00
2020-12-07 17:02:35 +01:00
this._eventQueue.resume()
2020-09-15 11:35:25 +02:00
})
}
2017-08-15 13:54:22 +02:00
} else {
return Promise.resolve()
}
}
2021-12-23 14:03:23 +01:00
_addBatch(batchId: Id, groupId: Id, events: ReadonlyArray<EntityUpdate>) {
2020-12-11 14:45:42 +01:00
const lastForGroup = this._lastEntityEventIds.get(groupId) || []
2020-12-10 17:18:19 +01:00
// find the position for inserting into last entity events (negative value is considered as not present in the array)
const index = binarySearch(lastForGroup, batchId, compareOldestFirst)
2020-12-11 14:45:42 +01:00
let wasAdded
2021-12-23 14:03:23 +01:00
if (index < 0) {
lastForGroup.splice(-index, 0, batchId)
2020-12-10 17:18:19 +01:00
// only add the batch if it was not process before
2020-12-11 14:45:42 +01:00
wasAdded = this._eventQueue.add(batchId, groupId, events)
} else {
wasAdded = false
}
2021-12-23 14:03:23 +01:00
if (lastForGroup.length > this._MAX_EVENT_IDS_QUEUE_LENGTH) {
lastForGroup.shift()
2017-08-15 13:54:22 +02:00
}
2021-12-23 14:03:23 +01:00
2020-12-11 14:45:42 +01:00
this._lastEntityEventIds.set(batchId, lastForGroup)
if (wasAdded) {
this._lastAddedBatchForGroup.set(groupId, batchId)
}
}
_processEventBatch(batch: QueuedBatch): Promise<void> {
return this._executeIfNotTerminated(() => {
2021-12-23 14:03:23 +01:00
return this._cache
.entityEventsReceived(batch.events)
.then(filteredEvents => {
return this._executeIfNotTerminated(() => this._login.entityEventsReceived(filteredEvents))
.then(() => this._executeIfNotTerminated(() => this._mail.entityEventsReceived(filteredEvents)))
.then(() => this._executeIfNotTerminated(() => this._worker.entityEventsReceived(filteredEvents, batch.groupId)))
.then(() => filteredEvents)
})
.then(filteredEvents => {
// Call the indexer in this last step because now the processed event is stored and the indexer has a separate event queue that
// shall not receive the event twice.
if (!isTest() && !isAdminClient()) {
this._executeIfNotTerminated(() => {
this._indexer.addBatchesToQueue([
{
groupId: batch.groupId,
batchId: batch.batchId,
events: filteredEvents,
},
])
this._indexer.startProcessing()
})
}
})
}).catch(
ofClass(ServiceUnavailableError, e => {
// a ServiceUnavailableError is a temporary error and we have to retry to avoid data inconsistencies
console.log("retry processing event in 30s", e)
let promise = delay(RETRY_AFTER_SERVICE_UNAVAILABLE_ERROR_MS).then(() => {
// if we have a websocket reconnect we have to stop retrying
if (this._serviceUnavailableRetry === promise) {
return this._processEventBatch(batch)
} else {
throw new CancelledError("stop retry processing after service unavailable due to reconnect")
}
})
this._serviceUnavailableRetry = promise
return promise
}),
)
2017-08-15 13:54:22 +02:00
}
_getLastEventBatchIdOrMinIdForGroup(groupId: Id): Id {
2020-12-11 14:45:42 +01:00
const lastIds = this._lastEntityEventIds.get(groupId)
2021-12-23 14:03:23 +01:00
return lastIds && lastIds.length > 0 ? lastThrow(lastIds) : GENERATED_MIN_ID
}
2021-12-23 14:03:23 +01:00
_executeIfNotTerminated(call: (...args: Array<any>) => any): Promise<void> {
if (this._state !== EventBusState.Terminated) {
2017-08-15 13:54:22 +02:00
return call()
} else {
return Promise.resolve()
}
}
_eventGroups(): Id[] {
2021-12-23 14:03:23 +01:00
return this._login
.getLoggedInUser()
.memberships.filter(membership => membership.groupType !== GroupType.MailingList)
.map(membership => membership.group)
.concat(this._login.getLoggedInUser().userGroup.group)
}
}