2017-08-15 13:54:22 +02:00
// @flow
2017-11-23 13:30:17 +01:00
import type { LoginFacade } from "./facades/LoginFacade"
import type { MailFacade } from "./facades/MailFacade"
import type { WorkerImpl } from "./WorkerImpl"
2019-01-17 15:42:16 +01:00
import { decryptAndMapToInstance } from "./crypto/CryptoFacade"
2019-03-14 11:55:37 +01:00
import { assertWorkerOrNode , getWebsocketOrigin , isAdminClient , isTest , Mode } from "../Env"
2017-08-15 13:54:22 +02:00
import { _TypeModel as MailTypeModel } from "../entities/tutanota/Mail"
2020-11-04 15:52:09 +01:00
import {
compareOldestFirst ,
firstBiggerThanSecond ,
GENERATED _MAX _ID ,
GENERATED _MIN _ID ,
getElementId ,
getLetId ,
isSameId
} from "../common/EntityFunctions"
2019-10-14 15:47:46 +02:00
import {
AccessBlockedError ,
AccessDeactivatedError ,
ConnectionError ,
handleRestError ,
NotAuthorizedError ,
2020-03-16 17:37:50 +01:00
ServiceUnavailableError ,
SessionExpiredError
2019-10-14 15:47:46 +02:00
} from "../common/error/RestError"
2017-08-15 13:54:22 +02:00
import { EntityEventBatchTypeRef } from "../entities/sys/EntityEventBatch"
2020-12-07 12:26:39 +01:00
import { downcast , identity , neverNull , randomIntFromInterval } from "../common/utils/Utils"
2017-08-15 13:54:22 +02:00
import { OutOfSyncError } from "../common/error/OutOfSyncError"
2020-11-04 15:52:09 +01:00
import { binarySearch , contains , lastThrow } from "../common/utils/ArrayUtils"
2017-11-23 13:30:17 +01:00
import type { Indexer } from "./search/Indexer"
2018-08-02 13:16:24 +02:00
import type { CloseEventBusOptionEnum } from "../common/TutanotaConstants"
2019-10-14 15:47:46 +02:00
import { CloseEventBusOption , GroupType , SECOND _MS } from "../common/TutanotaConstants"
2020-06-02 13:39:22 +02:00
import type { WebsocketEntityData } from "../entities/sys/WebsocketEntityData"
2019-01-17 15:42:16 +01:00
import { _TypeModel as WebsocketEntityDataTypeModel } from "../entities/sys/WebsocketEntityData"
2019-07-29 15:49:41 +02:00
import { CancelledError } from "../common/error/CancelledError"
2020-03-16 17:37:50 +01:00
import { _TypeModel as PhishingMarkerWebsocketDataTypeModel } from "../entities/tutanota/PhishingMarkerWebsocketData"
2019-08-30 14:00:15 +02:00
import type { EntityUpdate } from "../entities/sys/EntityUpdate"
2019-08-22 18:24:32 +02:00
import type { EntityRestInterface } from "./rest/EntityRestClient"
2020-10-05 14:49:34 +02:00
import { EntityClient } from "../common/EntityClient"
2020-11-04 15:52:09 +01:00
import type { QueuedBatch } from "./search/EventQueue"
import { EventQueue } from "./search/EventQueue"
2020-10-13 17:15:52 +02:00
import { _TypeModel as WebsocketLeaderStatusTypeModel , createWebsocketLeaderStatus } from "../entities/sys/WebsocketLeaderStatus"
2020-11-17 16:29:19 +01:00
import { ProgressMonitorDelegate } from "./ProgressMonitorDelegate"
2020-12-07 12:26:39 +01:00
import type { IProgressMonitor } from "../common/utils/ProgressMonitor"
import { NoopProgressMonitor } from "../common/utils/ProgressMonitor"
2017-08-15 13:54:22 +02:00
assertWorkerOrNode ( )
2018-08-02 13:16:24 +02:00
2019-02-05 17:26:36 +01:00
const EventBusState = Object . freeze ( {
2018-08-02 13:16:24 +02:00
Automatic : "automatic" , // automatic reconnection is enabled
Suspended : "suspended" , // automatic reconnection is suspended but can be enabled again
Terminated : "terminated" // automatic reconnection is disabled and websocket is closed but can be opened again by calling connect explicit
2019-02-05 17:26:36 +01:00
} )
2018-08-02 13:16:24 +02:00
type EventBusStateEnum = $Values < typeof EventBusState > ;
2020-09-15 11:35:25 +02:00
// EntityEventBatches expire after 45 days. keep a time diff security of one day.
const ENTITY _EVENT _BATCH _EXPIRE _MS = 44 * 24 * 60 * 60 * 1000
2019-07-29 15:49:41 +02:00
const RETRY _AFTER _SERVICE _UNAVAILABLE _ERROR _MS = 30000
2019-10-14 15:47:46 +02:00
const NORMAL _SHUTDOWN _CLOSE _CODE = 1
const LARGE _RECONNECT _INTERVAL = [ 60 , 120 ]
const SMALL _RECONNECT _INTERVAL = [ 5 , 10 ]
const MEDIUM _RECONNECT _INTERVAL = [ 20 , 40 ]
2018-08-02 13:16:24 +02:00
2017-08-15 13:54:22 +02:00
export class EventBusClient {
2017-09-19 14:54:04 +02:00
_MAX _EVENT _IDS _QUEUE _LENGTH : number ;
2017-11-23 13:30:17 +01:00
_indexer : Indexer ;
2019-04-26 16:14:16 +02:00
_cache : EntityRestInterface ;
2020-10-05 14:49:34 +02:00
_entity : EntityClient ;
2017-11-23 13:30:17 +01:00
_worker : WorkerImpl ;
_mail : MailFacade ;
_login : LoginFacade ;
2018-08-02 13:16:24 +02:00
_state : EventBusStateEnum ;
2017-08-15 13:54:22 +02:00
_socket : ? WebSocket ;
_immediateReconnect : boolean ; // if true tries to reconnect immediately after the websocket is closed
2020-11-04 15:52:09 +01:00
/ * *
* Map from group id to last event ids ( max . _MAX _EVENT _IDS _QUEUE _LENGTH ) . We keep them to avoid processing the same event twice if
* it comes out of order from the server ) and for requesting missed entity events on reconnect .
*
* We do not have to update these event ids if the groups of the user change because we always take the current users groups from the
* LoginFacade .
*
* @ private
* /
_lastEntityEventIds : { [ groupId : Id ] : Array < Id > } ;
2020-09-15 11:35:25 +02:00
_lastUpdateTime : number ; // the last time we received an EntityEventBatch or checked for updates. we use this to find out if our data has expired, see ENTITY_EVENT_BATCH_EXPIRE_MS
2020-03-16 17:37:50 +01:00
_lastAntiphishingMarkersId : ? Id ;
2017-08-15 13:54:22 +02:00
2020-12-10 17:18:19 +01:00
/* queue to process all events. */
2020-11-04 15:52:09 +01:00
_eventQueue : EventQueue ;
2020-12-10 17:18:19 +01:00
/* queue that handles incoming websocket messages while. */
_entityUpdateMessageQueue : EventQueue ;
2017-08-15 13:54:22 +02:00
2019-07-29 15:49:41 +02:00
_reconnectTimer : ? TimeoutID ;
2019-06-08 17:07:33 +02:00
_connectTimer : ? TimeoutID ;
2019-07-29 15:49:41 +02:00
/ * *
* Represents a currently retried executing due to a ServiceUnavailableError
* /
_serviceUnavailableRetry : ? Promise < void > ;
2019-10-14 15:47:46 +02:00
_failedConnectionAttempts : number = 0 ;
2020-11-04 15:52:09 +01:00
_progressMonitor : IProgressMonitor ;
2019-07-29 15:49:41 +02:00
2019-04-26 16:14:16 +02:00
constructor ( worker : WorkerImpl , indexer : Indexer , cache : EntityRestInterface , mail : MailFacade , login : LoginFacade ) {
2017-11-23 13:30:17 +01:00
this . _worker = worker
this . _indexer = indexer
this . _cache = cache
2020-10-05 14:49:34 +02:00
this . _entity = new EntityClient ( cache )
2017-11-23 13:30:17 +01:00
this . _mail = mail
this . _login = login
2017-08-15 13:54:22 +02:00
this . _socket = null
2018-08-02 13:16:24 +02:00
this . _state = EventBusState . Automatic
2019-06-08 17:07:33 +02:00
this . _reconnectTimer = null
this . _connectTimer = null
2020-11-04 15:52:09 +01:00
this . _progressMonitor = new NoopProgressMonitor ( )
2017-08-15 13:54:22 +02:00
2020-12-10 17:18:19 +01:00
this . _eventQueue = new EventQueue ( true , ( modification ) => {
2020-11-04 15:52:09 +01:00
return this . _processEventBatch ( modification )
. catch ( ( e ) => {
console . log ( "Error while processing event batches" , e )
this . _worker . sendError ( e )
2020-12-10 17:18:19 +01:00
throw e
2020-11-04 15:52:09 +01:00
} )
. then ( ( ) => {
// If we completed the event, it cannot be empty
const lastForGroup = lastThrow ( this . _lastEntityEventIds [ modification . groupId ] )
if ( isSameId ( modification . batchId , lastForGroup ) || firstBiggerThanSecond ( modification . batchId , lastForGroup ) ) {
this . _progressMonitor && this . _progressMonitor . workDone ( 1 )
}
} )
} )
2020-12-09 16:07:56 +01:00
2020-12-10 17:18:19 +01:00
this . _entityUpdateMessageQueue = new EventQueue ( false , ( batch ) => {
return Promise . resolve ( this . _addBatch ( batch . batchId , batch . groupId , batch . events ) )
. then ( ( ) => this . _eventQueue . resume ( ) )
} )
2020-12-09 16:07:56 +01:00
this . _reset ( )
// we store the last 1000 event ids per group, so we know if an event was already processed.
// it is not sufficient to check the last event id because a smaller event id may arrive later
// than a bigger one if the requests are processed in parallel on the server
this . _MAX _EVENT _IDS _QUEUE _LENGTH = 1000
}
_reset ( ) : void {
this . _immediateReconnect = false
this . _lastEntityEventIds = { }
this . _lastUpdateTime = 0
this . _eventQueue . pause ( )
this . _eventQueue . clear ( )
2019-07-29 15:49:41 +02:00
this . _serviceUnavailableRetry = null
2017-12-20 16:48:36 +01:00
}
2017-08-15 13:54:22 +02:00
/ * *
* Opens a WebSocket connection to receive server events .
* @ param reconnect Set to true if the connection has been opened before .
* @ returns The event bus client object .
* /
connect ( reconnect : boolean ) {
if ( env . mode === Mode . Test ) {
return
}
2018-10-19 18:13:58 +02:00
2019-09-19 16:11:33 +02:00
console . log ( new Date ( ) . toISOString ( ) , "ws connect reconnect=" , reconnect , "state:" , this . _state ) ;
2019-07-29 15:49:41 +02:00
// make sure a retry will be cancelled by setting _serviceUnavailableRetry to null
this . _serviceUnavailableRetry = null
2018-08-15 13:56:56 +02:00
this . _worker . updateWebSocketState ( "connecting" )
2020-11-04 15:52:09 +01:00
2020-12-07 17:02:35 +01:00
// Task for updating events are number of groups + 2. Use 2 as base for reconnect state.
2020-11-04 15:52:09 +01:00
this . _progressMonitor = ( reconnect )
2020-12-07 17:02:35 +01:00
? new ProgressMonitorDelegate ( this . _eventGroups ( ) . length + 2 , this . _worker )
2020-12-07 12:26:39 +01:00
: new NoopProgressMonitor ( )
2020-11-04 15:52:09 +01:00
this . _progressMonitor . workDone ( 1 )
2018-08-03 14:10:56 +02:00
this . _state = EventBusState . Automatic
2019-06-08 17:07:33 +02:00
this . _connectTimer = null
2018-07-31 17:07:41 +02:00
2019-01-17 15:42:16 +01:00
const authHeaders = this . _login . createAuthHeaders ( )
// Native query building is not supported in old browser, mithril is not available in the worker
const authQuery =
"modelVersions=" + WebsocketEntityDataTypeModel . version + "." + MailTypeModel . version
+ "&clientVersion=" + env . versionNumber
+ "&userId=" + this . _login . getLoggedInUser ( ) . _id
2020-03-16 17:37:50 +01:00
+ "&accessToken=" + authHeaders . accessToken
+ ( this . _lastAntiphishingMarkersId ? "&lastPhishingMarkersId=" + this . _lastAntiphishingMarkersId : "" )
2019-01-17 15:42:16 +01:00
let url = getWebsocketOrigin ( ) + "/event?" + authQuery ;
2018-08-15 13:56:56 +02:00
this . _unsubscribeFromOldWebsocket ( )
2017-08-15 13:54:22 +02:00
this . _socket = new WebSocket ( url ) ;
this . _socket . onopen = ( ) => {
2019-10-14 15:47:46 +02:00
this . _failedConnectionAttempts = 0
2018-08-03 14:10:56 +02:00
console . log ( "ws open: " , new Date ( ) , "state:" , this . _state ) ;
2020-10-05 14:57:11 +02:00
// Indicate some progress right away
2020-11-04 15:52:09 +01:00
this . _progressMonitor . workDone ( 1 )
this . _initEntityEvents ( reconnect )
2018-08-02 09:27:37 +02:00
this . _worker . updateWebSocketState ( "connected" )
2017-08-15 13:54:22 +02:00
} ;
this . _socket . onclose = ( event : CloseEvent ) => this . _close ( event ) ;
this . _socket . onerror = ( error : any ) => this . _error ( error ) ;
this . _socket . onmessage = ( message : MessageEvent ) => this . _message ( message ) ;
}
2020-11-04 15:52:09 +01:00
_initEntityEvents ( reconnect : boolean ) {
2020-12-10 17:18:19 +01:00
// pause processing entity update message while initializing event queue
this . _entityUpdateMessageQueue . pause ( )
// pause event queue and add all missed entity events first
this . _eventQueue . pause ( )
2020-09-15 16:37:42 +02:00
let existingConnection = reconnect && Object . keys ( this . _lastEntityEventIds ) . length > 0
2020-11-04 15:52:09 +01:00
let p = existingConnection ? this . _loadMissedEntityEvents ( ) : this . _setLatestEntityEventIds ( )
2020-12-10 17:18:19 +01:00
p . then ( ( ) => {
this . _entityUpdateMessageQueue . resume ( )
this . _eventQueue . resume ( )
} ) . catch ( ConnectionError , e => {
2019-07-29 15:49:41 +02:00
console . log ( "not connected in connect(), close websocket" , e )
this . close ( CloseEventBusOption . Reconnect )
} ) . catch ( CancelledError , e => {
// the processing was aborted due to a reconnect. do not reset any attributes because they might already be in use since reconnection
console . log ( "cancelled retry process entity events after reconnect" )
} ) . catch ( ServiceUnavailableError , e => {
// a ServiceUnavailableError is a temporary error and we have to retry to avoid data inconsistencies
2020-09-15 16:37:42 +02:00
// some EventBatches/missed events are processed already now
// for an existing connection we just keep the current state and continue loading missed events for the other groups
// for a new connection we reset the last entity event ids because otherwise this would not be completed in the next try
if ( ! existingConnection ) {
this . _lastEntityEventIds = { }
this . _lastUpdateTime = 0
}
2019-07-29 15:49:41 +02:00
console . log ( "retry init entity events in 30s" , e )
let promise = Promise . delay ( RETRY _AFTER _SERVICE _UNAVAILABLE _ERROR _MS ) . then ( ( ) => {
// if we have a websocket reconnect we have to stop retrying
if ( this . _serviceUnavailableRetry === promise ) {
console . log ( "retry initializing entity events" )
2020-11-04 15:52:09 +01:00
return this . _initEntityEvents ( reconnect )
2019-07-29 15:49:41 +02:00
} else {
console . log ( "cancel initializing entity events" )
}
} )
this . _serviceUnavailableRetry = promise
return promise
} ) . catch ( e => {
2020-12-10 17:18:19 +01:00
this . _entityUpdateMessageQueue . resume ( )
this . _eventQueue . resume ( )
2019-07-29 15:49:41 +02:00
this . _worker . sendError ( e )
2020-11-04 15:52:09 +01:00
} )
2019-07-29 15:49:41 +02:00
}
2017-08-15 13:54:22 +02:00
/ * *
2017-09-28 11:32:02 +02:00
* Sends a close event to the server and finally closes the connection .
2017-12-20 16:48:36 +01:00
* The state of this event bus client is reset and the client is terminated ( does not automatically reconnect ) except reconnect == true
2017-08-15 13:54:22 +02:00
* /
2018-08-02 13:16:24 +02:00
close ( closeOption : CloseEventBusOptionEnum ) {
2019-09-19 16:11:33 +02:00
console . log ( new Date ( ) . toISOString ( ) , "ws close closeOption: " , closeOption , "state:" , this . _state ) ;
2018-08-02 13:16:24 +02:00
switch ( closeOption ) {
case CloseEventBusOption . Terminate :
this . _terminate ( )
break
case CloseEventBusOption . Pause :
this . _state = EventBusState . Suspended
this . _worker . updateWebSocketState ( "connecting" )
break
case CloseEventBusOption . Reconnect :
this . _worker . updateWebSocketState ( "connecting" )
break ;
2017-09-19 14:54:04 +02:00
}
2018-08-02 13:16:24 +02:00
2017-08-15 13:54:22 +02:00
if ( this . _socket && this . _socket . close ) { // close is undefined in node tests
2018-08-02 13:16:24 +02:00
this . _socket . close ( )
2017-08-15 13:54:22 +02:00
}
}
2018-08-15 13:56:56 +02:00
_unsubscribeFromOldWebsocket ( ) {
if ( this . _socket ) {
// Remove listeners. We don't want old socket to mess our state
this . _socket . onopen = this . _socket . onclose = this . _socket . onerror = this . _socket . onmessage = identity
}
}
2017-12-20 16:48:36 +01:00
_terminate ( ) : void {
2018-08-02 13:16:24 +02:00
this . _state = EventBusState . Terminated
2017-12-20 16:48:36 +01:00
this . _reset ( )
2018-08-02 13:16:24 +02:00
this . _worker . updateWebSocketState ( "terminated" )
2017-12-20 16:48:36 +01:00
}
2017-08-15 13:54:22 +02:00
_error ( error : any ) {
2019-09-19 16:11:33 +02:00
console . log ( new Date ( ) . toISOString ( ) , "ws error: " , error , JSON . stringify ( error ) , "state:" , this . _state ) ;
2017-08-15 13:54:22 +02:00
}
_message ( message : MessageEvent ) : Promise < void > {
2019-03-14 11:55:37 +01:00
//console.log("ws message: ", message.data);
2019-01-17 15:42:16 +01:00
const [ type , value ] = downcast ( message . data ) . split ( ";" )
if ( type === "entityUpdate" ) {
2020-11-04 15:52:09 +01:00
// specify type of decrypted entity explicitly because decryptAndMapToInstance effectively returns `any`
return decryptAndMapToInstance ( WebsocketEntityDataTypeModel , JSON . parse ( value ) , null ) . then ( ( data : WebsocketEntityData ) => {
2020-12-10 17:18:19 +01:00
this . _entityUpdateMessageQueue . add ( data . eventBatchId , data . eventBatchOwner , data . eventBatch )
2020-11-04 15:52:09 +01:00
} )
2019-01-21 10:48:07 +01:00
} else if ( type === "unreadCounterUpdate" ) {
this . _worker . updateCounter ( JSON . parse ( value ) )
2020-03-16 17:37:50 +01:00
} else if ( type === "phishingMarkers" ) {
return decryptAndMapToInstance ( PhishingMarkerWebsocketDataTypeModel , JSON . parse ( value ) , null )
. then ( ( data ) => {
this . _lastAntiphishingMarkersId = data . lastId
2020-06-02 13:39:22 +02:00
this . _mail . phishingMarkersUpdateReceived ( data . markers )
2020-03-16 17:37:50 +01:00
} )
2020-10-13 17:15:52 +02:00
} else if ( type === "leaderStatus" ) {
return decryptAndMapToInstance ( WebsocketLeaderStatusTypeModel , JSON . parse ( value ) , null )
. then ( status => {
return this . _login . setLeaderStatus ( status )
} )
2020-03-16 17:37:50 +01:00
} else {
console . log ( "ws message with unknown type" , type )
2019-01-17 15:42:16 +01:00
}
return Promise . resolve ( )
2017-08-15 13:54:22 +02:00
}
_close ( event : CloseEvent ) {
2019-10-14 15:47:46 +02:00
this . _failedConnectionAttempts ++
2019-09-19 16:11:33 +02:00
console . log ( new Date ( ) . toISOString ( ) , "ws _close: " , event , "state:" , this . _state ) ;
2020-10-13 17:15:52 +02:00
this . _login . setLeaderStatus ( createWebsocketLeaderStatus ( { leaderStatus : false } ) )
2017-09-21 10:35:10 +02:00
// Avoid running into penalties when trying to authenticate with an invalid session
// NotAuthenticatedException 401, AccessDeactivatedException 470, AccessBlocked 472
// do not catch session expired here because websocket will be reused when we authenticate again
2019-10-14 15:47:46 +02:00
const serverCode = event . code - 4000
if ( [ NotAuthorizedError . CODE , AccessDeactivatedError . CODE , AccessBlockedError . CODE ] . includes ( serverCode ) ) {
2017-12-20 16:48:36 +01:00
this . _terminate ( )
2020-04-01 20:39:29 +02:00
this . _worker . sendError ( handleRestError ( serverCode , "web socket error" ) )
2019-10-14 15:47:46 +02:00
} else if ( serverCode === SessionExpiredError . CODE ) {
2018-12-07 14:12:49 +01:00
// session is expired. do not try to reconnect until the user creates a new session
this . _state = EventBusState . Suspended
this . _worker . updateWebSocketState ( "connecting" )
} else if ( this . _state === EventBusState . Automatic && this . _login . isLoggedIn ( ) ) {
2018-08-02 13:16:24 +02:00
this . _worker . updateWebSocketState ( "connecting" )
2019-03-14 11:55:37 +01:00
if ( this . _immediateReconnect ) {
2017-08-15 13:54:22 +02:00
this . _immediateReconnect = false
2018-08-03 14:10:56 +02:00
this . tryReconnect ( false , false ) ;
2019-06-08 17:07:33 +02:00
} else {
2019-10-14 15:47:46 +02:00
let reconnectionInterval : [ number , number ]
if ( serverCode === NORMAL _SHUTDOWN _CLOSE _CODE ) {
reconnectionInterval = LARGE _RECONNECT _INTERVAL
} else if ( this . _failedConnectionAttempts === 1 ) {
reconnectionInterval = SMALL _RECONNECT _INTERVAL
} else if ( this . _failedConnectionAttempts === 2 ) {
reconnectionInterval = MEDIUM _RECONNECT _INTERVAL
} else {
reconnectionInterval = LARGE _RECONNECT _INTERVAL
}
this . tryReconnect ( false , false , SECOND _MS * randomIntFromInterval ( reconnectionInterval [ 0 ] , reconnectionInterval [ 1 ] ) )
2017-08-15 13:54:22 +02:00
}
2019-06-08 17:07:33 +02:00
}
}
tryReconnect ( closeIfOpen : boolean , enableAutomaticState : boolean , delay : ? number = null ) {
2019-10-14 15:47:46 +02:00
console . log ( "tryReconnect, closeIfOpen" , closeIfOpen , "enableAutomaticState" , enableAutomaticState , "delay" , delay )
2019-06-08 17:07:33 +02:00
if ( this . _reconnectTimer ) {
// prevent reconnect race-condition
clearTimeout ( this . _reconnectTimer )
this . _reconnectTimer = null
}
if ( ! delay ) {
this . _reconnect ( closeIfOpen , enableAutomaticState )
} else {
2019-10-01 13:44:31 +02:00
this . _reconnectTimer = setTimeout ( ( ) => this . _reconnect ( closeIfOpen , enableAutomaticState ) , delay ) ;
2017-08-15 13:54:22 +02:00
}
}
/ * *
* Tries to reconnect the websocket if it is not connected .
* /
2019-06-08 17:07:33 +02:00
_reconnect ( closeIfOpen : boolean , enableAutomaticState : boolean ) {
2019-09-19 16:11:33 +02:00
console . log ( new Date ( ) . toISOString ( ) , "ws _reconnect socket state (CONNECTING=0, OPEN=1, CLOSING=2, CLOSED=3): "
2019-03-14 11:55:37 +01:00
+ ( ( this . _socket ) ? this . _socket . readyState : "null" ) , "state:" , this . _state ,
"closeIfOpen" , closeIfOpen , "enableAutomaticState" , enableAutomaticState ) ;
2018-08-03 14:10:56 +02:00
if ( this . _state !== EventBusState . Terminated && enableAutomaticState ) {
this . _state = EventBusState . Automatic
}
2018-07-26 14:25:29 +02:00
if ( closeIfOpen && this . _socket && this . _socket . readyState === WebSocket . OPEN ) {
2017-08-15 13:54:22 +02:00
this . _immediateReconnect = true
neverNull ( this . _socket ) . close ( ) ;
2018-08-02 13:16:24 +02:00
} else if (
( this . _socket == null || this . _socket . readyState === WebSocket . CLOSED
2018-08-03 14:10:56 +02:00
|| this . _socket . readyState === WebSocket . CLOSING )
2018-08-02 13:16:24 +02:00
&& this . _state !== EventBusState . Terminated
2018-07-26 17:37:44 +02:00
&& this . _login . isLoggedIn ( ) ) {
2019-03-13 14:53:13 +01:00
// Don't try to connect right away because connection may not be actually there
// see #1165
2019-06-08 17:07:33 +02:00
if ( this . _connectTimer ) {
clearTimeout ( this . _connectTimer )
}
this . _connectTimer = setTimeout ( ( ) => this . connect ( true ) , 100 )
2017-08-15 13:54:22 +02:00
}
}
2019-07-29 15:49:41 +02:00
2017-08-15 13:54:22 +02:00
/ * *
* stores the latest event batch ids for each of the users groups or min id if there is no event batch yet .
* this is needed to know from where to start loading missed events after a reconnect
* /
_setLatestEntityEventIds ( ) : Promise < void > {
2019-07-29 15:49:41 +02:00
// set all last event ids in one step to avoid that we have just set them for a few groups when a ServiceUnavailableError occurs
let lastIds : { [ key : Id ] : Id [ ] } = { }
2018-12-11 09:41:02 +01:00
return Promise . each ( this . _eventGroups ( ) , groupId => {
2020-10-05 14:49:34 +02:00
return this . _entity . loadRange ( EntityEventBatchTypeRef , groupId , GENERATED _MAX _ID , 1 , true ) . then ( batches => {
2019-07-29 15:49:41 +02:00
lastIds [ groupId ] = [
2018-07-26 17:37:44 +02:00
( batches . length === 1 ) ? getLetId ( batches [ 0 ] ) [ 1 ] : GENERATED _MIN _ID
]
2017-08-15 13:54:22 +02:00
} )
} ) . then ( ( ) => {
2019-07-29 15:49:41 +02:00
this . _lastEntityEventIds = lastIds
2020-09-15 16:37:42 +02:00
this . _lastUpdateTime = Date . now ( )
2020-12-07 17:02:35 +01:00
this . _eventQueue . resume ( )
2017-08-15 13:54:22 +02:00
} )
}
2020-11-04 15:52:09 +01:00
_loadMissedEntityEvents ( ) : Promise < void > {
2017-11-23 13:30:17 +01:00
if ( this . _login . isLoggedIn ( ) ) {
2020-09-15 16:37:42 +02:00
if ( Date . now ( ) > this . _lastUpdateTime + ENTITY _EVENT _BATCH _EXPIRE _MS ) {
2020-09-15 11:35:25 +02:00
// we did not check for updates for too long, so some missed EntityEventBatches can not be loaded any more
return this . _worker . sendError ( new OutOfSyncError ( ) )
} else {
2020-11-04 15:52:09 +01:00
return Promise . each ( this . _eventGroups ( ) , ( groupId ) => {
return this . _entity
. loadAll ( EntityEventBatchTypeRef , groupId , this . _getLastEventBatchIdOrMinIdForGroup ( groupId ) )
. then ( ( eventBatches ) => {
2020-12-07 17:02:35 +01:00
if ( eventBatches . length === 0 ) {
// There won't be a callback from the queue to process the event so we mark this group as
// completed right away
this . _progressMonitor . workDone ( 1 )
} else {
for ( const batch of eventBatches ) {
this . _addBatch ( getElementId ( batch ) , groupId , batch . events )
}
2020-11-04 15:52:09 +01:00
}
}
)
2020-10-05 14:49:34 +02:00
. catch ( NotAuthorizedError , ( ) => {
console . log ( "could not download entity updates => lost permission" )
2020-11-04 15:52:09 +01:00
} )
2020-09-15 11:35:25 +02:00
} ) . then ( ( ) => {
2020-09-15 16:37:42 +02:00
this . _lastUpdateTime = Date . now ( )
2020-12-07 17:02:35 +01:00
this . _eventQueue . resume ( )
2020-09-15 11:35:25 +02:00
} )
}
2017-08-15 13:54:22 +02:00
} else {
return Promise . resolve ( )
}
}
2020-11-04 15:52:09 +01:00
_addBatch ( batchId : Id , groupId : Id , events : $ReadOnlyArray < EntityUpdate > ) {
const lastForGroup = this . _lastEntityEventIds [ groupId ] || [ ]
2020-12-10 17:18:19 +01:00
// find the position for inserting into last entity events (negative value is considered as not present in the array)
2020-11-04 15:52:09 +01:00
const index = binarySearch ( lastForGroup , batchId , compareOldestFirst )
if ( index < 0 ) {
lastForGroup . splice ( - index , 0 , batchId )
2020-12-10 17:18:19 +01:00
// only add the batch if it was not process before
this . _eventQueue . add ( batchId , groupId , events )
2020-11-04 15:52:09 +01:00
}
if ( lastForGroup . length > this . _MAX _EVENT _IDS _QUEUE _LENGTH ) {
lastForGroup . shift ( )
2017-08-15 13:54:22 +02:00
}
2020-11-04 15:52:09 +01:00
this . _lastEntityEventIds [ batchId ] = lastForGroup
this . _eventQueue . add ( batchId , groupId , events )
}
_processEventBatch ( batch : QueuedBatch ) : Promise < void > {
2019-03-21 15:06:59 +01:00
return this . _executeIfNotTerminated ( ( ) => {
2020-11-04 15:52:09 +01:00
return this . _cache . entityEventsReceived ( batch . events )
2019-04-26 16:14:16 +02:00
. then ( filteredEvents => {
return this . _executeIfNotTerminated ( ( ) => this . _login . entityEventsReceived ( filteredEvents ) )
. then ( ( ) => this . _executeIfNotTerminated ( ( ) => this . _mail . entityEventsReceived ( filteredEvents ) ) )
2020-11-04 15:52:09 +01:00
. then ( ( ) => this . _executeIfNotTerminated ( ( ) => this . _worker . entityEventsReceived ( filteredEvents , batch . groupId ) ) )
2019-04-26 16:14:16 +02:00
. return ( filteredEvents )
} )
. then ( filteredEvents => {
// Call the indexer in this last step because now the processed event is stored and the indexer has a separate event queue that
// shall not receive the event twice.
if ( ! isTest ( ) && ! isAdminClient ( ) ) {
this . _executeIfNotTerminated ( ( ) => {
2020-11-04 15:52:09 +01:00
this . _indexer . addBatchesToQueue ( [
{
groupId : batch . groupId ,
batchId : batch . batchId ,
events : filteredEvents
}
] )
2019-04-26 16:14:16 +02:00
this . _indexer . startProcessing ( )
} )
}
} )
2019-07-29 15:49:41 +02:00
} ) . catch ( ServiceUnavailableError , e => {
// a ServiceUnavailableError is a temporary error and we have to retry to avoid data inconsistencies
console . log ( "retry processing event in 30s" , e )
let promise = Promise . delay ( RETRY _AFTER _SERVICE _UNAVAILABLE _ERROR _MS ) . then ( ( ) => {
// if we have a websocket reconnect we have to stop retrying
if ( this . _serviceUnavailableRetry === promise ) {
2020-11-04 15:52:09 +01:00
return this . _processEventBatch ( batch )
2019-07-29 15:49:41 +02:00
} else {
throw new CancelledError ( "stop retry processing after service unavailable due to reconnect" )
}
} )
this . _serviceUnavailableRetry = promise
return promise
2019-03-21 15:06:59 +01:00
} )
2017-08-15 13:54:22 +02:00
}
_getLastEventBatchIdOrMinIdForGroup ( groupId : Id ) : Id {
2017-11-09 11:07:02 +01:00
// TODO handle lost updates (old event surpassed by newer one, we store the new id and retrieve instances from the newer one on next login
2018-07-26 17:37:44 +02:00
return ( this . _lastEntityEventIds [ groupId ] && this . _lastEntityEventIds [ groupId ] . length > 0 ) ?
this . _lastEntityEventIds [ groupId ] [ this . _lastEntityEventIds [ groupId ] . length - 1 ] : GENERATED _MIN _ID
2017-09-19 14:54:04 +02:00
}
2017-08-15 13:54:22 +02:00
_executeIfNotTerminated ( call : Function ) : Promise < void > {
2018-08-02 13:16:24 +02:00
if ( this . _state !== EventBusState . Terminated ) {
2017-08-15 13:54:22 +02:00
return call ( )
} else {
return Promise . resolve ( )
}
}
2018-12-11 09:41:02 +01:00
_eventGroups ( ) : Id [ ] {
return this . _login . getLoggedInUser ( ) . memberships
. filter ( membership => membership . groupType !== GroupType . MailingList )
. map ( membership => membership . group )
2019-02-04 17:04:30 +01:00
. concat ( this . _login . getLoggedInUser ( ) . userGroup . group )
2018-12-11 09:41:02 +01:00
}
2020-11-04 15:52:09 +01:00
}