diff --git a/src/crawler.ts b/src/crawler.ts
index b48e0846..9b12bec3 100644
--- a/src/crawler.ts
+++ b/src/crawler.ts
@@ -1714,7 +1714,10 @@ self.__bx_behaviors.selectMainBehavior();
     }
 
     if (this.params.generateWACZ && (this.storage || this.deduping)) {
-      await this.crawlState.setWACZFilename();
+      const filename = await this.crawlState.setWACZFilename();
+      if (this.deduping) {
+        await this.crawlState.addSourceWACZForDedupe(filename);
+      }
     }
 
     if (POST_CRAWL_STATES.includes(initState)) {
@@ -1916,30 +1919,34 @@ self.__bx_behaviors.selectMainBehavior();
       const wacz = await this.generateWACZ();
 
       if (wacz) {
-        if (this.deduping) {
-          await this.crawlState.setStatus("post-crawl");
-          await this.crawlState.updateDedupeSource(wacz);
-
-          await this.crawlState.clearDupeFileRef();
-        }
-        await this.crawlState.clearWACZFilename();
-      }
-      if (wacz && this.storage && this.uploadAndDeleteLocal) {
-        await this.crawlState.setArchiveSize(0);
+        if (this.deduping) {
+          await this.crawlState.updateDedupeSourceWACZ(wacz);
+        }
 
-        logger.info(
-          `Uploaded WACZ, deleting local data to free up space: ${this.collDir}`,
-        );
-        try {
-          fs.rmSync(this.collDir, { recursive: true, force: true });
-        } catch (e) {
-          logger.warn(`Unable to clear ${this.collDir} before exit`, e);
+        if (this.storage && this.uploadAndDeleteLocal) {
+          await this.crawlState.setArchiveSize(0);
+
+          logger.info(
+            `Uploaded WACZ, deleting local data to free up space: ${this.collDir}`,
+          );
+          try {
+            fs.rmSync(this.collDir, { recursive: true, force: true });
+          } catch (e) {
+            logger.warn(`Unable to clear ${this.collDir} before exit`, e);
+          }
         }
       }
     }
 
+    if (this.deduping) {
+      //await this.crawlState.clearDupeCrawlDependency();
+
+      // commit crawl data to main index
+      await this.crawlState.commitDedupeDone();
+    }
+
     if (this.finalExit && generateFiles && this.params.saveProfile) {
       const resource = await this.browser.saveProfile(
         this.params.saveProfile,
@@ -2015,7 +2022,7 @@ self.__bx_behaviors.selectMainBehavior();
 
     await this.closeLog();
 
-    const requires = await this.crawlState.getDupeDependentSources();
+    const requires = await this.crawlState.getDupeDependentCrawls();
 
     const waczOpts: WACZInitOpts = {
       input: warcFileList.map((x) => path.join(this.archivesDir, x)),
diff --git a/src/util/recorder.ts b/src/util/recorder.ts
index fca0619f..46c42977 100644
--- a/src/util/recorder.ts
+++ b/src/util/recorder.ts
@@ -841,7 +841,7 @@ export class Recorder extends EventEmitter {
         requestId,
         errorReason,
       });
-      await this.crawlState.addDupeCrawlRef(crawlId, index);
+      await this.crawlState.addDupeCrawlDependency(crawlId, index);
       return true;
     }
   }
@@ -1701,7 +1701,7 @@ export class Recorder extends EventEmitter {
         origUrl,
         date,
       ));
-      await this.crawlState.addDupeCrawlRef(crawlId, index);
+      await this.crawlState.addDupeCrawlDependency(crawlId, index);
       isDupe = true;
     } else {
       // no dupe, continue
diff --git a/src/util/state.ts b/src/util/state.ts
index 8f814739..39732fba 100644
--- a/src/util/state.ts
+++ b/src/util/state.ts
@@ -210,7 +210,7 @@ export type DedupeSourceEntry = {
 export class RedisDedupeIndex {
   dedupeRedis: Redis;
   crawlId: string;
-  dedupeKeyIndex = -1;
+  dedupeKeyIndex = 0;
   dedupeCurrFilename = "";
 
   sourceDone = "src:d";
@@ -224,37 +224,32 @@
     this.crawlId = crawlId;
   }
 
-  // DEDUPE SOURCE
+  // DEDUPE SOURCE WACZ (to track dependencies)
 
-  async addSourceForDedupe(filename: string) {
-    //const count = await this.dedupeRedis.incr(`c:${key}:count`) - 1;
+  async addSourceWACZForDedupe(filename: string) {
+    const crawlId = this.crawlId;
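+    // rpush returns the new list length, so the "- 1" below is the
+    // 0-based index of the WACZ entry just appended for this crawl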
     const count =
       (await this.dedupeRedis.rpush(
-        `c:${this.crawlId}:wacz`,
+        `c:${crawlId}:wacz`,
         JSON.stringify({ filename }),
       )) - 1;
     this.dedupeCurrFilename = filename;
     this.dedupeKeyIndex = count;
   }
 
-  async updateDedupeSource(wacz: WACZ) {
-    if (this.dedupeKeyIndex < 0) {
-      return;
-    }
-
+  async updateDedupeSourceWACZ(wacz: WACZ) {
     const value: DedupeSourceEntry = {
       filename: wacz.getLocalFilename() || this.dedupeCurrFilename,
       hash: wacz.getHash(),
       size: wacz.getSize(),
     };
+    const crawlId = this.crawlId;
     await this.dedupeRedis.lset(
-      `c:${this.crawlId}:wacz`,
+      `c:${crawlId}:wacz`,
       this.dedupeKeyIndex,
       JSON.stringify(value),
     );
-
-    await this.commitDedupeDone();
   }
 
   // COMMIT DEDUPE TO SHARED INDEX
@@ -325,9 +320,12 @@
     await this.dedupeRedis.lpush(this.sourceQ, data);
   }
 
-  async addImportedSourceForDedupe(key: string, entry: DedupeSourceEntry) {
+  async addImportedSourceForDedupe(crawlId: string, entry: DedupeSourceEntry) {
     return (
-      (await this.dedupeRedis.rpush(`c:${key}:wacz`, JSON.stringify(entry))) - 1
+      (await this.dedupeRedis.rpush(
+        `c:${crawlId}:wacz`,
+        JSON.stringify(entry),
+      )) - 1
     );
   }
 
@@ -766,7 +764,6 @@ return inx;
         "state",
       );
     }
-    await this.addSourceForDedupe(this.waczFilename!);
     return this.waczFilename!;
   }
@@ -1396,29 +1393,32 @@
     );
   }
 
-  // DEPENDENT CRAWLS FOR DEDUPE
-  async addDupeCrawlRef(crawlId: string, index: string) {
+  // DEPENDENT CRAWLS FOR DEDUPE (requires WACZ)
+  async addDupeCrawlDependency(crawlId: string, index: string) {
     await this.redis.sadd(`${this.uid}:duperef`, crawlId + " " + index);
     await this.redis.sadd(`${this.crawlId}:reqCrawls`, crawlId);
   }
 
-  async clearDupeFileRef() {
-    await this.redis.del(`${this.uid}:duperef`);
-  }
+  // async clearDupeCrawlDependency() {
+  //   await this.redis.del(`${this.uid}:duperef`);
+  // }
 
-  async getDupeDependentSources() {
+  // Requires crawling with WACZ to match dependencies
+  async getDupeDependentCrawls() {
     const dependRefs = await this.redis.smembers(`${this.uid}:duperef`);
     const crawlIds = [];
     for (const value of dependRefs) {
       const [crawlId, index] = value.split(" ");
-      const source = await this.dedupeRedis.lindex(
-        `c:${crawlId}:wacz`,
-        Number(index),
-      );
-      if (crawlId && crawlId !== this.crawlId && source) {
-        const entry = JSON.parse(source);
-        entry.crawlId = crawlId;
-        crawlIds.push(entry);
+      if (crawlId && crawlId !== this.crawlId) {
+        const source = await this.dedupeRedis.lindex(
+          `c:${crawlId}:wacz`,
+          Number(index),
+        );
+        if (source) {
+          const entry = JSON.parse(source);
+          entry.crawlId = crawlId;
+          crawlIds.push(entry);
+        }
       }
     }
     return crawlIds;
diff --git a/tests/dedupe-basic.test.js b/tests/dedupe-basic.test.js
index 759b1c89..2c2526d1 100644
--- a/tests/dedupe-basic.test.js
+++ b/tests/dedupe-basic.test.js
@@ -28,10 +28,10 @@ afterAll(async () => {
   execSync("docker network rm dedupe");
 });
 
-function runCrawl(name, db="0") {
+function runCrawl(name, {db = 0, limit = 4, wacz = true} = {}) {
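+  // defaults keep the original behavior: redis db 0, 4-page limit, WACZ on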
   fs.rmSync(`./test-crawls/collections/${name}`, { recursive: true, force: true });
 
-  const crawler = exec(`docker run --rm -v $PWD/test-crawls:/crawls --network=dedupe -e CRAWL_ID=${name} webrecorder/browsertrix-crawler crawl --url https://old.webrecorder.net/ --limit 4 --exclude community --collection ${name} --redisDedupeUrl redis://dedupe-redis:6379/${db} --generateWACZ`);
+  const crawler = exec(`docker run --rm -v $PWD/test-crawls:/crawls --network=dedupe -e CRAWL_ID=${name} webrecorder/browsertrix-crawler crawl --url https://old.webrecorder.net/ --limit ${limit} --exclude community --collection ${name} --redisDedupeUrl redis://dedupe-redis:6379/${db} ${wacz ? "--generateWACZ" : ""}`);
 
   return new Promise((resolve) => {
     crawler.on("exit", (code) => {
@@ -54,6 +54,16 @@ function loadFirstWARC(name) {
   return parser;
 }
 
+function deleteFirstWARC(name) {
+  const archiveWarcLists = fs.readdirSync(
+    `test-crawls/collections/${name}/archive`,
+  );
+
+  const warcName = path.join(`test-crawls/collections/${name}/archive`, archiveWarcLists[0]);
+
+  fs.unlinkSync(warcName);
+}
+
 function loadDataPackageRelated(name) {
   execSync(
     `unzip test-crawls/collections/${name}/${name}.wacz -d test-crawls/collections/${name}/wacz`,
@@ -67,11 +77,60 @@
   );
 
   return dataPackageJSON.relation;
 }
 
+test("check revisit records written on duplicate crawl, same collection, no wacz", async () => {
 
-test("check revisit records written on duplicate crawl", async () => {
+  const collName = "dedupe-test-same-coll";
 
-  expect(await runCrawl("dedupe-test-orig")).toBe(0);
-  expect(await runCrawl("dedupe-test-dupe")).toBe(0);
+  expect(await runCrawl(collName, {limit: 1, wacz: false})).toBe(0);
+
+  let statusCode = -1;
+
+  let response = 0;
+  let revisit = 0;
+
+  const parserOrig = loadFirstWARC(collName);
+
+  for await (const record of parserOrig) {
+    if (record.warcTargetURI && record.warcTargetURI.startsWith("urn:")) {
+      continue;
+    }
+
+    if (record.warcType === "response") {
+      response++;
+    }
+  }
+
+  deleteFirstWARC(collName);
+
+  expect(await runCrawl(collName, {limit: 1, wacz: false})).toBe(0);
+
+  const dupeOrig = loadFirstWARC(collName);
+
+  for await (const record of dupeOrig) {
+    if (record.warcTargetURI && record.warcTargetURI.startsWith("urn:")) {
+      continue;
+    }
+
+    if (record.warcType === "revisit") {
+      revisit++;
+    }
+  }
+
+  expect(response).toBeGreaterThan(0);
+
+  // revisits should match the number of responses for non-urn: records
+  expect(response).toBe(revisit);
+
+  numResponses = response;
+});
+
+
+
+
+test("check revisit records written on duplicate crawl, different collections, with wacz", async () => {
+
+  expect(await runCrawl("dedupe-test-orig", {db: 1})).toBe(0);
+  expect(await runCrawl("dedupe-test-dupe", {db: 1})).toBe(0);
 
   let statusCode = -1;
 
@@ -111,11 +170,11 @@
 });
 
-test("import index and crawl dupe", async () => {
+test("import dupe index from wacz", async () => {
 
-  execSync(`docker run --rm -v $PWD/test-crawls:/crawls --network=dedupe webrecorder/browsertrix-crawler indexer --sourceUrl /crawls/collections/dedupe-test-orig/dedupe-test-orig.wacz --sourceCrawlId dedupe-test-orig --redisDedupeUrl redis://dedupe-redis:6379/1`);
+  execSync(`docker run --rm -v $PWD/test-crawls:/crawls --network=dedupe webrecorder/browsertrix-crawler indexer --sourceUrl /crawls/collections/dedupe-test-orig/dedupe-test-orig.wacz --sourceCrawlId dedupe-test-orig --redisDedupeUrl redis://dedupe-redis:6379/2`);
 
-  const redis = new Redis("redis://127.0.0.1:37379/1", { lazyConnect: true, retryStrategy: () => null });
+  const redis = new Redis("redis://127.0.0.1:37379/2", { lazyConnect: true, retryStrategy: () => null });
 
   await redis.connect({maxRetriesPerRequest: 50});
 
 });
@@ -123,8 +182,8 @@
 
-test("imported crawl dupe matches previous dupe count", async () => {
-  expect(await runCrawl("dedupe-test-dupe-2", 1)).toBe(0);
+test("verify crawl with imported dupe index has same dupes as dedupe against original", async () => {
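+  // db 2 was populated only by the indexer import in the previous test,
+  // so any dupes detected here must come from the imported index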
+  expect(await runCrawl("dedupe-test-dupe-2", {db: 2})).toBe(0);
 
   const dupeOrig = loadFirstWARC("dedupe-test-dupe-2");