Retry same queue (#757)

- follow up to #743
- page retries are simply added back to the same queue with the `retry`
param incremented and a higher score, after extraHops, to ensure retries
are added at the end.
- score calculation is: `score = depth + (extraHops * MAX_DEPTH) +
(retry * MAX_DEPTH * 2)`; this ensures that retries have lower priority
(a higher score) than extraHops, and that additional retries have even
lower priority. (A small worked example follows this list.)
- a warning is logged when a retry happens; an error is logged only when
all retries are exhausted.
- back to a single failure list; URLs are added there only when all
retries are exhausted.
- rename --numRetries -> --maxPageRetries / --retries for clarity
- state load: allow retrying previously failed URLs if --maxPageRetries
is higher than on the previous run.
- ensure this works with --failOnInvalidStatus: if provided, invalid
status codes (>= 400) are retried along with page load failures
- fixes #132
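
For illustration, a minimal standalone sketch of the priority calculation above (the `getScore` helper and sample values here are hypothetical, added only to mirror the `_getScore` change in the state diff below; lower score means dequeued sooner):

```ts
// Assumes the same MAX_DEPTH value as the constants change in this diff.
const MAX_DEPTH = 1000000;

interface QueueEntry {
  depth?: number;
  extraHops?: number;
  retry?: number;
}

// Mirrors _getScore() after this change: extraHops pages sort after in-scope
// pages, retries sort after extraHops, and each additional retry sorts later still.
function getScore(data: QueueEntry): number {
  return (
    (data.depth || 0) +
    (data.extraHops || 0) * MAX_DEPTH +
    (data.retry || 0) * MAX_DEPTH * 2
  );
}

console.log(getScore({ depth: 3 }));               // 3        -> regular in-scope page
console.log(getScore({ depth: 0, extraHops: 1 })); // 1000000  -> extraHops page
console.log(getScore({ depth: 3, retry: 1 }));     // 2000003  -> first retry
console.log(getScore({ depth: 3, retry: 2 }));     // 4000003  -> second retry, even lower priority
```

Each retry adds `2 * MAX_DEPTH` to whatever score the URL already had, which is what pushes retried URLs toward the end of the queue.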

---------

Co-authored-by: Tessa Walsh <tessa@bitarchivist.net>
Ilya Kreymer 2025-02-06 18:48:40 -08:00 committed by GitHub
parent 5c9d808651
commit 00835fc4f2
7 changed files with 218 additions and 131 deletions

View file

@@ -240,8 +240,9 @@ Options:
s [boolean] [default: false]
--writePagesToRedis If set, write page objects to redis
[boolean] [default: false]
--numRetries If set, number of times to retry a p
age that failed to load
--maxPageRetries, --retries If set, number of times to retry a p
age that failed to load before page
is considered to have failed
[number] [default: 1]
--failOnFailedSeed If set, crawler will fail with exit
code 1 if any seed fails. When combi

View file

@@ -379,7 +379,7 @@ export class Crawler {
this.crawlId,
this.maxPageTime,
os.hostname(),
this.params.numRetries,
this.params.maxPageRetries,
);
// load full state from config
@@ -1202,22 +1202,30 @@ self.__bx_behaviors.selectMainBehavior();
await this.checkLimits();
} else {
if (retry >= this.params.numRetries && !pageSkipped) {
if (retry >= this.params.maxPageRetries && !pageSkipped) {
await this.writePage(data);
}
if (pageSkipped) {
await this.crawlState.markExcluded(url);
} else {
await this.crawlState.markFailed(url);
}
if (this.healthChecker) {
this.healthChecker.incError();
}
const retry = await this.crawlState.markFailed(url);
await this.serializeConfig();
if (retry < 0) {
if (this.healthChecker) {
this.healthChecker.incError();
}
if (depth === 0 && this.params.failOnFailedSeed) {
logger.fatal("Seed Page Load Failed, failing crawl", {}, "general", 1);
await this.serializeConfig();
if (depth === 0 && this.params.failOnFailedSeed) {
logger.fatal(
"Seed Page Load Failed, failing crawl",
{},
"general",
1,
);
}
}
}
await this.checkLimits();
@@ -1407,7 +1415,7 @@ self.__bx_behaviors.selectMainBehavior();
}
if (this.params.failOnFailedLimit) {
const numFailed = await this.crawlState.numFailedWillRetry();
const numFailed = await this.crawlState.numFailed();
const failedLimit = this.params.failOnFailedLimit;
if (numFailed >= failedLimit) {
logger.fatal(
@@ -1875,15 +1883,13 @@ self.__bx_behaviors.selectMainBehavior();
const pendingPages = await this.crawlState.getPendingList();
const pending = pendingPages.length;
const crawled = await this.crawlState.numDone();
const failedWillRetry = await this.crawlState.numFailedWillRetry();
const failed = await this.crawlState.numFailedNoRetry();
const failed = await this.crawlState.numFailed();
const total = realSize + pendingPages.length + crawled;
const limit = { max: this.pageLimit || 0, hit: this.limitHit };
const stats = {
crawled,
total,
pending,
failedWillRetry,
failed,
limit,
pendingPages,
@@ -1904,8 +1910,26 @@ self.__bx_behaviors.selectMainBehavior();
}
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
pageFailed(msg: string, retry: number, msgData: any) {
if (retry < this.params.maxPageRetries) {
logger.warn(
msg + ": will retry",
{ retry, retries: this.params.maxPageRetries, ...msgData },
"pageStatus",
);
} else {
logger.error(
msg + ": retry limit reached",
{ retry, retries: this.params.maxPageRetries, ...msgData },
"pageStatus",
);
}
throw new Error("logged");
}
async loadPage(page: Page, data: PageState, seed: ScopedSeed) {
const { url, depth } = data;
const { url, depth, retry } = data;
const logDetails = data.logDetails;
@@ -1999,22 +2023,24 @@ self.__bx_behaviors.selectMainBehavior();
data.pageSkipped = true;
logger.warn("Page Load Blocked, skipping", { msg, loadState });
} else {
logger.error("Page Load Failed, will retry", {
return this.pageFailed("Page Load Failed", retry, {
msg,
url,
loadState,
...logDetails,
});
}
e.message = "logged";
}
throw e;
}
}
const resp = fullLoadedResponse || downloadResponse || firstResponse;
if (!resp) {
throw new Error("no response for page load, assuming failed");
return this.pageFailed("Page Load Failed, no response", retry, {
url,
...logDetails,
});
}
const respUrl = resp.url().split("#")[0];
@@ -2051,14 +2077,11 @@ self.__bx_behaviors.selectMainBehavior();
}
if (failed) {
logger.error(
return this.pageFailed(
isChromeError ? "Page Crashed on Load" : "Page Invalid Status",
{
status,
...logDetails,
},
retry,
{ url, status, ...logDetails },
);
throw new Error("logged");
}
const contentType = resp.headers()["content-type"];

View file

@@ -17,7 +17,7 @@ import {
DEFAULT_SELECTORS,
BEHAVIOR_TYPES,
ExtractSelector,
DEFAULT_NUM_RETRIES,
DEFAULT_MAX_RETRIES,
} from "./constants.js";
import { ScopedSeed } from "./seeds.js";
import { interpolateFilename } from "./storage.js";
@@ -550,11 +550,12 @@ class ArgParser {
default: false,
},
numRetries: {
maxPageRetries: {
alias: "retries",
describe:
"If set, number of times to retry a page that failed to load",
"If set, number of times to retry a page that failed to load before page is considered to have failed",
type: "number",
default: DEFAULT_NUM_RETRIES,
default: DEFAULT_MAX_RETRIES,
},
failOnFailedSeed: {

View file

@@ -27,7 +27,7 @@ export const ADD_LINK_FUNC = "__bx_addLink";
export const FETCH_FUNC = "__bx_fetch";
export const MAX_DEPTH = 1000000;
export const DEFAULT_NUM_RETRIES = 1;
export const DEFAULT_MAX_RETRIES = 1;
export const FETCH_HEADERS_TIMEOUT_SECS = 30;
export const PAGE_OP_TIMEOUT_SECS = 5;

View file

@@ -3,7 +3,7 @@ import { v4 as uuidv4 } from "uuid";
import { logger } from "./logger.js";
import { MAX_DEPTH, DEFAULT_NUM_RETRIES } from "./constants.js";
import { MAX_DEPTH, DEFAULT_MAX_RETRIES } from "./constants.js";
import { ScopedSeed } from "./seeds.js";
import { Frame } from "puppeteer-core";
import { interpolateFilename } from "./storage.js";
@@ -120,16 +120,6 @@ declare module "ioredis" {
uid: string,
): Result<void, Context>;
movefailed(pkey: string, fkey: string, url: string): Result<void, Context>;
requeuefailed(
fkey: string,
qkey: string,
ffkey: string,
maxRetries: number,
maxRegularDepth: number,
): Result<number, Context>;
unlockpending(
pkeyUrl: string,
uid: string,
@@ -145,6 +135,15 @@ declare module "ioredis" {
maxRegularDepth: number,
): Result<number, Context>;
requeuefailed(
pkey: string,
qkey: string,
fkey: string,
url: string,
maxRetries: number,
maxRegularDepth: number,
): Result<number, Context>;
addnewseed(
esKey: string,
esMap: string,
@@ -181,7 +180,6 @@ export class RedisCrawlState {
skey: string;
dkey: string;
fkey: string;
ffkey: string;
ekey: string;
pageskey: string;
esKey: string;
@@ -203,17 +201,15 @@ export class RedisCrawlState {
this.uid = uid;
this.key = key;
this.maxPageTime = maxPageTime;
this.maxRetries = maxRetries || DEFAULT_NUM_RETRIES;
this.maxRetries = maxRetries ?? DEFAULT_MAX_RETRIES;
this.qkey = this.key + ":q";
this.pkey = this.key + ":p";
this.skey = this.key + ":s";
// done (integer)
this.dkey = this.key + ":d";
// failed
this.fkey = this.key + ":f";
// failed final, no more retry
this.ffkey = this.key + ":ff";
this.fkey = this.key + ":f";
// crawler errors
this.ekey = this.key + ":e";
// pages
@@ -288,42 +284,30 @@ end
`,
});
redis.defineCommand("movefailed", {
numberOfKeys: 2,
redis.defineCommand("requeuefailed", {
numberOfKeys: 3,
lua: `
local json = redis.call('hget', KEYS[1], ARGV[1]);
if json then
local data = cjson.decode(json);
json = cjson.encode(data);
local retry = data['retry'] or 0;
redis.call('lpush', KEYS[2], json);
redis.call('hdel', KEYS[1], ARGV[1]);
end
`,
});
redis.defineCommand("requeuefailed", {
numberOfKeys: 3,
lua: `
local json = redis.call('rpop', KEYS[1]);
if json then
local data = cjson.decode(json);
data['retry'] = (data['retry'] or 0) + 1;
if data['retry'] <= tonumber(ARGV[1]) then
local json = cjson.encode(data);
local score = (data['depth'] or 0) + ((data['extraHops'] or 0) * ARGV[2]);
if retry < tonumber(ARGV[2]) then
retry = retry + 1;
data['retry'] = retry;
json = cjson.encode(data);
local score = (data['depth'] or 0) + ((data['extraHops'] or 0) * ARGV[3]) + (retry * ARGV[3] * 2);
redis.call('zadd', KEYS[2], score, json);
return data['retry'];
return retry;
else
redis.call('lpush', KEYS[3], json);
return 0;
end
end
return -1;
`,
});
@@ -335,11 +319,15 @@ if not res then
local json = redis.call('hget', KEYS[1], ARGV[1]);
if json then
local data = cjson.decode(json);
data['retry'] = (data['retry'] or 0) + 1;
local retry = data['retry'] or 0;
redis.call('hdel', KEYS[1], ARGV[1]);
if tonumber(data['retry']) <= tonumber(ARGV[2]) then
if retry < tonumber(ARGV[2]) then
retry = retry + 1;
data['retry'] = retry;
json = cjson.encode(data);
local score = (data['depth'] or 0) + ((data['extraHops'] or 0) * ARGV[3]);
local score = (data['depth'] or 0) + ((data['extraHops'] or 0) * ARGV[3]) + (retry * ARGV[3] * 2);
redis.call('zadd', KEYS[2], score, json);
return 1;
else
@@ -395,7 +383,14 @@ return inx;
}
async markFailed(url: string) {
await this.redis.movefailed(this.pkey, this.fkey, url);
return await this.redis.requeuefailed(
this.pkey,
this.qkey,
this.fkey,
url,
this.maxRetries,
MAX_DEPTH,
);
}
async markExcluded(url: string) {
@@ -411,10 +406,7 @@ return inx;
}
async isFinished() {
return (
(await this.queueSize()) + (await this.numFailedWillRetry()) == 0 &&
(await this.numDone()) + (await this.numFailedNoRetry()) > 0
);
return (await this.queueSize()) == 0 && (await this.numDone()) > 0;
}
async setStatus(status_: string) {
@@ -608,25 +600,7 @@ return inx;
}
async nextFromQueue() {
let json = await this._getNext();
let retry = 0;
if (!json) {
retry = await this.redis.requeuefailed(
this.fkey,
this.qkey,
this.ffkey,
this.maxRetries,
MAX_DEPTH,
);
if (retry > 0) {
json = await this._getNext();
} else if (retry === 0) {
logger.debug("Did not retry failed, already retried", {}, "state");
return null;
}
}
const json = await this._getNext();
if (!json) {
return null;
@@ -641,10 +615,6 @@ return inx;
return null;
}
if (retry) {
logger.debug("Retrying failed URL", { url: data.url, retry }, "state");
}
await this.markStarted(data.url);
return new PageState(data);
@@ -660,14 +630,11 @@ return inx;
const seen = await this._iterSet(this.skey);
const queued = await this._iterSortedKey(this.qkey, seen);
const pending = await this.getPendingList();
const failedWillRetry = await this._iterListKeys(this.fkey, seen);
const failedNoRetry = await this._iterListKeys(this.ffkey, seen);
const failed = await this._iterListKeys(this.fkey, seen);
const errors = await this.getErrorList();
const extraSeeds = await this._iterListKeys(this.esKey, seen);
const sitemapDone = await this.isSitemapDone();
const failed = failedWillRetry.concat(failedNoRetry);
const finished = [...seen.values()];
return {
@@ -682,7 +649,11 @@ return inx;
}
_getScore(data: QueueEntry) {
return (data.depth || 0) + (data.extraHops || 0) * MAX_DEPTH;
return (
(data.depth || 0) +
(data.extraHops || 0) * MAX_DEPTH +
(data.retry || 0) * MAX_DEPTH * 2
);
}
async _iterSet(key: string, count = 100) {
@ -758,7 +729,6 @@ return inx;
await this.redis.del(this.pkey);
await this.redis.del(this.dkey);
await this.redis.del(this.fkey);
await this.redis.del(this.ffkey);
await this.redis.del(this.skey);
await this.redis.del(this.ekey);
@@ -842,10 +812,11 @@ return inx;
for (const json of state.failed) {
const data = JSON.parse(json);
const retry = data.retry || 0;
if (retry <= this.maxRetries) {
// allow retrying failed URLs if number of retries has increased
if (retry < this.maxRetries) {
await this.redis.zadd(this.qkey, this._getScore(data), json);
} else {
await this.redis.rpush(this.ffkey, json);
await this.redis.rpush(this.fkey, json);
}
seen.push(data.url);
}
@@ -874,14 +845,10 @@ return inx;
return res;
}
async numFailedWillRetry() {
async numFailed() {
return await this.redis.llen(this.fkey);
}
async numFailedNoRetry() {
return await this.redis.llen(this.ffkey);
}
async getPendingList() {
return await this.redis.hvals(this.pkey);
}

View file

@@ -50,7 +50,6 @@ test("check that stats file format is correct", () => {
expect(dataJSON.total).toEqual(3);
expect(dataJSON.pending).toEqual(0);
expect(dataJSON.failed).toEqual(0);
expect(dataJSON.failedWillRetry).toEqual(0);
expect(dataJSON.limit.max).toEqual(3);
expect(dataJSON.limit.hit).toBe(true);
expect(dataJSON.pendingPages.length).toEqual(0);

View file

@@ -1,5 +1,6 @@
import { execSync, spawn } from "child_process";
import { exec, execSync } from "child_process";
import fs from "fs";
import http from "http";
import Redis from "ioredis";
const DOCKER_HOST_NAME = process.env.DOCKER_HOST_NAME || "host.docker.internal";
@@ -8,19 +9,37 @@ async function sleep(time) {
await new Promise((resolve) => setTimeout(resolve, time));
}
test("run crawl", async () => {
let status = 0;
execSync(`docker run -d -v $PWD/test-crawls:/crawls -e CRAWL_ID=test -p 36387:6379 --rm webrecorder/browsertrix-crawler crawl --url http://${DOCKER_HOST_NAME}:31501 --url https://example.com/ --limit 2 --pageExtraDelay 10 --debugAccessRedis --collection retry-fail --numRetries 5`);
let requests = 0;
let success = false;
let server = null;
/*
async function runServer() {
console.log("Waiting to start server");
await sleep(2000);
beforeAll(() => {
server = http.createServer((req, res) => {
// 3 requests: 2 from browser, 1 direct fetch per attempt
// succeed on 6th request == after 2 retries
if (requests >= 6) {
res.writeHead(200, {"Content-Type": "text/html"});
res.end("<html><body>Test Data</body></html>");
success = true;
} else {
res.writeHead(503, {"Content-Type": "text/html"});
res.end("<html><body>Test Data</body></html>");
}
requests++;
});
server.listen(31501, "0.0.0.0");
});
afterAll(() => {
server.close();
});
test("run crawl with retries for no response", async () => {
execSync(`docker run -d -v $PWD/test-crawls:/crawls -e CRAWL_ID=test -p 36387:6379 --rm webrecorder/browsertrix-crawler crawl --url http://invalid-host-x:31501 --url https://example.com/ --limit 2 --pageExtraDelay 10 --debugAccessRedis --collection retry-fail --retries 5`);
console.log("Starting server");
//spawn("../../node_modules/.bin/http-server", ["-p", "31501", "--username", "user", "--password", "pass"], {cwd: "./docs/site"});
}
*/
const redis = new Redis("redis://127.0.0.1:36387/0", { lazyConnect: true, retryStrategy: () => null });
await sleep(3000);
@@ -32,10 +51,8 @@ test("run crawl", async () => {
maxRetriesPerRequest: 100,
});
//runServer();
while (true) {
const res = await redis.lrange("test:ff", 0, -1);
const res = await redis.lrange("test:f", 0, -1);
if (res.length) {
const data = JSON.parse(res);
if (data.retry) {
@@ -69,3 +86,82 @@ test("check only one failed page entry is made", () => {
});
test("run crawl with retries for 503, enough retries to succeed", async () => {
requests = 0;
success = false;
const child = exec(`docker run -v $PWD/test-crawls:/crawls --rm webrecorder/browsertrix-crawler crawl --url http://${DOCKER_HOST_NAME}:31501 --url https://example.com/ --limit 2 --collection retry-fail-2 --retries 2 --failOnInvalidStatus --failOnFailedSeed --logging stats,debug`);
let status = 0;
const crawlFinished = new Promise(r => resolve = r);
// detect crawler exit
let crawler_exited = false;
child.on("exit", function (code) {
status = code;
resolve();
});
await crawlFinished;
expect(status).toBe(0);
// (1 + 2) * 3 == 9 requests
expect(requests).toBe(9);
expect(success).toBe(true);
});
test("run crawl with retries for 503, not enough retries, fail", async () => {
requests = 0;
success = false;
const child = exec(`docker run -v $PWD/test-crawls:/crawls --rm webrecorder/browsertrix-crawler crawl --url http://${DOCKER_HOST_NAME}:31501 --url https://example.com/ --limit 2 --collection retry-fail-3 --retries 1 --failOnInvalidStatus --failOnFailedSeed --logging stats,debug`);
let status = 0;
const crawlFinished = new Promise(r => resolve = r);
// detect crawler exit
let crawler_exited = false;
child.on("exit", function (code) {
status = code;
resolve();
});
await crawlFinished;
expect(status).toBe(1);
// (1 + 1) * 3 requests == 6 requests
expect(requests).toBe(6);
expect(success).toBe(false);
});
test("run crawl with retries for 503, no retries, fail", async () => {
requests = 0;
success = false;
const child = exec(`docker run -v $PWD/test-crawls:/crawls --rm webrecorder/browsertrix-crawler crawl --url http://${DOCKER_HOST_NAME}:31501 --url https://example.com/ --limit 2 --collection retry-fail-4 --retries 0 --failOnInvalidStatus --failOnFailedSeed --logging stats,debug`);
let status = 0;
const crawlFinished = new Promise(r => resolve = r);
// detect crawler exit
let crawler_exited = false;
child.on("exit", function (code) {
status = code;
resolve();
});
await crawlFinished;
expect(status).toBe(1);
// (1) * 3 requests == 3 requests
expect(requests).toBe(3);
expect(success).toBe(false);
});