Use new browser-based archiving mechanism instead of pywb proxy (#424)

Major refactoring of Browsertrix Crawler to natively capture network traffic to WARC files
via the Chrome DevTools Protocol (CDP). This allows for more flexibility and accuracy when dealing
with HTTP/2.x sites and avoids the need for a MITM proxy. Addresses #343.
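
At a high level, the capture path pauses each response at the CDP Fetch "Response" stage, reads its body, and fulfills the request back to the browser. A minimal sketch of that flow (illustration only, not code from this commit; it assumes `cdp` is a puppeteer-core CDPSession already attached to the page):

// minimal sketch: intercept responses over CDP and capture their bodies
await cdp.send("Fetch.enable", {patterns: [{urlPattern: "*", requestStage: "Response"}]});

cdp.on("Fetch.requestPaused", async ({requestId, responseStatusCode, responseHeaders}) => {
  try {
    // read the paused response body
    const {body, base64Encoded} = await cdp.send("Fetch.getResponseBody", {requestId});
    const payload = Buffer.from(body, base64Encoded ? "base64" : "utf-8");

    // ... hand payload + responseHeaders to the WARC writer here ...

    // return the (possibly rewritten) body to the browser
    await cdp.send("Fetch.fulfillRequest", {
      requestId,
      responseCode: responseStatusCode,
      responseHeaders,
      body: payload.toString("base64")
    });
  } catch (e) {
    // on any failure, let the browser continue with the original response
    await cdp.send("Fetch.continueResponse", {requestId});
  }
});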

Changes include:
- Recorder class for capturing CDP network traffic for each page.
- Handling of requests from service workers by matching active frames, skipping unrelated requests outside the page (e.g. from background pages).
- WARC writing support via the TS-based warcio.js library (see the sketch after this list).
- Generates a single WARC file per worker (size-based rollover still to be added).
- Request interception via Fetch.requestPaused.
- Rule-based response rewriting support (via wabac.js), using Fetch.getResponseBody() / Fetch.fulfillRequest().
- Streaming responses via three methods: inline response fetch via Fetch.takeResponseBodyAsStream, async loading through the browser network stack with Network.loadNetworkResource(), and node-based async fetch via fetch().
- Direct async fetch() capture of non-HTML URLs.
- Awaits all requests to finish before moving on to the next page, up to the page timeout.
- Experimental: generates CDXJ on the fly as the WARC is being written (not yet in use).
- Removed pywb; the --generateCDX option now uses cdxj-indexer.
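
For reference, a minimal sketch of writing a captured response/request pair with warcio.js (the record-creation calls mirror the new util/recorder.js; `url`, `status`, `headers`, `payload`, and `out` here are placeholder inputs, not the Recorder's actual fields):

// minimal sketch: serialize a response/request WARC record pair with warcio.js
import { WARCRecord } from "warcio";
import { WARCSerializer } from "warcio/node";

async function writePair(url, status, headers, payload, out) {
  const warcVersion = "WARC/1.1";
  const date = new Date().toISOString();

  const responseRecord = WARCRecord.create({
    url, date, warcVersion, type: "response",
    httpHeaders: headers,
    statusline: `HTTP/1.1 ${status} OK`
  }, [payload]);

  const requestRecord = WARCRecord.create({
    url, date, warcVersion, type: "request",
    warcHeaders: {"WARC-Concurrent-To": responseRecord.warcHeader("WARC-Record-ID")},
    httpHeaders: {},
    statusline: `GET ${url.slice(new URL(url).origin.length)} HTTP/1.1`
  }, []);

  // stream each serialized record to the output (e.g. fs.createWriteStream("rec.warc.gz"))
  for (const record of [responseRecord, requestRecord]) {
    for await (const chunk of new WARCSerializer(record, {gzip: true})) {
      out.write(chunk);
    }
  }
}
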
Ilya Kreymer 2023-11-07 21:38:50 -08:00 committed by GitHub
parent dd7b926d87
commit 877d9f5b44
16 changed files with 2237 additions and 570 deletions


@ -30,6 +30,10 @@ module.exports = {
"no-constant-condition": [
"error",
{"checkLoops": false }
],
"no-use-before-define": [
"error",
{"variables": true, "functions": false, "classes": false, "allowNamedExports": true}
]
}
};


@ -20,7 +20,6 @@ ENV PROXY_HOST=localhost \
WORKDIR /app
ADD requirements.txt /app/
RUN pip install 'uwsgi==2.0.21'
RUN pip install -U setuptools; pip install -r requirements.txt
ADD package.json /app/


@ -6,7 +6,6 @@ import fsp from "fs/promises";
import { RedisCrawlState, LoadState, QueueState } from "./util/state.js";
import Sitemapper from "sitemapper";
import { v4 as uuidv4 } from "uuid";
import yaml from "js-yaml";
import * as warcio from "warcio";
@ -103,8 +102,9 @@ export class Crawler {
this.emulateDevice = this.params.emulateDevice || {};
this.captureBasePrefix = `http://${process.env.PROXY_HOST}:${process.env.PROXY_PORT}/${this.params.collection}/record`;
this.capturePrefix = process.env.NO_PROXY ? "" : this.captureBasePrefix + "/id_/";
//this.captureBasePrefix = `http://${process.env.PROXY_HOST}:${process.env.PROXY_PORT}/${this.params.collection}/record`;
//this.capturePrefix = "";//process.env.NO_PROXY ? "" : this.captureBasePrefix + "/id_/";
this.captureBasePrefix = null;
this.gotoOpts = {
waitUntil: this.params.waitUntil,
@ -213,13 +213,36 @@ export class Crawler {
return new ScreenCaster(transport, this.params.workers);
}
async bootstrap() {
const initRes = child_process.spawnSync("wb-manager", ["init", this.params.collection], {cwd: this.params.cwd});
launchRedis() {
let redisStdio;
if (initRes.status) {
logger.info("wb-manager init failed, collection likely already exists");
if (this.params.logging.includes("redis")) {
const redisStderr = fs.openSync(path.join(this.logDir, "redis.log"), "a");
redisStdio = [process.stdin, redisStderr, redisStderr];
} else {
redisStdio = "ignore";
}
let redisArgs = [];
if (this.params.debugAccessRedis) {
redisArgs = ["--protected-mode", "no"];
}
return child_process.spawn("redis-server", redisArgs, {cwd: "/tmp/", stdio: redisStdio});
}
async bootstrap() {
const subprocesses = [];
subprocesses.push(this.launchRedis());
//const initRes = child_process.spawnSync("wb-manager", ["init", this.params.collection], {cwd: this.params.cwd});
//if (initRes.status) {
// logger.info("wb-manager init failed, collection likely already exists");
//}
fs.mkdirSync(this.logDir, {recursive: true});
this.logFH = fs.createWriteStream(this.logFilename);
logger.setExternalLogStream(this.logFH);
@ -246,42 +269,8 @@ export class Crawler {
this.customBehaviors = this.loadCustomBehaviors(this.params.customBehaviors);
}
let opts = {};
let redisStdio;
if (this.params.logging.includes("pywb")) {
const pywbStderr = fs.openSync(path.join(this.logDir, "pywb.log"), "a");
const stdio = [process.stdin, pywbStderr, pywbStderr];
const redisStderr = fs.openSync(path.join(this.logDir, "redis.log"), "a");
redisStdio = [process.stdin, redisStderr, redisStderr];
opts = {stdio, cwd: this.params.cwd};
} else {
opts = {stdio: "ignore", cwd: this.params.cwd};
redisStdio = "ignore";
}
this.headers = {"User-Agent": this.configureUA()};
const subprocesses = [];
let redisArgs = [];
if (this.params.debugAccessRedis) {
redisArgs = ["--protected-mode", "no"];
}
subprocesses.push(child_process.spawn("redis-server", redisArgs, {cwd: "/tmp/", stdio: redisStdio}));
opts.env = {
...process.env,
COLL: this.params.collection,
ROLLOVER_SIZE: this.params.rolloverSize,
DEDUP_POLICY: this.params.dedupPolicy
};
subprocesses.push(child_process.spawn("uwsgi", [new URL("uwsgi.ini", import.meta.url).pathname], opts));
process.on("exit", () => {
for (const proc of subprocesses) {
proc.kill();
@ -472,7 +461,7 @@ self.__bx_behaviors.selectMainBehavior();
async crawlPage(opts) {
await this.writeStats();
const {page, cdp, data, workerid, callbacks} = opts;
const {page, cdp, data, workerid, callbacks, directFetchCapture} = opts;
data.callbacks = callbacks;
const {url} = data;
@ -481,6 +470,38 @@ self.__bx_behaviors.selectMainBehavior();
data.logDetails = logDetails;
data.workerid = workerid;
data.isHTMLPage = await timedRun(
this.isHTML(url, logDetails),
FETCH_TIMEOUT_SECS,
"HEAD request to determine if URL is HTML page timed out",
logDetails,
"fetch",
true
);
if (!data.isHTMLPage && directFetchCapture) {
try {
const {fetched, mime} = await timedRun(
directFetchCapture(url),
FETCH_TIMEOUT_SECS,
"Direct fetch capture attempt timed out",
logDetails,
"fetch",
true
);
if (fetched) {
data.loadState = LoadState.FULL_PAGE_LOADED;
if (mime) {
data.mime = mime;
}
logger.info("Direct fetch successful", {url, ...logDetails}, "fetch");
return true;
}
} catch (e) {
// ignore failed direct fetch attempt, do browser-based capture
}
}
// run custom driver here
await this.driver({page, data, crawler: this});
@ -660,9 +681,8 @@ self.__bx_behaviors.selectMainBehavior();
async getInfoString() {
const packageFileJSON = JSON.parse(await fsp.readFile("../app/package.json"));
const warcioPackageJSON = JSON.parse(await fsp.readFile("/app/node_modules/warcio/package.json"));
const pywbVersion = child_process.execSync("pywb -V", {encoding: "utf8"}).trim().split(" ")[1];
return `Browsertrix-Crawler ${packageFileJSON.version} (with warcio.js ${warcioPackageJSON.version} pywb ${pywbVersion})`;
return `Browsertrix-Crawler ${packageFileJSON.version} (with warcio.js ${warcioPackageJSON.version})`;
}
async createWARCInfo(filename) {
@ -872,7 +892,7 @@ self.__bx_behaviors.selectMainBehavior();
headless: this.params.headless,
emulateDevice: this.emulateDevice,
chromeOptions: {
proxy: !process.env.NO_PROXY,
proxy: false,
userAgent: this.emulateDevice.userAgent,
extraArgs: this.extraChromeArgs()
},
@ -882,9 +902,10 @@ self.__bx_behaviors.selectMainBehavior();
}
});
// --------------
// Run Crawl Here!
await runWorkers(this, this.params.workers, this.maxPageTime);
await runWorkers(this, this.params.workers, this.maxPageTime, this.collDir);
// --------------
await this.serializeConfig(true);
@ -898,8 +919,6 @@ self.__bx_behaviors.selectMainBehavior();
await this.writeStats();
// extra wait for all resources to land into WARCs
await this.awaitPendingClear();
// if crawl has been stopped, mark as final exit for post-crawl tasks
if (await this.crawlState.isCrawlStopped()) {
@ -916,8 +935,19 @@ self.__bx_behaviors.selectMainBehavior();
if (this.params.generateCDX) {
logger.info("Generating CDX");
await fsp.mkdir(path.join(this.collDir, "indexes"), {recursive: true});
await this.crawlState.setStatus("generate-cdx");
const indexResult = await this.awaitProcess(child_process.spawn("wb-manager", ["reindex", this.params.collection], {cwd: this.params.cwd}));
const warcList = await fsp.readdir(path.join(this.collDir, "archive"));
const warcListFull = warcList.map((filename) => path.join(this.collDir, "archive", filename));
//const indexResult = await this.awaitProcess(child_process.spawn("wb-manager", ["reindex", this.params.collection], {cwd: this.params.cwd}));
const params = [
"-o",
path.join(this.collDir, "indexes", "index.cdxj"),
...warcListFull
];
const indexResult = await this.awaitProcess(child_process.spawn("cdxj-indexer", params, {cwd: this.params.cwd}));
if (indexResult === 0) {
logger.debug("Indexing complete, CDX successfully created");
} else {
@ -1136,34 +1166,6 @@ self.__bx_behaviors.selectMainBehavior();
const failCrawlOnError = ((depth === 0) && this.params.failOnFailedSeed);
let isHTMLPage = await timedRun(
this.isHTML(url),
FETCH_TIMEOUT_SECS,
"HEAD request to determine if URL is HTML page timed out",
logDetails,
"fetch",
true
);
if (!isHTMLPage) {
try {
const captureResult = await timedRun(
this.directFetchCapture(url),
FETCH_TIMEOUT_SECS,
"Direct fetch capture attempt timed out",
logDetails,
"fetch",
true
);
if (captureResult) {
logger.info("Direct fetch successful", {url, ...logDetails}, "fetch");
return;
}
} catch (e) {
// ignore failed direct fetch attempt, do browser-based capture
}
}
let ignoreAbort = false;
// Detect if ERR_ABORTED is actually caused by trying to load a non-page (eg. downloadable PDF),
@ -1172,6 +1174,8 @@ self.__bx_behaviors.selectMainBehavior();
ignoreAbort = shouldIgnoreAbort(req);
});
let isHTMLPage = data.isHTMLPage;
if (isHTMLPage) {
page.once("domcontentloaded", () => {
data.loadState = LoadState.CONTENT_LOADED;
@ -1441,9 +1445,12 @@ self.__bx_behaviors.selectMainBehavior();
}
}
async writePage({url, depth, title, text, loadState, favicon}) {
const id = uuidv4();
const row = {id, url, title, loadState};
async writePage({pageid, url, depth, title, text, loadState, mime, favicon}) {
const row = {id: pageid, url, title, loadState};
if (mime) {
row.mime = mime;
}
if (depth === 0) {
row.seed = true;
@ -1469,7 +1476,7 @@ self.__bx_behaviors.selectMainBehavior();
return urlParsed.protocol === "https:" ? HTTPS_AGENT : HTTP_AGENT;
}
async isHTML(url) {
async isHTML(url, logDetails) {
try {
const resp = await fetch(url, {
method: "HEAD",
@ -1477,7 +1484,7 @@ self.__bx_behaviors.selectMainBehavior();
agent: this.resolveAgent
});
if (resp.status !== 200) {
logger.debug(`Skipping HEAD check ${url}, invalid status ${resp.status}`);
logger.debug("HEAD response code != 200, loading in browser", {status: resp.status, ...logDetails});
return true;
}
@ -1485,7 +1492,7 @@ self.__bx_behaviors.selectMainBehavior();
} catch(e) {
// can't confirm not html, so try in browser
logger.debug("HEAD request failed", {...e, url});
logger.debug("HEAD request failed", {...errJSON(e), ...logDetails});
return true;
}
}
@ -1505,35 +1512,6 @@ self.__bx_behaviors.selectMainBehavior();
return false;
}
async directFetchCapture(url) {
const abort = new AbortController();
const signal = abort.signal;
const resp = await fetch(this.capturePrefix + url, {signal, headers: this.headers, redirect: "manual"});
abort.abort();
return resp.status === 200 && !resp.headers.get("set-cookie");
}
async awaitPendingClear() {
logger.info("Waiting to ensure pending data is written to WARCs...");
await this.crawlState.setStatus("pending-wait");
const redis = await initRedis("redis://localhost/0");
while (!this.interrupted) {
try {
const count = Number(await redis.get(`pywb:${this.params.collection}:pending`) || 0);
if (count <= 0) {
break;
}
logger.debug("Waiting for pending requests to finish", {numRequests: count});
} catch (e) {
break;
}
await sleep(1);
}
}
async parseSitemap(url, seedId, sitemapFromDate) {
// handle sitemap last modified date if passed
let lastmodFromTimestamp = null;


@ -11,7 +11,6 @@ import yargs from "yargs";
import { logger } from "./util/logger.js";
import { sleep } from "./util/timing.js";
import { Browser } from "./util/browser.js";
import { initStorage } from "./util/storage.js";
@ -144,18 +143,6 @@ async function main() {
]);
}
let useProxy = false;
if (params.proxy) {
child_process.spawn("wayback", ["--live", "--proxy", "live"], {stdio: "inherit", cwd: "/tmp"});
logger.debug("Running with pywb proxy");
await sleep(3000);
useProxy = true;
}
const browser = new Browser();
await browser.launch({
@ -163,7 +150,7 @@ async function main() {
headless: params.headless,
signals: true,
chromeOptions: {
proxy: useProxy,
proxy: false,
extraArgs: [
"--window-position=0,0",
`--window-size=${params.windowSize}`,


@ -1,6 +1,6 @@
{
"name": "browsertrix-crawler",
"version": "0.12.1",
"version": "1.0.0-beta.0",
"main": "browsertrix-crawler",
"type": "module",
"repository": "https://github.com/webrecorder/browsertrix-crawler",
@ -13,6 +13,7 @@
},
"dependencies": {
"@novnc/novnc": "^1.4.0",
"@webrecorder/wabac": "^2.16.12",
"browsertrix-behaviors": "^0.5.2",
"crc": "^4.3.2",
"get-folder-size": "^4.0.0",
@ -20,16 +21,17 @@
"ioredis": "^4.27.1",
"js-yaml": "^4.1.0",
"minio": "7.0.26",
"p-queue": "^7.3.4",
"puppeteer-core": "^20.7.4",
"sharp": "^0.32.1",
"sitemapper": "^3.2.5",
"uuid": "8.3.2",
"warcio": "^1.6.0",
"warcio": "^2.2.0",
"ws": "^7.4.4",
"yargs": "^17.7.2"
},
"devDependencies": {
"eslint": "^7.20.0",
"eslint": "^8.37.0",
"eslint-plugin-react": "^7.22.0",
"jest": "^29.2.1",
"md5": "^2.3.0"


@ -1,4 +1 @@
pywb>=2.7.4
uwsgi
wacz>=0.4.9
requests[socks]


@ -21,7 +21,7 @@ test("check that the warcinfo file works as expected on the command line", async
expect(string.indexOf("operator: test")).toBeGreaterThan(-1);
expect(string.indexOf("host: hostname")).toBeGreaterThan(-1);
expect(string.match(/Browsertrix-Crawler \d[\w.-]+ \(with warcio.js \d[\w.-]+ pywb \d[\w.-]+\)/)).not.toEqual(null);
expect(string.match(/Browsertrix-Crawler \d[\w.-]+ \(with warcio.js \d[\w.-]+\)/)).not.toEqual(null);
expect(string.indexOf("format: WARC File Format 1.0")).toBeGreaterThan(-1);


@ -178,7 +178,7 @@ class ArgParser {
},
"logging": {
describe: "Logging options for crawler, can include: stats (enabled by default), jserrors, pywb, debug",
describe: "Logging options for crawler, can include: stats (enabled by default), jserrors, debug",
type: "array",
default: ["stats"],
coerce,


@ -19,6 +19,8 @@ export class BaseBrowser
this.profileDir = fs.mkdtempSync(path.join(os.tmpdir(), "profile-"));
this.customProfile = false;
this.emulateDevice = null;
this.recorders = [];
}
async launch({profileUrl, chromeOptions, signals = false, headless = false, emulateDevice = {}, ondisconnect = null} = {}) {
@ -117,6 +119,7 @@ export class BaseBrowser
chromeArgs({proxy=true, userAgent=null, extraArgs=[]} = {}) {
// Chrome Flags, including proxy server
const args = [
// eslint-disable-next-line no-use-before-define
...defaultArgs,
...(process.env.CHROME_FLAGS ?? "").split(" ").filter(Boolean),
//"--no-xshm", // needed for Chrome >80 (check if puppeteer adds automatically)
@ -239,6 +242,8 @@ export class Browser extends BaseBrowser
this.firstCDP = await target.createCDPSession();
await this.serviceWorkerFetch();
if (ondisconnect) {
this.browser.on("disconnected", (err) => ondisconnect(err));
}
@ -294,6 +299,50 @@ export class Browser extends BaseBrowser
return {page, cdp};
}
async serviceWorkerFetch() {
this.firstCDP.on("Fetch.requestPaused", async (params) => {
const { frameId, requestId, networkId, request } = params;
if (networkId) {
try {
await this.firstCDP.send("Fetch.continueResponse", {requestId});
} catch (e) {
logger.warn("continueResponse failed", {url: request.url}, "recorder");
}
return;
}
let foundRecorder = null;
for (const recorder of this.recorders) {
if (recorder.swUrls.has(request.url)) {
recorder.swFrameIds.add(frameId);
}
if (recorder.swFrameIds && recorder.swFrameIds.has(frameId)) {
foundRecorder = recorder;
break;
}
}
if (!foundRecorder) {
logger.debug("Skipping URL from unknown frame", {url: request.url, frameId}, "recorder");
try {
await this.firstCDP.send("Fetch.continueResponse", {requestId});
} catch (e) {
logger.warn("continueResponse failed", {url: request.url}, "recorder");
}
return;
}
await foundRecorder.handleRequestPaused(params, this.firstCDP, true);
});
await this.firstCDP.send("Fetch.enable", {patterns: [{urlPattern: "*", requestStage: "Response"}]});
}
async evaluateWithCLI(_, frame, cdp, funcString, logData, contextName) {
const context = await frame.executionContext();
cdp = context._client;
@ -360,6 +409,9 @@ export const defaultArgs = [
// See https://chromium-review.googlesource.com/c/chromium/src/+/2436773
"--no-service-autorun",
"--export-tagged-pdf",
"--apps-keep-chrome-alive-in-tests",
"--apps-gallery-url=https://invalid.webstore.example.com/",
"--apps-gallery-update-url=https://invalid.webstore.example.com/",
"--component-updater=url-source=http://invalid.dev/",
"--brave-stats-updater-server=url-source=http://invalid.dev/"
];

util/recorder.js (new file, 949 lines)

@ -0,0 +1,949 @@
import fs from "fs";
import path from "path";
import os from "os";
import { v4 as uuidv4 } from "uuid";
import PQueue from "p-queue";
import { logger, errJSON } from "./logger.js";
import { sleep, timestampNow } from "./timing.js";
import { RequestResponseInfo } from "./reqresp.js";
import { baseRules as baseDSRules } from "@webrecorder/wabac/src/rewrite/index.js";
import { rewriteDASH, rewriteHLS } from "@webrecorder/wabac/src/rewrite/rewriteVideo.js";
import { WARCRecord } from "warcio";
import { WARCSerializer } from "warcio/node";
import { WARCWriter } from "./warcwriter.js";
const MAX_BROWSER_FETCH_SIZE = 2_000_000;
const MAX_NETWORK_LOAD_SIZE = 200_000_000;
const ASYNC_FETCH_DUPE_KEY = "s:fetchdupe";
const WRITE_DUPE_KEY = "s:writedupe";
const encoder = new TextEncoder();
// =================================================================
function logNetwork(/*msg, data*/) {
// logger.debug(msg, data, "recorderNetwork");
}
// =================================================================
export class Recorder
{
constructor({workerid, collDir, crawler}) {
this.workerid = workerid;
this.crawler = crawler;
this.crawlState = crawler.crawlState;
this.warcQ = new PQueue({concurrency: 1});
this.fetcherQ = new PQueue({concurrency: 1});
this.pendingRequests = null;
this.skipIds = null;
this.swSessionId = null;
this.swFrameIds = new Set();
this.swUrls = new Set();
this.logDetails = {};
this.skipping = false;
this.allowFull206 = true;
this.collDir = collDir;
this.archivesDir = path.join(this.collDir, "archive");
this.tempdir = path.join(this.collDir, "tmp-dl");
this.tempCdxDir = path.join(this.collDir, "tmp-cdx");
fs.mkdirSync(this.tempdir, {recursive: true});
fs.mkdirSync(this.archivesDir, {recursive: true});
fs.mkdirSync(this.tempCdxDir, {recursive: true});
const crawlId = process.env.CRAWL_ID || os.hostname();
const filename = `rec-${crawlId}-${timestampNow()}-${this.workerid}.warc`;
this.gzip = true;
this.writer = new WARCWriter({
archivesDir: this.archivesDir,
tempCdxDir: this.tempCdxDir,
filename,
gzip: this.gzip,
logDetails: this.logDetails
});
}
async onCreatePage({cdp}) {
// Fetch
cdp.on("Fetch.requestPaused", async (params) => {
this.handleRequestPaused(params, cdp);
});
await cdp.send("Fetch.enable", {patterns: [{urlPattern: "*", requestStage: "Response"}]});
// Response
cdp.on("Network.responseReceived", (params) => {
// handling to fill in security details
logNetwork("Network.responseReceived", {requestId: params.requestId, ...this.logDetails});
this.handleResponseReceived(params);
});
cdp.on("Network.responseReceivedExtraInfo", (params) => {
logNetwork("Network.responseReceivedExtraInfo", {requestId: params.requestId, ...this.logDetails});
const reqresp = this.pendingReqResp(params.requestId, true);
if (reqresp) {
reqresp.fillResponseReceivedExtraInfo(params);
}
});
// Request
cdp.on("Network.requestWillBeSent", (params) => {
// only handling redirect here, committing last response in redirect chain
// request data stored from requestPaused
if (params.redirectResponse) {
logNetwork("Network.requestWillBeSent after redirect", {requestId: params.requestId, ...this.logDetails});
this.handleRedirectResponse(params);
}
});
cdp.on("Network.requestServedFromCache", (params) => {
logNetwork("Network.requestServedFromCache", {requestId: params.requestId, ...this.logDetails});
this.removeReqResp(params.requestId);
});
cdp.on("Network.requestWillBeSentExtraInfo", (params) => {
logNetwork("Network.requestWillBeSentExtraInfo", {requestId: params.requestId, ...this.logDetails});
this.handleRequestExtraInfo(params);
});
// Loading
cdp.on("Network.loadingFinished", (params) => {
logNetwork("Network.loadingFinished", {requestId: params.requestId, ...this.logDetails});
this.handleLoadingFinished(params);
});
cdp.on("Network.loadingFailed", (params) => {
logNetwork("Network.loadingFailed", {requestId: params.requestId, ...this.logDetails});
this.handleLoadingFailed(params);
});
await cdp.send("Network.enable");
// Target
cdp.on("Target.attachedToTarget", async (params) => {
const { url, type, sessionId } = params.targetInfo;
if (type === "service_worker") {
this.swSessionId = sessionId;
this.swUrls.add(url);
}
});
cdp.on("Target.detachedFromTarget", async (params) => {
const { sessionId } = params;
if (this.swSessionId && sessionId === this.swSessionId) {
this.swUrls.clear();
this.swFrameIds.clear();
this.swSessionId = null;
}
});
await cdp.send("Target.setAutoAttach", {autoAttach: true, waitForDebuggerOnStart: false, flatten: true});
}
handleResponseReceived(params) {
const { requestId, response } = params;
const reqresp = this.pendingReqResp(requestId);
if (!reqresp) {
return;
}
reqresp.fillResponse(response);
}
handleRequestExtraInfo(params) {
if (!this.shouldSkip(params.headers)) {
const reqresp = this.pendingReqResp(params.requestId, true);
if (reqresp) {
reqresp.fillRequestExtraInfo(params);
}
}
}
handleRedirectResponse(params) {
const { requestId, redirectResponse } = params;
// remove and serialize, but allow reusing requestId
// as redirect chain may reuse same requestId for subsequent request
const reqresp = this.removeReqResp(requestId, true);
if (!reqresp) {
return;
}
reqresp.fillResponse(redirectResponse);
if (reqresp.isSelfRedirect()) {
logger.warn("Skipping self redirect", {url: reqresp. url, status: reqresp.status, ...this.logDetails}, "recorder");
return;
}
this.serializeToWARC(reqresp);
}
handleLoadingFailed(params) {
const { errorText, type, requestId } = params;
const reqresp = this.pendingReqResp(requestId, true);
if (!reqresp) {
return;
}
const { url } = reqresp;
switch (errorText) {
case "net::ERR_BLOCKED_BY_CLIENT":
logNetwork("Request blocked", {url, errorText, ...this.logDetails}, "recorder");
break;
case "net::ERR_ABORTED":
// check if this is a false positive -- a valid download that's already been fetched
// the abort is just for page, but download will succeed
if (url && type === "Document" && reqresp.isValidBinary()) {
this.serializeToWARC(reqresp);
//} else if (url) {
} else if (url && reqresp.requestHeaders && reqresp.requestHeaders["x-browsertrix-fetch"]) {
delete reqresp.requestHeaders["x-browsertrix-fetch"];
logger.warn("Attempt direct fetch of failed request", {url, ...this.logDetails}, "recorder");
const fetcher = new AsyncFetcher({tempdir: this.tempdir, reqresp, recorder: this, networkId: requestId});
this.fetcherQ.add(() => fetcher.load());
return;
}
break;
default:
logger.warn("Request failed", {url, errorText, ...this.logDetails}, "recorder");
}
this.removeReqResp(requestId);
}
handleLoadingFinished(params) {
const reqresp = this.pendingReqResp(params.requestId, true);
if (!reqresp || reqresp.asyncLoading) {
return;
}
this.removeReqResp(params.requestId);
if (!this.isValidUrl(reqresp.url)) {
return;
}
this.serializeToWARC(reqresp);
}
async handleRequestPaused(params, cdp, isSWorker = false) {
const { requestId, request, responseStatusCode, responseErrorReason, resourceType, networkId } = params;
const { method, headers, url } = request;
logNetwork("Fetch.requestPaused", {requestId, networkId, url, ...this.logDetails});
let continued = false;
try {
if (responseStatusCode && !responseErrorReason && !this.shouldSkip(headers, url, method, resourceType) && !(isSWorker && networkId)) {
continued = await this.handleFetchResponse(params, cdp, isSWorker);
}
} catch (e) {
logger.error("Error handling response, probably skipping URL", {url, ...errJSON(e), ...this.logDetails}, "recorder");
}
if (!continued) {
try {
await cdp.send("Fetch.continueResponse", {requestId});
} catch (e) {
logger.debug("continueResponse failed", {requestId, networkId, url, ...errJSON(e), ...this.logDetails}, "recorder");
}
}
}
async handleFetchResponse(params, cdp, isSWorker) {
const { request } = params;
const { url } = request;
const {requestId, responseErrorReason, responseStatusCode, responseHeaders} = params;
const networkId = params.networkId || requestId;
if (responseErrorReason) {
logger.warn("Skipping failed response", {url, reason: responseErrorReason, ...this.logDetails}, "recorder");
return false;
}
const contentLen = this._getContentLen(responseHeaders);
if (responseStatusCode === 206) {
const range = this._getContentRange(responseHeaders);
if (this.allowFull206 && range === `bytes 0-${contentLen - 1}/${contentLen}`) {
logger.debug("Keep 206 Response, Full Range", {range, contentLen, url, networkId, ...this.logDetails}, "recorder");
} else {
logger.debug("Skip 206 Response", {range, contentLen, url, ...this.logDetails}, "recorder");
this.removeReqResp(networkId);
return false;
}
}
const reqresp = this.pendingReqResp(networkId);
if (!reqresp) {
return false;
}
reqresp.fillFetchRequestPaused(params);
if (this.noResponseForStatus(responseStatusCode)) {
reqresp.payload = new Uint8Array();
if (isSWorker) {
this.removeReqResp(networkId);
await this.serializeToWARC(reqresp);
}
return false;
}
let streamingConsume = false;
if (contentLen < 0 || contentLen > MAX_BROWSER_FETCH_SIZE) {
const opts = {tempdir: this.tempdir, reqresp, expectedSize: contentLen, recorder: this, networkId, cdp};
// fetching using response stream, await here and then either call fulFill, or if not started, return false
if (contentLen < 0) {
const fetcher = new ResponseStreamAsyncFetcher({...opts, requestId, cdp });
const res = await fetcher.load();
switch (res) {
case "dupe":
this.removeReqResp(networkId);
return false;
case "fetched":
streamingConsume = true;
break;
}
}
// if not consumed via takeStream, attempt async loading
if (!streamingConsume) {
let fetcher = null;
if (reqresp.method !== "GET" || contentLen > MAX_NETWORK_LOAD_SIZE) {
fetcher = new AsyncFetcher(opts);
} else {
fetcher = new NetworkLoadStreamAsyncFetcher(opts);
}
this.fetcherQ.add(() => fetcher.load());
return false;
}
} else {
try {
logNetwork("Fetching response", {sizeExpected: this._getContentLen(responseHeaders), url, networkId, ...this.logDetails});
const { body, base64Encoded } = await cdp.send("Fetch.getResponseBody", {requestId});
reqresp.payload = Buffer.from(body, base64Encoded ? "base64" : "utf-8");
logNetwork("Fetch done", {size: reqresp.payload.length, url, networkId, ...this.logDetails});
} catch (e) {
logger.warn("Failed to load response body", {url, networkId, ...errJSON(e), ...this.logDetails}, "recorder");
return false;
}
}
const rewritten = await this.rewriteResponse(reqresp);
// if in service worker, serialize here
// as won't be getting a loadingFinished message
if (isSWorker && reqresp.payload) {
this.removeReqResp(networkId);
await this.serializeToWARC(reqresp);
}
// not rewritten, and not streaming, return false to continue
if (!rewritten && !streamingConsume) {
if (!reqresp.payload) {
logger.error("Unable to get payload skipping recording", {url, ...this.logDetails}, "recorder");
this.removeReqResp(networkId);
}
return false;
}
// if has payload, encode it, otherwise return empty string
const body = reqresp.payload && reqresp.payload.length ? Buffer.from(reqresp.payload).toString("base64") : "";
try {
await cdp.send("Fetch.fulfillRequest", {
requestId,
responseCode: responseStatusCode,
responseHeaders,
body
});
} catch (e) {
const type = reqresp.type;
if (type === "Document") {
logger.debug("document not loaded in browser, possibly other URLs missing", {url, type: reqresp.resourceType}, "recorder");
} else {
logger.debug("URL not loaded in browser", {url, type: reqresp.resourceType}, "recorder");
}
}
return true;
}
startPage({pageid, url}) {
this.pageid = pageid;
this.logDetails = {page: url, workerid: this.workerid};
if (this.pendingRequests && this.pendingRequests.size) {
logger.debug("Interrupting timed out requests, moving to next page", this.logDetails, "recorder");
}
this.pendingRequests = new Map();
this.skipIds = new Set();
this.skipping = false;
}
async finishPage() {
for (const [requestId, reqresp] of this.pendingRequests.entries()) {
if (reqresp.payload) {
this.removeReqResp(requestId);
await this.serializeToWARC(reqresp);
// no url, likely invalid
} else if (!reqresp.url) {
this.removeReqResp(requestId);
}
}
let numPending = this.pendingRequests.size;
while (numPending && !this.crawler.interrupted) {
const pending = [];
for (const [requestId, reqresp] of this.pendingRequests.entries()) {
const url = reqresp.url;
const entry = {requestId, url};
if (reqresp.expectedSize) {
entry.expectedSize = reqresp.expectedSize;
}
if (reqresp.readSize) {
entry.readSize = reqresp.readSize;
}
pending.push(entry);
}
logger.debug("Finishing pending requests for page", {numPending, pending, ...this.logDetails}, "recorder");
await sleep(5.0);
numPending = this.pendingRequests.size;
}
}
async onClosePage() {
// Any page-specific handling before page is closed.
}
async onDone() {
await this.crawlState.setStatus("pending-wait");
logger.debug("Finishing Fetcher Queue", this.logDetails, "recorder");
await this.fetcherQ.onIdle();
logger.debug("Finishing WARC writing", this.logDetails, "recorder");
await this.warcQ.onIdle();
await this.writer.flush();
}
shouldSkip(headers, url, method, resourceType) {
if (headers && !method) {
method = headers[":method"];
}
if (!this.isValidUrl(url)) {
return true;
}
if (method === "OPTIONS" || method === "HEAD") {
return true;
}
if (["EventSource", "WebSocket", "Ping"].includes(resourceType)) {
return true;
}
// beacon
if (resourceType === "Other" && method === "POST") {
return true;
}
// skip eventsource, resourceType may not be set correctly
if (headers && (headers["accept"] === "text/event-stream" || headers["Accept"] === "text/event-stream")) {
return true;
}
return false;
}
async rewriteResponse(reqresp) {
const { url, responseHeadersList, extraOpts, payload } = reqresp;
if (!payload || !payload.length) {
return false;
}
let newString = null;
let string = null;
const ct = this._getContentType(responseHeadersList);
switch (ct) {
case "application/x-mpegURL":
case "application/vnd.apple.mpegurl":
string = payload.toString("utf-8");
newString = rewriteHLS(string, {save: extraOpts});
break;
case "application/dash+xml":
string = payload.toString("utf-8");
newString = rewriteDASH(string, {save: extraOpts});
break;
case "text/html":
case "application/json":
case "text/javascript":
case "application/javascript":
case "application/x-javascript": {
const rw = baseDSRules.getRewriter(url);
if (rw !== baseDSRules.defaultRewriter) {
string = payload.toString("utf-8");
newString = rw.rewrite(string, {live: true, save: extraOpts});
}
break;
}
}
if (!newString) {
return false;
}
if (newString !== string) {
extraOpts.rewritten = 1;
logger.debug("Content Rewritten", {url, ...this.logDetails}, "recorder");
reqresp.payload = encoder.encode(newString);
return true;
} else {
return false;
}
//return Buffer.from(newString).toString("base64");
}
_getContentType(headers) {
for (let header of headers) {
if (header.name.toLowerCase() === "content-type") {
return header.value.split(";")[0];
}
}
return null;
}
_getContentLen(headers) {
for (let header of headers) {
if (header.name.toLowerCase() === "content-length") {
return Number(header.value);
}
}
return -1;
}
_getContentRange(headers) {
for (let header of headers) {
if (header.name.toLowerCase() === "content-range") {
return header.value;
}
}
return null;
}
noResponseForStatus(status) {
return (!status || status === 204 || (status >= 300 && status < 400));
}
isValidUrl(url) {
return url && (url.startsWith("https:") || url.startsWith("http:"));
}
pendingReqResp(requestId, reuseOnly = false) {
if (!this.pendingRequests.has(requestId)) {
if (reuseOnly || !requestId) {
return null;
}
if (this.skipIds.has(requestId)) {
logNetwork("Skipping ignored id", {requestId});
return null;
}
if (this.skipping) {
//logger.debug("Skipping request, page already finished", this.logDetails, "recorder");
return null;
}
const reqresp = new RequestResponseInfo(requestId);
this.pendingRequests.set(requestId, reqresp);
return reqresp;
} else {
const reqresp = this.pendingRequests.get(requestId);
if (requestId !== reqresp.requestId) {
logger.warn("Invalid request id", {requestId, actualRequestId: reqresp.requestId}, "recorder");
}
return reqresp;
}
}
removeReqResp(requestId, allowReuse=false) {
const reqresp = this.pendingRequests.get(requestId);
this.pendingRequests.delete(requestId);
if (!allowReuse) {
this.skipIds.add(requestId);
}
return reqresp;
}
async serializeToWARC(reqresp) {
if (!reqresp.payload) {
logNetwork("Not writing, no payload", {url: reqresp.url});
return;
}
if (reqresp.method === "GET" && !await this.crawlState.addIfNoDupe(WRITE_DUPE_KEY, reqresp.url)) {
logNetwork("Skipping dupe", {url: reqresp.url});
return;
}
const responseRecord = createResponse(reqresp, this.pageid);
const requestRecord = createRequest(reqresp, responseRecord, this.pageid);
this.warcQ.add(() => this.writer.writeRecordPair(responseRecord, requestRecord));
}
async directFetchCapture(url) {
const reqresp = new RequestResponseInfo(0);
reqresp.url = url;
reqresp.method = "GET";
logger.debug("Directly fetching page URL without browser", {url, ...this.logDetails}, "recorder");
const filter = (resp) => resp.status === 200 && !resp.headers.get("set-cookie");
// ignore dupes: if previous URL was not a page, still load as page. if previous was page,
// should not get here, as dupe pages tracked via seen list
const fetcher = new AsyncFetcher({tempdir: this.tempdir, reqresp, recorder: this, networkId: 0, filter, ignoreDupe: true});
const res = await fetcher.load();
const mime = reqresp && reqresp.responseHeaders["content-type"] && reqresp.responseHeaders["content-type"].split(";")[0];
return {fetched: res === "fetched", mime};
}
}
// =================================================================
class AsyncFetcher
{
constructor({tempdir, reqresp, expectedSize = -1, recorder, networkId, filter = null, ignoreDupe = false}) {
this.reqresp = reqresp;
this.reqresp.expectedSize = expectedSize;
this.reqresp.asyncLoading = true;
this.networkId = networkId;
this.filter = filter;
this.ignoreDupe = ignoreDupe;
this.recorder = recorder;
this.tempdir = tempdir;
this.filename = path.join(this.tempdir, `${timestampNow()}-${uuidv4()}.data`);
}
async load() {
const { reqresp, recorder, networkId, filename } = this;
const { url } = reqresp;
const { pageid, crawlState, gzip, logDetails } = recorder;
let fetched = "notfetched";
try {
if (reqresp.method === "GET" && !await crawlState.addIfNoDupe(ASYNC_FETCH_DUPE_KEY, url)) {
if (!this.ignoreDupe) {
this.reqresp.asyncLoading = false;
return "dupe";
}
}
const body = await this._doFetch();
fetched = "fetched";
const responseRecord = createResponse(reqresp, pageid, body);
const requestRecord = createRequest(reqresp, responseRecord, pageid);
const serializer = new WARCSerializer(responseRecord, {gzip, maxMemSize: MAX_BROWSER_FETCH_SIZE});
try {
let readSize = await serializer.digestRecord();
if (serializer.httpHeadersBuff) {
readSize -= serializer.httpHeadersBuff.length;
}
reqresp.readSize = readSize;
} catch (e) {
logger.error("Error reading + digesting payload", {url, filename, ...errJSON(e), ...logDetails}, "recorder");
}
if (reqresp.readSize === reqresp.expectedSize || reqresp.expectedSize < 0) {
logger.debug("Async fetch: streaming done", {size: reqresp.readSize, expected: reqresp.expectedSize, networkId, url, ...logDetails}, "recorder");
} else {
logger.warn("Async fetch: possible response size mismatch", {size: reqresp.readSize, expected: reqresp.expectedSize, url, ...logDetails}, "recorder");
//await crawlState.removeDupe(ASYNC_FETCH_DUPE_KEY, url);
//return fetched;
}
const externalBuffer = serializer.externalBuffer;
if (externalBuffer) {
const { currSize, buffers, fh } = externalBuffer;
if (buffers && buffers.length && !fh) {
reqresp.payload = Buffer.concat(buffers, currSize);
externalBuffer.buffers = [reqresp.payload];
}
}
if (Object.keys(reqresp.extraOpts).length) {
responseRecord.warcHeaders["WARC-JSON-Metadata"] = JSON.stringify(reqresp.extraOpts);
}
recorder.warcQ.add(() => recorder.writer.writeRecordPair(responseRecord, requestRecord, serializer));
} catch (e) {
logger.error("Streaming Fetch Error", {url, networkId, filename, ...errJSON(e), ...logDetails}, "recorder");
await crawlState.removeDupe(ASYNC_FETCH_DUPE_KEY, url);
} finally {
recorder.removeReqResp(networkId);
}
return fetched;
}
async _doFetch() {
const { reqresp } = this;
const { method, url } = reqresp;
logger.debug("Async started: fetch", {url}, "recorder");
const headers = reqresp.getRequestHeadersDict();
let signal = null;
let abort = null;
if (this.filter) {
abort = new AbortController();
signal = abort.signal;
}
const resp = await fetch(url, {method, headers, body: reqresp.postData || undefined, signal});
if (this.filter && !this.filter(resp)) {
abort.abort();
throw new Error("invalid response, ignoring fetch");
}
if (reqresp.expectedSize < 0 && resp.headers.get("content-length") && !resp.headers.get("content-encoding")) {
reqresp.expectedSize = Number(resp.headers.get("content-length") || -1);
}
if (reqresp.expectedSize === 0) {
reqresp.payload = new Uint8Array();
return;
} else if (!resp.body) {
logger.error("Empty body, stopping fetch", {url}, "recorder");
await this.recorder.crawlState.removeDupe(ASYNC_FETCH_DUPE_KEY, url);
return;
}
reqresp.fillFetchResponse(resp);
return this.takeReader(resp.body.getReader());
}
async* takeReader(reader) {
try {
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
yield value;
}
} catch (e) {
logger.warn("takeReader interrupted", {...errJSON(e), url: this.reqresp.url, ...this.recorder.logDetails}, "recorder");
this.reqresp.truncated = "disconnect";
}
}
async* takeStreamIter(cdp, stream) {
try {
while (true) {
const {data, base64Encoded, eof} = await cdp.send("IO.read", {handle: stream});
const buff = Buffer.from(data, base64Encoded ? "base64" : "utf-8");
yield buff;
if (eof) {
break;
}
}
} catch (e) {
logger.warn("takeStream interrupted", {...errJSON(e), url: this.reqresp.url, ...this.recorder.logDetails}, "recorder");
this.reqresp.truncated = "disconnect";
}
}
}
// =================================================================
class ResponseStreamAsyncFetcher extends AsyncFetcher
{
constructor(opts) {
super(opts);
this.cdp = opts.cdp;
this.requestId = opts.requestId;
}
async _doFetch() {
const { requestId, reqresp, cdp } = this;
const { url } = reqresp;
logger.debug("Async started: takeStream", {url}, "recorder");
const { stream } = await cdp.send("Fetch.takeResponseBodyAsStream", {requestId});
return this.takeStreamIter(cdp, stream);
}
}
// =================================================================
class NetworkLoadStreamAsyncFetcher extends AsyncFetcher
{
constructor(opts) {
super(opts);
this.cdp = opts.cdp;
}
async _doFetch() {
const { reqresp, cdp } = this;
const { url } = reqresp;
logger.debug("Async started: loadNetworkResource", {url}, "recorder");
const options = {disableCache: false, includeCredentials: true};
let result = null;
try {
result = await cdp.send("Network.loadNetworkResource", {frameId: reqresp.frameId, url, options});
} catch (e) {
logger.debug("Network.loadNetworkResource failed, attempting node fetch", {url, ...errJSON(e), ...this.recorder.logDetails}, "recorder");
return await super._doFetch();
}
const { stream, headers, httpStatusCode, success, netError, netErrorName } = result.resource;
if (!success || !stream) {
//await this.recorder.crawlState.removeDupe(ASYNC_FETCH_DUPE_KEY, url);
logger.debug("Network.loadNetworkResource failed, attempting node fetch", {url, netErrorName, netError, httpStatusCode, ...this.recorder.logDetails}, "recorder");
return await super._doFetch();
}
if (reqresp.expectedSize < 0 && headers && headers["content-length"] && !headers["content-encoding"]) {
reqresp.expectedSize = Number(headers["content-length"] || -1);
}
if (reqresp.expectedSize === 0) {
reqresp.payload = new Uint8Array();
return;
}
reqresp.status = httpStatusCode;
reqresp.responseHeaders = headers || {};
return this.takeStreamIter(cdp, stream);
}
}
// =================================================================
// response
function createResponse(reqresp, pageid, contentIter) {
const url = reqresp.url;
const warcVersion = "WARC/1.1";
const statusline = `HTTP/1.1 ${reqresp.status} ${reqresp.statusText}`;
const date = new Date().toISOString();
const httpHeaders = reqresp.getResponseHeadersDict(reqresp.payload ? reqresp.payload.length : null);
const warcHeaders = {
"WARC-Page-ID": pageid,
};
if (reqresp.truncated) {
warcHeaders["WARC-Truncated"] = reqresp.truncated;
}
if (!contentIter) {
contentIter = [reqresp.payload];
}
if (Object.keys(reqresp.extraOpts).length) {
warcHeaders["WARC-JSON-Metadata"] = JSON.stringify(reqresp.extraOpts);
}
return WARCRecord.create({
url, date, warcVersion, type: "response", warcHeaders,
httpHeaders, statusline}, contentIter);
}
// =================================================================
// request
function createRequest(reqresp, responseRecord, pageid) {
const url = reqresp.url;
const warcVersion = "WARC/1.1";
const method = reqresp.method;
const urlParsed = new URL(url);
const statusline = `${method} ${url.slice(urlParsed.origin.length)} HTTP/1.1`;
const requestBody = reqresp.postData ? [encoder.encode(reqresp.postData)] : [];
const httpHeaders = reqresp.getRequestHeadersDict();
const warcHeaders = {
"WARC-Concurrent-To": responseRecord.warcHeader("WARC-Record-ID"),
"WARC-Page-ID": pageid,
};
const date = responseRecord.warcDate;
return WARCRecord.create({
url, date, warcVersion, type: "request", warcHeaders,
httpHeaders, statusline}, requestBody);
}

util/reqresp.js (new file, 239 lines)

@ -0,0 +1,239 @@
import { getStatusText } from "@webrecorder/wabac/src/utils.js";
const CONTENT_LENGTH = "content-length";
const CONTENT_TYPE = "content-type";
const EXCLUDE_HEADERS = ["content-encoding", "transfer-encoding"];
// ===========================================================================
export class RequestResponseInfo
{
constructor(requestId) {
this._created = new Date();
this.requestId = requestId;
this.ts = null;
// request data
this.method = null;
this.url = null;
this.protocol = "HTTP/1.1";
this.requestHeaders = null;
this.requestHeadersText = null;
this.postData = null;
this.hasPostData = false;
// response data
this.status = 0;
this.statusText = null;
this.responseHeaders = null;
this.responseHeadersList = null;
this.responseHeadersText = null;
this.payload = null;
this.fromServiceWorker = false;
this.fetch = false;
this.resourceType = null;
this.extraOpts = {};
this.readSize = 0;
this.expectedSize = 0;
// set to true to indicate async loading in progress
this.asyncLoading = false;
// set to add truncated message
this.truncated = null;
}
fillRequest(params) {
this.url = params.request.url;
this.method = params.request.method;
if (!this.requestHeaders) {
this.requestHeaders = params.request.headers;
}
this.postData = params.request.postData;
this.hasPostData = params.request.hasPostData;
if (params.type) {
this.resourceType = params.type;
}
}
fillFetchRequestPaused(params) {
this.fillRequest(params);
this.status = params.responseStatusCode;
this.statusText = params.responseStatusText || getStatusText(this.status);
this.responseHeadersList = params.responseHeaders;
this.fetch = true;
this.resourceType = params.resourceType;
this.frameId = params.frameId;
}
fillResponse(response) {
// if initial fetch was a 200, but now replacing with 304, don't!
if (response.status == 304 && this.status && this.status != 304 && this.url) {
return;
}
this.url = response.url.split("#")[0];
this.status = response.status;
this.statusText = response.statusText || getStatusText(this.status);
this.protocol = response.protocol;
if (response.requestHeaders) {
this.requestHeaders = response.requestHeaders;
}
if (response.requestHeadersText) {
this.requestHeadersText = response.requestHeadersText;
}
this.responseHeaders = response.headers;
if (response.headersText) {
this.responseHeadersText = response.headersText;
}
this.fromServiceWorker = !!response.fromServiceWorker;
if (response.securityDetails) {
const issuer = response.securityDetails.issuer || "";
const ctc = response.securityDetails.certificateTransparencyCompliance === "compliant" ? "1" : "0";
this.extraOpts.cert = {issuer, ctc};
}
}
isSelfRedirect() {
if (this.status < 300 || this.status >= 400 || this.status === 304) {
return false;
}
try {
const headers = new Headers(this.responseHeaders);
const redirUrl = new URL(headers.get("location"), this.url).href;
return this.url === redirUrl;
} catch (e) {
return false;
}
}
fillResponseReceivedExtraInfo(params) {
// this.responseHeaders = params.headers;
// if (params.headersText) {
// this.responseHeadersText = params.headersText;
// }
this.extraOpts.ipType = params.resourceIPAddressSpace;
}
fillFetchResponse(response) {
this.responseHeaders = Object.fromEntries(response.headers);
this.status = response.status;
this.statusText = response.statusText || getStatusText(this.status);
}
fillRequestExtraInfo(params) {
this.requestHeaders = params.headers;
}
getResponseHeadersText() {
let headers = `${this.protocol} ${this.status} ${this.statusText}\r\n`;
for (const header of Object.keys(this.responseHeaders)) {
headers += `${header}: ${this.responseHeaders[header].replace(/\n/g, ", ")}\r\n`;
}
headers += "\r\n";
return headers;
}
hasRequest() {
return this.method && (this.requestHeaders || this.requestHeadersText);
}
getRequestHeadersDict() {
return this._getHeadersDict(this.requestHeaders, null);
}
getResponseHeadersDict(length) {
return this._getHeadersDict(this.responseHeaders, this.responseHeadersList, length);
}
_getHeadersDict(headersDict, headersList, actualContentLength) {
if (!headersDict && headersList) {
headersDict = {};
for (const header of headersList) {
let headerName = header.name.toLowerCase();
if (EXCLUDE_HEADERS.includes(headerName)) {
headerName = "x-orig-" + headerName;
continue;
}
if (actualContentLength && headerName === CONTENT_LENGTH) {
headersDict[headerName] = "" + actualContentLength;
continue;
}
headersDict[headerName] = header.value.replace(/\n/g, ", ");
}
}
if (!headersDict) {
return {};
}
for (const key of Object.keys(headersDict)) {
if (key[0] === ":") {
delete headersDict[key];
continue;
}
const keyLower = key.toLowerCase();
if (EXCLUDE_HEADERS.includes(keyLower)) {
headersDict["x-orig-" + key] = headersDict[key];
delete headersDict[key];
continue;
}
if (actualContentLength && keyLower === CONTENT_LENGTH) {
headersDict[key] = "" + actualContentLength;
continue;
}
headersDict[key] = headersDict[key].replace(/\n/g, ", ");
}
return headersDict;
}
isValidBinary() {
if (!this.payload) {
return false;
}
const length = this.payload.length;
const headers = new Headers(this.getResponseHeadersDict());
const contentType = headers.get(CONTENT_TYPE);
const contentLength = headers.get(CONTENT_LENGTH);
if (Number(contentLength) !== length) {
return false;
}
if (contentType && contentType.startsWith("text/html")) {
return false;
}
return true;
}
}


@ -31,6 +31,7 @@ export class PageState
this.extraHops = redisData.extraHops;
this.workerid = null;
this.pageid = null;
this.title = null;
this.isHTMLPage = null;
@ -550,6 +551,14 @@ return 0;
return this._lastSize;
}
async addIfNoDupe(key, value) {
return await this.redis.sadd(key, value) === 1;
}
async removeDupe(key, value) {
return await this.redis.srem(key, value);
}
async logError(error) {
return await this.redis.lpush(this.ekey, error);
}


@ -31,3 +31,7 @@ export function secondsElapsed(startTime, nowDate = null) {
return (nowDate.getTime() - startTime) / 1000;
}
export function timestampNow() {
return new Date().toISOString().replace(/[^\d]/g, "");
}

util/warcwriter.js (new file, 111 lines)

@ -0,0 +1,111 @@
import fs from "fs";
import path from "path";
import { CDXIndexer } from "warcio";
import { WARCSerializer } from "warcio/node";
import { logger, errJSON } from "./logger.js";
// =================================================================
export class WARCWriter
{
constructor({archivesDir, tempCdxDir, filename, gzip, logDetails}) {
this.archivesDir = archivesDir;
this.tempCdxDir = tempCdxDir;
this.filename = filename;
this.gzip = gzip;
this.logDetails = logDetails;
this.offset = 0;
this.recordLength = 0;
if (this.tempCdxDir) {
this.indexer = new CDXIndexer({format: "cdxj"});
} else {
this.indexer = null;
}
this.fh = null;
this.cdxFH = null;
}
async initFH() {
if (!this.fh) {
this.fh = fs.createWriteStream(path.join(this.archivesDir, this.filename));
}
if (!this.cdxFH && this.tempCdxDir) {
this.cdxFH = fs.createWriteStream(path.join(this.tempCdxDir, this.filename + ".cdx"));
}
}
async writeRecordPair(responseRecord, requestRecord, responseSerializer = null) {
const opts = {gzip: this.gzip};
if (!responseSerializer) {
responseSerializer = new WARCSerializer(responseRecord, opts);
}
await this.initFH();
this.recordLength = await this._writeRecord(responseRecord, responseSerializer);
this._writeCDX(responseRecord);
const requestSerializer = new WARCSerializer(requestRecord, opts);
this.recordLength = await this._writeRecord(requestRecord, requestSerializer);
this._writeCDX(requestRecord);
}
async _writeRecord(record, serializer) {
let total = 0;
const url = record.warcTargetURI;
for await (const chunk of serializer) {
total += chunk.length;
try {
this.fh.write(chunk);
} catch (e) {
logger.error("Error writing to WARC, corruption possible", {...errJSON(e), url, ...this.logDetails}, "writer");
}
}
return total;
}
_writeCDX(record) {
if (this.indexer) {
const cdx = this.indexer.indexRecord(record, this, this.filename);
if (this.indexer && this.cdxFH && cdx) {
this.indexer.write(cdx, this.cdxFH);
}
}
this.offset += this.recordLength;
}
async flush() {
if (this.fh) {
await streamFinish(this.fh);
this.fh = null;
}
if (this.cdxFH) {
this._writeCDX(null);
await streamFinish(this.cdxFH);
this.cdxFH = null;
}
}
}
// =================================================================
export function streamFinish(fh) {
const p = new Promise(resolve => {
fh.once("finish", () => resolve());
});
fh.end();
return p;
}


@ -1,6 +1,10 @@
import os from "os";
import { v4 as uuidv4 } from "uuid";
import { logger, errJSON } from "./logger.js";
import { sleep, timedRun } from "./timing.js";
import { Recorder } from "./recorder.js";
import { rxEscape } from "./seeds.js";
const MAX_REUSE = 5;
@ -10,7 +14,7 @@ const TEARDOWN_TIMEOUT = 10;
const FINISHED_TIMEOUT = 60;
// ===========================================================================
export function runWorkers(crawler, numWorkers, maxPageTime) {
export function runWorkers(crawler, numWorkers, maxPageTime, collDir) {
logger.info(`Creating ${numWorkers} workers`, {}, "worker");
const workers = [];
@ -31,8 +35,7 @@ export function runWorkers(crawler, numWorkers, maxPageTime) {
}
for (let i = 0; i < numWorkers; i++) {
//workers.push(new PageWorker(`worker-${i+1}`, crawler, maxPageTime));
workers.push(new PageWorker(i + offset, crawler, maxPageTime));
workers.push(new PageWorker(i + offset, crawler, maxPageTime, collDir));
}
return Promise.allSettled(workers.map((worker) => worker.run()));
@ -42,7 +45,7 @@ export function runWorkers(crawler, numWorkers, maxPageTime) {
// ===========================================================================
export class PageWorker
{
constructor(id, crawler, maxPageTime) {
constructor(id, crawler, maxPageTime, collDir) {
this.id = id;
this.crawler = crawler;
this.maxPageTime = maxPageTime;
@ -59,6 +62,10 @@ export class PageWorker
this.crashed = false;
this.markCrashed = null;
this.crashBreak = null;
this.recorder = new Recorder({workerid: id, collDir, crawler: this.crawler});
this.crawler.browser.recorders.push(this.recorder);
}
async closePage() {
@ -66,6 +73,10 @@ export class PageWorker
return;
}
if (this.recorder) {
await this.recorder.onClosePage();
}
if (!this.crashed) {
try {
await timedRun(
@ -140,7 +151,18 @@ export class PageWorker
this.page = page;
this.cdp = cdp;
this.callbacks = {};
this.opts = {page: this.page, cdp: this.cdp, workerid, callbacks: this.callbacks};
const directFetchCapture = this.recorder ? (x) => this.recorder.directFetchCapture(x) : null;
this.opts = {
page: this.page,
cdp: this.cdp,
workerid,
callbacks: this.callbacks,
directFetchCapture,
};
if (this.recorder) {
await this.recorder.onCreatePage(this.opts);
}
// updated per page crawl
this.crashed = false;
@ -184,6 +206,14 @@ export class PageWorker
}
}
async crawlPage(opts) {
const res = await this.crawler.crawlPage(opts);
if (this.recorder) {
await this.recorder.finishPage();
}
return res;
}
async timedCrawlPage(opts) {
const workerid = this.id;
const { data } = opts;
@ -193,10 +223,18 @@ export class PageWorker
this.logDetails = {page: url, workerid};
// set new page id
const pageid = uuidv4();
data.pageid = pageid;
if (this.recorder) {
this.recorder.startPage({pageid, url});
}
try {
await Promise.race([
timedRun(
this.crawler.crawlPage(opts),
this.crawlPage(opts),
this.maxPageTime,
"Page Worker Timeout",
this.logDetails,
@ -225,9 +263,13 @@ export class PageWorker
try {
await this.runLoop();
logger.info("Worker exiting, all tasks complete", {workerid: this.id}, "worker");
logger.info("Worker done, all tasks complete", {workerid: this.id}, "worker");
} catch (e) {
logger.error("Worker error, exiting", {...errJSON(e), workerid: this.id}, "worker");
} finally {
if (this.recorder) {
await this.recorder.onDone();
}
}
}

yarn.lock (1,150 lines changed; file diff suppressed because it is too large)