
* new options:
  - to support browsertrix-cloud, add a --waitOnDone option, which has browsertrix crawler wait when finished
  - when running with redis shared state, set the `<crawl id>:status` field to `running`, `failing`, `failed` or `done` to let the job controller know when the crawl is finished
  - set redis state to `failing` in case of an exception, and to `failed` in case of 3 or more failed exits within 60 seconds (todo: make customizable)
  - when receiving a SIGUSR1, assume final shutdown and finalize files (eg. save WACZ) before exiting
  - also write the WACZ if exiting because the size limit was exceeded, but not for other interruptions
  - change sleep() to be in seconds
* misc fixes:
  - crawlstate.finished() -> isFinished()
  - return if >0 pages and none left in queue
  - don't fail crawl if isFinished() is true
  - don't keep looping in pending wait for urls to finish if an abort request was received
* screencast improvements (fix related to webrecorder/browsertrix-cloud#233):
  - more optimized screencasting, don't close and restart after every page
  - don't assume targets change after every page, they don't in window mode!
  - only send 'close' message when target is actually closed
* bump to 0.6.0
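For context on the new status reporting: each crawler instance writes its state into a Redis hash keyed by crawl id, with one field per crawler uid (see setStatus/getStatus in RedisCrawlState below). The following is a minimal sketch of how an external job controller might poll that hash; the helper function, redis URL handling and crawl id shown here are illustrative assumptions, not part of this codebase:

const Redis = require("ioredis");

// Hypothetical polling helper, not part of browsertrix-crawler itself.
async function pollCrawlStatus(redisUrl, crawlId) {
  const redis = new Redis(redisUrl);

  // one field per crawler uid; values are "running", "failing", "failed" or "done"
  const statuses = await redis.hgetall(`${crawlId}:status`);
  const values = Object.values(statuses);

  const done = values.length > 0 && values.every((s) => s === "done");
  const failed = values.includes("failed");

  await redis.quit();
  return { done, failed, statuses };
}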
const Job = require("puppeteer-cluster/dist/Job").default;

// ============================================================================
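// BaseState holds the logic shared by the in-memory and Redis-backed crawl
// state. setDrain() freezes the reported queue size at the current
// pending + done count so a crawl can wind down without picking up new URLs;
// subclasses supply the actual storage via realSize(), numDone(),
// numRealSeen() and numRealPending().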
class BaseState
{
  constructor() {
    this.drainMax = 0;
    this.localOnly = false;
  }

  async setDrain(localOnly = false) {
    this.drainMax = (await this.numRealPending()) + (await this.numDone());
    this.localOnly = localOnly;
  }

  async size() {
    return this.drainMax ? 0 : await this.realSize();
  }

  async isFinished() {
    return (await this.realSize() == 0) && (await this.numDone() > 0);
  }

  async numSeen() {
    return this.drainMax ? this.drainMax : await this.numRealSeen();
  }

  recheckScope(data, seeds) {
    const seed = seeds[data.seedId];

    return seed.isIncluded(data.url, data.depth, data.extraHops);
  }

  numPending(localPending = 0) {
    if (this.localOnly) {
      return localPending;
    }

    return this.numRealPending();
  }
}

// ============================================================================
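// MemoryCrawlState keeps the full crawl state (seen set, queue, pending map,
// done list) in process memory, for a single crawler process with no shared
// state. Status is tracked locally and a failed exit is treated as final
// (incFailCount always reports the failure threshold as reached).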
class MemoryCrawlState extends BaseState
{
  constructor() {
    super();
    this.seenList = new Set();
    this.queue = [];
    this.pending = new Map();
    this.done = [];
    this.status = null;
  }

  setStatus(status) {
    this.status = status;
  }

  getStatus() {
    return this.status;
  }

  incFailCount() {
    // memory-only state, no retries
    return true;
  }

  push(job) {
    this.pending.delete(job.data.url);
    this.queue.unshift(job.data);
  }

  realSize() {
    return this.queue.length;
  }

  shift() {
    const data = this.queue.pop();

    const url = data.url;

    const state = this;

    state.pending.set(url, data);

    const callbacks = {
      start() {
        data.started = new Date().toISOString();

        state.pending.set(url, data);
      },

      resolve() {
        state.pending.delete(url);

        data.finished = new Date().toISOString();

        state.done.unshift(data);
      },

      reject(e) {
        console.warn(`Page Load Failed: ${url}, Reason: ${e}`);

        state.pending.delete(url);

        data.failed = true;

        state.done.unshift(data);
      }
    };

    return new Job(data, undefined, callbacks);
  }

  has(url) {
    return this.seenList.has(url);
  }

  add(url) {
    return this.seenList.add(url);
  }

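  // serialize()/load() snapshot and restore the crawl state as arrays of
  // JSON strings ({queued, pending, done}); on load, pending and failed
  // entries are pushed back onto the queue so they will be retried.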
  async serialize() {
    const queued = this.queue.map(x => JSON.stringify(x));
    const pending = Array.from(this.pending.values()).map(x => JSON.stringify(x));
    const done = this.done.map(x => JSON.stringify(x));

    return {queued, pending, done};
  }

  async load(state, seeds, checkScope=false) {
    for (const json of state.queued) {
      const data = JSON.parse(json);
      if (checkScope && !this.recheckScope(data, seeds)) {
        continue;
      }
      this.queue.push(data);
      this.seenList.add(data.url);
    }

    for (const json of state.pending) {
      const data = JSON.parse(json);
      if (checkScope && !this.recheckScope(data, seeds)) {
        continue;
      }
      this.queue.push(data);
      this.seenList.add(data.url);
    }

    for (const json of state.done) {
      const data = JSON.parse(json);
      if (data.failed) {
        this.queue.push(data);
      } else {
        this.done.push(data);
      }
      this.seenList.add(data.url);
    }

    return this.seenList.size;
  }

  async numDone() {
    return this.done.length;
  }

  async numRealSeen() {
    return this.seenList.size;
  }

  async numRealPending() {
    return this.pending.size;
  }
}

// ============================================================================
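// RedisCrawlState stores the crawl state in Redis so that multiple crawler
// instances (each identified by uid) can share one crawl, using keys derived
// from the crawl id:
//   <key>:q  - list of queued page data (JSON)
//   <key>:p  - hash of pending pages, url -> JSON
//   <key>:s  - set of seen urls
//   <key>:d  - list of finished/failed page data (JSON)
// Queue <-> pending <-> done moves go through small Lua scripts so each
// transition is atomic even with several crawlers attached.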
class RedisCrawlState extends BaseState
{
  constructor(redis, key, pageTimeout, uid) {
    super();
    this.redis = redis;

    this._lastSize = 0;

    this.uid = uid;
    this.key = key;
    this.pageTimeout = pageTimeout / 1000;

    this.qkey = this.key + ":q";
    this.pkey = this.key + ":p";
    this.skey = this.key + ":s";
    this.dkey = this.key + ":d";

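    // Custom commands (ioredis defineCommand) keep each state transition
    // atomic: addqueue pushes a url onto the queue (clearing any stale
    // pending entry), getnext pops the next url and records it as pending,
    // markstarted stamps the start time and sets a per-url marker key that
    // expires after pageTimeout, movedone moves a pending url to the done
    // list, and requeue puts a pending url back on the queue once its marker
    // key has expired.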
redis.defineCommand("addqueue", {
|
|
numberOfKeys: 2,
|
|
lua: `
|
|
redis.call('lpush', KEYS[2], ARGV[2]);
|
|
redis.call('hdel', KEYS[1], ARGV[1]);
|
|
`
|
|
});
|
|
|
|
redis.defineCommand("getnext", {
|
|
numberOfKeys: 2,
|
|
lua: `
|
|
local json = redis.call('rpop', KEYS[1]);
|
|
|
|
if json then
|
|
local data = cjson.decode(json);
|
|
redis.call('hset', KEYS[2], data.url, json);
|
|
end
|
|
|
|
return json;
|
|
`
|
|
});
|
|
|
|
redis.defineCommand("markstarted", {
|
|
numberOfKeys: 2,
|
|
lua: `
|
|
local json = redis.call('hget', KEYS[1], ARGV[1]);
|
|
|
|
if json then
|
|
local data = cjson.decode(json);
|
|
data['started'] = ARGV[2];
|
|
json = cjson.encode(data);
|
|
redis.call('hset', KEYS[1], ARGV[1], json);
|
|
redis.call('setex', KEYS[2], ARGV[3], "1");
|
|
end
|
|
|
|
`
|
|
});
|
|
|
|
redis.defineCommand("movedone", {
|
|
numberOfKeys: 2,
|
|
lua: `
|
|
local json = redis.call('hget', KEYS[1], ARGV[1]);
|
|
|
|
if json then
|
|
local data = cjson.decode(json);
|
|
data[ARGV[3]] = ARGV[2];
|
|
json = cjson.encode(data);
|
|
|
|
redis.call('lpush', KEYS[2], json);
|
|
redis.call('hdel', KEYS[1], ARGV[1]);
|
|
end
|
|
|
|
`
|
|
});
|
|
|
|
redis.defineCommand("requeue", {
|
|
numberOfKeys: 3,
|
|
lua: `
|
|
local res = redis.call('get', KEYS[3]);
|
|
if not res then
|
|
local json = redis.call('hget', KEYS[1], ARGV[1]);
|
|
if json then
|
|
redis.call('lpush', KEYS[2], json);
|
|
redis.call('hdel', KEYS[1], ARGV[1]);
|
|
return 1
|
|
end
|
|
end
|
|
return 0;
|
|
`
|
|
});
|
|
|
|
}

  async _getNext() {
    return await this.redis.getnext(this.qkey, this.pkey);
  }

  async _markStarted(url) {
    const started = new Date().toISOString();

    return await this.redis.markstarted(this.pkey, this.pkey + ":" + url, url, started, this.pageTimeout);
  }

  async _finish(url) {
    const finished = new Date().toISOString();

    return await this.redis.movedone(this.pkey, this.dkey, url, finished, "finished");
  }

  async _fail(url) {
    return await this.redis.movedone(this.pkey, this.dkey, url, "1", "failed");
  }

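  // Crawl status ("running", "failing", "failed", "done") is reported per
  // crawler instance in the "<crawl id>:status" hash, keyed by uid, so an
  // external job controller can tell when each crawler has finished.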
  async setStatus(status_) {
    await this.redis.hset(`${this.key}:status`, this.uid, status_);
  }

  async getStatus() {
    return await this.redis.hget(`${this.key}:status`, this.uid);
  }

  async incFailCount() {
    const key = `${this.key}:status:failcount:${this.uid}`;
    const res = await this.redis.incr(key);

    // consider failed if 3 failed retries in 60 secs
    await this.redis.expire(key, 60);
    return (res >= 3);
  }

  async push(job) {
    await this.redis.addqueue(this.pkey, this.qkey, job.data.url, JSON.stringify(job.data));
  }

  async shift() {
    const json = await this._getNext();
    let data;

    try {
      data = JSON.parse(json);
    } catch(e) {
      console.error("Invalid queued json: ", json);
      return null;
    }

    if (!data) {
      return null;
    }

    const url = data.url;

    const state = this;

    const callbacks = {
      async start() {
        await state._markStarted(url);
      },

      async resolve() {
        await state._finish(url);
      },

      async reject(e) {
        console.warn(`Page Load Failed: ${url}, Reason: ${e}`);
        await state._fail(url);
      }
    };

    return new Job(data, undefined, callbacks);
  }

  async has(url) {
    return !!await this.redis.sismember(this.skey, url);
  }

  async add(url) {
    return await this.redis.sadd(this.skey, url);
  }

  async serialize() {
    const queued = await this._iterListKeys(this.qkey);
    const done = await this._iterListKeys(this.dkey);
    const pending = await this.redis.hvals(this.pkey);

    return {queued, pending, done};
  }

  async _iterListKeys(key, inc = 100) {
    const results = [];

    const len = await this.redis.llen(key);

    for (let i = 0; i < len; i += inc) {
      const someResults = await this.redis.lrange(key, i, i + inc - 1);
      results.push(...someResults);
    }
    return results;
  }

  async load(state, seeds, checkScope) {
    const seen = [];

    // need to delete existing keys, if exist to fully reset state
    await this.redis.del(this.qkey);
    await this.redis.del(this.pkey);
    await this.redis.del(this.dkey);
    await this.redis.del(this.skey);

    for (const json of state.queued) {
      const data = JSON.parse(json);
      if (checkScope) {
        if (!this.recheckScope(data, seeds)) {
          continue;
        }
      }

      await this.redis.rpush(this.qkey, json);
      seen.push(data.url);
    }

    for (const json of state.pending) {
      const data = JSON.parse(json);
      if (checkScope) {
        if (!this.recheckScope(data, seeds)) {
          continue;
        }
      }

      await this.redis.rpush(this.qkey, json);
      seen.push(data.url);
    }

    for (const json of state.done) {
      const data = JSON.parse(json);
      if (data.failed) {
        await this.redis.rpush(this.qkey, json);
      } else {
        await this.redis.rpush(this.dkey, json);
      }
      seen.push(data.url);
    }

    await this.redis.sadd(this.skey, seen);
    return seen.length;
  }

  async numDone() {
    return await this.redis.llen(this.dkey);
  }

  async numRealSeen() {
    return await this.redis.scard(this.skey);
  }

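  // If pages are pending while the queue was last seen empty, check whether
  // any pending pages have timed out (their markstarted marker key has
  // expired) and requeue them, e.g. after another crawler died mid-page.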
  async numRealPending() {
    const res = await this.redis.hlen(this.pkey);

    // reset pendings
    if (res > 0 && !this._lastSize) {
      await this.resetPendings();
    }

    return res;
  }

  async resetPendings() {
    const pendingUrls = await this.redis.hkeys(this.pkey);

    for (const url of pendingUrls) {
      if (await this.redis.requeue(this.pkey, this.qkey, this.pkey + ":" + url, url)) {
        console.log("Requeued: " + url);
      }
    }
  }

  async realSize() {
    this._lastSize = await this.redis.llen(this.qkey);
    return this._lastSize;
  }
}

module.exports.RedisCrawlState = RedisCrawlState;
module.exports.MemoryCrawlState = MemoryCrawlState;