tutanota/src/common/api/worker/EventBusClient.ts
jhm 279744b590
read full instance from UPDATE entityUpdate if available
Usually, entityUpdates with operation UPDATE only have patches
on the entityUpdate instead of the full instance, like e.g. on
entityUpdates with operation CREATE. However, in certain
scenarios we are still / again sending full instances on entityUpdates
with operation UPDATE, and should therefore also correctly process them.
This commit changes the processing of UPDATE entityUpdates to be:
-> use entityUpdate.patches
--> if not available use entityUpdate.instance
---> if not available re-load instance from server

This commit fixes issues where instances were re-loaded from the server
even though the instance was available on the entityUpdate.
2025-11-07 12:06:39 +01:00

771 lines
30 KiB
TypeScript

import { assertWorkerOrNode } from "../common/Env"
import {
AccessBlockedError,
AccessDeactivatedError,
ConnectionError,
handleRestError,
NotAuthorizedError,
ServiceUnavailableError,
SessionExpiredError,
} from "../common/error/RestError"
import {
createWebsocketLeaderStatus,
EntityEventBatch,
EntityEventBatchTypeRef,
EntityUpdate,
WebsocketCounterData,
WebsocketCounterDataTypeRef,
WebsocketEntityDataTypeRef,
WebsocketLeaderStatus,
WebsocketLeaderStatusTypeRef,
} from "../entities/sys/TypeRefs.js"
import { AppName, binarySearch, delay, identity, lastThrow, Nullable, ofClass, promiseMap, randomIntFromInterval, TypeRef } from "@tutao/tutanota-utils"
import { OutOfSyncError } from "../common/error/OutOfSyncError"
import { CloseEventBusOption, GroupType, SECOND_MS } from "../common/TutanotaConstants"
import { CancelledError } from "../common/error/CancelledError"
import { EntityClient } from "../common/EntityClient"
import type { QueuedBatch } from "./EventQueue.js"
import { EventQueue } from "./EventQueue.js"
import { ProgressMonitorDelegate } from "./ProgressMonitorDelegate"
import { compareOldestFirst, GENERATED_MAX_ID, GENERATED_MIN_ID, getElementId, getListId } from "../common/utils/EntityUtils"
import { WsConnectionState } from "../main/WorkerClient"
import { EntityRestCache } from "./rest/DefaultEntityRestCache.js"
import { SleepDetector } from "./utils/SleepDetector.js"
import sysModelInfo from "../entities/sys/ModelInfo.js"
import tutanotaModelInfo from "../entities/tutanota/ModelInfo.js"
import { TypeModelResolver } from "../common/EntityFunctions.js"
import { PhishingMarkerWebsocketDataTypeRef, ReportedMailFieldMarker } from "../entities/tutanota/TypeRefs"
import { UserFacade } from "./facades/UserFacade"
import { ExposedProgressTracker } from "../main/ProgressTracker.js"
import { Entity, ServerModelParsedInstance, ServerModelUntypedInstance } from "../common/EntityTypes"
import { InstancePipeline } from "./crypto/InstancePipeline"
import { EntityUpdateData, entityUpdateToUpdateData } from "../common/utils/EntityUpdateUtils"
import { CryptoFacade } from "./crypto/CryptoFacade"
import { EntityAdapter } from "./crypto/EntityAdapter"
import { EventInstancePrefetcher } from "./EventInstancePrefetcher"
import { AttributeModel } from "../common/AttributeModel"
import { newSyncMetrics } from "./utils/SyncMetrics"
import { SessionKeyNotFoundError } from "../common/error/SessionKeyNotFoundError"
import { hasError } from "../common/utils/ErrorUtils"
// Guard imported from ../common/Env, executed once at module load.
// NOTE(review): presumably fails if this module is loaded outside a worker/node context — confirm in Env.
assertWorkerOrNode()
/** Reconnection state of the EventBusClient. */
export const enum EventBusState {
// automatic reconnection is enabled
Automatic = "automatic",
// automatic reconnection is suspended but can be enabled again
Suspended = "suspended",
// automatic reconnection is disabled and the websocket is closed, but it can be opened again by calling connect() explicitly
Terminated = "terminated",
}
// EntityEventBatches expire after 45 days. We keep a safety margin of one day, hence 44 days (expressed in milliseconds).
export const ENTITY_EVENT_BATCH_EXPIRE_MS = 44 * 24 * 60 * 60 * 1000
// Delay in milliseconds before a step that failed with a ServiceUnavailableError is retried.
const RETRY_AFTER_SERVICE_UNAVAILABLE_ERROR_MS = 30000
// Compared in onClose() against the websocket close code minus 4000; a match selects the LARGE reconnect interval.
const NORMAL_SHUTDOWN_CLOSE_CODE = 1
/**
* Reconnection interval bounds as [min, max] in seconds (they are multiplied by SECOND_MS where they are used).
* When we reconnect we pick a random number of seconds in a range to prevent all the clients from connecting at the same time, which
* would put unnecessary load on the server.
* The range depends on the number of attempts and the server response.
* */
const RECONNECT_INTERVAL = Object.freeze({
SMALL: [5, 10],
MEDIUM: [20, 40],
LARGE: [60, 120],
} as const)
// We store the last 1000 event ids per group, so we know if an event was already processed.
// It is not sufficient to check the last event id because a smaller event id may arrive later
// than a bigger one if the requests are processed in parallel on the server.
const MAX_EVENT_IDS_QUEUE_LENGTH = 1000
/**
* Known types of messages that can be received over websocket.
* The value is the part of the raw message before the first ";" (see onMessage()).
*/
const enum MessageType {
EntityUpdate = "entityUpdate",
UnreadCounterUpdate = "unreadCounterUpdate",
PhishingMarkers = "phishingMarkers",
LeaderStatus = "leaderStatus",
}
/**
* Tells connect() why the websocket is being opened.
* With Reconnect (and already recorded last event ids) missed entity events are loaded starting from those ids,
* otherwise the last event ids are initialized first.
*/
export const enum ConnectMode {
Initial,
Reconnect,
}
/** Callbacks through which the EventBusClient reports websocket state, received data and errors. */
export interface EventBusListener {
/** Called whenever the client moves between connecting, connected and terminated. */
onWebsocketStateChanged(state: WsConnectionState): unknown
/** Called with the decoded payload of an unread counter update message. */
onCounterChanged(counter: WebsocketCounterData): unknown
/** Called with the decoded payload of a leader status message. */
onLeaderStatusChanged(leaderStatus: WebsocketLeaderStatus): unknown
/** Called for each processed batch with the events returned by the cache for that batch. */
onEntityEventsReceived(events: readonly EntityUpdateData[], batchId: Id, groupId: Id): Promise<void>
/**
* @param markers only phishing (not spam) markers will be sent as event bus updates
*/
onPhishingMarkersReceived(markers: ReportedMailFieldMarker[]): unknown
/** Called for errors that the client could not handle itself. */
onError(tutanotaError: Error): void
/** Called when there are no missed batches to process or when the last initially loaded batch has been processed. */
onSyncDone(): unknown
}
export class EventBusClient {
private state: EventBusState
private socket: WebSocket | null
private immediateReconnect: boolean = false // if true tries to reconnect immediately after the websocket is closed
/**
* Map from group id to the last event ids (max. MAX_EVENT_IDS_QUEUE_LENGTH per group). We keep them to avoid processing the same event
* twice (if it comes out of order from the server) and for requesting missed entity events on reconnect.
*
* We do not have to update these event ids if the groups of the user change because we always take the current user's groups from the
* UserFacade.
*/
private lastEntityEventIds: Map<Id, Array<Id>>
/**
* Last batch which was actually added to the queue. We need it to find out when the group is processed
*/
private lastAddedBatchForGroup: Map<Id, Id>
private lastAntiphishingMarkersId: Id | null = null
/** Queue to process all events. */
private readonly eventQueue: EventQueue
/** Queue that handles incoming websocket messages only. Caches them until we process downloaded ones and then adds them to eventQueue. */
private readonly entityUpdateMessageQueue: EventQueue
private reconnectTimer: TimeoutID | null
private connectTimer: TimeoutID | null
/**
* The retry that is currently pending because of a ServiceUnavailableError, or null if there is none.
*/
private serviceUnavailableRetry: Promise<void> | null = null
private failedConnectionAttempts: number = 0
/**
* Represents the last item from the initial missed entity update batches.
* This is used to determine whether the queue has finished processing missed updates.
*/
private lastInitialEventBatch: Id | null = null
/**
 * Creates the client without opening a connection: it starts in the Terminated state and stays there
 * until connect() is called. Both queues are created here and the bookkeeping is reset.
 */
constructor(
	private readonly listener: EventBusListener,
	private readonly cache: EntityRestCache,
	private readonly userFacade: UserFacade,
	private readonly entity: EntityClient,
	private readonly instancePipeline: InstancePipeline,
	private readonly socketFactory: (path: string) => WebSocket,
	private readonly sleepDetector: SleepDetector,
	private readonly progressTracker: ExposedProgressTracker,
	private readonly typeModelResolver: TypeModelResolver,
	private readonly cryptoFacade: CryptoFacade,
	private readonly eventInstancePrefetcher: EventInstancePrefetcher,
) {
	// No socket and no timers yet.
	this.socket = null
	this.reconnectTimer = null
	this.connectTimer = null
	// Per-group bookkeeping starts out empty.
	this.lastEntityEventIds = new Map()
	this.lastAddedBatchForGroup = new Map()
	// We are not connected by default and will not try to connect unless connect() is called.
	this.state = EventBusState.Terminated
	this.eventQueue = new EventQueue("ws_opt", (queuedBatch) => this.eventQueueCallback(queuedBatch))
	this.entityUpdateMessageQueue = new EventQueue("ws_msg", (queuedBatch) => this.entityUpdateMessageQueueCallback(queuedBatch))
	this.reset()
}
/**
 * Brings the client back to its initial bookkeeping state: the event queue is paused and emptied,
 * the recorded event ids are dropped and any pending ServiceUnavailableError retry is forgotten.
 */
private reset() {
	// Pause before clearing, in that order.
	this.eventQueue.pause()
	this.eventQueue.clear()
	this.lastAddedBatchForGroup.clear()
	this.lastEntityEventIds.clear()
	this.serviceUnavailableRetry = null
	this.immediateReconnect = false
}
/**
 * Opens a WebSocket connection to receive server events.
 *
 * Switches the client to the Automatic state, detaches the handlers of a previous socket (if any),
 * creates a new socket through the socket factory and starts the sleep detector.
 * @param connectMode passed on to the open handler; it is also used for logging
 */
connect(connectMode: ConnectMode) {
	console.log("ws connect reconnect:", connectMode === ConnectMode.Reconnect, "state:", this.state)
	// Setting serviceUnavailableRetry to null makes sure a pending retry will be cancelled.
	this.serviceUnavailableRetry = null
	this.listener.onWebsocketStateChanged(WsConnectionState.connecting)
	this.state = EventBusState.Automatic
	this.connectTimer = null

	const authHeaders = this.userFacade.createAuthHeaders()
	// Native query building is not supported in old browsers, mithril is not available in the worker.
	let authQuery = "modelVersions=" + sysModelInfo.version + "." + tutanotaModelInfo.version
	authQuery += "&clientVersion=" + env.versionNumber
	authQuery += "&userId=" + this.userFacade.getLoggedInUser()._id
	authQuery += "&accessToken=" + authHeaders.accessToken
	// The remaining parameters are only appended when they have a value.
	if (this.lastAntiphishingMarkersId) {
		authQuery += "&lastPhishingMarkersId=" + this.lastAntiphishingMarkersId
	}
	if (env.clientName) {
		authQuery += "&clientName=" + env.clientName
	}
	if (env.networkDebugging) {
		authQuery += "&network-debugging=" + "enable-network-debugging"
	}
	const path = "/event?" + authQuery

	this.unsubscribeFromOldWebsocket()
	const freshSocket = this.socketFactory(path)
	this.socket = freshSocket
	freshSocket.onopen = () => this.onOpen(connectMode)
	freshSocket.onclose = (closeEvent: CloseEvent) => this.onClose(closeEvent)
	freshSocket.onerror = (errorEvent: any) => this.onError(errorEvent)
	freshSocket.onmessage = (messageEvent: MessageEvent<string>) => this.onMessage(messageEvent)

	this.sleepDetector.start(() => {
		console.log("ws sleep detected, reconnecting...")
		this.tryReconnect(true, true)
	})
}
/**
 * Applies the given close option and then closes the socket, if there is one.
 *
 * - Terminate: the client is terminated (state is reset, no automatic reconnect).
 * - Pause: the state becomes Suspended and the listener is told we are connecting.
 * - Reconnect: the state is left untouched and the listener is told we are connecting.
 */
async close(closeOption: CloseEventBusOption): Promise<void> {
	console.log("ws close closeOption: ", closeOption, "state:", this.state)
	if (closeOption === CloseEventBusOption.Terminate) {
		this.terminate()
	} else if (closeOption === CloseEventBusOption.Pause) {
		this.state = EventBusState.Suspended
		this.listener.onWebsocketStateChanged(WsConnectionState.connecting)
	} else if (closeOption === CloseEventBusOption.Reconnect) {
		this.listener.onWebsocketStateChanged(WsConnectionState.connecting)
	}
	if (this.socket != null) {
		this.socket.close()
	}
}
/**
 * Reconnects either right away or after the given delay. A reconnect that is already scheduled is cancelled first.
 * @param closeIfOpen passed on to reconnect()
 * @param enableAutomaticState passed on to reconnect()
 * @param delay timeout in milliseconds; null (the default) or 0 means reconnect immediately
 */
async tryReconnect(closeIfOpen: boolean, enableAutomaticState: boolean, delay: number | null = null): Promise<void> {
	console.log("ws tryReconnect closeIfOpen:", closeIfOpen, "enableAutomaticState:", enableAutomaticState, "delay:", delay)
	const scheduledReconnect = this.reconnectTimer
	if (scheduledReconnect) {
		// prevent reconnect race-condition
		clearTimeout(scheduledReconnect)
		this.reconnectTimer = null
	}
	if (delay) {
		this.reconnectTimer = setTimeout(() => this.reconnect(closeIfOpen, enableAutomaticState), delay)
	} else {
		this.reconnect(closeIfOpen, enableAutomaticState)
	}
}
/**
 * Handler for the socket's open event: resets the failed attempt counter, starts initializing entity events
 * and reports the connected state to the listener.
 * @returns the promise of the entity event initialization (returned for tests)
 */
private onOpen(connectMode: ConnectMode): Promise<void> {
	this.failedConnectionAttempts = 0
	console.log("ws open state:", this.state)
	// Started before the listener is notified; the listener is not kept waiting for it.
	const entityEventsInitialized = this.initEntityEvents(connectMode)
	this.listener.onWebsocketStateChanged(WsConnectionState.connected)
	return entityEventsInitialized
}
/**
 * Turns the untyped payload of a websocket message into an entity of the given type.
 * Network debugging info is removed (if needed) before the payload is handed to the instance pipeline.
 */
private async decodeEntityEventValue<E extends Entity>(messageType: TypeRef<E>, untypedInstance: ServerModelUntypedInstance): Promise<E> {
	const sanitizedPayload = AttributeModel.removeNetworkDebuggingInfoIfNeeded(untypedInstance)
	const decoded = await this.instancePipeline.decryptAndMap(messageType, sanitizedPayload, null)
	return decoded
}
/** Handler for the socket's error event. The error is only logged, together with the current state. */
private onError(error: any) {
	const serializedError = JSON.stringify(error)
	console.log("ws error:", error, serializedError, "state:", this.state)
}
/**
 * Handler for the socket's message event.
 *
 * A message has the form "<type>;<json payload>". Everything after the first ";" belongs to the payload.
 * Entity updates are put into the entity update message queue, all other known messages are forwarded
 * to the listener. Messages of an unknown type are only logged.
 */
private async onMessage(message: MessageEvent<string>): Promise<void> {
	const rawMessage = message.data
	const firstSeparator = rawMessage.indexOf(";")
	const type = firstSeparator === -1 ? rawMessage : rawMessage.slice(0, firstSeparator)
	const value = firstSeparator === -1 ? "" : rawMessage.slice(firstSeparator + 1)

	if (type === MessageType.EntityUpdate) {
		const batchData = await this.decodeEntityEventValue(WebsocketEntityDataTypeRef, JSON.parse(value))
		this.typeModelResolver.setServerApplicationTypesModelHash(batchData.applicationTypesHash)
		const updates = await promiseMap(batchData.entityUpdates, async (entityUpdate) => {
			// The instance is null when the update carries none or when it could not be used.
			const instance = await this.getInstanceFromEntityEvent(entityUpdate)
			return entityUpdateToUpdateData(this.typeModelResolver, entityUpdate, instance)
		})
		this.entityUpdateMessageQueue.add(batchData.eventBatchId, batchData.eventBatchOwner, updates)
	} else if (type === MessageType.UnreadCounterUpdate) {
		const counterData = await this.decodeEntityEventValue(WebsocketCounterDataTypeRef, JSON.parse(value))
		this.typeModelResolver.setServerApplicationTypesModelHash(counterData.applicationTypesHash)
		this.listener.onCounterChanged(counterData)
	} else if (type === MessageType.PhishingMarkers) {
		const markerData = await this.decodeEntityEventValue(PhishingMarkerWebsocketDataTypeRef, JSON.parse(value))
		this.typeModelResolver.setServerApplicationTypesModelHash(markerData.applicationTypesHash)
		// Remembered so that the next connect() can send it as lastPhishingMarkersId.
		this.lastAntiphishingMarkersId = markerData.lastId
		this.listener.onPhishingMarkersReceived(markerData.markers)
	} else if (type === MessageType.LeaderStatus) {
		const leaderStatus = await this.decodeEntityEventValue(WebsocketLeaderStatusTypeRef, JSON.parse(value))
		if (leaderStatus.applicationTypesHash) {
			this.typeModelResolver.setServerApplicationTypesModelHash(leaderStatus.applicationTypesHash)
		}
		this.userFacade.setLeaderStatus(leaderStatus)
		await this.listener.onLeaderStatusChanged(leaderStatus)
	} else {
		console.log("ws message with unknown type", type)
	}
}
/**
 * Tries to get a decrypted parsed instance out of the serialized instance attached to an entity update.
 *
 * @returns the parsed instance, or null if the update carries no instance, if the decrypted instance
 * has errors, or if no session key could be found for it
 * @throws any error other than SessionKeyNotFoundError that occurs while parsing or decrypting
 */
private async getInstanceFromEntityEvent(event: EntityUpdate): Promise<Nullable<ServerModelParsedInstance>> {
	const typeRef = new TypeRef<any>(event.application as AppName, parseInt(event.typeId!))
	if (event.instance == null) {
		return null
	}

	try {
		const serverTypeModel = await this.typeModelResolver.resolveServerTypeReference(typeRef)
		const rawInstance = JSON.parse(event.instance) as ServerModelUntypedInstance
		const sanitizedInstance = AttributeModel.removeNetworkDebuggingInfoIfNeeded(rawInstance)
		const encryptedInstance = await this.instancePipeline.typeMapper.applyJsTypes(serverTypeModel, sanitizedInstance)
		const adapter = await EntityAdapter.from(serverTypeModel, encryptedInstance, this.instancePipeline)
		const migrated = await this.cryptoFacade.applyMigrations(typeRef, adapter)
		const sessionKey = await this.cryptoFacade.resolveSessionKey(migrated)
		const decryptedInstance = await this.instancePipeline.cryptoMapper.decryptParsedInstance(serverTypeModel, encryptedInstance, sessionKey)
		// we do not want to process the instance if there are _errors (when decrypting)
		return hasError(decryptedInstance) ? null : decryptedInstance
	} catch (e) {
		if (!(e instanceof SessionKeyNotFoundError)) {
			throw e
		}
		// After resolving the main instance with the BucketKey, the _ownerEncSessionKeys for files on the mails are
		// updated only after the UpdateSessionKeyService call, while the _ownerEncSessionKey for the main instance is
		// immediately updated. Therefore, there is a brief period where the File created after a reply to a mail has
		// null _ownerEncSessionKey. This means we cannot use the instance on the update for the File type.
		return null
	}
}
/**
 * Handler for the socket's close event.
 *
 * Counts the failed attempt, gives up the leader status and stops the sleep detector. What happens next depends
 * on the close code (minus 4000): authorization failures terminate the client and are reported as errors,
 * an expired session suspends it, and in every other case a reconnect is scheduled if the client is in the
 * Automatic state and the user is fully logged in.
 */
private onClose(event: CloseEvent) {
	this.failedConnectionAttempts++
	console.log("ws close event:", event, "state:", this.state)
	this.userFacade.setLeaderStatus(
		createWebsocketLeaderStatus({
			leaderStatus: false,
			// a valid applicationVersionSum and applicationTypesHash can only be provided by the server
			applicationVersionSum: null,
			applicationTypesHash: null,
		}),
	)
	this.sleepDetector.stop()

	// Avoid running into penalties when trying to authenticate with an invalid session
	// NotAuthenticatedException 401, AccessDeactivatedException 470, AccessBlocked 472
	// do not catch session expired here because websocket will be reused when we authenticate again
	const serverCode = event.code - 4000
	const accessWasDenied =
		serverCode === NotAuthorizedError.CODE || serverCode === AccessDeactivatedError.CODE || serverCode === AccessBlockedError.CODE
	if (accessWasDenied) {
		this.terminate()
		this.listener.onError(handleRestError(serverCode, "web socket error", null, null))
		return
	}
	if (serverCode === SessionExpiredError.CODE) {
		// session is expired. do not try to reconnect until the user creates a new session
		this.state = EventBusState.Suspended
		this.listener.onWebsocketStateChanged(WsConnectionState.connecting)
		return
	}
	if (this.state !== EventBusState.Automatic || !this.userFacade.isFullyLoggedIn()) {
		return
	}

	this.listener.onWebsocketStateChanged(WsConnectionState.connecting)
	if (this.immediateReconnect) {
		this.immediateReconnect = false
		this.tryReconnect(false, false)
		return
	}

	// LARGE is used after a normal shutdown and from the third failed attempt on.
	let reconnectionInterval: readonly [number, number] = RECONNECT_INTERVAL.LARGE
	if (serverCode !== NORMAL_SHUTDOWN_CLOSE_CODE) {
		if (this.failedConnectionAttempts === 1) {
			reconnectionInterval = RECONNECT_INTERVAL.SMALL
		} else if (this.failedConnectionAttempts === 2) {
			reconnectionInterval = RECONNECT_INTERVAL.MEDIUM
		}
	}
	const [minSeconds, maxSeconds] = reconnectionInterval
	this.tryReconnect(false, false, SECOND_MS * randomIntFromInterval(minSeconds, maxSeconds))
}
/**
* Brings the event state up to date after the websocket has been opened.
*
* Both queues are paused first. On a reconnect with already recorded last event ids the missed entity events are
* loaded, otherwise the last event ids are initialized (see initOnNewConnection()). Both queues are resumed on success.
*
* Errors go through the chained handlers below, in this order: ConnectionError, CancelledError,
* ServiceUnavailableError, OutOfSyncError, and finally a handler for everything that is left.
* NOTE(review): this relies on ofClass() only handling errors of the given class and passing on all others — confirm.
*/
private async initEntityEvents(connectMode: ConnectMode): Promise<void> {
// pause processing entity update message while initializing event queue
this.entityUpdateMessageQueue.pause()
// pause event queue and add all missed entity events first
this.eventQueue.pause()
// Only a reconnect that already knows some last event ids counts as an existing connection.
const existingConnection = connectMode === ConnectMode.Reconnect && this.lastEntityEventIds.size > 0
const p = existingConnection ? this.loadMissedEntityEvents(this.eventQueue) : this.initOnNewConnection()
return p
.then(() => {
this.entityUpdateMessageQueue.resume()
this.eventQueue.resume()
})
.catch(
ofClass(ConnectionError, (e) => {
console.log("ws not connected in connect(), close websocket", e)
this.close(CloseEventBusOption.Reconnect)
}),
)
.catch(
ofClass(CancelledError, () => {
// the processing was aborted due to a reconnect. do not reset any attributes because they might already be in use since reconnection
console.log("ws cancelled retry process entity events after reconnect")
}),
)
.catch(
ofClass(ServiceUnavailableError, async (e) => {
// a ServiceUnavailableError is a temporary error and we have to retry to avoid data inconsistencies
// some EventBatches/missed events are processed already now
// for an existing connection we just keep the current state and continue loading missed events for the other groups
// for a new connection we reset the last entity event ids because otherwise this would not be completed in the next try
if (!existingConnection) {
this.lastEntityEventIds.clear()
}
console.log("ws retry init entity events in ", RETRY_AFTER_SERVICE_UNAVAILABLE_ERROR_MS, e)
let promise = delay(RETRY_AFTER_SERVICE_UNAVAILABLE_ERROR_MS).then(() => {
// if we have a websocket reconnect we have to stop retrying
// (connect() and reset() set serviceUnavailableRetry to null, so the identity check below fails then)
if (this.serviceUnavailableRetry === promise) {
console.log("ws retry initializing entity events")
return this.initEntityEvents(connectMode)
} else {
console.log("ws cancel initializing entity events")
}
})
this.serviceUnavailableRetry = promise
return promise
}),
)
.catch(
ofClass(OutOfSyncError, async (e) => {
// we did not check for updates for too long, so some missed EntityEventBatches can not be loaded any more
// purge cache if out of sync
await this.cache.purgeStorage()
// We want users to re-login. By the time we get here they probably already have loaded some entities which we cannot update
// The rethrown error ends up in the final handler below, which reports it to the listener.
throw e
}),
)
.catch((e) => {
// Everything that was not handled above: resume both queues and report the error to the listener.
this.entityUpdateMessageQueue.resume()
this.eventQueue.resume()
this.listener.onError(e)
})
}
/**
 * Initializes the last event ids for a connection that has none recorded yet.
 * If at least one id came from the cache, missed events are loaded from there. Otherwise only the sync time
 * is recorded and the listener is told that the sync is done.
 */
private async initOnNewConnection() {
	const retrieved = await this.retrieveLastEntityEventIds()
	// First, we record lastEntityEventIds. We need this to know what we need to re-fetch.
	// This is not the same as the cache because we might have already downloaded them but cache might not have processed them yet.
	// Important: do it in one step so that we don't have partial IDs in the map in case an error occurs.
	this.lastEntityEventIds = retrieved.lastIds

	// Second, we need to initialize the cache too.
	if (!retrieved.someIdsWereCached) {
		// None of the IDs came from the cache, so this is a clean cache (either ephemeral after first connect or persistent with empty DB).
		// We need to record the time even if we don't process anything to later know if we are out of sync or not.
		await this.cache.recordSyncTime()
		this.listener.onSyncDone()
		return
	}
	// If some of the last IDs were retrieved from the cache then we want to load from that point to bring cache up-to-date.
	// This is mostly important for persistent cache.
	await this.loadMissedEntityEvents(this.eventQueue)
}
/**
 * Gets the latest event batch id for each of the user's groups, or the min id if there is no event batch yet.
 * This is needed to know from where to start loading missed events when we connect.
 *
 * An id stored in the cache takes precedence. Otherwise the newest batch is loaded from the server and its id
 * (or the min id) is stored in the cache.
 * @returns the ids per group and whether at least one of them came from the cache
 */
private async retrieveLastEntityEventIds(): Promise<{ lastIds: Map<Id, Array<Id>>; someIdsWereCached: boolean }> {
	// All ids are collected in a fresh map, so that the caller can set them in one step and we never end up
	// with ids for only a few groups when a ServiceUnavailableError occurs.
	const lastIds: Map<Id, Array<Id>> = new Map()
	let someIdsWereCached = false

	for (const groupId of this.eventGroups()) {
		const idFromCache = await this.cache.getLastEntityEventBatchForGroup(groupId)
		if (idFromCache != null) {
			lastIds.set(groupId, [idFromCache])
			someIdsWereCached = true
			continue
		}
		const newestBatches = await this.entity.loadRange(EntityEventBatchTypeRef, groupId, GENERATED_MAX_ID, 1, true)
		const startId = newestBatches.length === 1 ? getElementId(newestBatches[0]) : GENERATED_MIN_ID
		lastIds.set(groupId, [startId])
		// In case we don't receive any events for the group this time we want to still download from this point next time.
		await this.cache.setLastEntityEventBatchForGroup(groupId, startId)
	}

	return { lastIds, someIdsWereCached }
}
/**
 * Load event batches since the last time we were connected to bring cache and other things up-to-date.
 *
 * Does nothing if the user is not fully logged in. Otherwise the batches of all groups are loaded, sorted
 * oldest first and added to the queue. A progress monitor is set up and the instances of the queued events
 * are preloaded. Finally the sync time is recorded.
 * @param eventQueue is passed in for testing
 * @throws OutOfSyncError if the cache reports to be out of sync
 * @VisibleForTesting
 */
async loadMissedEntityEvents(eventQueue: EventQueue): Promise<void> {
	if (!this.userFacade.isFullyLoggedIn()) {
		console.log("EventBus: skip loadMissedEntityEvents because not fully logged in yet")
		return
	}
	await this.checkOutOfSync()
	newSyncMetrics()

	const missedBatches: EntityEventBatch[] = []
	for (const groupId of this.eventGroups()) {
		const batchesOfGroup = await this.loadEntityEventsForGroup(groupId)
		for (const batchOfGroup of batchesOfGroup) {
			missedBatches.push(batchOfGroup)
		}
	}
	missedBatches.sort((left, right) => compareOldestFirst(getElementId(left), getElementId(right)))

	// Count all batches that will actually be processed so that the progress is correct
	let totalExpectedBatches = 0
	for (const missedBatch of missedBatches) {
		const updates = await promiseMap(missedBatch.events, async (event) => entityUpdateToUpdateData(this.typeModelResolver, event))
		const batchId = getElementId(missedBatch)
		if (this.addBatch(batchId, getListId(missedBatch), updates, eventQueue)) {
			// Set as last only if it was inserted with success
			this.lastInitialEventBatch = batchId
			totalExpectedBatches++
		}
	}

	// NOTE(review): the queued events and the progress monitor completion below use this.eventQueue, while the batches
	// were added to the eventQueue parameter. Both are the same queue when called from within this class.
	const queuedEvents = this.eventQueue.eventQueue.flatMap((queuedBatch) => queuedBatch.events)
	const queuedEventCount = queuedEvents.length
	// We only have the correct amount of total work after adding all entity event batches.
	// The progress for processed batches is tracked inside the event queue.
	const progressMonitor = new ProgressMonitorDelegate(this.progressTracker, totalExpectedBatches + queuedEventCount + 1)
	console.log("ws", `progress monitor expects ${totalExpectedBatches} batches`)
	await progressMonitor.workDone(1) // show progress right away
	eventQueue.setProgressMonitor(progressMonitor)

	if (totalExpectedBatches !== 0) {
		// preload entity updates
		await this.eventInstancePrefetcher.preloadEntities(queuedEvents, progressMonitor)
	} else {
		// We don't have any missing update, we can just set the sync as finished
		this.eventQueue.getProgressMonitor()?.completed()
		this.listener.onSyncDone()
	}

	// We've loaded all the batches, we've added them to the queue, we can let the cache remember sync point for us to detect out of sync now.
	// It is possible that we will record the time before the batch will be processed but the risk is low.
	await this.cache.recordSyncTime()
}
/**
 * Loads all event batches of the group that come after the last known batch id of that group.
 * @returns the loaded batches, or an empty array if we are not authorized to load them
 * @throws any error other than NotAuthorizedError
 */
private async loadEntityEventsForGroup(groupId: Id): Promise<EntityEventBatch[]> {
	try {
		const startId = this.getLastEventBatchIdOrMinIdForGroup(groupId)
		return await this.entity.loadAll(EntityEventBatchTypeRef, groupId, startId)
	} catch (e) {
		if (!(e instanceof NotAuthorizedError)) {
			throw e
		}
		console.log("ws could not download entity updates, lost permission")
		return []
	}
}
/**
 * Asks the cache whether we are out of sync.
 * @throws OutOfSyncError if we are. It is handled where the connection is initialized; the cache is purged there.
 */
private async checkOutOfSync() {
	// We try to detect whether event batches have already expired.
	// If this happened we don't need to download anything, we need to purge the cache and start all over.
	const outOfSync = await this.cache.isOutOfSync()
	if (!outOfSync) {
		return
	}
	throw new OutOfSyncError("some missed EntityEventBatches cannot be loaded any more")
}
/**
 * Callback of the event queue: processes a single queued batch.
 * A failure is logged, reported to the listener and then rethrown to the queue.
 */
private async eventQueueCallback(modification: QueuedBatch): Promise<void> {
	try {
		await this.processEventBatch(modification)
	} catch (processingError) {
		console.log("ws error while processing event batches", processingError)
		this.listener.onError(processingError)
		throw processingError
	}
}
/**
 * Callback of the entity update message queue: moves a batch received over the websocket
 * into the event queue and resumes the event queue.
 */
private async entityUpdateMessageQueueCallback(batch: QueuedBatch): Promise<void> {
	const { batchId, groupId, events } = batch
	this.addBatch(batchId, groupId, events, this.eventQueue)
	this.eventQueue.resume()
}
/**
 * Replaces all handlers of the current socket (if there is one) with the identity function.
 * We don't want the old socket to mess with our state.
 */
private unsubscribeFromOldWebsocket() {
	const oldSocket = this.socket
	if (!oldSocket) {
		return
	}
	oldSocket.onmessage = identity
	oldSocket.onerror = identity
	oldSocket.onclose = identity
	oldSocket.onopen = identity
}
/**
* Puts the client into the Terminated state, resets its bookkeeping and reports the terminated state to the listener.
* The socket itself is not closed here.
*/
private async terminate(): Promise<void> {
this.state = EventBusState.Terminated
this.reset()
this.listener.onWebsocketStateChanged(WsConnectionState.terminated)
}
/**
 * Tries to reconnect the websocket if it is not connected.
 *
 * @param closeIfOpen if true and the socket is open, the socket is closed and an immediate reconnect is requested
 * for when the close event arrives
 * @param enableAutomaticState if true, the state is switched to Automatic unless the client is terminated
 */
private reconnect(closeIfOpen: boolean, enableAutomaticState: boolean) {
	console.log(
		"ws reconnect socket.readyState: (CONNECTING=0, OPEN=1, CLOSING=2, CLOSED=3): " + (this.socket ? this.socket.readyState : "null"),
		"state:",
		this.state,
		"closeIfOpen:",
		closeIfOpen,
		"enableAutomaticState:",
		enableAutomaticState,
	)
	if (this.state !== EventBusState.Terminated && enableAutomaticState) {
		this.state = EventBusState.Automatic
	}

	const socket = this.socket
	if (closeIfOpen && socket && socket.readyState === WebSocket.OPEN) {
		this.immediateReconnect = true
		socket.close()
		return
	}

	const socketIsGone = socket == null || socket.readyState === WebSocket.CLOSED || socket.readyState === WebSocket.CLOSING
	if (socketIsGone && this.state !== EventBusState.Terminated && this.userFacade.isFullyLoggedIn()) {
		// Don't try to connect right away because connection may not be actually there
		// see #1165
		if (this.connectTimer) {
			clearTimeout(this.connectTimer)
		}
		this.connectTimer = setTimeout(() => this.connect(ConnectMode.Reconnect), 100)
	}
}
/**
 * Records the batch id for its group and adds the batch to the given queue, unless a batch with this id
 * has been recorded for the group before.
 *
 * The ids of a group are kept sorted oldest first, because they are looked up with a binary search and because
 * the last element is used as the starting point for loading missed events. At most
 * MAX_EVENT_IDS_QUEUE_LENGTH ids are kept per group; the oldest one is dropped when there are more.
 *
 * @param batchId id of the event batch
 * @param groupId id of the group the batch belongs to
 * @param events the updates of the batch
 * @param eventQueue the queue to add the batch to
 * @returns true if the batch was added to the queue
 */
private addBatch(batchId: Id, groupId: Id, events: ReadonlyArray<EntityUpdateData>, eventQueue: EventQueue): boolean {
	const lastForGroup = this.lastEntityEventIds.get(groupId) || []
	// A negative result means that the id is not present in the array yet.
	const index = binarySearch(lastForGroup, batchId, compareOldestFirst)
	let wasAdded = false

	if (index < 0) {
		// A miss is encoded as -(insertionIndex) - 1, which keeps "insert at 0" distinguishable from "found at 0".
		// The insertion index is therefore -index - 1. Using -index would put ids that arrive out of order one
		// slot too far to the right and break the sort order.
		// NOTE(review): binarySearch comes from @tutao/tutanota-utils — confirm the encoding there.
		const insertionIndex = -index - 1
		lastForGroup.splice(insertionIndex, 0, batchId)
		// only add the batch if it was not processed before
		wasAdded = eventQueue.add(batchId, groupId, events)
	}

	if (lastForGroup.length > MAX_EVENT_IDS_QUEUE_LENGTH) {
		lastForGroup.shift()
	}
	// Needed for groups that had no entry yet, for which a new array was created above.
	this.lastEntityEventIds.set(groupId, lastForGroup)

	if (wasAdded) {
		this.lastAddedBatchForGroup.set(groupId, batchId)
	}
	return wasAdded
}
/**
 * Processes one queued batch: the events go through the cache first and the events returned by the cache are
 * then handed to the listener. Nothing is done if the client is terminated. When the batch is the last one of
 * the initially loaded batches, the sync is reported as done.
 *
 * A ServiceUnavailableError leads to a retry after RETRY_AFTER_SERVICE_UNAVAILABLE_ERROR_MS. The retry is given up
 * with a CancelledError if serviceUnavailableRetry no longer points to it by then.
 * @throws any other error, after logging it
 */
private async processEventBatch(batch: QueuedBatch): Promise<void> {
	try {
		if (this.isTerminated()) return
		const eventsFromCache = await this.cache.entityEventsReceived(batch.events, batch.batchId, batch.groupId)
		// The client may have been terminated while we were waiting for the cache.
		if (!this.isTerminated()) {
			await this.listener.onEntityEventsReceived(eventsFromCache, batch.batchId, batch.groupId)
		}
		if (batch.batchId === this.lastInitialEventBatch) {
			console.log("Reached final event, sync is done")
			this.eventQueue.getProgressMonitor()?.completed()
			this.listener.onSyncDone()
		}
	} catch (e) {
		if (!(e instanceof ServiceUnavailableError)) {
			console.log("EVENT", "error", e)
			throw e
		}
		// a ServiceUnavailableError is a temporary error and we have to retry to avoid data inconsistencies
		console.log("ws retry processing event in 30s", e)
		const retryPromise: Promise<void> = delay(RETRY_AFTER_SERVICE_UNAVAILABLE_ERROR_MS).then(() => {
			// if we have a websocket reconnect we have to stop retrying
			if (this.serviceUnavailableRetry !== retryPromise) {
				throw new CancelledError("stop retry processing after service unavailable due to reconnect")
			}
			return this.processEventBatch(batch)
		})
		this.serviceUnavailableRetry = retryPromise
		return retryPromise
	}
}
/** Returns the last recorded event batch id of the group, or GENERATED_MIN_ID if none is recorded. */
private getLastEventBatchIdOrMinIdForGroup(groupId: Id): Id {
	const recordedIds = this.lastEntityEventIds.get(groupId)
	if (!recordedIds || recordedIds.length === 0) {
		return GENERATED_MIN_ID
	}
	return lastThrow(recordedIds)
}
/** Whether the client is currently in the Terminated state. */
private isTerminated() {
return this.state === EventBusState.Terminated
}
/**
 * The ids of the groups we receive events for: the groups of all memberships of the logged-in user
 * except mailing lists, followed by the user group.
 */
private eventGroups(): Id[] {
	const membershipGroupIds = this.userFacade
		.getLoggedInUser()
		.memberships.filter((membership) => membership.groupType !== GroupType.MailingList)
		.map((membership) => membership.group)
	const userGroupId = this.userFacade.getLoggedInUser().userGroup.group
	return membershipGroupIds.concat(userGroupId)
}
}