misc fixes:

- allow specifying custom redis start args via REDIS_ARGS env var, parse with splitArgsQuoteSafe()
- unify checking crawl should be stopped, also check when trying to get new page
- if getting new page failed, just return, avoid null dereference
- support adding offset to '-X' ordinal at the end via CRAWL_INDEX_OFFSET env var
This commit is contained in:
Ilya Kreymer 2023-08-15 18:37:11 -07:00
parent 212bff0a27
commit 1a1b9b4bff
3 changed files with 37 additions and 13 deletions

View file

@ -16,7 +16,7 @@ import { TextExtract } from "./util/textextract.js";
import { initStorage, getFileSize, getDirSize, interpolateFilename, checkDiskUtilization } from "./util/storage.js";
import { ScreenCaster, WSTransport, RedisPubSubTransport } from "./util/screencaster.js";
import { Screenshots } from "./util/screenshots.js";
import { parseArgs } from "./util/argParser.js";
import { parseArgs, splitArgsQuoteSafe } from "./util/argParser.js";
import { initRedis } from "./util/redis.js";
import { logger, errJSON } from "./util/logger.js";
import { runWorkers } from "./util/worker.js";
@ -259,7 +259,9 @@ export class Crawler {
const subprocesses = [];
subprocesses.push(child_process.spawn("redis-server", {cwd: "/tmp/", stdio: redisStdio}));
const redisArgs = process.env.REDIS_ARGS ? splitArgsQuoteSafe(process.env.REDIS_ARGS) : [];
subprocesses.push(child_process.spawn("redis-server", redisArgs, {cwd: "/tmp/", stdio: redisStdio}));
opts.env = {
...process.env,
@ -679,6 +681,18 @@ self.__bx_behaviors.selectMainBehavior();
process.exit(0);
}
async isCrawlRunning() {
if (this.interrupted) {
return false;
}
if (await this.crawlState.isCrawlStopped()) {
return false;
}
return true;
}
async crawl() {
if (this.params.healthCheckPort) {
this.healthChecker = new HealthChecker(this.params.healthCheckPort, this.params.workers);

View file

@ -398,7 +398,7 @@ class ArgParser {
argv = argv || process.argv;
if (process.env.CRAWL_ARGS) {
argv = argv.concat(this.splitCrawlArgsQuoteSafe(process.env.CRAWL_ARGS));
argv = argv.concat(splitArgsQuoteSafe(process.env.CRAWL_ARGS));
}
let origConfig = {};
@ -419,12 +419,6 @@ class ArgParser {
return {parsed, origConfig};
}
splitCrawlArgsQuoteSafe(crawlArgs) {
// Split process.env.CRAWL_ARGS on spaces but retaining spaces within double quotes
const regex = /"[^"]+"|[^\s]+/g;
return crawlArgs.match(regex).map(e => e.replace(/"(.+)"/, "$1"));
}
validateArgs(argv) {
argv.collection = interpolateFilename(argv.collection, argv.crawlId);
@ -545,3 +539,9 @@ class ArgParser {
export function parseArgs(argv) {
return new ArgParser().parseArgs(argv);
}
export function splitArgsQuoteSafe(crawlArgs) {
// Split process.env.CRAWL_ARGS on spaces but retaining spaces within double quotes
const regex = /"[^"]+"|[^\s]+/g;
return crawlArgs.match(regex).map(e => e.replace(/"(.+)"/, "$1"));
}

View file

@ -23,7 +23,8 @@ export function runWorkers(crawler, numWorkers, maxPageTime) {
const rx = new RegExp(rxEscape(process.env.CRAWL_ID) + "\\-([\\d]+)$");
const m = os.hostname().match(rx);
if (m) {
offset = m[1] * numWorkers;
offset = Number(m[1]) + (Number(process.env.CRAWL_INDEX_OFFSET) || 0);
offset = offset * numWorkers;
logger.info("Starting workerid index at " + offset, "worker");
}
}
@ -94,10 +95,10 @@ export class PageWorker
this.reuseCount = 1;
const workerid = this.id;
while (true) {
while (await this.crawler.isCrawlRunning()) {
try {
logger.debug("Getting page in new window", {workerid}, "worker");
const { page, cdp } = await timedRun(
const result = await timedRun(
this.crawler.browser.newWindowPageWithCDP(),
NEW_WINDOW_TIMEOUT,
"New Window Timed Out",
@ -105,6 +106,12 @@ export class PageWorker
"worker"
);
if (!result) {
continue;
}
const { page, cdp } = result;
this.page = page;
this.cdp = cdp;
this.opts = {page: this.page, cdp: this.cdp, workerid};
@ -180,13 +187,16 @@ export class PageWorker
async runLoop() {
const crawlState = this.crawler.crawlState;
while (!this.crawler.interrupted && !await crawlState.isCrawlStopped()) {
while (await this.crawler.isCrawlRunning()) {
const data = await crawlState.nextFromQueue();
// see if any work data in the queue
if (data) {
// init page (new or reuse)
const opts = await this.initPage();
if (!opts) {
break;
}
// run timed crawl of page
await this.timedCrawlPage({...opts, data});