cleanup pass:

- support dedupe without requiring WACZ; when no WACZ is generated, no crawl dependency tracking is stored (see the example below)
- add a dedupe test without WACZ
- clean up dedupe-related naming
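
With these changes a crawl can deduplicate against a shared Redis index without producing a WACZ at all: pass --redisDedupeUrl and omit --generateWACZ. A minimal sketch of such an invocation, mirroring the docker command used by the test helper further below (collection name, crawl id, network and Redis URL are placeholders taken from the test setup):

    docker run --rm -v $PWD/test-crawls:/crawls --network=dedupe -e CRAWL_ID=my-crawl \
      webrecorder/browsertrix-crawler crawl --url https://old.webrecorder.net/ --limit 1 \
      --collection my-crawl --redisDedupeUrl redis://dedupe-redis:6379/0

Duplicate responses are still written as revisit records in this mode; only the per-crawl source WACZ entries and crawl dependency tracking are skipped.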
Ilya Kreymer 2025-11-28 01:16:58 -08:00
parent 352ac726f8
commit b9e48baa0b
4 changed files with 127 additions and 61 deletions

View file

@@ -1714,7 +1714,10 @@ self.__bx_behaviors.selectMainBehavior();
     }
 
     if (this.params.generateWACZ && (this.storage || this.deduping)) {
-      await this.crawlState.setWACZFilename();
+      const filename = await this.crawlState.setWACZFilename();
+      if (this.deduping) {
+        await this.crawlState.addSourceWACZForDedupe(filename);
+      }
     }
 
     if (POST_CRAWL_STATES.includes(initState)) {
@@ -1916,30 +1919,34 @@ self.__bx_behaviors.selectMainBehavior();
       const wacz = await this.generateWACZ();
 
       if (wacz) {
-        if (this.deduping) {
-          await this.crawlState.setStatus("post-crawl");
-          await this.crawlState.updateDedupeSource(wacz);
-          await this.crawlState.clearDupeFileRef();
-        }
         await this.crawlState.clearWACZFilename();
-      }
 
-      if (wacz && this.storage && this.uploadAndDeleteLocal) {
-        await this.crawlState.setArchiveSize(0);
+        if (this.deduping) {
+          await this.crawlState.updateDedupeSourceWACZ(wacz);
+        }
 
-        logger.info(
-          `Uploaded WACZ, deleting local data to free up space: ${this.collDir}`,
-        );
-        try {
-          fs.rmSync(this.collDir, { recursive: true, force: true });
-        } catch (e) {
-          logger.warn(`Unable to clear ${this.collDir} before exit`, e);
+        if (this.storage && this.uploadAndDeleteLocal) {
+          await this.crawlState.setArchiveSize(0);
+
+          logger.info(
+            `Uploaded WACZ, deleting local data to free up space: ${this.collDir}`,
+          );
+          try {
+            fs.rmSync(this.collDir, { recursive: true, force: true });
+          } catch (e) {
+            logger.warn(`Unable to clear ${this.collDir} before exit`, e);
+          }
         }
       }
     }
 
+    if (this.deduping) {
+      //await this.crawlState.clearDupeCrawlRef();
+      // commit crawl data to main index
+      await this.crawlState.commitDedupeDone();
+    }
+
     if (this.finalExit && generateFiles && this.params.saveProfile) {
       const resource = await this.browser.saveProfile(
         this.params.saveProfile,
@@ -2015,7 +2022,7 @@ self.__bx_behaviors.selectMainBehavior();
     await this.closeLog();
 
-    const requires = await this.crawlState.getDupeDependentSources();
+    const requires = await this.crawlState.getDupeDependentCrawls();
 
     const waczOpts: WACZInitOpts = {
       input: warcFileList.map((x) => path.join(this.archivesDir, x)),

View file

@@ -841,7 +841,7 @@ export class Recorder extends EventEmitter {
         requestId,
         errorReason,
       });
-      await this.crawlState.addDupeCrawlRef(crawlId, index);
+      await this.crawlState.addDupeCrawlDependency(crawlId, index);
       return true;
     }
   }
@@ -1701,7 +1701,7 @@ export class Recorder extends EventEmitter {
         origUrl,
         date,
       ));
-      await this.crawlState.addDupeCrawlRef(crawlId, index);
+      await this.crawlState.addDupeCrawlDependency(crawlId, index);
       isDupe = true;
     } else {
       // no dupe, continue

View file

@@ -210,7 +210,7 @@ export type DedupeSourceEntry = {
 export class RedisDedupeIndex {
   dedupeRedis: Redis;
   crawlId: string;
-  dedupeKeyIndex = -1;
+  dedupeKeyIndex = 0;
   dedupeCurrFilename = "";
 
   sourceDone = "src:d";
@@ -224,37 +224,32 @@ export class RedisDedupeIndex {
     this.crawlId = crawlId;
   }
 
-  // DEDUPE SOURCE
-  async addSourceForDedupe(filename: string) {
-    //const count = await this.dedupeRedis.incr(`c:${key}:count`) - 1;
+  // DEDUPE SOURCE WACZ (to track dependencies)
+  async addSourceWACZForDedupe(filename: string) {
+    const crawlId = this.crawlId;
     const count =
       (await this.dedupeRedis.rpush(
-        `c:${this.crawlId}:wacz`,
+        `c:${crawlId}:wacz`,
         JSON.stringify({ filename }),
       )) - 1;
     this.dedupeCurrFilename = filename;
     this.dedupeKeyIndex = count;
   }
 
-  async updateDedupeSource(wacz: WACZ) {
-    if (this.dedupeKeyIndex < 0) {
-      return;
-    }
-
+  async updateDedupeSourceWACZ(wacz: WACZ) {
     const value: DedupeSourceEntry = {
       filename: wacz.getLocalFilename() || this.dedupeCurrFilename,
       hash: wacz.getHash(),
       size: wacz.getSize(),
     };
+    const crawlId = this.crawlId;
     await this.dedupeRedis.lset(
-      `c:${this.crawlId}:wacz`,
+      `c:${crawlId}:wacz`,
       this.dedupeKeyIndex,
       JSON.stringify(value),
     );
-
-    await this.commitDedupeDone();
   }
 
   // COMMIT DEDUPE TO SHARED INDEX
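
For reference, a sketch of the per-crawl source list that addSourceWACZForDedupe and updateDedupeSourceWACZ maintain (crawl id, filename, hash and size below are placeholders): each crawl appends one JSON entry per WACZ to a Redis list keyed c:<crawlId>:wacz. RPUSH returns the new list length, so dedupeKeyIndex is the zero-based position of the entry just added, and once the WACZ is finalized its hash and size are written back in place with LSET:

    RPUSH c:my-crawl:wacz '{"filename":"my-crawl.wacz"}'
    LSET c:my-crawl:wacz 0 '{"filename":"my-crawl.wacz","hash":"<hash>","size":12345}'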
@@ -325,9 +320,12 @@ export class RedisDedupeIndex {
     await this.dedupeRedis.lpush(this.sourceQ, data);
   }
 
-  async addImportedSourceForDedupe(key: string, entry: DedupeSourceEntry) {
+  async addImportedSourceForDedupe(crawlId: string, entry: DedupeSourceEntry) {
     return (
-      (await this.dedupeRedis.rpush(`c:${key}:wacz`, JSON.stringify(entry))) - 1
+      (await this.dedupeRedis.rpush(
+        `c:${crawlId}:wacz`,
+        JSON.stringify(entry),
+      )) - 1
     );
   }
@@ -766,7 +764,6 @@ return inx;
         "state",
       );
     }
 
-    await this.addSourceForDedupe(this.waczFilename!);
     return this.waczFilename!;
   }
@@ -1396,29 +1393,32 @@ return inx;
     );
   }
 
-  // DEPENDENT CRAWLS FOR DEDUPE
-  async addDupeCrawlRef(crawlId: string, index: string) {
+  // DEPENDENT CRAWLS FOR DEDUPE (requires WACZ)
+  async addDupeCrawlDependency(crawlId: string, index: string) {
     await this.redis.sadd(`${this.uid}:duperef`, crawlId + " " + index);
     await this.redis.sadd(`${this.crawlId}:reqCrawls`, crawlId);
   }
 
-  async clearDupeFileRef() {
-    await this.redis.del(`${this.uid}:duperef`);
-  }
+  // async clearDupeCrawlDependency() {
+  //   await this.redis.del(`${this.uid}:duperef`);
+  // }
 
-  async getDupeDependentSources() {
+  // Requires crawling with WACZ to match dependencies
+  async getDupeDependentCrawls() {
     const dependRefs = await this.redis.smembers(`${this.uid}:duperef`);
     const crawlIds = [];
     for (const value of dependRefs) {
       const [crawlId, index] = value.split(" ");
-      const source = await this.dedupeRedis.lindex(
-        `c:${crawlId}:wacz`,
-        Number(index),
-      );
-      if (crawlId && crawlId !== this.crawlId && source) {
-        const entry = JSON.parse(source);
-        entry.crawlId = crawlId;
-        crawlIds.push(entry);
+      if (crawlId && crawlId !== this.crawlId) {
+        const source = await this.dedupeRedis.lindex(
+          `c:${crawlId}:wacz`,
+          Number(index),
+        );
+        if (source) {
+          const entry = JSON.parse(source);
+          entry.crawlId = crawlId;
+          crawlIds.push(entry);
+        }
       }
     }
     return crawlIds;
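
The dependency tracking above only works when crawls register source WACZ entries, since each ref stored in the duperef set points at an index in another crawl's c:<crawlId>:wacz list. A rough sketch of the keys involved (ids and the index value are placeholders):

    SADD <uid>:duperef "other-crawl 0"
    SADD <crawlId>:reqCrawls "other-crawl"
    LINDEX c:other-crawl:wacz 0

getDupeDependentCrawls() walks the duperef set, skips refs to the current crawl, and resolves each remaining ref via LINDEX into a DedupeSourceEntry tagged with its crawlId.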

View file

@@ -28,10 +28,10 @@ afterAll(async () => {
   execSync("docker network rm dedupe");
 });
 
-function runCrawl(name, db="0") {
+function runCrawl(name, {db = 0, limit = 4, wacz = true} = {}) {
   fs.rmSync(`./test-crawls/collections/${name}`, { recursive: true, force: true });
 
-  const crawler = exec(`docker run --rm -v $PWD/test-crawls:/crawls --network=dedupe -e CRAWL_ID=${name} webrecorder/browsertrix-crawler crawl --url https://old.webrecorder.net/ --limit 4 --exclude community --collection ${name} --redisDedupeUrl redis://dedupe-redis:6379/${db} --generateWACZ`);
+  const crawler = exec(`docker run --rm -v $PWD/test-crawls:/crawls --network=dedupe -e CRAWL_ID=${name} webrecorder/browsertrix-crawler crawl --url https://old.webrecorder.net/ --limit ${limit} --exclude community --collection ${name} --redisDedupeUrl redis://dedupe-redis:6379/${db} ${wacz ? "--generateWACZ" : ""}`);
 
   return new Promise((resolve) => {
     crawler.on("exit", (code) => {
@@ -54,6 +54,16 @@ function loadFirstWARC(name) {
   return parser;
 }
 
+function deleteFirstWARC(name) {
+  const archiveWarcLists = fs.readdirSync(
+    `test-crawls/collections/${name}/archive`,
+  );
+
+  const warcName = path.join(`test-crawls/collections/${name}/archive`, archiveWarcLists[0]);
+
+  fs.unlinkSync(warcName);
+}
+
 function loadDataPackageRelated(name) {
   execSync(
     `unzip test-crawls/collections/${name}/${name}.wacz -d test-crawls/collections/${name}/wacz`,
@@ -67,11 +77,60 @@ function loadDataPackageRelated(name) {
   return dataPackageJSON.relation;
 }
 
+test("check revisit records written on duplicate crawl, same collection, no wacz", async () => {
+  const collName = "dedupe-test-same-coll";
+  expect(await runCrawl(collName, {limit: 1, wacz: false})).toBe(0);
+
+  let statusCode = -1;
+
+  let response = 0;
+  let revisit = 0;
+
+  const parserOrig = loadFirstWARC(collName);
+
+  for await (const record of parserOrig) {
+    if (record.warcTargetURI && record.warcTargetURI.startsWith("urn:")) {
+      continue;
+    }
+
+    if (record.warcType === "response") {
+      response++;
+    }
+  }
+
+  deleteFirstWARC(collName);
+
+  expect(await runCrawl(collName, {limit: 1, wacz: false})).toBe(0);
+
+  const dupeOrig = loadFirstWARC(collName);
+
+  for await (const record of dupeOrig) {
+    if (record.warcTargetURI && record.warcTargetURI.startsWith("urn:")) {
+      continue;
+    }
+
+    if (record.warcType === "revisit") {
+      revisit++;
+    }
+  }
+
+  expect(response).toBeGreaterThan(0);
+
+  // revisits should match number of responses for non urn:
+  expect(response).toBe(revisit);
+
+  numResponses = response;
+});
 
-test("check revisit records written on duplicate crawl", async () => {
-  expect(await runCrawl("dedupe-test-orig")).toBe(0);
-  expect(await runCrawl("dedupe-test-dupe")).toBe(0);
+test("check revisit records written on duplicate crawl, different collections, with wacz", async () => {
+  expect(await runCrawl("dedupe-test-orig", {db: 1})).toBe(0);
+  expect(await runCrawl("dedupe-test-dupe", {db: 1})).toBe(0);
 
   let statusCode = -1;
@@ -111,11 +170,11 @@ test("check revisit records written on duplicate crawl", async () => {
 });
 
-test("import index and crawl dupe", async () => {
-  execSync(`docker run --rm -v $PWD/test-crawls:/crawls --network=dedupe webrecorder/browsertrix-crawler indexer --sourceUrl /crawls/collections/dedupe-test-orig/dedupe-test-orig.wacz --sourceCrawlId dedupe-test-orig --redisDedupeUrl redis://dedupe-redis:6379/1`);
+test("import dupe index from wacz", async () => {
+  execSync(`docker run --rm -v $PWD/test-crawls:/crawls --network=dedupe webrecorder/browsertrix-crawler indexer --sourceUrl /crawls/collections/dedupe-test-orig/dedupe-test-orig.wacz --sourceCrawlId dedupe-test-orig --redisDedupeUrl redis://dedupe-redis:6379/2`);
 
-  const redis = new Redis("redis://127.0.0.1:37379/1", { lazyConnect: true, retryStrategy: () => null });
+  const redis = new Redis("redis://127.0.0.1:37379/2", { lazyConnect: true, retryStrategy: () => null });
 
   await redis.connect({maxRetriesPerRequest: 50});
@@ -123,8 +182,8 @@ test("import index and crawl dupe", async () => {
 });
 
-test("imported crawl dupe matches previous dupe count", async () => {
-  expect(await runCrawl("dedupe-test-dupe-2", 1)).toBe(0);
+test("verify crawl with imported dupe index has same dupes as dedupe against original", async () => {
+  expect(await runCrawl("dedupe-test-dupe-2", {db: 2})).toBe(0);
 
   const dupeOrig = loadFirstWARC("dedupe-test-dupe-2");