Retry support and additional fixes (#743)

- retries: for failed pages, set the retry limit to 5, in case multiple retries are needed (see the sketch after this list)
- redirect: if a page URL redirects from /path/ to /path, don't add it as an extra seed
- proxy: don't use the global dispatcher; pass the dispatcher explicitly when
using a proxy, as the proxy may interfere with local network requests
- final exit flag: if the crawl is done and also interrupted, ensure the WACZ is
still written/uploaded by setting the final exit flag to true
- hashtag-only change forces reload: when loading a page with the same URL but
a different hashtag, e.g. `https://example.com/#B` after
`https://example.com/#A`, do a full reload
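
A minimal sketch of the retry flow these changes add, assuming the names from the diffs below (`MAX_RETRY_FAILED` from util/constants.js, the `retry` counter on queue entries); the real logic lives in the `requeuefailed` Lua script in the state diff:

```ts
// Illustrative sketch, not the actual implementation: a failed page is
// requeued with an incremented retry count until it exceeds MAX_RETRY_FAILED,
// after which it moves to a final-failed list and is never retried again.
const MAX_RETRY_FAILED = 5;

type QueueEntry = { url: string; depth: number; retry?: number };

function handleFailed(
  entry: QueueEntry,
  requeue: (entry: QueueEntry) => void,
  finalFail: (entry: QueueEntry) => void,
): void {
  entry.retry = (entry.retry || 0) + 1;
  if (entry.retry <= MAX_RETRY_FAILED) {
    requeue(entry); // back onto the priority queue for another attempt
  } else {
    finalFail(entry); // retries exhausted: record once, stop retrying
  }
}
```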
Ilya Kreymer 2025-01-25 22:55:49 -08:00 committed by GitHub
parent 5d9c62e264
commit f7cbf9645b
GPG key ID: B5690EEEBB952194 (no known key found for this signature in database)
12 changed files with 212 additions and 74 deletions

View file

@@ -1,6 +1,6 @@
{
"name": "browsertrix-crawler",
"version": "1.5.0-beta.2",
"version": "1.5.0-beta.3",
"main": "browsertrix-crawler",
"type": "module",
"repository": "https://github.com/webrecorder/browsertrix-crawler",

View file

@@ -46,6 +46,7 @@ import {
ExtractSelector,
PAGE_OP_TIMEOUT_SECS,
SITEMAP_INITIAL_FETCH_TIMEOUT_SECS,
MAX_RETRY_FAILED,
} from "./util/constants.js";
import { AdBlockRules, BlockRuleDecl, BlockRules } from "./util/blockrules.js";
@@ -1152,13 +1153,13 @@ self.__bx_behaviors.selectMainBehavior();
}
async pageFinished(data: PageState) {
await this.writePage(data);
// if page loaded, considered page finished successfully
// (even if behaviors timed out)
const { loadState, logDetails, depth, url } = data;
const { loadState, logDetails, depth, url, retry } = data;
if (data.loadState >= LoadState.FULL_PAGE_LOADED) {
await this.writePage(data);
logger.info("Page Finished", { loadState, ...logDetails }, "pageStatus");
await this.crawlState.markFinished(url);
@@ -1171,6 +1172,9 @@ self.__bx_behaviors.selectMainBehavior();
await this.checkLimits();
} else {
if (retry >= MAX_RETRY_FAILED) {
await this.writePage(data);
}
await this.crawlState.markFailed(url);
if (this.healthChecker) {
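Note: writePage() is now called for a failed page only once its retries are exhausted (retry >= MAX_RETRY_FAILED), rather than on every attempt, so pages.jsonl gets a single entry per permanently failed page; the new retry-fail test at the bottom asserts exactly this.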
@@ -1370,7 +1374,7 @@ self.__bx_behaviors.selectMainBehavior();
}
if (this.params.failOnFailedLimit) {
const numFailed = await this.crawlState.numFailed();
const numFailed = await this.crawlState.numFailedWillRetry();
const failedLimit = this.params.failOnFailedLimit;
if (numFailed >= failedLimit) {
logger.fatal(
@@ -1498,6 +1502,7 @@ self.__bx_behaviors.selectMainBehavior();
logger.info("crawl already finished, running post-crawl tasks", {
state: initState,
});
this.finalExit = true;
await this.postCrawl();
return;
} else if (await this.crawlState.isCrawlStopped()) {
@@ -1581,8 +1586,11 @@ self.__bx_behaviors.selectMainBehavior();
await this.writeStats();
// if crawl has been stopped, mark as final exit for post-crawl tasks
if (await this.crawlState.isCrawlStopped()) {
// if crawl has been stopped or finished, mark as final exit for post-crawl tasks
if (
(await this.crawlState.isCrawlStopped()) ||
(await this.crawlState.isFinished())
) {
this.finalExit = true;
}
@@ -1822,16 +1830,19 @@ self.__bx_behaviors.selectMainBehavior();
const realSize = await this.crawlState.queueSize();
const pendingPages = await this.crawlState.getPendingList();
const done = await this.crawlState.numDone();
const failed = await this.crawlState.numFailed();
const total = realSize + pendingPages.length + done;
const pending = pendingPages.length;
const crawled = await this.crawlState.numDone();
const failedWillRetry = await this.crawlState.numFailedWillRetry();
const failed = await this.crawlState.numFailedNoRetry();
const total = realSize + pendingPages.length + crawled;
const limit = { max: this.pageLimit || 0, hit: this.limitHit };
const stats = {
crawled: done,
total: total,
pending: pendingPages.length,
failed: failed,
limit: limit,
crawled,
total,
pending,
failedWillRetry,
failed,
limit,
pendingPages,
};
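
For reference, the stats object written by writeStats() now has roughly this shape; a sketch inferred from the diff above, with the interface name being illustrative:

```ts
// Sketch of the stats file format after this change (interface name is
// hypothetical; the fields mirror the object literal in the diff above).
interface CrawlStats {
  crawled: number;         // pages finished successfully
  total: number;           // queue size + pending + crawled
  pending: number;         // pages currently in progress
  failedWillRetry: number; // failed pages still awaiting a retry
  failed: number;          // failed pages with no retries left
  limit: { max: number; hit: boolean };
  pendingPages: string[];  // serialized state of each in-progress page
}
```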
@@ -1885,12 +1896,14 @@ self.__bx_behaviors.selectMainBehavior();
}
};
const handleFirstLoadEvents = () => {
page.on("response", waitFirstResponse);
// store that domcontentloaded was finished
page.once("domcontentloaded", () => {
data.loadState = LoadState.CONTENT_LOADED;
});
};
const gotoOpts = data.isHTMLPage
? this.gotoOpts
@@ -1898,9 +1911,24 @@ self.__bx_behaviors.selectMainBehavior();
logger.info("Awaiting page load", logDetails);
const urlNoHash = url.split("#")[0];
const fullRefresh = urlNoHash === page.url().split("#")[0];
try {
if (!fullRefresh) {
handleFirstLoadEvents();
}
// store the page load response when page fully loads
fullLoadedResponse = await page.goto(url, gotoOpts);
if (fullRefresh) {
logger.debug("Hashtag-only change, doing full page reload");
handleFirstLoadEvents();
fullLoadedResponse = await page.reload(gotoOpts);
}
} catch (e) {
if (!(e instanceof Error)) {
throw e;
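Note: when the target URL differs from the page's current URL only in its hashtag, the browser treats page.goto() as a same-document navigation and issues no network request, so nothing new would be captured; forcing page.reload() fetches the document again. The first-load event handlers are attached only around whichever navigation actually performs the load.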
@@ -1921,7 +1949,7 @@ self.__bx_behaviors.selectMainBehavior();
} else if (!downloadResponse) {
// log if not already log and rethrow, consider page failed
if (msg !== "logged") {
logger.error("Page Load Failed, skipping page", {
logger.error("Page Load Failed, will retry", {
msg,
loadState: data.loadState,
...logDetails,
@@ -1944,7 +1972,8 @@ self.__bx_behaviors.selectMainBehavior();
if (
depth === 0 &&
!isChromeError &&
respUrl !== url.split("#")[0] &&
respUrl !== urlNoHash &&
respUrl + "/" !== url &&
!downloadResponse
) {
data.seedId = await this.crawlState.addExtraSeed(
@@ -2652,8 +2681,9 @@ self.__bx_behaviors.selectMainBehavior();
if (this.origConfig) {
this.origConfig.state = state;
}
const res = yaml.dump(this.origConfig, { lineWidth: -1 });
try {
const res = yaml.dump(this.origConfig, { lineWidth: -1 });
logger.info(`Saving crawl state to: ${filename}`);
await fsp.writeFile(filename, res);
} catch (e) {

View file

@@ -5,6 +5,7 @@ import { HTTPRequest, Page } from "puppeteer-core";
import { Browser } from "./browser.js";
import { fetch } from "undici";
import { getProxyDispatcher } from "./proxy.js";
const RULE_TYPES = ["block", "allowOnly"];
@@ -271,7 +272,7 @@ export class BlockRules {
logDetails: Record<string, any>,
) {
try {
const res = await fetch(reqUrl);
const res = await fetch(reqUrl, { dispatcher: getProxyDispatcher() });
const text = await res.text();
return !!text.match(frameTextMatch);
@@ -302,6 +303,7 @@ export class BlockRules {
method: "PUT",
headers: { "Content-Type": "text/html" },
body,
dispatcher: getProxyDispatcher(),
});
}
}

View file

@@ -27,6 +27,7 @@ export const ADD_LINK_FUNC = "__bx_addLink";
export const FETCH_FUNC = "__bx_fetch";
export const MAX_DEPTH = 1000000;
export const MAX_RETRY_FAILED = 5;
export const FETCH_HEADERS_TIMEOUT_SECS = 30;
export const PAGE_OP_TIMEOUT_SECS = 5;

View file

@@ -6,6 +6,7 @@ import util from "util";
import { exec as execCallback } from "child_process";
import { logger } from "./logger.js";
import { getProxyDispatcher } from "./proxy.js";
const exec = util.promisify(execCallback);
@@ -85,7 +86,7 @@ async function collectOnlineBehavior(url: string): Promise<FileSources> {
const behaviorFilepath = `/app/behaviors/${filename}`;
try {
const res = await fetch(url);
const res = await fetch(url, { dispatcher: getProxyDispatcher() });
const fileContents = await res.text();
await fsp.writeFile(behaviorFilepath, fileContents);
logger.info(

View file

@@ -3,6 +3,7 @@ import { formatErr, logger } from "./logger.js";
import { Browser } from "./browser.js";
import { fetch } from "undici";
import { getProxyDispatcher } from "./proxy.js";
export class OriginOverride {
originOverride: { origUrl: URL; destUrl: URL }[];
@@ -45,7 +46,10 @@ export class OriginOverride {
headers.set("origin", orig.origin);
}
const resp = await fetch(newUrl, { headers });
const resp = await fetch(newUrl, {
headers,
dispatcher: getProxyDispatcher(),
});
const body = Buffer.from(await resp.arrayBuffer());
const respHeaders = Object.fromEntries(resp.headers);

View file

@@ -1,5 +1,5 @@
import net from "net";
import { Agent, Dispatcher, ProxyAgent, setGlobalDispatcher } from "undici";
import { Agent, Dispatcher, ProxyAgent } from "undici";
import child_process from "child_process";
@@ -13,6 +13,8 @@ const SSH_PROXY_LOCAL_PORT = 9722;
const SSH_WAIT_TIMEOUT = 30000;
let proxyDispatcher: Dispatcher | undefined = undefined;
export function getEnvProxyUrl() {
if (process.env.PROXY_SERVER) {
return process.env.PROXY_SERVER;
@@ -46,10 +48,14 @@ export async function initProxy(
// set global fetch() dispatcher (with proxy, if any)
const dispatcher = createDispatcher(proxy, agentOpts);
setGlobalDispatcher(dispatcher);
proxyDispatcher = dispatcher;
return proxy;
}
export function getProxyDispatcher() {
return proxyDispatcher;
}
export function createDispatcher(
proxyUrl: string,
opts: Agent.Options,
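
Callers now opt in to the proxy per request instead of relying on a process-wide dispatcher. A minimal usage sketch (not part of the diff): undici's fetch() falls back to its default dispatcher when `dispatcher` is undefined, so non-proxied crawls behave exactly as before.

```ts
import { fetch } from "undici";
import { getProxyDispatcher } from "./proxy.js";

// getProxyDispatcher() returns undefined until initProxy() has configured a
// proxy; undici then uses its default dispatcher, so requests to the local
// network go out directly rather than through the proxy.
const resp = await fetch("https://example.com/", {
  dispatcher: getProxyDispatcher(),
});
console.log(resp.status);
```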

View file

@@ -8,7 +8,7 @@ import {
isRedirectStatus,
} from "./reqresp.js";
import { fetch, getGlobalDispatcher, Response } from "undici";
import { fetch, Response } from "undici";
import {
getCustomRewriter,
@@ -23,6 +23,7 @@ import { WARCWriter } from "./warcwriter.js";
import { RedisCrawlState, WorkerId } from "./state.js";
import { CDPSession, Protocol } from "puppeteer-core";
import { Crawler } from "../crawler.js";
import { getProxyDispatcher } from "./proxy.js";
const MAX_BROWSER_DEFAULT_FETCH_SIZE = 5_000_000;
const MAX_TEXT_REWRITE_SIZE = 25_000_000;
@@ -1588,7 +1589,10 @@ class AsyncFetcher {
const headers = reqresp.getRequestHeadersDict();
const dispatcher = getGlobalDispatcher().compose((dispatch) => {
let dispatcher = getProxyDispatcher();
if (dispatcher) {
dispatcher = dispatcher.compose((dispatch) => {
return (opts, handler) => {
if (opts.headers) {
reqresp.requestHeaders = opts.headers as Record<string, string>;
@@ -1596,6 +1600,7 @@ class AsyncFetcher {
return dispatch(opts, handler);
};
});
}
const resp = await fetch(url!, {
method,

View file

@@ -10,6 +10,7 @@ import { DETECT_SITEMAP } from "./constants.js";
import { sleep } from "./timing.js";
import { fetch, Response } from "undici";
import { getProxyDispatcher } from "./proxy.js";
const SITEMAP_CONCURRENCY = 5;
@@ -65,7 +66,10 @@ export class SitemapReader extends EventEmitter {
async _fetchWithRetry(url: string, message: string) {
while (true) {
const resp = await fetch(url, { headers: this.headers });
const resp = await fetch(url, {
headers: this.headers,
dispatcher: getProxyDispatcher(),
});
if (resp.ok) {
return resp;

View file

@@ -3,7 +3,7 @@ import { v4 as uuidv4 } from "uuid";
import { logger } from "./logger.js";
import { MAX_DEPTH } from "./constants.js";
import { MAX_DEPTH, MAX_RETRY_FAILED } from "./constants.js";
import { ScopedSeed } from "./seeds.js";
import { Frame } from "puppeteer-core";
@@ -35,6 +35,7 @@ export type QueueEntry = {
extraHops: number;
ts?: number;
pageid?: string;
retry?: number;
};
// ============================================================================
@@ -54,6 +55,7 @@ export class PageState {
seedId: number;
depth: number;
extraHops: number;
retry: number;
status: number;
@@ -87,6 +89,7 @@
}
this.pageid = redisData.pageid || uuidv4();
this.status = 0;
this.retry = redisData.retry || 0;
}
}
@@ -115,17 +118,12 @@ declare module "ioredis" {
uid: string,
): Result<void, Context>;
movefailed(
pkey: string,
fkey: string,
url: string,
value: string,
state: string,
): Result<void, Context>;
movefailed(pkey: string, fkey: string, url: string): Result<void, Context>;
requeuefailed(
fkey: string,
qkey: string,
ffkey: string,
maxRetryPending: number,
maxRegularDepth: number,
): Result<number, Context>;
@@ -170,7 +168,7 @@ export type SaveState = {
// ============================================================================
export class RedisCrawlState {
redis: Redis;
maxRetryPending = 1;
maxRetryPending = MAX_RETRY_FAILED;
uid: string;
key: string;
@@ -181,6 +179,7 @@
skey: string;
dkey: string;
fkey: string;
ffkey: string;
ekey: string;
pageskey: string;
esKey: string;
@@ -202,6 +201,8 @@
this.dkey = this.key + ":d";
// failed
this.fkey = this.key + ":f";
// failed final, no more retry
this.ffkey = this.key + ":ff";
// crawler errors
this.ekey = this.key + ":e";
// pages
@@ -283,7 +284,6 @@ local json = redis.call('hget', KEYS[1], ARGV[1]);
if json then
local data = cjson.decode(json);
data[ARGV[3]] = ARGV[2];
json = cjson.encode(data);
redis.call('lpush', KEYS[2], json);
@@ -294,23 +294,25 @@ end
});
redis.defineCommand("requeuefailed", {
numberOfKeys: 2,
numberOfKeys: 3,
lua: `
local json = redis.call('rpop', KEYS[1]);
if json then
local data = cjson.decode(json);
data['retry'] = (data['retry'] or 0) + 1;
if tonumber(data['retry']) <= tonumber(ARGV[1]) then
json = cjson.encode(data);
if data['retry'] <= tonumber(ARGV[1]) then
local json = cjson.encode(data);
local score = (data['depth'] or 0) + ((data['extraHops'] or 0) * ARGV[2]);
redis.call('zadd', KEYS[2], score, json);
return 1;
return data['retry'];
else
return 2;
redis.call('lpush', KEYS[3], json);
return 0;
end
end
return 0;
return -1;
`,
});
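Note: the reworked requeuefailed script returns the entry's new retry count (greater than 0) when the URL is requeued, 0 when its retries are exhausted and it is pushed onto the final-failed list (KEYS[3]), and -1 when the failed list is empty; nextFromQueue() below branches on exactly these values.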
@@ -382,9 +384,7 @@ return inx;
}
async markFailed(url: string) {
await this.redis.movefailed(this.pkey, this.fkey, url, "1", "failed");
return await this.redis.incr(this.dkey);
await this.redis.movefailed(this.pkey, this.fkey, url);
}
async markExcluded(url: string) {
@@ -400,7 +400,10 @@ return inx;
}
async isFinished() {
return (await this.queueSize()) == 0 && (await this.numDone()) > 0;
return (
(await this.queueSize()) + (await this.numFailedWillRetry()) == 0 &&
(await this.numDone()) + (await this.numFailedNoRetry()) > 0
);
}
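Note: a crawl now counts as finished only when nothing is queued and no failed page is still awaiting a retry, and at least one page has either completed or failed for good.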
async setStatus(status_: string) {
@@ -572,23 +575,20 @@ return inx;
async nextFromQueue() {
let json = await this._getNext();
let retryFailed = false;
let retry = 0;
if (!json) {
const res = await this.redis.requeuefailed(
retry = await this.redis.requeuefailed(
this.fkey,
this.qkey,
this.ffkey,
this.maxRetryPending,
MAX_DEPTH,
);
switch (res) {
case 1:
if (retry > 0) {
json = await this._getNext();
retryFailed = true;
break;
case 2:
} else if (retry === 0) {
logger.debug("Did not retry failed, already retried", {}, "state");
return null;
}
@@ -607,8 +607,8 @@ return inx;
return null;
}
if (retryFailed) {
logger.debug("Retring failed URL", { url: data.url }, "state");
if (retry) {
logger.debug("Retrying failed URL", { url: data.url, retry }, "state");
}
await this.markStarted(data.url);
@@ -626,11 +626,14 @@ return inx;
const seen = await this._iterSet(this.skey);
const queued = await this._iterSortedKey(this.qkey, seen);
const pending = await this.getPendingList();
const failed = await this._iterListKeys(this.fkey, seen);
const failedWillRetry = await this._iterListKeys(this.fkey, seen);
const failedNoRetry = await this._iterListKeys(this.ffkey, seen);
const errors = await this.getErrorList();
const extraSeeds = await this._iterListKeys(this.esKey, seen);
const sitemapDone = await this.isSitemapDone();
const failed = failedWillRetry.concat(failedNoRetry);
const finished = [...seen.values()];
return {
@@ -721,6 +724,7 @@ return inx;
await this.redis.del(this.pkey);
await this.redis.del(this.dkey);
await this.redis.del(this.fkey);
await this.redis.del(this.ffkey);
await this.redis.del(this.skey);
await this.redis.del(this.ekey);
@@ -803,7 +807,12 @@ return inx;
for (const json of state.failed) {
const data = JSON.parse(json);
const retry = data.retry || 0;
if (retry <= this.maxRetryPending) {
await this.redis.zadd(this.qkey, this._getScore(data), json);
} else {
await this.redis.rpush(this.ffkey, json);
}
seen.push(data.url);
}
@@ -831,10 +840,14 @@ return inx;
return res;
}
async numFailed() {
async numFailedWillRetry() {
return await this.redis.llen(this.fkey);
}
async numFailedNoRetry() {
return await this.redis.llen(this.ffkey);
}
async getPendingList() {
return await this.redis.hvals(this.pkey);
}

View file

@@ -50,6 +50,7 @@ test("check that stats file format is correct", () => {
expect(dataJSON.total).toEqual(3);
expect(dataJSON.pending).toEqual(0);
expect(dataJSON.failed).toEqual(0);
expect(dataJSON.failedWillRetry).toEqual(0);
expect(dataJSON.limit.max).toEqual(3);
expect(dataJSON.limit.hit).toBe(true);
expect(dataJSON.pendingPages.length).toEqual(0);

View file

@@ -0,0 +1,71 @@
import { execSync, spawn } from "child_process";
import fs from "fs";
import Redis from "ioredis";
const DOCKER_HOST_NAME = process.env.DOCKER_HOST_NAME || "host.docker.internal";
async function sleep(time) {
await new Promise((resolve) => setTimeout(resolve, time));
}
test("run crawl", async () => {
let status = 0;
execSync(`docker run -d -v $PWD/test-crawls:/crawls -e CRAWL_ID=test -p 36387:6379 --rm webrecorder/browsertrix-crawler crawl --url http://${DOCKER_HOST_NAME}:31501 --url https://example.com/ --limit 2 --pageExtraDelay 10 --debugAccessRedis --collection retry-fail`);
/*
async function runServer() {
console.log("Waiting to start server");
await sleep(2000);
console.log("Starting server");
//spawn("../../node_modules/.bin/http-server", ["-p", "31501", "--username", "user", "--password", "pass"], {cwd: "./docs/site"});
}
*/
const redis = new Redis("redis://127.0.0.1:36387/0", { lazyConnect: true, retryStrategy: () => null });
await sleep(3000);
let numRetries = 0;
try {
await redis.connect({
maxRetriesPerRequest: 100,
});
//runServer();
while (true) {
const res = await redis.lrange("test:ff", 0, -1);
if (res.length) {
const data = JSON.parse(res);
if (data.retry) {
numRetries = data.retry;
break;
}
}
await sleep(20);
}
} catch (e) {
console.error(e);
} finally {
expect(numRetries).toBe(5);
}
});
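Note: nothing ever listens on port 31501 (the http-server launch above is commented out), so the first seed fails on every attempt; the test polls the test:ff final-failed list in Redis until the entry appears and asserts that it recorded retry === 5, matching MAX_RETRY_FAILED.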
test("check only one failed page entry is made", () => {
expect(
fs.existsSync("test-crawls/collections/retry-fail/pages/pages.jsonl"),
).toBe(true);
expect(
fs
.readFileSync(
"test-crawls/collections/retry-fail/pages/pages.jsonl",
"utf8",
).trim().split("\n").length
).toBe(3);
});