2022-01-07 15:58:30 +01:00
|
|
|
import o from "ospec"
|
2022-05-12 16:51:15 +02:00
|
|
|
import {ConnectMode, EventBusClient} from "../../../../src/api/worker/EventBusClient.js"
|
|
|
|
|
import {GroupType, OperationType} from "../../../../src/api/common/TutanotaConstants.js"
|
|
|
|
|
import type {EntityUpdate} from "../../../../src/api/entities/sys/TypeRefs.js"
|
|
|
|
|
import {createEntityUpdate} from "../../../../src/api/entities/sys/TypeRefs.js"
|
|
|
|
|
import {EntityRestClientMock} from "./rest/EntityRestClientMock.js"
|
|
|
|
|
import {EntityClient} from "../../../../src/api/common/EntityClient.js"
|
2022-03-01 17:30:19 +01:00
|
|
|
import {defer, noOp} from "@tutao/tutanota-utils"
|
2022-05-12 16:51:15 +02:00
|
|
|
import {WorkerImpl} from "../../../../src/api/worker/WorkerImpl.js"
|
|
|
|
|
import {LoginFacadeImpl} from "../../../../src/api/worker/facades/LoginFacade.js"
|
|
|
|
|
import {createUser, User} from "../../../../src/api/entities/sys/TypeRefs.js"
|
|
|
|
|
import {createGroupMembership} from "../../../../src/api/entities/sys/TypeRefs.js"
|
|
|
|
|
import {InstanceMapper} from "../../../../src/api/worker/crypto/InstanceMapper.js"
|
|
|
|
|
import {EntityRestCache} from "../../../../src/api/worker/rest/EntityRestCache.js"
|
|
|
|
|
import {QueuedBatch} from "../../../../src/api/worker/search/EventQueue.js"
|
|
|
|
|
import {OutOfSyncError} from "../../../../src/api/common/error/OutOfSyncError.js"
|
2022-03-01 17:30:19 +01:00
|
|
|
import {matchers, object, verify, when} from "testdouble"
|
2022-05-12 16:51:15 +02:00
|
|
|
import {MailFacade} from "../../../../src/api/worker/facades/MailFacade.js"
|
|
|
|
|
import {Indexer} from "../../../../src/api/worker/search/Indexer.js"
|
|
|
|
|
import {createWebsocketEntityData, WebsocketEntityData} from "../../../../src/api/entities/sys/TypeRefs.js"
|
|
|
|
|
import {createWebsocketCounterData, WebsocketCounterData} from "../../../../src/api/entities/sys/TypeRefs.js"
|
|
|
|
|
import {createWebsocketCounterValue} from "../../../../src/api/entities/sys/TypeRefs.js"
|
|
|
|
|
import {createEntityEventBatch, EntityEventBatchTypeRef} from "../../../../src/api/entities/sys/TypeRefs.js"
|
|
|
|
|
import {getElementId} from "../../../../src/api/common/utils/EntityUtils.js"
|
|
|
|
|
import {SleepDetector} from "../../../../src/api/worker/utils/SleepDetector.js"
|
|
|
|
|
import {WsConnectionState} from "../../../../src/api/main/WorkerClient.js"
|
2022-01-13 11:57:55 +01:00
|
|
|
|
2022-01-07 15:58:30 +01:00
|
|
|
o.spec("EventBusClient test", function () {
|
2022-01-12 14:43:01 +01:00
|
|
|
// Shared fixtures. Every binding below is (re)assigned by the beforeEach hook of this spec.

// Client under test, built by initEventBus().
let ebc: EventBusClient

// Test doubles passed to the EventBusClient constructor.
let workerMock: WorkerImpl
let indexerMock: Indexer
let cacheMock: EntityRestCache
let mailMock: MailFacade
let loginMock: LoginFacadeImpl
let sleepDetector: SleepDetector

// REST client mock; initEventBus() wraps it into an EntityClient.
let restClient: EntityRestClientMock

// Fake socket and the factory that returns it.
let socket: WebSocket
let socketFactory

// User returned by loginMock.getLoggedInUser().
let user: User
|
2022-02-28 17:23:22 +01:00
|
|
|
|
|
|
|
|
/**
 * Builds a fresh EventBusClient from the current fixture values and stores it in `ebc`.
 * The EntityClient wraps the current `restClient`; a new InstanceMapper is created on every call.
 */
function initEventBus() {
	ebc = new EventBusClient(
		workerMock,
		indexerMock,
		cacheMock,
		mailMock,
		loginMock,
		new EntityClient(restClient),
		new InstanceMapper(),
		socketFactory,
		sleepDetector,
	)
}
|
2022-02-28 12:12:01 +01:00
|
|
|
|
2022-03-01 17:30:19 +01:00
|
|
|
o.before(function () {
	// The ready-state constants are only patched outside the browser, where they may be missing.
	if (globalThis.isBrowser) {
		return
	}

	// Only fill in constants that are actually missing. Where a WebSocket implementation already
	// defines them they are read-only, and assigning to a read-only property throws a TypeError
	// in strict-mode (module) code, so an unconditional assignment could break the whole suite.
	// The @ts-ignore comments are needed because the DOM typings declare the constants as readonly.

	// @ts-ignore
	if (WebSocket.CONNECTING == null) WebSocket.CONNECTING = 0
	// @ts-ignore
	if (WebSocket.OPEN == null) WebSocket.OPEN = 1
	// @ts-ignore
	if (WebSocket.CLOSING == null) WebSocket.CLOSING = 2
	// @ts-ignore
	if (WebSocket.CLOSED == null) WebSocket.CLOSED = 3
})
|
|
|
|
|
|
2022-01-12 14:43:01 +01:00
|
|
|
o.beforeEach(async function () {
	// Logged-in user that only has a user group; nested specs add memberships as needed.
	user = createUser({
		userGroup: createGroupMembership({group: "userGroupId"}),
	})

	// Cache double shaped after the EntityRestCache methods listed here.
	// Individual tests stub the results they care about with when(...).
	cacheMock = object({
		async entityEventsReceived(batch: QueuedBatch): Promise<Array<EntityUpdate>> {
			return batch.events.slice()
		},
		async getLastEntityEventBatchForGroup(groupId: Id): Promise<Id | null> {
			return null
		},
		async recordSyncTime(): Promise<void> {
			return
		},
		async timeSinceLastSyncMs(): Promise<number | null> {
			return null
		},
		async purgeStorage(): Promise<void> {
		},
		async setLastEntityEventBatchForGroup(groupId: Id, batchId: Id): Promise<void> {
			return
		},
		async isOutOfSync(): Promise<boolean> {
			return false
		}
	} as EntityRestCache)

	// Login facade: a logged-in session for `user` with empty auth headers.
	loginMock = object("login")
	when(loginMock.entityEventsReceived(matchers.anything())).thenResolve(undefined)
	when(loginMock.getLoggedInUser()).thenReturn(user)
	when(loginMock.isLoggedIn()).thenReturn(true)
	when(loginMock.createAuthHeaders()).thenReturn({})

	// Mail facade accepts any entity events.
	mailMock = object("mail")
	when(mailMock.entityEventsReceived(matchers.anything())).thenResolve(undefined)

	// Worker: every notification towards the main thread resolves; the progress monitor id is 42.
	workerMock = object("worker")
	when(workerMock.entityEventsReceived(matchers.anything(), matchers.anything())).thenResolve(undefined)
	when(workerMock.updateCounter(matchers.anything())).thenResolve(undefined)
	when(workerMock.updateWebSocketState(matchers.anything())).thenResolve(undefined)
	when(workerMock.createProgressMonitor(matchers.anything())).thenResolve(42)
	when(workerMock.progressWorkDone(matchers.anything(), matchers.anything())).thenResolve(undefined)

	indexerMock = object("indexer")

	restClient = new EntityRestClientMock()

	// The factory always returns the current `socket`, so a test can swap the socket before a reconnect.
	socket = object<WebSocket>()
	sleepDetector = object()
	socketFactory = () => socket

	initEventBus()
})
|
|
|
|
|
|
2022-02-28 17:23:22 +01:00
|
|
|
o.spec("initEntityEvents ", function () {
|
|
|
|
|
// Id of the only mail group the test user belongs to in this spec.
const mailGroupId = "mailGroupId"

o.beforeEach(function () {
	// Replace the user's memberships with a single mail group membership.
	const mailMembership = createGroupMembership({
		groupType: GroupType.Mail,
		group: mailGroupId,
	})
	user.memberships = [mailMembership]
})
|
|
|
|
|
|
|
|
|
|
o("initial connect: when the cache is clean it downloads one batch and initializes cache", async function () {
	// Clean cache: no stored batch id for the group and no recorded sync time.
	when(cacheMock.getLastEntityEventBatchForGroup(mailGroupId)).thenResolve(null)
	when(cacheMock.timeSinceLastSyncMs()).thenResolve(null)

	// The server holds exactly one batch for the mail group.
	const serverBatch = createEntityEventBatch({_id: [mailGroupId, "-----------1"]})
	restClient.addListInstances(serverBatch)

	await ebc.connect(ConnectMode.Initial)
	await socket.onopen?.(new Event("open"))

	verify(cacheMock.recordSyncTime())
	// No range load with a count other than 1 happened for this group.
	verify(
		restClient.loadRange(EntityEventBatchTypeRef, mailGroupId, matchers.anything(), matchers.not(1), matchers.anything()),
		{times: 0},
	)
	// The id of that single batch was stored as the last batch of the group.
	verify(cacheMock.setLastEntityEventBatchForGroup(mailGroupId, getElementId(serverBatch)))
})
|
|
|
|
|
|
|
|
|
|
o("initial connect: when the cache is initialized, missed events are loaded", async function () {
	// The cache already knows a batch id and has a recorded sync time.
	when(cacheMock.getLastEntityEventBatchForGroup(mailGroupId)).thenResolve("------------")
	when(cacheMock.timeSinceLastSyncMs()).thenResolve(1)

	// One batch with a single update is waiting on the server.
	const missedUpdate = createEntityUpdate({
		type: "Mail",
		application: "tutanota",
		instanceListId: mailGroupId,
		instanceId: "newBatchId",
	})
	const missedBatch = createEntityEventBatch({
		_id: [mailGroupId, "-----------1"],
		events: [missedUpdate],
	})
	restClient.addListInstances(missedBatch)

	// Resolved as soon as the cache receives exactly the expected queued batch.
	const cacheNotified = defer()
	const expectedQueuedBatch = {events: [missedUpdate], batchId: getElementId(missedBatch), groupId: mailGroupId}
	when(cacheMock.entityEventsReceived(expectedQueuedBatch))
		.thenDo(() => cacheNotified.resolve(undefined))

	await ebc.connect(ConnectMode.Initial)
	await socket.onopen?.(new Event("open"))

	await cacheNotified.promise

	verify(cacheMock.purgeStorage(), {times: 0})
	verify(cacheMock.recordSyncTime())
})
|
|
|
|
|
|
|
|
|
|
o("reconnect: when the cache is out of sync with the server, the cache is purged", async function () {
	when(cacheMock.getLastEntityEventBatchForGroup(mailGroupId)).thenResolve("lastBatchId")

	// First establish an initial connection so that the following connect is a real reconnect.
	await ebc.connect(ConnectMode.Initial)
	await socket.onopen?.(new Event("open"))

	// From now on the cache reports that it is out of sync.
	when(cacheMock.isOutOfSync()).thenResolve(true)

	// Reconnect on top of the existing connection.
	await ebc.connect(ConnectMode.Reconnect)
	await socket.onopen?.(new Event("open"))

	verify(cacheMock.purgeStorage(), {times: 1})
	verify(workerMock.sendError(matchers.isA(OutOfSyncError)))
})
|
|
|
|
|
|
|
|
|
|
o("initial connect: when the cache is out of sync with the server, the cache is purged", async function () {
	// The cache knows a previous batch but reports being out of sync.
	when(cacheMock.getLastEntityEventBatchForGroup(mailGroupId)).thenResolve("lastBatchId")
	when(cacheMock.isOutOfSync()).thenResolve(true)

	// Connect in Initial mode so that the test exercises the scenario its title names.
	// The reconnect scenario is covered by its own test in this spec.
	await ebc.connect(ConnectMode.Initial)
	await socket.onopen?.(new Event("open"))

	// The cache is purged once and the error is reported to the worker.
	verify(cacheMock.purgeStorage(), {times: 1})
	verify(workerMock.sendError(matchers.isA(OutOfSyncError)))
})
|
|
|
|
|
})
|
2022-02-28 17:23:22 +01:00
|
|
|
|
2022-02-28 12:12:01 +01:00
|
|
|
o("parallel received event batches are passed sequentially to the entity rest cache", async function () {
	o.timeout(500)
	ebc.connect(ConnectMode.Initial)
	await socket.onopen?.(new Event("open"))

	const firstMessage = {data: createEntityMessage(1)} as MessageEvent<string>
	const secondMessage = {data: createEntityMessage(2)} as MessageEvent<string>

	// The stubbing is cast to `object` so that thenReturn can hand back a raw promise.
	// That promise never settles, so the cache stays busy with the first batch.
	when(cacheMock.entityEventsReceived(matchers.anything()) as object).thenReturn(new Promise(noOp))

	// Deliver both messages back to back, as if they had arrived in parallel.
	const deliveries = [
		socket.onmessage?.(firstMessage),
		socket.onmessage?.(secondMessage),
	]
	await Promise.all(deliveries)

	// Only the first batch reached the cache; the second one is still waiting.
	verify(cacheMock.entityEventsReceived(matchers.anything()), {times: 1})
})
|
2022-02-28 12:12:01 +01:00
|
|
|
|
2022-03-01 17:30:19 +01:00
|
|
|
o("on counter update it send message to the main thread", async function () {
	const counterUpdate = createCounterData({mailGroupId: "group1", counterValue: 4, listId: "list1"})
	await ebc.connect(ConnectMode.Initial)

	// Deliver the serialized counter update through the socket's message handler.
	const counterEvent = {data: createCounterMessage(counterUpdate)} as MessageEvent
	await socket.onmessage?.(counterEvent)

	// The worker receives the same counter data that was sent.
	verify(workerMock.updateCounter(counterUpdate))
})
|
|
|
|
|
|
2022-03-01 17:30:19 +01:00
|
|
|
o.spec("sleep detection", function () {
|
|
|
|
|
o("on connect it starts", async function () {
	// Nothing has started the detector before the connection is opened.
	verify(sleepDetector.start(matchers.anything()), {times: 0})

	ebc.connect(ConnectMode.Initial)
	await socket.onopen?.(new Event("open"))

	// Opening the socket starts it exactly once.
	verify(sleepDetector.start(matchers.anything()), {times: 1})
})
|
|
|
|
|
|
|
|
|
|
o("on disconnect it stops", async function () {
	ebc.connect(ConnectMode.Initial)
	await socket.onopen?.(new Event("open"))

	// Node has no CloseEvent, so a plain Event is cast instead.
	const closeEvent = new Event("close") as CloseEvent
	await socket.onclose?.(closeEvent)

	verify(sleepDetector.stop())
})
|
|
|
|
|
|
|
|
|
|
o("on sleep it reconnects", async function () {
	// Capture the callback that the client hands to the sleep detector.
	let onSleep
	when(sleepDetector.start(matchers.anything())).thenDo((cb) => onSleep = cb)
	const firstSocket = socket

	ebc.connect(ConnectMode.Initial)
	// @ts-ignore
	firstSocket.readyState = WebSocket.OPEN
	await firstSocket.onopen?.(new Event("open"))
	// The open socket has not been closed so far.
	verify(firstSocket.close(), {ignoreExtraArgs: true, times: 0})

	// Swap in a new socket for the factory to return, then signal that sleep was detected.
	socket = object<WebSocket>()
	const secondSocket = socket
	onSleep()

	// The old socket is closed and the client reports that it is connecting again.
	verify(firstSocket.close(), {ignoreExtraArgs: true, times: 1})
	verify(workerMock.updateWebSocketState(WsConnectionState.connecting))

	await secondSocket.onopen?.(new Event("open"))
	verify(workerMock.updateWebSocketState(WsConnectionState.connected))
})
|
|
|
|
|
})
|
|
|
|
|
|
2022-02-28 17:23:22 +01:00
|
|
|
/**
 * Builds the raw websocket message text for an entity update.
 *
 * @param eventBatchId number that becomes the (stringified) id of the event batch
 * @returns the text "entityUpdate;" followed by the JSON-serialized WebsocketEntityData,
 *          which carries a single Mail UPDATE event
 */
function createEntityMessage(eventBatchId: number): string {
	const mailUpdate = createEntityUpdate({
		_id: "eventbatchid",
		application: "tutanota",
		type: "Mail",
		instanceListId: "listId1",
		instanceId: "id1",
		operation: OperationType.UPDATE,
	})
	const payload: WebsocketEntityData = createWebsocketEntityData({
		eventBatchId: String(eventBatchId),
		eventBatchOwner: "ownerId",
		eventBatch: [mailUpdate],
	})
	return `entityUpdate;${JSON.stringify(payload)}`
}
|
|
|
|
|
|
2022-02-28 12:12:01 +01:00
|
|
|
/** Input for createCounterData: the mail group, the mail list and the counter value for that list. */
type CounterMessageParams = {
	mailGroupId: Id,
	counterValue: number,
	listId: Id,
}

/**
 * Creates WebsocketCounterData for `mailGroupId` with exactly one counter value.
 * The count is stored as a string.
 */
function createCounterData({mailGroupId, counterValue, listId}: CounterMessageParams): WebsocketCounterData {
	const singleCounter = createWebsocketCounterValue({
		_id: "counterupdateid",
		count: String(counterValue),
		mailListId: listId,
	})
	return createWebsocketCounterData({
		_format: "0",
		mailGroup: mailGroupId,
		counterValues: [singleCounter],
	})
}
|
|
|
|
|
|
|
|
|
|
/**
 * Builds the raw websocket message text for an unread counter update.
 *
 * @param event counter data to serialize
 * @returns the text "unreadCounterUpdate;" followed by the JSON-serialized event
 */
function createCounterMessage(event: WebsocketCounterData): string {
	const serialized = JSON.stringify(event)
	return `unreadCounterUpdate;${serialized}`
}
|
2022-01-07 15:58:30 +01:00
|
|
|
})
|