WARC filename prefix + rollover size + improved 'livestream' / truncated response support. (#440)

Support for rollover size and custom WARC prefix templates:
- re-enable --rolloverSize (defaulting to 1GB), which controls when a new
WARC is created
- support a custom WARC prefix via --warcPrefix, prepended to each new
WARC filename; tested in basic_crawl.test.js
- the filename template for new files is
`${prefix}-${crawlId}-$ts-${this.workerid}.warc${this.gzip ? ".gz" : ""}`,
with `$ts` replaced at file-creation time with the current timestamp
(see the sketch below)
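
Below is a minimal TypeScript sketch of how the template is expanded when a
new file is opened. The concrete values (prefix, worker id, gzip flag) are
assumed here for illustration, and the inline timestamp stands in for the
crawler's `timestampNow()` helper:

```ts
import os from "os";

// assumed example inputs; in the crawler these come from the parsed CLI
// params (--warcPrefix, default "rec") and the worker context
const prefix = "custom-prefix";
const workerid = 0;
const gzip = true;

const crawlId = process.env.CRAWL_ID || os.hostname();

// "$ts" is left literal in the template and only substituted when a new
// WARC is opened (including on rollover), so each file gets its own timestamp
const filenameTemplate = `${prefix}-${crawlId}-$ts-${workerid}.warc${
  gzip ? ".gz" : ""
}`;

// stand-in for timestampNow(): digits of the current ISO timestamp
const ts = new Date().toISOString().replace(/[^\d]/g, "").slice(0, 14);
const filename = filenameTemplate.replace("$ts", ts);
// e.g. "custom-prefix-myhost-20231207230255-0.warc.gz"
```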

Improved support for long (non-terminating) responses, such as
live-streamed content:
- pass a size to CDP IO.read in takeStream so data is streamed in
fixed-size chunks, defaulting to 64KB (see the sketch after this list)
- change the shutdown order: first close the browser, then finish writing
all WARCs, to ensure any truncated responses can still be captured
- ensure a WARC is not written to after it is finalized; skip writing
records if the stream has already been flushed
- add a timeout to the final fetch tasks to avoid hanging forever on finish
- fix adding the `WARC-Truncated` header: it must be set after the stream
is finished in order to determine whether the response was truncated
- move the temp download dir `tmp-dl` into the main OS temp folder, outside
of the collection (it does not need to be there)
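
Below is a rough sketch of the chunked CDP read loop, mirroring
`takeStreamIter` in the diff below. The `puppeteer-core` type imports and the
`onTruncated` callback are assumptions made for illustration; the actual code
records the truncation reason on its request/response info object instead:

```ts
import type { CDPSession, Protocol } from "puppeteer-core";

const TAKE_STREAM_BUFF_SIZE = 1024 * 64;

// Each IO.read call is capped at 64KB, so a long or never-ending
// (live-streamed) response yields data in bounded chunks instead of
// buffering indefinitely. If the read is interrupted (e.g. the browser is
// closed during shutdown), the data read so far is kept and the payload is
// flagged as truncated.
async function* takeStreamChunks(
  cdp: CDPSession,
  stream: Protocol.IO.StreamHandle,
  onTruncated: (reason: string) => void,
): AsyncGenerator<Buffer> {
  try {
    while (true) {
      const { data, base64Encoded, eof } = await cdp.send("IO.read", {
        handle: stream,
        size: TAKE_STREAM_BUFF_SIZE,
      });
      yield Buffer.from(data, base64Encoded ? "base64" : "utf-8");
      if (eof) {
        break;
      }
    }
  } catch {
    onTruncated("disconnect");
  }
}
```

Because the browser is now closed before the WARC queue is drained, a
still-open stream ends through the catch path above, and the partially read
response can be written out with `WARC-Truncated: disconnect` instead of
being dropped.
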
Ilya Kreymer 2023-12-07 23:02:55 -08:00 committed by GitHub
parent e9ed7a45df
commit 3323262852
6 changed files with 137 additions and 31 deletions

View file

@ -1225,8 +1225,6 @@ self.__bx_behaviors.selectMainBehavior();
await this.serializeConfig(true);
await this.browser.close();
if (this.pagesFH) {
await this.pagesFH.sync();
await this.pagesFH.close();

View file

@ -489,6 +489,13 @@ class ArgParser {
"if set, runs internal redis without protected mode to allow external access (for debugging)",
type: "boolean",
},
warcPrefix: {
describe:
"prefix for WARC files generated, including WARCs added to WACZ",
type: "string",
default: "rec",
},
};
}

View file

@ -7,7 +7,7 @@ import { v4 as uuidv4 } from "uuid";
import PQueue from "p-queue";
import { logger, formatErr } from "./logger.js";
import { sleep, timestampNow } from "./timing.js";
import { sleep, timedRun, timestampNow } from "./timing.js";
import { RequestResponseInfo } from "./reqresp.js";
// @ts-expect-error TODO fill in why error is expected
@ -30,6 +30,8 @@ const MAX_BROWSER_TEXT_FETCH_SIZE = 25_000_000;
const MAX_NETWORK_LOAD_SIZE = 200_000_000;
const TAKE_STREAM_BUFF_SIZE = 1024 * 64;
const ASYNC_FETCH_DUPE_KEY = "s:fetchdupe";
const WRITE_DUPE_KEY = "s:writedupe";
@ -87,8 +89,6 @@ export class Recorder {
}: {
workerid: WorkerId;
collDir: string;
// TODO: Fix this the next time the file is edited.
crawler: Crawler;
}) {
this.workerid = workerid;
@ -102,20 +102,24 @@ export class Recorder {
this.collDir = collDir;
this.archivesDir = path.join(this.collDir, "archive");
this.tempdir = path.join(this.collDir, "tmp-dl");
this.tempdir = path.join(os.tmpdir(), "tmp-dl");
this.tempCdxDir = path.join(this.collDir, "tmp-cdx");
fs.mkdirSync(this.tempdir, { recursive: true });
fs.mkdirSync(this.archivesDir, { recursive: true });
fs.mkdirSync(this.tempCdxDir, { recursive: true });
const prefix = crawler.params.warcPrefix || "rec";
const crawlId = process.env.CRAWL_ID || os.hostname();
const filename = `rec-${crawlId}-${timestampNow()}-${this.workerid}.warc`;
const filenameTemplate = `${prefix}-${crawlId}-$ts-${this.workerid}.warc${
this.gzip ? ".gz" : ""
}`;
this.writer = new WARCWriter({
archivesDir: this.archivesDir,
tempCdxDir: this.tempCdxDir,
filename,
filenameTemplate,
rolloverSize: crawler.params.rolloverSize,
gzip: this.gzip,
logDetails: this.logDetails,
});
@ -658,11 +662,21 @@ export class Recorder {
// Any page-specific handling before page is closed.
}
async onDone() {
async onDone(timeout: number) {
await this.crawlState.setStatus("pending-wait");
const finishFetch = async () => {
logger.debug("Finishing Fetcher Queue", this.logDetails, "recorder");
await this.fetcherQ.onIdle();
};
await timedRun(
finishFetch(),
timeout,
"Finishing Fetch Timed Out",
this.logDetails,
"recorder",
);
logger.debug("Finishing WARC writing", this.logDetails, "recorder");
await this.warcQ.onIdle();
@ -1020,6 +1034,17 @@ class AsyncFetcher {
readSize -= serializer.httpHeadersBuff.length;
}
reqresp.readSize = readSize;
// set truncated field and recompute header buff
if (reqresp.truncated) {
responseRecord.warcHeaders.headers.set(
"WARC-Truncated",
reqresp.truncated,
);
// todo: keep this internal in warcio after adding new header
serializer.warcHeadersBuff = encoder.encode(
responseRecord.warcHeaders.toString(),
);
}
} catch (e) {
logger.error(
"Error reading + digesting payload",
@ -1150,6 +1175,7 @@ class AsyncFetcher {
}
async *takeReader(reader: ReadableStreamDefaultReader<Uint8Array>) {
let size = 0;
try {
while (true) {
const { value, done } = await reader.read();
@ -1157,12 +1183,18 @@ class AsyncFetcher {
break;
}
size += value.length;
yield value;
}
} catch (e) {
logger.warn(
"takeReader interrupted",
{ ...formatErr(e), url: this.reqresp.url, ...this.recorder.logDetails },
{
size,
url: this.reqresp.url,
...formatErr(e),
...this.recorder.logDetails,
},
"recorder",
);
this.reqresp.truncated = "disconnect";
@ -1170,13 +1202,16 @@ class AsyncFetcher {
}
async *takeStreamIter(cdp: CDPSession, stream: Protocol.IO.StreamHandle) {
let size = 0;
try {
while (true) {
const { data, base64Encoded, eof } = await cdp.send("IO.read", {
handle: stream,
size: TAKE_STREAM_BUFF_SIZE,
});
const buff = Buffer.from(data, base64Encoded ? "base64" : "utf-8");
size += buff.length;
yield buff;
if (eof) {
@ -1186,7 +1221,12 @@ class AsyncFetcher {
} catch (e) {
logger.warn(
"takeStream interrupted",
{ ...formatErr(e), url: this.reqresp.url, ...this.recorder.logDetails },
{
size,
url: this.reqresp.url,
...formatErr(e),
...this.recorder.logDetails,
},
"recorder",
);
this.reqresp.truncated = "disconnect";
@ -1315,10 +1355,6 @@ function createResponse(
"WARC-Page-ID": pageid,
};
if (reqresp.truncated) {
warcHeaders["WARC-Truncated"] = reqresp.truncated;
}
if (!contentIter) {
contentIter = [reqresp.payload] as Iterable<Uint8Array>;
}

View file

@ -6,17 +6,24 @@ import { CDXIndexer } from "warcio";
import { WARCSerializer } from "warcio/node";
import { logger, formatErr } from "./logger.js";
import type { IndexerOffsetLength, WARCRecord } from "warcio";
import { timestampNow } from "./timing.js";
const DEFAULT_ROLLOVER_SIZE = 1_000_000_000;
// =================================================================
export class WARCWriter implements IndexerOffsetLength {
archivesDir: string;
tempCdxDir: string;
filename: string;
filenameTemplate: string;
filename?: string;
gzip: boolean;
logDetails: Record<string, string>;
offset = 0;
recordLength = 0;
done = false;
rolloverSize: number;
indexer?: CDXIndexer;
@ -26,21 +33,29 @@ export class WARCWriter implements IndexerOffsetLength {
constructor({
archivesDir,
tempCdxDir,
filename,
filenameTemplate,
rolloverSize = DEFAULT_ROLLOVER_SIZE,
gzip,
logDetails,
}: {
archivesDir: string;
tempCdxDir: string;
filename: string;
filenameTemplate: string;
rolloverSize?: number;
gzip: boolean;
logDetails: Record<string, string>;
}) {
this.archivesDir = archivesDir;
this.tempCdxDir = tempCdxDir;
this.filename = filename;
this.gzip = gzip;
this.logDetails = logDetails;
this.gzip = gzip;
this.rolloverSize = rolloverSize;
this.filenameTemplate = filenameTemplate;
}
_initNewFile() {
const filename = this.filenameTemplate.replace("$ts", timestampNow());
this.offset = 0;
this.recordLength = 0;
@ -48,9 +63,28 @@ export class WARCWriter implements IndexerOffsetLength {
if (this.tempCdxDir) {
this.indexer = new CDXIndexer({ format: "cdxj" });
}
return filename;
}
private async initFH() {
if (this.offset >= this.rolloverSize) {
logger.info(
`Rollover size exceeded, creating new WARC`,
{
rolloverSize: this.rolloverSize,
size: this.offset,
...this.logDetails,
},
"writer",
);
this.filename = this._initNewFile();
this.fh = null;
this.cdxFH = null;
} else if (!this.filename) {
this.filename = this._initNewFile();
}
async initFH() {
if (!this.fh) {
this.fh = fs.createWriteStream(
path.join(this.archivesDir, this.filename),
@ -68,6 +102,15 @@ export class WARCWriter implements IndexerOffsetLength {
requestRecord: WARCRecord,
responseSerializer: WARCSerializer | undefined = undefined,
) {
if (this.done) {
logger.warn(
"Writer closed, not writing records",
this.logDetails,
"writer",
);
return;
}
const opts = { gzip: this.gzip };
if (!responseSerializer) {
@ -117,7 +160,7 @@ export class WARCWriter implements IndexerOffsetLength {
}
_writeCDX(record: WARCRecord | null) {
if (this.indexer) {
if (this.indexer && this.filename) {
const cdx = this.indexer.indexRecord(record, this, this.filename);
if (this.indexer && this.cdxFH && cdx) {
@ -140,6 +183,8 @@ export class WARCWriter implements IndexerOffsetLength {
await streamFinish(this.cdxFH);
this.cdxFH = null;
}
this.done = true;
}
}

View file

@ -17,7 +17,7 @@ const TEARDOWN_TIMEOUT = 10;
const FINISHED_TIMEOUT = 60;
// ===========================================================================
export function runWorkers(
export async function runWorkers(
crawler: Crawler,
numWorkers: number,
maxPageTime: number,
@ -46,7 +46,11 @@ export function runWorkers(
workers.push(new PageWorker(i + offset, crawler, maxPageTime, collDir));
}
return Promise.allSettled(workers.map((worker) => worker.run()));
await Promise.allSettled(workers.map((worker) => worker.run()));
await crawler.browser.close();
await Promise.allSettled(workers.map((worker) => worker.finalize()));
}
// ===========================================================================
@ -362,11 +366,13 @@ export class PageWorker {
{ ...formatErr(e), workerid: this.id },
"worker",
);
} finally {
if (this.recorder) {
await this.recorder.onDone();
}
}
async finalize() {
if (this.recorder) {
await this.recorder.onDone(this.maxPageTime);
}
}
async runLoop() {

View file

@ -5,7 +5,7 @@ import md5 from "md5";
test("ensure basic crawl run with docker run passes", async () => {
child_process.execSync(
'docker run -v $PWD/test-crawls:/crawls webrecorder/browsertrix-crawler crawl --url http://www.example.com/ --generateWACZ --text --collection wr-net --combineWARC --rolloverSize 10000 --workers 2 --title "test title" --description "test description"',
'docker run -v $PWD/test-crawls:/crawls webrecorder/browsertrix-crawler crawl --url https://example.com/ --generateWACZ --text --collection wr-net --combineWARC --rolloverSize 10000 --workers 2 --title "test title" --description "test description" --warcPrefix custom-prefix',
);
child_process.execSync(
@ -17,6 +17,20 @@ test("ensure basic crawl run with docker run passes", async () => {
);
});
test("check that individual WARCs have correct prefix and are under rollover size", () => {
const archiveWarcLists = fs.readdirSync(
"test-crawls/collections/wr-net/archive",
);
archiveWarcLists.forEach((filename) => {
expect(filename.startsWith("custom-prefix-")).toEqual(true);
const size = fs.statSync(
path.join("test-crawls/collections/wr-net/archive", filename),
).size;
expect(size < 10000).toEqual(true);
});
});
test("check that a combined warc file exists in the archive folder", () => {
const warcLists = fs.readdirSync("test-crawls/collections/wr-net");
var captureFound = 0;