tutanota/test/tests/api/worker/EventBusClientTest.ts

307 lines
10 KiB
TypeScript
Raw Normal View History

import o from "ospec"
2022-12-28 15:28:28 +01:00
import { ConnectMode, EventBusClient, EventBusListener } from "../../../../src/api/worker/EventBusClient.js"
import {GroupType, OperationType} from "../../../../src/api/common/TutanotaConstants.js"
import type {EntityUpdate} from "../../../../src/api/entities/sys/TypeRefs.js"
import {
createEntityEventBatch,
createEntityUpdate,
createGroupMembership,
createUser,
createWebsocketCounterData,
createWebsocketCounterValue,
createWebsocketEntityData,
EntityEventBatchTypeRef,
User,
WebsocketCounterData,
2022-12-27 15:37:40 +01:00
WebsocketEntityData,
} from "../../../../src/api/entities/sys/TypeRefs.js"
import {EntityRestClientMock} from "./rest/EntityRestClientMock.js"
import {EntityClient} from "../../../../src/api/common/EntityClient.js"
import {defer, noOp} from "@tutao/tutanota-utils"
import {InstanceMapper} from "../../../../src/api/worker/crypto/InstanceMapper.js"
import {DefaultEntityRestCache} from "../../../../src/api/worker/rest/DefaultEntityRestCache.js"
import {QueuedBatch} from "../../../../src/api/worker/search/EventQueue.js"
import {OutOfSyncError} from "../../../../src/api/common/error/OutOfSyncError.js"
import {matchers, object, verify, when} from "testdouble"
import {getElementId} from "../../../../src/api/common/utils/EntityUtils.js"
import {SleepDetector} from "../../../../src/api/worker/utils/SleepDetector.js"
import {WsConnectionState} from "../../../../src/api/main/WorkerClient.js"
import {UserFacade} from "../../../../src/api/worker/facades/UserFacade"
2022-12-28 15:28:28 +01:00
import { ExposedProgressTracker } from "../../../../src/api/main/ProgressTracker.js"
o.spec("EventBusClient test", function () {
// Shared fixtures. All of them are (re)assigned in the top-level `o.beforeEach`,
// so every test starts from fresh mocks and a fresh EventBusClient.
let ebc: EventBusClient
let cacheMock: DefaultEntityRestCache
let restClient: EntityRestClientMock
let userMock: UserFacade
let socket: WebSocket
let user: User
let sleepDetector: SleepDetector
let listenerMock: EventBusListener
let progressTrackerMock: ExposedProgressTracker
// Passed to the EventBusClient constructor; hands out whatever `socket` currently points at,
// which lets a test swap in a second socket before triggering a reconnect.
let socketFactory: () => WebSocket
/**
 * Builds the EventBusClient under test from the current fixture values and stores it in `ebc`.
 *
 * A real EntityClient (wrapping the `restClient` mock) and a real InstanceMapper are created here;
 * every other collaborator is one of the shared mocks.
 */
function initEventBus(): void {
	const entityClient = new EntityClient(restClient)
	const instanceMapper = new InstanceMapper()
	ebc = new EventBusClient(listenerMock, cacheMock, userMock, entityClient, instanceMapper, socketFactory, sleepDetector, progressTrackerMock)
}
// One-time setup: make sure the WebSocket ready-state constants exist when not running in a browser.
o.before(function () {
	// Things that are not defined in node but are read-only in Browser
	if (!globalThis.isBrowser) {
		// `??` keeps any value that is already present and only fills in missing ones.
		// @ts-ignore
		WebSocket.CONNECTING = WebSocket.CONNECTING ?? 0
		// @ts-ignore
		WebSocket.OPEN = WebSocket.OPEN ?? 1
		// @ts-ignore
		WebSocket.CLOSING = WebSocket.CLOSING ?? 2
		// @ts-ignore
		WebSocket.CLOSED = WebSocket.CLOSED ?? 3
	}
})
// Per-test setup: fresh mocks for every collaborator, then a fresh EventBusClient built from them.
o.beforeEach(async function () {
	listenerMock = object()
	progressTrackerMock = object()
	// Cache test double created from a template object that lists the cache methods the tests rely on.
	// Individual tests override results with `when(...)`.
	cacheMock = object({
		async entityEventsReceived(batch: QueuedBatch): Promise<Array<EntityUpdate>> {
			return batch.events.slice()
		},
		async getLastEntityEventBatchForGroup(groupId: Id): Promise<Id | null> {
			return null
		},
		async recordSyncTime(): Promise<void> {
			return
		},
		async timeSinceLastSyncMs(): Promise<number | null> {
			return null
		},
		async purgeStorage(): Promise<void> {},
		async setLastEntityEventBatchForGroup(groupId: Id, batchId: Id): Promise<void> {
			return
		},
		async isOutOfSync(): Promise<boolean> {
			return false
		},
	} as DefaultEntityRestCache)
	user = createUser({
		userGroup: createGroupMembership({
			group: "userGroupId",
		}),
	})
	// The user facade reports a fully logged-in user with empty auth headers.
	userMock = object("user")
	when(userMock.getLoggedInUser()).thenReturn(user)
	when(userMock.isFullyLoggedIn()).thenReturn(true)
	when(userMock.createAuthHeaders()).thenReturn({})
	restClient = new EntityRestClientMock()
	socket = object<WebSocket>()
	sleepDetector = object()
	// Read `socket` lazily so tests can replace it after setup.
	socketFactory = () => socket
	initEventBus()
})
// Tests for what happens to the entity event state when the socket opens, for a user with a single mail group.
o.spec("initEntityEvents ", function () {
	const mailGroupId = "mailGroupId"
	o.beforeEach(function () {
		user.memberships = [
			createGroupMembership({
				groupType: GroupType.Mail,
				group: mailGroupId,
			}),
		]
	})
	o("initial connect: when the cache is clean it downloads one batch and initializes cache", async function () {
		// Clean cache: no last batch id and no recorded sync time
		when(cacheMock.getLastEntityEventBatchForGroup(mailGroupId)).thenResolve(null)
		when(cacheMock.timeSinceLastSyncMs()).thenResolve(null)
		const batch = createEntityEventBatch({ _id: [mailGroupId, "-----------1"] })
		restClient.addListInstances(batch)
		await ebc.connect(ConnectMode.Initial)
		await socket.onopen?.(new Event("open"))
		verify(cacheMock.recordSyncTime())
		// Did not download anything besides single batch
		verify(restClient.loadRange(EntityEventBatchTypeRef, mailGroupId, matchers.anything(), matchers.not(1), matchers.anything()), { times: 0 })
		verify(cacheMock.setLastEntityEventBatchForGroup(mailGroupId, getElementId(batch)))
	})
	o("initial connect: when the cache is initialized, missed events are loaded", async function () {
		// The cache already knows a batch id that sorts before the batch served by the rest client
		when(cacheMock.getLastEntityEventBatchForGroup(mailGroupId)).thenResolve("------------")
		when(cacheMock.timeSinceLastSyncMs()).thenResolve(1)
		const update = createEntityUpdate({
			type: "Mail",
			application: "tutanota",
			instanceListId: mailGroupId,
			instanceId: "newBatchId",
		})
		const batch = createEntityEventBatch({
			_id: [mailGroupId, "-----------1"],
			events: [update],
		})
		restClient.addListInstances(batch)
		// Resolved once the cache is handed exactly the missed batch
		const eventsReceivedDefer = defer()
		when(cacheMock.entityEventsReceived({ events: [update], batchId: getElementId(batch), groupId: mailGroupId })).thenDo(() =>
			eventsReceivedDefer.resolve(undefined),
		)
		await ebc.connect(ConnectMode.Initial)
		await socket.onopen?.(new Event("open"))
		await eventsReceivedDefer.promise
		verify(cacheMock.purgeStorage(), { times: 0 })
		verify(cacheMock.recordSyncTime())
	})
	o("reconnect: when the cache is out of sync with the server, the cache is purged", async function () {
		when(cacheMock.getLastEntityEventBatchForGroup(mailGroupId)).thenResolve("lastBatchId")
		// Make initial connection to simulate reconnect (populate lastEntityEventIds)
		await ebc.connect(ConnectMode.Initial)
		await socket.onopen?.(new Event("open"))
		// Make it think that it's actually a reconnect
		when(cacheMock.isOutOfSync()).thenResolve(true)
		// initialize events first as well as current time
		await ebc.connect(ConnectMode.Reconnect)
		await socket.onopen?.(new Event("open"))
		verify(cacheMock.purgeStorage(), { times: 1 })
		verify(listenerMock.onError(matchers.isA(OutOfSyncError)))
	})
	o("initial connect: when the cache is out of sync with the server, the cache is purged", async function () {
		when(cacheMock.getLastEntityEventBatchForGroup(mailGroupId)).thenResolve("lastBatchId")
		when(cacheMock.isOutOfSync()).thenResolve(true)
		// Connect in Initial mode so that this test covers the path its title names;
		// the reconnect path is covered by the test above.
		await ebc.connect(ConnectMode.Initial)
		await socket.onopen?.(new Event("open"))
		verify(cacheMock.purgeStorage(), { times: 1 })
		verify(listenerMock.onError(matchers.isA(OutOfSyncError)))
	})
})
// Two websocket messages arrive back to back while the cache never finishes the first one:
// the second batch must not reach the cache until the first is done.
o("parallel received event batches are passed sequentially to the entity rest cache", async function () {
	o.timeout(500)
	ebc.connect(ConnectMode.Initial)
	await socket.onopen?.(new Event("open"))
	const messageData1 = createEntityMessage(1)
	const messageData2 = createEntityMessage(2)
	// Casting to object here because promise stubber doesn't allow you to just return the promise
	// We never resolve the promise
	when(cacheMock.entityEventsReceived(matchers.anything()) as object).thenReturn(new Promise(noOp))
	// call twice as if it was received in parallel
	const p1 = socket.onmessage?.({
		data: messageData1,
	} as MessageEvent<string>)
	const p2 = socket.onmessage?.({
		data: messageData2,
	} as MessageEvent<string>)
	await Promise.all([p1, p2])
	// Is waiting for cache to process the first event
	verify(cacheMock.entityEventsReceived(matchers.anything()), { times: 1 })
})
// A counter message delivered to the socket must reach the listener as the same counter data.
o("on counter update it send message to the main thread", async function () {
	const counterUpdate = createCounterData({ mailGroupId: "group1", counterValue: 4, listId: "list1" })
	await ebc.connect(ConnectMode.Initial)
	await socket.onmessage?.({
		data: createCounterMessage(counterUpdate),
	} as MessageEvent)
	verify(listenerMock.onCounterChanged(counterUpdate))
})
// Tests for how the event bus drives the SleepDetector and reacts to its callback.
o.spec("sleep detection", function () {
	o("on connect it starts", async function () {
		// Nothing is started before the socket is opened
		verify(sleepDetector.start(matchers.anything()), { times: 0 })
		ebc.connect(ConnectMode.Initial)
		await socket.onopen?.(new Event("open"))
		verify(sleepDetector.start(matchers.anything()), { times: 1 })
	})
	o("on disconnect it stops", async function () {
		ebc.connect(ConnectMode.Initial)
		await socket.onopen?.(new Event("open"))
		await socket.onclose?.(new Event("close") as CloseEvent) // there's no CloseEvent in node
		verify(sleepDetector.stop())
	})
	o("on sleep it reconnects", async function () {
		// Capture the callback handed to the sleep detector so the test can fire it manually
		let passedCb
		when(sleepDetector.start(matchers.anything())).thenDo((cb) => (passedCb = cb))
		const firstSocket = socket
		ebc.connect(ConnectMode.Initial)
		// @ts-ignore
		firstSocket.readyState = WebSocket.OPEN
		await firstSocket.onopen?.(new Event("open"))
		verify(socket.close(), { ignoreExtraArgs: true, times: 0 })
		// From now on the socket factory hands out a new socket
		const secondSocket = (socket = object())
		passedCb()
		// The old socket is closed and a new connection is started on the second one
		verify(firstSocket.close(), { ignoreExtraArgs: true, times: 1 })
		verify(listenerMock.onWebsocketStateChanged(WsConnectionState.connecting))
		await secondSocket.onopen?.(new Event("open"))
		verify(listenerMock.onWebsocketStateChanged(WsConnectionState.connected))
	})
})
/**
 * Builds the message string for an entity update, as the tests pass it to `socket.onmessage`.
 *
 * The message is the literal prefix "entityUpdate;" followed by the JSON of a WebsocketEntityData
 * that carries a single Mail UPDATE event owned by "ownerId".
 *
 * @param eventBatchId numeric id, stored as a string in `eventBatchId`
 * @returns the prefixed, JSON-encoded message
 */
function createEntityMessage(eventBatchId: number): string {
	const event: WebsocketEntityData = createWebsocketEntityData({
		eventBatchId: String(eventBatchId),
		eventBatchOwner: "ownerId",
		eventBatch: [
			createEntityUpdate({
				_id: "eventbatchid",
				application: "tutanota",
				type: "Mail",
				instanceListId: "listId1",
				instanceId: "id1",
				operation: OperationType.UPDATE,
			}),
		],
	})
	return "entityUpdate;" + JSON.stringify(event)
}
2022-12-27 15:37:40 +01:00
/** Inputs for {@link createCounterData}. */
type CounterMessageParams = { mailGroupId: Id; counterValue: number; listId: Id }

/**
 * Builds WebsocketCounterData for one mail group with a single counter value.
 *
 * @param mailGroupId stored as `mailGroup`
 * @param counterValue stored as a string in the counter value's `count`
 * @param listId stored as the counter value's `mailListId`
 */
function createCounterData({ mailGroupId, counterValue, listId }: CounterMessageParams): WebsocketCounterData {
	return createWebsocketCounterData({
		_format: "0",
		mailGroup: mailGroupId,
		counterValues: [
			createWebsocketCounterValue({
				_id: "counterupdateid",
				count: String(counterValue),
				mailListId: listId,
			}),
		],
	})
}
/**
 * Builds the message string for a counter update, as the counter test passes it to `socket.onmessage`:
 * the literal prefix "unreadCounterUpdate;" followed by the JSON of the given data.
 */
function createCounterMessage(event: WebsocketCounterData): string {
	const payload = JSON.stringify(event)
	return `unreadCounterUpdate;${payload}`
}
2022-12-27 15:37:40 +01:00
})