use dedup redis to queue up WACZ files that need to be updated

use a pending queue to support retries in case of failure
store both the id and the actual URL, in case the URL changes in subsequent retries
Ilya Kreymer 2025-09-22 22:30:08 -07:00
parent ca02f09b5d
commit 78b8847323
2 changed files with 69 additions and 11 deletions
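
In outline, the new flow looks like this (a minimal sketch of the intent of the diff below; processSource() is a hypothetical stand-in for the per-file CDX ingest loop):

    // Sketch: enqueue-then-drain flow, assuming the RedisDedupIndex from this
    // commit and an iterWACZ() that yields [name, url] pairs
    async function runIndexer(
      dedupIndex: RedisDedupIndex,
      iterWACZ: AsyncIterable<[string, string]>,
      processSource: (url: string) => Promise<void>, // hypothetical ingest step
    ) {
      // queue each source under a stable id; addHashSource() skips sources
      // already in the done set, so a re-run only queues unfinished work
      for await (const [name, url] of iterWACZ) {
        await dedupIndex.addHashSource(name, url);
      }

      // drain the queue; a claim is marked pending for 300s, and a source
      // only enters the done set after it is fully ingested, so sources
      // that fail part-way remain eligible for retry
      let res;
      while ((res = await dedupIndex.nextQueuedHashSource())) {
        await processSource(res.url);
        await dedupIndex.addDoneSource(res.id);
      }
    }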

File 1 of 2:

@@ -46,19 +46,18 @@ export class CrawlIndexer {
     const redis = await initRedisWaitForSuccess(params.redisDedupUrl);
     const dedupIndex = new RedisDedupIndex(redis);
-    const allFiles = [];
-
-    for await (const waczfile of this.iterWACZ(params.sourceUrl)) {
-      allFiles.push(waczfile);
+    for await (const [name, waczfile] of this.iterWACZ(params.sourceUrl)) {
+      await dedupIndex.addHashSource(name, waczfile);
     }

     let count = 0;
-    const total = allFiles.length;

-    for (const waczfile of allFiles) {
+    let res;
+
+    while ((res = await dedupIndex.nextQueuedHashSource())) {
+      const { id, url, total } = res;
       count += 1;
-      const loader = new WACZLoader(waczfile);
-      logger.debug(`Processing WACZ ${count} of ${total}`, { waczfile });
+      const loader = new WACZLoader(url);
+      logger.debug(`Processing WACZ ${count} of ${total}`, { waczfile: url });

       for await (const file of loader.iterFiles("indexes/")) {
         const filename = file.filename;
         if (filename.endsWith(".cdx.gz")) {
@@ -69,6 +68,7 @@ export class CrawlIndexer {
           await this.ingestCDXJ(dedupIndex, loader, filename);
         }
       }
+      await dedupIndex.addDoneSource(id);
     }

     logger.info("Done!");
@@ -141,7 +141,7 @@ export class CrawlIndexer {
     logger.debug("Processed", { count });
   }

-  async *iterWACZ(url: string): AsyncIterable<string> {
+  async *iterWACZ(url: string, name?: string): AsyncIterable<[string, string]> {
     let path: string = url;

     try {
@@ -151,7 +151,7 @@ export class CrawlIndexer {
     }

     if (path.endsWith(".wacz")) {
-      yield url;
+      yield [name || url, url];
     } else if (path.endsWith(".json")) {
       if (!url.startsWith("http://") && !url.startsWith("https://")) {
         const blob = await openAsBlob(url);
@@ -163,7 +163,7 @@ export class CrawlIndexer {
       for (const entry of json.resources) {
         if (entry.path) {
-          yield* this.iterWACZ(entry.path);
+          yield* this.iterWACZ(entry.path, entry.name);
         }
       }
     } else {

File 2 of 2:

@@ -193,6 +193,12 @@ export type SaveState = {
 export class RedisDedupIndex {
   dedupRedis: Redis;
   sourceDone = "src:d";
+  sourceQ = "src:q";
+  pendingQ = "pending:q";
+  sourceP = "src:p";
+  pendingPrefix = "pending:q:";

   constructor(dedupRedis: Redis) {
     this.dedupRedis = dedupRedis;
   }
@@ -221,6 +227,58 @@ export class RedisDedupIndex {
     hash = hash.split(":").at(-1)!;
     await this.dedupRedis.hsetnx(key, hash, val);
   }
+
+  // queue a source for indexing, unless it has already been fully processed
+  async addHashSource(id: string, url: string) {
+    // already handled this source
+    if (await this.dedupRedis.sismember(this.sourceDone, id)) {
+      return;
+    }
+    await this.dedupRedis.lpush(this.sourceQ, JSON.stringify({ id, url }));
+  }
+
+  // record a source as fully processed so it is never queued again
+  async addDoneSource(id: string) {
+    await this.dedupRedis.sadd(this.sourceDone, id);
+  }
+
+  async nextQueuedHashSource() {
+    // claim the next source from the main queue, parking it on the pending queue
+    let res: string | null = await this.dedupRedis.lmove(
+      this.sourceQ,
+      this.pendingQ,
+      "RIGHT",
+      "LEFT",
+    );
+
+    // use circular pending Q to support retries: rotate the pending queue
+    // once around, skipping entries whose pending marker is still live
+    if (!res) {
+      const len = await this.dedupRedis.llen(this.pendingQ);
+      for (let i = 0; i < len; i++) {
+        res = await this.dedupRedis.lmove(
+          this.pendingQ,
+          this.pendingQ,
+          "RIGHT",
+          "LEFT",
+        );
+        if (res) {
+          const { id } = JSON.parse(res);
+          if (await this.dedupRedis.get(this.pendingPrefix + id)) {
+            res = null;
+            continue;
+          } else {
+            break;
+          }
+        }
+      }
+    }
+
+    if (!res) {
+      return null;
+    }
+
+    await this.dedupRedis.lrem(this.pendingQ, 1, res);
+    const { id, url } = JSON.parse(res);
+    const total = await this.dedupRedis.llen(this.sourceQ);
+    // mark as pending for 300 seconds (SETEX takes key, seconds, value)
+    await this.dedupRedis.setex(this.pendingPrefix + id, 300, "1");
+    return { id, url, total };
+  }
 }
// ============================================================================
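
For reference, the pending-queue scan relies on Redis LMOVE with the same list as both source and destination, which rotates the list one entry per call without ever dropping elements. A standalone sketch of that claim pattern (using ioredis; the queue and key names here are illustrative, not the commit's):

    import Redis from "ioredis";

    // Rotate a list in place with LMOVE, visiting each entry at most once
    // per call. Entries whose claim key is still live are skipped; the
    // first unclaimed entry is removed from the list and claimed for 300s.
    async function claimNext(
      redis: Redis,
      queue: string,
      claimPrefix: string,
    ): Promise<{ id: string; url: string } | null> {
      const len = await redis.llen(queue);
      for (let i = 0; i < len; i++) {
        const res = await redis.lmove(queue, queue, "RIGHT", "LEFT");
        if (!res) {
          break;
        }
        const entry = JSON.parse(res);
        if (await redis.get(claimPrefix + entry.id)) {
          continue; // claim still live elsewhere, keep rotating
        }
        await redis.lrem(queue, 1, res); // take the claimed entry off the list
        await redis.setex(claimPrefix + entry.id, 300, "1"); // claim for 300s
        return entry;
      }
      return null;
    }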