WARCWriter operations now put their write promise on a queue.

Ensure each queued write is awaited and any rejections are logged.
If log details are provided, also log successful writes (only for resource records).
This commit is contained in:
Ilya Kreymer 2024-04-11 17:00:52 -07:00
parent b5f3238c29
commit b3a5cc43fa
7 changed files with 80 additions and 45 deletions

View file

@ -31,7 +31,7 @@ import {
import { ScreenCaster, WSTransport } from "./util/screencaster.js";
import { Screenshots } from "./util/screenshots.js";
import { initRedis } from "./util/redis.js";
import { logger, formatErr } from "./util/logger.js";
import { logger, formatErr, LogDetails } from "./util/logger.js";
import {
WorkerOpts,
WorkerState,
@ -89,9 +89,6 @@ const POST_CRAWL_STATES = [
"generate-warc",
];
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type LogDetails = Record<string, any>;
type PageEntry = {
id: string;
url: string;

View file

@ -748,12 +748,15 @@ export class ReplayCrawler extends Crawler {
(state as ComparisonPageState).comparison = comparison;
}
this.infoWriter?.writeNewResourceRecord({
buffer: new TextEncoder().encode(JSON.stringify(pageInfo, null, 2)),
resourceType: "pageinfo",
contentType: "application/json",
url: pageInfo.url,
});
this.infoWriter?.writeNewResourceRecord(
{
buffer: new TextEncoder().encode(JSON.stringify(pageInfo, null, 2)),
resourceType: "pageinfo",
contentType: "application/json",
url: pageInfo.url,
},
{ type: "pageinfo", url: pageInfo.url },
);
this.pageInfos.delete(page);
}

View file

@ -9,6 +9,9 @@ Object.defineProperty(RegExp.prototype, "toJSON", {
value: RegExp.prototype.toString,
});
/**
 * Free-form key/value context passed along with a log message.
 * Values are deliberately untyped (`any`) so callers can attach arbitrary data.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type LogDetails = Record<string, any>;
// ===========================================================================
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function formatErr(e: unknown): Record<string, any> {

View file

@ -725,12 +725,15 @@ export class Recorder {
const url = this.pageUrl;
this.writer.writeNewResourceRecord({
buffer: new TextEncoder().encode(text),
resourceType: "pageinfo",
contentType: "application/json",
url,
});
this.writer.writeNewResourceRecord(
{
buffer: new TextEncoder().encode(text),
resourceType: "pageinfo",
contentType: "application/json",
url,
},
{ type: "pageinfo", url },
);
return this.pageInfo.ts;
}

View file

@ -74,15 +74,18 @@ export class Screenshots {
if (state && screenshotType === "view") {
state.screenshotView = screenshotBuffer;
}
this.writer.writeNewResourceRecord({
buffer: screenshotBuffer,
resourceType: screenshotType,
contentType: "image/" + options.type,
url: this.url,
});
logger.info(
`Screenshot (type: ${screenshotType}) for ${this.url} written to ${this.writer.filename}`,
this.writer.writeNewResourceRecord(
{
buffer: screenshotBuffer,
resourceType: screenshotType,
contentType: "image/" + options.type,
url: this.url,
},
{ type: screenshotType, url: this.url, filename: this.writer.filename },
);
// logger.info(
// `Screenshot (type: ${screenshotType}) for ${this.url} written to ${this.writer.filename}`,
// );
} catch (e) {
logger.error(
"Taking screenshot failed",
@ -106,15 +109,18 @@ export class Screenshots {
// 16:9 thumbnail
.resize(640, 360)
.toBuffer();
this.writer.writeNewResourceRecord({
buffer: thumbnailBuffer,
resourceType: screenshotType,
contentType: "image/" + options.type,
url: this.url,
});
logger.info(
`Screenshot (type: thumbnail) for ${this.url} written to ${this.writer.filename}`,
this.writer.writeNewResourceRecord(
{
buffer: thumbnailBuffer,
resourceType: screenshotType,
contentType: "image/" + options.type,
url: this.url,
},
{ type: screenshotType, url: this.url, filename: this.writer.filename },
);
// logger.info(
// `Screenshot (type: thumbnail) for ${this.url} written to ${this.writer.filename}`,
// );
} catch (e) {
logger.error(
"Taking screenshot failed",

View file

@ -43,15 +43,18 @@ export abstract class BaseTextExtract {
return { changed: false, text };
}
if (saveToWarc) {
this.writer.writeNewResourceRecord({
buffer: new TextEncoder().encode(text),
resourceType,
contentType: "text/plain",
url: this.url,
});
logger.debug(
`Text Extracted (type: ${resourceType}) for ${this.url} written to ${this.writer.filename}`,
this.writer.writeNewResourceRecord(
{
buffer: new TextEncoder().encode(text),
resourceType,
contentType: "text/plain",
url: this.url,
},
{ type: resourceType, url: this.url, filename: this.writer.filename },
);
// logger.debug(
// `Text Extracted (type: ${resourceType}) for ${this.url} written to ${this.writer.filename}`,
// );
}
this.lastText = text;

View file

@ -4,7 +4,7 @@ import path from "path";
import { CDXIndexer, WARCRecord } from "warcio";
import { WARCSerializer } from "warcio/node";
import { logger, formatErr } from "./logger.js";
import { logger, formatErr, LogDetails } from "./logger.js";
import type { IndexerOffsetLength } from "warcio";
import { timestampNow } from "./timing.js";
import PQueue from "p-queue";
@ -118,7 +118,7 @@ export class WARCWriter implements IndexerOffsetLength {
requestRecord: WARCRecord,
responseSerializer: WARCSerializer | undefined = undefined,
) {
this.warcQ.add(() =>
this.addToQueue(
this._writeRecordPair(responseRecord, requestRecord, responseSerializer),
);
}
@ -150,8 +150,28 @@ export class WARCWriter implements IndexerOffsetLength {
this._writeCDX(requestRecord);
}
/**
 * Track a WARC write on the writer queue so that its outcome is always
 * awaited and logged.
 *
 * @param promise - the in-flight write operation to wait for
 * @param details - optional log context; when provided, a successful write is
 *   logged at debug level, and the same context is merged into the error log
 *   entry if the write fails
 *
 * NOTE(review): `promise` is already running when it is passed in, so the
 * queue orders the awaiting/logging of writes, not the start of the writes
 * themselves. If strict serialization of writes is required, callers would
 * need to pass a function that starts the write instead -- confirm intent.
 */
private addToQueue(
  promise: Promise<void>,
  details: LogDetails | null = null,
) {
  // The queued task below may not start until earlier tasks have finished.
  // If the write rejects before then, no handler would be attached yet and
  // the runtime would report an unhandled rejection. Attaching a no-op
  // handler now marks the rejection as handled; the `await` inside the task
  // still observes the failure and logs it.
  promise.catch(() => {});

  this.warcQ.add(async () => {
    try {
      await promise;
      if (details) {
        logger.debug("WARC Record Written", details, "writer");
      }
    } catch (e) {
      logger.error(
        "WARC Record Write Failed",
        { ...details, ...formatErr(e) },
        "writer",
      );
    }
  });
}
writeSingleRecord(record: WARCRecord) {
this.warcQ.add(() => this._writeSingleRecord(record));
this.addToQueue(this._writeSingleRecord(record));
}
private async _writeSingleRecord(record: WARCRecord) {
@ -164,8 +184,8 @@ export class WARCWriter implements IndexerOffsetLength {
this._writeCDX(record);
}
writeNewResourceRecord(record: ResourceRecordData) {
this.warcQ.add(() => this._writeNewResourceRecord(record));
writeNewResourceRecord(record: ResourceRecordData, details: LogDetails) {
this.addToQueue(this._writeNewResourceRecord(record), details);
}
private async _writeNewResourceRecord({