Merge branch 'unify-warc-writer' into use-js-wacz

Ilya Kreymer 2024-03-22 21:49:15 -07:00
commit 50a771cc68
8 changed files with 197 additions and 191 deletions

View file

@ -65,6 +65,7 @@ import { CDPSession, Frame, HTTPRequest, Page } from "puppeteer-core";
import { Recorder } from "./util/recorder.js";
import { SitemapReader } from "./util/sitemapper.js";
import { ScopedSeed } from "./util/seeds.js";
import { WARCWriter } from "./util/warcwriter.js";
const HTTPS_AGENT = new HTTPSAgent({
rejectUnauthorized: false,
@ -155,6 +156,11 @@ export class Crawler {
otherPagesFile: string;
archivesDir: string;
tempdir: string;
tempCdxDir: string;
screenshotWriter: WARCWriter | null;
textWriter: WARCWriter | null;
blockRules: BlockRules | null;
adBlockRules: AdBlockRules | null;
@ -183,8 +189,6 @@ export class Crawler {
maxHeapUsed = 0;
maxHeapTotal = 0;
warcPrefix: string;
driver!: (opts: {
page: Page;
data: PageState;
@ -278,6 +282,11 @@ export class Crawler {
// archives dir
this.archivesDir = path.join(this.collDir, "archive");
this.tempdir = path.join(os.tmpdir(), "tmp-dl");
this.tempCdxDir = path.join(this.collDir, "tmp-cdx");
this.screenshotWriter = null;
this.textWriter = null;
this.blockRules = null;
this.adBlockRules = null;
@ -295,12 +304,6 @@ export class Crawler {
this.customBehaviors = "";
this.browser = new Browser();
this.warcPrefix = process.env.WARC_PREFIX || this.params.warcPrefix || "";
if (this.warcPrefix) {
this.warcPrefix += "-" + this.crawlId + "-";
}
}
protected parseArgs() {
@ -454,14 +457,10 @@ export class Crawler {
subprocesses.push(this.launchRedis());
//const initRes = child_process.spawnSync("wb-manager", ["init", this.params.collection], {cwd: this.params.cwd});
//if (initRes.status) {
// logger.info("wb-manager init failed, collection likely already exists");
//}
await fsp.mkdir(this.logDir, { recursive: true });
await fsp.mkdir(this.archivesDir, { recursive: true });
await fsp.mkdir(this.tempdir, { recursive: true });
await fsp.mkdir(this.tempCdxDir, { recursive: true });
this.logFH = fs.createWriteStream(this.logFilename);
logger.setExternalLogStream(this.logFH);
@ -521,6 +520,13 @@ export class Crawler {
{ detached: RUN_DETACHED },
);
}
if (this.params.screenshot) {
this.screenshotWriter = this.createExtraResourceWarcWriter("screenshots");
}
if (this.params.text) {
this.textWriter = this.createExtraResourceWarcWriter("text");
}
}
extraChromeArgs() {
@ -819,16 +825,15 @@ self.__bx_behaviors.selectMainBehavior();
const logDetails = { page: url, workerid };
if (this.params.screenshot) {
if (this.params.screenshot && this.screenshotWriter) {
if (!data.isHTMLPage) {
logger.debug("Skipping screenshots for non-HTML page", logDetails);
}
const screenshots = new Screenshots({
warcPrefix: this.warcPrefix,
browser: this.browser,
page,
url,
directory: this.archivesDir,
writer: this.screenshotWriter,
});
if (this.params.screenshot.includes("view")) {
await screenshots.take("view", saveOutput ? data : null);
@ -843,11 +848,10 @@ self.__bx_behaviors.selectMainBehavior();
let textextract = null;
if (data.isHTMLPage) {
if (data.isHTMLPage && this.textWriter) {
textextract = new TextExtractViaSnapshot(cdp, {
warcPrefix: this.warcPrefix,
writer: this.textWriter,
url,
directory: this.archivesDir,
skipDocs: this.skipTextDocs,
});
const { text } = await textextract.extractAndStoreText(
@ -1303,6 +1307,8 @@ self.__bx_behaviors.selectMainBehavior();
await this.closePages();
await this.closeFiles();
await this.writeStats();
// if crawl has been stopped, mark as final exit for post-crawl tasks
@ -1339,6 +1345,15 @@ self.__bx_behaviors.selectMainBehavior();
}
}
async closeFiles() {
if (this.textWriter) {
await this.textWriter.flush();
}
if (this.screenshotWriter) {
await this.screenshotWriter.flush();
}
}
protected async _addInitialSeeds() {
for (let i = 0; i < this.params.scopedSeeds.length; i++) {
const seed = this.params.scopedSeeds[i];
@ -2385,15 +2400,54 @@ self.__bx_behaviors.selectMainBehavior();
}
}
getWarcPrefix(defaultValue = "") {
let warcPrefix =
process.env.WARC_PREFIX || this.params.warcPrefix || defaultValue;
if (warcPrefix) {
warcPrefix += "-" + this.crawlId + "-";
}
return warcPrefix;
}
createExtraResourceWarcWriter(resourceName: string, gzip = true) {
const filenameBase = `${this.getWarcPrefix()}${resourceName}`;
return this.createWarcWriter(filenameBase, gzip, { resourceName });
}
createWarcWriter(
filenameBase: string,
gzip: boolean,
logDetails: Record<string, string>,
) {
const filenameTemplate = `${filenameBase}.warc${gzip ? ".gz" : ""}`;
return new WARCWriter({
archivesDir: this.archivesDir,
tempCdxDir: this.tempCdxDir,
filenameTemplate,
rolloverSize: this.params.rolloverSize,
gzip,
logDetails,
});
}
createRecorder(id: number): Recorder | null {
if (!this.recording) {
return null;
}
const filenameBase = `${this.getWarcPrefix("rec")}$ts-${id}`;
const writer = this.createWarcWriter(filenameBase, true, { id: id + "" });
const res = new Recorder({
workerid: id,
collDir: this.collDir,
crawler: this,
writer,
tempdir: this.tempdir,
});
this.browser.recorders.push(res);
return res;
}

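The factory methods above centralize WARC naming and rollover for extra resource records. Below is a minimal usage sketch (not part of this commit), assuming a bootstrapped Crawler instance; the "notes" resource name and payload are purely illustrative.

```ts
// Illustrative sketch based on the createExtraResourceWarcWriter and
// writeNewResourceRecord hunks in this commit; "notes" is a made-up resource name.
import { Crawler } from "./crawler.js";

async function writeExtraResource(crawler: Crawler) {
  // Produces e.g. "<WARC_PREFIX>-<crawlId>-notes.warc.gz" under archivesDir,
  // rolled over at params.rolloverSize (CDX indexing is disabled for now,
  // per the warcwriter.ts hunk below).
  const writer = crawler.createExtraResourceWarcWriter("notes");

  // Written as a WARC "resource" record with target URI urn:notes:<url>.
  await writer.writeNewResourceRecord({
    buffer: new TextEncoder().encode("example payload"),
    resourceType: "notes",
    contentType: "text/plain",
    url: "https://example.com/",
  });

  // flush() closes the underlying WARC stream.
  await writer.flush();
}
```
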
View file

@ -16,7 +16,6 @@ import { ZipRangeReader } from "@webrecorder/wabac/src/wacz/ziprangereader.js";
import { createLoader } from "@webrecorder/wabac/src/blockloaders.js";
import { AsyncIterReader } from "warcio";
import { WARCResourceWriter } from "./util/warcresourcewriter.js";
import { parseArgs } from "./util/argParser.js";
import { PNG } from "pngjs";
@ -25,6 +24,7 @@ import pixelmatch from "pixelmatch";
import levenshtein from "js-levenshtein";
import { MAX_URL_LENGTH } from "./util/reqresp.js";
import { openAsBlob } from "fs";
import { WARCWriter } from "./util/warcwriter.js";
// RWP Replay Prefix
const REPLAY_PREFIX = "http://localhost:9990/replay/w/replay/";
@ -67,6 +67,7 @@ export class ReplayCrawler extends Crawler {
qaSource: string;
pageInfos: Map<Page, ReplayPageInfoRecord>;
infoWriter: WARCWriter | null;
reloadTimeouts: WeakMap<Page, NodeJS.Timeout>;
@ -98,6 +99,14 @@ export class ReplayCrawler extends Crawler {
this.params.serviceWorker = "enabled";
this.reloadTimeouts = new WeakMap<Page, NodeJS.Timeout>();
this.infoWriter = null;
}
async bootstrap(): Promise<void> {
await super.bootstrap();
this.infoWriter = this.createExtraResourceWarcWriter("info");
}
protected parseArgs() {
@ -666,18 +675,13 @@ export class ReplayCrawler extends Crawler {
(state as ComparisonPageState).comparison = comparison;
}
const writer = new WARCResourceWriter({
await this.infoWriter?.writeNewResourceRecord({
buffer: new TextEncoder().encode(JSON.stringify(pageInfo, null, 2)),
resourceType: "pageinfo",
contentType: "application/json",
url: pageInfo.url,
directory: this.archivesDir,
warcPrefix: this.warcPrefix,
date: new Date(),
warcName: "info.warc.gz",
});
await writer.writeBufferToWARC(
new TextEncoder().encode(JSON.stringify(pageInfo, null, 2)),
"pageinfo",
"application/json",
);
this.pageInfos.delete(page);
}
}
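
The wiring above (create the writer after super.bootstrap(), then guard writes with optional chaining) generalizes to any Crawler subclass. A hedged sketch of that pattern; the ExampleCrawler class and the "metrics" resource name are hypothetical.

```ts
// Hypothetical subclass illustrating the wiring introduced in this hunk:
// the writer is created only after super.bootstrap(), mirroring how
// ReplayCrawler sets up its "info" writer.
import { Crawler } from "./crawler.js";
import { WARCWriter } from "./util/warcwriter.js";

class ExampleCrawler extends Crawler {
  metricsWriter: WARCWriter | null = null;

  async bootstrap(): Promise<void> {
    await super.bootstrap();
    this.metricsWriter = this.createExtraResourceWarcWriter("metrics");
  }

  async recordMetrics(url: string, metrics: Record<string, number>) {
    // Optional chaining guards against use before bootstrap(), as in the
    // infoWriter?.writeNewResourceRecord call above.
    await this.metricsWriter?.writeNewResourceRecord({
      buffer: new TextEncoder().encode(JSON.stringify(metrics)),
      resourceType: "metrics",
      contentType: "application/json",
      url,
    });
  }
}
```
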

View file

@ -1,6 +1,4 @@
import fs from "fs";
import path from "path";
import os from "os";
import { v4 as uuidv4 } from "uuid";
@ -24,7 +22,6 @@ import { WARCWriter } from "./warcwriter.js";
import { RedisCrawlState, WorkerId } from "./state.js";
import { CDPSession, Protocol } from "puppeteer-core";
import { Crawler } from "../crawler.js";
import { WARCResourceWriter } from "./warcresourcewriter.js";
const MAX_BROWSER_DEFAULT_FETCH_SIZE = 5_000_000;
const MAX_BROWSER_TEXT_FETCH_SIZE = 25_000_000;
@ -70,7 +67,6 @@ export type PageInfoRecord = {
// =================================================================
export class Recorder {
workerid: WorkerId;
collDir: string;
crawler: Crawler;
@ -94,9 +90,7 @@ export class Recorder {
allowFull206 = false;
archivesDir: string;
tempdir: string;
tempCdxDir: string;
gzip = true;
@ -107,46 +101,26 @@ export class Recorder {
constructor({
workerid,
collDir,
writer,
crawler,
tempdir,
}: {
workerid: WorkerId;
collDir: string;
writer: WARCWriter;
crawler: Crawler;
tempdir: string;
}) {
this.workerid = workerid;
this.crawler = crawler;
this.crawlState = crawler.crawlState;
this.writer = writer;
this.tempdir = tempdir;
this.warcQ = new PQueue({ concurrency: 1 });
this.fetcherQ = new PQueue({ concurrency: 1 });
this.collDir = collDir;
this.archivesDir = path.join(this.collDir, "archive");
this.tempdir = path.join(os.tmpdir(), "tmp-dl");
this.tempCdxDir = path.join(this.collDir, "tmp-cdx");
fs.mkdirSync(this.tempdir, { recursive: true });
fs.mkdirSync(this.archivesDir, { recursive: true });
fs.mkdirSync(this.tempCdxDir, { recursive: true });
const prefix =
process.env.WARC_PREFIX || crawler.params.warcPrefix || "rec";
const crawlId = process.env.CRAWL_ID || os.hostname();
const filenameTemplate = `${prefix}-${crawlId}-$ts-${this.workerid}.warc${
this.gzip ? ".gz" : ""
}`;
this.writer = new WARCWriter({
archivesDir: this.archivesDir,
tempCdxDir: this.tempCdxDir,
filenameTemplate,
rolloverSize: crawler.params.rolloverSize,
gzip: this.gzip,
logDetails: this.logDetails,
});
}
async onCreatePage({ cdp }: { cdp: CDPSession }) {
@ -733,18 +707,19 @@ export class Recorder {
}
}
async writePageInfoRecord() {
writePageInfoRecord() {
const text = JSON.stringify(this.pageInfo, null, 2);
const resourceRecord = await WARCResourceWriter.createResourceRecord(
new TextEncoder().encode(text),
"pageinfo",
"application/json",
this.pageUrl,
new Date(),
);
const url = this.pageUrl;
this.warcQ.add(() => this.writer.writeSingleRecord(resourceRecord));
this.warcQ.add(() =>
this.writer.writeNewResourceRecord({
buffer: new TextEncoder().encode(text),
resourceType: "pageinfo",
contentType: "application/json",
url,
}),
);
return this.pageInfo.ts;
}
@ -796,6 +771,8 @@ export class Recorder {
}
async onDone(timeout: number) {
console.log("ON DONE!!");
await this.crawlState.setStatus("pending-wait");
const finishFetch = async () => {

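With this change the Recorder no longer builds its own directories, filename prefix, or WARCWriter; the Crawler injects a ready writer. A sketch of direct construction mirroring Crawler.createRecorder() from the crawler.ts hunk above; the buildRecorder helper is illustrative and property visibility on Crawler is assumed.

```ts
// Sketch: constructing a Recorder with an injected writer, as
// Crawler.createRecorder() now does.
import { Recorder } from "./util/recorder.js";
import { Crawler } from "./crawler.js";

function buildRecorder(crawler: Crawler, workerid: number): Recorder {
  // "$ts" is a timestamp placeholder in the filename template
  // (see the timestampNow import in warcwriter.ts).
  const filenameBase = `${crawler.getWarcPrefix("rec")}$ts-${workerid}`;
  const writer = crawler.createWarcWriter(filenameBase, true, {
    id: String(workerid),
  });

  return new Recorder({
    workerid,
    collDir: crawler.collDir,
    crawler,
    writer,
    tempdir: crawler.tempdir,
  });
}
```
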
View file

@ -1,10 +1,10 @@
import sharp from "sharp";
import { WARCResourceWriter } from "./warcresourcewriter.js";
import { logger, formatErr } from "./logger.js";
import { Browser } from "./browser.js";
import { Page } from "puppeteer-core";
import { PageState } from "./state.js";
import { WARCWriter } from "./warcwriter.js";
// ============================================================================
@ -42,18 +42,20 @@ export type ScreenshotOpts = {
browser: Browser;
page: Page;
url: string;
directory: string;
warcPrefix: string;
writer: WARCWriter;
};
export class Screenshots extends WARCResourceWriter {
export class Screenshots {
browser: Browser;
page: Page;
url: string;
writer: WARCWriter;
constructor(opts: ScreenshotOpts) {
super({ ...opts, warcName: "screenshots.warc.gz" });
this.browser = opts.browser;
this.page = opts.page;
constructor({ browser, page, writer, url }: ScreenshotOpts) {
this.browser = browser;
this.page = page;
this.url = url;
this.writer = writer;
}
async take(
@ -72,13 +74,14 @@ export class Screenshots extends WARCResourceWriter {
if (state && screenshotType === "view") {
state.screenshotView = screenshotBuffer;
}
await this.writeBufferToWARC(
screenshotBuffer,
screenshotType,
"image/" + options.type,
);
await this.writer.writeNewResourceRecord({
buffer: screenshotBuffer,
resourceType: screenshotType,
contentType: "image/" + options.type,
url: this.url,
});
logger.info(
`Screenshot (type: ${screenshotType}) for ${this.url} written to ${this.warcName}`,
`Screenshot (type: ${screenshotType}) for ${this.url} written to ${this.writer.filename}`,
);
} catch (e) {
logger.error(
@ -103,13 +106,14 @@ export class Screenshots extends WARCResourceWriter {
// 16:9 thumbnail
.resize(640, 360)
.toBuffer();
await this.writeBufferToWARC(
thumbnailBuffer,
screenshotType,
"image/" + options.type,
);
await this.writer.writeNewResourceRecord({
buffer: thumbnailBuffer,
resourceType: screenshotType,
contentType: "image/" + options.type,
url: this.url,
});
logger.info(
`Screenshot (type: thumbnail) for ${this.url} written to ${this.warcName}`,
`Screenshot (type: thumbnail) for ${this.url} written to ${this.writer.filename}`,
);
} catch (e) {
logger.error(

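Since Screenshots no longer extends WARCResourceWriter, callers hand it the shared writer, as crawler.ts does with this.screenshotWriter. A minimal sketch of that call path; the captureView helper is illustrative and the take() signature is inferred from its call site in crawler.ts.

```ts
// Sketch: taking a "view" screenshot through the shared screenshot writer.
// Assumes crawler.screenshotWriter was created in bootstrap() (i.e. --screenshot
// was requested) and that crawler.browser is the active Browser instance.
import { Page } from "puppeteer-core";
import { Crawler } from "./crawler.js";
import { Screenshots } from "./util/screenshots.js";

async function captureView(crawler: Crawler, page: Page, url: string) {
  if (!crawler.screenshotWriter) {
    return;
  }
  const screenshots = new Screenshots({
    browser: crawler.browser,
    page,
    url,
    writer: crawler.screenshotWriter,
  });
  // Second argument is the PageState to stash the buffer on, or null
  // (as in the crawler.ts hunk when saveOutput is false).
  await screenshots.take("view", null);
}
```
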
View file

@ -1,26 +1,28 @@
import { WARCResourceWriter } from "./warcresourcewriter.js";
import { logger } from "./logger.js";
import { CDPSession, Protocol } from "puppeteer-core";
import { WARCWriter } from "./warcwriter.js";
// ============================================================================
type TextExtractOpts = {
url: string;
directory: string;
warcPrefix: string;
writer: WARCWriter;
skipDocs: number;
};
// ============================================================================
export abstract class BaseTextExtract extends WARCResourceWriter {
export abstract class BaseTextExtract {
cdp: CDPSession;
lastText: string | null = null;
text: string | null = null;
skipDocs: number = 0;
writer: WARCWriter;
url: string;
constructor(cdp: CDPSession, opts: TextExtractOpts) {
super({ ...opts, warcName: "text.warc.gz" });
constructor(cdp: CDPSession, { writer, skipDocs, url }: TextExtractOpts) {
this.writer = writer;
this.cdp = cdp;
this.skipDocs = opts.skipDocs || 0;
this.url = url;
this.skipDocs = skipDocs || 0;
}
async extractAndStoreText(
@ -41,13 +43,14 @@ export abstract class BaseTextExtract extends WARCResourceWriter {
return { changed: false, text };
}
if (saveToWarc) {
await this.writeBufferToWARC(
new TextEncoder().encode(text),
await this.writer.writeNewResourceRecord({
buffer: new TextEncoder().encode(text),
resourceType,
"text/plain",
);
contentType: "text/plain",
url: this.url,
});
logger.debug(
`Text Extracted (type: ${resourceType}) for ${this.url} written to ${this.warcName}`,
`Text Extracted (type: ${resourceType}) for ${this.url} written to ${this.writer.filename}`,
);
}
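
The text extractor follows the same injection pattern. A sketch of invoking it against the shared text writer; the extractAndStoreText() argument list is cut off in the crawler.ts hunk, so the arguments shown here are an assumption.

```ts
// Sketch: extracting page text into the shared text WARC.
// The (resourceType, ignoreIfMatchesLast, saveToWarc) argument order is
// assumed from the `if (saveToWarc)` branch shown in this file's hunk.
import { CDPSession } from "puppeteer-core";
import { Crawler } from "./crawler.js";
import { TextExtractViaSnapshot } from "./util/textextract.js";

async function extractText(crawler: Crawler, cdp: CDPSession, url: string) {
  if (!crawler.textWriter) {
    return null;
  }
  const textextract = new TextExtractViaSnapshot(cdp, {
    writer: crawler.textWriter,
    url,
    skipDocs: 0, // the crawler passes this.skipTextDocs here
  });
  const { text } = await textextract.extractAndStoreText("text", false, true);
  return text;
}
```
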

View file

@ -1,78 +0,0 @@
import fs from "fs";
import path from "path";
import * as warcio from "warcio";
// ===========================================================================
export type WARCResourceWriterOpts = {
url: string;
directory: string;
date?: Date;
warcName: string;
warcPrefix: string;
};
// ===========================================================================
export class WARCResourceWriter {
url: string;
directory: string;
warcName: string;
date: Date;
constructor({
url,
directory,
date,
warcPrefix,
warcName,
}: WARCResourceWriterOpts) {
this.url = url;
this.directory = directory;
this.warcName = path.join(this.directory, warcPrefix + warcName);
this.date = date ? date : new Date();
}
async writeBufferToWARC(
contents: Uint8Array,
resourceType: string,
contentType: string,
) {
const warcRecord = await WARCResourceWriter.createResourceRecord(
contents,
resourceType,
contentType,
this.url,
this.date,
);
const warcRecordBuffer = await warcio.WARCSerializer.serialize(warcRecord, {
gzip: true,
});
fs.appendFileSync(this.warcName, warcRecordBuffer);
}
static async createResourceRecord(
buffer: Uint8Array,
resourceType: string,
contentType: string,
url: string,
date: Date,
) {
const warcVersion = "WARC/1.1";
const warcRecordType = "resource";
const warcHeaders = { "Content-Type": contentType };
async function* content() {
yield buffer;
}
const resourceUrl = `urn:${resourceType}:${url}`;
return warcio.WARCRecord.create(
{
url: resourceUrl,
date: date.toISOString(),
type: warcRecordType,
warcVersion,
warcHeaders,
},
content(),
);
}
}
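
For call sites still on the removed class, an informal before/after sketch (not part of this commit) of how the old append-per-file path maps onto the shared writer; the writeScreenshotRecord helper is illustrative.

```ts
// Old path (file deleted in this commit), kept as a comment for reference:
//
//   const w = new WARCResourceWriter({ url, directory, warcPrefix,
//                                      warcName: "screenshots.warc.gz" });
//   await w.writeBufferToWARC(buffer, "screenshot", "image/png");
//
// New path: the same "resource" record goes through a shared, rolling WARCWriter.
import { WARCWriter } from "./warcwriter.js";

async function writeScreenshotRecord(
  writer: WARCWriter,
  url: string,
  buffer: Uint8Array,
) {
  await writer.writeNewResourceRecord({
    buffer,
    resourceType: "screenshot",
    contentType: "image/png",
    url,
  });
}
```
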

View file

@ -2,14 +2,22 @@ import fs from "fs";
import { Writable } from "stream";
import path from "path";
import { CDXIndexer } from "warcio";
import { CDXIndexer, WARCRecord } from "warcio";
import { WARCSerializer } from "warcio/node";
import { logger, formatErr } from "./logger.js";
import type { IndexerOffsetLength, WARCRecord } from "warcio";
import type { IndexerOffsetLength } from "warcio";
import { timestampNow } from "./timing.js";
const DEFAULT_ROLLOVER_SIZE = 1_000_000_000;
export type ResourceRecordData = {
buffer: Uint8Array;
resourceType: string;
contentType: string;
url: string;
date?: Date;
};
// =================================================================
export class WARCWriter implements IndexerOffsetLength {
archivesDir: string;
@ -47,6 +55,8 @@ export class WARCWriter implements IndexerOffsetLength {
}) {
this.archivesDir = archivesDir;
this.tempCdxDir = tempCdxDir;
// for now, disabling CDX
this.tempCdxDir = undefined;
this.logDetails = logDetails;
this.gzip = gzip;
this.rolloverSize = rolloverSize;
@ -137,6 +147,39 @@ export class WARCWriter implements IndexerOffsetLength {
this._writeCDX(record);
}
async writeNewResourceRecord({
buffer,
resourceType,
contentType,
url,
date,
}: ResourceRecordData) {
const warcVersion = "WARC/1.1";
const warcRecordType = "resource";
const warcHeaders = { "Content-Type": contentType };
async function* content() {
yield buffer;
}
const resourceUrl = `urn:${resourceType}:${url}`;
if (!date) {
date = new Date();
}
return await this.writeSingleRecord(
WARCRecord.create(
{
url: resourceUrl,
date: date.toISOString(),
type: warcRecordType,
warcVersion,
warcHeaders,
},
content(),
),
);
}
private async _writeRecord(record: WARCRecord, serializer: WARCSerializer) {
if (this.done) {
logger.warn(
@ -188,8 +231,6 @@ export class WARCWriter implements IndexerOffsetLength {
}
async flush() {
this.done = true;
if (this.fh) {
await streamFinish(this.fh);
this.fh = null;
@ -201,6 +242,8 @@ export class WARCWriter implements IndexerOffsetLength {
await streamFinish(this.cdxFH);
this.cdxFH = null;
}
this.done = true;
}
}
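
In summary, writeNewResourceRecord() wraps each payload in a WARC resource record with target URI urn:<resourceType>:<url> and stamps the current date when none is given. A small illustrative sketch of the exported ResourceRecordData shape:

```ts
// Sketch of ResourceRecordData, the input to WARCWriter.writeNewResourceRecord().
// Values here are illustrative.
import type { ResourceRecordData } from "./warcwriter.js";

const example: ResourceRecordData = {
  buffer: new TextEncoder().encode("hello"),
  resourceType: "example", // record target URI becomes urn:example:https://example.com/
  contentType: "text/plain", // emitted as the record's Content-Type WARC header
  url: "https://example.com/",
  // date omitted -> writeNewResourceRecord() defaults to new Date()
};
```
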

View file

@ -290,7 +290,7 @@ export class PageWorker {
} finally {
try {
if (this.recorder) {
opts.data.ts = await this.recorder.writePageInfoRecord();
opts.data.ts = this.recorder.writePageInfoRecord();
}
} catch (e) {
logger.error(
@ -403,7 +403,6 @@ export async function runWorkers(
) {
logger.info(`Creating ${numWorkers} workers`, {}, "worker");
const workers = [];
let offset = 0;
// automatically set worker start by ordinal in k8s