From b3a5cc43fafc3bbf36a75b7bc181d15dbe93aeb9 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Thu, 11 Apr 2024 17:00:52 -0700 Subject: [PATCH] warcwriter operations result in the write promise being put on a queue. ensure the process is awaited and any rejections are logged if logdetails is provided, also log successful writes (only for resource records) --- src/crawler.ts | 5 +---- src/replaycrawler.ts | 15 +++++++++------ src/util/logger.ts | 3 +++ src/util/recorder.ts | 15 +++++++++------ src/util/screenshots.ts | 38 ++++++++++++++++++++++---------------- src/util/textextract.ts | 19 +++++++++++-------- src/util/warcwriter.ts | 30 +++++++++++++++++++++++++----- 7 files changed, 80 insertions(+), 45 deletions(-) diff --git a/src/crawler.ts b/src/crawler.ts index 58f8f922..8229542c 100644 --- a/src/crawler.ts +++ b/src/crawler.ts @@ -31,7 +31,7 @@ import { import { ScreenCaster, WSTransport } from "./util/screencaster.js"; import { Screenshots } from "./util/screenshots.js"; import { initRedis } from "./util/redis.js"; -import { logger, formatErr } from "./util/logger.js"; +import { logger, formatErr, LogDetails } from "./util/logger.js"; import { WorkerOpts, WorkerState, @@ -89,9 +89,6 @@ const POST_CRAWL_STATES = [ "generate-warc", ]; -// eslint-disable-next-line @typescript-eslint/no-explicit-any -type LogDetails = Record; - type PageEntry = { id: string; url: string; diff --git a/src/replaycrawler.ts b/src/replaycrawler.ts index 0e84b295..7f6eb5ea 100644 --- a/src/replaycrawler.ts +++ b/src/replaycrawler.ts @@ -748,12 +748,15 @@ export class ReplayCrawler extends Crawler { (state as ComparisonPageState).comparison = comparison; } - this.infoWriter?.writeNewResourceRecord({ - buffer: new TextEncoder().encode(JSON.stringify(pageInfo, null, 2)), - resourceType: "pageinfo", - contentType: "application/json", - url: pageInfo.url, - }); + this.infoWriter?.writeNewResourceRecord( + { + buffer: new TextEncoder().encode(JSON.stringify(pageInfo, null, 2)), + resourceType: "pageinfo", + contentType: "application/json", + url: pageInfo.url, + }, + { type: "pageinfo", url: pageInfo.url }, + ); this.pageInfos.delete(page); } diff --git a/src/util/logger.ts b/src/util/logger.ts index 1ae95d37..9f5ad309 100644 --- a/src/util/logger.ts +++ b/src/util/logger.ts @@ -9,6 +9,9 @@ Object.defineProperty(RegExp.prototype, "toJSON", { value: RegExp.prototype.toString, }); +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export type LogDetails = Record; + // =========================================================================== // eslint-disable-next-line @typescript-eslint/no-explicit-any export function formatErr(e: unknown): Record { diff --git a/src/util/recorder.ts b/src/util/recorder.ts index df71899f..d1b6175a 100644 --- a/src/util/recorder.ts +++ b/src/util/recorder.ts @@ -725,12 +725,15 @@ export class Recorder { const url = this.pageUrl; - this.writer.writeNewResourceRecord({ - buffer: new TextEncoder().encode(text), - resourceType: "pageinfo", - contentType: "application/json", - url, - }); + this.writer.writeNewResourceRecord( + { + buffer: new TextEncoder().encode(text), + resourceType: "pageinfo", + contentType: "application/json", + url, + }, + { type: "pageinfo", url }, + ); return this.pageInfo.ts; } diff --git a/src/util/screenshots.ts b/src/util/screenshots.ts index 693779b7..2ad3c939 100644 --- a/src/util/screenshots.ts +++ b/src/util/screenshots.ts @@ -74,15 +74,18 @@ export class Screenshots { if (state && screenshotType === "view") { state.screenshotView = screenshotBuffer; } - this.writer.writeNewResourceRecord({ - buffer: screenshotBuffer, - resourceType: screenshotType, - contentType: "image/" + options.type, - url: this.url, - }); - logger.info( - `Screenshot (type: ${screenshotType}) for ${this.url} written to ${this.writer.filename}`, + this.writer.writeNewResourceRecord( + { + buffer: screenshotBuffer, + resourceType: screenshotType, + contentType: "image/" + options.type, + url: this.url, + }, + { type: screenshotType, url: this.url, filename: this.writer.filename }, ); + // logger.info( + // `Screenshot (type: ${screenshotType}) for ${this.url} written to ${this.writer.filename}`, + // ); } catch (e) { logger.error( "Taking screenshot failed", @@ -106,15 +109,18 @@ export class Screenshots { // 16:9 thumbnail .resize(640, 360) .toBuffer(); - this.writer.writeNewResourceRecord({ - buffer: thumbnailBuffer, - resourceType: screenshotType, - contentType: "image/" + options.type, - url: this.url, - }); - logger.info( - `Screenshot (type: thumbnail) for ${this.url} written to ${this.writer.filename}`, + this.writer.writeNewResourceRecord( + { + buffer: thumbnailBuffer, + resourceType: screenshotType, + contentType: "image/" + options.type, + url: this.url, + }, + { type: screenshotType, url: this.url, filename: this.writer.filename }, ); + // logger.info( + // `Screenshot (type: thumbnail) for ${this.url} written to ${this.writer.filename}`, + // ); } catch (e) { logger.error( "Taking screenshot failed", diff --git a/src/util/textextract.ts b/src/util/textextract.ts index 113f7b53..438db172 100644 --- a/src/util/textextract.ts +++ b/src/util/textextract.ts @@ -43,15 +43,18 @@ export abstract class BaseTextExtract { return { changed: false, text }; } if (saveToWarc) { - this.writer.writeNewResourceRecord({ - buffer: new TextEncoder().encode(text), - resourceType, - contentType: "text/plain", - url: this.url, - }); - logger.debug( - `Text Extracted (type: ${resourceType}) for ${this.url} written to ${this.writer.filename}`, + this.writer.writeNewResourceRecord( + { + buffer: new TextEncoder().encode(text), + resourceType, + contentType: "text/plain", + url: this.url, + }, + { type: resourceType, url: this.url, filename: this.writer.filename }, ); + // logger.debug( + // `Text Extracted (type: ${resourceType}) for ${this.url} written to ${this.writer.filename}`, + // ); } this.lastText = text; diff --git a/src/util/warcwriter.ts b/src/util/warcwriter.ts index bdff98c5..4bf567ee 100644 --- a/src/util/warcwriter.ts +++ b/src/util/warcwriter.ts @@ -4,7 +4,7 @@ import path from "path"; import { CDXIndexer, WARCRecord } from "warcio"; import { WARCSerializer } from "warcio/node"; -import { logger, formatErr } from "./logger.js"; +import { logger, formatErr, LogDetails } from "./logger.js"; import type { IndexerOffsetLength } from "warcio"; import { timestampNow } from "./timing.js"; import PQueue from "p-queue"; @@ -118,7 +118,7 @@ export class WARCWriter implements IndexerOffsetLength { requestRecord: WARCRecord, responseSerializer: WARCSerializer | undefined = undefined, ) { - this.warcQ.add(() => + this.addToQueue( this._writeRecordPair(responseRecord, requestRecord, responseSerializer), ); } @@ -150,8 +150,28 @@ export class WARCWriter implements IndexerOffsetLength { this._writeCDX(requestRecord); } + private addToQueue( + promise: Promise, + details: LogDetails | null = null, + ) { + this.warcQ.add(async () => { + try { + await promise; + if (details) { + logger.debug("WARC Record Written", details, "writer"); + } + } catch (e) { + logger.error( + "WARC Record Write Failed", + { ...details, ...formatErr(e) }, + "writer", + ); + } + }); + } + writeSingleRecord(record: WARCRecord) { - this.warcQ.add(() => this._writeSingleRecord(record)); + this.addToQueue(this._writeSingleRecord(record)); } private async _writeSingleRecord(record: WARCRecord) { @@ -164,8 +184,8 @@ export class WARCWriter implements IndexerOffsetLength { this._writeCDX(record); } - writeNewResourceRecord(record: ResourceRecordData) { - this.warcQ.add(() => this._writeNewResourceRecord(record)); + writeNewResourceRecord(record: ResourceRecordData, details: LogDetails) { + this.addToQueue(this._writeNewResourceRecord(record), details); } private async _writeNewResourceRecord({