- track a source index for each hash, so that each entry becomes '<source index> <date> <url>' (see the sketch after this list)

- the entry for a source index can contain the crawl id (or possibly the WACZ and crawl id)
- also store dependent sources in relation.requires in datapackage.json
- tests: update tests to check for relation.requires
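
A minimal sketch of the new entry format and of how a source index is assigned, assuming ioredis and the key names introduced in this commit ("dupe" hash, "sources" list); encodeEntry, decodeEntry, and addExample are illustrative names, not part of the change:

import { Redis } from "ioredis";

// Illustrative helpers only -- the real logic lives in RedisDedupIndex (see the state diff below).
// Entry format: "<source index> <date> <url>", where <source index> is the position
// of the crawl id in the "sources" Redis list.
function encodeEntry(sourceIndex: number, date: string, url: string): string {
  return `${sourceIndex} ${date.replace(/[^\d]/g, "")} ${url}`;
}

function decodeEntry(value: string) {
  const [origId, origDate, origUrl] = value.split(" ");
  return { origId, origDate, origUrl };
}

async function addExample(dedupRedis: Redis, crawlId: string, hash: string, url: string) {
  // registering a crawl id returns the new list length; its position is the source index
  const sourceIndex = (await dedupRedis.rpush("sources", crawlId)) - 1;
  // hash is the bare hex digest (the "sha256:" prefix is stripped before storing)
  await dedupRedis.hsetnx("dupe", hash, encodeEntry(sourceIndex, new Date().toISOString(), url));
}
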
Ilya Kreymer 2025-10-17 18:08:38 -07:00
parent 3397eb1899
commit b9db2ef4f5
7 changed files with 117 additions and 16 deletions

View file

@@ -1920,6 +1920,8 @@ self.__bx_behaviors.selectMainBehavior();
await this.closeLog();
const requires = await this.crawlState.getDupeDependentSources();
const waczOpts: WACZInitOpts = {
input: warcFileList.map((x) => path.join(this.archivesDir, x)),
output: waczPath,
@@ -1928,6 +1930,7 @@ self.__bx_behaviors.selectMainBehavior();
warcCdxDir: this.warcCdxDir,
indexesDir: this.indexesDir,
softwareString: this.infoString,
requires,
};
if (process.env.WACZ_SIGN_URL) {

View file

@@ -28,6 +28,12 @@ export class CrawlIndexer {
type: "string",
required: true,
},
sourceId: {
describe: "If single WACZ, use this id as source id",
type: "string",
required: false,
},
})
.parseSync();
}
@@ -44,7 +50,7 @@ export class CrawlIndexer {
const params = this.initArgs();
const redis = await initRedisWaitForSuccess(params.redisDedupUrl);
const dedupIndex = new RedisDedupIndex(redis);
const dedupIndex = new RedisDedupIndex(redis, "");
for await (const [name, waczfile] of this.iterWACZ(params.sourceUrl)) {
await dedupIndex.addHashSource(name, waczfile);
@@ -58,6 +64,11 @@ export class CrawlIndexer {
count += 1;
const loader = new WACZLoader(url);
logger.debug(`Processing WACZ ${count} of ${total}`, { waczfile: url });
const sourceId = params.sourceId && total === 1 ? params.sourceId : url;
dedupIndex.dedupKeyIndex = await dedupIndex.addToSourcesList(sourceId);
for await (const file of loader.iterFiles("indexes/")) {
const filename = file.filename;
if (filename.endsWith(".cdx.gz")) {
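
With a single source WACZ, the new sourceId option lets the importer record a stable source id instead of the WACZ URL. A hedged sketch of the invocation, taken from the updated test further down (the volume path, docker network, and redis URL are the test suite's, not required values):

import { execSync } from "child_process";

execSync(
  `docker run --rm -v $PWD/test-crawls:/crawls --network=dedup ` +
    `webrecorder/browsertrix-crawler indexer ` +
    `--sourceUrl /crawls/collections/dedup-test-orig/dedup-test-orig.wacz ` +
    `--sourceId dedup-test-orig ` +
    `--redisDedupUrl redis://dedup-redis:6379/1`,
);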

View file

@@ -23,6 +23,7 @@ export const DETECT_SITEMAP = "<detect>";
export const EXTRACT_TEXT_TYPES = ["to-pages", "to-warc", "final-to-warc"];
export const HASH_DUPE_KEY = "dupe";
export const HASH_DUPE_SOURCE_LIST_KEY = "sources";
export enum BxFunctionBindings {
BehaviorLogFunc = "__bx_log",

View file

@@ -831,13 +831,16 @@ export class Recorder extends EventEmitter {
) {
const hash =
"sha256:" + createHash("sha256").update(reqresp.payload).digest("hex");
const { origUrl } = await this.crawlState.getHashDupe(hash);
const { origUrl, origId } = await this.crawlState.getHashDupe(hash);
if (origUrl) {
const errorReason = "BlockedByResponse";
await cdp.send("Fetch.failRequest", {
requestId,
errorReason,
});
if (origId) {
await this.crawlState.addDupeCrawlRef(origId);
}
return true;
}
}
@@ -1651,9 +1654,10 @@ export class Recorder extends EventEmitter {
const isEmpty = reqresp.readSize === 0;
if (!isEmpty && url) {
const { origUrl, origDate } = await this.crawlState.getHashDupe(hash);
const { origUrl, origDate, origId } =
await this.crawlState.getHashDupe(hash);
if (hash && origUrl && origDate) {
if (hash && origUrl && origDate && origId) {
const date = tsToDate(origDate).toISOString();
// always write revisit here
// duplicate URLs in same crawl filtered out separately
@@ -1664,6 +1668,7 @@ export class Recorder extends EventEmitter {
origUrl,
date,
));
await this.crawlState.addDupeCrawlRef(origId);
} else {
// no dupe, continue
}

View file

@@ -3,7 +3,12 @@ import { v4 as uuidv4 } from "uuid";
import { logger } from "./logger.js";
import { MAX_DEPTH, DEFAULT_MAX_RETRIES, HASH_DUPE_KEY } from "./constants.js";
import {
MAX_DEPTH,
DEFAULT_MAX_RETRIES,
HASH_DUPE_KEY,
HASH_DUPE_SOURCE_LIST_KEY,
} from "./constants.js";
import { ScopedSeed } from "./seeds.js";
import { Frame } from "puppeteer-core";
import { interpolateFilename } from "./storage.js";
@@ -187,6 +192,8 @@ export type SaveState = {
// ============================================================================
export class RedisDedupIndex {
dedupRedis: Redis;
key: string;
dedupKeyIndex = -1;
sourceDone = "src:d";
sourceQ = "src:q";
@@ -194,22 +201,42 @@ export class RedisDedupIndex {
sourceP = "src:p";
pendingPrefix = "pending:q:";
constructor(dedupRedis: Redis) {
constructor(dedupRedis: Redis, key: string) {
this.dedupRedis = dedupRedis;
this.key = key;
}
private async getKeyIndex() {
if (!this.key) {
return;
}
const res = await this.dedupRedis.lpos(HASH_DUPE_SOURCE_LIST_KEY, this.key);
if (res) {
this.dedupKeyIndex = res;
} else {
this.dedupKeyIndex = await this.addToSourcesList(this.key);
}
return this.dedupKeyIndex;
}
async addToSourcesList(crawlId: string) {
return (
(await this.dedupRedis.rpush(HASH_DUPE_SOURCE_LIST_KEY, crawlId)) - 1
);
}
async getHashDupe(
hash: string,
key = HASH_DUPE_KEY,
//url: string,
): Promise<{ origDate?: string; origUrl?: string }> {
): Promise<{ origDate?: string; origUrl?: string; origId?: string }> {
hash = hash.split(":").at(-1)!;
const value = await this.dedupRedis.hget(key, hash);
if (!value) {
return {};
}
const val = value.split("|");
return { origUrl: val[1], origDate: val[0] };
const val = value.split(" ");
return { origUrl: val[2], origDate: val[1], origId: val[0] };
}
async addHashDupe(
@@ -218,8 +245,12 @@ export class RedisDedupIndex {
date: string,
key = HASH_DUPE_KEY,
) {
const val = date.replace(/[^\d]/g, "") + "|" + url;
date = date.replace(/[^\d]/g, "");
hash = hash.split(":").at(-1)!;
if (this.dedupKeyIndex < 0) {
await this.getKeyIndex();
}
const val = `${this.dedupKeyIndex} ${date} ${url}`;
await this.dedupRedis.hsetnx(key, hash, val);
}
@@ -270,7 +301,7 @@ export class RedisDedupIndex {
await this.dedupRedis.lrem(this.pendingQ, 1, res);
const { id, url } = JSON.parse(res);
const total = await this.dedupRedis.llen(this.sourceQ);
const total = (await this.dedupRedis.llen(this.sourceQ)) + 1;
await this.dedupRedis.setex(this.pendingPrefix + id, "1", 300);
return { id, url, total };
}
@@ -286,7 +317,6 @@ export class RedisCrawlState extends RedisDedupIndex {
maxRetries: number;
uid: string;
key: string;
maxPageTime: number;
qkey: string;
@@ -312,11 +342,10 @@ export class RedisCrawlState extends RedisDedupIndex {
maxRetries?: number,
dedupRedis?: Redis,
) {
super(dedupRedis || redis);
super(dedupRedis || redis, key);
this.redis = redis;
this.uid = uid;
this.key = key;
this.maxPageTime = maxPageTime;
this.maxRetries = maxRetries ?? DEFAULT_MAX_RETRIES;
@@ -1199,4 +1228,23 @@ return inx;
async markSitemapDone() {
await this.redis.set(this.sitemapDoneKey, "1");
}
async addDupeCrawlRef(id: string) {
await this.redis.sadd(`${this.key}:dindex`, id);
}
async getDupeDependentSources(): Promise<string[]> {
const dependIndexes = await this.redis.smembers(`${this.key}:dindex`);
const crawlIds = [];
for (const inx of dependIndexes) {
const crawlId = await this.dedupRedis.lindex(
HASH_DUPE_SOURCE_LIST_KEY,
Number(inx),
);
if (crawlId && crawlId !== this.key) {
crawlIds.push(crawlId);
}
}
return crawlIds;
}
}
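
A condensed sketch of the resulting Redis layout and of the dependent-source resolution that getDupeDependentSources performs above (the resolveRequires wrapper and its two client arguments are illustrative only):

import { Redis } from "ioredis";

// Assumed layout after this change (key names from the constants diff above):
//   dedup redis:  "dupe"    hash  <sha256 hex> => "<source index> <date> <url>"
//                 "sources" list  position <source index> => crawl id
//   crawl redis:  "<crawl id>:dindex"  set of source indexes referenced by this crawl
async function resolveRequires(
  crawlRedis: Redis,
  dedupRedis: Redis,
  crawlId: string,
): Promise<string[]> {
  const indexes = await crawlRedis.smembers(`${crawlId}:dindex`);
  const requires: string[] = [];
  for (const inx of indexes) {
    // map each stored index back to a crawl id via the shared "sources" list
    const id = await dedupRedis.lindex("sources", Number(inx));
    if (id && id !== crawlId) {
      requires.push(id);
    }
  }
  return requires;
}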

View file

@@ -45,6 +45,7 @@ export type WACZInitOpts = {
signingToken?: string;
title?: string;
description?: string;
requires?: string[];
};
export type WACZResourceEntry = {
@@ -61,6 +62,7 @@ export type WACZDataPackage = {
software: string;
title?: string;
description?: string;
relation?: { requires: string[] };
};
type WACZDigest = {
@@ -131,6 +133,11 @@ export class WACZ {
this.datapackage.description = config.description;
}
if (config.requires && config.requires.length) {
this.datapackage.relation = { requires: config.requires };
}
console.log("REQUIRES", config.requires);
this.signingUrl = config.signingUrl || null;
this.signingToken = config.signingToken || null;
}
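
For a crawl whose duplicates all point back at a crawl with id "dedup-test-orig" (the value the new test below checks), the written datapackage.json gains a relation block shaped like this sketch (shape only, per the WACZDataPackage type above):

// not the full datapackage, just the new field
const example: Partial<WACZDataPackage> = {
  relation: { requires: ["dedup-test-orig"] },
};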

View file

@@ -31,7 +31,7 @@ afterAll(async () => {
function runCrawl(name, db="0") {
fs.rmSync(`./test-crawls/collections/${name}`, { recursive: true, force: true });
const crawler = exec(`docker run --rm -v $PWD/test-crawls:/crawls --network=dedup webrecorder/browsertrix-crawler crawl --url https://old.webrecorder.net/ --limit 4 --exclude community --collection ${name} --redisDedupUrl redis://dedup-redis:6379/${db} --generateWACZ`);
const crawler = exec(`docker run --rm -v $PWD/test-crawls:/crawls --network=dedup -e CRAWL_ID=${name} webrecorder/browsertrix-crawler crawl --url https://old.webrecorder.net/ --limit 4 --exclude community --collection ${name} --redisDedupUrl redis://dedup-redis:6379/${db} --generateWACZ`);
return new Promise((resolve) => {
crawler.on("exit", (code) => {
@@ -54,6 +54,20 @@ function loadFirstWARC(name) {
return parser;
}
function loadDataPackageRelated(name) {
execSync(
`unzip test-crawls/collections/${name}/${name}.wacz -d test-crawls/collections/${name}/wacz`,
);
const data = fs.readFileSync(
`test-crawls/collections/${name}/wacz/datapackage.json`,
"utf8",
);
const dataPackageJSON = JSON.parse(data);
return dataPackageJSON.relation;
}
test("check revisit records written on duplicate crawl", async () => {
expect(await runCrawl("dedup-test-orig")).toBe(0);
@@ -99,7 +113,7 @@ test("check revisit records written on duplicate crawl", async () => {
test("import index and crawl dupe", async () => {
execSync(`docker run --rm -v $PWD/test-crawls:/crawls --network=dedup webrecorder/browsertrix-crawler indexer --sourceUrl /crawls/collections/dedup-test-orig/dedup-test-orig.wacz --redisDedupUrl redis://dedup-redis:6379/1`);
execSync(`docker run --rm -v $PWD/test-crawls:/crawls --network=dedup webrecorder/browsertrix-crawler indexer --sourceUrl /crawls/collections/dedup-test-orig/dedup-test-orig.wacz --sourceId dedup-test-orig --redisDedupUrl redis://dedup-redis:6379/1`);
const redis = new Redis("redis://127.0.0.1:37379/1", { lazyConnect: true, retryStrategy: () => null });
@@ -130,4 +144,16 @@ test("imported crawl dupe matches previous dupe count", async () => {
expect(revisit).toBe(numResponses);
});
test("test requires in wacz 1", () => {
const expected = {"requires": ["dedup-test-orig"]};
const res1 = loadDataPackageRelated("dedup-test-dupe");
const res2 = loadDataPackageRelated("dedup-test-dupe-2");
expect(res1).toEqual(expected);
expect(res1).toEqual(res2);
});