args: add separate --dedupIndexUrl to support separate redis for dedup

indexing prep:
- move WACZLoader to wacz for reuse
Ilya Kreymer 2025-09-16 17:48:13 -07:00
parent 00eca5329d
commit eb6b87fbaf
6 changed files with 78 additions and 56 deletions

src/crawler.ts

@ -31,7 +31,7 @@ import {
} from "./util/storage.js";
import { ScreenCaster, WSTransport } from "./util/screencaster.js";
import { Screenshots } from "./util/screenshots.js";
import { initRedis } from "./util/redis.js";
import { initRedisWaitForSuccess } from "./util/redis.js";
import { logger, formatErr, LogDetails, LogContext } from "./util/logger.js";
import { WorkerState, closeWorkers, runWorkers } from "./util/worker.js";
import { sleep, timedRun, secondsElapsed } from "./util/timing.js";
@ -341,6 +341,7 @@ export class Crawler {
async initCrawlState() {
const redisUrl = this.params.redisStoreUrl || "redis://localhost:6379/0";
const dedupRedisUrl = this.params.dedupStoreUrl || redisUrl;
if (!redisUrl.startsWith("redis://")) {
logger.fatal(
@ -348,18 +349,7 @@ export class Crawler {
);
}
let redis;
while (true) {
try {
redis = await initRedis(redisUrl);
break;
} catch (e) {
//logger.fatal("Unable to connect to state store Redis: " + redisUrl);
logger.warn(`Waiting for redis at ${redisUrl}`, {}, "state");
await sleep(1);
}
}
const redis = await initRedisWaitForSuccess(redisUrl);
logger.debug(
`Storing state via Redis ${redisUrl} @ key prefix "${this.crawlId}"`,
@ -367,6 +357,12 @@ export class Crawler {
"state",
);
let dedupRedis = redis;
if (redisUrl !== dedupRedisUrl) {
dedupRedis = await initRedisWaitForSuccess(dedupRedisUrl);
}
logger.debug(`Max Page Time: ${this.maxPageTime} seconds`, {}, "state");
this.crawlState = new RedisCrawlState(
@ -375,6 +371,7 @@ export class Crawler {
this.maxPageTime,
os.hostname(),
this.params.maxPageRetries,
dedupRedis,
);
if (this.params.logErrorsToRedis) {
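
A condensed sketch of the resulting initCrawlState() flow, with the pieces from the hunks above put back together (the first two RedisCrawlState arguments are not shown in this diff; redis and this.crawlId are assumed from the surrounding code, and the dedup hostname in the comment is made up):

  const redisUrl = this.params.redisStoreUrl || "redis://localhost:6379/0";
  const dedupRedisUrl = this.params.dedupStoreUrl || redisUrl;

  // block until the state store redis accepts connections
  const redis = await initRedisWaitForSuccess(redisUrl);

  // reuse the same connection for dedup unless a separate URL was configured,
  // e.g. --dedupStoreUrl redis://dedup-redis:6379/0
  let dedupRedis = redis;
  if (redisUrl !== dedupRedisUrl) {
    dedupRedis = await initRedisWaitForSuccess(dedupRedisUrl);
  }

  this.crawlState = new RedisCrawlState(
    redis,
    this.crawlId,
    this.maxPageTime,
    os.hostname(),
    this.params.maxPageRetries,
    dedupRedis,
  );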

src/replaycrawler.ts

@ -10,9 +10,6 @@ import { PageInfoRecord, PageInfoValue, Recorder } from "./util/recorder.js";
import fsp from "fs/promises";
import path from "path";
import { ZipRangeReader, createLoader } from "@webrecorder/wabac";
import { AsyncIterReader } from "warcio";
import { parseArgs } from "./util/argParser.js";
import { PNG } from "pngjs";
@ -23,6 +20,7 @@ import { MAX_URL_LENGTH } from "./util/reqresp.js";
import { openAsBlob } from "fs";
import { WARCWriter } from "./util/warcwriter.js";
import { parseRx } from "./util/seeds.js";
import { WACZLoader } from "./util/wacz.js";
// RWP Replay Prefix
const REPLAY_PREFIX = "http://localhost:9990/replay/w/replay/";
@ -784,38 +782,3 @@ export class ReplayCrawler extends Crawler {
return null;
}
}
class WACZLoader {
url: string;
zipreader: ZipRangeReader | null;
constructor(url: string) {
this.url = url;
this.zipreader = null;
}
async init() {
if (!this.url.startsWith("http://") && !this.url.startsWith("https://")) {
const blob = await openAsBlob(this.url);
this.url = URL.createObjectURL(blob);
}
const loader = await createLoader({ url: this.url });
this.zipreader = new ZipRangeReader(loader);
}
async loadFile(fileInZip: string) {
const { reader } = await this.zipreader!.loadFile(fileInZip);
if (!reader) {
return null;
}
if (!reader.iterLines) {
return new AsyncIterReader(reader);
}
return reader;
}
}

src/util/argParser.ts

@ -445,6 +445,13 @@ class ArgParser {
default: "redis://localhost:6379/0",
},
dedupStoreUrl: {
describe:
"If set, url for remote redis server to store state. Otherwise, using local redis instance",
type: "string",
default: "redis://localhost:6379/0",
},
saveState: {
describe:
"If the crawl state should be serialized to the crawls/ directory. Defaults to 'partial', only saved when crawl is interrupted",

src/util/redis.ts

@ -1,5 +1,6 @@
import { Redis } from "ioredis";
import { logger } from "./logger.js";
import { sleep } from "./timing.js";
const error = console.error;
@ -34,6 +35,19 @@ export async function initRedis(url: string) {
return redis;
}
export async function initRedisWaitForSuccess(redisUrl: string, retrySecs = 1) {
while (true) {
try {
return await initRedis(redisUrl);
} catch (e) {
logger.warn(`Waiting for redis at ${redisUrl}`, {}, "state");
await sleep(retrySecs);
}
}
}
export function setExitOnRedisError() {
exitOnError = true;
}
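
A short usage sketch of the new helper (URL and retry interval are illustrative): it simply retries initRedis() until a connection succeeds, so callers no longer need their own wait loop.

  // waits for redis, retrying every 5 seconds instead of the default 1
  const redis = await initRedisWaitForSuccess("redis://my-redis:6379/0", 5);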

src/util/state.ts

@ -192,6 +192,7 @@ export type SaveState = {
export class RedisCrawlState {
redis: Redis;
maxRetries: number;
dedupRedis: Redis;
uid: string;
key: string;
@ -221,8 +222,10 @@ export class RedisCrawlState {
maxPageTime: number,
uid: string,
maxRetries?: number,
dedupRedis?: Redis,
) {
this.redis = redis;
this.dedupRedis = dedupRedis || redis;
this.uid = uid;
this.key = key;
@ -1025,7 +1028,7 @@ return inx;
hash: string,
url: string,
): Promise<{ dupe?: boolean; origDate?: string; origUrl?: string }> {
const value = await this.redis.hget(key, hash);
const value = await this.dedupRedis.hget(key, hash);
if (!value) {
return {};
}
@ -1035,7 +1038,7 @@ return inx;
return { dupe: true };
}
// otherwise, check if a revisit entry
if (await this.redis.sismember(`${key}:${hash}`, url)) {
if (await this.dedupRedis.sismember(`${key}:${hash}`, url)) {
return { dupe: true };
}
return { origUrl: val[1], origDate: val[0] };
@ -1043,8 +1046,8 @@ return inx;
async addHashDupe(key: string, hash: string, url: string, date: string) {
const val = date + "|" + url;
if (!(await this.redis.hsetnx(key, hash, val))) {
await this.redis.sadd(`${key}:${hash}`, url);
if (!(await this.dedupRedis.hsetnx(key, hash, val))) {
await this.dedupRedis.sadd(`${key}:${hash}`, url);
}
}
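
For illustration, a minimal sketch of the key layout that addHashDupe()/getHashDupe() imply, written directly against ioredis (the "dedup" key name, the hash, and the URLs are made up):

  import { Redis } from "ioredis";

  const dedupRedis = new Redis("redis://localhost:6379/0");
  const key = "dedup";
  const hash = "sha256:abc123";

  // first capture of a payload: store "<date>|<url>" under the hash
  await dedupRedis.hsetnx(key, hash, "2025-09-16T00:00:00Z|https://example.com/a");

  // same payload at another URL: hsetnx fails, so track the URL as a revisit
  if (!(await dedupRedis.hsetnx(key, hash, "2025-09-16T00:05:00Z|https://example.com/b"))) {
    await dedupRedis.sadd(`${key}:${hash}`, "https://example.com/b");
  }

  // lookup mirrors getHashDupe(): the stored value yields origDate/origUrl,
  // and membership in <key>:<hash> marks an already-recorded revisit
  const value = await dedupRedis.hget(key, hash); // "2025-09-16T00:00:00Z|https://example.com/a"
  const isRevisit = await dedupRedis.sismember(`${key}:${hash}`, "https://example.com/b"); // 1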

src/util/wacz.ts

@ -1,5 +1,5 @@
import path, { basename } from "node:path";
import fs from "node:fs";
import fs, { openAsBlob } from "node:fs";
import fsp from "node:fs/promises";
import { Writable, Readable } from "node:stream";
import { pipeline } from "node:stream/promises";
@ -16,6 +16,8 @@ import { makeZip, InputWithoutMeta } from "client-zip";
import { logger, formatErr } from "./logger.js";
import { streamFinish } from "./warcwriter.js";
import { getDirSize } from "./storage.js";
import { createLoader, ZipRangeReader } from "@webrecorder/wabac";
import { AsyncIterReader } from "warcio";
const DATAPACKAGE_JSON = "datapackage.json";
const DATAPACKAGE_DIGEST_JSON = "datapackage-digest.json";
@ -427,3 +429,39 @@ export async function mergeCDXJ(
await removeIndexFile(INDEX_CDXJ);
}
}
// ============================================================================
export class WACZLoader {
url: string;
zipreader: ZipRangeReader | null;
constructor(url: string) {
this.url = url;
this.zipreader = null;
}
async init() {
if (!this.url.startsWith("http://") && !this.url.startsWith("https://")) {
const blob = await openAsBlob(this.url);
this.url = URL.createObjectURL(blob);
}
const loader = await createLoader({ url: this.url });
this.zipreader = new ZipRangeReader(loader);
}
async loadFile(fileInZip: string) {
const { reader } = await this.zipreader!.loadFile(fileInZip);
if (!reader) {
return null;
}
if (!reader.iterLines) {
return new AsyncIterReader(reader);
}
return reader;
}
}
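
A usage sketch of the relocated WACZLoader, reading the page list out of a WACZ (the import path matches this commit; the WACZ URL is illustrative, and "pages/pages.jsonl" is the standard WACZ page-list entry):

  import { WACZLoader } from "./util/wacz.js";

  const loader = new WACZLoader("https://example.com/archive.wacz");
  await loader.init();

  const reader = await loader.loadFile("pages/pages.jsonl");
  if (reader) {
    for await (const line of reader.iterLines()) {
      const page = JSON.parse(line);
      // skip the header record, which has no url
      if (page.url) {
        console.log(page.url, page.ts);
      }
    }
  }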