From 1a1b9b4bff6d8b514f4cd6ce53fab6f3f868468c Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Tue, 15 Aug 2023 18:37:11 -0700 Subject: [PATCH] misc fixes: - allow specifying custom redis start args via REDIS_ARGS env var, parsed with splitArgsQuoteSafe() - unify checking whether the crawl should be stopped, also check when trying to get a new page - if getting a new page failed, bail out early to avoid a null dereference - support adding an offset to the '-X' ordinal at the end of the hostname via CRAWL_INDEX_OFFSET env var --- crawler.js | 18 ++++++++++++++++-- util/argParser.js | 14 +++++++------- util/worker.js | 18 ++++++++++++++---- 3 files changed, 37 insertions(+), 13 deletions(-) diff --git a/crawler.js b/crawler.js index d19546c1..b1da8147 100644 --- a/crawler.js +++ b/crawler.js @@ -16,7 +16,7 @@ import { TextExtract } from "./util/textextract.js"; import { initStorage, getFileSize, getDirSize, interpolateFilename, checkDiskUtilization } from "./util/storage.js"; import { ScreenCaster, WSTransport, RedisPubSubTransport } from "./util/screencaster.js"; import { Screenshots } from "./util/screenshots.js"; -import { parseArgs } from "./util/argParser.js"; +import { parseArgs, splitArgsQuoteSafe } from "./util/argParser.js"; import { initRedis } from "./util/redis.js"; import { logger, errJSON } from "./util/logger.js"; import { runWorkers } from "./util/worker.js"; @@ -259,7 +259,9 @@ export class Crawler { const subprocesses = []; - subprocesses.push(child_process.spawn("redis-server", {cwd: "/tmp/", stdio: redisStdio})); + const redisArgs = process.env.REDIS_ARGS ? 
splitArgsQuoteSafe(process.env.REDIS_ARGS) : []; + + subprocesses.push(child_process.spawn("redis-server", redisArgs, {cwd: "/tmp/", stdio: redisStdio})); opts.env = { ...process.env, @@ -679,6 +681,18 @@ self.__bx_behaviors.selectMainBehavior(); process.exit(0); } + async isCrawlRunning() { + if (this.interrupted) { + return false; + } + + if (await this.crawlState.isCrawlStopped()) { + return false; + } + + return true; + } + async crawl() { if (this.params.healthCheckPort) { this.healthChecker = new HealthChecker(this.params.healthCheckPort, this.params.workers); diff --git a/util/argParser.js b/util/argParser.js index 21991292..e9e0a25f 100644 --- a/util/argParser.js +++ b/util/argParser.js @@ -398,7 +398,7 @@ class ArgParser { argv = argv || process.argv; if (process.env.CRAWL_ARGS) { - argv = argv.concat(this.splitCrawlArgsQuoteSafe(process.env.CRAWL_ARGS)); + argv = argv.concat(splitArgsQuoteSafe(process.env.CRAWL_ARGS)); } let origConfig = {}; @@ -419,12 +419,6 @@ class ArgParser { return {parsed, origConfig}; } - splitCrawlArgsQuoteSafe(crawlArgs) { - // Split process.env.CRAWL_ARGS on spaces but retaining spaces within double quotes - const regex = /"[^"]+"|[^\s]+/g; - return crawlArgs.match(regex).map(e => e.replace(/"(.+)"/, "$1")); - } - validateArgs(argv) { argv.collection = interpolateFilename(argv.collection, argv.crawlId); @@ -545,3 +539,9 @@ class ArgParser { export function parseArgs(argv) { return new ArgParser().parseArgs(argv); } + +export function splitArgsQuoteSafe(crawlArgs) { + // Split process.env.CRAWL_ARGS on spaces but retaining spaces within double quotes + const regex = /"[^"]+"|[^\s]+/g; + return crawlArgs.match(regex).map(e => e.replace(/"(.+)"/, "$1")); +} diff --git a/util/worker.js b/util/worker.js index ae3c4484..13d6a376 100644 --- a/util/worker.js +++ b/util/worker.js @@ -23,7 +23,8 @@ export function runWorkers(crawler, numWorkers, maxPageTime) { const rx = new RegExp(rxEscape(process.env.CRAWL_ID) + "\\-([\\d]+)$"); const m 
= os.hostname().match(rx); if (m) { - offset = m[1] * numWorkers; + offset = Number(m[1]) + (Number(process.env.CRAWL_INDEX_OFFSET) || 0); + offset = offset * numWorkers; logger.info("Starting workerid index at " + offset, "worker"); } } @@ -94,10 +95,10 @@ export class PageWorker this.reuseCount = 1; const workerid = this.id; - while (true) { + while (await this.crawler.isCrawlRunning()) { try { logger.debug("Getting page in new window", {workerid}, "worker"); - const { page, cdp } = await timedRun( + const result = await timedRun( this.crawler.browser.newWindowPageWithCDP(), NEW_WINDOW_TIMEOUT, "New Window Timed Out", @@ -105,6 +106,12 @@ export class PageWorker "worker" ); + if (!result) { + continue; + } + + const { page, cdp } = result; + this.page = page; this.cdp = cdp; this.opts = {page: this.page, cdp: this.cdp, workerid}; @@ -180,13 +187,16 @@ export class PageWorker async runLoop() { const crawlState = this.crawler.crawlState; - while (!this.crawler.interrupted && !await crawlState.isCrawlStopped()) { + while (await this.crawler.isCrawlRunning()) { const data = await crawlState.nextFromQueue(); // see if any work data in the queue if (data) { // init page (new or reuse) const opts = await this.initPage(); + if (!opts) { + break; + } // run timed crawl of page await this.timedCrawlPage({...opts, data});