From 00eca5329d76930d4ec691bafe7d642659670530 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Sat, 30 Aug 2025 12:41:10 -0700 Subject: [PATCH] dedup work: - resource dedup via page digest - page dedup via page digest check, blocking of dupe page --- src/crawler.ts | 11 +++- src/util/recorder.ts | 129 +++++++++++++++++++++++++++++++++++++------ src/util/state.ts | 28 ++++++++++ 3 files changed, 148 insertions(+), 20 deletions(-) diff --git a/src/crawler.ts b/src/crawler.ts index d822791b..233cc412 100644 --- a/src/crawler.ts +++ b/src/crawler.ts @@ -1075,7 +1075,7 @@ self.__bx_behaviors.selectMainBehavior(); const { page, cdp, data, workerid, callbacks, recorder } = opts; data.callbacks = callbacks; - const { url, seedId } = data; + const { url, seedId, depth } = data; const auth = this.seeds[seedId].authHeader(); @@ -1148,6 +1148,7 @@ self.__bx_behaviors.selectMainBehavior(); if (recorder) { recorder.pageSeed = seed; + recorder.pageSeedDepth = depth; } // run custom driver here, if any @@ -1326,6 +1327,7 @@ self.__bx_behaviors.selectMainBehavior(); } else { if (pageSkipped) { await this.crawlState.markExcluded(url); + this.limitHit = false; } else { const retry = await this.crawlState.markFailed(url); @@ -2198,7 +2200,12 @@ self.__bx_behaviors.selectMainBehavior(); // excluded in recorder if (msg.startsWith("net::ERR_BLOCKED_BY_RESPONSE")) { data.pageSkipped = true; - logger.warn("Page Load Blocked, skipping", { msg, loadState }); + logger.warn( + "Page Load Blocked, skipping", + { msg, loadState }, + "pageStatus", + ); + throw new Error("logged"); } else { return this.pageFailed("Page Load Failed", retry, { msg, diff --git a/src/util/recorder.ts b/src/util/recorder.ts index d36930c5..e8e19822 100644 --- a/src/util/recorder.ts +++ b/src/util/recorder.ts @@ -27,6 +27,7 @@ import { getProxyDispatcher } from "./proxy.js"; import { ScopedSeed } from "./seeds.js"; import EventEmitter from "events"; import { DEFAULT_MAX_RETRIES } from "./constants.js"; +import { createHash } from "crypto"; const MAX_BROWSER_DEFAULT_FETCH_SIZE = 5_000_000; const MAX_TEXT_REWRITE_SIZE = 25_000_000; @@ -37,7 +38,7 @@ const TAKE_STREAM_BUFF_SIZE = 1024 * 64; const ASYNC_FETCH_DUPE_KEY = "s:fetchdupe"; -const WRITE_DUPE_KEY = "s:writedupe"; +const WRITE_DUPE_KEY = "dupe"; const MIME_EVENT_STREAM = "text/event-stream"; @@ -140,6 +141,7 @@ export class Recorder extends EventEmitter { pageid!: string; pageSeed?: ScopedSeed; + pageSeedDepth = 0; frameIdToExecId: Map | null; @@ -819,6 +821,20 @@ export class Recorder extends EventEmitter { const rewritten = await this.rewriteResponse(reqresp, mimeType); + if (url === this.pageUrl && reqresp.payload && this.pageSeedDepth >= 1) { + const hash = + "sha256:" + createHash("sha256").update(reqresp.payload).digest("hex"); + const res = await this.crawlState.getHashDupe(WRITE_DUPE_KEY, hash, url); + if (res && res.dupe) { + const errorReason = "BlockedByResponse"; + await cdp.send("Fetch.failRequest", { + requestId, + errorReason, + }); + return true; + } + } + // not rewritten, and not streaming, return false to continue if (!rewritten && !streamingConsume) { if (!reqresp.payload) { @@ -1498,11 +1514,9 @@ export class Recorder extends EventEmitter { const { url } = reqresp; const { logDetails } = this; try { - let readSize = await serializer.digestRecord(); - if (serializer.httpHeadersBuff) { - readSize -= serializer.httpHeadersBuff.length; - } - reqresp.readSize = readSize; + reqresp.readSize = await serializer.digestRecord({ + includeHeadersSize: false, + }); // set truncated field and recompute header buff if (reqresp.truncated) { logger.warn( @@ -1588,20 +1602,20 @@ export class Recorder extends EventEmitter { return false; } - if ( - url && - method === "GET" && - !isRedirectStatus(status) && - !(await this.crawlState.addIfNoDupe(WRITE_DUPE_KEY, url, status)) - ) { - logNetwork("Skipping dupe", { url, status, ...this.logDetails }); - return false; - } + // if ( + // url && + // method === "GET" && + // !isRedirectStatus(status) && + // !(await this.crawlState.addIfNoDupe(WRITE_DUPE_KEY, url, status)) + // ) { + // logNetwork("Skipping dupe", { url, status, ...this.logDetails }); + // return false; + // } - const responseRecord = createResponse(reqresp, pageid, iter); + let responseRecord = createResponse(reqresp, pageid, iter); const requestRecord = createRequest(reqresp, responseRecord, pageid); - const serializer = new WARCSerializer(responseRecord, { + let serializer = new WARCSerializer(responseRecord, { gzip, maxMemSize: MAX_BROWSER_DEFAULT_FETCH_SIZE, }); @@ -1612,6 +1626,7 @@ export class Recorder extends EventEmitter { ) { serializer.externalBuffer?.purge(); await this.crawlState.removeDupe(ASYNC_FETCH_DUPE_KEY, url, status); + //await this.crawlState.removeDupe(WRITE_DUPE_KEY, url, status); return false; } @@ -1638,7 +1653,39 @@ export class Recorder extends EventEmitter { } } } else { - await serializer.digestRecord(); + reqresp.readSize = await serializer.digestRecord({ + includeHeadersSize: false, + }); + } + + const hash = responseRecord.warcPayloadDigest || ""; + const date = responseRecord.warcDate || ""; + + const isEmpty = reqresp.readSize === 0; + + if (!isEmpty && url && method === "GET" && !isRedirectStatus(status)) { + const { dupe, origUrl, origDate } = await this.crawlState.getHashDupe( + WRITE_DUPE_KEY, + hash, + url, + ); + + if (dupe) { + // duplicate url at origTs + // skip, no need for revisit + logNetwork("Skipping dupe", { url, status, ...this.logDetails }); + return false; + } else if (origUrl && origDate) { + serializer.externalBuffer?.purge(); + ({ responseRecord, serializer } = await createRevisitForResponse( + responseRecord, + serializer, + origUrl, + origDate, + )); + } else { + // no dupe, continue + } } let modified = false; @@ -1669,6 +1716,10 @@ export class Recorder extends EventEmitter { this.addPageRecord(reqresp); + if (!isEmpty) { + await this.crawlState.addHashDupe(WRITE_DUPE_KEY, hash, url, date); + } + return true; } } @@ -2032,6 +2083,48 @@ function createResponse( ); } +// ================================================================= +// revisit +async function createRevisitForResponse( + responseRecord: WARCRecord, + serializer: WARCSerializer, + refersToUrl: string, + refersToDate: string, +) { + const origPayloadDigest = responseRecord.warcPayloadDigest; + + const warcHeaders: Record = { + "WARC-Page-ID": responseRecord.warcHeaders.headers.get("WARC-Page-ID")!, + }; + + const revisitRecord = WARCRecord.create({ + url: responseRecord.warcTargetURI!, + date: responseRecord.warcDate!, + warcVersion: "WARC/1.1", + type: "revisit", + warcHeaders, + refersToUrl, + refersToDate, + }); + revisitRecord.httpHeaders = responseRecord.httpHeaders; + + serializer = new WARCSerializer(revisitRecord, { + gzip: true, + maxMemSize: MAX_BROWSER_DEFAULT_FETCH_SIZE, + }); + + await serializer.digestRecord(); + + if (origPayloadDigest) { + revisitRecord.warcHeaders.headers.set( + "WARC-Payload-Digest", + origPayloadDigest, + ); + } + + return { serializer, responseRecord: revisitRecord }; +} + // ================================================================= // request function createRequest( diff --git a/src/util/state.ts b/src/util/state.ts index 3df430fc..a8d97874 100644 --- a/src/util/state.ts +++ b/src/util/state.ts @@ -1020,6 +1020,34 @@ return inx; return await this.redis.srem(key, normalizeDedupStatus(status) + "|" + url); } + async getHashDupe( + key: string, + hash: string, + url: string, + ): Promise<{ dupe?: boolean; origDate?: string; origUrl?: string }> { + const value = await this.redis.hget(key, hash); + if (!value) { + return {}; + } + const val = value.split("|"); + // if matches the first entry, return + if (val[1] === url) { + return { dupe: true }; + } + // otherwise, check if a revisit entry + if (await this.redis.sismember(`${key}:${hash}`, url)) { + return { dupe: true }; + } + return { origUrl: val[1], origDate: val[0] }; + } + + async addHashDupe(key: string, hash: string, url: string, date: string) { + const val = date + "|" + url; + if (!(await this.redis.hsetnx(key, hash, val))) { + await this.redis.sadd(`${key}:${hash}`, url); + } + } + async isInUserSet(value: string) { return (await this.redis.sismember(this.key + ":user", value)) === 1; }