Fix Save/Load State (#495)

- Fixes state serialization, which was missing the done list. Instead of
serializing the done list directly, a 'finished' list is computed from
the seen list, minus failed and queued URLs, and serialized in its place.
- Also adds serialization support for 'extraSeeds', seeds added
dynamically from a redirect (via #475). Extra seeds are stored in Redis
and included in the serialized state.

Fixes #491
Ilya Kreymer 2024-03-15 20:54:43 -04:00 committed by GitHub
parent fa37f62c86
commit 6d04c9575f
3 changed files with 220 additions and 90 deletions
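
For illustration, the save state produced by the new serialize() has this shape (field names from the SaveState type added below; all values here are hypothetical): 'finished' holds plain URLs, 'queued' and 'failed' hold JSON-encoded queue entries, and 'extraSeeds' holds JSON-encoded ExtraRedirectSeed records.

  // hypothetical example of the new serialized state (see SaveState below)
  const example: SaveState = {
    finished: ["https://example.com/"],
    queued: ['{"url":"https://example.com/about","seedId":0,"depth":1,"extraHops":0}'],
    pending: [],
    failed: [],
    errors: [],
    extraSeeds: ['{"origSeedId":0,"newUrl":"https://example.com/"}'],
  };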


@@ -329,6 +329,18 @@ export class Crawler {
os.hostname(),
);
+ // load full state from config
+ if (this.params.state) {
+ await this.crawlState.load(
+ this.params.state,
+ this.params.scopedSeeds,
+ true,
+ );
+ // otherwise, just load extra seeds
+ } else {
+ await this.loadExtraSeeds();
+ }
// clear any pending URLs from this instance
await this.crawlState.clearOwnPendingLocks();
@@ -348,6 +360,15 @@
return this.crawlState;
}
+ async loadExtraSeeds() {
+ const extraSeeds = await this.crawlState.getExtraSeeds();
+ for (const { origSeedId, newUrl } of extraSeeds) {
+ const seed = this.params.scopedSeeds[origSeedId];
+ this.params.scopedSeeds.push(seed.newScopedSeed(newUrl));
+ }
+ }
initScreenCaster() {
let transport;
@@ -1190,14 +1211,6 @@ self.__bx_behaviors.selectMainBehavior();
await this.crawlState.setStatus("running");
- if (this.params.state) {
- await this.crawlState.load(
- this.params.state,
- this.params.scopedSeeds,
- true,
- );
- }
await this.initPages();
this.adBlockRules = new AdBlockRules(
@@ -1577,9 +1590,11 @@ self.__bx_behaviors.selectMainBehavior();
const isChromeError = page.url().startsWith("chrome-error://");
if (depth === 0 && !isChromeError && respUrl !== url) {
const seed = this.params.scopedSeeds[data.seedId];
- this.params.scopedSeeds.push(seed.newScopedSeed(respUrl));
- data.seedId = this.params.scopedSeeds.length - 1;
+ data.seedId = await this.crawlState.addExtraSeed(
+ this.params.scopedSeeds,
+ data.seedId,
+ respUrl,
+ );
logger.info("Seed page redirected, adding redirected seed", {
origUrl: url,
newUrl: respUrl,
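
To orient, the crawler-side flow these hunks implement is a round trip. On a seed redirect, addExtraSeed() extends scopedSeeds, records the redirect in Redis, and marks the new URL as seen so it is not re-queued as an ordinary link; on a restart without a full saved state, loadExtraSeeds() replays those records so seed IDs line up with the previous run. A condensed sketch (method names from the diff above, not a standalone program):

  // 1) during the crawl, on a seed page redirect:
  data.seedId = await this.crawlState.addExtraSeed(
    this.params.scopedSeeds,
    data.seedId,
    respUrl,
  );

  // 2) on restart, before crawling:
  for (const { origSeedId, newUrl } of await this.crawlState.getExtraSeeds()) {
    const seed = this.params.scopedSeeds[origSeedId];
    this.params.scopedSeeds.push(seed.newScopedSeed(newUrl));
  }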


@@ -35,6 +35,12 @@ export type QueueEntry = {
extraHops: number;
};
+ // ============================================================================
+ export type ExtraRedirectSeed = {
+ newUrl: string;
+ origSeedId: number;
+ };
// ============================================================================
export type PageCallbacks = {
addLink?: (url: string) => Promise<void>;
@@ -127,6 +133,17 @@ declare module "ioredis" {
}
}
+ // ============================================================================
+ type SaveState = {
+ done?: number | string[];
+ finished: string[];
+ queued: string[];
+ pending: string[];
+ failed: string[];
+ errors: string[];
+ extraSeeds: string[];
+ };
// ============================================================================
export class RedisCrawlState {
redis: Redis;
@@ -143,6 +160,7 @@ export class RedisCrawlState {
fkey: string;
ekey: string;
pageskey: string;
+ esKey: string;
constructor(redis: Redis, key: string, maxPageTime: number, uid: string) {
this.redis = redis;
@@ -163,6 +181,8 @@
// pages
this.pageskey = this.key + ":pages";
+ this.esKey = this.key + ":extraSeeds";
this._initLuaCommands(this.redis);
}
@@ -492,22 +512,50 @@ return 0;
return !!(await this.redis.sismember(this.skey, url));
}
- async serialize() {
+ async serialize(): Promise<SaveState> {
//const queued = await this._iterSortKey(this.qkey);
- const done = await this.numDone();
- const queued = await this._iterSortedKey(this.qkey);
+ // const done = await this.numDone();
+ const seen = await this._iterSet(this.skey);
+ const queued = await this._iterSortedKey(this.qkey, seen);
const pending = await this.getPendingList();
- const failed = await this._iterListKeys(this.fkey);
+ const failed = await this._iterListKeys(this.fkey, seen);
const errors = await this.getErrorList();
- return { done, queued, pending, failed, errors };
+ const extraSeeds = await this._iterListKeys(this.esKey, seen);
+ const finished = [...seen.values()];
+ return { extraSeeds, finished, queued, pending, failed, errors };
}
_getScore(data: QueueEntry) {
return (data.depth || 0) + (data.extraHops || 0) * MAX_DEPTH;
}
- async _iterSortedKey(key: string, inc = 100) {
+ async _iterSet(key: string, count = 100) {
+ const stream = this.redis.sscanStream(key, { count });
+ const results: Set<string> = new Set<string>();
+ stream.on("data", async (someResults: string[]) => {
+ stream.pause();
+ for (const result of someResults) {
+ results.add(result);
+ }
+ stream.resume();
+ });
+ await new Promise<void>((resolve) => {
+ stream.on("end", () => {
+ resolve();
+ });
+ });
+ return results;
+ }
+ async _iterSortedKey(key: string, seenSet: Set<string>, inc = 100) {
const results: string[] = [];
const len = await this.redis.zcard(key);
@@ -521,33 +569,35 @@ return 0;
i,
inc,
);
- results.push(...someResults);
+ for (const result of someResults) {
+ const json = JSON.parse(result);
+ seenSet.delete(json.url);
+ results.push(result);
+ }
}
return results;
}
- async _iterListKeys(key: string, inc = 100) {
+ async _iterListKeys(key: string, seenSet: Set<string>, inc = 100) {
const results: string[] = [];
const len = await this.redis.llen(key);
for (let i = 0; i < len; i += inc) {
const someResults = await this.redis.lrange(key, i, i + inc - 1);
- results.push(...someResults);
+ for (const result of someResults) {
+ const json = JSON.parse(result);
+ seenSet.delete(json.url);
+ results.push(result);
+ }
}
return results;
}
- async load(
- // TODO: Fix this the next time the file is edited.
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- state: Record<string, any>,
- seeds: ScopedSeed[],
- checkScope: boolean,
- ) {
- const seen: string[] = [];
+ async load(state: SaveState, seeds: ScopedSeed[], checkScope: boolean) {
// need to delete existing keys, if exist to fully reset state
await this.redis.del(this.qkey);
await this.redis.del(this.pkey);
@@ -556,6 +606,21 @@ return 0;
await this.redis.del(this.skey);
await this.redis.del(this.ekey);
+ let seen: string[] = [];
+ if (state.finished) {
+ seen = state.finished;
+ await this.redis.set(this.dkey, state.finished.length);
+ }
+ if (state.extraSeeds) {
+ for (const extraSeed of state.extraSeeds) {
+ const { newUrl, origSeedId }: ExtraRedirectSeed = JSON.parse(extraSeed);
+ await this.addExtraSeed(seeds, origSeedId, newUrl);
+ }
+ }
for (const json of state.queued) {
const data = JSON.parse(json);
if (checkScope) {
@@ -580,19 +645,23 @@ return 0;
seen.push(data.url);
}
- if (typeof state.done === "number") {
- // done key is just an int counter
- await this.redis.set(this.dkey, state.done);
- } else if (state.done instanceof Array) {
- // for backwards compatibility with old save states
- for (const json of state.done) {
- const data = JSON.parse(json);
- if (data.failed) {
- await this.redis.zadd(this.qkey, this._getScore(data), json);
- } else {
- await this.redis.incr(this.dkey);
- }
- seen.push(data.url);
- }
- }
+ // backwards compatibility: not using done, instead 'finished'
+ // contains list of finished URLs
+ if (state.done) {
+ if (typeof state.done === "number") {
+ // done key is just an int counter
+ await this.redis.set(this.dkey, state.done);
+ } else if (state.done instanceof Array) {
+ // for backwards compatibility with old save states
+ for (const json of state.done) {
+ const data = JSON.parse(json);
+ if (data.failed) {
+ await this.redis.zadd(this.qkey, this._getScore(data), json);
+ } else {
+ await this.redis.incr(this.dkey);
+ }
+ seen.push(data.url);
+ }
+ }
+ }
@@ -698,4 +767,30 @@ return 0;
async writeToPagesQueue(value: string) {
return await this.redis.lpush(this.pageskey, value);
}
+ // add extra seeds from redirect
+ async addExtraSeed(seeds: ScopedSeed[], origSeedId: number, newUrl: string) {
+ if (!seeds[origSeedId]) {
+ logger.fatal(
+ "State load, original seed missing",
+ { origSeedId },
+ "state",
+ );
+ }
+ seeds.push(seeds[origSeedId].newScopedSeed(newUrl));
+ const newSeedId = seeds.length - 1;
+ const redirectSeed: ExtraRedirectSeed = { origSeedId, newUrl };
+ await this.redis.sadd(this.skey, newUrl);
+ await this.redis.lpush(this.esKey, JSON.stringify(redirectSeed));
+ return newSeedId;
+ }
+ async getExtraSeeds() {
+ const seeds: ExtraRedirectSeed[] = [];
+ const res = await this.redis.lrange(this.esKey, 0, -1);
+ for (const key of res) {
+ seeds.push(JSON.parse(key));
+ }
+ return seeds;
+ }
}
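
A quick worked example of how serialize() now derives 'finished': the seen set starts as every URL the crawler has touched, and _iterSortedKey() and _iterListKeys() delete each queued and failed URL from that set as they serialize it, so whatever remains must have finished. A sketch with hypothetical URLs:

  const seen = new Set([
    "https://example.com/",   // crawled successfully
    "https://example.com/a",  // crawled successfully
    "https://example.com/b",  // still in the queue
    "https://example.com/c",  // failed
  ]);
  seen.delete("https://example.com/b"); // done by _iterSortedKey(this.qkey, seen)
  seen.delete("https://example.com/c"); // done by _iterListKeys(this.fkey, seen)
  const finished = [...seen.values()];  // the two finished URLs remain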


@@ -1,37 +1,51 @@
- import { exec } from "child_process";
+ import { execSync } from "child_process";
import fs from "fs";
import path from "path";
import yaml from "js-yaml";
import Redis from "ioredis";
- function waitForProcess() {
- let callback = null;
- const p = new Promise((resolve) => {
- callback = (/*error, stdout, stderr*/) => {
- //console.log(stdout);
- resolve(0);
- };
- });
- return { p, callback };
- }
+ function sleep(ms) {
+ return new Promise((resolve) => setTimeout(resolve, ms));
+ }
- var savedStateFile;
- var state;
- var numDone;
- var redis;
- var finishProcess;
+ async function waitContainer(containerId) {
+ try {
+ execSync(`docker kill -s SIGINT ${containerId}`);
+ } catch (e) {
+ return;
+ }
+ // containerId is initially the full id, but docker ps
+ // only prints the short id (first 12 characters)
+ containerId = containerId.slice(0, 12);
+ while (true) {
+ try {
+ const res = execSync("docker ps -q", { encoding: "utf-8" });
+ if (res.indexOf(containerId) < 0) {
+ return;
+ }
+ } catch (e) {
+ console.error(e);
+ }
+ await sleep(500);
+ }
+ }
+ let savedStateFile;
+ let state;
+ let numDone;
+ let numQueued;
+ let finished;
test("check crawl interrupted + saved state written", async () => {
- let proc = null;
- const wait = waitForProcess();
+ let containerId = null;
try {
- proc = exec(
- "docker run -v $PWD/test-crawls:/crawls -v $PWD/tests/fixtures:/tests/fixtures webrecorder/browsertrix-crawler crawl --collection int-state-test --url https://webrecorder.net/ --limit 20",
- { shell: "/bin/bash" },
- wait.callback,
+ containerId = execSync(
+ "docker run -d -v $PWD/test-crawls:/crawls -v $PWD/tests/fixtures:/tests/fixtures webrecorder/browsertrix-crawler crawl --collection int-state-test --url https://www.webrecorder.net/ --limit 10",
+ { encoding: "utf-8" },
+ //wait.callback,
);
} catch (error) {
console.log(error);
@@ -60,12 +74,10 @@ test("check crawl interrupted + saved state written", async () => {
// ignore
}
- await new Promise((resolve) => setTimeout(resolve, 500));
+ await sleep(500);
}
- proc.kill("SIGINT");
- await wait.p;
+ await waitContainer(containerId);
const savedStates = fs.readdirSync(
"test-crawls/collections/int-state-test/crawls",
@@ -87,31 +99,36 @@ test("check parsing saved state + page done + queue present", () => {
expect(!!saved.state).toBe(true);
state = saved.state;
- numDone = state.done;
- expect(state.done > 0).toEqual(true);
- expect(state.queued.length > 0).toEqual(true);
+ numDone = state.finished.length;
+ numQueued = state.queued.length;
+ expect(numDone > 0).toEqual(true);
+ expect(numQueued > 0).toEqual(true);
+ expect(numDone + numQueued).toEqual(10);
+ // ensure extra seeds also set
+ expect(state.extraSeeds).toEqual([
+ `{"origSeedId":0,"newUrl":"https://webrecorder.net/"}`,
+ ]);
+ finished = state.finished;
});
test("check crawl restarted with saved state", async () => {
- let proc = null;
- const wait = waitForProcess();
+ let containerId = null;
try {
- proc = exec(
- `docker run -p 36379:6379 -e CRAWL_ID=test -v $PWD/test-crawls:/crawls -v $PWD/tests/fixtures:/tests/fixtures webrecorder/browsertrix-crawler crawl --collection int-state-test --url https://webrecorder.net/ --config /crawls/collections/int-state-test/crawls/${savedStateFile} --debugAccessRedis --limit 5`,
- { shell: "/bin/bash" },
- wait.callback,
+ containerId = execSync(
+ `docker run -d -p 36379:6379 -e CRAWL_ID=test -v $PWD/test-crawls:/crawls -v $PWD/tests/fixtures:/tests/fixtures webrecorder/browsertrix-crawler crawl --collection int-state-test --url https://webrecorder.net/ --config /crawls/collections/int-state-test/crawls/${savedStateFile} --debugAccessRedis --limit 5`,
+ { encoding: "utf-8" },
);
} catch (error) {
console.log(error);
}
- await new Promise((resolve) => setTimeout(resolve, 2000));
+ await sleep(2000);
- redis = new Redis("redis://127.0.0.1:36379/0", { lazyConnect: true });
+ const redis = new Redis("redis://127.0.0.1:36379/0", { lazyConnect: true });
try {
await redis.connect({
@@ -121,20 +138,23 @@ test("check crawl restarted with saved state", async () => {
},
});
- await new Promise((resolve) => setTimeout(resolve, 2000));
+ await sleep(2000);
expect(await redis.get("test:d")).toBe(numDone + "");
+ for (const url of finished) {
+ const res = await redis.sismember("test:s", url);
+ expect(res).toBe(1);
+ }
} catch (e) {
console.log(e);
} finally {
- proc.kill("SIGINT");
+ await waitContainer(containerId);
try {
await redis.disconnect();
} catch (e) {
// ignore
}
}
- finishProcess = wait.p;
});
- test("interrupt crawl and exit", async () => {
- const res = await Promise.allSettled([finishProcess, redis.quit()]);
- expect(res[0].value).toBe(0);
- });
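
As context for the restart assertions: with CRAWL_ID=test and --debugAccessRedis, the crawl state appears to live under 'test:'-prefixed keys, so 'test:d' is the done counter (restored from finished.length on load) and 'test:s' is the seen set, which load() repopulates. A hypothetical standalone check along the same lines (run as an ES module against the exposed port):

  import Redis from "ioredis";

  const redis = new Redis("redis://127.0.0.1:36379/0");
  // done counter should equal the number of finished URLs in the saved state
  console.log(await redis.get("test:d"));
  // every finished URL should be back in the seen set (1 = member)
  console.log(await redis.sismember("test:s", "https://webrecorder.net/"));
  await redis.quit();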