dedup work:

- resource dedup via payload digest
- page dedup via page digest check, blocking of dupe pages (flow sketched below)
Ilya Kreymer 2025-08-30 12:41:10 -07:00
parent 8e44b31b45
commit 00eca5329d
3 changed files with 148 additions and 20 deletions
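
A minimal end-to-end sketch of the dedup scheme these changes implement. Illustrative only: `checkAndRecord` and `DupeResult` are hypothetical names, and the check-then-record flow is condensed (in the commit, the check via getHashDupe happens before the WARC record is written, and addHashDupe runs after). Resource payloads are hashed with sha256; the first capture of a digest is written normally, later captures of the same payload from other URLs become WARC revisit records, and exact repeats are skipped. Page dedup reuses the same check: if the top-level page payload is a known dupe (at depth >= 1), the request is failed with BlockedByResponse so the crawler skips the page.

// Hypothetical helper mirroring the Redis calls added in this commit.
import { createHash } from "crypto";
import Redis from "ioredis";

const WRITE_DUPE_KEY = "dupe";

type DupeResult =
  | { action: "skip" } // same payload already written for this URL
  | { action: "revisit"; origUrl: string; origDate: string } // same payload, new URL
  | { action: "write" }; // first capture of this payload

async function checkAndRecord(
  redis: Redis,
  url: string,
  payload: Buffer,
  date: string,
): Promise<DupeResult> {
  const hash =
    "sha256:" + createHash("sha256").update(payload).digest("hex");

  // hash -> "date|url" of the first (canonical) capture
  const value = await redis.hget(WRITE_DUPE_KEY, hash);
  if (!value) {
    // first capture: claim the digest (hsetnx is atomic across workers)
    await redis.hsetnx(WRITE_DUPE_KEY, hash, `${date}|${url}`);
    return { action: "write" };
  }

  const [origDate, origUrl] = value.split("|");
  // exact dupe: same URL as the canonical capture, or already written as a revisit
  if (
    origUrl === url ||
    (await redis.sismember(`${WRITE_DUPE_KEY}:${hash}`, url))
  ) {
    return { action: "skip" };
  }

  // same payload from a new URL: remember it and emit a revisit record
  await redis.sadd(`${WRITE_DUPE_KEY}:${hash}`, url);
  return { action: "revisit", origUrl, origDate };
}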

@@ -1075,7 +1075,7 @@ self.__bx_behaviors.selectMainBehavior();
     const { page, cdp, data, workerid, callbacks, recorder } = opts;
     data.callbacks = callbacks;
-    const { url, seedId } = data;
+    const { url, seedId, depth } = data;
     const auth = this.seeds[seedId].authHeader();
@@ -1148,6 +1148,7 @@ self.__bx_behaviors.selectMainBehavior();
     if (recorder) {
       recorder.pageSeed = seed;
+      recorder.pageSeedDepth = depth;
     }
     // run custom driver here, if any
@@ -1326,6 +1327,7 @@ self.__bx_behaviors.selectMainBehavior();
     } else {
       if (pageSkipped) {
         await this.crawlState.markExcluded(url);
+        this.limitHit = false;
       } else {
         const retry = await this.crawlState.markFailed(url);
@@ -2198,7 +2200,12 @@ self.__bx_behaviors.selectMainBehavior();
       // excluded in recorder
       if (msg.startsWith("net::ERR_BLOCKED_BY_RESPONSE")) {
         data.pageSkipped = true;
-        logger.warn("Page Load Blocked, skipping", { msg, loadState });
+        logger.warn(
+          "Page Load Blocked, skipping",
+          { msg, loadState },
+          "pageStatus",
+        );
         throw new Error("logged");
       } else {
         return this.pageFailed("Page Load Failed", retry, {
           msg,
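
Note how the crawler above and the Recorder (next file) cooperate on page dedup: the Recorder fails the top-level request with BlockedByResponse, which the crawler observes as net::ERR_BLOCKED_BY_RESPONSE and treats as a skipped (excluded) page rather than a retryable failure. A sketch of the blocking call, assuming a Puppeteer CDPSession with Fetch interception enabled (the helper name is illustrative):

import type { CDPSession, Protocol } from "puppeteer-core";

// Fail a paused request so the navigation ends with
// net::ERR_BLOCKED_BY_RESPONSE instead of a normal load.
async function blockDupePage(
  cdp: CDPSession,
  requestId: Protocol.Fetch.RequestId,
): Promise<void> {
  const errorReason: Protocol.Network.ErrorReason = "BlockedByResponse";
  await cdp.send("Fetch.failRequest", { requestId, errorReason });
}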

@@ -27,6 +27,7 @@ import { getProxyDispatcher } from "./proxy.js";
 import { ScopedSeed } from "./seeds.js";
 import EventEmitter from "events";
 import { DEFAULT_MAX_RETRIES } from "./constants.js";
+import { createHash } from "crypto";
 const MAX_BROWSER_DEFAULT_FETCH_SIZE = 5_000_000;
 const MAX_TEXT_REWRITE_SIZE = 25_000_000;
@@ -37,7 +38,7 @@ const TAKE_STREAM_BUFF_SIZE = 1024 * 64;
 const ASYNC_FETCH_DUPE_KEY = "s:fetchdupe";
-const WRITE_DUPE_KEY = "s:writedupe";
+const WRITE_DUPE_KEY = "dupe";
 const MIME_EVENT_STREAM = "text/event-stream";
@@ -140,6 +141,7 @@ export class Recorder extends EventEmitter {
   pageid!: string;
   pageSeed?: ScopedSeed;
+  pageSeedDepth = 0;
   frameIdToExecId: Map<string, number> | null;
@@ -819,6 +821,20 @@ export class Recorder extends EventEmitter {
     const rewritten = await this.rewriteResponse(reqresp, mimeType);
+    if (url === this.pageUrl && reqresp.payload && this.pageSeedDepth >= 1) {
+      const hash =
+        "sha256:" + createHash("sha256").update(reqresp.payload).digest("hex");
+      const res = await this.crawlState.getHashDupe(WRITE_DUPE_KEY, hash, url);
+      if (res && res.dupe) {
+        const errorReason = "BlockedByResponse";
+        await cdp.send("Fetch.failRequest", {
+          requestId,
+          errorReason,
+        });
+        return true;
+      }
+    }
     // not rewritten, and not streaming, return false to continue
     if (!rewritten && !streamingConsume) {
       if (!reqresp.payload) {
@@ -1498,11 +1514,9 @@ export class Recorder extends EventEmitter {
     const { url } = reqresp;
     const { logDetails } = this;
     try {
-      let readSize = await serializer.digestRecord();
-      if (serializer.httpHeadersBuff) {
-        readSize -= serializer.httpHeadersBuff.length;
-      }
-      reqresp.readSize = readSize;
+      reqresp.readSize = await serializer.digestRecord({
+        includeHeadersSize: false,
+      });
       // set truncated field and recompute header buff
       if (reqresp.truncated) {
         logger.warn(
@@ -1588,20 +1602,20 @@ export class Recorder extends EventEmitter {
       return false;
     }
-    if (
-      url &&
-      method === "GET" &&
-      !isRedirectStatus(status) &&
-      !(await this.crawlState.addIfNoDupe(WRITE_DUPE_KEY, url, status))
-    ) {
-      logNetwork("Skipping dupe", { url, status, ...this.logDetails });
-      return false;
-    }
+    // if (
+    //   url &&
+    //   method === "GET" &&
+    //   !isRedirectStatus(status) &&
+    //   !(await this.crawlState.addIfNoDupe(WRITE_DUPE_KEY, url, status))
+    // ) {
+    //   logNetwork("Skipping dupe", { url, status, ...this.logDetails });
+    //   return false;
+    // }
-    const responseRecord = createResponse(reqresp, pageid, iter);
+    let responseRecord = createResponse(reqresp, pageid, iter);
     const requestRecord = createRequest(reqresp, responseRecord, pageid);
-    const serializer = new WARCSerializer(responseRecord, {
+    let serializer = new WARCSerializer(responseRecord, {
       gzip,
       maxMemSize: MAX_BROWSER_DEFAULT_FETCH_SIZE,
     });
@@ -1612,6 +1626,7 @@ export class Recorder extends EventEmitter {
     ) {
       serializer.externalBuffer?.purge();
       await this.crawlState.removeDupe(ASYNC_FETCH_DUPE_KEY, url, status);
+      //await this.crawlState.removeDupe(WRITE_DUPE_KEY, url, status);
       return false;
     }
@@ -1638,7 +1653,39 @@ export class Recorder extends EventEmitter {
         }
       }
     } else {
-      await serializer.digestRecord();
+      reqresp.readSize = await serializer.digestRecord({
+        includeHeadersSize: false,
+      });
     }
+    const hash = responseRecord.warcPayloadDigest || "";
+    const date = responseRecord.warcDate || "";
+    const isEmpty = reqresp.readSize === 0;
+
+    if (!isEmpty && url && method === "GET" && !isRedirectStatus(status)) {
+      const { dupe, origUrl, origDate } = await this.crawlState.getHashDupe(
+        WRITE_DUPE_KEY,
+        hash,
+        url,
+      );
+
+      if (dupe) {
+        // duplicate url at origTs
+        // skip, no need for revisit
+        logNetwork("Skipping dupe", { url, status, ...this.logDetails });
+        return false;
+      } else if (origUrl && origDate) {
+        serializer.externalBuffer?.purge();
+        ({ responseRecord, serializer } = await createRevisitForResponse(
+          responseRecord,
+          serializer,
+          origUrl,
+          origDate,
+        ));
+      } else {
+        // no dupe, continue
+      }
+    }
 
     let modified = false;
@@ -1669,6 +1716,10 @@ export class Recorder extends EventEmitter {
     this.addPageRecord(reqresp);
+    if (!isEmpty) {
+      await this.crawlState.addHashDupe(WRITE_DUPE_KEY, hash, url, date);
+    }
     return true;
   }
 }
@@ -2032,6 +2083,48 @@ function createResponse(
   );
 }
+
+// =================================================================
+// revisit
+async function createRevisitForResponse(
+  responseRecord: WARCRecord,
+  serializer: WARCSerializer,
+  refersToUrl: string,
+  refersToDate: string,
+) {
+  const origPayloadDigest = responseRecord.warcPayloadDigest;
+
+  const warcHeaders: Record<string, string> = {
+    "WARC-Page-ID": responseRecord.warcHeaders.headers.get("WARC-Page-ID")!,
+  };
+
+  const revisitRecord = WARCRecord.create({
+    url: responseRecord.warcTargetURI!,
+    date: responseRecord.warcDate!,
+    warcVersion: "WARC/1.1",
+    type: "revisit",
+    warcHeaders,
+    refersToUrl,
+    refersToDate,
+  });
+
+  revisitRecord.httpHeaders = responseRecord.httpHeaders;
+
+  serializer = new WARCSerializer(revisitRecord, {
+    gzip: true,
+    maxMemSize: MAX_BROWSER_DEFAULT_FETCH_SIZE,
+  });
+
+  await serializer.digestRecord();
+
+  if (origPayloadDigest) {
+    revisitRecord.warcHeaders.headers.set(
+      "WARC-Payload-Digest",
+      origPayloadDigest,
+    );
+  }
+
+  return { serializer, responseRecord: revisitRecord };
+}
+
 // =================================================================
 // request
 function createRequest(
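
For cross-URL dupes, the Recorder swaps the response record for a WARC/1.1 revisit record pointing back at the original capture (WARC-Refers-To-Target-URI / WARC-Refers-To-Date). A standalone sketch using the same warcio.js calls as createRevisitForResponse above; the URLs and dates are examples, and in real use the original response's httpHeaders are carried over, as the diff shows:

import { WARCRecord, WARCSerializer } from "warcio";

// Example revisit record: a duplicate capture that points back at the
// original capture of the same payload (example URLs/dates).
async function exampleRevisit() {
  const revisit = WARCRecord.create({
    url: "https://example.com/b/styles.css", // duplicate capture
    date: "2025-08-30T19:41:10Z",
    warcVersion: "WARC/1.1",
    type: "revisit",
    refersToUrl: "https://example.com/a/styles.css", // original capture
    refersToDate: "2025-08-30T19:40:00Z",
  });
  const serializer = new WARCSerializer(revisit, { gzip: true });
  await serializer.digestRecord();
  return { revisit, serializer };
}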

@@ -1020,6 +1020,34 @@ return inx;
     return await this.redis.srem(key, normalizeDedupStatus(status) + "|" + url);
   }
+  async getHashDupe(
+    key: string,
+    hash: string,
+    url: string,
+  ): Promise<{ dupe?: boolean; origDate?: string; origUrl?: string }> {
+    const value = await this.redis.hget(key, hash);
+    if (!value) {
+      return {};
+    }
+    const val = value.split("|");
+    // if matches the first entry, return
+    if (val[1] === url) {
+      return { dupe: true };
+    }
+    // otherwise, check if a revisit entry
+    if (await this.redis.sismember(`${key}:${hash}`, url)) {
+      return { dupe: true };
+    }
+    return { origUrl: val[1], origDate: val[0] };
+  }
+
+  async addHashDupe(key: string, hash: string, url: string, date: string) {
+    const val = date + "|" + url;
+    if (!(await this.redis.hsetnx(key, hash, val))) {
+      await this.redis.sadd(`${key}:${hash}`, url);
+    }
+  }
+
   async isInUserSet(value: string) {
     return (await this.redis.sismember(this.key + ":user", value)) === 1;
   }
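
The resulting Redis layout, per the two methods above: one hash (the WRITE_DUPE_KEY, "dupe") maps each payload digest to "date|url" of its first capture, and a side set dupe:<hash> lists the other URLs already written as revisits of that digest. A small demo against a local Redis via ioredis (example values):

import Redis from "ioredis";

async function demo() {
  const redis = new Redis(); // assumes redis on localhost:6379
  const hash = "sha256:ab12..."; // example digest, truncated

  // first capture claims the digest; HSETNX only sets the field if absent
  await redis.hsetnx(
    "dupe",
    hash,
    "2025-08-30T19:40:00Z|https://example.com/a.css",
  );

  // same payload later seen at another URL: the claim fails, so the URL is
  // recorded in the side set and the capture is written as a revisit
  if (
    !(await redis.hsetnx(
      "dupe",
      hash,
      "2025-08-30T19:41:10Z|https://example.com/b.css",
    ))
  ) {
    await redis.sadd(`dupe:${hash}`, "https://example.com/b.css");
  }

  console.log(await redis.hget("dupe", hash)); // "2025-08-30T19:40:00Z|https://example.com/a.css"
  console.log(await redis.sismember(`dupe:${hash}`, "https://example.com/b.css")); // 1
  await redis.quit();
}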