add indexer entrypoint:

- populate dedup index from a remote WACZ, multi-WACZ, or multi-WACZ JSON

refactor:
- move WACZLoader into wacz.ts so it can be shared with the indexer
- state: move hash-based dedup to RedisDedupIndex

cli args:
- add --minPageDedupDepth to set the minimum depth at which duplicate pages may be skipped

- skip writing the same URL with the same hash more than once within a single crawl
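
A minimal usage sketch (hypothetical URLs and redis addresses; the indexer and crawl
entrypoints come from the Dockerfile symlinks below, --sourceUrl/--dedupStoreUrl and
--minPageDedupDepth are the new options added here, and --url is the crawler's existing
seed flag):

  indexer --sourceUrl https://example.com/collection.json --dedupStoreUrl redis://dedup-redis:6379/0
  crawl --url https://example.com/ --minPageDedupDepth 1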
Ilya Kreymer 2025-09-17 19:23:32 -07:00
parent eb6b87fbaf
commit 2ecf290d38
7 changed files with 292 additions and 72 deletions

@@ -44,11 +44,12 @@ ADD https://cdn.jsdelivr.net/npm/replaywebpage@${RWP_VERSION}/ui.js /app/html/rw
ADD https://cdn.jsdelivr.net/npm/replaywebpage@${RWP_VERSION}/sw.js /app/html/rwp/
ADD https://cdn.jsdelivr.net/npm/replaywebpage@${RWP_VERSION}/adblock/adblock.gz /app/html/rwp/adblock.gz
RUN chmod a+x /app/dist/main.js /app/dist/create-login-profile.js && chmod a+r /app/html/rwp/*
RUN chmod a+x /app/dist/main.js /app/dist/create-login-profile.js /app/dist/indexer.js && chmod a+r /app/html/rwp/*
RUN ln -s /app/dist/main.js /usr/bin/crawl; \
ln -s /app/dist/main.js /usr/bin/qa; \
ln -s /app/dist/create-login-profile.js /usr/bin/create-login-profile
ln -s /app/dist/create-login-profile.js /usr/bin/create-login-profile; \
ln -s /app/dist/indexer.js /usr/bin/indexer;
RUN mkdir -p /app/behaviors

src/indexer.ts (new file, 180 lines)

@@ -0,0 +1,180 @@
#!/usr/bin/env node
import yargs from "yargs";
import { logger } from "./util/logger.js";
import { getInfoString } from "./util/file_reader.js";
import { openAsBlob } from "node:fs";
import { WACZLoader } from "./util/wacz.js";
import { ExitCodes } from "./util/constants.js";
import { initRedisWaitForSuccess } from "./util/redis.js";
import { AsyncIterReader } from "warcio";
import { RedisDedupIndex } from "./util/state.js";
export class CrawlIndexer {
constructor() {}
initArgs() {
return yargs(process.argv)
.usage("indexer [options]")
.options({
dedupStoreUrl: {
describe: "URL for remote redis instance to index into",
type: "string",
required: true,
},
sourceUrl: {
describe: "Source WACZ or Multi WACZ or Multi WACZ JSON to index",
type: "string",
required: true,
},
})
.parseSync();
}
async run() {
logger.setDebugLogging(true);
process.on("SIGINT", () => this.handleTerminate("SIGINT"));
process.on("SIGTERM", () => this.handleTerminate("SIGTERM"));
logger.info(await getInfoString());
const params = this.initArgs();
const redis = await initRedisWaitForSuccess(params.dedupStoreUrl);
const dedupIndex = new RedisDedupIndex(redis);
const allFiles = [];
for await (const waczfile of this.iterWACZ(params.sourceUrl)) {
allFiles.push(waczfile);
}
let count = 0;
const total = allFiles.length;
for (const waczfile of allFiles) {
count += 1;
const loader = new WACZLoader(waczfile);
logger.debug(`Processing WACZ ${count} of ${total}`, { waczfile });
for await (const file of loader.iterFiles("indexes/")) {
const filename = file.filename;
if (filename.endsWith(".cdx.gz")) {
logger.debug("Processing CDX GZ Index", { filename });
await this.ingestCDXJ(dedupIndex, loader, filename, "gzip");
} else if (filename.endsWith(".cdx") || filename.endsWith(".cdxj")) {
logger.debug("Processing CDX Index", { filename });
await this.ingestCDXJ(dedupIndex, loader, filename);
}
}
}
logger.info("Done!");
process.exit(ExitCodes.Success);
}
async ingestCDXJ(
dedupIndex: RedisDedupIndex,
loader: WACZLoader,
filename: string,
compression?: string,
) {
let reader = await loader.loadFile(filename);
if (!reader) {
logger.error("File not found, skipping!");
return;
}
if (compression === "gzip") {
reader = new AsyncIterReader(reader, "gzip", false);
}
let count = 0;
for await (const line of reader.iterLines()) {
const inx = line.indexOf(" {");
if (inx < 0) {
logger.error("Skipping invalid CDXJ, no JSON", { line });
continue;
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
let cdx: Record<string, any>;
try {
cdx = JSON.parse(line.slice(inx));
} catch (e) {
logger.error("Skipping invalid CDXJ, JSON invalid", { line });
continue;
}
const date = line.split(" ", 2)[1];
const url = cdx.url;
const hash = cdx.digest;
if (url.startsWith("urn:")) {
continue;
}
// only adding originals to dedup against, don't want to dedup against existing revisits
if (cdx.mime === "warc/revisit") {
continue;
}
if (url && date && hash) {
await dedupIndex.addHashDupe(hash, url, date);
} else {
logger.warn("Skipping invalid CDXJ, data missing", {
url,
date,
digest: hash,
});
continue;
}
count += 1;
}
logger.debug("Processed", { count });
}
async *iterWACZ(url: string): AsyncIterable<string> {
let path: string = url;
try {
path = new URL(url).pathname;
} catch (e) {
// ignore
}
if (path.endsWith(".wacz")) {
yield url;
} else if (path.endsWith(".json")) {
if (!url.startsWith("http://") && !url.startsWith("https://")) {
const blob = await openAsBlob(url);
url = URL.createObjectURL(blob);
}
const resp = await fetch(url);
const json = await resp.json();
for (const entry of json.resources) {
if (entry.path) {
yield* this.iterWACZ(entry.path);
}
}
} else {
logger.warn("Unknown source", { url }, "replay");
}
}
handleTerminate(signame: string) {
logger.info(`Got signal ${signame}, exiting`);
process.exit(ExitCodes.SignalInterrupted);
}
}
await new CrawlIndexer().run();
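
A hypothetical CDXJ line of the shape ingestCDXJ() expects ("<surt key> <timestamp> <json>");
the URL, digest, and timestamp are made up:

// "<surt> <14-digit timestamp> <json block>"
const line =
  'com,example)/page 20250917012345 {"url":"https://example.com/page","digest":"sha256:abc123","mime":"text/html","status":"200"}';
line.split(" ", 2)[1];                        // -> "20250917012345" (capture date)
JSON.parse(line.slice(line.indexOf(" {")));   // -> { url, digest, mime, status }
// for non-urn, non-revisit records, addHashDupe(digest, url, date) is then called

Likewise, a hypothetical multi-WACZ JSON that iterWACZ() would expand, one resources entry per WACZ:

{ "resources": [ { "path": "https://example.com/a.wacz" }, { "path": "https://example.com/b.wacz" } ] }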

@@ -449,7 +449,13 @@ class ArgParser {
describe:
"If set, url for remote redis server to store state. Otherwise, using local redis instance",
type: "string",
default: "redis://localhost:6379/0",
},
minPageDedupDepth: {
describe:
"If set >= 0, minimum depth at which duplicate pages can be skipped. -1 means never skip duplicate pages",
type: "number",
default: -1,
},
saveState: {
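
As a concrete reading of the new option: with --minPageDedupDepth 0 any page, including seeds,
may be skipped when its payload hash is already in the dedup index; with --minPageDedupDepth 1
seed pages (depth 0) are always captured and only pages at depth >= 1 can be skipped; the
default of -1 disables page-level skipping entirely. The corresponding check is the
"this.minPageDedupDepth >= 0 && this.pageSeedDepth >= this.minPageDedupDepth" condition
in the recorder changes below.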

@@ -22,6 +22,8 @@ export const DETECT_SITEMAP = "<detect>";
export const EXTRACT_TEXT_TYPES = ["to-pages", "to-warc", "final-to-warc"];
export const HASH_DUPE_KEY = "dupe";
export enum BxFunctionBindings {
BehaviorLogFunc = "__bx_log",
AddLinkFunc = "__bx_addLink",

@@ -15,12 +15,19 @@ import {
removeRangeAsQuery,
rewriteDASH,
rewriteHLS,
tsToDate,
} from "@webrecorder/wabac";
import { WARCRecord, multiValueHeader } from "warcio";
import { TempFileBuffer, WARCSerializer } from "warcio/node";
import { WARCWriter } from "./warcwriter.js";
import { LoadState, PageState, RedisCrawlState, WorkerId } from "./state.js";
import {
LoadState,
normalizeDedupStatus,
PageState,
RedisCrawlState,
WorkerId,
} from "./state.js";
import { CDPSession, Protocol } from "puppeteer-core";
import { Crawler } from "../crawler.js";
import { getProxyDispatcher } from "./proxy.js";
@@ -38,7 +45,7 @@ const TAKE_STREAM_BUFF_SIZE = 1024 * 64;
const ASYNC_FETCH_DUPE_KEY = "s:fetchdupe";
const WRITE_DUPE_KEY = "dupe";
const WRITE_DUPE_KEY = "s:writedupe";
const MIME_EVENT_STREAM = "text/event-stream";
@@ -142,6 +149,7 @@ export class Recorder extends EventEmitter {
pageSeed?: ScopedSeed;
pageSeedDepth = 0;
minPageDedupDepth = -1;
frameIdToExecId: Map<string, number> | null;
@@ -165,6 +173,8 @@
this.shouldSaveStorage = !!crawler.params.saveStorage;
this.minPageDedupDepth = crawler.params.minPageDedupDepth;
this.writer = writer;
this.fetcherQ = new PQueue({ concurrency: 1 });
@@ -821,11 +831,16 @@
const rewritten = await this.rewriteResponse(reqresp, mimeType);
if (url === this.pageUrl && reqresp.payload && this.pageSeedDepth >= 1) {
if (
url === this.pageUrl &&
reqresp.payload &&
this.minPageDedupDepth >= 0 &&
this.pageSeedDepth >= this.minPageDedupDepth
) {
const hash =
"sha256:" + createHash("sha256").update(reqresp.payload).digest("hex");
const res = await this.crawlState.getHashDupe(WRITE_DUPE_KEY, hash, url);
if (res && res.dupe) {
const { origUrl } = await this.crawlState.getHashDupe(hash);
if (origUrl) {
const errorReason = "BlockedByResponse";
await cdp.send("Fetch.failRequest", {
requestId,
@@ -1497,7 +1512,11 @@
if (
method === "GET" &&
url &&
!(await this.crawlState.addIfNoDupe(ASYNC_FETCH_DUPE_KEY, url, status))
!(await this.crawlState.addIfNoDupe(
ASYNC_FETCH_DUPE_KEY,
url,
normalizeDedupStatus(status),
))
) {
reqresp.asyncLoading = false;
return true;
@@ -1608,7 +1627,7 @@
// !isRedirectStatus(status) &&
// !(await this.crawlState.addIfNoDupe(WRITE_DUPE_KEY, url, status))
// ) {
// logNetwork("Skipping dupe", { url, status, ...this.logDetails });
// logNetwork("Skipping exact URL dupe in this crawl", { url, status, ...this.logDetails });
// return false;
// }
@@ -1625,7 +1644,11 @@
!(await this.checkStreamingRecordPayload(reqresp, serializer, false))
) {
serializer.externalBuffer?.purge();
await this.crawlState.removeDupe(ASYNC_FETCH_DUPE_KEY, url, status);
await this.crawlState.removeDupe(
ASYNC_FETCH_DUPE_KEY,
url,
normalizeDedupStatus(status),
);
//await this.crawlState.removeDupe(WRITE_DUPE_KEY, url, status);
return false;
}
@@ -1659,29 +1682,29 @@
}
const hash = responseRecord.warcPayloadDigest || "";
if (!(await this.crawlState.addIfNoDupe(WRITE_DUPE_KEY, url, hash))) {
serializer.externalBuffer?.purge();
return false;
}
const date = responseRecord.warcDate || "";
const isEmpty = reqresp.readSize === 0;
if (!isEmpty && url && method === "GET" && !isRedirectStatus(status)) {
const { dupe, origUrl, origDate } = await this.crawlState.getHashDupe(
WRITE_DUPE_KEY,
hash,
url,
);
const { origUrl, origDate } = await this.crawlState.getHashDupe(hash);
if (dupe) {
// duplicate url at origTs
// skip, no need for revisit
logNetwork("Skipping dupe", { url, status, ...this.logDetails });
return false;
} else if (origUrl && origDate) {
if (hash && origUrl && origDate) {
const date = tsToDate(origDate).toISOString();
// always write revisit here
// duplicate URLs in same crawl filtered out separately
serializer.externalBuffer?.purge();
({ responseRecord, serializer } = await createRevisitForResponse(
responseRecord,
serializer,
origUrl,
origDate,
date,
));
} else {
// no dupe, continue
@@ -1717,7 +1740,7 @@
this.addPageRecord(reqresp);
if (!isEmpty) {
await this.crawlState.addHashDupe(WRITE_DUPE_KEY, hash, url, date);
await this.crawlState.addHashDupe(hash, url, date);
}
return true;
@@ -2095,6 +2118,7 @@ async function createRevisitForResponse(
const warcHeaders: Record<string, string> = {
"WARC-Page-ID": responseRecord.warcHeaders.headers.get("WARC-Page-ID")!,
"WARC-Payload-Digest": origPayloadDigest!,
};
const revisitRecord = WARCRecord.create({
@@ -2115,13 +2139,6 @@
await serializer.digestRecord();
if (origPayloadDigest) {
revisitRecord.warcHeaders.headers.set(
"WARC-Payload-Digest",
origPayloadDigest,
);
}
return { serializer, responseRecord: revisitRecord };
}
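
Putting the recorder-side pieces together, a sketch of the cross-crawl dedup flow for a
successful GET response (names taken from the hunks above; a summary, not the literal control flow):

// 1. hash = "sha256:" + sha256(payload)           (page responses)
//    or   = responseRecord.warcPayloadDigest      (streamed records)
// 2. { origUrl, origDate } = await crawlState.getHashDupe(hash)
// 3. if origUrl && origDate are set:
//      - for the page URL at depth >= minPageDedupDepth, fail the request with
//        "BlockedByResponse" so the duplicate page is skipped entirely
//      - otherwise purge the buffered payload and write a revisit record pointing
//        at the original capture, with date = tsToDate(origDate).toISOString()
// 4. if no match, write the response and register it:
//      await crawlState.addHashDupe(hash, url, date)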

@@ -7,6 +7,7 @@ import {
MAX_DEPTH,
DEFAULT_MAX_RETRIES,
ROBOTS_CACHE_LIMIT,
HASH_DUPE_KEY,
} from "./constants.js";
import { ScopedSeed } from "./seeds.js";
import { Frame } from "puppeteer-core";
@@ -30,11 +31,11 @@ export enum QueueState {
// ============================================================================
// treat 0 or 206 as 200 for purposes of dedup
function normalizeDedupStatus(status: number): number {
export function normalizeDedupStatus(status: number): string {
if (status === 0 || status === 206) {
return 200;
return "200";
}
return status;
return status + "";
}
// ============================================================================
@@ -189,10 +190,41 @@ export type SaveState = {
};
// ============================================================================
export class RedisCrawlState {
export class RedisDedupIndex {
dedupRedis: Redis;
constructor(dedupRedis: Redis) {
this.dedupRedis = dedupRedis;
}
async getHashDupe(
hash: string,
key = HASH_DUPE_KEY,
//url: string,
): Promise<{ origDate?: string; origUrl?: string }> {
const value = await this.dedupRedis.hget(key, hash);
if (!value) {
return {};
}
const val = value.split("|");
return { origUrl: val[1], origDate: val[0] };
}
async addHashDupe(
hash: string,
url: string,
date: string,
key = HASH_DUPE_KEY,
) {
const val = date.replace(/[^\d]/g, "") + "|" + url;
await this.dedupRedis.hsetnx(key, hash, val);
}
}
// ============================================================================
export class RedisCrawlState extends RedisDedupIndex {
redis: Redis;
maxRetries: number;
dedupRedis: Redis;
uid: string;
key: string;
@@ -224,8 +256,8 @@
maxRetries?: number,
dedupRedis?: Redis,
) {
super(dedupRedis || redis);
this.redis = redis;
this.dedupRedis = dedupRedis || redis;
this.uid = uid;
this.key = key;
@@ -1012,43 +1044,12 @@ return inx;
return await this.redis.zcard(this.qkey);
}
async addIfNoDupe(key: string, url: string, status: number) {
return (
(await this.redis.sadd(key, normalizeDedupStatus(status) + "|" + url)) ===
1
);
async addIfNoDupe(key: string, url: string, other_id: string) {
return (await this.redis.sadd(key, other_id + "|" + url)) === 1;
}
async removeDupe(key: string, url: string, status: number) {
return await this.redis.srem(key, normalizeDedupStatus(status) + "|" + url);
}
async getHashDupe(
key: string,
hash: string,
url: string,
): Promise<{ dupe?: boolean; origDate?: string; origUrl?: string }> {
const value = await this.dedupRedis.hget(key, hash);
if (!value) {
return {};
}
const val = value.split("|");
// if matches the first entry, return
if (val[1] === url) {
return { dupe: true };
}
// otherwise, check if a revisit entry
if (await this.dedupRedis.sismember(`${key}:${hash}`, url)) {
return { dupe: true };
}
return { origUrl: val[1], origDate: val[0] };
}
async addHashDupe(key: string, hash: string, url: string, date: string) {
const val = date + "|" + url;
if (!(await this.dedupRedis.hsetnx(key, hash, val))) {
await this.dedupRedis.sadd(`${key}:${hash}`, url);
}
async removeDupe(key: string, url: string, other_id: string) {
return await this.redis.srem(key, other_id + "|" + url);
}
async isInUserSet(value: string) {
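
For reference, a sketch of what RedisDedupIndex stores (digest, URL, and timestamp are
placeholders; the hash key name is HASH_DUPE_KEY = "dupe" from constants):

// await addHashDupe("sha256:abc123", "https://example.com/page", "2025-09-17T01:23:45Z")
//   -> HSETNX dupe sha256:abc123 "20250917012345|https://example.com/page"
// await getHashDupe("sha256:abc123")
//   -> { origDate: "20250917012345", origUrl: "https://example.com/page" }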

@@ -464,4 +464,17 @@ export class WACZLoader {
return reader;
}
async *iterFiles(prefix: string) {
if (!this.zipreader) {
await this.init();
}
const entries = await this.zipreader!.load();
for (const [key, value] of Object.entries(entries)) {
if (key.startsWith(prefix)) {
yield value;
}
}
}
}
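
A hypothetical standalone use of the new iterFiles() helper, mirroring how the indexer
consumes it (the WACZ URL is a placeholder):

const loader = new WACZLoader("https://example.com/archive.wacz");
for await (const entry of loader.iterFiles("indexes/")) {
  console.log(entry.filename);  // e.g. "indexes/index.cdx.gz"
}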