2022-01-07 15:58:30 +01:00
|
|
|
import o from "ospec"
|
|
|
|
|
import {EventBusClient, EventBusState} from "../../../src/api/worker/EventBusClient"
|
|
|
|
|
import {OperationType} from "../../../src/api/common/TutanotaConstants"
|
|
|
|
|
import type {EntityUpdate} from "../../../src/api/entities/sys/EntityUpdate"
|
2022-02-28 12:12:01 +01:00
|
|
|
import {createEntityUpdate} from "../../../src/api/entities/sys/EntityUpdate"
|
2022-01-07 15:58:30 +01:00
|
|
|
import {EntityRestClientMock} from "./EntityRestClientMock"
|
|
|
|
|
import {EntityClient} from "../../../src/api/common/EntityClient"
|
2022-02-28 12:12:01 +01:00
|
|
|
import {downcast, noOp} from "@tutao/tutanota-utils"
|
2022-01-07 15:58:30 +01:00
|
|
|
import {WorkerImpl} from "../../../src/api/worker/WorkerImpl"
|
2022-01-13 11:57:55 +01:00
|
|
|
import {LoginFacadeImpl} from "../../../src/api/worker/facades/LoginFacade"
|
2022-01-07 15:58:30 +01:00
|
|
|
import {createUser} from "../../../src/api/entities/sys/User"
|
|
|
|
|
import {createGroupMembership} from "../../../src/api/entities/sys/GroupMembership"
|
|
|
|
|
import {InstanceMapper} from "../../../src/api/worker/crypto/InstanceMapper"
|
2022-01-12 14:43:01 +01:00
|
|
|
import {EntityRestCache} from "../../../src/api/worker/rest/EntityRestCache"
|
|
|
|
|
import {QueuedBatch} from "../../../src/api/worker/search/EventQueue"
|
2022-02-28 12:12:01 +01:00
|
|
|
import {assertThrows} from "@tutao/tutanota-test-utils"
|
2022-01-12 14:43:01 +01:00
|
|
|
import {OutOfSyncError} from "../../../src/api/common/error/OutOfSyncError"
|
2022-02-28 12:12:01 +01:00
|
|
|
import {matchers, object, verify, when} from "testdouble"
|
|
|
|
|
import {MailFacade} from "../../../src/api/worker/facades/MailFacade"
|
|
|
|
|
import {Indexer} from "../../../src/api/worker/search/Indexer"
|
|
|
|
|
import {createWebsocketEntityData, WebsocketEntityData} from "../../../src/api/entities/sys/WebsocketEntityData"
|
|
|
|
|
import {createWebsocketCounterData, WebsocketCounterData} from "../../../src/api/entities/sys/WebsocketCounterData"
|
|
|
|
|
import {createWebsocketCounterValue} from "../../../src/api/entities/sys/WebsocketCounterValue"
|
2022-01-13 11:57:55 +01:00
|
|
|
|
2022-01-07 15:58:30 +01:00
|
|
|
o.spec("EventBusClient test", function () {
|
2022-01-12 14:43:01 +01:00
|
|
|
let ebc: EventBusClient
|
|
|
|
|
let cacheMock: EntityRestCache
|
|
|
|
|
let restClient: EntityRestClientMock
|
|
|
|
|
let workerMock: WorkerImpl
|
|
|
|
|
let loginMock: LoginFacadeImpl
|
2022-02-28 12:12:01 +01:00
|
|
|
let mailMock: MailFacade
|
|
|
|
|
let indexerMock: Indexer
|
|
|
|
|
|
2022-01-12 14:43:01 +01:00
|
|
|
o.beforeEach(async function () {
|
2022-02-28 12:12:01 +01:00
|
|
|
cacheMock = object({
|
|
|
|
|
async entityEventsReceived(batch: QueuedBatch): Promise<Array<EntityUpdate>> {
|
|
|
|
|
return batch.events.slice()
|
2022-01-12 14:43:01 +01:00
|
|
|
},
|
2022-02-28 12:12:01 +01:00
|
|
|
async getLastEntityEventBatchForGroup(groupId: Id): Promise<Id | null> {
|
|
|
|
|
return null
|
2022-01-12 14:43:01 +01:00
|
|
|
},
|
|
|
|
|
getServerTimestampMs(): number {
|
|
|
|
|
return 0
|
|
|
|
|
},
|
2022-02-28 12:12:01 +01:00
|
|
|
async getLastUpdateTime(): Promise<number | null> {
|
2022-01-12 14:43:01 +01:00
|
|
|
return 0
|
|
|
|
|
},
|
|
|
|
|
async setLastUpdateTime(value: number): Promise<void> {
|
|
|
|
|
},
|
2022-02-28 12:12:01 +01:00
|
|
|
async purgeStorage(): Promise<void> {
|
|
|
|
|
}
|
|
|
|
|
} as EntityRestCache)
|
2022-01-12 14:43:01 +01:00
|
|
|
|
|
|
|
|
const user = createUser({
|
|
|
|
|
userGroup: createGroupMembership({
|
|
|
|
|
group: "userGroupId",
|
|
|
|
|
}),
|
|
|
|
|
})
|
|
|
|
|
|
2022-02-28 12:12:01 +01:00
|
|
|
loginMock = object("login")
|
|
|
|
|
when(loginMock.entityEventsReceived(matchers.anything())).thenResolve(undefined)
|
|
|
|
|
when(loginMock.getLoggedInUser()).thenReturn(user)
|
|
|
|
|
when(loginMock.isLoggedIn()).thenReturn(true)
|
2022-01-12 14:43:01 +01:00
|
|
|
|
2022-02-28 12:12:01 +01:00
|
|
|
mailMock = object("mail")
|
|
|
|
|
when(mailMock.entityEventsReceived(matchers.anything())).thenResolve(undefined)
|
|
|
|
|
|
|
|
|
|
workerMock = object("worker")
|
|
|
|
|
when(workerMock.entityEventsReceived(matchers.anything(), matchers.anything())).thenResolve(undefined)
|
|
|
|
|
when(workerMock.updateCounter(matchers.anything())).thenResolve(undefined)
|
|
|
|
|
when(workerMock.updateWebSocketState(matchers.anything())).thenResolve(undefined)
|
|
|
|
|
when(workerMock.sendError(matchers.anything())).thenDo((e) => {
|
|
|
|
|
throw e
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
indexerMock = object("indexer")
|
|
|
|
|
// TODO: ???
|
|
|
|
|
// when(indexerMock.processEntityEvents(matchers.anything(), matchers.anything(), matchers.anything())).thenResolve(undefined)
|
2022-01-12 14:43:01 +01:00
|
|
|
|
|
|
|
|
restClient = new EntityRestClientMock()
|
|
|
|
|
const entityClient = new EntityClient(restClient)
|
|
|
|
|
const intanceMapper = new InstanceMapper()
|
|
|
|
|
ebc = new EventBusClient(workerMock, indexerMock, cacheMock, mailMock, loginMock, entityClient, intanceMapper)
|
|
|
|
|
|
2022-02-28 12:12:01 +01:00
|
|
|
// TODO
|
|
|
|
|
let e = ebc as any
|
2022-01-12 14:43:01 +01:00
|
|
|
e.connect = function (reconnect: boolean) {
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
o.spec("loadMissedEntityEvents ", function () {
|
|
|
|
|
o("When the cache is out of sync with the server, the cache is purged", async function () {
|
2022-02-28 12:12:01 +01:00
|
|
|
when(cacheMock.getServerTimestampMs()).thenReturn(Date.now())
|
|
|
|
|
await assertThrows(OutOfSyncError, () => ebc.loadMissedEntityEvents())
|
|
|
|
|
verify(cacheMock.purgeStorage(), {times: 1})
|
2022-01-12 14:43:01 +01:00
|
|
|
})
|
|
|
|
|
})
|
2022-02-28 12:12:01 +01:00
|
|
|
o("parallel received event batches are passed sequentially to the entity rest cache", async function () {
|
2022-01-12 14:43:01 +01:00
|
|
|
o.timeout(500)
|
|
|
|
|
ebc._state = EventBusState.Automatic
|
|
|
|
|
await ebc._onOpen(false)
|
|
|
|
|
|
2022-02-28 12:12:01 +01:00
|
|
|
let messageData1 = createMessageData(1)
|
|
|
|
|
let messageData2 = createMessageData(2)
|
2022-01-12 14:43:01 +01:00
|
|
|
|
2022-02-28 12:12:01 +01:00
|
|
|
// Casting ot object here because promise stubber doesn't allow you to just return the promise
|
|
|
|
|
// We never resolve the promise
|
|
|
|
|
when(cacheMock.entityEventsReceived(matchers.anything()) as object).thenReturn(new Promise(noOp))
|
2022-01-12 14:43:01 +01:00
|
|
|
|
|
|
|
|
|
|
|
|
|
// call twice as if it was received in parallel
|
|
|
|
|
let p1 = ebc._message(
|
|
|
|
|
downcast({
|
|
|
|
|
data: messageData1,
|
|
|
|
|
}),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
let p2 = ebc._message(
|
|
|
|
|
downcast({
|
|
|
|
|
data: messageData2,
|
|
|
|
|
}),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
await Promise.all([p1, p2])
|
2022-02-28 12:12:01 +01:00
|
|
|
|
|
|
|
|
// Is waiting for cache to process the first event
|
|
|
|
|
verify(cacheMock.entityEventsReceived(matchers.anything()), {times: 1})
|
|
|
|
|
},
|
2022-01-12 14:43:01 +01:00
|
|
|
)
|
2022-02-28 12:12:01 +01:00
|
|
|
|
|
|
|
|
o("counter update", async function () {
|
|
|
|
|
let counterUpdate = createCounterData({mailGroupId: "group1", counterValue: 4, listId: "list1"})
|
2022-01-12 14:43:01 +01:00
|
|
|
|
|
|
|
|
downcast(workerMock).updateCounter = o.spy(ebc._worker.updateCounter)
|
|
|
|
|
await ebc._message(
|
2022-02-28 12:12:01 +01:00
|
|
|
{
|
|
|
|
|
data: createCounterMessage(counterUpdate),
|
|
|
|
|
} as MessageEvent,
|
2022-01-12 14:43:01 +01:00
|
|
|
)
|
2022-02-28 12:12:01 +01:00
|
|
|
verify(workerMock.updateCounter(counterUpdate))
|
|
|
|
|
},
|
2022-01-12 14:43:01 +01:00
|
|
|
)
|
|
|
|
|
|
2022-02-28 12:12:01 +01:00
|
|
|
function createMessageData(eventBatchId: number): string {
|
|
|
|
|
const event: WebsocketEntityData = createWebsocketEntityData({
|
|
|
|
|
eventBatchId: String(eventBatchId),
|
|
|
|
|
eventBatchOwner: "ownerId",
|
|
|
|
|
eventBatch: [
|
|
|
|
|
createEntityUpdate({
|
|
|
|
|
_id: "eventbatchid",
|
|
|
|
|
application: "tutanota",
|
|
|
|
|
type: "Mail",
|
|
|
|
|
instanceListId: "listId1",
|
|
|
|
|
instanceId: "id1",
|
|
|
|
|
operation: OperationType.UPDATE,
|
|
|
|
|
}),
|
|
|
|
|
],
|
|
|
|
|
}
|
|
|
|
|
)
|
2022-01-12 14:43:01 +01:00
|
|
|
return "entityUpdate;" + JSON.stringify(event)
|
|
|
|
|
}
|
|
|
|
|
|
2022-02-28 12:12:01 +01:00
|
|
|
type CounterMessageParams = {mailGroupId: Id, counterValue: number, listId: Id}
|
|
|
|
|
|
|
|
|
|
function createCounterData({mailGroupId, counterValue, listId}: CounterMessageParams): WebsocketCounterData {
|
|
|
|
|
return createWebsocketCounterData({
|
|
|
|
|
_format: "0",
|
|
|
|
|
mailGroup: mailGroupId,
|
|
|
|
|
counterValues: [
|
|
|
|
|
createWebsocketCounterValue({
|
|
|
|
|
_id: "counterupdateid",
|
|
|
|
|
count: String(counterValue),
|
|
|
|
|
mailListId: listId,
|
|
|
|
|
}),
|
|
|
|
|
],
|
|
|
|
|
}
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
function createCounterMessage(event: WebsocketCounterData): string {
|
2022-01-12 14:43:01 +01:00
|
|
|
return "unreadCounterUpdate;" + JSON.stringify(event)
|
|
|
|
|
}
|
2022-01-07 15:58:30 +01:00
|
|
|
})
|