tutanota/src/api/worker/EventBusClient.ts

668 lines
25 KiB
TypeScript
Raw Normal View History

import {LoginFacadeImpl} from "./facades/LoginFacade"
import type {MailFacade} from "./facades/MailFacade"
import type {WorkerImpl} from "./WorkerImpl"
import {assertWorkerOrNode, isAdminClient, isTest} from "../common/Env"
2017-08-15 13:54:22 +02:00
import {_TypeModel as MailTypeModel} from "../entities/tutanota/Mail"
import {
AccessBlockedError,
AccessDeactivatedError,
ConnectionError,
handleRestError,
NotAuthorizedError,
ServiceUnavailableError,
2021-12-23 14:03:23 +01:00
SessionExpiredError,
} from "../common/error/RestError"
2017-08-15 13:54:22 +02:00
import {EntityEventBatchTypeRef} from "../entities/sys/EntityEventBatch"
import {assertNotNull, binarySearch, delay, identity, lastThrow, ofClass, randomIntFromInterval} from "@tutao/tutanota-utils"
2017-08-15 13:54:22 +02:00
import {OutOfSyncError} from "../common/error/OutOfSyncError"
import type {Indexer} from "./search/Indexer"
import {CloseEventBusOption, GroupType, SECOND_MS} from "../common/TutanotaConstants"
import type {WebsocketEntityData} from "../entities/sys/WebsocketEntityData"
2019-01-17 15:42:16 +01:00
import {_TypeModel as WebsocketEntityDataTypeModel} from "../entities/sys/WebsocketEntityData"
import {CancelledError} from "../common/error/CancelledError"
import {_TypeModel as PhishingMarkerWebsocketDataTypeModel, PhishingMarkerWebsocketData} from "../entities/tutanota/PhishingMarkerWebsocketData"
import {_TypeModel as WebsocketCounterDataTypeModel, WebsocketCounterData} from "../entities/sys/WebsocketCounterData"
import type {EntityUpdate} from "../entities/sys/EntityUpdate"
import {EntityClient} from "../common/EntityClient"
import type {QueuedBatch} from "./search/EventQueue"
import {EventQueue} from "./search/EventQueue"
import {_TypeModel as WebsocketLeaderStatusTypeModel, createWebsocketLeaderStatus, WebsocketLeaderStatus} from "../entities/sys/WebsocketLeaderStatus"
2020-11-17 16:29:19 +01:00
import {ProgressMonitorDelegate} from "./ProgressMonitorDelegate"
import type {IProgressMonitor} from "../common/utils/ProgressMonitor"
import {NoopProgressMonitor} from "../common/utils/ProgressMonitor"
import {compareOldestFirst, firstBiggerThanSecond, GENERATED_MAX_ID, GENERATED_MIN_ID, getElementId, getLetId, isSameId} from "../common/utils/EntityUtils"
2021-12-15 16:07:07 +01:00
import {InstanceMapper} from "./crypto/InstanceMapper"
2021-12-28 13:53:11 +01:00
import {WsConnectionState} from "../main/WorkerClient";
import {IEntityRestCache} from "./rest/EntityRestCache"
2017-08-15 13:54:22 +02:00
assertWorkerOrNode()
2021-12-23 14:03:23 +01:00
export const enum EventBusState {
Automatic = "automatic",
2021-12-23 14:03:23 +01:00
// automatic reconnection is enabled
Suspended = "suspended",
// automatic reconnection is suspended but can be enabled again
Terminated = "terminated", // automatic reconnection is disabled and websocket is closed but can be opened again by calling connect explicit
}
2020-09-15 11:35:25 +02:00
// EntityEventBatches expire after 45 days. keep a time diff security of one day.
export const ENTITY_EVENT_BATCH_EXPIRE_MS = 44 * 24 * 60 * 60 * 1000
const RETRY_AFTER_SERVICE_UNAVAILABLE_ERROR_MS = 30000
const NORMAL_SHUTDOWN_CLOSE_CODE = 1
2022-02-28 15:19:23 +01:00
/**
* Reconnection interval bounds. When we reconnect we pick a random number of seconds in a range. The range depends on the number of attempts and server
* return.
* */
const RECONNECT_INTERVAL = Object.freeze({
SMALL: [5, 10],
MEDIUM: [20, 40],
LARGE: [60, 120],
} as const)
// we store the last 1000 event ids per group, so we know if an event was already processed.
// it is not sufficient to check the last event id because a smaller event id may arrive later
// than a bigger one if the requests are processed in parallel on the server
const MAX_EVENT_IDS_QUEUE_LENGTH = 1000
2022-02-28 15:42:16 +01:00
const enum MessageType {
EntityUpdate = "entityUpdate",
UnreadCounterUpdate = "unreadCounterUpdate",
PhishingMarkers = "phishingMarkers",
LeaderStatus = "leaderStatus",
}
export const enum ConnectMode {
Initial,
Reconnect,
}
2017-08-15 13:54:22 +02:00
export class EventBusClient {
2022-02-28 15:42:16 +01:00
private state: EventBusState
private socket: WebSocket | null
private immediateReconnect: boolean = false // if true tries to reconnect immediately after the websocket is closed
2021-12-23 14:03:23 +01:00
/**
* Map from group id to last event ids (max. _MAX_EVENT_IDS_QUEUE_LENGTH). We keep them to avoid processing the same event twice if
* it comes out of order from the server) and for requesting missed entity events on reconnect.
*
* We do not have to update these event ids if the groups of the user change because we always take the current users groups from the
* LoginFacade.
*/
private lastEntityEventIds: Map<Id, Array<Id>>
2021-12-23 14:03:23 +01:00
2020-12-11 14:45:42 +01:00
/**
* Last batch which was actually added to the queue. We need it to find out when the group is processed
*/
private lastAddedBatchForGroup: Map<Id, Id>
2021-12-23 14:03:23 +01:00
private lastAntiphishingMarkersId: Id | null = null
2017-08-15 13:54:22 +02:00
2022-02-28 15:42:16 +01:00
/** Queue to process all events. */
private readonly eventQueue: EventQueue
2017-08-15 13:54:22 +02:00
2022-02-28 15:42:16 +01:00
/** Queue that handles incoming websocket messages only. Caches them until we process downloaded ones and then adds them to eventQueue. */
private readonly entityUpdateMessageQueue: EventQueue
private reconnectTimer: TimeoutID | null
private connectTimer: TimeoutID | null
/**
* Represents a currently retried executing due to a ServiceUnavailableError
*/
private serviceUnavailableRetry: Promise<void> | null = null
private failedConnectionAttempts: number = 0
private progressMonitor: IProgressMonitor
2021-12-23 14:03:23 +01:00
constructor(
2022-02-28 15:19:23 +01:00
private readonly worker: WorkerImpl,
private readonly indexer: Indexer,
private readonly cache: IEntityRestCache,
private readonly mail: MailFacade,
private readonly login: LoginFacadeImpl,
private readonly entity: EntityClient,
private readonly instanceMapper: InstanceMapper,
private readonly socketFactory: (path: string) => WebSocket,
2021-12-23 14:03:23 +01:00
) {
2022-02-28 15:42:16 +01:00
this.state = EventBusState.Automatic
this.lastEntityEventIds = new Map()
this.lastAddedBatchForGroup = new Map()
this.socket = null
this.reconnectTimer = null
this.connectTimer = null
this.progressMonitor = new NoopProgressMonitor()
this.eventQueue = new EventQueue(true, modification => this.eventQueueCallback(modification))
this.entityUpdateMessageQueue = new EventQueue(false, (batch) => this.entityUpdateMessageQueueCallback(batch))
this.reset()
2020-12-09 16:07:56 +01:00
}
private reset() {
this.immediateReconnect = false
2021-12-23 14:03:23 +01:00
this.lastEntityEventIds.clear()
2021-12-23 14:03:23 +01:00
this.lastAddedBatchForGroup.clear()
2021-12-23 14:03:23 +01:00
this.eventQueue.pause()
2021-12-23 14:03:23 +01:00
this.eventQueue.clear()
2020-12-09 16:07:56 +01:00
this.serviceUnavailableRetry = null
2017-12-20 16:48:36 +01:00
}
2017-08-15 13:54:22 +02:00
/**
* Opens a WebSocket connection to receive server events.
* @param connectMode
2017-08-15 13:54:22 +02:00
*/
connect(connectMode: ConnectMode) {
console.log("ws connect reconnect:", connectMode === ConnectMode.Reconnect, "state:", this.state)
// make sure a retry will be cancelled by setting _serviceUnavailableRetry to null
this.serviceUnavailableRetry = null
2021-12-23 14:03:23 +01:00
2022-02-28 15:19:23 +01:00
this.worker.updateWebSocketState(WsConnectionState.connecting)
2020-12-07 17:02:35 +01:00
// Task for updating events are number of groups + 2. Use 2 as base for reconnect state.
if (this.progressMonitor) {
// Say that the old monitor is completed so that we don't calculate its amount as still to do.
this.progressMonitor.completed()
}
2021-12-23 14:03:23 +01:00
this.progressMonitor = connectMode === ConnectMode.Reconnect
? new ProgressMonitorDelegate(this.eventGroups().length + 2, this.worker)
: new NoopProgressMonitor()
2021-12-23 14:03:23 +01:00
this.progressMonitor.workDone(1)
2021-12-23 14:03:23 +01:00
2022-02-28 15:42:16 +01:00
this.state = EventBusState.Automatic
this.connectTimer = null
2018-07-31 17:07:41 +02:00
const authHeaders = this.login.createAuthHeaders()
2021-12-23 14:03:23 +01:00
2019-01-17 15:42:16 +01:00
// Native query building is not supported in old browser, mithril is not available in the worker
const authQuery =
"modelVersions=" +
WebsocketEntityDataTypeModel.version +
"." +
MailTypeModel.version +
"&clientVersion=" +
env.versionNumber +
"&userId=" +
this.login.getLoggedInUser()._id +
"&accessToken=" +
authHeaders.accessToken +
(this.lastAntiphishingMarkersId ? "&lastPhishingMarkersId=" + this.lastAntiphishingMarkersId : "")
const path = "/event?" + authQuery
this.unsubscribeFromOldWebsocket()
2021-12-23 14:03:23 +01:00
this.socket = this.socketFactory(path)
this.socket.onopen = () => this.onOpen(connectMode)
2022-02-28 15:42:16 +01:00
this.socket.onclose = (event: CloseEvent) => this.onClose(event)
this.socket.onerror = (error: any) => this.onError(error)
this.socket.onmessage = (message: MessageEvent<string>) => this.onMessage(message)
2017-08-15 13:54:22 +02:00
}
/**
2017-09-28 11:32:02 +02:00
* Sends a close event to the server and finally closes the connection.
2017-12-20 16:48:36 +01:00
* The state of this event bus client is reset and the client is terminated (does not automatically reconnect) except reconnect == true
2017-08-15 13:54:22 +02:00
*/
2021-12-23 14:03:23 +01:00
close(closeOption: CloseEventBusOption) {
console.log("ws close closeOption: ", closeOption, "state:", this.state)
2021-12-23 14:03:23 +01:00
switch (closeOption) {
case CloseEventBusOption.Terminate:
this.terminate()
2021-12-23 14:03:23 +01:00
break
2021-12-23 14:03:23 +01:00
case CloseEventBusOption.Pause:
2022-02-28 15:42:16 +01:00
this.state = EventBusState.Suspended
2021-12-23 14:03:23 +01:00
2022-02-28 15:19:23 +01:00
this.worker.updateWebSocketState(WsConnectionState.connecting)
2021-12-23 14:03:23 +01:00
break
2021-12-23 14:03:23 +01:00
case CloseEventBusOption.Reconnect:
2022-02-28 15:19:23 +01:00
this.worker.updateWebSocketState(WsConnectionState.connecting)
2021-12-23 14:03:23 +01:00
break
}
this.socket?.close()
2017-08-15 13:54:22 +02:00
}
2022-03-01 16:27:14 +01:00
tryReconnect(closeIfOpen: boolean, enableAutomaticState: boolean, delay: number | null = null) {
console.log("ws tryReconnect closeIfOpen:", closeIfOpen, "enableAutomaticState:", enableAutomaticState, "delay:", delay)
2022-03-01 16:27:14 +01:00
if (this.reconnectTimer) {
// prevent reconnect race-condition
clearTimeout(this.reconnectTimer)
this.reconnectTimer = null
}
2022-03-01 16:27:14 +01:00
if (!delay) {
this.reconnect(closeIfOpen, enableAutomaticState)
} else {
this.reconnectTimer = setTimeout(() => this.reconnect(closeIfOpen, enableAutomaticState), delay)
}
}
2022-03-01 16:27:14 +01:00
// Returning promise for tests
private onOpen(connectMode: ConnectMode): Promise<void> {
this.failedConnectionAttempts = 0
console.log("ws open state:", this.state)
2022-03-01 16:27:14 +01:00
// Indicate some progress right away
this.progressMonitor.workDone(1)
2022-03-01 16:27:14 +01:00
const p = this.initEntityEvents(connectMode)
2021-12-23 14:03:23 +01:00
2022-03-01 16:27:14 +01:00
this.worker.updateWebSocketState(WsConnectionState.connected)
2021-12-23 14:03:23 +01:00
2022-03-01 16:27:14 +01:00
return p
2017-12-20 16:48:36 +01:00
}
2022-02-28 15:42:16 +01:00
private onError(error: any) {
console.log("ws error:", error, JSON.stringify(error), "state:", this.state)
2017-08-15 13:54:22 +02:00
}
private async onMessage(message: MessageEvent<string>): Promise<void> {
2022-02-28 15:42:16 +01:00
const [type, value] = message.data.split(";")
2021-12-23 14:03:23 +01:00
2022-02-28 15:42:16 +01:00
switch (type) {
case MessageType.EntityUpdate: {
const data: WebsocketEntityData = await this.instanceMapper.decryptAndMapToInstance(
WebsocketEntityDataTypeModel,
JSON.parse(value),
null,
)
this.entityUpdateMessageQueue.add(data.eventBatchId, data.eventBatchOwner, data.eventBatch)
2022-02-28 15:42:16 +01:00
break
}
case MessageType.UnreadCounterUpdate: {
const counterData: WebsocketCounterData = await this.instanceMapper.decryptAndMapToInstance(
WebsocketCounterDataTypeModel,
JSON.parse(value),
null,
)
this.worker.updateCounter(counterData)
break
}
case MessageType.PhishingMarkers: {
const data: PhishingMarkerWebsocketData = await this.instanceMapper.decryptAndMapToInstance(
PhishingMarkerWebsocketDataTypeModel,
JSON.parse(value),
null,
)
this.lastAntiphishingMarkersId = data.lastId
this.mail.phishingMarkersUpdateReceived(data.markers)
2022-02-28 15:42:16 +01:00
break
}
case MessageType.LeaderStatus:
const data: WebsocketLeaderStatus = await this.instanceMapper.decryptAndMapToInstance(
WebsocketLeaderStatusTypeModel,
JSON.parse(value),
null,
)
await this.login.setLeaderStatus(data)
break
default:
console.log("ws message with unknown type", type)
break
2019-01-17 15:42:16 +01:00
}
2017-08-15 13:54:22 +02:00
}
2022-02-28 15:42:16 +01:00
private onClose(event: CloseEvent) {
this.failedConnectionAttempts++
console.log("ws close event:", event, "state:", this.state)
2021-12-23 14:03:23 +01:00
this.login.setLeaderStatus(
createWebsocketLeaderStatus({
leaderStatus: false,
}),
2021-12-23 14:03:23 +01:00
)
// Avoid running into penalties when trying to authenticate with an invalid session
// NotAuthenticatedException 401, AccessDeactivatedException 470, AccessBlocked 472
// do not catch session expired here because websocket will be reused when we authenticate again
const serverCode = event.code - 4000
2021-12-23 14:03:23 +01:00
if ([NotAuthorizedError.CODE, AccessDeactivatedError.CODE, AccessBlockedError.CODE].includes(serverCode)) {
this.terminate()
2022-02-28 15:19:23 +01:00
this.worker.sendError(handleRestError(serverCode, "web socket error", null, null))
} else if (serverCode === SessionExpiredError.CODE) {
// session is expired. do not try to reconnect until the user creates a new session
2022-02-28 15:42:16 +01:00
this.state = EventBusState.Suspended
2022-02-28 15:19:23 +01:00
this.worker.updateWebSocketState(WsConnectionState.connecting)
2022-02-28 15:42:16 +01:00
} else if (this.state === EventBusState.Automatic && this.login.isLoggedIn()) {
2022-02-28 15:19:23 +01:00
this.worker.updateWebSocketState(WsConnectionState.connecting)
if (this.immediateReconnect) {
this.immediateReconnect = false
2021-12-23 14:03:23 +01:00
this.tryReconnect(false, false)
} else {
2021-12-28 13:53:11 +01:00
let reconnectionInterval: readonly [number, number]
if (serverCode === NORMAL_SHUTDOWN_CLOSE_CODE) {
2022-02-28 15:19:23 +01:00
reconnectionInterval = RECONNECT_INTERVAL.LARGE
} else if (this.failedConnectionAttempts === 1) {
2022-02-28 15:19:23 +01:00
reconnectionInterval = RECONNECT_INTERVAL.SMALL
} else if (this.failedConnectionAttempts === 2) {
2022-02-28 15:19:23 +01:00
reconnectionInterval = RECONNECT_INTERVAL.MEDIUM
} else {
2022-02-28 15:19:23 +01:00
reconnectionInterval = RECONNECT_INTERVAL.LARGE
}
2021-12-23 14:03:23 +01:00
this.tryReconnect(false, false, SECOND_MS * randomIntFromInterval(reconnectionInterval[0], reconnectionInterval[1]))
2017-08-15 13:54:22 +02:00
}
}
}
2022-03-01 16:27:14 +01:00
private async initEntityEvents(connectMode: ConnectMode): Promise<void> {
// pause processing entity update message while initializing event queue
this.entityUpdateMessageQueue.pause()
2021-12-23 14:03:23 +01:00
2022-03-01 16:27:14 +01:00
// pause event queue and add all missed entity events first
this.eventQueue.pause()
2022-03-01 16:27:14 +01:00
const existingConnection = connectMode == ConnectMode.Reconnect && this.lastEntityEventIds.size > 0
const p = (existingConnection) ? this.loadMissedEntityEvents() : this.initOnNewConnection()
2017-08-15 13:54:22 +02:00
2022-03-01 16:27:14 +01:00
return p
.then(() => {
this.entityUpdateMessageQueue.resume()
this.eventQueue.resume()
})
.catch(ofClass(ConnectionError, e => {
console.log("ws not connected in connect(), close websocket", e)
this.close(CloseEventBusOption.Reconnect)
}))
.catch(ofClass(CancelledError, () => {
// the processing was aborted due to a reconnect. do not reset any attributes because they might already be in use since reconnection
console.log("ws cancelled retry process entity events after reconnect")
}))
.catch(ofClass(ServiceUnavailableError, async e => {
// a ServiceUnavailableError is a temporary error and we have to retry to avoid data inconsistencies
// some EventBatches/missed events are processed already now
// for an existing connection we just keep the current state and continue loading missed events for the other groups
// for a new connection we reset the last entity event ids because otherwise this would not be completed in the next try
if (!existingConnection) {
this.lastEntityEventIds.clear()
}
2021-12-23 14:03:23 +01:00
2022-03-01 16:27:14 +01:00
console.log("ws retry init entity events in ", RETRY_AFTER_SERVICE_UNAVAILABLE_ERROR_MS, e)
let promise = delay(RETRY_AFTER_SERVICE_UNAVAILABLE_ERROR_MS).then(() => {
// if we have a websocket reconnect we have to stop retrying
if (this.serviceUnavailableRetry === promise) {
console.log("ws retry initializing entity events")
return this.initEntityEvents(connectMode)
} else {
console.log("ws cancel initializing entity events")
}
})
this.serviceUnavailableRetry = promise
return promise
}))
.catch(ofClass(OutOfSyncError, async (e) => {
// we did not check for updates for too long, so some missed EntityEventBatches can not be loaded any more
// purge cache if out of sync
await this.cache.purgeStorage()
// We want users to re-login. By the time we get here they probably already have loaded some entities which we cannot update
throw e
}))
.catch(e => {
this.entityUpdateMessageQueue.resume()
2021-12-23 14:03:23 +01:00
2022-03-01 16:27:14 +01:00
this.eventQueue.resume()
2021-12-23 14:03:23 +01:00
2022-03-01 16:27:14 +01:00
this.worker.sendError(e)
})
}
private async initOnNewConnection() {
const {lastIds, someIdsWereCached} = await this.retrieveLastEntityEventIds()
// First, we record lastEntityEventIds. We need this to know what we need to re-fetch.
// This is not the same as the cache because we might have already downloaded them but cache might not have processed them yet.
// Important: do it in one step so that we don't have partial IDs in the map in case an error occurs.
this.lastEntityEventIds = lastIds
// Second, we need to initialize the cache too.
if (someIdsWereCached) {
// If some of the last IDs were retrieved from the cache then we want to load from that point to bring cache up-to-date. This is mostly important for
// persistent cache.
await this.loadMissedEntityEvents()
} else {
// If the cache is clean then this is a clean cache (either ephemeral after first connect or persistent with empty DB).
// We need to record the time even if we don't process anything to later know if we are out of sync or not.
await this.cache.recordSyncTime()
2017-08-15 13:54:22 +02:00
}
}
/**
* Gets the latest event batch ids for each of the users groups or min id if there is no event batch yet.
* This is needed to know from where to start loading missed events when we connect.
2017-08-15 13:54:22 +02:00
*/
private async retrieveLastEntityEventIds(): Promise<{lastIds: Map<Id, Array<Id>>, someIdsWereCached: boolean}> {
// set all last event ids in one step to avoid that we have just set them for a few groups when a ServiceUnavailableError occurs
2020-12-11 14:45:42 +01:00
const lastIds: Map<Id, Array<Id>> = new Map()
let someIdsWereCached = false
for (const groupId of this.eventGroups()) {
const cachedBatchId = await this.cache.getLastEntityEventBatchForGroup(groupId)
if (cachedBatchId != null) {
lastIds.set(groupId, [cachedBatchId])
someIdsWereCached = true
} else {
const batches = await this.entity.loadRange(EntityEventBatchTypeRef, groupId, GENERATED_MAX_ID, 1, true)
const batchId = batches.length === 1 ? getElementId(batches[0]) : GENERATED_MIN_ID
lastIds.set(groupId, [batchId])
// In case we don't receive any events for the group this time we want to still download from this point next time.
await this.cache.setLastEntityEventBatchForGroup(groupId, batchId)
}
}
2021-12-23 14:03:23 +01:00
return {lastIds, someIdsWereCached}
2017-08-15 13:54:22 +02:00
}
/** Load event batches since the last time we were connected to bring cache and other things up-to-date. */
private async loadMissedEntityEvents(): Promise<void> {
if (!this.login.isLoggedIn()) {
return
}
// We try to detect whether event batches have already expired.
// If this happened we don't need to download anything, we need to purge the cache and start all over.
const sinceLastUpdate = await this.cache.timeSinceLastSync()
if (sinceLastUpdate != null && sinceLastUpdate > ENTITY_EVENT_BATCH_EXPIRE_MS) {
console.log("ws cache is out of sync, purging...")
// Allow the progress bar to complete
this.progressMonitor.workDone(this.eventGroups().length)
throw new OutOfSyncError("some missed EntityEventBatches cannot be loaded any more")
} else {
for (let groupId of this.eventGroups()) {
let eventBatches
try {
eventBatches = await this.entity.loadAll(EntityEventBatchTypeRef, groupId, this.getLastEventBatchIdOrMinIdForGroup(groupId))
} catch (e) {
if (e instanceof NotAuthorizedError) {
console.log("ws could not download entity updates, lost permission")
// We need to do this to mark group as "processed", otherwise progress bar will get stuck
this.progressMonitor.workDone(1)
continue
} else {
throw e
}
}
if (eventBatches.length === 0) {
// There won't be a callback from the queue to process the event so we mark this group as
// completed right away
this.progressMonitor.workDone(1)
} else {
for (const batch of eventBatches) {
this.addBatch(getElementId(batch), groupId, batch.events)
}
}
2020-09-15 11:35:25 +02:00
}
// We've loaded all the batches, we've added them to the queue, we can let the cache remember sync point for us to detect out of sync now.
// It is possible that we will record the time before the batch will be processed but the risk is low.
await this.cache.recordSyncTime()
2017-08-15 13:54:22 +02:00
}
}
2022-03-01 16:27:14 +01:00
private async eventQueueCallback(modification: QueuedBatch): Promise<void> {
try {
await this.processEventBatch(modification)
} catch (e) {
console.log("ws error while processing event batches", e)
this.worker.sendError(e)
throw e
}
// If we completed the event, it was added before
const lastForGroup = assertNotNull(this.lastAddedBatchForGroup.get(modification.groupId))
if (isSameId(modification.batchId, lastForGroup) || firstBiggerThanSecond(modification.batchId, lastForGroup)) {
this.progressMonitor && this.progressMonitor.workDone(1)
}
}
private async entityUpdateMessageQueueCallback(batch: QueuedBatch): Promise<void> {
this.addBatch(batch.batchId, batch.groupId, batch.events)
this.eventQueue.resume()
}
private unsubscribeFromOldWebsocket() {
if (this.socket) {
// Remove listeners. We don't want old socket to mess our state
this.socket.onopen = this.socket.onclose = this.socket.onerror = this.socket.onmessage = identity
}
}
private async terminate(): Promise<void> {
this.state = EventBusState.Terminated
this.reset()
this.worker.updateWebSocketState(WsConnectionState.terminated)
}
/**
* Tries to reconnect the websocket if it is not connected.
*/
private reconnect(closeIfOpen: boolean, enableAutomaticState: boolean) {
console.log(
"ws reconnect socket.readyState: (CONNECTING=0, OPEN=1, CLOSING=2, CLOSED=3): " + (this.socket ? this.socket.readyState : "null"),
"state:",
this.state,
"closeIfOpen:",
closeIfOpen,
"enableAutomaticState:",
enableAutomaticState,
)
if (this.state !== EventBusState.Terminated && enableAutomaticState) {
this.state = EventBusState.Automatic
}
if (closeIfOpen && this.socket && this.socket.readyState === WebSocket.OPEN) {
this.immediateReconnect = true
this.socket.close()
} else if (
(this.socket == null || this.socket.readyState === WebSocket.CLOSED || this.socket.readyState === WebSocket.CLOSING) &&
this.state !== EventBusState.Terminated &&
this.login.isLoggedIn()
) {
// Don't try to connect right away because connection may not be actually there
// see #1165
if (this.connectTimer) {
clearTimeout(this.connectTimer)
}
this.connectTimer = setTimeout(() => this.connect(ConnectMode.Reconnect), 100)
}
}
private addBatch(batchId: Id, groupId: Id, events: ReadonlyArray<EntityUpdate>) {
const lastForGroup = this.lastEntityEventIds.get(groupId) || []
2020-12-10 17:18:19 +01:00
// find the position for inserting into last entity events (negative value is considered as not present in the array)
const index = binarySearch(lastForGroup, batchId, compareOldestFirst)
2020-12-11 14:45:42 +01:00
let wasAdded
2021-12-23 14:03:23 +01:00
if (index < 0) {
lastForGroup.splice(-index, 0, batchId)
2020-12-10 17:18:19 +01:00
// only add the batch if it was not process before
wasAdded = this.eventQueue.add(batchId, groupId, events)
2020-12-11 14:45:42 +01:00
} else {
wasAdded = false
}
2021-12-23 14:03:23 +01:00
2022-02-28 15:19:23 +01:00
if (lastForGroup.length > MAX_EVENT_IDS_QUEUE_LENGTH) {
lastForGroup.shift()
2017-08-15 13:54:22 +02:00
}
2021-12-23 14:03:23 +01:00
this.lastEntityEventIds.set(batchId, lastForGroup)
2020-12-11 14:45:42 +01:00
if (wasAdded) {
this.lastAddedBatchForGroup.set(groupId, batchId)
2020-12-11 14:45:42 +01:00
}
}
private processEventBatch(batch: QueuedBatch): Promise<void> {
return this.executeIfNotTerminated(async () => {
const filteredEvents = await this.cache.entityEventsReceived(batch)
await this.executeIfNotTerminated(() => this.login.entityEventsReceived(filteredEvents))
await this.executeIfNotTerminated(() => this.mail.entityEventsReceived(filteredEvents))
2022-02-28 15:19:23 +01:00
await this.executeIfNotTerminated(() => this.worker.entityEventsReceived(filteredEvents, batch.groupId))
// Call the indexer in this last step because now the processed event is stored and the indexer has a separate event queue that
// shall not receive the event twice.
if (!isTest() && !isAdminClient()) {
this.executeIfNotTerminated(() => {
this.indexer.addBatchesToQueue([
{
groupId: batch.groupId,
batchId: batch.batchId,
events: filteredEvents,
},
])
this.indexer.startProcessing()
})
}
}).catch(ofClass(ServiceUnavailableError, e => {
// a ServiceUnavailableError is a temporary error and we have to retry to avoid data inconsistencies
console.log("ws retry processing event in 30s", e)
let promise = delay(RETRY_AFTER_SERVICE_UNAVAILABLE_ERROR_MS).then(() => {
// if we have a websocket reconnect we have to stop retrying
if (this.serviceUnavailableRetry === promise) {
return this.processEventBatch(batch)
} else {
throw new CancelledError("stop retry processing after service unavailable due to reconnect")
}
})
this.serviceUnavailableRetry = promise
return promise
}))
2017-08-15 13:54:22 +02:00
}
private getLastEventBatchIdOrMinIdForGroup(groupId: Id): Id {
const lastIds = this.lastEntityEventIds.get(groupId)
2021-12-23 14:03:23 +01:00
return lastIds && lastIds.length > 0 ? lastThrow(lastIds) : GENERATED_MIN_ID
}
private executeIfNotTerminated(call: (...args: Array<any>) => any): Promise<void> {
2022-02-28 15:42:16 +01:00
if (this.state !== EventBusState.Terminated) {
2017-08-15 13:54:22 +02:00
return call()
} else {
return Promise.resolve()
}
}
private eventGroups(): Id[] {
return this.login
.getLoggedInUser()
.memberships.filter(membership => membership.groupType !== GroupType.MailingList)
.map(membership => membership.group)
.concat(this.login.getLoggedInUser().userGroup.group)
}
}