indexer optimize: commit only if added

Ilya Kreymer 2025-10-25 13:17:01 -07:00
parent dd8d2e1ea7
commit 0d414f72f1
2 changed files with 24 additions and 19 deletions


@@ -60,7 +60,10 @@ export class CrawlIndexer {
     const redis = await initRedisWaitForSuccess(params.redisDedupeUrl);
     const dedupeIndex = new RedisDedupeIndex(redis, "");
 
-    for await (const entry of this.iterWACZ(params.sourceUrl)) {
+    for await (const entry of this.iterWACZ({
+      url: params.sourceUrl,
+      name: params.sourceCrawlId || params.sourceUrl,
+    })) {
       await dedupeIndex.queueImportSource(entry.name, JSON.stringify(entry));
     }
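
The indexer now hands iterWACZ a structured entry instead of a bare URL, so metadata from resource lists can travel with each source. A rough sketch of the DedupeIndexEntry shape implied by the fields this diff reads and yields (the optional markers are assumptions; the real declaration lives elsewhere in the tree):

    // sketch only: inferred from usage in this diff, not the actual declaration
    interface DedupeIndexEntry {
      url: string; // .wacz file or .json resource list to import
      name?: string; // import-queue key; the call site defaults it to sourceCrawlId or the URL
      crawlId?: string; // originating crawl, when a resource list provides one
      size?: number; // WACZ size carried over from a resource list entry
      hash?: string; // WACZ hash carried over from a resource list entry
    }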
@@ -160,8 +163,7 @@ export class CrawlIndexer {
       }
 
       if (url && date && hash) {
-        await dedupeIndex.addHashDupe(hash, url, date, crawlId);
-        await dedupeIndex.addImportedForCrawl(hash, crawlId);
+        await dedupeIndex.addHashDupe(hash, url, date, crawlId, true);
       } else {
         logger.warn("Skipping invalid CDXJ, data missing", {
           url,
@@ -177,8 +179,10 @@ export class CrawlIndexer {
     logger.debug("Processed", { count });
   }
 
-  async *iterWACZ(url: string, name?: string): AsyncIterable<DedupeIndexEntry> {
-    let path: string = url;
+  async *iterWACZ(entry: DedupeIndexEntry): AsyncIterable<DedupeIndexEntry> {
+    const { name } = entry;
+    let { url } = entry;
+    let path = url;
 
     try {
       path = new URL(url).pathname;
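
As before, the pathname extraction stays wrapped in try/catch because new URL() only accepts absolute URLs; for a local file path it throws and path keeps the raw string for the extension checks below. A standalone illustration (not part of the diff):

    // absolute URL: the extension check runs against the pathname
    new URL("https://example.org/coll/archive.wacz").pathname; // "/coll/archive.wacz"

    // local path: new URL() throws, so path stays "collections/crawl1/archive.wacz"
    try {
      new URL("collections/crawl1/archive.wacz");
    } catch (e) {
      // fall through with the original string
    }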
@@ -187,7 +191,8 @@ export class CrawlIndexer {
     }
 
     if (path.endsWith(".wacz")) {
-      yield { name: basename(name || url), url };
+      console.log({ ...entry, name: basename(name || url) });
+      yield { ...entry, name: basename(name || url) };
     } else if (path.endsWith(".json")) {
       if (!url.startsWith("http://") && !url.startsWith("https://")) {
         const blob = await openAsBlob(url);
@@ -198,13 +203,8 @@ export class CrawlIndexer {
       const json = await resp.json();
 
       for (const entry of json.resources) {
-        const url = entry.path;
-        if (url && url.endsWith(".wacz")) {
-          const { size, hash, crawlId, name } = entry;
-          yield { crawlId, name, url, size, hash };
-        } else {
-          yield* this.iterWACZ(entry.path, entry.name);
-        }
+        entry.url = entry.path;
+        yield* this.iterWACZ(entry);
       }
     } else {
       logger.warn("Unknown source", { url }, "replay");
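
With the special-casing removed, every resource in a JSON list is normalized to carry a url field and re-dispatched through iterWACZ, so direct .wacz leaves and nested .json lists follow one code path, and size/hash survive via the { ...entry } spread above. A hedged sketch with a hypothetical resource list:

    // hypothetical manifest; field names mirror what the loop reads
    type Resource = { path: string; name: string; size?: number; hash?: string; url?: string };
    const json: { resources: Resource[] } = {
      resources: [
        { path: "https://example.org/crawl1/archive.wacz", name: "crawl1", size: 1024, hash: "sha256:abc" },
        { path: "https://example.org/more-crawls.json", name: "more-crawls" },
      ],
    };

    for (const entry of json.resources) {
      entry.url = entry.path; // normalize: iterWACZ reads entry.url
      // yield* this.iterWACZ(entry) yields the .wacz leaf with its
      // size/hash intact, or recurses into the nested .json list
    }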


@@ -299,11 +299,20 @@ export class RedisDedupeIndex {
     return { origUrl: val[2], origDate: val[1], index: val[0], crawlId };
   }
 
-  async addHashDupe(hash: string, url: string, date: string, crawlId?: string) {
+  async addHashDupe(
+    hash: string,
+    url: string,
+    date: string,
+    crawlId?: string,
+    commit = false,
+  ) {
     date = date.replace(/[^\d]/g, "");
     hash = hash.split(":").at(-1)!;
     const val = `${this.dedupeKeyIndex} ${date} ${url}`;
-    await this.dedupeRedis.hsetnx(`h:${crawlId || this.crawlId}`, hash, val);
+    crawlId = crawlId || this.crawlId;
+    if ((await this.dedupeRedis.hsetnx(`h:${crawlId}`, hash, val)) && commit) {
+      await this.dedupeRedis.hsetnx(DUPE_ALL_HASH_KEY, hash, crawlId);
+    }
   }
 
   // IMPORT
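
This hunk is the optimization named in the commit title: hsetnx resolves to 1 only when the field was newly created, so the global hash-to-crawlId write now happens only when this crawl actually added the hash, replacing the unconditional addImportedForCrawl call removed below. Using hsetnx rather than hset for the global key also means the first crawl to record a hash stays its canonical owner. A minimal standalone sketch of the pattern, assuming ioredis and a placeholder key name:

    import Redis from "ioredis";

    const DUPE_ALL_HASH_KEY = "alldupes"; // placeholder; the real constant is defined elsewhere

    // hsetnx returns 1 if the field was created, 0 if it already existed
    async function commitIfAdded(redis: Redis, crawlId: string, hash: string, val: string) {
      const added = await redis.hsetnx(`h:${crawlId}`, hash, val);
      if (added) {
        // commit only if added: record which crawl first saw this hash
        await redis.hsetnx(DUPE_ALL_HASH_KEY, hash, crawlId);
      }
    }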
@@ -316,10 +325,6 @@ export class RedisDedupeIndex {
     await this.dedupeRedis.lpush(this.sourceQ, data);
   }
 
-  async addImportedForCrawl(hash: string, crawlId: string) {
-    await this.dedupeRedis.hset(DUPE_ALL_HASH_KEY, hash, crawlId);
-  }
-
   async addImportedSourceForDedupe(key: string, entry: DedupeSourceEntry) {
     return (
       (await this.dedupeRedis.rpush(`c:${key}:wacz`, JSON.stringify(entry))) - 1