threaded sqlcipher

#5112
This commit is contained in:
nig 2023-04-17 16:56:50 +02:00 committed by Willow
parent 9c9c5da8a0
commit e57b9c7ff2
16 changed files with 162 additions and 39 deletions

View file

@ -111,7 +111,7 @@ export async function buildDesktop({ dirname, version, platform, updateUrl, name
async function rollupDesktop(dirname, outDir, version, platform, disableMinify) {
platform = getCanonicalPlatformName(platform)
const mainBundle = await rollup({
input: path.join(dirname, "src/desktop/DesktopMain.ts"),
input: [path.join(dirname, "src/desktop/DesktopMain.ts"), path.join(dirname, "src/desktop/db/sqlworker.ts")],
// some transitive dep of a transitive dev-dep requires https://www.npmjs.com/package/url
// which rollup for some reason won't distinguish from the node builtin.
external: ["url", "util", "path", "fs", "os", "http", "https", "crypto", "child_process", "electron"],
@ -119,7 +119,7 @@ async function rollupDesktop(dirname, outDir, version, platform, disableMinify)
plugins: [
copyNativeModulePlugin({
rootDir: projectRoot,
dstPath: "./build/dist/desktop/",
dstPath: "./build/dist/desktop/db/",
platform,
nodeModule: "better-sqlite3",
}),

View file

@ -81,7 +81,7 @@ importScripts("./worker.js")
async function buildDesktopPart({ version }) {
await runStep("Desktop: Esbuild", async () => {
await esbuild({
entryPoints: ["src/desktop/DesktopMain.ts"],
entryPoints: ["src/desktop/DesktopMain.ts", "src/desktop/db/sqlworker.ts"],
outdir: "./build/desktop",
// Why we bundle at the moment:
// - We need to include all the imports: we currently use some node_modules directly, without pre-bundling them like rest of libs we can't avoid it
@ -96,7 +96,7 @@ async function buildDesktopPart({ version }) {
environment: "electron",
dstPath: "./build/desktop/better_sqlite3.node",
platform: process.platform,
nativeBindingPath: "./better_sqlite3.node",
nativeBindingPath: "../better_sqlite3.node",
}),
keytarNativePlugin({
environment: "electron",

View file

@ -27,19 +27,32 @@ export interface Transport<OutgoingCommandType, IncomingCommandType> {
/**
* Queue transport for both WorkerClient and WorkerImpl
*/
export class WorkerTransport<OutgoingCommandType, IncomingCommandType> implements Transport<OutgoingCommandType, IncomingCommandType> {
_worker: Worker | DedicatedWorkerGlobalScope
constructor(worker: Worker | DedicatedWorkerGlobalScope) {
this._worker = worker
}
export class WebWorkerTransport<OutgoingCommandType, IncomingCommandType> implements Transport<OutgoingCommandType, IncomingCommandType> {
constructor(private readonly worker: Worker | DedicatedWorkerGlobalScope) {}
postMessage(message: Message<OutgoingCommandType>): void {
return this._worker.postMessage(message)
return downcast(this.worker).postMessage(message)
}
setMessageHandler(handler: (message: Message<IncomingCommandType>) => unknown) {
this._worker.onmessage = (ev: MessageEvent) => handler(downcast(ev.data))
this.worker.onmessage = (ev: any) => handler(downcast(ev.data))
}
}
type NodeWorkerPort<O, I> = {
postMessage: (msg: Message<O>) => void
on: (channel: "message", listener: (ev: Message<I>) => unknown) => unknown
}
export class NodeWorkerTransport<OutgoingCommandType, IncomingCommandType> implements Transport<OutgoingCommandType, IncomingCommandType> {
constructor(private readonly worker: NodeWorkerPort<OutgoingCommandType, IncomingCommandType>) {}
postMessage(message: Message<OutgoingCommandType>): void {
return this.worker.postMessage(message)
}
setMessageHandler(handler: (message: Message<IncomingCommandType>) => unknown) {
this.worker.on("message", (ev: Message<IncomingCommandType>) => handler(ev))
}
}
@ -115,7 +128,7 @@ export class MessageDispatcher<OutgoingRequestType extends string, IncomingReque
try {
this._transport.postMessage(msg)
} catch (e) {
console.log("error payload:", msg.id, msg.type)
console.log("error payload:", msg)
throw e
}
})

View file

@ -1,6 +1,6 @@
import { CryptoError } from "../common/error/CryptoError"
import type { Commands, Transport } from "../common/MessageDispatcher"
import { MessageDispatcher, Request, WorkerTransport } from "../common/MessageDispatcher"
import { MessageDispatcher, Request, WebWorkerTransport } from "../common/MessageDispatcher"
import { assertMainOrNode } from "../common/Env"
import type { IMainLocator } from "./MainLocator"
import { client } from "../../misc/ClientDetector"
@ -49,7 +49,7 @@ export class WorkerClient {
// Service worker has similar logic but it has luxury of knowing that it's served as sw.js.
const workerUrl = prefixWithoutFile + "/worker-bootstrap.js"
const worker = new Worker(workerUrl)
this._dispatcher = new MessageDispatcher(new WorkerTransport(worker), this.queueCommands(locator))
this._dispatcher = new MessageDispatcher(new WebWorkerTransport(worker), this.queueCommands(locator))
await this._dispatcher.postRequest(new Request("setup", [window.env, this.getInitialEntropy(), client.browserData()]))
worker.onerror = (e: any) => {

View file

@ -1,5 +1,5 @@
import type { Commands } from "../common/MessageDispatcher"
import { errorToObj, MessageDispatcher, Request, WorkerTransport } from "../common/MessageDispatcher"
import { errorToObj, MessageDispatcher, Request, WebWorkerTransport } from "../common/MessageDispatcher"
import { CryptoError } from "../common/error/CryptoError"
import { BookingFacade } from "./facades/lazy/BookingFacade.js"
import { NotAuthenticatedError } from "../common/error/RestError"
@ -102,7 +102,7 @@ export class WorkerImpl implements NativeInterface {
constructor(self: DedicatedWorkerGlobalScope) {
this._scope = self
this._dispatcher = new MessageDispatcher(new WorkerTransport(this._scope), this.queueCommands(this.exposedInterface))
this._dispatcher = new MessageDispatcher(new WebWorkerTransport(this._scope), this.queueCommands(this.exposedInterface))
}
async init(browserData: BrowserData): Promise<void> {

View file

@ -4,6 +4,8 @@
import { WorkerImpl } from "./WorkerImpl"
import { Logger, replaceNativeLogger } from "../common/Logger"
console.log("native app", self.nativeApp)
/**
* Receives the first message from the client and initializes the WorkerImpl to receive all future messages. Sends a response to the client on this first message.
*/

View file

@ -55,7 +55,7 @@ import { DesktopPostLoginActions } from "./DesktopPostLoginActions.js"
import { DesktopInterWindowEventFacade } from "./ipc/DesktopInterWindowEventFacade.js"
import { OfflineDbFactory, OfflineDbManager, PerWindowSqlCipherFacade } from "./db/PerWindowSqlCipherFacade.js"
import { SqlCipherFacade } from "../native/common/generatedipc/SqlCipherFacade.js"
import { DesktopSqlCipher } from "./DesktopSqlCipher.js"
import { WorkerSqlCipher } from "./db/WorkerSqlCipher.js"
import { lazyMemoized } from "@tutao/tutanota-utils"
import dns from "node:dns"
import { getConfigFile } from "./config/ConfigFile.js"
@ -158,11 +158,12 @@ async function createComponents(): Promise<Components> {
const offlineDbFactory: OfflineDbFactory = {
async create(userId: string, key: Uint8Array, retry: boolean = true): Promise<SqlCipherFacade> {
const db = new DesktopSqlCipher(buildOptions.sqliteNativePath, makeDbPath(userId), true)
const db = new WorkerSqlCipher(buildOptions.sqliteNativePath, makeDbPath(userId), true)
try {
await db.openDb(userId, key)
} catch (e) {
if (!retry) throw e
console.log("retrying")
await this.delete(userId)
return this.create(userId, key, false)
}

View file

@ -1,10 +1,10 @@
import { Database, default as Sqlite } from "better-sqlite3"
import { CryptoError } from "@tutao/tutanota-crypto"
import { mapNullable, uint8ArrayToBase64 } from "@tutao/tutanota-utils"
import { SqlCipherFacade } from "../native/common/generatedipc/SqlCipherFacade.js"
import { TaggedSqlValue, tagSqlObject, untagSqlValue } from "../api/worker/offline/SqlValue.js"
import { ProgrammingError } from "../api/common/error/ProgrammingError.js"
import { OfflineDbClosedError } from "../api/common/error/OfflineDbClosedError.js"
import { SqlCipherFacade } from "../../native/common/generatedipc/SqlCipherFacade.js"
import { OfflineDbClosedError } from "../../api/common/error/OfflineDbClosedError.js"
import { ProgrammingError } from "../../api/common/error/ProgrammingError.js"
import { TaggedSqlValue, tagSqlObject, untagSqlValue } from "../../api/worker/offline/SqlValue.js"
export class DesktopSqlCipher implements SqlCipherFacade {
private _db: Database | null = null

View file

@ -8,7 +8,7 @@ import { OfflineDbClosedError } from "../../api/common/error/OfflineDbClosedErro
const MAX_WAIT_FOR_DB_CLOSE_MS = 1000
export class PerWindowSqlCipherFacade implements SqlCipherFacade {
private state: { userId: string; db: SqlCipherFacade } | null = null
private state: { userId: string; db: Promise<SqlCipherFacade> } | null = null
constructor(private readonly manager: OfflineDbManager) {}
@ -18,7 +18,7 @@ export class PerWindowSqlCipherFacade implements SqlCipherFacade {
}
this.state = {
userId,
db: await this.manager.getOrCreateDb(userId, dbKey),
db: this.manager.getOrCreateDb(userId, dbKey),
}
}
@ -43,16 +43,16 @@ export class PerWindowSqlCipherFacade implements SqlCipherFacade {
await this.manager.deleteDb(userId)
}
get(query: string, params: ReadonlyArray<TaggedSqlValue>): Promise<Record<string, TaggedSqlValue> | null> {
return this.db().get(query, params)
async get(query: string, params: ReadonlyArray<TaggedSqlValue>): Promise<Record<string, TaggedSqlValue> | null> {
return (await this.db()).get(query, params)
}
all(query: string, params: ReadonlyArray<TaggedSqlValue>): Promise<ReadonlyArray<Record<string, TaggedSqlValue>>> {
return this.db().all(query, params)
async all(query: string, params: ReadonlyArray<TaggedSqlValue>): Promise<ReadonlyArray<Record<string, TaggedSqlValue>>> {
return (await this.db()).all(query, params)
}
run(query: string, params: ReadonlyArray<TaggedSqlValue>): Promise<void> {
return this.db().run(query, params)
async run(query: string, params: ReadonlyArray<TaggedSqlValue>): Promise<void> {
return (await this.db()).run(query, params)
}
/**
@ -72,11 +72,11 @@ export class PerWindowSqlCipherFacade implements SqlCipherFacade {
return this.manager.unlockRangesDbAccess(listId)
}
private db(): SqlCipherFacade {
private async db(): Promise<SqlCipherFacade> {
if (this.state == null) {
throw new OfflineDbClosedError()
}
return this.state.db
return await this.state.db
}
}

View file

@ -0,0 +1,54 @@
import { SqlCipherFacade } from "../../native/common/generatedipc/SqlCipherFacade.js"
import { TaggedSqlValue } from "../../api/worker/offline/SqlValue.js"
import { Worker } from "node:worker_threads"
import path from "node:path"
import { MessageDispatcher, NodeWorkerTransport, Request } from "../../api/common/MessageDispatcher.js"
import { SqlCipherCommand } from "./sqlworker.js"
const TAG = "[WorkerSqlCipher]"
/** impl for SqlCipherFacade that passes any requests to a node worker thread that's running the sqlite db for the given user id
* this code is running in the main thread of the node process. */
export class WorkerSqlCipher implements SqlCipherFacade {
private readonly dispatcher: MessageDispatcher<SqlCipherCommand, never>
constructor(private readonly nativeBindingPath: string, private readonly dbPath: string, private readonly integrityCheck: boolean) {
console.log("new sqlcipherworker")
const worker = new Worker(path.join(__dirname, "db", "sqlworker.js"), {
workerData: { nativeBindingPath, dbPath, integrityCheck },
})
this.dispatcher = new MessageDispatcher<SqlCipherCommand, never>(new NodeWorkerTransport<SqlCipherCommand, never>(worker), {})
}
async all(query: string, params: ReadonlyArray<TaggedSqlValue>): Promise<ReadonlyArray<Record<string, TaggedSqlValue>>> {
return this.dispatcher.postRequest(new Request("all", [query, params]))
}
async closeDb(): Promise<void> {
return this.dispatcher.postRequest(new Request("closeDb", []))
}
async deleteDb(userId: string): Promise<void> {
return this.dispatcher.postRequest(new Request("deleteDb", []))
}
async get(query: string, params: ReadonlyArray<TaggedSqlValue>): Promise<Record<string, TaggedSqlValue> | null> {
return this.dispatcher.postRequest(new Request("get", [query, params]))
}
async lockRangesDbAccess(listId: string): Promise<void> {
return this.dispatcher.postRequest(new Request("lockRangesDbAccess", [listId]))
}
async openDb(userId: string, dbKey: Uint8Array): Promise<void> {
return this.dispatcher.postRequest(new Request("openDb", [userId, dbKey]))
}
async run(query: string, params: ReadonlyArray<TaggedSqlValue>): Promise<void> {
return this.dispatcher.postRequest(new Request("run", [query, params]))
}
async unlockRangesDbAccess(listId: string): Promise<void> {
return this.dispatcher.postRequest(new Request("unlockRangesDbAccess", [listId]))
}
}

View file

@ -0,0 +1,53 @@
/**
* entry point to the sqlite worker threads. one is created for each user's offline database.
* it's possible for multiple windows to access the same sqlite database.
* */
import { parentPort, workerData } from "node:worker_threads"
import { DesktopSqlCipher } from "./DesktopSqlCipher.js"
import { MessageDispatcher, NodeWorkerTransport, Request } from "../../api/common/MessageDispatcher.js"
import { SqlCipherFacade } from "../../native/common/generatedipc/SqlCipherFacade.js"
console.log("hello from worker", workerData)
export type SqlCipherCommand = keyof SqlCipherFacade | "exit"
if (parentPort != null) {
const sqlCipherFacade = new DesktopSqlCipher(workerData.nativeBindingPath, workerData.dbPath, workerData.integrityCheck)
const workerTransport = new MessageDispatcher(new NodeWorkerTransport<never, SqlCipherCommand>(parentPort), {
all(msg: Request<"all">): Promise<any> {
return sqlCipherFacade.all(msg.args[0], msg.args[1])
},
closeDb(_: Request<"closeDb">): Promise<any> {
return sqlCipherFacade.closeDb()
},
deleteDb(msg: Request<"openDb" | "closeDb" | "deleteDb" | "run" | "get" | "all" | "lockRangesDbAccess" | "unlockRangesDbAccess">): Promise<any> {
return sqlCipherFacade.deleteDb(msg.args[0])
},
get(msg: Request<"openDb" | "closeDb" | "deleteDb" | "run" | "get" | "all" | "lockRangesDbAccess" | "unlockRangesDbAccess">): Promise<any> {
return sqlCipherFacade.get(msg.args[0], msg.args[1])
},
lockRangesDbAccess(
msg: Request<"openDb" | "closeDb" | "deleteDb" | "run" | "get" | "all" | "lockRangesDbAccess" | "unlockRangesDbAccess">,
): Promise<any> {
return sqlCipherFacade.lockRangesDbAccess(msg.args[0])
},
openDb(msg: Request<"openDb" | "closeDb" | "deleteDb" | "run" | "get" | "all" | "lockRangesDbAccess" | "unlockRangesDbAccess">): Promise<any> {
return sqlCipherFacade.openDb(msg.args[0], msg.args[1])
},
run(msg: Request<"openDb" | "closeDb" | "deleteDb" | "run" | "get" | "all" | "lockRangesDbAccess" | "unlockRangesDbAccess">): Promise<any> {
return sqlCipherFacade.run(msg.args[0], msg.args[1])
},
unlockRangesDbAccess(
msg: Request<"openDb" | "closeDb" | "deleteDb" | "run" | "get" | "all" | "lockRangesDbAccess" | "unlockRangesDbAccess">,
): Promise<any> {
return sqlCipherFacade.unlockRangesDbAccess(msg.args[0])
},
exit(msg: Request<"openDb" | "closeDb" | "deleteDb" | "run" | "get" | "all" | "lockRangesDbAccess" | "unlockRangesDbAccess">): Promise<any> {
process.exit()
},
})
console.log("set up sql cipher")
} else {
process.exit(0)
}

View file

@ -178,7 +178,7 @@ export async function reloginForExpiredSession() {
if (loginDialogActive) {
return
}
const { logins, loginFacade, secondFactorHandler, credentialsProvider, sqlCipherFacade, cacheStorage } = locator
const { logins, loginFacade, secondFactorHandler, credentialsProvider, cacheStorage } = locator
// Make sure that partial login part is complete before we will try to make a new session.
// Otherwise we run into a race condition where login failure arrives before we initialize userController.
await logins.waitForPartialLogin()

View file

@ -1,6 +1,6 @@
#!/usr/bin/env bash
if [[ $OSTYPE = darwin* ]]; then
ELECTRON_ENABLE_SECURITY_WARNINGS=TRUE ./node_modules/.bin/electron --inspect=5858 ./build/ $1
ELECTRON_ENABLE_SECURITY_WARNINGS=TRUE ./node_modules/.bin/electron --inspect-brk=5858 ./build/ $1
else
ELECTRON_ENABLE_SECURITY_WARNINGS=TRUE ./node_modules/.bin/electron --inspect=5858 ./build/ $1
ELECTRON_ENABLE_SECURITY_WARNINGS=TRUE ./node_modules/.bin/electron --inspect-brk=5858 ./build/ $1
fi

View file

@ -24,7 +24,6 @@ import {
} from "../../../../../src/api/entities/tutanota/TypeRefs.js"
import { OfflineStorageMigrator } from "../../../../../src/api/worker/offline/OfflineStorageMigrator.js"
import { InterWindowEventFacadeSendDispatcher } from "../../../../../src/native/common/generatedipc/InterWindowEventFacadeSendDispatcher.js"
import { DesktopSqlCipher } from "../../../../../src/desktop/DesktopSqlCipher.js"
import * as fs from "node:fs"
import { untagSqlObject } from "../../../../../src/api/worker/offline/SqlValue.js"
import { MailFolderType } from "../../../../../src/api/common/TutanotaConstants.js"
@ -34,6 +33,7 @@ import { Type as TypeId } from "../../../../../src/api/common/EntityConstants.js
import { expandId } from "../../../../../src/api/worker/rest/DefaultEntityRestCache.js"
import { WorkerImpl } from "../../../../../src/api/worker/WorkerImpl.js"
import { createUser, UserTypeRef } from "../../../../../src/api/entities/sys/TypeRefs.js"
import { DesktopSqlCipher } from "../../../../../src/desktop/db/DesktopSqlCipher.js"
function incrementId(id: Id, ms: number) {
const timestamp = generatedIdToTimestamp(id)

View file

@ -65,7 +65,7 @@ const offlineDatabaseTestKey = new Uint8Array([3957386659, 354339016, 3786337319
async function getOfflineStorage(userId: Id): Promise<CacheStorage> {
const { OfflineDbManager, PerWindowSqlCipherFacade } = await import("../../../../../src/desktop/db/PerWindowSqlCipherFacade.js")
const { DesktopSqlCipher } = await import("../../../../../src/desktop/DesktopSqlCipher.js")
const { DesktopSqlCipher } = await import("../../../../../src/desktop/db/DesktopSqlCipher.js")
const odbManager = new (class extends OfflineDbManager {
async getOrCreateDb(userId: string, key: Uint8Array) {

View file

@ -2,8 +2,8 @@ import o from "@tutao/otest"
import { object, when } from "testdouble"
import { verify } from "@tutao/tutanota-test-utils"
import { OfflineDbFactory, OfflineDbManager } from "../../../../src/desktop/db/PerWindowSqlCipherFacade.js"
import { DesktopSqlCipher } from "../../../../src/desktop/DesktopSqlCipher.js"
import { delay } from "@tutao/tutanota-utils"
import { DesktopSqlCipher } from "../../../../src/desktop/db/DesktopSqlCipher.js"
o.spec("OfflineDbFacade", function () {
let factory: OfflineDbFactory