rename 'dedup' -> 'dedupe' for consistency

Ilya Kreymer 2025-10-25 09:33:37 -07:00
parent c4f07c4e59
commit dd8d2e1ea7
6 changed files with 113 additions and 106 deletions

View file

@@ -342,9 +342,9 @@ export class Crawler {
async initCrawlState() {
const redisUrl = this.params.redisStoreUrl || "redis://localhost:6379/0";
const dedupRedisUrl = this.params.redisDedupUrl || redisUrl;
const dedupeRedisUrl = this.params.redisDedupeUrl || redisUrl;
this.deduping = dedupRedisUrl !== redisUrl;
this.deduping = dedupeRedisUrl !== redisUrl;
if (!redisUrl.startsWith("redis://")) {
logger.fatal(
@@ -360,10 +360,10 @@ export class Crawler {
"state",
);
let dedupRedis = redis;
let dedupeRedis = redis;
if (redisUrl !== dedupRedisUrl) {
dedupRedis = await initRedisWaitForSuccess(dedupRedisUrl);
if (redisUrl !== dedupeRedisUrl) {
dedupeRedis = await initRedisWaitForSuccess(dedupeRedisUrl);
}
logger.debug(`Max Page Time: ${this.maxPageTime} seconds`, {}, "state");
@@ -374,7 +374,7 @@ export class Crawler {
this.maxPageTime,
os.hostname(),
this.params.maxPageRetries,
dedupRedis,
dedupeRedis,
);
if (this.params.logErrorsToRedis) {
@@ -1898,7 +1898,7 @@ self.__bx_behaviors.selectMainBehavior();
if (wacz) {
if (this.deduping) {
await this.crawlState.setStatus("post-crawl");
await this.crawlState.updateDedupSource(wacz);
await this.crawlState.updateDedupeSource(wacz);
await this.crawlState.clearDupeFileRef();
}

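For context on the renamed option, a minimal sketch (not the crawler's actual code) of the wiring this hunk renames: the crawler reuses the state Redis connection unless redisDedupeUrl points somewhere else. The connect helper and environment variable names below are assumptions for illustration; the crawler itself uses initRedisWaitForSuccess from util/redis.js.

import { Redis } from "ioredis";

// Illustrative stand-in for initRedisWaitForSuccess (assumption, not the real helper)
async function connect(url: string): Promise<Redis> {
  const redis = new Redis(url, { lazyConnect: true });
  await redis.connect();
  return redis;
}

const redisUrl = process.env.REDIS_STORE_URL || "redis://localhost:6379/0";
const dedupeRedisUrl = process.env.REDIS_DEDUPE_URL || redisUrl;

const redis = await connect(redisUrl);
// only open a second connection when a distinct dedupe URL is configured
const dedupeRedis =
  dedupeRedisUrl !== redisUrl ? await connect(dedupeRedisUrl) : redis;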
View file

@@ -7,10 +7,10 @@ import { WACZLoader } from "./util/wacz.js";
import { ExitCodes } from "./util/constants.js";
import { initRedisWaitForSuccess } from "./util/redis.js";
import { AsyncIterReader } from "warcio";
import { RedisDedupIndex } from "./util/state.js";
import { RedisDedupeIndex } from "./util/state.js";
import { basename } from "node:path";
export type DedupIndexEntry = {
export type DedupeIndexEntry = {
name: string;
url: string;
crawlId?: string;
@@ -25,7 +25,7 @@ export class CrawlIndexer {
return yargs(process.argv)
.usage("indexer [options]")
.options({
redisDedupUrl: {
redisDedupeUrl: {
describe: "URL for remote redis instance to index into",
type: "string",
required: true,
@@ -57,26 +57,28 @@ export class CrawlIndexer {
const params = this.initArgs();
const redis = await initRedisWaitForSuccess(params.redisDedupUrl);
const dedupIndex = new RedisDedupIndex(redis, "");
const redis = await initRedisWaitForSuccess(params.redisDedupeUrl);
const dedupeIndex = new RedisDedupeIndex(redis, "");
for await (const entry of this.iterWACZ(params.sourceUrl)) {
await dedupIndex.queueImportSource(entry.name, JSON.stringify(entry));
await dedupeIndex.queueImportSource(entry.name, JSON.stringify(entry));
}
let count = 0;
let res;
while ((res = await dedupIndex.nextQueuedImportSource())) {
while ((res = await dedupeIndex.nextQueuedImportSource())) {
const { name, entry, total } = res;
const { url, crawlId, size, hash } = JSON.parse(entry) as DedupIndexEntry;
const { url, crawlId, size, hash } = JSON.parse(
entry,
) as DedupeIndexEntry;
count += 1;
const loader = new WACZLoader(url);
logger.debug(`Processing WACZ ${count} of ${total}`, { waczfile: url });
const crawlIdReal = crawlId || params.sourceCrawlId || url;
await dedupIndex.addImportedSourceForDedup(crawlIdReal, {
await dedupeIndex.addImportedSourceForDedupe(crawlIdReal, {
filename: name,
size,
hash,
@@ -87,7 +89,7 @@ export class CrawlIndexer {
if (filename.endsWith(".cdx.gz")) {
logger.debug("Processing CDX GZ Index", { filename });
await this.ingestCDXJ(
dedupIndex,
dedupeIndex,
loader,
filename,
crawlIdReal,
@@ -95,20 +97,20 @@ export class CrawlIndexer {
);
} else if (filename.endsWith(".cdx") || filename.endsWith(".cdxj")) {
logger.debug("Processing CDX Index", { filename });
await this.ingestCDXJ(dedupIndex, loader, filename, crawlIdReal);
await this.ingestCDXJ(dedupeIndex, loader, filename, crawlIdReal);
}
}
await dedupIndex.markImportSourceDone(name, crawlIdReal);
await dedupeIndex.markImportSourceDone(name, crawlIdReal);
}
logger.info("Done!");
await dedupIndex.markImportFinishedTS();
await dedupeIndex.markImportFinishedTS();
process.exit(ExitCodes.Success);
}
async ingestCDXJ(
dedupIndex: RedisDedupIndex,
dedupeIndex: RedisDedupeIndex,
loader: WACZLoader,
filename: string,
crawlId: string,
@@ -152,14 +154,14 @@ export class CrawlIndexer {
continue;
}
// only adding originals to dedup against, don't want to dedup against existing revisits
// only adding originals to dedupe against, don't want to dedupe against existing revisits
if (cdx.mime === "warc/revisit") {
continue;
}
if (url && date && hash) {
await dedupIndex.addHashDupe(hash, url, date, crawlId);
await dedupIndex.addImportedForCrawl(hash, crawlId);
await dedupeIndex.addHashDupe(hash, url, date, crawlId);
await dedupeIndex.addImportedForCrawl(hash, crawlId);
} else {
logger.warn("Skipping invalid CDXJ, data missing", {
url,
@@ -175,7 +177,7 @@ export class CrawlIndexer {
logger.debug("Processed", { count });
}
async *iterWACZ(url: string, name?: string): AsyncIterable<DedupIndexEntry> {
async *iterWACZ(url: string, name?: string): AsyncIterable<DedupeIndexEntry> {
let path: string = url;
try {

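The indexer entry point above now takes --redisDedupeUrl. A hedged usage sketch, mirroring the integration test further down (paths, crawl IDs, and the Redis URL are placeholders):

import { execSync } from "node:child_process";

// Build a standalone dedupe index from an existing WACZ (placeholder paths/IDs)
execSync(
  "docker run --rm -v $PWD/test-crawls:/crawls --network=dedupe " +
  "webrecorder/browsertrix-crawler indexer " +
  "--sourceUrl /crawls/collections/dedupe-test-orig/dedupe-test-orig.wacz " +
  "--sourceCrawlId dedupe-test-orig " +
  "--redisDedupeUrl redis://dedupe-redis:6379/1"
);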
View file

@@ -445,13 +445,13 @@ class ArgParser {
default: "redis://localhost:6379/0",
},
redisDedupUrl: {
redisDedupeUrl: {
describe:
"If set, url for remote redis server to store state. Otherwise, using local redis instance",
type: "string",
},
minPageDedupDepth: {
minPageDedupeDepth: {
describe:
"If set >= 0, minimum depth at which duplicate pages can be skipped. -1 means never skip duplicate pages",
type: "number",

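Both renamed flags are ordinary CLI options, so a crawl that dedupes against a shared Redis index is invoked roughly as in the tests below; --minPageDedupeDepth is shown with an assumed value of 0 (skip duplicate pages at any depth), and the collection name and URLs are examples.

import { execSync } from "node:child_process";

// Crawl while deduplicating against a shared index (flag values are examples)
execSync(
  "docker run --rm -v $PWD/test-crawls:/crawls --network=dedupe " +
  "webrecorder/browsertrix-crawler crawl --url https://old.webrecorder.net/ " +
  "--limit 4 --collection dedupe-example --generateWACZ " +
  "--redisDedupeUrl redis://dedupe-redis:6379/0 --minPageDedupeDepth 0"
);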
View file

@@ -143,7 +143,7 @@ export class Recorder extends EventEmitter {
pageSeed?: ScopedSeed;
pageSeedDepth = 0;
minPageDedupDepth = -1;
minPageDedupeDepth = -1;
frameIdToExecId: Map<string, number> | null;
@@ -167,7 +167,7 @@ export class Recorder extends EventEmitter {
this.shouldSaveStorage = !!crawler.params.saveStorage;
this.minPageDedupDepth = crawler.params.minPageDedupDepth;
this.minPageDedupeDepth = crawler.params.minPageDedupeDepth;
this.writer = writer;
@@ -828,8 +828,8 @@ export class Recorder extends EventEmitter {
if (
url === this.pageUrl &&
reqresp.payload &&
this.minPageDedupDepth >= 0 &&
this.pageSeedDepth >= this.minPageDedupDepth
this.minPageDedupeDepth >= 0 &&
this.pageSeedDepth >= this.minPageDedupeDepth
) {
const hash =
"sha256:" + createHash("sha256").update(reqresp.payload).digest("hex");

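A condensed sketch of the gate this hunk renames: page-level dedupe only applies when minPageDedupeDepth is non-negative and the page is at least that deep, and the payload is keyed by its sha256 digest. The helper names are illustrative, not the Recorder's API.

import { createHash } from "node:crypto";

// Dedupe is disabled when the threshold is -1 (the default)
function shouldDedupePage(minPageDedupeDepth: number, pageSeedDepth: number): boolean {
  return minPageDedupeDepth >= 0 && pageSeedDepth >= minPageDedupeDepth;
}

// Same "sha256:<hex>" form the dedupe index stores
function payloadHash(payload: Uint8Array): string {
  return "sha256:" + createHash("sha256").update(payload).digest("hex");
}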
View file

@@ -33,7 +33,7 @@ export enum QueueState {
// ============================================================================
// treat 0 or 206 as 200 for purposes of dedup
export function normalizeDedupStatus(status: number): string {
export function normalizeDedupeStatus(status: number): string {
if (status === 0 || status === 206) {
return "200";
}
@@ -200,18 +200,18 @@ export type DedupeEntry = {
};
// ============================================================================
export type DedupSourceEntry = {
export type DedupeSourceEntry = {
filename: string;
size?: number;
hash?: string;
};
// ============================================================================
export class RedisDedupIndex {
dedupRedis: Redis;
export class RedisDedupeIndex {
dedupeRedis: Redis;
crawlId: string;
dedupKeyIndex = -1;
dedupCurrFilename = "";
dedupeKeyIndex = -1;
dedupeCurrFilename = "";
sourceDone = "src:d";
sourceQ = "src:q";
@@ -219,61 +219,61 @@ export class RedisDedupIndex {
sourceP = "src:p";
pendingPrefix = "pending:q:";
constructor(dedupRedis: Redis, crawlId: string) {
this.dedupRedis = dedupRedis;
constructor(dedupeRedis: Redis, crawlId: string) {
this.dedupeRedis = dedupeRedis;
this.crawlId = crawlId;
}
// DEDUP SOURCE
// DEDUPE SOURCE
async addSourceForDedup(filename: string) {
//const count = await this.dedupRedis.incr(`c:${key}:count`) - 1;
async addSourceForDedupe(filename: string) {
//const count = await this.dedupeRedis.incr(`c:${key}:count`) - 1;
const count =
(await this.dedupRedis.rpush(
(await this.dedupeRedis.rpush(
`c:${this.crawlId}:wacz`,
JSON.stringify({ filename }),
)) - 1;
this.dedupCurrFilename = filename;
this.dedupKeyIndex = count;
this.dedupeCurrFilename = filename;
this.dedupeKeyIndex = count;
}
async updateDedupSource(wacz: WACZ) {
if (this.dedupKeyIndex < 0) {
async updateDedupeSource(wacz: WACZ) {
if (this.dedupeKeyIndex < 0) {
return;
}
const value: DedupSourceEntry = {
filename: wacz.getLocalFilename() || this.dedupCurrFilename,
const value: DedupeSourceEntry = {
filename: wacz.getLocalFilename() || this.dedupeCurrFilename,
hash: wacz.getHash(),
size: wacz.getSize(),
};
await this.dedupRedis.lset(
await this.dedupeRedis.lset(
`c:${this.crawlId}:wacz`,
this.dedupKeyIndex,
this.dedupeKeyIndex,
JSON.stringify(value),
);
await this.commitDedupDone();
await this.commitDedupeDone();
}
// COMMIT DEDUP TO SHARED INDEX
// COMMIT DEDUPE TO SHARED INDEX
async commitDedupDone() {
for await (const hashes of this.dedupRedis.hscanStream(
async commitDedupeDone() {
for await (const hashes of this.dedupeRedis.hscanStream(
`h:${this.crawlId}`,
)) {
let value = false;
for (const hash of hashes) {
if (!value) {
await this.dedupRedis.hset(DUPE_ALL_HASH_KEY, hash, this.crawlId);
await this.dedupeRedis.hsetnx(DUPE_ALL_HASH_KEY, hash, this.crawlId);
}
value = !value;
}
}
// add to crawls list
await this.dedupRedis.sadd(DUPE_ALL_CRAWLS, this.crawlId);
await this.dedupeRedis.sadd(DUPE_ALL_CRAWLS, this.crawlId);
}
// GET OR ADD INDIVIDUAL HASHES
@@ -286,12 +286,12 @@ export class RedisDedupIndex {
hash = hash.split(":").at(-1)!;
// first, check the shared key
let crawlId = await this.dedupRedis.hget(key, hash);
let crawlId = await this.dedupeRedis.hget(key, hash);
if (!crawlId) {
// otherwise, try current crawl
crawlId = this.crawlId;
}
const value = await this.dedupRedis.hget(`h:${crawlId}`, hash);
const value = await this.dedupeRedis.hget(`h:${crawlId}`, hash);
if (!value) {
return null;
}
@@ -302,37 +302,37 @@ export class RedisDedupIndex {
async addHashDupe(hash: string, url: string, date: string, crawlId?: string) {
date = date.replace(/[^\d]/g, "");
hash = hash.split(":").at(-1)!;
const val = `${this.dedupKeyIndex} ${date} ${url}`;
await this.dedupRedis.hsetnx(`h:${crawlId || this.crawlId}`, hash, val);
const val = `${this.dedupeKeyIndex} ${date} ${url}`;
await this.dedupeRedis.hsetnx(`h:${crawlId || this.crawlId}`, hash, val);
}
// IMPORT
async queueImportSource(id: string, data: string) {
// already handled this source
if (await this.dedupRedis.sismember(this.sourceDone, id)) {
if (await this.dedupeRedis.sismember(this.sourceDone, id)) {
return;
}
await this.dedupRedis.lpush(this.sourceQ, data);
await this.dedupeRedis.lpush(this.sourceQ, data);
}
async addImportedForCrawl(hash: string, crawlId: string) {
await this.dedupRedis.hset(DUPE_ALL_HASH_KEY, hash, crawlId);
await this.dedupeRedis.hset(DUPE_ALL_HASH_KEY, hash, crawlId);
}
async addImportedSourceForDedup(key: string, entry: DedupSourceEntry) {
async addImportedSourceForDedupe(key: string, entry: DedupeSourceEntry) {
return (
(await this.dedupRedis.rpush(`c:${key}:wacz`, JSON.stringify(entry))) - 1
(await this.dedupeRedis.rpush(`c:${key}:wacz`, JSON.stringify(entry))) - 1
);
}
async markImportSourceDone(id: string, crawlId: string) {
await this.dedupRedis.sadd(this.sourceDone, id);
await this.dedupRedis.sadd(DUPE_ALL_CRAWLS, crawlId);
await this.dedupeRedis.sadd(this.sourceDone, id);
await this.dedupeRedis.sadd(DUPE_ALL_CRAWLS, crawlId);
}
async nextQueuedImportSource() {
let res: string | null = await this.dedupRedis.lmove(
let res: string | null = await this.dedupeRedis.lmove(
this.sourceQ,
this.pendingQ,
"RIGHT",
@@ -340,9 +340,9 @@ export class RedisDedupIndex {
);
// use circular pending Q to support retries
if (!res) {
const len = await this.dedupRedis.llen(this.pendingQ);
const len = await this.dedupeRedis.llen(this.pendingQ);
for (let i = 0; i < len; i++) {
res = await this.dedupRedis.lmove(
res = await this.dedupeRedis.lmove(
this.pendingQ,
this.pendingQ,
"RIGHT",
@@ -350,7 +350,7 @@ export class RedisDedupIndex {
);
if (res) {
const { id } = JSON.parse(res);
if (await this.dedupRedis.get(this.pendingPrefix + id)) {
if (await this.dedupeRedis.get(this.pendingPrefix + id)) {
res = null;
continue;
} else {
@@ -364,20 +364,20 @@ export class RedisDedupIndex {
return null;
}
await this.dedupRedis.lrem(this.pendingQ, 1, res);
await this.dedupeRedis.lrem(this.pendingQ, 1, res);
const { name } = JSON.parse(res);
const total = (await this.dedupRedis.llen(this.sourceQ)) + 1;
await this.dedupRedis.setex(this.pendingPrefix + name, "1", 300);
const total = (await this.dedupeRedis.llen(this.sourceQ)) + 1;
await this.dedupeRedis.setex(this.pendingPrefix + name, "1", 300);
return { name, entry: res, total };
}
async markImportFinishedTS() {
await this.dedupRedis.set("last_update_ts", new Date().toISOString());
await this.dedupeRedis.set("last_update_ts", new Date().toISOString());
}
}
// ============================================================================
export class RedisCrawlState extends RedisDedupIndex {
export class RedisCrawlState extends RedisDedupeIndex {
redis: Redis;
maxRetries: number;
@@ -408,9 +408,9 @@ export class RedisCrawlState extends RedisDedupIndex {
maxPageTime: number,
uid: string,
maxRetries?: number,
dedupRedis?: Redis,
dedupeRedis?: Redis,
) {
super(dedupRedis || redis, key);
super(dedupeRedis || redis, key);
this.redis = redis;
this.uid = uid;
@@ -717,7 +717,7 @@ return inx;
"state",
);
}
await this.addSourceForDedup(this.waczFilename!);
await this.addSourceForDedupe(this.waczFilename!);
return this.waczFilename!;
}
@@ -1200,13 +1200,18 @@ return inx;
async addIfNoDupe(key: string, url: string, status: number) {
return (
(await this.redis.sadd(key, normalizeDedupStatus(status) + "|" + url)) ===
1
(await this.redis.sadd(
key,
normalizeDedupeStatus(status) + "|" + url,
)) === 1
);
}
async removeDupe(key: string, url: string, status: number) {
return await this.redis.srem(key, normalizeDedupStatus(status) + "|" + url);
return await this.redis.srem(
key,
normalizeDedupeStatus(status) + "|" + url,
);
}
async isInUserSet(value: string) {
@@ -1341,20 +1346,20 @@ return inx;
// DEPENDENT CRAWLS FOR DEDUPE
async addDupeCrawlRef(crawlId: string, index: string) {
await this.redis.sadd(`${this.uid}:dindex`, crawlId + " " + index);
await this.redis.sadd(`${this.crawlId}:depCrawls`, crawlId);
await this.redis.sadd(`${this.uid}:duperef`, crawlId + " " + index);
await this.redis.sadd(`${this.crawlId}:reqCrawls`, crawlId);
}
async clearDupeFileRef() {
await this.redis.del(`${this.uid}:dindex`);
await this.redis.del(`${this.uid}:duperef`);
}
async getDupeDependentSources() {
const dependIndexes = await this.redis.smembers(`${this.uid}:dindex`);
const dependRefs = await this.redis.smembers(`${this.uid}:duperef`);
const crawlIds = [];
for (const value of dependIndexes) {
for (const value of dependRefs) {
const [crawlId, index] = value.split(" ");
const source = await this.dedupRedis.lindex(
const source = await this.dedupeRedis.lindex(
`c:${crawlId}:wacz`,
Number(index),
);

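Putting the renamed pieces together, a rough sketch of how RedisDedupeIndex is driven, based only on the methods visible in the hunks above (the import path, URLs, crawl ID, and argument values are assumptions):

import { Redis } from "ioredis";
import { RedisDedupeIndex } from "./util/state.js";

const dedupeRedis = new Redis("redis://localhost:6379/0");
const index = new RedisDedupeIndex(dedupeRedis, "my-crawl-id");

// register the WACZ that later hash entries refer back to by list index
await index.addSourceForDedupe("my-crawl-id.wacz");

// record a payload hash for this crawl; the date is reduced to digits internally
await index.addHashDupe("sha256:abc123...", "https://example.com/", "2025-10-25T09:33:37Z");

// after the crawl, publish this crawl's hashes into the shared index
await index.commitDedupeDone();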
View file

@@ -13,9 +13,9 @@ let redisId;
let numResponses = 0;
beforeAll(() => {
execSync("docker network create dedup");
execSync("docker network create dedupe");
redisId = execSync("docker run --rm --network=dedup -p 37379:6379 --name dedup-redis -d redis");
redisId = execSync("docker run --rm --network=dedupe -p 37379:6379 --name dedupe-redis -d redis");
});
afterAll(async () => {
@@ -25,13 +25,13 @@ afterAll(async () => {
//await Promise.allSettled([crawler1, crawler2]);
execSync("docker network rm dedup");
execSync("docker network rm dedupe");
});
function runCrawl(name, db="0") {
fs.rmSync(`./test-crawls/collections/${name}`, { recursive: true, force: true });
const crawler = exec(`docker run --rm -v $PWD/test-crawls:/crawls --network=dedup -e CRAWL_ID=${name} webrecorder/browsertrix-crawler crawl --url https://old.webrecorder.net/ --limit 4 --exclude community --collection ${name} --redisDedupUrl redis://dedup-redis:6379/${db} --generateWACZ`);
const crawler = exec(`docker run --rm -v $PWD/test-crawls:/crawls --network=dedupe -e CRAWL_ID=${name} webrecorder/browsertrix-crawler crawl --url https://old.webrecorder.net/ --limit 4 --exclude community --collection ${name} --redisDedupeUrl redis://dedupe-redis:6379/${db} --generateWACZ`);
return new Promise((resolve) => {
crawler.on("exit", (code) => {
@@ -70,15 +70,15 @@ function loadDataPackageRelated(name) {
test("check revisit records written on duplicate crawl", async () => {
expect(await runCrawl("dedup-test-orig")).toBe(0);
expect(await runCrawl("dedup-test-dupe")).toBe(0);
expect(await runCrawl("dedupe-test-orig")).toBe(0);
expect(await runCrawl("dedupe-test-dupe")).toBe(0);
let statusCode = -1;
let response = 0;
let revisit = 0;
const parserOrig = loadFirstWARC("dedup-test-orig");
const parserOrig = loadFirstWARC("dedupe-test-orig");
for await (const record of parserOrig) {
if (record.warcTargetURI && record.warcTargetURI.startsWith("urn:")) {
@@ -90,7 +90,7 @@ test("check revisit records written on duplicate crawl", async () => {
}
}
const dupeOrig = loadFirstWARC("dedup-test-dupe");
const dupeOrig = loadFirstWARC("dedupe-test-dupe");
for await (const record of dupeOrig) {
if (record.warcTargetURI && record.warcTargetURI.startsWith("urn:")) {
@@ -113,7 +113,7 @@ test("check revisit records written on duplicate crawl", async () => {
test("import index and crawl dupe", async () => {
execSync(`docker run --rm -v $PWD/test-crawls:/crawls --network=dedup webrecorder/browsertrix-crawler indexer --sourceUrl /crawls/collections/dedup-test-orig/dedup-test-orig.wacz --sourceCrawlId dedup-test-orig --redisDedupUrl redis://dedup-redis:6379/1`);
execSync(`docker run --rm -v $PWD/test-crawls:/crawls --network=dedupe webrecorder/browsertrix-crawler indexer --sourceUrl /crawls/collections/dedupe-test-orig/dedupe-test-orig.wacz --sourceCrawlId dedupe-test-orig --redisDedupeUrl redis://dedupe-redis:6379/1`);
const redis = new Redis("redis://127.0.0.1:37379/1", { lazyConnect: true, retryStrategy: () => null });
@@ -124,9 +124,9 @@ test("import index and crawl dupe", async () => {
test("imported crawl dupe matches previous dupe count", async () => {
expect(await runCrawl("dedup-test-dupe-2", 1)).toBe(0);
expect(await runCrawl("dedupe-test-dupe-2", 1)).toBe(0);
const dupeOrig = loadFirstWARC("dedup-test-dupe-2");
const dupeOrig = loadFirstWARC("dedupe-test-dupe-2");
let revisit = 0;
@@ -145,22 +145,22 @@ test("imported crawl dupe matches previous dupe count", async () => {
});
test("test requires in datapackage.json of wacz deduped against previous crawl", () => {
const res1 = loadDataPackageRelated("dedup-test-dupe");
const res1 = loadDataPackageRelated("dedupe-test-dupe");
expect(res1.requires.length).toBe(1);
const entry = res1.requires[0];
expect(entry.crawlId).toBe("dedup-test-orig");
expect(entry.filename).toBe("dedup-test-orig.wacz");
expect(entry.crawlId).toBe("dedupe-test-orig");
expect(entry.filename).toBe("dedupe-test-orig.wacz");
expect(entry.size).toBeDefined();
expect(entry.hash).toBeDefined();
});
test("test requires in datapackage.json of wacz deduped against import from wacz", () => {
const res2 = loadDataPackageRelated("dedup-test-dupe-2");
const res2 = loadDataPackageRelated("dedupe-test-dupe-2");
expect(res2.requires.length).toBe(1);
const entry2 = res2.requires[0];
expect(entry2.crawlId).toBe("dedup-test-orig");
expect(entry2.filename).toBe("dedup-test-orig.wacz");
expect(entry2.crawlId).toBe("dedupe-test-orig");
expect(entry2.filename).toBe("dedupe-test-orig.wacz");
// undefined as importing from single WACZ and not computing
expect(entry2.size).toBeUndefined();
expect(entry2.hash).toBeUndefined();