browsertrix-crawler/util/state.js
Tessa Walsh 1bee46b321
Remove puppeteer-cluster + iframe filtering + health check refactor + logging improvements (0.9.0-beta.0) (#219)
* This commit removes puppeteer-cluster as a dependency in favor of
a simpler concurrency implementation, using p-queue to limit
concurrency to the number of available workers (see the sketch after
this commit message). As part of the refactor, the custom window
concurrency model in windowconcur.js is removed and its logic is now
implemented in the new Worker class's initPage method.

* Remove concurrency models, always use new tab

* logging improvements: include worker-id in logs, use 'worker' context
- logging: log info string / version as first line
- logging: improve logging of error stack traces
- interruption: support interrupting crawl directly with 'interrupt' check which stops the job queue
- interruption: don't repair if interrupting, wait for queue to be idle
- log text extraction
- init order: ensure wb-manager init called first, then logs created
- logging: adjust info->debug logging
- Log no jobs available as debug

* tests: bail on first failure

* iframe filtering:
- fix filtering for about:blank iframes, support non-async shouldProcessFrame()
- filter iframes both for behaviors and for link extraction
- add 5-second timeout to link extraction, to avoid link extraction holding up crawl!
- cache filtered frames

* healthcheck/worker reuse:
- refactor healthchecker into separate class
- increment healthchecker (if provided) if new page load fails
- remove experimental repair functionality for now
- add healthcheck

* deps: bump puppeteer-core to 17.1.2
- bump to 0.9.0-beta.0

--------
Co-authored-by: Ilya Kreymer <ikreymer@gmail.com>
2023-03-08 18:31:19 -08:00
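
A minimal sketch of the p-queue pattern described above (hypothetical names;
the actual wiring lives in the crawler's worker code, not in state.js):

    import PQueue from "p-queue";

    // limit page concurrency to the number of available browser workers
    const queue = new PQueue({ concurrency: numWorkers });

    for (const url of urlsToCrawl) {
      queue.add(() => crawlPage(url));
    }

    await queue.onIdle();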


import { Job } from "./job.js";
import { Logger } from "./logger.js";
const logger = new Logger();
// ============================================================================
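// Base class for crawl state implementations, which track seen, queued,
// pending, and done URLs. Provides "drain" mode: once setDrain() is called,
// size() reports 0 so no new URLs are pulled, and numSeen() is frozen at the
// number of pages that were pending or done at that point.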
export class BaseState
{
  constructor() {
    this.drainMax = 0;
    this.localOnly = false;
  }

  async setDrain(localOnly = false) {
    this.drainMax = (await this.numRealPending()) + (await this.numDone());
    this.localOnly = localOnly;
  }

  async size() {
    return this.drainMax ? 0 : await this.realSize();
  }

  async isFinished() {
    return (await this.realSize() == 0) && (await this.numDone() > 0);
  }

  async numSeen() {
    return this.drainMax ? this.drainMax : await this.numRealSeen();
  }

  recheckScope(data, seeds) {
    const seed = seeds[data.seedId];

    return seed.isIncluded(data.url, data.depth, data.extraHops);
  }

  numPending(localPending = 0) {
    if (this.localOnly) {
      return localPending;
    }

    return this.numRealPending();
  }
}
// ============================================================================
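// In-memory crawl state for a single crawler process. Queue, pending, and
// seen-list data all live in this process, so nothing survives a crawler
// restart.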
export class MemoryCrawlState extends BaseState
{
  constructor() {
    super();
    this.seenList = new Set();
    this.queue = [];
    this.pending = new Map();
    this.done = [];
    this.status = null;
  }

  setStatus(status) {
    this.status = status;
  }

  getStatus() {
    return this.status;
  }

  incFailCount() {
    // memory-only state, no retries
    return true;
  }

  push(urlData) {
    this.pending.delete(urlData.url);
    this.queue.unshift(urlData);
  }

  realSize() {
    return this.queue.length;
  }

  shift() {
    const data = this.queue.pop();

    if (!data) {
      return;
    }

    const url = data.url;
    const state = this;

    state.pending.set(url, data);

    const callbacks = {
      start() {
        data.started = new Date().toISOString();
        state.pending.set(url, data);
      },

      resolve() {
        state.pending.delete(url);
        data.finished = new Date().toISOString();
        state.done.unshift(data);
      },

      reject(e) {
        logger.warn(`Page Load Failed: ${url}`, e.message);
        state.pending.delete(url);
        data.failed = true;
        state.done.unshift(data);
      }
    };

    return new Job(data, callbacks);
  }

  has(url) {
    return this.seenList.has(url);
  }

  add(url) {
    return this.seenList.add(url);
  }

  async serialize() {
    const queued = this.queue.map(x => JSON.stringify(x));
    const done = this.done.map(x => JSON.stringify(x));
    const pending = (await this.getPendingList()).map(x => JSON.stringify(x));

    return {queued, pending, done};
  }

  async load(state, seeds, checkScope=false) {
    for (const json of state.queued) {
      const data = JSON.parse(json);
      if (checkScope && !this.recheckScope(data, seeds)) {
        continue;
      }
      this.queue.push(data);
      this.seenList.add(data.url);
    }

    for (const json of state.pending) {
      const data = JSON.parse(json);
      if (checkScope && !this.recheckScope(data, seeds)) {
        continue;
      }
      this.queue.push(data);
      this.seenList.add(data.url);
    }

    for (const json of state.done) {
      const data = JSON.parse(json);
      if (data.failed) {
        this.queue.push(data);
      } else {
        this.done.push(data);
      }
      this.seenList.add(data.url);
    }

    return this.seenList.size;
  }

  async numDone() {
    return this.done.length;
  }

  async numRealSeen() {
    return this.seenList.size;
  }

  async numRealPending() {
    return this.pending.size;
  }

  async getPendingList() {
    return Array.from(this.pending.values());
  }
}
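
// Rough usage sketch (hypothetical and simplified; the real crawler drives the
// state through its Worker class and the Job wrapper from ./job.js). Seed the
// state, then shift() jobs until the queue is empty; each Job carries the
// queued data plus the start()/resolve()/reject() callbacks defined above.
//
//   const state = new MemoryCrawlState();
//
//   if (!state.has(seedUrl)) {
//     state.add(seedUrl);
//     state.push({url: seedUrl, seedId: 0, depth: 0, extraHops: 0});
//   }
//
//   const job = state.shift();
//   // process the page, then invoke the job's resolve() or reject(e)
//   // callback to move it from pending to done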
// ============================================================================
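// Redis-backed crawl state, allowing state to be shared and persisted outside
// the crawler process. Per-crawl keys: <key>:q (queue list), <key>:p (pending
// hash keyed by URL), <key>:s (seen set), and <key>:d (done list). Queue and
// pending transitions go through the Lua scripts defined in the constructor
// so each transition is atomic.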
export class RedisCrawlState extends BaseState
{
  constructor(redis, key, pageTimeout, uid) {
    super();
    this.redis = redis;

    this._lastSize = 0;

    this.uid = uid;
    this.key = key;
    this.pageTimeout = pageTimeout / 1000;  // convert ms to secs

    this.qkey = this.key + ":q";
    this.pkey = this.key + ":p";
    this.skey = this.key + ":s";
    this.dkey = this.key + ":d";

    // atomically add a URL to the queue and remove it from the pending hash
    redis.defineCommand("addqueue", {
      numberOfKeys: 2,
      lua: `
redis.call('lpush', KEYS[2], ARGV[2]);
redis.call('hdel', KEYS[1], ARGV[1]);
`
    });

    // atomically pop the next queued URL and mark it as pending
    redis.defineCommand("getnext", {
      numberOfKeys: 2,
      lua: `
local json = redis.call('rpop', KEYS[1]);
if json then
  local data = cjson.decode(json);
  redis.call('hset', KEYS[2], data.url, json);
end
return json;
`
    });

    // record the start time for a pending URL and set its expiring timeout key
    redis.defineCommand("markstarted", {
      numberOfKeys: 2,
      lua: `
local json = redis.call('hget', KEYS[1], ARGV[1]);
if json then
  local data = cjson.decode(json);
  data['started'] = ARGV[2];
  json = cjson.encode(data);
  redis.call('hset', KEYS[1], ARGV[1], json);
  redis.call('setex', KEYS[2], ARGV[3], "1");
end
`
    });

    // move a pending URL to the done list, setting 'finished' or 'failed' on it
    redis.defineCommand("movedone", {
      numberOfKeys: 2,
      lua: `
local json = redis.call('hget', KEYS[1], ARGV[1]);
if json then
  local data = cjson.decode(json);
  data[ARGV[3]] = ARGV[2];
  json = cjson.encode(data);
  redis.call('lpush', KEYS[2], json);
  redis.call('hdel', KEYS[1], ARGV[1]);
end
`
    });

    // requeue a pending URL, but only if its timeout key has already expired
    redis.defineCommand("requeue", {
      numberOfKeys: 3,
      lua: `
local res = redis.call('get', KEYS[3]);
if not res then
  local json = redis.call('hget', KEYS[1], ARGV[1]);
  if json then
    redis.call('lpush', KEYS[2], json);
    redis.call('hdel', KEYS[1], ARGV[1]);
    return 1
  end
end
return 0;
`
    });
  }

  async _getNext() {
    return await this.redis.getnext(this.qkey, this.pkey);
  }
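
  // record page start; the expiring <pkey>:<url> key set here is what
  // requeue (via resetPendings) later checks to detect pages that have
  // exceeded the page timeout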
  async _markStarted(url) {
    const started = new Date().toISOString();

    return await this.redis.markstarted(this.pkey, this.pkey + ":" + url, url, started, this.pageTimeout);
  }

  async _finish(url) {
    const finished = new Date().toISOString();

    return await this.redis.movedone(this.pkey, this.dkey, url, finished, "finished");
  }

  async _fail(url) {
    return await this.redis.movedone(this.pkey, this.dkey, url, "1", "failed");
  }

  async setStatus(status_) {
    await this.redis.hset(`${this.key}:status`, this.uid, status_);
  }

  async getStatus() {
    return await this.redis.hget(`${this.key}:status`, this.uid);
  }

  async incFailCount() {
    const key = `${this.key}:status:failcount:${this.uid}`;
    const res = await this.redis.incr(key);

    // consider failed if 3 failed retries in 60 secs
    await this.redis.expire(key, 60);
    return (res >= 3);
  }

  async push(urlData) {
    await this.redis.addqueue(this.pkey, this.qkey, urlData.url, JSON.stringify(urlData));
  }
  async shift() {
    const json = await this._getNext();

    if (!json) {
      // nothing left in the queue
      return undefined;
    }

    let data;

    try {
      data = JSON.parse(json);
    } catch(e) {
      logger.error("Invalid queued json", json);
      return undefined;
    }

    const url = data.url;
    const state = this;

    const callbacks = {
      async start() {
        await state._markStarted(url);
      },

      async resolve() {
        await state._finish(url);
      },

      async reject(e) {
        logger.warn(`Page Load Failed: ${url}`, e.message);
        await state._fail(url);
      }
    };

    return new Job(data, callbacks);
  }
  async has(url) {
    return !!await this.redis.sismember(this.skey, url);
  }

  async add(url) {
    return await this.redis.sadd(this.skey, url);
  }

  async serialize() {
    const queued = await this._iterListKeys(this.qkey);
    const done = await this._iterListKeys(this.dkey);
    const pending = await this.getPendingList();

    return {queued, pending, done};
  }

  async _iterListKeys(key, inc = 100) {
    const results = [];

    const len = await this.redis.llen(key);

    for (let i = 0; i < len; i += inc) {
      const someResults = await this.redis.lrange(key, i, i + inc - 1);
      results.push(...someResults);
    }
    return results;
  }
  async load(state, seeds, checkScope) {
    const seen = [];

    // delete existing keys, if any, to fully reset state
    await this.redis.del(this.qkey);
    await this.redis.del(this.pkey);
    await this.redis.del(this.dkey);
    await this.redis.del(this.skey);

    for (const json of state.queued) {
      const data = JSON.parse(json);
      if (checkScope) {
        if (!this.recheckScope(data, seeds)) {
          continue;
        }
      }

      await this.redis.rpush(this.qkey, json);
      seen.push(data.url);
    }

    for (const json of state.pending) {
      const data = JSON.parse(json);
      if (checkScope) {
        if (!this.recheckScope(data, seeds)) {
          continue;
        }
      }

      await this.redis.rpush(this.qkey, json);
      seen.push(data.url);
    }

    for (const json of state.done) {
      const data = JSON.parse(json);
      if (data.failed) {
        await this.redis.rpush(this.qkey, json);
      } else {
        await this.redis.rpush(this.dkey, json);
      }
      seen.push(data.url);
    }

    // avoid calling SADD with no members if the loaded state was empty
    if (seen.length) {
      await this.redis.sadd(this.skey, seen);
    }
    return seen.length;
  }
  async numDone() {
    return await this.redis.llen(this.dkey);
  }

  async numRealSeen() {
    return await this.redis.scard(this.skey);
  }

  async numRealPending() {
    const res = await this.redis.hlen(this.pkey);

    // if URLs are still pending but the queue was empty at the last size
    // check, attempt to requeue any pending pages that have timed out
    if (res > 0 && !this._lastSize) {
      await this.resetPendings();
    }

    return res;
  }

  async getPendingList() {
    const list = await this.redis.hvals(this.pkey);
    return list.map(x => JSON.parse(x));
  }
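
  // requeue any pending URLs whose per-URL timeout key (set by markstarted)
  // has expired, e.g. pages left behind by a stalled crawler instance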
  async resetPendings() {
    const pendingUrls = await this.redis.hkeys(this.pkey);

    for (const url of pendingUrls) {
      if (await this.redis.requeue(this.pkey, this.qkey, this.pkey + ":" + url, url)) {
        logger.info(`Requeued: ${url}`);
      }
    }
  }

  async realSize() {
    this._lastSize = await this.redis.llen(this.qkey);
    return this._lastSize;
  }
}
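
// Rough construction sketch (hypothetical; the crawler normally wires this up
// from its own config). Assumes an ioredis client; pageTimeout is given in
// milliseconds and converted to seconds for the Redis timeout key.
//
//   import Redis from "ioredis";
//
//   const redis = new Redis("redis://localhost:6379/0");
//   const state = new RedisCrawlState(redis, "mycrawl", 90 * 1000, "crawler-1");
//
//   await state.setStatus("running");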