From 7c37672ae931a0e609ab4654d45f31ccee10c3f9 Mon Sep 17 00:00:00 2001
From: Ilya Kreymer
Date: Sat, 25 Oct 2025 15:41:31 -0700
Subject: [PATCH] add removing option to also remove unused crawls if doing a
 full sync, disabled by default

---
 src/indexer.ts    | 32 +++++++++++++++++++++++-----
 src/util/state.ts | 54 ++++++++++++++++++++++++++++++++++++++++++-----
 2 files changed, 76 insertions(+), 10 deletions(-)

diff --git a/src/indexer.ts b/src/indexer.ts
index f08e59ca..ac31f6fb 100644
--- a/src/indexer.ts
+++ b/src/indexer.ts
@@ -9,6 +9,7 @@ import { initRedisWaitForSuccess } from "./util/redis.js";
 import { AsyncIterReader } from "warcio";
 import { RedisDedupeIndex } from "./util/state.js";
 import { basename } from "node:path";
+import { sleep } from "./util/timing.js";
 
 export type DedupeIndexEntry = {
   name: string;
@@ -42,6 +43,13 @@ export class CrawlIndexer {
           type: "string",
           required: false,
         },
+
+        removing: {
+          describe: "If set, also remove unused crawls/hashes from index",
+          type: "boolean",
+          required: false,
+          default: false,
+        },
       })
       .parseSync();
   }
@@ -62,16 +70,24 @@
 
     for await (const entry of this.iterWACZ({
       url: params.sourceUrl,
-      name: params.sourceCrawlId || params.sourceUrl,
+      name: basename(params.sourceUrl),
+      crawlId: params.sourceCrawlId,
     })) {
       await dedupeIndex.queueImportSource(entry.name, JSON.stringify(entry));
+      if (params.removing && entry.crawlId) {
+        await dedupeIndex.markNotRemoved(entry.crawlId);
+      }
     }
 
     let count = 0;
+    let total = 0;
     let res;
 
     while ((res = await dedupeIndex.nextQueuedImportSource())) {
-      const { name, entry, total } = res;
+      const { name, entry, remaining } = res;
+      if (!total) {
+        total = remaining;
+      }
       const { url, crawlId, size, hash } = JSON.parse(
         entry,
       ) as DedupeIndexEntry;
@@ -107,7 +123,15 @@
       await dedupeIndex.markImportSourceDone(name, crawlIdReal);
     }
 
+    if (params.removing) {
+      const removeset = await dedupeIndex.getRemoveSet();
+      if (removeset.size > 0) {
+        await dedupeIndex.removeCrawlIds(removeset);
+      }
+    }
+
     logger.info("Done!");
+    await sleep(30);
     await dedupeIndex.markImportFinishedTS();
     process.exit(ExitCodes.Success);
   }
@@ -180,7 +204,6 @@
   }
 
   async *iterWACZ(entry: DedupeIndexEntry): AsyncIterable<DedupeIndexEntry> {
-    const { name } = entry;
     let { url } = entry;
 
     let path = url;
@@ -191,8 +214,7 @@
     }
 
     if (path.endsWith(".wacz")) {
-      console.log({ ...entry, name: basename(name || url) });
-      yield { ...entry, name: basename(name || url) };
+      yield entry;
     } else if (path.endsWith(".json")) {
       if (!url.startsWith("http://") && !url.startsWith("https://")) {
         const blob = await openAsBlob(url);
diff --git a/src/util/state.ts b/src/util/state.ts
index 2a91ae48..6a8de1ba 100644
--- a/src/util/state.ts
+++ b/src/util/state.ts
@@ -263,12 +263,12 @@ export class RedisDedupeIndex {
     for await (const hashes of this.dedupeRedis.hscanStream(
       `h:${this.crawlId}`,
     )) {
-      let value = false;
+      let isValue = false;
       for (const hash of hashes) {
-        if (!value) {
+        if (!isValue) {
           await this.dedupeRedis.hsetnx(DUPE_ALL_HASH_KEY, hash, this.crawlId);
         }
-        value = !value;
+        isValue = !isValue;
       }
     }
 
@@ -371,14 +371,58 @@
 
     await this.dedupeRedis.lrem(this.pendingQ, 1, res);
     const { name } = JSON.parse(res);
-    const total = (await this.dedupeRedis.llen(this.sourceQ)) + 1;
+    const remaining = (await this.dedupeRedis.llen(this.sourceQ)) + 1;
     await this.dedupeRedis.setex(this.pendingPrefix + name, "1", 300);
-    return { name, entry: res, total };
+    return { name, entry: res, remaining };
   }
 
   async markImportFinishedTS() {
     await this.dedupeRedis.set("last_update_ts", new Date().toISOString());
   }
+
+  // REMOVE ON IMPORT
+
+  async markNotRemoved(crawlId: string) {
+    await this.dedupeRedis.sadd("noremove", crawlId);
+  }
+
+  async getRemoveSet() {
+    const removeSet = await this.dedupeRedis.sdiff(DUPE_ALL_CRAWLS, "noremove");
+    await this.dedupeRedis.del("noremove");
+    return new Set(removeSet);
+  }
+
+  async removeCrawlIds(toRemove: Set<string>) {
+    for await (const hashes of this.dedupeRedis.hscanStream(
+      DUPE_ALL_HASH_KEY,
+    )) {
+      let isValue = false;
+      let key = "";
+      for (const hash of hashes) {
+        if (!isValue) {
+          key = hash;
+        }
+        if (key && isValue && toRemove.has(hash)) {
+          await this.dedupeRedis.hdel(DUPE_ALL_HASH_KEY, key);
+        }
+        isValue = !isValue;
+      }
+    }
+
+    for (const crawlId of toRemove) {
+      const allWACZ = await this.dedupeRedis.lrange(`c:${crawlId}:wacz`, 0, -1);
+      for (const waczdata of allWACZ) {
+        try {
+          const { filename } = JSON.parse(waczdata);
+          await this.dedupeRedis.srem(this.sourceDone, filename);
+        } catch (e) {
+          // ignore
+        }
+      }
+      await this.dedupeRedis.del(`h:${crawlId}`, `c:${crawlId}:wacz`);
+      await this.dedupeRedis.srem(DUPE_ALL_CRAWLS, crawlId);
+    }
+  }
 }
 
 // ============================================================================
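
Reviewer note: two pieces of the removal pass are easy to misread in diff form. getRemoveSet() computes "crawls known to the index but not seen during this import" as a Redis set difference: every crawl id encountered while queueing sources is added to a temporary "noremove" set, and SDIFF against the set of all crawls yields the ids to drop. Separately, hscanStream emits flat arrays of alternating field/value pairs, which is why both scan loops toggle an isValue flag to tell hash fields (content hashes) apart from their values (owning crawl ids). Below is a minimal standalone sketch of those two steps only, assuming the client is ioredis (implied by hscanStream) and using illustrative key names in place of DUPE_ALL_CRAWLS / DUPE_ALL_HASH_KEY; it is not the patch's implementation.

    import Redis from "ioredis";

    // Illustrative stand-ins for the patch's DUPE_ALL_CRAWLS / DUPE_ALL_HASH_KEY / "noremove" keys.
    const ALL_CRAWLS_KEY = "crawls";
    const ALL_HASH_KEY = "alldupes";
    const NO_REMOVE_KEY = "noremove";

    // Crawl ids present in the index but not marked during this sync.
    async function computeRemoveSet(redis: Redis): Promise<Set<string>> {
      const stale = await redis.sdiff(ALL_CRAWLS_KEY, NO_REMOVE_KEY);
      await redis.del(NO_REMOVE_KEY);
      return new Set(stale);
    }

    // hscanStream chunks are [field, value, field, value, ...], so step by two
    // and delete any hash entry whose value (owning crawl id) is being removed.
    async function dropHashesOwnedBy(redis: Redis, toRemove: Set<string>) {
      for await (const chunk of redis.hscanStream(ALL_HASH_KEY)) {
        const flat = chunk as string[];
        for (let i = 0; i + 1 < flat.length; i += 2) {
          if (toRemove.has(flat[i + 1])) {
            await redis.hdel(ALL_HASH_KEY, flat[i]);
          }
        }
      }
    }

This mirrors the first half of removeCrawlIds(); the per-crawl cleanup of the c:{crawlId}:wacz lists and the source-done set is omitted here.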