Mirror of https://github.com/webrecorder/browsertrix-crawler.git (synced 2025-12-08 06:09:48 +00:00)
- track source index for each hash, so entry becomes '<source index> <date> <url>'
- entry for source index can contain the crawl id (or possibly wacz and crawl id)
- also store dependent sources in relation.requires in datapackage.json
- tests: update tests to check for relation.requires
parent 8d53399455
commit 298b901558

7 changed files with 113 additions and 16 deletions
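For orientation before the diff: a dedup hash entry in the `dupe` Redis hash now stores a space-separated `<source index> <date> <url>` string, and the index points into the `sources` list of crawl ids. A minimal sketch of decoding one entry, assuming an ioredis client; the helper name is hypothetical and only mirrors getHashDupe() in the diff below:

import { Redis } from "ioredis";

// Illustrative only: decode a "dupe" entry of the form "<source index> <date> <url>"
// and resolve the source index against the "sources" list of crawl ids.
async function lookupDupe(redis: Redis, sha256: string) {
  const value = await redis.hget("dupe", sha256);
  if (!value) {
    return null;
  }
  const [inx, date, url] = value.split(" ");
  const crawlId = await redis.lindex("sources", Number(inx));
  return { crawlId, date, url };
}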

@@ -1980,6 +1980,8 @@ self.__bx_behaviors.selectMainBehavior();
 
     await this.closeLog();
 
+    const requires = await this.crawlState.getDupeDependentSources();
+
     const waczOpts: WACZInitOpts = {
       input: warcFileList.map((x) => path.join(this.archivesDir, x)),
       output: waczPath,
@@ -1988,6 +1990,7 @@ self.__bx_behaviors.selectMainBehavior();
       warcCdxDir: this.warcCdxDir,
       indexesDir: this.indexesDir,
       softwareString: this.infoString,
+      requires,
     };
 
     if (process.env.WACZ_SIGN_URL) {

@@ -28,6 +28,12 @@ export class CrawlIndexer {
        type: "string",
        required: true,
      },
+
+      sourceId: {
+        describe: "If single WACZ, use this id as source id",
+        type: "string",
+        required: false,
+      },
      })
      .parseSync();
  }
@@ -44,7 +50,7 @@ export class CrawlIndexer {
    const params = this.initArgs();
 
    const redis = await initRedisWaitForSuccess(params.redisDedupUrl);
-    const dedupIndex = new RedisDedupIndex(redis);
+    const dedupIndex = new RedisDedupIndex(redis, "");
 
    for await (const [name, waczfile] of this.iterWACZ(params.sourceUrl)) {
      await dedupIndex.addHashSource(name, waczfile);
@@ -58,6 +64,11 @@ export class CrawlIndexer {
      count += 1;
      const loader = new WACZLoader(url);
      logger.debug(`Processing WACZ ${count} of ${total}`, { waczfile: url });
+
+      const sourceId = params.sourceId && total === 1 ? params.sourceId : url;
+
+      dedupIndex.dedupKeyIndex = await dedupIndex.addToSourcesList(sourceId);
+
      for await (const file of loader.iterFiles("indexes/")) {
        const filename = file.filename;
        if (filename.endsWith(".cdx.gz")) {

@@ -23,6 +23,7 @@ export const DETECT_SITEMAP = "<detect>";
 export const EXTRACT_TEXT_TYPES = ["to-pages", "to-warc", "final-to-warc"];
 
 export const HASH_DUPE_KEY = "dupe";
+export const HASH_DUPE_SOURCE_LIST_KEY = "sources";
 
 export enum BxFunctionBindings {
   BehaviorLogFunc = "__bx_log",

@@ -833,13 +833,16 @@ export class Recorder extends EventEmitter {
     ) {
       const hash =
         "sha256:" + createHash("sha256").update(reqresp.payload).digest("hex");
-      const { origUrl } = await this.crawlState.getHashDupe(hash);
+      const { origUrl, origId } = await this.crawlState.getHashDupe(hash);
       if (origUrl) {
         const errorReason = "BlockedByResponse";
         await cdp.send("Fetch.failRequest", {
           requestId,
           errorReason,
         });
+        if (origId) {
+          await this.crawlState.addDupeCrawlRef(origId);
+        }
         return true;
       }
     }
@@ -1683,9 +1686,10 @@ export class Recorder extends EventEmitter {
     const isEmpty = reqresp.readSize === 0;
 
     if (!isEmpty && url) {
-      const { origUrl, origDate } = await this.crawlState.getHashDupe(hash);
+      const { origUrl, origDate, origId } =
+        await this.crawlState.getHashDupe(hash);
 
-      if (hash && origUrl && origDate) {
+      if (hash && origUrl && origDate && origId) {
         const date = tsToDate(origDate).toISOString();
         // always write revisit here
         // duplicate URLs in same crawl filtered out separately
@@ -1696,6 +1700,7 @@ export class Recorder extends EventEmitter {
           origUrl,
           date,
         ));
+        await this.crawlState.addDupeCrawlRef(origId);
       } else {
         // no dupe, continue
       }

@@ -7,7 +7,8 @@ import {
   MAX_DEPTH,
   DEFAULT_MAX_RETRIES,
   ROBOTS_CACHE_LIMIT,
-  HASH_DUPE_KEY
+  HASH_DUPE_KEY,
+  HASH_DUPE_SOURCE_LIST_KEY,
 } from "./constants.js";
 import { ScopedSeed } from "./seeds.js";
 import { Frame } from "puppeteer-core";
@@ -192,6 +193,8 @@ export type SaveState = {
 // ============================================================================
 export class RedisDedupIndex {
   dedupRedis: Redis;
+  key: string;
+  dedupKeyIndex = -1;
 
   sourceDone = "src:d";
   sourceQ = "src:q";
@@ -199,22 +202,42 @@ export class RedisDedupIndex {
   sourceP = "src:p";
   pendingPrefix = "pending:q:";
 
-  constructor(dedupRedis: Redis) {
+  constructor(dedupRedis: Redis, key: string) {
     this.dedupRedis = dedupRedis;
+    this.key = key;
+  }
+
+  private async getKeyIndex() {
+    if (!this.key) {
+      return;
+    }
+    const res = await this.dedupRedis.lpos(HASH_DUPE_SOURCE_LIST_KEY, this.key);
+    if (res) {
+      this.dedupKeyIndex = res;
+    } else {
+      this.dedupKeyIndex = await this.addToSourcesList(this.key);
+    }
+    return this.dedupKeyIndex;
+  }
+
+  async addToSourcesList(crawlId: string) {
+    return (
+      (await this.dedupRedis.rpush(HASH_DUPE_SOURCE_LIST_KEY, crawlId)) - 1
+    );
   }
 
   async getHashDupe(
     hash: string,
     key = HASH_DUPE_KEY,
     //url: string,
-  ): Promise<{ origDate?: string; origUrl?: string }> {
+  ): Promise<{ origDate?: string; origUrl?: string; origId?: string }> {
     hash = hash.split(":").at(-1)!;
     const value = await this.dedupRedis.hget(key, hash);
     if (!value) {
       return {};
     }
-    const val = value.split("|");
-    return { origUrl: val[1], origDate: val[0] };
+    const val = value.split(" ");
+    return { origUrl: val[2], origDate: val[1], origId: val[0] };
   }
 
   async addHashDupe(
@@ -223,8 +246,12 @@ export class RedisDedupIndex {
     date: string,
     key = HASH_DUPE_KEY,
   ) {
-    const val = date.replace(/[^\d]/g, "") + "|" + url;
+    date = date.replace(/[^\d]/g, "");
     hash = hash.split(":").at(-1)!;
+    if (this.dedupKeyIndex < 0) {
+      await this.getKeyIndex();
+    }
+    const val = `${this.dedupKeyIndex} ${date} ${url}`;
     await this.dedupRedis.hsetnx(key, hash, val);
   }
 
@@ -275,7 +302,7 @@ export class RedisDedupIndex {
 
     await this.dedupRedis.lrem(this.pendingQ, 1, res);
     const { id, url } = JSON.parse(res);
-    const total = await this.dedupRedis.llen(this.sourceQ);
+    const total = (await this.dedupRedis.llen(this.sourceQ)) + 1;
     await this.dedupRedis.setex(this.pendingPrefix + id, "1", 300);
     return { id, url, total };
   }
@@ -291,7 +318,6 @@ export class RedisCrawlState extends RedisDedupIndex {
   maxRetries: number;
 
   uid: string;
-  key: string;
   maxPageTime: number;
 
   qkey: string;
@@ -320,11 +346,10 @@ export class RedisCrawlState extends RedisDedupIndex {
     maxRetries?: number,
     dedupRedis?: Redis,
   ) {
-    super(dedupRedis || redis);
+    super(dedupRedis || redis, key);
     this.redis = redis;
 
     this.uid = uid;
-    this.key = key;
     this.maxPageTime = maxPageTime;
     this.maxRetries = maxRetries ?? DEFAULT_MAX_RETRIES;
 
@@ -1248,4 +1273,23 @@ return inx;
     result.modified = this._timestamp();
     await this.redis.set(`${this.key}:profileUploaded`, JSON.stringify(result));
   }
+
+  async addDupeCrawlRef(id: string) {
+    await this.redis.sadd(`${this.key}:dindex`, id);
+  }
+
+  async getDupeDependentSources(): Promise<string[]> {
+    const dependIndexes = await this.redis.smembers(`${this.key}:dindex`);
+    const crawlIds = [];
+    for (const inx of dependIndexes) {
+      const crawlId = await this.dedupRedis.lindex(
+        HASH_DUPE_SOURCE_LIST_KEY,
+        Number(inx),
+      );
+      if (crawlId && crawlId !== this.key) {
+        crawlIds.push(crawlId);
+      }
+    }
+    return crawlIds;
+  }
 }
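Pulling the state changes together, a rough sketch of the dedup keys involved; the values are illustrative, not taken from a real crawl:

// sources           (list)  ["dedup-test-orig", "dedup-test-dupe"]
// dupe              (hash)  "<sha256>" -> "0 20250101000000 https://example.com/"
// <crawlid>:dindex  (set)   {"0"}  // source indexes this crawl deduped against
//
// getDupeDependentSources() maps each index in <crawlid>:dindex back to a crawl id
// with LINDEX sources <index>, skips the current crawl id, and hands the result to
// the WACZ writer as `requires`.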

@@ -45,6 +45,7 @@ export type WACZInitOpts = {
   signingToken?: string;
   title?: string;
   description?: string;
+  requires?: string[];
 };
 
 export type WACZResourceEntry = {
@@ -61,6 +62,7 @@ export type WACZDataPackage = {
   software: string;
   title?: string;
   description?: string;
+  relation?: { requires: string[] };
 };
 
 type WACZDigest = {
@@ -131,6 +133,11 @@ export class WACZ {
       this.datapackage.description = config.description;
     }
 
+    if (config.requires && config.requires.length) {
+      this.datapackage.relation = { requires: config.requires };
+    }
+    console.log("REQUIRES", config.requires);
+
     this.signingUrl = config.signingUrl || null;
     this.signingToken = config.signingToken || null;
   }
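Per the new test below, the datapackage.json of a crawl that deduplicated only against dedup-test-orig should then contain a relation block roughly like this (other datapackage fields elided):

{
  "relation": {
    "requires": ["dedup-test-orig"]
  }
}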

@@ -31,7 +31,7 @@ afterAll(async () => {
 function runCrawl(name, db="0") {
   fs.rmSync(`./test-crawls/collections/${name}`, { recursive: true, force: true });
 
-  const crawler = exec(`docker run --rm -v $PWD/test-crawls:/crawls --network=dedup webrecorder/browsertrix-crawler crawl --url https://old.webrecorder.net/ --limit 4 --exclude community --collection ${name} --redisDedupUrl redis://dedup-redis:6379/${db} --generateWACZ`);
+  const crawler = exec(`docker run --rm -v $PWD/test-crawls:/crawls --network=dedup -e CRAWL_ID=${name} webrecorder/browsertrix-crawler crawl --url https://old.webrecorder.net/ --limit 4 --exclude community --collection ${name} --redisDedupUrl redis://dedup-redis:6379/${db} --generateWACZ`);
 
   return new Promise((resolve) => {
     crawler.on("exit", (code) => {
@@ -54,6 +54,20 @@ function loadFirstWARC(name) {
   return parser;
 }
 
+function loadDataPackageRelated(name) {
+  execSync(
+    `unzip test-crawls/collections/${name}/${name}.wacz -d test-crawls/collections/${name}/wacz`,
+  );
+
+  const data = fs.readFileSync(
+    `test-crawls/collections/${name}/wacz/datapackage.json`,
+    "utf8",
+  );
+  const dataPackageJSON = JSON.parse(data);
+  return dataPackageJSON.relation;
+}
+
+
 test("check revisit records written on duplicate crawl", async () => {
 
   expect(await runCrawl("dedup-test-orig")).toBe(0);
@@ -99,7 +113,7 @@ test("check revisit records written on duplicate crawl", async () => {
 
 test("import index and crawl dupe", async () => {
 
-  execSync(`docker run --rm -v $PWD/test-crawls:/crawls --network=dedup webrecorder/browsertrix-crawler indexer --sourceUrl /crawls/collections/dedup-test-orig/dedup-test-orig.wacz --redisDedupUrl redis://dedup-redis:6379/1`);
+  execSync(`docker run --rm -v $PWD/test-crawls:/crawls --network=dedup webrecorder/browsertrix-crawler indexer --sourceUrl /crawls/collections/dedup-test-orig/dedup-test-orig.wacz --sourceId dedup-test-orig --redisDedupUrl redis://dedup-redis:6379/1`);
 
   const redis = new Redis("redis://127.0.0.1:37379/1", { lazyConnect: true, retryStrategy: () => null });
 
|
||||||
expect(revisit).toBe(numResponses);
|
expect(revisit).toBe(numResponses);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
test("test requires in wacz 1", () => {
|
||||||
|
|
||||||
|
const expected = {"requires": ["dedup-test-orig"]};
|
||||||
|
|
||||||
|
const res1 = loadDataPackageRelated("dedup-test-dupe");
|
||||||
|
const res2 = loadDataPackageRelated("dedup-test-dupe-2");
|
||||||
|
|
||||||
|
expect(res1).toEqual(expected);
|
||||||
|
expect(res1).toEqual(res2);
|
||||||
|
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||