WARC writer + incremental indexing fixes (#679)

- ensure WARC rollover happens only after response/request + cdx or
single record + cdx have been written
- ensure request payload is buffered for POST request indexing
- update to warcio 2.3.1 for POST request case-insensitive
'content-type' check
- recorder: remove unused 'tempdir', no longer needed since warcio chooses a
temp file on its own
This commit is contained in:
Ilya Kreymer 2024-09-05 11:10:31 -07:00 committed by GitHub
parent 0d6a0b0efa
commit 9d0e3423a3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 24 additions and 34 deletions

View file

@ -36,7 +36,7 @@
"tsc": "^2.0.4",
"undici": "^6.18.2",
"uuid": "8.3.2",
"warcio": "^2.3.0",
"warcio": "^2.3.1",
"ws": "^7.4.4",
"yargs": "^17.7.2"
},
@ -65,6 +65,7 @@
"testTimeout": 90000
},
"resolutions": {
"wrap-ansi": "7.0.0"
"wrap-ansi": "7.0.0",
"warcio": "^2.3.1"
}
}

View file

@ -156,7 +156,6 @@ export class Crawler {
otherPagesFile: string;
archivesDir: string;
tempdir: string;
warcCdxDir: string;
indexesDir: string;
@ -295,7 +294,6 @@ export class Crawler {
// archives dir
this.archivesDir = path.join(this.collDir, "archive");
this.tempdir = path.join(os.tmpdir(), "tmp-dl");
// indexes dirs
this.warcCdxDir = path.join(this.collDir, "warc-cdx");
@ -480,7 +478,6 @@ export class Crawler {
if (!this.params.dryRun) {
await fsp.mkdir(this.archivesDir, { recursive: true });
await fsp.mkdir(this.tempdir, { recursive: true });
await fsp.mkdir(this.warcCdxDir, { recursive: true });
}
@ -2581,7 +2578,6 @@ self.__bx_behaviors.selectMainBehavior();
workerid: id,
crawler: this,
writer,
tempdir: this.tempdir,
});
this.browser.recorders.push(res);

View file

@ -1,11 +1,7 @@
import path from "path";
import { v4 as uuidv4 } from "uuid";
import PQueue from "p-queue";
import { logger, formatErr } from "./logger.js";
import { sleep, timedRun, timestampNow } from "./timing.js";
import { sleep, timedRun } from "./timing.js";
import {
RequestResponseInfo,
isHTMLMime,
@ -142,8 +138,6 @@ export class Recorder {
logDetails: Record<string, any> = {};
skipping = false;
tempdir: string;
gzip = true;
writer: WARCWriter;
@ -157,12 +151,10 @@ export class Recorder {
workerid,
writer,
crawler,
tempdir,
}: {
workerid: WorkerId;
writer: WARCWriter;
crawler: Crawler;
tempdir: string;
}) {
this.workerid = workerid;
this.crawler = crawler;
@ -170,8 +162,6 @@ export class Recorder {
this.writer = writer;
this.tempdir = tempdir;
this.fetcherQ = new PQueue({ concurrency: 1 });
this.frameIdToExecId = null;
@ -1274,9 +1264,6 @@ class AsyncFetcher {
recorder: Recorder;
tempdir: string;
filename: string;
manualRedirect = false;
constructor({
@ -1299,19 +1286,13 @@ class AsyncFetcher {
this.recorder = recorder;
this.tempdir = recorder.tempdir;
this.filename = path.join(
this.tempdir,
`${timestampNow()}-${uuidv4()}.data`,
);
this.maxFetchSize = maxFetchSize;
this.manualRedirect = manualRedirect;
}
async load() {
const { reqresp, recorder, networkId, filename } = this;
const { reqresp, recorder, networkId } = this;
const { url, status } = reqresp;
const { pageid, crawlState, gzip, logDetails } = recorder;
@ -1361,7 +1342,7 @@ class AsyncFetcher {
} catch (e) {
logger.error(
"Error reading + digesting payload",
{ url, filename, ...formatErr(e), ...logDetails },
{ url, ...formatErr(e), ...logDetails },
"recorder",
);
}
@ -1436,7 +1417,7 @@ class AsyncFetcher {
}
logger.debug(
"Streaming Fetch Error",
{ url, networkId, filename, ...formatErr(e), ...logDetails },
{ url, networkId, ...formatErr(e), ...logDetails },
"recorder",
);
// indicate response is ultimately not valid

View file

@ -155,6 +155,10 @@ export class WARCWriter implements IndexerOffsetLength {
this._writeCDX(responseRecord);
if (requestRecord.httpHeaders?.method !== "GET") {
await requestRecord.readFully(false);
}
const requestSerializer = new WARCSerializer(requestRecord, opts);
this.recordLength = await this._writeRecord(
requestRecord,
@ -162,6 +166,10 @@ export class WARCWriter implements IndexerOffsetLength {
);
this._writeCDX(requestRecord);
if (this.offset >= this.rolloverSize) {
this.fh = await this.initFH();
}
}
private addToQueue(
@ -197,6 +205,10 @@ export class WARCWriter implements IndexerOffsetLength {
this.recordLength = await this._writeRecord(record, requestSerializer);
this._writeCDX(record);
if (this.offset >= this.rolloverSize) {
this.fh = await this.initFH();
}
}
writeNewResourceRecord(
@ -257,7 +269,7 @@ export class WARCWriter implements IndexerOffsetLength {
let total = 0;
const url = record.warcTargetURI;
if (!this.fh || this.offset >= this.rolloverSize) {
if (!this.fh) {
this.fh = await this.initFH();
}

View file

@ -5277,10 +5277,10 @@ walker@^1.0.8:
dependencies:
makeerror "1.0.12"
warcio@^2.3.0:
version "2.3.0"
resolved "https://registry.yarnpkg.com/warcio/-/warcio-2.3.0.tgz#a655df9b5986a53e5d05aa68cda51bfefdfa8347"
integrity sha512-PCHcZ/fDE5+QECOFe/n/vzyDmAITJ1mvLx1jVONJ0uaV9OwcTbIWoh7Z0+OQwQdq8Wr1Nnb2hwhtHJ7J+9rHIQ==
warcio@^2.3.0, warcio@^2.3.1:
version "2.3.1"
resolved "https://registry.yarnpkg.com/warcio/-/warcio-2.3.1.tgz#8ac9de897de1a556161168f2a3938b60929908ca"
integrity sha512-PjcWqzXfs6HdWfHi1V/i8MoMmV5M0Csg3rOa2mqCJ1dmCJXswVfQ0VXbEVumwavNIW2oFFj6LJoCHHeL4Ls/zw==
dependencies:
"@types/pako" "^1.0.7"
"@types/stream-buffers" "^3.0.7"