Mirror of https://github.com/webrecorder/browsertrix-crawler.git, synced 2025-12-08 06:09:48 +00:00
update to new data model:

- hashes stored in separate crawl-specific entries, h:<crawlid>
- wacz files stored in crawl-specific list, c:<crawlid>:wacz
- hashes committed to 'alldupes' hashset when crawl is complete; crawls added to 'allcrawls' set
- store filename, crawlId in relation.requires list entries for each wacz
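A minimal sketch of the Redis layout described above, assuming the ioredis client this repo already uses; the function name and the crawl id, hash, date, and URL values are hypothetical placeholders, not the crawler's actual code:

import Redis from "ioredis";

// Illustrates the per-crawl keys and the shared dedup index only.
async function sketchDedupLayout(redis: Redis) {
  const crawlId = "example-crawl"; // hypothetical crawl id
  const hash = "abc123"; // sha256 payload digest (hex, "sha256:" prefix stripped)

  // per-crawl hash entries: h:<crawlid> maps hash -> "<waczIndex> <date> <url>"
  await redis.hsetnx(`h:${crawlId}`, hash, "0 20251208000000 https://example.com/");

  // per-crawl WACZ list: c:<crawlid>:wacz holds JSON entries ({ filename, size, hash })
  await redis.rpush(`c:${crawlId}:wacz`, JSON.stringify({ filename: "example.wacz" }));

  // when the crawl completes, its hashes are committed to the shared 'alldupes'
  // hashset (hash -> crawlId) and the crawl is added to the 'allcrawls' set
  await redis.hset("alldupes", hash, crawlId);
  await redis.sadd("allcrawls", crawlId);
}

A WACZ that deduplicates against other crawls then lists those sources under relation.requires in its datapackage.json as { crawlId, filename, size, hash } entries, which is what the updated tests below assert.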
Parent: c3dc62dae5
Commit: 61acafd234

9 changed files with 244 additions and 130 deletions
@@ -25,7 +25,7 @@
    "fetch-socks": "^1.3.0",
    "get-folder-size": "^4.0.0",
    "husky": "^8.0.3",
    "ioredis": "^5.3.2",
    "ioredis": "^5.8.2",
    "iso-639-1": "^3.1.5",
    "js-levenshtein": "^1.1.6",
    "js-yaml": "^4.1.0",
@@ -2041,9 +2041,13 @@ self.__bx_behaviors.selectMainBehavior();

      await this.storage.uploadCollWACZ(wacz, targetFilename, isFinished);

      await this.crawlState.updateDedupSource(wacz);

      await this.crawlState.clearWACZFilename();

      return true;
    } else {
      await this.crawlState.updateDedupSource(wacz);
    }

    return false;
@@ -1,5 +1,4 @@
#!/usr/bin/env node

import yargs from "yargs";
import { logger } from "./util/logger.js";
import { getInfoString } from "./util/file_reader.js";
@@ -9,6 +8,15 @@ import { ExitCodes } from "./util/constants.js";
import { initRedisWaitForSuccess } from "./util/redis.js";
import { AsyncIterReader } from "warcio";
import { RedisDedupIndex } from "./util/state.js";
import { basename } from "node:path";

export type DedupIndexEntry = {
  name: string;
  url: string;
  crawlId?: string;
  size?: number;
  hash?: string;
};

export class CrawlIndexer {
  constructor() {}
@@ -29,7 +37,7 @@ export class CrawlIndexer {
        required: true,
      },

      sourceId: {
      sourceCrawlId: {
        describe: "If single WACZ, use this id as source id",
        type: "string",
        required: false,
@@ -52,38 +60,50 @@ export class CrawlIndexer {
    const redis = await initRedisWaitForSuccess(params.redisDedupUrl);
    const dedupIndex = new RedisDedupIndex(redis, "");

    for await (const [name, waczfile] of this.iterWACZ(params.sourceUrl)) {
      await dedupIndex.addHashSource(name, waczfile);
    for await (const entry of this.iterWACZ(params.sourceUrl)) {
      await dedupIndex.queueImportSource(entry.name, JSON.stringify(entry));
    }

    let count = 0;
    let res;

    while ((res = await dedupIndex.nextQueuedHashSource())) {
      const { id, url, total } = res;
    while ((res = await dedupIndex.nextQueuedImportSource())) {
      const { name, entry, total } = res;
      const { url, crawlId, size, hash } = JSON.parse(entry) as DedupIndexEntry;
      count += 1;
      const loader = new WACZLoader(url);
      logger.debug(`Processing WACZ ${count} of ${total}`, { waczfile: url });

      const sourceId = params.sourceId && total === 1 ? params.sourceId : url;
      const crawlIdReal = crawlId || params.sourceCrawlId || url;

      dedupIndex.dedupKeyIndex = await dedupIndex.addToSourcesList(sourceId);
      await dedupIndex.addImportedSourceForDedup(crawlIdReal, {
        filename: name,
        size,
        hash,
      });

      for await (const file of loader.iterFiles("indexes/")) {
        const filename = file.filename;
        if (filename.endsWith(".cdx.gz")) {
          logger.debug("Processing CDX GZ Index", { filename });
          await this.ingestCDXJ(dedupIndex, loader, filename, "gzip");
          await this.ingestCDXJ(
            dedupIndex,
            loader,
            filename,
            crawlIdReal,
            "gzip",
          );
        } else if (filename.endsWith(".cdx") || filename.endsWith(".cdxj")) {
          logger.debug("Processing CDX Index", { filename });
          await this.ingestCDXJ(dedupIndex, loader, filename);
          await this.ingestCDXJ(dedupIndex, loader, filename, crawlIdReal);
        }
      }
      await dedupIndex.addDoneSource(id);

      await dedupIndex.markImportSourceDone(name, crawlIdReal);
    }

    logger.info("Done!");
    await dedupIndex.markDoneImport();
    await dedupIndex.markImportFinishedTS();
    process.exit(ExitCodes.Success);
  }
@@ -91,6 +111,7 @@ export class CrawlIndexer {
    dedupIndex: RedisDedupIndex,
    loader: WACZLoader,
    filename: string,
    crawlId: string,
    compression?: string,
  ) {
    let reader = await loader.loadFile(filename);
@@ -137,7 +158,8 @@ export class CrawlIndexer {
      }

      if (url && date && hash) {
        await dedupIndex.addHashDupe(hash, url, date);
        await dedupIndex.addHashDupe(hash, url, date, crawlId);
        await dedupIndex.addImportedForCrawl(hash, crawlId);
      } else {
        logger.warn("Skipping invalid CDXJ, data missing", {
          url,
@@ -153,7 +175,7 @@ export class CrawlIndexer {
    logger.debug("Processed", { count });
  }

  async *iterWACZ(url: string, name?: string): AsyncIterable<[string, string]> {
  async *iterWACZ(url: string, name?: string): AsyncIterable<DedupIndexEntry> {
    let path: string = url;

    try {
@@ -163,7 +185,7 @@ export class CrawlIndexer {
    }

    if (path.endsWith(".wacz")) {
      yield [name || url, url];
      yield { name: basename(name || url), url };
    } else if (path.endsWith(".json")) {
      if (!url.startsWith("http://") && !url.startsWith("https://")) {
        const blob = await openAsBlob(url);
@@ -174,7 +196,11 @@ export class CrawlIndexer {
      const json = await resp.json();

      for (const entry of json.resources) {
        if (entry.path) {
          const url = entry.path;
          if (url && url.endsWith(".wacz")) {
            const { size, hash, crawlId, name } = entry;
            yield { crawlId, name, url, size, hash };
          } else {
            yield* this.iterWACZ(entry.path, entry.name);
          }
        }
@@ -22,8 +22,8 @@ export const DETECT_SITEMAP = "<detect>";

export const EXTRACT_TEXT_TYPES = ["to-pages", "to-warc", "final-to-warc"];

export const HASH_DUPE_KEY = "dupe";
export const HASH_DUPE_SOURCE_LIST_KEY = "sources";
export const DUPE_ALL_HASH_KEY = "alldupes";
export const DUPE_ALL_CRAWLS = "allcrawls";

export enum BxFunctionBindings {
  BehaviorLogFunc = "__bx_log",
@@ -833,16 +833,15 @@ export class Recorder extends EventEmitter {
  ) {
    const hash =
      "sha256:" + createHash("sha256").update(reqresp.payload).digest("hex");
    const { origUrl, origId } = await this.crawlState.getHashDupe(hash);
    if (origUrl) {
    const res = await this.crawlState.getHashDupe(hash);
    if (res) {
      const { index, crawlId } = res;
      const errorReason = "BlockedByResponse";
      await cdp.send("Fetch.failRequest", {
        requestId,
        errorReason,
      });
      if (origId) {
        await this.crawlState.addDupeCrawlRef(origId);
      }
      await this.crawlState.addDupeCrawlRef(crawlId, index);
      return true;
    }
  }
@@ -1685,11 +1684,13 @@ export class Recorder extends EventEmitter {

    const isEmpty = reqresp.readSize === 0;

    if (!isEmpty && url) {
      const { origUrl, origDate, origId } =
        await this.crawlState.getHashDupe(hash);
    let isDupe = false;

    if (hash && origUrl && origDate && origId) {
    if (!isEmpty && url) {
      const res = await this.crawlState.getHashDupe(hash);

      if (res) {
        const { origUrl, origDate, crawlId, index } = res;
        const date = tsToDate(origDate).toISOString();
        // always write revisit here
        // duplicate URLs in same crawl filtered out separately
@@ -1700,7 +1701,8 @@ export class Recorder extends EventEmitter {
          origUrl,
          date,
        ));
        await this.crawlState.addDupeCrawlRef(origId);
        await this.crawlState.addDupeCrawlRef(crawlId, index);
        isDupe = true;
      } else {
        // no dupe, continue
      }
@@ -1734,7 +1736,7 @@ export class Recorder extends EventEmitter {

    this.addPageRecord(reqresp);

    if (!isEmpty) {
    if (!isEmpty && !isDupe) {
      await this.crawlState.addHashDupe(hash, url, date);
    }
@@ -7,12 +7,13 @@ import {
  MAX_DEPTH,
  DEFAULT_MAX_RETRIES,
  ROBOTS_CACHE_LIMIT,
  HASH_DUPE_KEY,
  HASH_DUPE_SOURCE_LIST_KEY,
  DUPE_ALL_HASH_KEY,
  DUPE_ALL_CRAWLS,
} from "./constants.js";
import { ScopedSeed } from "./seeds.js";
import { Frame } from "puppeteer-core";
import { interpolateFilename, UploadResult } from "./storage.js";
import { WACZ } from "./wacz.js";

// ============================================================================
export enum LoadState {
@@ -190,11 +191,27 @@ export type SaveState = {
  sitemapDone: boolean;
};

// ============================================================================
export type DedupeEntry = {
  origDate: string;
  origUrl: string;
  index: string;
  crawlId: string;
};

// ============================================================================
export type DedupSourceEntry = {
  filename: string;
  size?: number;
  hash?: string;
};

// ============================================================================
export class RedisDedupIndex {
  dedupRedis: Redis;
  key: string;
  crawlId: string;
  dedupKeyIndex = -1;
  dedupCurrFilename = "";

  sourceDone = "src:d";
  sourceQ = "src:q";
@@ -202,72 +219,118 @@ export class RedisDedupIndex {
  sourceP = "src:p";
  pendingPrefix = "pending:q:";

  constructor(dedupRedis: Redis, key: string) {
  constructor(dedupRedis: Redis, crawlId: string) {
    this.dedupRedis = dedupRedis;
    this.key = key;
    this.crawlId = crawlId;
  }

  private async getKeyIndex() {
    if (!this.key) {
  // DEDUP SOURCE

  async addSourceForDedup(filename: string) {
    //const count = await this.dedupRedis.incr(`c:${key}:count`) - 1;
    const count =
      (await this.dedupRedis.rpush(
        `c:${this.crawlId}:wacz`,
        JSON.stringify({ filename }),
      )) - 1;
    this.dedupCurrFilename = filename;
    this.dedupKeyIndex = count;
  }

  async updateDedupSource(wacz: WACZ) {
    if (this.dedupKeyIndex < 0) {
      return;
    }
    const res = await this.dedupRedis.lpos(HASH_DUPE_SOURCE_LIST_KEY, this.key);
    if (res) {
      this.dedupKeyIndex = res;
    } else {
      this.dedupKeyIndex = await this.addToSourcesList(this.key);
    }
    return this.dedupKeyIndex;

    const value: DedupSourceEntry = {
      filename: wacz.getLocalFilename() || this.dedupCurrFilename,
      hash: wacz.getHash(),
      size: wacz.getSize(),
    };

    await this.dedupRedis.lset(
      `c:${this.crawlId}:wacz`,
      this.dedupKeyIndex,
      JSON.stringify(value),
    );

    await this.commitDedupDone();
  }

  async addToSourcesList(crawlId: string) {
    return (
      (await this.dedupRedis.rpush(HASH_DUPE_SOURCE_LIST_KEY, crawlId)) - 1
    );
  // COMMIT DEDUP TO SHARED INDEX

  async commitDedupDone() {
    for await (const hashes of this.dedupRedis.hscanStream(
      `h:${this.crawlId}`,
      {
        noValues: true,
      },
    )) {
      for (const hash of hashes) {
        await this.dedupRedis.hset(DUPE_ALL_HASH_KEY, hash, this.crawlId);
      }
    }

    // add to crawls list
    await this.dedupRedis.sadd(DUPE_ALL_CRAWLS, this.crawlId);
  }

  // GET OR ADD INDIVIDUAL HASHES

  async getHashDupe(
    hash: string,
    key = HASH_DUPE_KEY,
    key = DUPE_ALL_HASH_KEY,
    //url: string,
  ): Promise<{ origDate?: string; origUrl?: string; origId?: string }> {
  ): Promise<DedupeEntry | null> {
    hash = hash.split(":").at(-1)!;
    const value = await this.dedupRedis.hget(key, hash);

    // first, check the shared key
    let crawlId = await this.dedupRedis.hget(key, hash);
    if (!crawlId) {
      // otherwise, try current crawl
      crawlId = this.crawlId;
    }
    const value = await this.dedupRedis.hget(`h:${crawlId}`, hash);
    if (!value) {
      return {};
      return null;
    }
    const val = value.split(" ");
    return { origUrl: val[2], origDate: val[1], origId: val[0] };
    return { origUrl: val[2], origDate: val[1], index: val[0], crawlId };
  }

  async addHashDupe(
    hash: string,
    url: string,
    date: string,
    key = HASH_DUPE_KEY,
  ) {
  async addHashDupe(hash: string, url: string, date: string, crawlId?: string) {
    date = date.replace(/[^\d]/g, "");
    hash = hash.split(":").at(-1)!;
    if (this.dedupKeyIndex < 0) {
      await this.getKeyIndex();
    }
    const val = `${this.dedupKeyIndex} ${date} ${url}`;
    await this.dedupRedis.hsetnx(key, hash, val);
    await this.dedupRedis.hsetnx(`h:${crawlId || this.crawlId}`, hash, val);
  }

  async addHashSource(id: string, url: string) {
  // IMPORT

  async queueImportSource(id: string, data: string) {
    // already handled this source
    if (await this.dedupRedis.sismember(this.sourceDone, id)) {
      return;
    }
    await this.dedupRedis.lpush(this.sourceQ, JSON.stringify({ id, url }));
    await this.dedupRedis.lpush(this.sourceQ, data);
  }

  async addDoneSource(id: string) {
  async addImportedForCrawl(hash: string, crawlId: string) {
    await this.dedupRedis.hset(DUPE_ALL_HASH_KEY, hash, crawlId);
  }

  async addImportedSourceForDedup(key: string, entry: DedupSourceEntry) {
    return (
      (await this.dedupRedis.rpush(`c:${key}:wacz`, JSON.stringify(entry))) - 1
    );
  }

  async markImportSourceDone(id: string, crawlId: string) {
    await this.dedupRedis.sadd(this.sourceDone, id);
    await this.dedupRedis.sadd(DUPE_ALL_CRAWLS, crawlId);
  }

  async nextQueuedHashSource() {
  async nextQueuedImportSource() {
    let res: string | null = await this.dedupRedis.lmove(
      this.sourceQ,
      this.pendingQ,
@@ -301,13 +364,13 @@ export class RedisDedupIndex {
    }

    await this.dedupRedis.lrem(this.pendingQ, 1, res);
    const { id, url } = JSON.parse(res);
    const { name } = JSON.parse(res);
    const total = (await this.dedupRedis.llen(this.sourceQ)) + 1;
    await this.dedupRedis.setex(this.pendingPrefix + id, "1", 300);
    return { id, url, total };
    await this.dedupRedis.setex(this.pendingPrefix + name, "1", 300);
    return { name, entry: res, total };
  }

  async markDoneImport() {
  async markImportFinishedTS() {
    await this.dedupRedis.set("last_update_ts", new Date().toISOString());
  }
}
@@ -353,28 +416,28 @@ export class RedisCrawlState extends RedisDedupIndex {
    this.maxPageTime = maxPageTime;
    this.maxRetries = maxRetries ?? DEFAULT_MAX_RETRIES;

    this.qkey = this.key + ":q";
    this.pkey = this.key + ":p";
    this.skey = this.key + ":s";
    this.qkey = this.crawlId + ":q";
    this.pkey = this.crawlId + ":p";
    this.skey = this.crawlId + ":s";
    // done (integer)
    this.dkey = this.key + ":d";
    this.dkey = this.crawlId + ":d";
    // failed final, no more retry
    this.fkey = this.key + ":f";
    this.fkey = this.crawlId + ":f";
    // crawler errors
    this.ekey = this.key + ":e";
    this.ekey = this.crawlId + ":e";
    // crawler behavior script messages
    this.bkey = this.key + ":b";
    this.bkey = this.crawlId + ":b";
    // cached robots.txt bodies (per-origin)
    this.rkey = this.key + ":r";
    this.rkey = this.crawlId + ":r";
    // LRU cache of robots.txt keys
    this.lkey = this.key + ":l";
    this.lkey = this.crawlId + ":l";
    // pages
    this.pageskey = this.key + ":pages";
    this.pageskey = this.crawlId + ":pages";

    this.esKey = this.key + ":extraSeeds";
    this.esMap = this.key + ":esMap";
    this.esKey = this.crawlId + ":extraSeeds";
    this.esMap = this.crawlId + ":esMap";

    this.sitemapDoneKey = this.key + ":sitemapDone";
    this.sitemapDoneKey = this.crawlId + ":sitemapDone";

    this._initLuaCommands(this.redis);
  }
@@ -616,29 +679,29 @@ return inx;
  }

  async setFailReason(reason: string) {
    await this.redis.set(`${this.key}:failReason`, reason);
    await this.redis.set(`${this.crawlId}:failReason`, reason);
  }

  async setStatus(status_: string) {
    await this.redis.hset(`${this.key}:status`, this.uid, status_);
    await this.redis.hset(`${this.crawlId}:status`, this.uid, status_);
  }

  async getStatus(): Promise<string> {
    return (await this.redis.hget(`${this.key}:status`, this.uid)) || "";
    return (await this.redis.hget(`${this.crawlId}:status`, this.uid)) || "";
  }

  async setWACZFilename(): Promise<string> {
    const filename = process.env.STORE_FILENAME || "@ts-@id.wacz";
    this.waczFilename = interpolateFilename(filename, this.key);
    this.waczFilename = interpolateFilename(filename, this.crawlId);
    if (
      !(await this.redis.hsetnx(
        `${this.key}:nextWacz`,
        `${this.crawlId}:nextWacz`,
        this.uid,
        this.waczFilename,
      ))
    ) {
      this.waczFilename = await this.redis.hget(
        `${this.key}:nextWacz`,
        `${this.crawlId}:nextWacz`,
        this.uid,
      );
      logger.debug(
@@ -653,6 +716,7 @@ return inx;
        "state",
      );
    }
    await this.addSourceForDedup(this.waczFilename!);
    return this.waczFilename!;
  }

@@ -664,20 +728,20 @@ return inx;
  }

  async clearWACZFilename(): Promise<void> {
    await this.redis.hdel(`${this.key}:nextWacz`, this.uid);
    await this.redis.hdel(`${this.crawlId}:nextWacz`, this.uid);
    this.waczFilename = null;
  }

  async setArchiveSize(size: number) {
    return await this.redis.hset(`${this.key}:size`, this.uid, size);
    return await this.redis.hset(`${this.crawlId}:size`, this.uid, size);
  }

  async isCrawlStopped() {
    if ((await this.redis.get(`${this.key}:stopping`)) === "1") {
    if ((await this.redis.get(`${this.crawlId}:stopping`)) === "1") {
      return true;
    }

    if ((await this.redis.hget(`${this.key}:stopone`, this.uid)) === "1") {
    if ((await this.redis.hget(`${this.crawlId}:stopone`, this.uid)) === "1") {
      return true;
    }

@@ -685,7 +749,7 @@ return inx;
  }

  async isCrawlPaused() {
    if ((await this.redis.get(`${this.key}:paused`)) === "1") {
    if ((await this.redis.get(`${this.crawlId}:paused`)) === "1") {
      return true;
    }

@@ -693,13 +757,13 @@ return inx;
  }

  async isCrawlCanceled() {
    return (await this.redis.get(`${this.key}:canceled`)) === "1";
    return (await this.redis.get(`${this.crawlId}:canceled`)) === "1";
  }

  // note: not currently called in crawler, but could be
  // crawl may be stopped by setting this elsewhere in shared redis
  async stopCrawl() {
    await this.redis.set(`${this.key}:stopping`, "1");
    await this.redis.set(`${this.crawlId}:stopping`, "1");
  }

  async processMessage(seeds: ScopedSeed[]) {
@@ -789,7 +853,7 @@ return inx;
  }

  async incFailCount() {
    const key = `${this.key}:status:failcount:${this.uid}`;
    const key = `${this.crawlId}:status:failcount:${this.uid}`;
    const res = await this.redis.incr(key);

    // consider failed if 3 failed retries in 60 secs
@@ -1145,11 +1209,11 @@ return inx;
  }

  async isInUserSet(value: string) {
    return (await this.redis.sismember(this.key + ":user", value)) === 1;
    return (await this.redis.sismember(this.crawlId + ":user", value)) === 1;
  }

  async addToUserSet(value: string) {
    return (await this.redis.sadd(this.key + ":user", value)) === 1;
    return (await this.redis.sadd(this.crawlId + ":user", value)) === 1;
  }

  async logError(error: string) {
@@ -1274,20 +1338,23 @@ return inx;
    await this.redis.set(`${this.key}:profileUploaded`, JSON.stringify(result));
  }

  async addDupeCrawlRef(id: string) {
    await this.redis.sadd(`${this.key}:dindex`, id);
  async addDupeCrawlRef(crawlId: string, index: string) {
    await this.redis.sadd(`${this.crawlId}:dindex`, crawlId + " " + index);
  }

  async getDupeDependentSources(): Promise<string[]> {
    const dependIndexes = await this.redis.smembers(`${this.key}:dindex`);
  async getDupeDependentSources() {
    const dependIndexes = await this.redis.smembers(`${this.crawlId}:dindex`);
    const crawlIds = [];
    for (const inx of dependIndexes) {
      const crawlId = await this.dedupRedis.lindex(
        HASH_DUPE_SOURCE_LIST_KEY,
        Number(inx),
    for (const value of dependIndexes) {
      const [crawlId, index] = value.split(" ");
      const source = await this.dedupRedis.lindex(
        `c:${crawlId}:wacz`,
        Number(index),
      );
      if (crawlId && crawlId !== this.key) {
        crawlIds.push(crawlId);
      if (crawlId && crawlId !== this.crawlId && source) {
        const entry = JSON.parse(source);
        entry.crawlId = crawlId;
        crawlIds.push(entry);
      }
    }
    return crawlIds;
@@ -109,6 +109,7 @@ export class WACZ {

  private size = 0;
  private hash: string = "";
  private localFilename = "";

  constructor(config: WACZInitOpts, collDir: string) {
    this.warcs = config.input;
@@ -136,7 +137,6 @@ export class WACZ {
    if (config.requires && config.requires.length) {
      this.datapackage.relation = { requires: config.requires };
    }
    console.log("REQUIRES", config.requires);

    this.signingUrl = config.signingUrl || null;
    this.signingToken = config.signingToken || null;
@@ -201,7 +201,12 @@ export class WACZ {
    return this.size;
  }

  getLocalFilename() {
    return this.localFilename;
  }

  async generateToFile(filename: string) {
    this.localFilename = path.basename(filename);
    await pipeline(this.generate(), fs.createWriteStream(filename));
  }

@@ -113,13 +113,13 @@ test("check revisit records written on duplicate crawl", async () => {

test("import index and crawl dupe", async () => {

  execSync(`docker run --rm -v $PWD/test-crawls:/crawls --network=dedup webrecorder/browsertrix-crawler indexer --sourceUrl /crawls/collections/dedup-test-orig/dedup-test-orig.wacz --sourceId dedup-test-orig --redisDedupUrl redis://dedup-redis:6379/1`);
  execSync(`docker run --rm -v $PWD/test-crawls:/crawls --network=dedup webrecorder/browsertrix-crawler indexer --sourceUrl /crawls/collections/dedup-test-orig/dedup-test-orig.wacz --sourceCrawlId dedup-test-orig --redisDedupUrl redis://dedup-redis:6379/1`);

  const redis = new Redis("redis://127.0.0.1:37379/1", { lazyConnect: true, retryStrategy: () => null });

  await redis.connect({maxRetriesPerRequest: 50});

  expect(await redis.hlen("dupe")).toBe(numResponses);
  expect(await redis.hlen("alldupes")).toBe(numResponses);
});

@@ -144,16 +144,26 @@ test("imported crawl dupe matches previous dupe count", async () => {
  expect(revisit).toBe(numResponses);
});

test("test requires in wacz 1", () => {

  const expected = {"requires": ["dedup-test-orig"]};

test("test requires in datapackage.json of wacz deduped against previous crawl", () => {
  const res1 = loadDataPackageRelated("dedup-test-dupe");

  expect(res1.requires.length).toBe(1);
  const entry = res1.requires[0];
  expect(entry.crawlId).toBe("dedup-test-orig");
  expect(entry.filename).toBe("dedup-test-orig.wacz");
  expect(entry.size).toBeDefined();
  expect(entry.hash).toBeDefined();
});

test("test requires in datapackage.json of wacz deduped against import from wacz", () => {
  const res2 = loadDataPackageRelated("dedup-test-dupe-2");

  expect(res1).toEqual(expected);
  expect(res1).toEqual(res2);

  expect(res2.requires.length).toBe(1);
  const entry2 = res2.requires[0];
  expect(entry2.crawlId).toBe("dedup-test-orig");
  expect(entry2.filename).toBe("dedup-test-orig.wacz");
  // undefined as importing from single WACZ and not computing
  expect(entry2.size).toBeUndefined();
  expect(entry2.hash).toBeUndefined();
});

yarn.lock
@@ -370,10 +370,10 @@
  resolved "https://registry.yarnpkg.com/@humanwhocodes/object-schema/-/object-schema-2.0.3.tgz#4a2868d75d6d6963e423bcf90b7fd1be343409d3"
  integrity sha512-93zYdMES/c1D69yZiKDBj0V24vqNzB/koF26KPaagAfd3P/4gUlh3Dys5ogAK+Exi9QyzlD8x/08Zt7wIKcDcA==

"@ioredis/commands@^1.1.1":
  version "1.2.0"
  resolved "https://registry.yarnpkg.com/@ioredis/commands/-/commands-1.2.0.tgz#6d61b3097470af1fdbbe622795b8921d42018e11"
  integrity sha512-Sx1pU8EM64o2BrqNpEO1CNLtKQwyhuXuqyfH7oGKCk+1a33d2r5saW8zNwm3j6BTExtjrv2BxTgzzkMwts6vGg==
"@ioredis/commands@1.4.0":
  version "1.4.0"
  resolved "https://registry.yarnpkg.com/@ioredis/commands/-/commands-1.4.0.tgz#9f657d51cdd5d2fdb8889592aa4a355546151f25"
  integrity sha512-aFT2yemJJo+TZCmieA7qnYGQooOS7QfNmYrzGtsYd3g9j5iDP8AimYYAesf79ohjbLG12XxC4nG5DyEnC88AsQ==

"@istanbuljs/load-nyc-config@^1.0.0":
  version "1.1.0"
@@ -3014,12 +3014,12 @@ intl-messageformat@^10.5.3:
    "@formatjs/icu-messageformat-parser" "2.11.2"
    tslib "^2.8.0"

ioredis@^5.3.2:
  version "5.4.1"
  resolved "https://registry.yarnpkg.com/ioredis/-/ioredis-5.4.1.tgz#1c56b70b759f01465913887375ed809134296f40"
  integrity sha512-2YZsvl7jopIa1gaePkeMtd9rAcSjOOjPtpcLlOeusyO+XH2SK5ZcT+UCrElPP+WVIInh2TzeI4XW9ENaSLVVHA==
ioredis@^5.8.2:
  version "5.8.2"
  resolved "https://registry.yarnpkg.com/ioredis/-/ioredis-5.8.2.tgz#c7a228a26cf36f17a5a8011148836877780e2e14"
  integrity sha512-C6uC+kleiIMmjViJINWk80sOQw5lEzse1ZmvD+S/s8p8CWapftSaC+kocGTx6xrbrJ4WmYQGC08ffHLr6ToR6Q==
  dependencies:
    "@ioredis/commands" "^1.1.1"
    "@ioredis/commands" "1.4.0"
    cluster-key-slot "^1.1.0"
    debug "^4.3.4"
    denque "^2.1.0"