tutanota/src/api/worker/EventBusClient.ts

652 lines
24 KiB
TypeScript
Raw Normal View History

import {LoginFacadeImpl} from "./facades/LoginFacade"
import type {MailFacade} from "./facades/MailFacade"
import type {WorkerImpl} from "./WorkerImpl"
2021-02-03 17:13:38 +01:00
import {assertWorkerOrNode, getWebsocketOrigin, isAdminClient, isTest, Mode} from "../common/Env"
2017-08-15 13:54:22 +02:00
import {_TypeModel as MailTypeModel} from "../entities/tutanota/Mail"
import {
AccessBlockedError,
AccessDeactivatedError,
ConnectionError,
handleRestError,
NotAuthorizedError,
ServiceUnavailableError,
2021-12-23 14:03:23 +01:00
SessionExpiredError,
} from "../common/error/RestError"
2017-08-15 13:54:22 +02:00
import {EntityEventBatchTypeRef} from "../entities/sys/EntityEventBatch"
import {assertNotNull, binarySearch, delay, downcast, identity, lastThrow, neverNull, ofClass, randomIntFromInterval} from "@tutao/tutanota-utils"
2017-08-15 13:54:22 +02:00
import {OutOfSyncError} from "../common/error/OutOfSyncError"
import type {Indexer} from "./search/Indexer"
import {CloseEventBusOption, GroupType, SECOND_MS} from "../common/TutanotaConstants"
import type {WebsocketEntityData} from "../entities/sys/WebsocketEntityData"
2019-01-17 15:42:16 +01:00
import {_TypeModel as WebsocketEntityDataTypeModel} from "../entities/sys/WebsocketEntityData"
import {CancelledError} from "../common/error/CancelledError"
import {_TypeModel as PhishingMarkerWebsocketDataTypeModel, PhishingMarkerWebsocketData} from "../entities/tutanota/PhishingMarkerWebsocketData"
import {_TypeModel as WebsocketCounterDataTypeModel, WebsocketCounterData} from "../entities/sys/WebsocketCounterData"
import type {EntityUpdate} from "../entities/sys/EntityUpdate"
import {EntityClient} from "../common/EntityClient"
import type {QueuedBatch} from "./search/EventQueue"
import {EventQueue} from "./search/EventQueue"
import {_TypeModel as WebsocketLeaderStatusTypeModel, createWebsocketLeaderStatus, WebsocketLeaderStatus} from "../entities/sys/WebsocketLeaderStatus"
2020-11-17 16:29:19 +01:00
import {ProgressMonitorDelegate} from "./ProgressMonitorDelegate"
import type {IProgressMonitor} from "../common/utils/ProgressMonitor"
import {NoopProgressMonitor} from "../common/utils/ProgressMonitor"
import {compareOldestFirst, firstBiggerThanSecond, GENERATED_MAX_ID, GENERATED_MIN_ID, getElementId, getLetId, isSameId} from "../common/utils/EntityUtils"
2021-12-15 16:07:07 +01:00
import {InstanceMapper} from "./crypto/InstanceMapper"
2021-12-28 13:53:11 +01:00
import {WsConnectionState} from "../main/WorkerClient";
import {IEntityRestCache, isUsingOfflineCache} from "./rest/EntityRestCache"
2017-08-15 13:54:22 +02:00
assertWorkerOrNode()
2021-12-23 14:03:23 +01:00
/**
 * Reconnection policy of the event bus client.
 */
export const enum EventBusState {
    /** automatic reconnection is enabled */
    Automatic = "automatic",
    /** automatic reconnection is suspended but can be enabled again */
    Suspended = "suspended",
    /** automatic reconnection is disabled and websocket is closed but can be opened again by calling connect explicitly */
    Terminated = "terminated",
}
2020-09-15 11:35:25 +02:00
// EntityEventBatches expire after 45 days. Keep a safety margin of one day, hence 44 days (expressed in milliseconds).
export const ENTITY_EVENT_BATCH_EXPIRE_MS = 44 * 24 * 60 * 60 * 1000
// Delay in milliseconds before an operation that failed with a ServiceUnavailableError is retried (30s).
const RETRY_AFTER_SERVICE_UNAVAILABLE_ERROR_MS = 30000
// Value of `closeEvent.code - 4000` for which the largest reconnect interval is used when the socket is closed.
const NORMAL_SHUTDOWN_CLOSE_CODE = 1
2022-02-28 15:19:23 +01:00
/**
 * Reconnection interval bounds as [min, max] pairs, in seconds. When we reconnect we pick a random number of seconds in one of
 * these ranges. Which range is used depends on the number of failed connection attempts and on the close code returned by the
 * server.
 */
const RECONNECT_INTERVAL = Object.freeze({
    SMALL: [5, 10],
    MEDIUM: [20, 40],
    LARGE: [60, 120],
} as const)
// We store the last 1000 event ids per group, so we know if an event was already processed.
// It is not sufficient to check the last event id because a smaller event id may arrive later
// than a bigger one if the requests are processed in parallel on the server.
const MAX_EVENT_IDS_QUEUE_LENGTH = 1000
2017-08-15 13:54:22 +02:00
export class EventBusClient {
// needed for test
_state: EventBusState
private socket: WebSocket | null
private immediateReconnect: boolean = false // if true tries to reconnect immediately after the websocket is closed
2021-12-23 14:03:23 +01:00
/**
* Map from group id to last event ids (max. MAX_EVENT_IDS_QUEUE_LENGTH). We keep them to avoid processing the same event twice (if
* it comes out of order from the server) and for requesting missed entity events on reconnect.
*
* We do not have to update these event ids if the groups of the user change because we always take the current user's groups from the
* LoginFacade.
*/
private lastEntityEventIds: Map<Id, Array<Id>>
2021-12-23 14:03:23 +01:00
2020-12-11 14:45:42 +01:00
/**
* Last batch which was actually added to the queue. We need it to find out when the group is processed
*/
private lastAddedBatchForGroup: Map<Id, Id>
2021-12-23 14:03:23 +01:00
2020-12-11 14:45:42 +01:00
/**
* The last time we received an EntityEventBatch or checked for updates. We use this to find out if our data has expired,
* see ENTITY_EVENT_BATCH_EXPIRE_MS.
*
* Nullable because we cannot load it during construction, since it comes from the native part.
*/
private lastUpdateTime: number | null = null
private lastAntiphishingMarkersId: Id | null = null
2017-08-15 13:54:22 +02:00
/** queue to process all events. */
private readonly eventQueue: EventQueue
2017-08-15 13:54:22 +02:00
/** queue that buffers incoming websocket entity update messages; it is paused while the event queue is being initialized. */
private readonly entityUpdateMessageQueue: EventQueue
private reconnectTimer: TimeoutID | null
private connectTimer: TimeoutID | null
/**
* The retry that is currently pending because of a ServiceUnavailableError, or null if there is none.
* Resetting it to null cancels the pending retry.
*/
private serviceUnavailableRetry: Promise<void> | null = null
private failedConnectionAttempts: number = 0
private progressMonitor: IProgressMonitor
2021-12-23 14:03:23 +01:00
/**
 * Creates the client in the Automatic state without opening a websocket; call connect() to open one.
 *
 * @param worker receives websocket state changes, counter updates, entity events and errors
 * @param indexer receives every processed event batch for indexing
 * @param cache entity cache that filters received events and stores the last update time / last batch ids
 * @param mail receives phishing marker updates and entity events
 * @param login provides auth headers, the logged in user and receives leader status and entity events
 * @param entity used to load EntityEventBatches from the server
 * @param instanceMapper decrypts and maps websocket payloads to instances
 */
constructor(
    private readonly worker: WorkerImpl,
    private readonly indexer: Indexer,
    private readonly cache: IEntityRestCache,
    private readonly mail: MailFacade,
    private readonly login: LoginFacadeImpl,
    private readonly entity: EntityClient,
    private readonly instanceMapper: InstanceMapper,
) {
    this._state = EventBusState.Automatic
    this.lastEntityEventIds = new Map()
    this.lastAddedBatchForGroup = new Map()
    this.socket = null
    this.reconnectTimer = null
    this.connectTimer = null
    this.progressMonitor = new NoopProgressMonitor()
    this.eventQueue = new EventQueue(true, modification => this.eventQueueCallback(modification))
    this.entityUpdateMessageQueue = new EventQueue(false, (batch) => this.entityUpdateMessageQueueCallback(batch))
    // must run last: reset() touches the maps and the event queue created above
    this.reset()
}
/**
 * Forgets everything that was learned from the server so far: the known event ids, the last added batches and the last
 * update time. Also pauses and empties the event queue and cancels a pending ServiceUnavailableError retry.
 */
private reset() {
    this.immediateReconnect = false

    this.lastEntityEventIds.clear()

    this.lastAddedBatchForGroup.clear()

    this.lastUpdateTime = null

    this.eventQueue.pause()

    this.eventQueue.clear()

    // setting this to null makes a scheduled retry detect that it has been cancelled
    this.serviceUnavailableRetry = null
}
2017-08-15 13:54:22 +02:00
/**
 * Opens a WebSocket connection to receive server events.
 *
 * Does nothing in test mode. Otherwise a pending ServiceUnavailableError retry is cancelled, the worker is told that we are
 * connecting, the state is switched to Automatic, the listeners of a previous socket are detached and a new socket is created.
 *
 * @param reconnect Set to true if the connection has been opened before.
 */
connect(reconnect: boolean) {
    if (env.mode === Mode.Test) {
        return
    }

    console.log(new Date().toISOString(), "ws connect reconnect=", reconnect, "state:", this._state)
    // make sure a retry will be cancelled by setting serviceUnavailableRetry to null
    this.serviceUnavailableRetry = null

    this.worker.updateWebSocketState(WsConnectionState.connecting)

    // Task for updating events are number of groups + 2. Use 2 as base for reconnect state.
    if (this.progressMonitor) {
        // Say that the old monitor is completed so that we don't calculate its amount as still to do.
        this.progressMonitor.completed()
    }

    this.progressMonitor = reconnect ? new ProgressMonitorDelegate(this.eventGroups().length + 2, this.worker) : new NoopProgressMonitor()

    this.progressMonitor.workDone(1)

    this._state = EventBusState.Automatic
    this.connectTimer = null

    const authHeaders = this.login.createAuthHeaders()

    // Native query building is not supported in old browser, mithril is not available in the worker
    const authQuery =
        "modelVersions=" +
        WebsocketEntityDataTypeModel.version +
        "." +
        MailTypeModel.version +
        "&clientVersion=" +
        env.versionNumber +
        "&userId=" +
        this.login.getLoggedInUser()._id +
        "&accessToken=" +
        authHeaders.accessToken +
        (this.lastAntiphishingMarkersId ? "&lastPhishingMarkersId=" + this.lastAntiphishingMarkersId : "")

    const url = getWebsocketOrigin() + "/event?" + authQuery
    // detach the handlers of the previous socket before replacing it, so that its late events cannot touch our state
    this.unsubscribeFromOldWebsocket()

    this.socket = new WebSocket(url)

    this.socket.onopen = () => this._onOpen(reconnect)

    this.socket.onclose = (event: CloseEvent) => this._close(event)

    this.socket.onerror = (error: any) => this.error(error)

    this.socket.onmessage = (message: MessageEvent) => this._message(message)
}
/**
 * Handler for the websocket "open" event: resets the failed attempt counter, starts initializing the entity events and
 * reports the connected state to the worker.
 *
 * @param reconnect passed through to initEntityEvents()
 * @returns the promise of initEntityEvents(); returned so that tests can wait for it
 */
_onOpen(reconnect: boolean): Promise<void> {
    this.failedConnectionAttempts = 0

    console.log("ws open: ", new Date(), "state:", this._state)

    // Indicate some progress right away
    this.progressMonitor.workDone(1)

    const p = this.initEntityEvents(reconnect)

    // reported right after the initialization was started, without waiting for it to finish
    this.worker.updateWebSocketState(WsConnectionState.connected)

    return p
}
/**
 * Brings the event queue up to date after the websocket was opened.
 *
 * Both queues are paused while either the missed entity events are loaded (existing connection) or the latest event ids are
 * determined (new connection; with an offline cache the missed events are loaded afterwards as well). Both queues are
 * resumed on success.
 *
 * Error handling, in this order:
 *  - ConnectionError: the websocket is closed with CloseEventBusOption.Reconnect
 *  - CancelledError: only logged
 *  - ServiceUnavailableError: the initialization is retried after RETRY_AFTER_SERVICE_UNAVAILABLE_ERROR_MS unless it was
 *    cancelled in the meantime
 *  - anything else: both queues are resumed and the error is sent to the worker
 *
 * @param reconnect true if the connection has been opened before
 */
private async initEntityEvents(reconnect: boolean): Promise<void> {
    // pause processing entity update message while initializing event queue
    this.entityUpdateMessageQueue.pause()

    // pause event queue and add all missed entity events first
    this.eventQueue.pause()

    // if it's the first connection on desktop, we want to fetch events since the last login
    let existingConnection = reconnect && this.lastEntityEventIds.size > 0
    let p: Promise<void>
    if (existingConnection) {
        p = this.loadMissedEntityEvents()
    } else {
        // If we have offline cache then we need to both set last ids from persistence and load batches since then
        if (isUsingOfflineCache()) {
            p = this.setLatestEntityEventIds()
                    .then(() => this.loadMissedEntityEvents())
        } else {
            p = this.setLatestEntityEventIds()
        }
    }

    return p
        .then(() => {
            this.entityUpdateMessageQueue.resume()

            this.eventQueue.resume()
        })
        .catch(ofClass(ConnectionError, e => {
            console.log("not connected in connect(), close websocket", e)
            this.close(CloseEventBusOption.Reconnect)
        }))
        .catch(ofClass(CancelledError, () => {
            // the processing was aborted due to a reconnect. do not reset any attributes because they might already be in use since reconnection
            console.log("cancelled retry process entity events after reconnect")
        }))
        .catch(ofClass(ServiceUnavailableError, async e => {
            // a ServiceUnavailableError is a temporary error and we have to retry to avoid data inconsistencies
            // some EventBatches/missed events are processed already now
            // for an existing connection we just keep the current state and continue loading missed events for the other groups
            // for a new connection we reset the last entity event ids because otherwise this would not be completed in the next try
            if (!existingConnection) {
                this.lastEntityEventIds.clear()

                this.lastUpdateTime = await this.cache.getLastUpdateTime() ?? 0
            }
            console.log("retry init entity events in 30s", e)
            let promise = delay(RETRY_AFTER_SERVICE_UNAVAILABLE_ERROR_MS).then(() => {
                // if we have a websocket reconnect we have to stop retrying
                if (this.serviceUnavailableRetry === promise) {
                    console.log("retry initializing entity events")
                    return this.initEntityEvents(reconnect)
                } else {
                    console.log("cancel initializing entity events")
                }
            })
            // remember the retry so that connect()/reset() can cancel it by overwriting this field
            this.serviceUnavailableRetry = promise
            return promise
        }))
        .catch(e => {
            this.entityUpdateMessageQueue.resume()
            this.eventQueue.resume()

            this.worker.sendError(e)
        })
}
2017-08-15 13:54:22 +02:00
/**
 * Closes the websocket (if there is one) after applying the requested close option:
 *  - Terminate: the state of this event bus client is reset and the client is terminated (does not automatically reconnect)
 *  - Pause: the state becomes Suspended and the worker is told that we are connecting
 *  - Reconnect: the state is left untouched and the worker is told that we are connecting
 *
 * @param closeOption how the client should behave after the socket has been closed
 */
close(closeOption: CloseEventBusOption) {
    console.log(new Date().toISOString(), "ws close closeOption: ", closeOption, "state:", this._state)
    switch (closeOption) {
        case CloseEventBusOption.Terminate:
            this.terminate()

            break

        case CloseEventBusOption.Pause:
            this._state = EventBusState.Suspended

            this.worker.updateWebSocketState(WsConnectionState.connecting)

            break

        case CloseEventBusOption.Reconnect:
            this.worker.updateWebSocketState(WsConnectionState.connecting)

            break
    }
    this.socket?.close()
}
/**
 * Callback of the event queue: processes one queued batch and reports progress once the last batch that was added for the
 * batch's group (or a newer one) has been processed.
 *
 * @param modification the batch to process
 * @throws whatever processEventBatch() rejects with; the error is also logged and sent to the worker
 */
private async eventQueueCallback(modification: QueuedBatch): Promise<void> {
    try {
        await this.processEventBatch(modification)
    } catch (e) {
        console.log("Error while processing event batches", e)

        this.worker.sendError(e)
        throw e
    }
    // If we completed the event, it was added before
    const lastForGroup = assertNotNull(this.lastAddedBatchForGroup.get(modification.groupId))
    if (isSameId(modification.batchId, lastForGroup) || firstBiggerThanSecond(modification.batchId, lastForGroup)) {
        this.progressMonitor && this.progressMonitor.workDone(1)
    }
}
/**
 * Callback of the entity update message queue: hands the buffered batch over to addBatch() and resumes the event queue.
 *
 * @param batch the batch that was received over the websocket
 */
private async entityUpdateMessageQueueCallback(batch: QueuedBatch): Promise<void> {
    const {batchId, groupId, events} = batch
    this.addBatch(batchId, groupId, events)
    this.eventQueue.resume()
}
/**
 * Replaces all event handlers of the current socket (if any) with `identity`, so that an old socket cannot mess with our
 * state any more.
 */
private unsubscribeFromOldWebsocket() {
    const oldSocket = this.socket
    if (!oldSocket) {
        return
    }
    // assigned in the same order in which a chained assignment would run (right to left)
    oldSocket.onmessage = identity
    oldSocket.onerror = identity
    oldSocket.onclose = identity
    oldSocket.onopen = identity
}
/**
 * Switches to the Terminated state, resets the collected state via reset() and reports the terminated state to the worker.
 * The socket itself is not closed here.
 */
private async terminate(): Promise<void> {
    this._state = EventBusState.Terminated

    this.reset()

    this.worker.updateWebSocketState(WsConnectionState.terminated)
}
/**
 * Handler for the websocket "error" event. The error is only logged; no other action is taken here.
 *
 * @param error the value passed to the socket's onerror handler
 */
private error(error: any) {
    console.log(new Date().toISOString(), "ws error: ", error, JSON.stringify(error), "state:", this._state)
}
/**
 * Handler for the websocket "message" event. A message has the form `<type>;<json>`; the json part is decrypted and mapped
 * to an instance and then dispatched by type:
 *  - entityUpdate: the batch is added to the entity update message queue
 *  - unreadCounterUpdate: forwarded to the worker
 *  - phishingMarkers: the last marker id is remembered and the markers are forwarded to the mail facade
 *  - leaderStatus: forwarded to the login facade
 * Messages of any other type are only logged.
 *
 * @param message the received websocket message; its data is expected to be a string
 * @returns a promise that resolves when the message has been handled; it rejects if the payload cannot be parsed or mapped
 */
async _message(message: MessageEvent): Promise<void> {
    //console.log("ws message: ", message.data);
    const raw: string = downcast(message.data)
    // Split at the FIRST separator only: the payload is JSON and may itself contain ";", which must not truncate it.
    const separatorIndex = raw.indexOf(";")
    const type = separatorIndex === -1 ? raw : raw.slice(0, separatorIndex)
    // without a separator there is no payload; JSON.parse("") then fails for the known types just like a missing value would
    const value = separatorIndex === -1 ? "" : raw.slice(separatorIndex + 1)

    switch (type) {
        case "entityUpdate": {
            // specify type of decrypted entity explicitly because decryptAndMapToInstance effectively returns `any`
            const data: WebsocketEntityData = await this.instanceMapper.decryptAndMapToInstance(WebsocketEntityDataTypeModel, JSON.parse(value), null)
            this.entityUpdateMessageQueue.add(data.eventBatchId, data.eventBatchOwner, data.eventBatch)
            break
        }
        case "unreadCounterUpdate": {
            const counterData: WebsocketCounterData = await this.instanceMapper.decryptAndMapToInstance(WebsocketCounterDataTypeModel, JSON.parse(value), null)
            this.worker.updateCounter(counterData)
            break
        }
        case "phishingMarkers": {
            const data = await this.instanceMapper.decryptAndMapToInstance<PhishingMarkerWebsocketData>(PhishingMarkerWebsocketDataTypeModel, JSON.parse(value), null)
            // remembered so that the next connect() can tell the server which markers we already have
            this.lastAntiphishingMarkersId = data.lastId
            this.mail.phishingMarkersUpdateReceived(data.markers)
            break
        }
        case "leaderStatus": {
            const status = await this.instanceMapper.decryptAndMapToInstance<WebsocketLeaderStatus>(WebsocketLeaderStatusTypeModel, JSON.parse(value), null)
            await this.login.setLeaderStatus(status)
            break
        }
        default:
            console.log("ws message with unknown type", type)
    }
}
/**
 * Handler for the websocket "close" event. Counts the failed attempt, drops the leader status and then decides what to do
 * based on `event.code - 4000`:
 *  - NotAuthorizedError / AccessDeactivatedError / AccessBlockedError code: terminate and send the matching error to the worker
 *  - SessionExpiredError code: suspend automatic reconnection
 *  - otherwise, if the state is Automatic and the user is logged in: reconnect, either immediately (when requested by
 *    reconnect()) or after a random delay whose range grows with the number of failed attempts
 *
 * @param event the close event of the socket
 */
private _close(event: CloseEvent) {
    this.failedConnectionAttempts++

    console.log(new Date().toISOString(), "ws _close: ", event, "state:", this._state)
    this.login.setLeaderStatus(
        createWebsocketLeaderStatus({
            leaderStatus: false,
        }),
    )

    // Avoid running into penalties when trying to authenticate with an invalid session
    // NotAuthenticatedException 401, AccessDeactivatedException 470, AccessBlocked 472
    // do not catch session expired here because websocket will be reused when we authenticate again
    const serverCode = event.code - 4000

    if ([NotAuthorizedError.CODE, AccessDeactivatedError.CODE, AccessBlockedError.CODE].includes(serverCode)) {
        this.terminate()

        this.worker.sendError(handleRestError(serverCode, "web socket error", null, null))
    } else if (serverCode === SessionExpiredError.CODE) {
        // session is expired. do not try to reconnect until the user creates a new session
        this._state = EventBusState.Suspended

        this.worker.updateWebSocketState(WsConnectionState.connecting)
    } else if (this._state === EventBusState.Automatic && this.login.isLoggedIn()) {
        this.worker.updateWebSocketState(WsConnectionState.connecting)
        if (this.immediateReconnect) {
            this.immediateReconnect = false

            this.tryReconnect(false, false)
        } else {
            // bounds are in seconds, see RECONNECT_INTERVAL
            let reconnectionInterval: readonly [number, number]
            if (serverCode === NORMAL_SHUTDOWN_CLOSE_CODE) {
                reconnectionInterval = RECONNECT_INTERVAL.LARGE
            } else if (this.failedConnectionAttempts === 1) {
                reconnectionInterval = RECONNECT_INTERVAL.SMALL
            } else if (this.failedConnectionAttempts === 2) {
                reconnectionInterval = RECONNECT_INTERVAL.MEDIUM
            } else {
                reconnectionInterval = RECONNECT_INTERVAL.LARGE
            }

            this.tryReconnect(false, false, SECOND_MS * randomIntFromInterval(reconnectionInterval[0], reconnectionInterval[1]))
        }
    }
}
2021-12-23 14:03:23 +01:00
/**
 * Schedules a reconnect. A reconnect that was scheduled earlier and has not fired yet is cancelled first.
 *
 * @param closeIfOpen passed through to reconnect(): close an open socket and reconnect right after it was closed
 * @param enableAutomaticState passed through to reconnect(): switch back to the Automatic state unless terminated
 * @param delay time in milliseconds to wait before reconnecting; null (the default) or 0 reconnects right away
 */
tryReconnect(closeIfOpen: boolean, enableAutomaticState: boolean, delay: number | null = null) {
    console.log("tryReconnect, closeIfOpen", closeIfOpen, "enableAutomaticState", enableAutomaticState, "delay", delay)

    if (this.reconnectTimer) {
        // prevent reconnect race-condition
        clearTimeout(this.reconnectTimer)
        this.reconnectTimer = null
    }
    if (!delay) {
        this.reconnect(closeIfOpen, enableAutomaticState)
    } else {
        this.reconnectTimer = setTimeout(() => this.reconnect(closeIfOpen, enableAutomaticState), delay)
    }
}
/**
 * Tries to reconnect the websocket if it is not connected.
 *
 * If the socket is open and closeIfOpen is set, the socket is closed and flagged so that the close handler reconnects
 * immediately. Otherwise, if there is no socket or it is closed/closing, the client is not terminated and the user is
 * logged in, a new connection is opened after a short delay.
 *
 * @param closeIfOpen close an open socket in order to re-establish the connection
 * @param enableAutomaticState switch back to the Automatic state (ignored when the client is terminated)
 */
private reconnect(closeIfOpen: boolean, enableAutomaticState: boolean) {
    console.log(
        new Date().toISOString(),
        "ws _reconnect socket state (CONNECTING=0, OPEN=1, CLOSING=2, CLOSED=3): " + (this.socket ? this.socket.readyState : "null"),
        "state:",
        this._state,
        "closeIfOpen",
        closeIfOpen,
        "enableAutomaticState",
        enableAutomaticState,
    )
    if (this._state !== EventBusState.Terminated && enableAutomaticState) {
        this._state = EventBusState.Automatic
    }

    if (closeIfOpen && this.socket && this.socket.readyState === WebSocket.OPEN) {
        // the close handler picks this flag up and reconnects without waiting
        this.immediateReconnect = true
        neverNull(this.socket).close()
    } else if (
        (this.socket == null || this.socket.readyState === WebSocket.CLOSED || this.socket.readyState === WebSocket.CLOSING) &&
        this._state !== EventBusState.Terminated &&
        this.login.isLoggedIn()
    ) {
        // Don't try to connect right away because connection may not be actually there
        // see #1165
        if (this.connectTimer) {
            clearTimeout(this.connectTimer)
        }

        this.connectTimer = setTimeout(() => this.connect(true), 100)
    }
}
/**
 * Stores the latest event batch id for each of the user's groups, or the min id if there is no event batch yet.
 * This is needed to know from where to start loading missed events after a reconnect.
 *
 * For each group the id stored in the cache is preferred; only if the cache has none, the newest batch is loaded from the
 * server. Afterwards the last update time is set to the current server time and persisted in the cache.
 */
private async setLatestEntityEventIds(): Promise<void> {
    // set all last event ids in one step to avoid that we have just set them for a few groups when a ServiceUnavailableError occurs
    const lastIds: Map<Id, Array<Id>> = new Map()
    for (const groupId of this.eventGroups()) {
        const cachedBatchId = await this.cache.getLastEntityEventBatchForGroup(groupId)
        if (cachedBatchId != null) {
            lastIds.set(groupId, [cachedBatchId])
        } else {
            // load a single batch in reverse order starting from the max id
            const batches = await this.entity.loadRange(EntityEventBatchTypeRef, groupId, GENERATED_MAX_ID, 1, true)

            lastIds.set(groupId, [batches.length === 1 ? getLetId(batches[0])[1] : GENERATED_MIN_ID])
        }
    }

    this.lastEntityEventIds = lastIds
    this.lastUpdateTime = this.cache.getServerTimestampMs()
    await this.cache.setLastUpdateTime(this.lastUpdateTime)
}
/**
 * Loads the entity event batches that were created since the last known batch of every group and adds them to the event
 * queue. Does nothing if the user is not logged in.
 *
 * If the last update is older than ENTITY_EVENT_BATCH_EXPIRE_MS the cache storage is purged instead, and - unless an offline
 * cache is used - an OutOfSyncError is sent to the worker.
 *
 * Visible for testing.
 */
async loadMissedEntityEvents(): Promise<void> {
    if (this.login.isLoggedIn()) {
        const now = this.cache.getServerTimestampMs()
        if (this.lastUpdateTime == null) {
            this.lastUpdateTime = await this.cache.getLastUpdateTime() ?? 0
        }
        if (now > this.lastUpdateTime + ENTITY_EVENT_BATCH_EXPIRE_MS) {
            console.log("cache is out of sync, purging...")
            // Allow the progress bar to complete
            this.progressMonitor.workDone(this.eventGroups().length)

            // we did not check for updates for too long, so some missed EntityEventBatches can not be loaded any more
            // purge cache if out of sync
            await this.cache.purgeStorage()
            //If in memory cached is used user has to log out and in again to clean the cache so we return an error. We might also purge the in memory cache.
            if (!isUsingOfflineCache()) {
                await this.worker.sendError(new OutOfSyncError("some missed EntityEventBatches cannot be loaded any more"))
            }
        } else {
            for (let groupId of this.eventGroups()) {
                let eventBatches
                try {
                    eventBatches = await this.entity.loadAll(EntityEventBatchTypeRef, groupId, this.getLastEventBatchIdOrMinIdForGroup(groupId))
                } catch (e) {
                    if (e instanceof NotAuthorizedError) {
                        console.log("could not download entity updates => lost permission")
                        // We need to do this to mark group as "processed", otherwise progress bar will get stuck
                        this.progressMonitor.workDone(1)
                        continue
                    } else {
                        throw e
                    }
                }
                if (eventBatches.length === 0) {
                    // There won't be a callback from the queue to process the event so we mark this group as
                    // completed right away
                    this.progressMonitor.workDone(1)
                } else {
                    for (const batch of eventBatches) {
                        this.addBatch(getElementId(batch), groupId, batch.events)
                    }
                }
            }
            this.lastUpdateTime = this.cache.getServerTimestampMs()
            await this.cache.setLastUpdateTime(this.lastUpdateTime)
        }
    }
}
/**
 * Remembers the batch id for its group and, if the id was not known yet, adds the batch to the event queue.
 * At most MAX_EVENT_IDS_QUEUE_LENGTH ids are kept per group; the first id of the list is dropped when the limit is exceeded.
 *
 * @param batchId id of the event batch
 * @param groupId id of the group the batch belongs to
 * @param events the entity updates contained in the batch
 */
private addBatch(batchId: Id, groupId: Id, events: ReadonlyArray<EntityUpdate>) {
    const lastForGroup = this.lastEntityEventIds.get(groupId) || []

    // find the position for inserting into last entity events (negative value is considered as not present in the array)
    const index = binarySearch(lastForGroup, batchId, compareOldestFirst)

    let wasAdded

    if (index < 0) {
        // NOTE(review): this assumes that `-index` is the insertion position encoded by binarySearch. If binarySearch
        // returns `-(insertionPoint) - 1` instead, this is off by one - confirm against its implementation.
        lastForGroup.splice(-index, 0, batchId)

        // only add the batch if it was not processed before
        wasAdded = this.eventQueue.add(batchId, groupId, events)
    } else {
        wasAdded = false
    }

    if (lastForGroup.length > MAX_EVENT_IDS_QUEUE_LENGTH) {
        lastForGroup.shift()
    }

    // The map is keyed by GROUP id. Storing the list under the batch id would never register the list of a new group
    // (so its ids would be forgotten right away) and would add one useless map entry per received batch.
    this.lastEntityEventIds.set(groupId, lastForGroup)

    if (wasAdded) {
        this.lastAddedBatchForGroup.set(groupId, batchId)
    }
}
/**
 * Processes one event batch unless the client is terminated: the cache filters the events first, then the login facade, the
 * mail facade and the worker receive them one after another, and finally the batch is handed to the indexer (not in tests
 * and not in the admin client).
 *
 * If a ServiceUnavailableError occurs the whole batch is retried after RETRY_AFTER_SERVICE_UNAVAILABLE_ERROR_MS.
 *
 * @param batch the batch to process
 * @returns a promise that resolves when the batch has been processed; it rejects with a CancelledError if a scheduled retry
 * was cancelled in the meantime
 */
private processEventBatch(batch: QueuedBatch): Promise<void> {
    return this.executeIfNotTerminated(async () => {
        const filteredEvents = await this.cache.entityEventsReceived(batch)
        await this.executeIfNotTerminated(() => this.login.entityEventsReceived(filteredEvents))
        await this.executeIfNotTerminated(() => this.mail.entityEventsReceived(filteredEvents))

        await this.executeIfNotTerminated(() => this.worker.entityEventsReceived(filteredEvents, batch.groupId))
        // Call the indexer in this last step because now the processed event is stored and the indexer has a separate event queue that
        // shall not receive the event twice.
        if (!isTest() && !isAdminClient()) {
            this.executeIfNotTerminated(() => {
                this.indexer.addBatchesToQueue([
                    {
                        groupId: batch.groupId,
                        batchId: batch.batchId,
                        events: filteredEvents,
                    },
                ])
                this.indexer.startProcessing()
            })
        }
    }).catch(ofClass(ServiceUnavailableError, e => {
        // a ServiceUnavailableError is a temporary error and we have to retry to avoid data inconsistencies
        console.log("retry processing event in 30s", e)
        let promise = delay(RETRY_AFTER_SERVICE_UNAVAILABLE_ERROR_MS).then(() => {
            // if we have a websocket reconnect we have to stop retrying
            if (this.serviceUnavailableRetry === promise) {
                return this.processEventBatch(batch)
            } else {
                throw new CancelledError("stop retry processing after service unavailable due to reconnect")
            }
        })
        // remember the retry so that it can be cancelled by overwriting this field
        this.serviceUnavailableRetry = promise
        return promise
    }))
}
/**
 * Returns the id from which missed event batches of the given group have to be loaded.
 *
 * @param groupId id of the group
 * @returns the result of lastThrow() on the ids known for the group, or GENERATED_MIN_ID if no id is known for it
 */
private getLastEventBatchIdOrMinIdForGroup(groupId: Id): Id {
    const lastIds = this.lastEntityEventIds.get(groupId)

    return lastIds && lastIds.length > 0 ? lastThrow(lastIds) : GENERATED_MIN_ID
}
/**
 * Runs the given function unless the client is in the Terminated state.
 *
 * @param call the function to run
 * @returns whatever `call` returns, or a resolved promise if the client is terminated and `call` was skipped
 */
private executeIfNotTerminated(call: (...args: Array<any>) => any): Promise<void> {
    if (this._state !== EventBusState.Terminated) {
        return call()
    } else {
        return Promise.resolve()
    }
}
/**
 * Collects the ids of all groups for which entity events are handled: every membership group of the logged in user that is
 * not a mailing list, followed by the user group itself.
 */
private eventGroups(): Id[] {
    const memberGroups = this.login
        .getLoggedInUser()
        .memberships.filter(({groupType}) => groupType !== GroupType.MailingList)
        .map(({group}) => group)
    const userGroup = this.login.getLoggedInUser().userGroup.group
    return [...memberGroups, userGroup]
}
}