tutanota/src/api/worker/EventBusClient.ts

694 lines
25 KiB
TypeScript
Raw Normal View History

2022-12-28 15:28:28 +01:00
import { assertWorkerOrNode } from "../common/Env"
import {
AccessBlockedError,
AccessDeactivatedError,
ConnectionError,
handleRestError,
NotAuthorizedError,
ServiceUnavailableError,
2021-12-23 14:03:23 +01:00
SessionExpiredError,
} from "../common/error/RestError"
import {
createWebsocketLeaderStatus,
EntityEventBatch,
EntityEventBatchTypeRef,
EntityUpdate,
WebsocketCounterData,
WebsocketCounterDataTypeRef,
WebsocketEntityData,
WebsocketEntityDataTypeRef,
WebsocketLeaderStatus,
2022-12-27 15:37:40 +01:00
WebsocketLeaderStatusTypeRef,
} from "../entities/sys/TypeRefs.js"
2022-12-27 15:37:40 +01:00
import { assertNotNull, binarySearch, delay, identity, lastThrow, ofClass, randomIntFromInterval } from "@tutao/tutanota-utils"
import { OutOfSyncError } from "../common/error/OutOfSyncError"
import { CloseEventBusOption, GroupType, SECOND_MS } from "../common/TutanotaConstants"
import { CancelledError } from "../common/error/CancelledError"
import { EntityClient } from "../common/EntityClient"
import type { QueuedBatch } from "./search/EventQueue"
import { EventQueue } from "./search/EventQueue"
import { ProgressMonitorDelegate } from "./ProgressMonitorDelegate"
import type { IProgressMonitor } from "../common/utils/ProgressMonitor"
import { NoopProgressMonitor } from "../common/utils/ProgressMonitor"
import { compareOldestFirst, firstBiggerThanSecond, GENERATED_MAX_ID, GENERATED_MIN_ID, getElementId, isSameId } from "../common/utils/EntityUtils"
import { InstanceMapper } from "./crypto/InstanceMapper"
import { WsConnectionState } from "../main/WorkerClient"
import { EntityRestCache } from "./rest/DefaultEntityRestCache.js"
import { SleepDetector } from "./utils/SleepDetector.js"
import sysModelInfo from "../entities/sys/ModelInfo.js"
import tutanotaModelInfo from "../entities/tutanota/ModelInfo.js"
2022-12-27 15:37:40 +01:00
import { resolveTypeReference } from "../common/EntityFunctions.js"
2022-12-28 15:28:28 +01:00
import { PhishingMarker, PhishingMarkerWebsocketData, PhishingMarkerWebsocketDataTypeRef } from "../entities/tutanota/TypeRefs"
2022-12-27 15:37:40 +01:00
import { UserFacade } from "./facades/UserFacade"
2022-12-28 15:28:28 +01:00
import { ExposedProgressTracker } from "../main/ProgressTracker.js"
2017-08-15 13:54:22 +02:00
assertWorkerOrNode()
2021-12-23 14:03:23 +01:00
export const enum EventBusState {
Automatic = "automatic",
2021-12-23 14:03:23 +01:00
// automatic reconnection is enabled
Suspended = "suspended",
// automatic reconnection is suspended but can be enabled again
Terminated = "terminated", // automatic reconnection is disabled and websocket is closed but can be opened again by calling connect explicit
}
2020-09-15 11:35:25 +02:00
// EntityEventBatches expire after 45 days. keep a time diff security of one day.
export const ENTITY_EVENT_BATCH_EXPIRE_MS = 44 * 24 * 60 * 60 * 1000
const RETRY_AFTER_SERVICE_UNAVAILABLE_ERROR_MS = 30000
const NORMAL_SHUTDOWN_CLOSE_CODE = 1
2022-02-28 15:19:23 +01:00
/**
* Reconnection interval bounds. When we reconnect we pick a random number of seconds in a range to prevent that all the clients connect at the same time which
* would put unnecessary load on the server.
* The range depends on the number of attempts and the server response.
2022-02-28 15:19:23 +01:00
* */
const RECONNECT_INTERVAL = Object.freeze({
SMALL: [5, 10],
MEDIUM: [20, 40],
LARGE: [60, 120],
} as const)
// we store the last 1000 event ids per group, so we know if an event was already processed.
// it is not sufficient to check the last event id because a smaller event id may arrive later
// than a bigger one if the requests are processed in parallel on the server
const MAX_EVENT_IDS_QUEUE_LENGTH = 1000
/** Known types of messages that can be received over websocket. */
2022-02-28 15:42:16 +01:00
const enum MessageType {
EntityUpdate = "entityUpdate",
UnreadCounterUpdate = "unreadCounterUpdate",
PhishingMarkers = "phishingMarkers",
LeaderStatus = "leaderStatus",
}
export const enum ConnectMode {
Initial,
Reconnect,
}
2022-12-28 15:28:28 +01:00
export interface EventBusListener {
onWebsocketStateChanged(state: WsConnectionState): unknown
onCounterChanged(counter: WebsocketCounterData): unknown
onLeaderStatusChanged(leaderStatus: WebsocketLeaderStatus): unknown
onEntityEventsReceived(events: EntityUpdate[], batchId: Id, groupId: Id): Promise<void>
onPhishingMarkersReceived(markers: PhishingMarker[]): unknown
onError(tutanotaError: Error): void
}
2017-08-15 13:54:22 +02:00
export class EventBusClient {
2022-02-28 15:42:16 +01:00
private state: EventBusState
private socket: WebSocket | null
private immediateReconnect: boolean = false // if true tries to reconnect immediately after the websocket is closed
2021-12-23 14:03:23 +01:00
/**
* Map from group id to last event ids (max. _MAX_EVENT_IDS_QUEUE_LENGTH). We keep them to avoid processing the same event twice if
* it comes out of order from the server) and for requesting missed entity events on reconnect.
*
* We do not have to update these event ids if the groups of the user change because we always take the current users groups from the
* LoginFacade.
*/
private lastEntityEventIds: Map<Id, Array<Id>>
2021-12-23 14:03:23 +01:00
2020-12-11 14:45:42 +01:00
/**
* Last batch which was actually added to the queue. We need it to find out when the group is processed
*/
private lastAddedBatchForGroup: Map<Id, Id>
2021-12-23 14:03:23 +01:00
private lastAntiphishingMarkersId: Id | null = null
2017-08-15 13:54:22 +02:00
2022-02-28 15:42:16 +01:00
/** Queue to process all events. */
private readonly eventQueue: EventQueue
2017-08-15 13:54:22 +02:00
2022-02-28 15:42:16 +01:00
/** Queue that handles incoming websocket messages only. Caches them until we process downloaded ones and then adds them to eventQueue. */
private readonly entityUpdateMessageQueue: EventQueue
private reconnectTimer: TimeoutID | null
private connectTimer: TimeoutID | null
/**
* Represents a currently retried executing due to a ServiceUnavailableError
*/
private serviceUnavailableRetry: Promise<void> | null = null
private failedConnectionAttempts: number = 0
private progressMonitor: IProgressMonitor
2021-12-23 14:03:23 +01:00
constructor(
2022-12-28 15:28:28 +01:00
private readonly listener: EventBusListener,
private readonly cache: EntityRestCache,
private readonly userFacade: UserFacade,
2022-02-28 15:19:23 +01:00
private readonly entity: EntityClient,
private readonly instanceMapper: InstanceMapper,
private readonly socketFactory: (path: string) => WebSocket,
private readonly sleepDetector: SleepDetector,
2022-12-28 15:28:28 +01:00
private readonly progressTracker: ExposedProgressTracker,
2021-12-23 14:03:23 +01:00
) {
// We are not connected by default and will not try to unless connect() is called
this.state = EventBusState.Terminated
this.lastEntityEventIds = new Map()
this.lastAddedBatchForGroup = new Map()
this.socket = null
this.reconnectTimer = null
this.connectTimer = null
this.progressMonitor = new NoopProgressMonitor()
2022-12-27 15:37:40 +01:00
this.eventQueue = new EventQueue(true, (modification) => this.eventQueueCallback(modification))
this.entityUpdateMessageQueue = new EventQueue(false, (batch) => this.entityUpdateMessageQueueCallback(batch))
this.reset()
2020-12-09 16:07:56 +01:00
}
private reset() {
this.immediateReconnect = false
2021-12-23 14:03:23 +01:00
this.lastEntityEventIds.clear()
2021-12-23 14:03:23 +01:00
this.lastAddedBatchForGroup.clear()
2021-12-23 14:03:23 +01:00
this.eventQueue.pause()
2021-12-23 14:03:23 +01:00
this.eventQueue.clear()
2020-12-09 16:07:56 +01:00
this.serviceUnavailableRetry = null
2017-12-20 16:48:36 +01:00
}
2017-08-15 13:54:22 +02:00
/**
* Opens a WebSocket connection to receive server events.
* @param connectMode
2017-08-15 13:54:22 +02:00
*/
connect(connectMode: ConnectMode) {
console.log("ws connect reconnect:", connectMode === ConnectMode.Reconnect, "state:", this.state)
// make sure a retry will be cancelled by setting _serviceUnavailableRetry to null
this.serviceUnavailableRetry = null
2021-12-23 14:03:23 +01:00
2022-12-28 15:28:28 +01:00
this.listener.onWebsocketStateChanged(WsConnectionState.connecting)
2020-12-07 17:02:35 +01:00
// Task for updating events are number of groups + 2. Use 2 as base for reconnect state.
if (this.progressMonitor) {
// Say that the old monitor is completed so that we don't calculate its amount as still to do.
this.progressMonitor.completed()
}
2021-12-23 14:03:23 +01:00
2022-12-28 15:28:28 +01:00
this.progressMonitor = new ProgressMonitorDelegate(this.progressTracker, this.eventGroups().length + 2)
this.progressMonitor.workDone(1)
2021-12-23 14:03:23 +01:00
2022-02-28 15:42:16 +01:00
this.state = EventBusState.Automatic
this.connectTimer = null
2018-07-31 17:07:41 +02:00
const authHeaders = this.userFacade.createAuthHeaders()
2021-12-23 14:03:23 +01:00
2019-01-17 15:42:16 +01:00
// Native query building is not supported in old browser, mithril is not available in the worker
const authQuery =
"modelVersions=" +
sysModelInfo.version +
"." +
tutanotaModelInfo.version +
"&clientVersion=" +
env.versionNumber +
"&userId=" +
this.userFacade.getLoggedInUser()._id +
"&accessToken=" +
authHeaders.accessToken +
(this.lastAntiphishingMarkersId ? "&lastPhishingMarkersId=" + this.lastAntiphishingMarkersId : "")
const path = "/event?" + authQuery
this.unsubscribeFromOldWebsocket()
2021-12-23 14:03:23 +01:00
this.socket = this.socketFactory(path)
this.socket.onopen = () => this.onOpen(connectMode)
2022-02-28 15:42:16 +01:00
this.socket.onclose = (event: CloseEvent) => this.onClose(event)
this.socket.onerror = (error: any) => this.onError(error)
this.socket.onmessage = (message: MessageEvent<string>) => this.onMessage(message)
this.sleepDetector.start(() => {
console.log("ws sleep detected, reconnecting...")
this.tryReconnect(true, true)
})
2017-08-15 13:54:22 +02:00
}
/**
2017-09-28 11:32:02 +02:00
* Sends a close event to the server and finally closes the connection.
2017-12-20 16:48:36 +01:00
* The state of this event bus client is reset and the client is terminated (does not automatically reconnect) except reconnect == true
2017-08-15 13:54:22 +02:00
*/
async close(closeOption: CloseEventBusOption): Promise<void> {
console.log("ws close closeOption: ", closeOption, "state:", this.state)
2021-12-23 14:03:23 +01:00
switch (closeOption) {
case CloseEventBusOption.Terminate:
this.terminate()
2021-12-23 14:03:23 +01:00
break
2021-12-23 14:03:23 +01:00
case CloseEventBusOption.Pause:
2022-02-28 15:42:16 +01:00
this.state = EventBusState.Suspended
2021-12-23 14:03:23 +01:00
2022-12-28 15:28:28 +01:00
this.listener.onWebsocketStateChanged(WsConnectionState.connecting)
2021-12-23 14:03:23 +01:00
break
2021-12-23 14:03:23 +01:00
case CloseEventBusOption.Reconnect:
2022-12-28 15:28:28 +01:00
this.listener.onWebsocketStateChanged(WsConnectionState.connecting)
2021-12-23 14:03:23 +01:00
break
}
this.socket?.close()
2017-08-15 13:54:22 +02:00
}
async tryReconnect(closeIfOpen: boolean, enableAutomaticState: boolean, delay: number | null = null): Promise<void> {
2022-03-01 16:27:14 +01:00
console.log("ws tryReconnect closeIfOpen:", closeIfOpen, "enableAutomaticState:", enableAutomaticState, "delay:", delay)
2022-03-01 16:27:14 +01:00
if (this.reconnectTimer) {
// prevent reconnect race-condition
clearTimeout(this.reconnectTimer)
this.reconnectTimer = null
}
2022-03-01 16:27:14 +01:00
if (!delay) {
this.reconnect(closeIfOpen, enableAutomaticState)
} else {
this.reconnectTimer = setTimeout(() => this.reconnect(closeIfOpen, enableAutomaticState), delay)
}
}
2022-03-01 16:27:14 +01:00
// Returning promise for tests
private onOpen(connectMode: ConnectMode): Promise<void> {
this.failedConnectionAttempts = 0
console.log("ws open state:", this.state)
2022-03-01 16:27:14 +01:00
// Indicate some progress right away
this.progressMonitor.workDone(1)
2022-03-01 16:27:14 +01:00
const p = this.initEntityEvents(connectMode)
2021-12-23 14:03:23 +01:00
2022-12-28 15:28:28 +01:00
this.listener.onWebsocketStateChanged(WsConnectionState.connected)
2021-12-23 14:03:23 +01:00
2022-03-01 16:27:14 +01:00
return p
2017-12-20 16:48:36 +01:00
}
2022-02-28 15:42:16 +01:00
private onError(error: any) {
console.log("ws error:", error, JSON.stringify(error), "state:", this.state)
2017-08-15 13:54:22 +02:00
}
private async onMessage(message: MessageEvent<string>): Promise<void> {
2022-02-28 15:42:16 +01:00
const [type, value] = message.data.split(";")
2021-12-23 14:03:23 +01:00
2022-02-28 15:42:16 +01:00
switch (type) {
case MessageType.EntityUpdate: {
const data: WebsocketEntityData = await this.instanceMapper.decryptAndMapToInstance(
await resolveTypeReference(WebsocketEntityDataTypeRef),
2022-02-28 15:42:16 +01:00
JSON.parse(value),
null,
)
this.entityUpdateMessageQueue.add(data.eventBatchId, data.eventBatchOwner, data.eventBatch)
2022-02-28 15:42:16 +01:00
break
}
case MessageType.UnreadCounterUpdate: {
const counterData: WebsocketCounterData = await this.instanceMapper.decryptAndMapToInstance(
await resolveTypeReference(WebsocketCounterDataTypeRef),
2022-02-28 15:42:16 +01:00
JSON.parse(value),
null,
)
2022-12-28 15:28:28 +01:00
this.listener.onCounterChanged(counterData)
2022-02-28 15:42:16 +01:00
break
}
case MessageType.PhishingMarkers: {
const data: PhishingMarkerWebsocketData = await this.instanceMapper.decryptAndMapToInstance(
await resolveTypeReference(PhishingMarkerWebsocketDataTypeRef),
2022-02-28 15:42:16 +01:00
JSON.parse(value),
null,
)
this.lastAntiphishingMarkersId = data.lastId
2022-12-28 15:28:28 +01:00
this.listener.onPhishingMarkersReceived(data.markers)
2022-02-28 15:42:16 +01:00
break
}
case MessageType.LeaderStatus:
const data: WebsocketLeaderStatus = await this.instanceMapper.decryptAndMapToInstance(
await resolveTypeReference(WebsocketLeaderStatusTypeRef),
2022-02-28 15:42:16 +01:00
JSON.parse(value),
null,
)
await this.userFacade.setLeaderStatus(data)
2022-12-28 15:28:28 +01:00
await this.listener.onLeaderStatusChanged(data)
2022-02-28 15:42:16 +01:00
break
default:
console.log("ws message with unknown type", type)
break
2019-01-17 15:42:16 +01:00
}
2017-08-15 13:54:22 +02:00
}
2022-02-28 15:42:16 +01:00
private onClose(event: CloseEvent) {
this.failedConnectionAttempts++
console.log("ws close event:", event, "state:", this.state)
2021-12-23 14:03:23 +01:00
this.userFacade.setLeaderStatus(
createWebsocketLeaderStatus({
leaderStatus: false,
}),
2021-12-23 14:03:23 +01:00
)
this.sleepDetector.stop()
// Avoid running into penalties when trying to authenticate with an invalid session
// NotAuthenticatedException 401, AccessDeactivatedException 470, AccessBlocked 472
// do not catch session expired here because websocket will be reused when we authenticate again
const serverCode = event.code - 4000
2021-12-23 14:03:23 +01:00
if ([NotAuthorizedError.CODE, AccessDeactivatedError.CODE, AccessBlockedError.CODE].includes(serverCode)) {
this.terminate()
2022-12-28 15:28:28 +01:00
this.listener.onError(handleRestError(serverCode, "web socket error", null, null))
} else if (serverCode === SessionExpiredError.CODE) {
// session is expired. do not try to reconnect until the user creates a new session
2022-02-28 15:42:16 +01:00
this.state = EventBusState.Suspended
2022-12-28 15:28:28 +01:00
this.listener.onWebsocketStateChanged(WsConnectionState.connecting)
Offline login A lot of commits were squashed: these were the messages Adjust for partial login state on the worker side, #3888 Co-authored-by: tih<tih@tutao.de> Handle partial and full login on the client side, #3888 Co-authored-by: tih<tih@tutao.de> Attempt to log in when coming online if async login fails, #3888 Co-authored-by: tih<tih@tutao.de> Allow only Premium customers to log in offline, #3888 We return result from resumeLogin to not add a bespoke error class for our use case. We cannot use UserError there either. We should probably convert more expected errors to results as it's hard to know what should be handled when we reach UI layer. Co-authored-by: tih <tih@tutao.de> Allow fixing credentials after offline login, #3888 Also improve progress display in password prompt dialog Co-authored-by: tih <tih@tutao.de> Allow re-initializing cache storage when the first login fails, #3888 Co-authored-by: tih<tih@tutao.de> Improve LoginFacadeTest Do not keep credentials for retry if they cannot be used again, #3888 Co-authored-by: tih<tih@tutao.de> Fix initialising indexer from LoginFacade, #3888 Co-authored-by: tih<tih@tutao.de> Add offlineLoginRequiresPremium_msg for de, de_sie, #3888 Cleanup facades fields after review with jom Co-authored-by: tih <tih@tutao.de> Cleanup UserFacade, fix tests Co-authored-by: tih <tih@tutao.de> Fix waiting for full login in LoginController Co-authored-by: tih<tih@tutao.de> Split AsyncActions into partial and full login Co-authored-by: tih <tih@tutao.de> Cleanup registering of post-login actions Co-authored-by: tih <tih@tutao.de> Change offlineDb parameter on CredentialsProvider after review with jom Co-authored-by: tih <tih@tutao.de> Remove out-of-sync handling on cache init Current approach does not work in offline, we are going to reimplement it. See #4067 Suppress connection error during offline login, #3888 Do not wait for ConnectionError in LoginFacadeTest * ConnectionError is suppressed in offline login Prevent endless waiting for contacts after offline login, fix #4078 * When the user is logged in offline the indexer has not been initialized and searching for contacts will never resolve. Handle UserGroupKeyNotFoundError, see #4078 The userGroupKey is saved after login. However, when the user logs in offline, the key is not saved and cannot be used to encrypt. We handle this in the same way we handle ConnectionErrors.
2022-04-13 10:23:09 +02:00
} else if (this.state === EventBusState.Automatic && this.userFacade.isFullyLoggedIn()) {
2022-12-28 15:28:28 +01:00
this.listener.onWebsocketStateChanged(WsConnectionState.connecting)
if (this.immediateReconnect) {
this.immediateReconnect = false
2021-12-23 14:03:23 +01:00
this.tryReconnect(false, false)
} else {
2021-12-28 13:53:11 +01:00
let reconnectionInterval: readonly [number, number]
if (serverCode === NORMAL_SHUTDOWN_CLOSE_CODE) {
2022-02-28 15:19:23 +01:00
reconnectionInterval = RECONNECT_INTERVAL.LARGE
} else if (this.failedConnectionAttempts === 1) {
2022-02-28 15:19:23 +01:00
reconnectionInterval = RECONNECT_INTERVAL.SMALL
} else if (this.failedConnectionAttempts === 2) {
2022-02-28 15:19:23 +01:00
reconnectionInterval = RECONNECT_INTERVAL.MEDIUM
} else {
2022-02-28 15:19:23 +01:00
reconnectionInterval = RECONNECT_INTERVAL.LARGE
}
2021-12-23 14:03:23 +01:00
this.tryReconnect(false, false, SECOND_MS * randomIntFromInterval(reconnectionInterval[0], reconnectionInterval[1]))
2017-08-15 13:54:22 +02:00
}
}
}
2022-03-01 16:27:14 +01:00
private async initEntityEvents(connectMode: ConnectMode): Promise<void> {
// pause processing entity update message while initializing event queue
this.entityUpdateMessageQueue.pause()
2021-12-23 14:03:23 +01:00
2022-03-01 16:27:14 +01:00
// pause event queue and add all missed entity events first
this.eventQueue.pause()
2022-03-01 16:27:14 +01:00
const existingConnection = connectMode == ConnectMode.Reconnect && this.lastEntityEventIds.size > 0
2022-12-27 15:37:40 +01:00
const p = existingConnection ? this.loadMissedEntityEvents() : this.initOnNewConnection()
2017-08-15 13:54:22 +02:00
2022-03-01 16:27:14 +01:00
return p
.then(() => {
this.entityUpdateMessageQueue.resume()
this.eventQueue.resume()
})
2022-12-27 15:37:40 +01:00
.catch(
ofClass(ConnectionError, (e) => {
console.log("ws not connected in connect(), close websocket", e)
this.close(CloseEventBusOption.Reconnect)
}),
)
.catch(
ofClass(CancelledError, () => {
// the processing was aborted due to a reconnect. do not reset any attributes because they might already be in use since reconnection
console.log("ws cancelled retry process entity events after reconnect")
}),
)
.catch(
ofClass(ServiceUnavailableError, async (e) => {
// a ServiceUnavailableError is a temporary error and we have to retry to avoid data inconsistencies
// some EventBatches/missed events are processed already now
// for an existing connection we just keep the current state and continue loading missed events for the other groups
// for a new connection we reset the last entity event ids because otherwise this would not be completed in the next try
if (!existingConnection) {
this.lastEntityEventIds.clear()
2022-03-01 16:27:14 +01:00
}
2022-12-27 15:37:40 +01:00
console.log("ws retry init entity events in ", RETRY_AFTER_SERVICE_UNAVAILABLE_ERROR_MS, e)
let promise = delay(RETRY_AFTER_SERVICE_UNAVAILABLE_ERROR_MS).then(() => {
// if we have a websocket reconnect we have to stop retrying
if (this.serviceUnavailableRetry === promise) {
console.log("ws retry initializing entity events")
return this.initEntityEvents(connectMode)
} else {
console.log("ws cancel initializing entity events")
}
})
this.serviceUnavailableRetry = promise
return promise
}),
)
.catch(
ofClass(OutOfSyncError, async (e) => {
// we did not check for updates for too long, so some missed EntityEventBatches can not be loaded any more
// purge cache if out of sync
await this.cache.purgeStorage()
// We want users to re-login. By the time we get here they probably already have loaded some entities which we cannot update
throw e
}),
)
.catch((e) => {
2022-03-01 16:27:14 +01:00
this.entityUpdateMessageQueue.resume()
2021-12-23 14:03:23 +01:00
2022-03-01 16:27:14 +01:00
this.eventQueue.resume()
2021-12-23 14:03:23 +01:00
2022-12-28 15:28:28 +01:00
this.listener.onError(e)
2022-03-01 16:27:14 +01:00
})
}
private async initOnNewConnection() {
2022-12-27 15:37:40 +01:00
const { lastIds, someIdsWereCached } = await this.retrieveLastEntityEventIds()
2022-03-01 16:27:14 +01:00
// First, we record lastEntityEventIds. We need this to know what we need to re-fetch.
// This is not the same as the cache because we might have already downloaded them but cache might not have processed them yet.
// Important: do it in one step so that we don't have partial IDs in the map in case an error occurs.
this.lastEntityEventIds = lastIds
// Second, we need to initialize the cache too.
if (someIdsWereCached) {
// If some of the last IDs were retrieved from the cache then we want to load from that point to bring cache up-to-date. This is mostly important for
// persistent cache.
await this.loadMissedEntityEvents()
} else {
// If the cache is clean then this is a clean cache (either ephemeral after first connect or persistent with empty DB).
// We need to record the time even if we don't process anything to later know if we are out of sync or not.
await this.cache.recordSyncTime()
2017-08-15 13:54:22 +02:00
}
}
/**
* Gets the latest event batch ids for each of the users groups or min id if there is no event batch yet.
* This is needed to know from where to start loading missed events when we connect.
2017-08-15 13:54:22 +02:00
*/
2022-12-27 15:37:40 +01:00
private async retrieveLastEntityEventIds(): Promise<{ lastIds: Map<Id, Array<Id>>; someIdsWereCached: boolean }> {
// set all last event ids in one step to avoid that we have just set them for a few groups when a ServiceUnavailableError occurs
2020-12-11 14:45:42 +01:00
const lastIds: Map<Id, Array<Id>> = new Map()
let someIdsWereCached = false
for (const groupId of this.eventGroups()) {
const cachedBatchId = await this.cache.getLastEntityEventBatchForGroup(groupId)
if (cachedBatchId != null) {
lastIds.set(groupId, [cachedBatchId])
someIdsWereCached = true
} else {
const batches = await this.entity.loadRange(EntityEventBatchTypeRef, groupId, GENERATED_MAX_ID, 1, true)
const batchId = batches.length === 1 ? getElementId(batches[0]) : GENERATED_MIN_ID
lastIds.set(groupId, [batchId])
// In case we don't receive any events for the group this time we want to still download from this point next time.
await this.cache.setLastEntityEventBatchForGroup(groupId, batchId)
// We will not process any entities for this group so we consider this group "done"
this.progressMonitor.workDone(1)
}
}
2021-12-23 14:03:23 +01:00
2022-12-27 15:37:40 +01:00
return { lastIds, someIdsWereCached }
2017-08-15 13:54:22 +02:00
}
/** Load event batches since the last time we were connected to bring cache and other things up-to-date. */
private async loadMissedEntityEvents(): Promise<void> {
Offline login A lot of commits were squashed: these were the messages Adjust for partial login state on the worker side, #3888 Co-authored-by: tih<tih@tutao.de> Handle partial and full login on the client side, #3888 Co-authored-by: tih<tih@tutao.de> Attempt to log in when coming online if async login fails, #3888 Co-authored-by: tih<tih@tutao.de> Allow only Premium customers to log in offline, #3888 We return result from resumeLogin to not add a bespoke error class for our use case. We cannot use UserError there either. We should probably convert more expected errors to results as it's hard to know what should be handled when we reach UI layer. Co-authored-by: tih <tih@tutao.de> Allow fixing credentials after offline login, #3888 Also improve progress display in password prompt dialog Co-authored-by: tih <tih@tutao.de> Allow re-initializing cache storage when the first login fails, #3888 Co-authored-by: tih<tih@tutao.de> Improve LoginFacadeTest Do not keep credentials for retry if they cannot be used again, #3888 Co-authored-by: tih<tih@tutao.de> Fix initialising indexer from LoginFacade, #3888 Co-authored-by: tih<tih@tutao.de> Add offlineLoginRequiresPremium_msg for de, de_sie, #3888 Cleanup facades fields after review with jom Co-authored-by: tih <tih@tutao.de> Cleanup UserFacade, fix tests Co-authored-by: tih <tih@tutao.de> Fix waiting for full login in LoginController Co-authored-by: tih<tih@tutao.de> Split AsyncActions into partial and full login Co-authored-by: tih <tih@tutao.de> Cleanup registering of post-login actions Co-authored-by: tih <tih@tutao.de> Change offlineDb parameter on CredentialsProvider after review with jom Co-authored-by: tih <tih@tutao.de> Remove out-of-sync handling on cache init Current approach does not work in offline, we are going to reimplement it. See #4067 Suppress connection error during offline login, #3888 Do not wait for ConnectionError in LoginFacadeTest * ConnectionError is suppressed in offline login Prevent endless waiting for contacts after offline login, fix #4078 * When the user is logged in offline the indexer has not been initialized and searching for contacts will never resolve. Handle UserGroupKeyNotFoundError, see #4078 The userGroupKey is saved after login. However, when the user logs in offline, the key is not saved and cannot be used to encrypt. We handle this in the same way we handle ConnectionErrors.
2022-04-13 10:23:09 +02:00
if (!this.userFacade.isFullyLoggedIn()) {
return
}
2022-03-01 16:42:57 +01:00
await this.checkOutOfSync()
for (let groupId of this.eventGroups()) {
const eventBatches = await this.loadEntityEventsForGroup(groupId)
if (eventBatches.length === 0) {
// There won't be a callback from the queue to process the event so we mark this group as
// completed right away
this.progressMonitor.workDone(1)
} else {
for (const batch of eventBatches) {
this.addBatch(getElementId(batch), groupId, batch.events)
}
}
}
// We've loaded all the batches, we've added them to the queue, we can let the cache remember sync point for us to detect out of sync now.
// It is possible that we will record the time before the batch will be processed but the risk is low.
await this.cache.recordSyncTime()
}
private async loadEntityEventsForGroup(groupId: Id): Promise<EntityEventBatch[]> {
try {
return await this.entity.loadAll(EntityEventBatchTypeRef, groupId, this.getLastEventBatchIdOrMinIdForGroup(groupId))
} catch (e) {
if (e instanceof NotAuthorizedError) {
console.log("ws could not download entity updates, lost permission")
return []
} else {
throw e
}
}
}
private async checkOutOfSync() {
// We try to detect whether event batches have already expired.
// If this happened we don't need to download anything, we need to purge the cache and start all over.
if (await this.cache.isOutOfSync()) {
// Allow the progress bar to complete
this.progressMonitor.completed()
// We handle it where we initialize the connection and purge the cache there.
throw new OutOfSyncError("some missed EntityEventBatches cannot be loaded any more")
2017-08-15 13:54:22 +02:00
}
}
2022-03-01 16:27:14 +01:00
private async eventQueueCallback(modification: QueuedBatch): Promise<void> {
try {
await this.processEventBatch(modification)
} catch (e) {
console.log("ws error while processing event batches", e)
2022-12-28 15:28:28 +01:00
this.listener.onError(e)
2022-03-01 16:27:14 +01:00
throw e
}
// If we completed the event, it was added before
const lastForGroup = assertNotNull(this.lastAddedBatchForGroup.get(modification.groupId))
if (isSameId(modification.batchId, lastForGroup) || firstBiggerThanSecond(modification.batchId, lastForGroup)) {
this.progressMonitor && this.progressMonitor.workDone(1)
}
}
private async entityUpdateMessageQueueCallback(batch: QueuedBatch): Promise<void> {
this.addBatch(batch.batchId, batch.groupId, batch.events)
this.eventQueue.resume()
}
private unsubscribeFromOldWebsocket() {
if (this.socket) {
// Remove listeners. We don't want old socket to mess our state
this.socket.onopen = this.socket.onclose = this.socket.onerror = this.socket.onmessage = identity
}
}
private async terminate(): Promise<void> {
this.state = EventBusState.Terminated
this.reset()
2022-12-28 15:28:28 +01:00
this.listener.onWebsocketStateChanged(WsConnectionState.terminated)
2022-03-01 16:27:14 +01:00
}
/**
* Tries to reconnect the websocket if it is not connected.
*/
private reconnect(closeIfOpen: boolean, enableAutomaticState: boolean) {
console.log(
"ws reconnect socket.readyState: (CONNECTING=0, OPEN=1, CLOSING=2, CLOSED=3): " + (this.socket ? this.socket.readyState : "null"),
"state:",
this.state,
"closeIfOpen:",
closeIfOpen,
"enableAutomaticState:",
enableAutomaticState,
)
if (this.state !== EventBusState.Terminated && enableAutomaticState) {
this.state = EventBusState.Automatic
}
if (closeIfOpen && this.socket && this.socket.readyState === WebSocket.OPEN) {
this.immediateReconnect = true
this.socket.close()
} else if (
(this.socket == null || this.socket.readyState === WebSocket.CLOSED || this.socket.readyState === WebSocket.CLOSING) &&
this.state !== EventBusState.Terminated &&
Offline login A lot of commits were squashed: these were the messages Adjust for partial login state on the worker side, #3888 Co-authored-by: tih<tih@tutao.de> Handle partial and full login on the client side, #3888 Co-authored-by: tih<tih@tutao.de> Attempt to log in when coming online if async login fails, #3888 Co-authored-by: tih<tih@tutao.de> Allow only Premium customers to log in offline, #3888 We return result from resumeLogin to not add a bespoke error class for our use case. We cannot use UserError there either. We should probably convert more expected errors to results as it's hard to know what should be handled when we reach UI layer. Co-authored-by: tih <tih@tutao.de> Allow fixing credentials after offline login, #3888 Also improve progress display in password prompt dialog Co-authored-by: tih <tih@tutao.de> Allow re-initializing cache storage when the first login fails, #3888 Co-authored-by: tih<tih@tutao.de> Improve LoginFacadeTest Do not keep credentials for retry if they cannot be used again, #3888 Co-authored-by: tih<tih@tutao.de> Fix initialising indexer from LoginFacade, #3888 Co-authored-by: tih<tih@tutao.de> Add offlineLoginRequiresPremium_msg for de, de_sie, #3888 Cleanup facades fields after review with jom Co-authored-by: tih <tih@tutao.de> Cleanup UserFacade, fix tests Co-authored-by: tih <tih@tutao.de> Fix waiting for full login in LoginController Co-authored-by: tih<tih@tutao.de> Split AsyncActions into partial and full login Co-authored-by: tih <tih@tutao.de> Cleanup registering of post-login actions Co-authored-by: tih <tih@tutao.de> Change offlineDb parameter on CredentialsProvider after review with jom Co-authored-by: tih <tih@tutao.de> Remove out-of-sync handling on cache init Current approach does not work in offline, we are going to reimplement it. See #4067 Suppress connection error during offline login, #3888 Do not wait for ConnectionError in LoginFacadeTest * ConnectionError is suppressed in offline login Prevent endless waiting for contacts after offline login, fix #4078 * When the user is logged in offline the indexer has not been initialized and searching for contacts will never resolve. Handle UserGroupKeyNotFoundError, see #4078 The userGroupKey is saved after login. However, when the user logs in offline, the key is not saved and cannot be used to encrypt. We handle this in the same way we handle ConnectionErrors.
2022-04-13 10:23:09 +02:00
this.userFacade.isFullyLoggedIn()
2022-03-01 16:27:14 +01:00
) {
// Don't try to connect right away because connection may not be actually there
// see #1165
if (this.connectTimer) {
clearTimeout(this.connectTimer)
}
this.connectTimer = setTimeout(() => this.connect(ConnectMode.Reconnect), 100)
}
}
private addBatch(batchId: Id, groupId: Id, events: ReadonlyArray<EntityUpdate>) {
const lastForGroup = this.lastEntityEventIds.get(groupId) || []
2020-12-10 17:18:19 +01:00
// find the position for inserting into last entity events (negative value is considered as not present in the array)
const index = binarySearch(lastForGroup, batchId, compareOldestFirst)
2020-12-11 14:45:42 +01:00
let wasAdded
2021-12-23 14:03:23 +01:00
if (index < 0) {
lastForGroup.splice(-index, 0, batchId)
2020-12-10 17:18:19 +01:00
// only add the batch if it was not process before
wasAdded = this.eventQueue.add(batchId, groupId, events)
2020-12-11 14:45:42 +01:00
} else {
wasAdded = false
}
2021-12-23 14:03:23 +01:00
2022-02-28 15:19:23 +01:00
if (lastForGroup.length > MAX_EVENT_IDS_QUEUE_LENGTH) {
lastForGroup.shift()
2017-08-15 13:54:22 +02:00
}
2021-12-23 14:03:23 +01:00
this.lastEntityEventIds.set(batchId, lastForGroup)
2020-12-11 14:45:42 +01:00
if (wasAdded) {
this.lastAddedBatchForGroup.set(groupId, batchId)
2020-12-11 14:45:42 +01:00
}
}
2022-03-01 16:42:57 +01:00
private async processEventBatch(batch: QueuedBatch): Promise<void> {
try {
if (this.isTerminated()) return
const filteredEvents = await this.cache.entityEventsReceived(batch)
2022-12-28 15:28:28 +01:00
if (!this.isTerminated()) await this.listener.onEntityEventsReceived(filteredEvents, batch.batchId, batch.groupId)
2022-03-01 16:42:57 +01:00
} catch (e) {
if (e instanceof ServiceUnavailableError) {
// a ServiceUnavailableError is a temporary error and we have to retry to avoid data inconsistencies
console.log("ws retry processing event in 30s", e)
const retryPromise = delay(RETRY_AFTER_SERVICE_UNAVAILABLE_ERROR_MS).then(() => {
// if we have a websocket reconnect we have to stop retrying
if (this.serviceUnavailableRetry === retryPromise) {
return this.processEventBatch(batch)
} else {
throw new CancelledError("stop retry processing after service unavailable due to reconnect")
}
})
2022-03-01 16:42:57 +01:00
this.serviceUnavailableRetry = retryPromise
return retryPromise
} else {
throw e
}
2022-03-01 16:42:57 +01:00
}
2017-08-15 13:54:22 +02:00
}
private getLastEventBatchIdOrMinIdForGroup(groupId: Id): Id {
const lastIds = this.lastEntityEventIds.get(groupId)
2021-12-23 14:03:23 +01:00
return lastIds && lastIds.length > 0 ? lastThrow(lastIds) : GENERATED_MIN_ID
}
2022-03-01 16:42:57 +01:00
private isTerminated() {
return this.state === EventBusState.Terminated
2017-08-15 13:54:22 +02:00
}
private eventGroups(): Id[] {
return this.userFacade
2022-12-27 15:37:40 +01:00
.getLoggedInUser()
.memberships.filter((membership) => membership.groupType !== GroupType.MailingList)
.map((membership) => membership.group)
.concat(this.userFacade.getLoggedInUser().userGroup.group)
}
2022-12-27 15:37:40 +01:00
}