From cf90304fa736bef1443f35a943a1700491e1e8ad Mon Sep 17 00:00:00 2001
From: Ilya Kreymer
Date: Fri, 17 Jun 2022 11:58:44 -0700
Subject: [PATCH] 0.6.0 Wait State + Screencasting Fixes (#141)

* new options:
- to support browsertrix-cloud, add a --waitOnDone option, which has Browsertrix Crawler wait for an interrupt signal when finished instead of exiting
- when running with redis shared state, set the `:status` field to `running`, `failing`, `failed` or `done` to let the job controller know when the crawl is finished (see the polling sketch below)
- set redis state to `failing` in case of an exception, and to `failed` in case of 3 or more failed exits within 60 seconds (todo: make the threshold customizable)
- when receiving a SIGUSR1, assume final shutdown and finalize files (e.g. save the WACZ) before exiting
- also write the WACZ when exiting because the size limit was exceeded, but not due to other interruptions
- change sleep() to take seconds instead of milliseconds

* misc fixes:
- rename crawlstate.finished() -> isFinished(), which returns true only if >0 pages are done and none are left in the queue
- don't fail the crawl if isFinished() is true
- don't keep looping in the pending wait for urls to finish if an abort request was received

* screencast improvements (fix related to webrecorder/browsertrix-cloud#233)
- more optimized screencasting: don't close and restart after every page
- don't assume targets change after every page - they don't in window mode!
- only send the 'close' message when a target is actually closed

* bump to 0.6.0
---
 crawler.js           | 100 ++++++++++++++++++++++++++++++-------------
 docker-compose.yml   |   2 +-
 main.js              |  13 ++++--
 package.json         |   2 +-
 util/argParser.js    |   6 +++
 util/screencaster.js |  24 ++++++++++-
 util/state.js        |  38 ++++++++++++++--
 7 files changed, 145 insertions(+), 40 deletions(-)
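For illustration, a minimal sketch of how a job controller might consume the new `:status` protocol (not part of this patch; assumes ioredis, a hypothetical waitForCrawl() helper, and a key prefix equal to the crawl id, matching RedisCrawlState.setStatus() below):

const Redis = require("ioredis");

// Poll the per-crawler status hash written by setStatus().
// `${crawlId}:status` maps each crawler hostname to one of
// "running", "failing", "failed" or "done".
async function waitForCrawl(redisUrl, crawlId) {
  const redis = new Redis(redisUrl);

  while (true) {
    const statuses = Object.values(await redis.hgetall(`${crawlId}:status`));

    if (statuses.length && statuses.every((s) => s === "done")) {
      return true;
    }
    if (statuses.some((s) => s === "failed")) {
      return false;
    }

    // check again in a few seconds
    await new Promise((resolve) => setTimeout(resolve, 3000));
  }
}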
"done" : "interrupted"); } catch(e) { console.error("Crawl failed"); console.error(e); - process.exit(9); + this.exitCode = 9; + status = "failing"; + if (await this.crawlState.incFailCount()) { + status = "failed"; + } + + } finally { + console.log(status); + + await this.crawlState.setStatus(status); + + process.exit(this.exitCode); } } @@ -293,7 +315,7 @@ class Crawler { async crawlPage({page, data}) { try { if (this.screencaster) { - await this.screencaster.newTarget(page.target()); + await this.screencaster.screencastTarget(page.target()); } if (this.emulateDevice) { @@ -341,15 +363,6 @@ class Crawler { } catch (e) { console.warn(e); - } finally { - - try { - if (this.screencaster) { - await this.screencaster.endTarget(page.target()); - } - } catch (e) { - console.warn(e); - } } } @@ -400,6 +413,7 @@ class Crawler { if (size >= this.params.sizeLimit) { console.log(`Size threshold reached ${size} >= ${this.params.sizeLimit}, stopping`); interrupt = true; + this.sizeExceeded = true; } } @@ -503,8 +517,26 @@ class Crawler { await this.awaitProcess(child_process.spawn("wb-manager", ["reindex", this.params.collection], {stdio: "inherit", cwd: this.params.cwd})); } - if (this.params.generateWACZ) { + if (this.params.generateWACZ && (this.exitCode === 0 || this.finalExit || this.sizeExceeded)) { await this.generateWACZ(); + + if (this.sizeExceeded) { + console.log(`Clearing ${this.collDir} before exit`); + try { + fs.rmSync(this.collDir, { recursive: true, force: true }); + } catch(e) { + console.warn(e); + } + } + } + + if (this.exitCode === 0 && this.params.waitOnDone && this.params.redisStoreUrl && !this.finalExit) { + this.done = true; + this.statusLog("All done, waiting for signal..."); + await this.crawlState.setStatus("done"); + + // wait forever until signal + await new Promise(() => {}); } } @@ -516,8 +548,15 @@ class Crawler { // Get a list of the warcs inside const warcFileList = await fsp.readdir(archiveDir); + // is finished (>0 pages and all pages written) + const isFinished = await this.crawlState.isFinished(); + console.log(`Num WARC Files: ${warcFileList.length}`); if (!warcFileList.length) { + // if finished, just return + if (isFinished) { + return; + } throw new Error("No WARC Files, assuming crawl failed"); } @@ -526,7 +565,6 @@ class Crawler { const waczPath = path.join(this.collDir, waczFilename); const createArgs = ["create", "--split-seeds", "-o", waczPath, "--pages", this.pagesFile]; - const validateArgs = ["validate"]; if (process.env.WACZ_SIGN_URL) { createArgs.push("--signing-url"); @@ -538,7 +576,6 @@ class Crawler { } createArgs.push("-f"); - validateArgs.push("-f"); warcFileList.forEach((val, index) => createArgs.push(path.join(archiveDir, val))); // eslint-disable-line no-unused-vars @@ -553,6 +590,9 @@ class Crawler { this.debugLog(`WACZ successfully generated and saved to: ${waczPath}`); // Verify WACZ + /* + const validateArgs = ["validate"]; + validateArgs.push("-f"); validateArgs.push(waczPath); const waczVerifyResult = await this.awaitProcess(child_process.spawn("wacz", validateArgs, {stdio: "inherit"})); @@ -561,13 +601,12 @@ class Crawler { console.log("validate", waczVerifyResult); throw new Error("Unable to verify WACZ created successfully"); } - +*/ if (this.storage) { - const finished = await this.crawlState.finished(); const filename = process.env.STORE_FILENAME || "@ts-@id.wacz"; const targetFilename = interpolateFilename(filename, this.crawlId); - await this.storage.uploadCollWACZ(waczPath, targetFilename, finished); + await 
diff --git a/docker-compose.yml b/docker-compose.yml
index 33b9f058..47605c92 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -2,7 +2,7 @@ version: '3.5'
 
 services:
   crawler:
-    image: webrecorder/browsertrix-crawler:latest
+    image: ${REGISTRY}webrecorder/browsertrix-crawler:latest
     build:
       context: ./
diff --git a/main.js b/main.js
index 4e50511e..617d9e04 100755
--- a/main.js
+++ b/main.js
@@ -14,7 +14,7 @@ async function handleTerminate() {
   try {
     if (!crawler.crawlState.drainMax) {
       console.log("SIGNAL: gracefully finishing current pages...");
-      crawler.crawlState.setDrain();
+      crawler.crawlState.setDrain(crawler.finalExit);
 
     } else if ((Date.now() - lastSigInt) > 200) {
       console.log("SIGNAL: stopping crawl now...");
@@ -32,10 +32,16 @@ process.on("SIGINT", async () => {
   await handleTerminate();
 });
 
+process.on("SIGUSR1", () => {
+  if (crawler) {
+    crawler.finalExit = true;
+  }
+});
+
 process.on("SIGTERM", async () => {
-  if (forceTerm) {
+  if (forceTerm || crawler.done) {
     console.log("SIGTERM received, exit immediately");
-    process.exit(1);
+    process.exit(crawler.done ? 0 : 1);
   }
 
   console.log("SIGTERM received...");
@@ -45,6 +51,7 @@ process.on("SIGTERM", async () => {
 
 process.on("SIGABRT", async () => {
   console.log("SIGABRT received, will force immediate exit on SIGTERM");
   forceTerm = true;
+  crawler.exitCode = 1;
 });
diff --git a/package.json b/package.json
index c6a4230f..1e4b955f 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
 {
   "name": "browsertrix-crawler",
-  "version": "0.6.0-beta.1",
+  "version": "0.6.0",
   "main": "browsertrix-crawler",
   "repository": "https://github.com/webrecorder/browsertrix-crawler",
   "author": "Ilya Kreymer , Webrecorder Software",
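Taken together, the main.js handlers above give an external supervisor a simple shutdown recipe; a minimal sketch (hypothetical stopAndFinalize() helper, assumes the `crawl` entrypoint is on PATH):

const { spawn } = require("child_process");

// Stop a crawl but still finalize outputs: SIGUSR1 marks the shutdown as
// final (so the WACZ is still written), then SIGTERM drains current pages.
function stopAndFinalize(crawlArgs) {
  const child = spawn("crawl", crawlArgs, { stdio: "inherit" });

  const finalize = () => {
    child.kill("SIGUSR1"); // sets crawler.finalExit = true
    // in practice, allow a brief delay so SIGUSR1 is handled first
    setTimeout(() => child.kill("SIGTERM"), 100);
  };

  return { child, finalize };
}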
diff --git a/util/argParser.js b/util/argParser.js
index 494e07a5..a115d932 100644
--- a/util/argParser.js
+++ b/util/argParser.js
@@ -281,6 +281,12 @@ class ArgParser {
         type: "boolean",
         default: false
       },
+
+      "waitOnDone": {
+        describe: "if set, wait for interrupt signal when finished instead of exiting",
+        type: "boolean",
+        default: false
+      },
     };
   }
diff --git a/util/screencaster.js b/util/screencaster.js
index 89c1c9d7..474621be 100644
--- a/util/screencaster.js
+++ b/util/screencaster.js
@@ -175,11 +175,31 @@ class ScreenCaster
     }
   }
 
+  detectClose(target) {
+    const context = target.browserContext();
 
-  async newTarget(target) {
-    const cdp = await target.createCDPSession();
+    if (context.__destroy_added) {
+      return;
+    }
+
+    context.on("targetdestroyed", (target) => {
+      this.endTarget(target);
+    });
+
+    context.__destroy_added = true;
+  }
+
+  async screencastTarget(target) {
     const id = target._targetId;
 
+    if (this.targets.has(id)) {
+      return;
+    }
+
+    this.detectClose(target);
+
+    const cdp = await target.createCDPSession();
+
     this.targets.set(id, cdp);
     this.urls.set(id, target.url());
diff --git a/util/state.js b/util/state.js
index 4917873d..52e2bb88 100644
--- a/util/state.js
+++ b/util/state.js
@@ -18,8 +18,8 @@ class BaseState
     return this.drainMax ? 0 : await this.realSize();
   }
 
-  async finished() {
-    return await this.realSize() == 0;
+  async isFinished() {
+    return (await this.realSize() == 0) && (await this.numDone() > 0);
   }
 
   async numSeen() {
@@ -51,6 +51,20 @@ class MemoryCrawlState extends BaseState
     this.queue = [];
     this.pending = new Map();
     this.done = [];
+    this.status = null;
+  }
+
+  setStatus(status) {
+    this.status = status;
+  }
+
+  getStatus() {
+    return this.status;
+  }
+
+  incFailCount() {
+    // memory-only state, no retries
+    return true;
   }
 
   push(job) {
@@ -165,12 +179,13 @@ class MemoryCrawlState extends BaseState
 
 // ============================================================================
 class RedisCrawlState extends BaseState
 {
-  constructor(redis, key, pageTimeout) {
+  constructor(redis, key, pageTimeout, uid) {
     super();
     this.redis = redis;
 
     this._lastSize = 0;
+    this.uid = uid;
 
     this.key = key;
     this.pageTimeout = pageTimeout / 1000;
@@ -272,6 +287,23 @@ return 0;
     return await this.redis.movedone(this.pkey, this.dkey, url, "1", "failed");
   }
 
+  async setStatus(status_) {
+    await this.redis.hset(`${this.key}:status`, this.uid, status_);
+  }
+
+  async getStatus() {
+    return await this.redis.hget(`${this.key}:status`, this.uid);
+  }
+
+  async incFailCount() {
+    const key = `${this.key}:status:failcount:${this.uid}`;
+    const res = await this.redis.incr(key);
+
+    // consider failed if 3 failed retries in 60 secs
+    await this.redis.expire(key, 60);
+    return (res >= 3);
+  }
+
   async push(job) {
     await this.redis.addqueue(this.pkey, this.qkey, job.data.url, JSON.stringify(job.data));
   }
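One behavioral note on incFailCount() above, relevant to the todo about making the threshold customizable: since EXPIRE is re-applied on every INCR, the 60-second window restarts with each failure rather than running from the first one. A standalone sketch of the same pattern (assumes ioredis and a reachable local redis):

const Redis = require("ioredis");

async function demoFailWindow() {
  const redis = new Redis("redis://localhost/0");
  const key = "demo:status:failcount";

  for (let i = 0; i < 3; i++) {
    const count = await redis.incr(key); // bump the failure counter
    await redis.expire(key, 60);         // TTL restarts on every failure
    console.log(`failure ${count}, failed=${count >= 3}`);
  }

  await redis.quit();
}

demoFailWindow();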