Unify WARC writing + CDXJ indexing into single class (#507)

Previously, there was the main WARCWriter as well as a separate utility
class, WARCResourceWriter, that was used for screenshots, text, and
pageinfo and only generated resource records. This separate WARC writing
path did not generate CDX, and used appendFile() to append new WARC
records to an existing WARC.

This change removes WARCResourceWriter and ensures all WARC writing is done through a single WARCWriter, which uses a writable stream to append records, and can also generate CDX on the fly. This change is a
prerequisite to the js-wacz conversion (#484) since all WARCs need to
have generated CDX.

---------
Co-authored-by: Tessa Walsh <tessa@bitarchivist.net>
This commit is contained in:
Ilya Kreymer 2024-03-26 14:54:27 -07:00 committed by GitHub
parent 01c4139aa7
commit 0ad10a8dee
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 198 additions and 190 deletions

View file

@ -61,6 +61,7 @@ import { CDPSession, Frame, HTTPRequest, Page } from "puppeteer-core";
import { Recorder } from "./util/recorder.js";
import { SitemapReader } from "./util/sitemapper.js";
import { ScopedSeed } from "./util/seeds.js";
import { WARCWriter } from "./util/warcwriter.js";
const HTTPS_AGENT = new HTTPSAgent({
rejectUnauthorized: false,
@ -149,6 +150,11 @@ export class Crawler {
pagesFile: string;
archivesDir: string;
tempdir: string;
tempCdxDir: string;
screenshotWriter: WARCWriter | null;
textWriter: WARCWriter | null;
blockRules: BlockRules | null;
adBlockRules: AdBlockRules | null;
@ -177,8 +183,6 @@ export class Crawler {
maxHeapUsed = 0;
maxHeapTotal = 0;
warcPrefix: string;
driver!: (opts: {
page: Page;
data: PageState;
@ -271,6 +275,11 @@ export class Crawler {
// archives dir
this.archivesDir = path.join(this.collDir, "archive");
this.tempdir = path.join(os.tmpdir(), "tmp-dl");
this.tempCdxDir = path.join(this.collDir, "tmp-cdx");
this.screenshotWriter = null;
this.textWriter = null;
this.blockRules = null;
this.adBlockRules = null;
@ -288,12 +297,6 @@ export class Crawler {
this.customBehaviors = "";
this.browser = new Browser();
this.warcPrefix = process.env.WARC_PREFIX || this.params.warcPrefix || "";
if (this.warcPrefix) {
this.warcPrefix += "-" + this.crawlId + "-";
}
}
protected parseArgs() {
@ -447,14 +450,10 @@ export class Crawler {
subprocesses.push(this.launchRedis());
//const initRes = child_process.spawnSync("wb-manager", ["init", this.params.collection], {cwd: this.params.cwd});
//if (initRes.status) {
// logger.info("wb-manager init failed, collection likely already exists");
//}
await fsp.mkdir(this.logDir, { recursive: true });
await fsp.mkdir(this.archivesDir, { recursive: true });
await fsp.mkdir(this.tempdir, { recursive: true });
await fsp.mkdir(this.tempCdxDir, { recursive: true });
this.logFH = fs.createWriteStream(this.logFilename);
logger.setExternalLogStream(this.logFH);
@ -514,6 +513,13 @@ export class Crawler {
{ detached: RUN_DETACHED },
);
}
if (this.params.screenshot) {
this.screenshotWriter = this.createExtraResourceWarcWriter("screenshots");
}
if (this.params.text) {
this.textWriter = this.createExtraResourceWarcWriter("text");
}
}
extraChromeArgs() {
@ -812,16 +818,15 @@ self.__bx_behaviors.selectMainBehavior();
const logDetails = { page: url, workerid };
if (this.params.screenshot) {
if (this.params.screenshot && this.screenshotWriter) {
if (!data.isHTMLPage) {
logger.debug("Skipping screenshots for non-HTML page", logDetails);
}
const screenshots = new Screenshots({
warcPrefix: this.warcPrefix,
browser: this.browser,
page,
url,
directory: this.archivesDir,
writer: this.screenshotWriter,
});
if (this.params.screenshot.includes("view")) {
await screenshots.take("view", saveOutput ? data : null);
@ -836,11 +841,10 @@ self.__bx_behaviors.selectMainBehavior();
let textextract = null;
if (data.isHTMLPage) {
if (data.isHTMLPage && this.textWriter) {
textextract = new TextExtractViaSnapshot(cdp, {
warcPrefix: this.warcPrefix,
writer: this.textWriter,
url,
directory: this.archivesDir,
skipDocs: this.skipTextDocs,
});
const { text } = await textextract.extractAndStoreText(
@ -1151,6 +1155,7 @@ self.__bx_behaviors.selectMainBehavior();
if (this.interrupted) {
await this.browser.close();
await closeWorkers(0);
await this.closeFiles();
await this.setStatusAndExit(13, "interrupted");
} else {
await this.setStatusAndExit(0, "done");
@ -1298,6 +1303,8 @@ self.__bx_behaviors.selectMainBehavior();
await this.pagesFH.close();
}
await this.closeFiles();
await this.writeStats();
// if crawl has been stopped, mark as final exit for post-crawl tasks
@ -1308,6 +1315,15 @@ self.__bx_behaviors.selectMainBehavior();
await this.postCrawl();
}
// Flush the extra-resource WARC writers (text, then screenshots) so any
// buffered records are fully written out before the crawl finishes.
async closeFiles() {
  // Order matters only in that it mirrors the original flush sequence:
  // text writer first, screenshot writer second.
  for (const writer of [this.textWriter, this.screenshotWriter]) {
    if (writer) {
      await writer.flush();
    }
  }
}
protected async _addInitialSeeds() {
for (let i = 0; i < this.params.scopedSeeds.length; i++) {
const seed = this.params.scopedSeeds[i];
@ -2368,15 +2384,56 @@ self.__bx_behaviors.selectMainBehavior();
}
}
// Build the filename prefix for generated WARCs. The WARC_PREFIX env var
// takes precedence over the --warcPrefix param, which takes precedence
// over the supplied default. A non-empty prefix is suffixed with the
// crawl id, e.g. "myprefix-<crawlId>-"; an empty prefix stays empty.
getWarcPrefix(defaultValue = "") {
  const prefix =
    process.env.WARC_PREFIX || this.params.warcPrefix || defaultValue;
  return prefix ? `${prefix}-${this.crawlId}-` : prefix;
}
// Create a WARCWriter dedicated to one extra-resource type (e.g.
// "screenshots", "text", "info"); the resource name becomes both part of
// the WARC filename and the log context for the writer.
createExtraResourceWarcWriter(resourceName: string, gzip = true) {
  return this.createWarcWriter(
    `${this.getWarcPrefix()}${resourceName}`,
    gzip,
    { resourceName },
  );
}
// Construct a WARCWriter writing into this crawl's archives dir, with
// CDX staging in tempCdxDir and rollover at the configured size.
// logDetails is attached to every log message the writer emits.
createWarcWriter(
  filenameBase: string,
  gzip: boolean,
  logDetails: Record<string, string>,
) {
  // filenameBase may contain template vars (e.g. "$ts") expanded by the writer
  const extension = gzip ? ".warc.gz" : ".warc";
  return new WARCWriter({
    archivesDir: this.archivesDir,
    tempCdxDir: this.tempCdxDir,
    filenameTemplate: filenameBase + extension,
    rolloverSize: this.params.rolloverSize,
    gzip,
    logDetails,
  });
}
createRecorder(id: number): Recorder | null {
if (!this.recording) {
return null;
}
const filenameBase = `${this.getWarcPrefix("rec")}$ts-${id}`;
const writer = this.createWarcWriter(filenameBase, true, {
id: id.toString(),
});
const res = new Recorder({
workerid: id,
collDir: this.collDir,
crawler: this,
writer,
tempdir: this.tempdir,
});
this.browser.recorders.push(res);

View file

@ -16,7 +16,6 @@ import { ZipRangeReader } from "@webrecorder/wabac/src/wacz/ziprangereader.js";
import { createLoader } from "@webrecorder/wabac/src/blockloaders.js";
import { AsyncIterReader } from "warcio";
import { WARCResourceWriter } from "./util/warcresourcewriter.js";
import { parseArgs } from "./util/argParser.js";
import { PNG } from "pngjs";
@ -25,6 +24,7 @@ import pixelmatch from "pixelmatch";
import levenshtein from "js-levenshtein";
import { MAX_URL_LENGTH } from "./util/reqresp.js";
import { openAsBlob } from "fs";
import { WARCWriter } from "./util/warcwriter.js";
// RWP Replay Prefix
const REPLAY_PREFIX = "http://localhost:9990/replay/w/replay/";
@ -67,6 +67,7 @@ export class ReplayCrawler extends Crawler {
qaSource: string;
pageInfos: Map<Page, ReplayPageInfoRecord>;
infoWriter: WARCWriter | null;
reloadTimeouts: WeakMap<Page, NodeJS.Timeout>;
@ -98,6 +99,14 @@ export class ReplayCrawler extends Crawler {
this.params.serviceWorker = "enabled";
this.reloadTimeouts = new WeakMap<Page, NodeJS.Timeout>();
this.infoWriter = null;
}
// Extend base crawler bootstrap to also create the WARC writer used for
// per-page "pageinfo" resource records during replay/QA crawls.
async bootstrap(): Promise<void> {
await super.bootstrap();
// created here (not in the constructor) so dirs exist first — TODO confirm
this.infoWriter = this.createExtraResourceWarcWriter("info");
}
protected parseArgs() {
@ -666,18 +675,13 @@ export class ReplayCrawler extends Crawler {
(state as ComparisonPageState).comparison = comparison;
}
const writer = new WARCResourceWriter({
await this.infoWriter?.writeNewResourceRecord({
buffer: new TextEncoder().encode(JSON.stringify(pageInfo, null, 2)),
resourceType: "pageinfo",
contentType: "application/json",
url: pageInfo.url,
directory: this.archivesDir,
warcPrefix: this.warcPrefix,
date: new Date(),
warcName: "info.warc.gz",
});
await writer.writeBufferToWARC(
new TextEncoder().encode(JSON.stringify(pageInfo, null, 2)),
"pageinfo",
"application/json",
);
this.pageInfos.delete(page);
}
}

View file

@ -1,6 +1,4 @@
import fs from "fs";
import path from "path";
import os from "os";
import { v4 as uuidv4 } from "uuid";
@ -24,7 +22,6 @@ import { WARCWriter } from "./warcwriter.js";
import { RedisCrawlState, WorkerId } from "./state.js";
import { CDPSession, Protocol } from "puppeteer-core";
import { Crawler } from "../crawler.js";
import { WARCResourceWriter } from "./warcresourcewriter.js";
const MAX_BROWSER_DEFAULT_FETCH_SIZE = 5_000_000;
const MAX_BROWSER_TEXT_FETCH_SIZE = 25_000_000;
@ -70,7 +67,6 @@ export type PageInfoRecord = {
// =================================================================
export class Recorder {
workerid: WorkerId;
collDir: string;
crawler: Crawler;
@ -94,9 +90,7 @@ export class Recorder {
allowFull206 = false;
archivesDir: string;
tempdir: string;
tempCdxDir: string;
gzip = true;
@ -107,46 +101,26 @@ export class Recorder {
constructor({
workerid,
collDir,
writer,
crawler,
tempdir,
}: {
workerid: WorkerId;
collDir: string;
writer: WARCWriter;
crawler: Crawler;
tempdir: string;
}) {
this.workerid = workerid;
this.crawler = crawler;
this.crawlState = crawler.crawlState;
this.writer = writer;
this.tempdir = tempdir;
this.warcQ = new PQueue({ concurrency: 1 });
this.fetcherQ = new PQueue({ concurrency: 1 });
this.collDir = collDir;
this.archivesDir = path.join(this.collDir, "archive");
this.tempdir = path.join(os.tmpdir(), "tmp-dl");
this.tempCdxDir = path.join(this.collDir, "tmp-cdx");
fs.mkdirSync(this.tempdir, { recursive: true });
fs.mkdirSync(this.archivesDir, { recursive: true });
// fs.mkdirSync(this.tempCdxDir, { recursive: true });
const prefix =
process.env.WARC_PREFIX || crawler.params.warcPrefix || "rec";
const crawlId = process.env.CRAWL_ID || os.hostname();
const filenameTemplate = `${prefix}-${crawlId}-$ts-${this.workerid}.warc${
this.gzip ? ".gz" : ""
}`;
this.writer = new WARCWriter({
archivesDir: this.archivesDir,
// tempCdxDir: this.tempCdxDir,
filenameTemplate,
rolloverSize: crawler.params.rolloverSize,
gzip: this.gzip,
logDetails: this.logDetails,
});
}
async onCreatePage({ cdp }: { cdp: CDPSession }) {
@ -733,18 +707,19 @@ export class Recorder {
}
}
async writePageInfoRecord() {
writePageInfoRecord() {
const text = JSON.stringify(this.pageInfo, null, 2);
const resourceRecord = await WARCResourceWriter.createResourceRecord(
new TextEncoder().encode(text),
"pageinfo",
"application/json",
this.pageUrl,
new Date(),
);
const url = this.pageUrl;
this.warcQ.add(() => this.writer.writeSingleRecord(resourceRecord));
this.warcQ.add(() =>
this.writer.writeNewResourceRecord({
buffer: new TextEncoder().encode(text),
resourceType: "pageinfo",
contentType: "application/json",
url,
}),
);
return this.pageInfo.ts;
}

View file

@ -1,10 +1,10 @@
import sharp from "sharp";
import { WARCResourceWriter } from "./warcresourcewriter.js";
import { logger, formatErr } from "./logger.js";
import { Browser } from "./browser.js";
import { Page } from "puppeteer-core";
import { PageState } from "./state.js";
import { WARCWriter } from "./warcwriter.js";
// ============================================================================
@ -42,18 +42,20 @@ export type ScreenshotOpts = {
browser: Browser;
page: Page;
url: string;
directory: string;
warcPrefix: string;
writer: WARCWriter;
};
export class Screenshots extends WARCResourceWriter {
export class Screenshots {
browser: Browser;
page: Page;
url: string;
writer: WARCWriter;
constructor(opts: ScreenshotOpts) {
super({ ...opts, warcName: "screenshots.warc.gz" });
this.browser = opts.browser;
this.page = opts.page;
constructor({ browser, page, writer, url }: ScreenshotOpts) {
this.browser = browser;
this.page = page;
this.url = url;
this.writer = writer;
}
async take(
@ -72,13 +74,14 @@ export class Screenshots extends WARCResourceWriter {
if (state && screenshotType === "view") {
state.screenshotView = screenshotBuffer;
}
await this.writeBufferToWARC(
screenshotBuffer,
screenshotType,
"image/" + options.type,
);
await this.writer.writeNewResourceRecord({
buffer: screenshotBuffer,
resourceType: screenshotType,
contentType: "image/" + options.type,
url: this.url,
});
logger.info(
`Screenshot (type: ${screenshotType}) for ${this.url} written to ${this.warcName}`,
`Screenshot (type: ${screenshotType}) for ${this.url} written to ${this.writer.filename}`,
);
} catch (e) {
logger.error(
@ -103,13 +106,14 @@ export class Screenshots extends WARCResourceWriter {
// 16:9 thumbnail
.resize(640, 360)
.toBuffer();
await this.writeBufferToWARC(
thumbnailBuffer,
screenshotType,
"image/" + options.type,
);
await this.writer.writeNewResourceRecord({
buffer: thumbnailBuffer,
resourceType: screenshotType,
contentType: "image/" + options.type,
url: this.url,
});
logger.info(
`Screenshot (type: thumbnail) for ${this.url} written to ${this.warcName}`,
`Screenshot (type: thumbnail) for ${this.url} written to ${this.writer.filename}`,
);
} catch (e) {
logger.error(

View file

@ -1,26 +1,28 @@
import { WARCResourceWriter } from "./warcresourcewriter.js";
import { logger } from "./logger.js";
import { CDPSession, Protocol } from "puppeteer-core";
import { WARCWriter } from "./warcwriter.js";
// ============================================================================
type TextExtractOpts = {
url: string;
directory: string;
warcPrefix: string;
writer: WARCWriter;
skipDocs: number;
};
// ============================================================================
export abstract class BaseTextExtract extends WARCResourceWriter {
export abstract class BaseTextExtract {
cdp: CDPSession;
lastText: string | null = null;
text: string | null = null;
skipDocs: number = 0;
writer: WARCWriter;
url: string;
constructor(cdp: CDPSession, opts: TextExtractOpts) {
super({ ...opts, warcName: "text.warc.gz" });
constructor(cdp: CDPSession, { writer, skipDocs, url }: TextExtractOpts) {
this.writer = writer;
this.cdp = cdp;
this.skipDocs = opts.skipDocs || 0;
this.url = url;
this.skipDocs = skipDocs || 0;
}
async extractAndStoreText(
@ -41,13 +43,14 @@ export abstract class BaseTextExtract extends WARCResourceWriter {
return { changed: false, text };
}
if (saveToWarc) {
await this.writeBufferToWARC(
new TextEncoder().encode(text),
await this.writer.writeNewResourceRecord({
buffer: new TextEncoder().encode(text),
resourceType,
"text/plain",
);
contentType: "text/plain",
url: this.url,
});
logger.debug(
`Text Extracted (type: ${resourceType}) for ${this.url} written to ${this.warcName}`,
`Text Extracted (type: ${resourceType}) for ${this.url} written to ${this.writer.filename}`,
);
}

View file

@ -1,78 +0,0 @@
import fs from "fs";
import path from "path";
import * as warcio from "warcio";
// ===========================================================================
// Options for WARCResourceWriter.
export type WARCResourceWriterOpts = {
url: string; // original URL the resource was captured from
directory: string; // directory the WARC file is written into
date?: Date; // record date; defaults to construction time if omitted
warcName: string; // base WARC filename, e.g. "screenshots.warc.gz"
warcPrefix: string; // prefix prepended to warcName
};
// ===========================================================================
/**
 * Minimal WARC writer for standalone "resource" records (screenshots,
 * extracted text, pageinfo). Appends serialized records directly to an
 * existing WARC file via appendFileSync; does not generate CDX.
 */
export class WARCResourceWriter {
url: string;
directory: string;
warcName: string; // full path: directory + warcPrefix + warcName
date: Date;
constructor({
url,
directory,
date,
warcPrefix,
warcName,
}: WARCResourceWriterOpts) {
this.url = url;
this.directory = directory;
this.warcName = path.join(this.directory, warcPrefix + warcName);
this.date = date ? date : new Date();
}
/**
 * Serialize `contents` as a gzipped WARC resource record and append it
 * to this writer's WARC file.
 * @param contents raw record payload
 * @param resourceType used to form the urn:<resourceType>:<url> target URI
 * @param contentType WARC Content-Type header value
 */
async writeBufferToWARC(
contents: Uint8Array,
resourceType: string,
contentType: string,
) {
const warcRecord = await WARCResourceWriter.createResourceRecord(
contents,
resourceType,
contentType,
this.url,
this.date,
);
const warcRecordBuffer = await warcio.WARCSerializer.serialize(warcRecord, {
gzip: true,
});
// append (not overwrite) so multiple records can share one WARC file
fs.appendFileSync(this.warcName, warcRecordBuffer);
}
/**
 * Build (but do not write) a WARC/1.1 "resource" record whose target URI
 * is urn:<resourceType>:<url>.
 */
static async createResourceRecord(
buffer: Uint8Array,
resourceType: string,
contentType: string,
url: string,
date: Date,
) {
const warcVersion = "WARC/1.1";
const warcRecordType = "resource";
const warcHeaders = { "Content-Type": contentType };
// payload is supplied to warcio as an async iterator
async function* content() {
yield buffer;
}
const resourceUrl = `urn:${resourceType}:${url}`;
return warcio.WARCRecord.create(
{
url: resourceUrl,
date: date.toISOString(),
type: warcRecordType,
warcVersion,
warcHeaders,
},
content(),
);
}
}

View file

@ -2,14 +2,22 @@ import fs from "fs";
import { Writable } from "stream";
import path from "path";
import { CDXIndexer } from "warcio";
import { CDXIndexer, WARCRecord } from "warcio";
import { WARCSerializer } from "warcio/node";
import { logger, formatErr } from "./logger.js";
import type { IndexerOffsetLength, WARCRecord } from "warcio";
import type { IndexerOffsetLength } from "warcio";
import { timestampNow } from "./timing.js";
const DEFAULT_ROLLOVER_SIZE = 1_000_000_000;
// Parameters for writing a new standalone "resource" WARC record.
export type ResourceRecordData = {
buffer: Uint8Array; // raw record payload
resourceType: string; // used to form the urn:<resourceType>:<url> target URI
contentType: string; // WARC Content-Type header value
url: string; // original URL the resource was derived from
date?: Date; // record date; defaults to now if omitted
};
// =================================================================
export class WARCWriter implements IndexerOffsetLength {
archivesDir: string;
@ -47,6 +55,8 @@ export class WARCWriter implements IndexerOffsetLength {
}) {
this.archivesDir = archivesDir;
this.tempCdxDir = tempCdxDir;
// for now, disabling CDX
this.tempCdxDir = undefined;
this.logDetails = logDetails;
this.gzip = gzip;
this.rolloverSize = rolloverSize;
@ -137,6 +147,39 @@ export class WARCWriter implements IndexerOffsetLength {
this._writeCDX(record);
}
// Create and write a WARC/1.1 "resource" record for an auxiliary payload
// (screenshot, extracted text, pageinfo). The record's target URI is
// urn:<resourceType>:<url>; date defaults to now when not supplied.
async writeNewResourceRecord({
  buffer,
  resourceType,
  contentType,
  url,
  date,
}: ResourceRecordData) {
  // avoid mutating the caller-supplied date argument
  const timestamp = (date ?? new Date()).toISOString();
  // warcio consumes the payload as an async iterator
  async function* payload() {
    yield buffer;
  }
  const record = WARCRecord.create(
    {
      url: `urn:${resourceType}:${url}`,
      date: timestamp,
      type: "resource",
      warcVersion: "WARC/1.1",
      warcHeaders: { "Content-Type": contentType },
    },
    payload(),
  );
  return await this.writeSingleRecord(record);
}
private async _writeRecord(record: WARCRecord, serializer: WARCSerializer) {
if (this.done) {
logger.warn(
@ -188,8 +231,6 @@ export class WARCWriter implements IndexerOffsetLength {
}
async flush() {
this.done = true;
if (this.fh) {
await streamFinish(this.fh);
this.fh = null;
@ -201,6 +242,8 @@ export class WARCWriter implements IndexerOffsetLength {
await streamFinish(this.cdxFH);
this.cdxFH = null;
}
this.done = true;
}
}

View file

@ -290,7 +290,7 @@ export class PageWorker {
} finally {
try {
if (this.recorder) {
opts.data.ts = await this.recorder.writePageInfoRecord();
opts.data.ts = this.recorder.writePageInfoRecord();
}
} catch (e) {
logger.error(