From 8595bcebc18b644aee7f9e2ff304a60010facbf1 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Tue, 25 Nov 2025 07:58:30 -0800 Subject: [PATCH] add new logger.interrupt() which will interrupt and exit crawl but not fail unlike logger.fatal() replace some logger.fatal() with interrupts to allow for retries instead of immediate failure, esp. when external inputs (profile, behaviors) can not be downloaded --- src/crawler.ts | 47 +++++++++++++---------------------------- src/util/browser.ts | 9 ++++---- src/util/constants.ts | 1 + src/util/file_reader.ts | 21 +++++++++++------- src/util/logger.ts | 45 +++++++++++++++++++++++++++++++++++++-- src/util/state.ts | 6 ++++++ src/util/storage.ts | 3 +-- src/util/wacz.ts | 3 +-- src/util/warcwriter.ts | 17 ++++++--------- 9 files changed, 92 insertions(+), 60 deletions(-) diff --git a/src/crawler.ts b/src/crawler.ts index 0d2a7e69..ecc8a321 100644 --- a/src/crawler.ts +++ b/src/crawler.ts @@ -63,12 +63,7 @@ import { import { Recorder } from "./util/recorder.js"; import { SitemapReader } from "./util/sitemapper.js"; import { ScopedSeed, parseSeeds } from "./util/seeds.js"; -import { - WARCWriter, - createWARCInfo, - setWARCInfo, - streamFinish, -} from "./util/warcwriter.js"; +import { WARCWriter, createWARCInfo, setWARCInfo } from "./util/warcwriter.js"; import { isHTMLMime, isRedirectStatus } from "./util/reqresp.js"; import { initProxy } from "./util/proxy.js"; import { initFlow, nextFlowStep } from "./util/flowbehavior.js"; @@ -474,10 +469,9 @@ export class Crawler { async bootstrap() { if (await isDiskFull(this.params.cwd)) { - logger.fatal( + await logger.interrupt( "Out of disk space, exiting", {}, - "general", ExitCodes.OutOfSpace, ); } @@ -502,8 +496,7 @@ export class Crawler { await fsp.mkdir(this.warcCdxDir, { recursive: true }); } - this.logFH = fs.createWriteStream(this.logFilename, { flags: "a" }); - logger.setExternalLogStream(this.logFH); + logger.openLog(this.logFilename); this.infoString = await getInfoString(); setWARCInfo(this.infoString, this.params.warcInfo); @@ -1572,14 +1565,7 @@ self.__bx_behaviors.selectMainBehavior(); } async setStatusAndExit(exitCode: ExitCodes, status: string) { - logger.info(`Exiting, Crawl status: ${status}`); - - await this.closeLog(); - - if (this.crawlState && status) { - await this.crawlState.setStatus(status); - } - process.exit(exitCode); + await logger.interrupt("", {}, exitCode, status); } async serializeAndExit() { @@ -1906,17 +1892,6 @@ self.__bx_behaviors.selectMainBehavior(); this.browser.crashed = true; } - async closeLog(): Promise { - // close file-based log - logger.setExternalLogStream(null); - if (!this.logFH) { - return; - } - const logFH = this.logFH; - this.logFH = null; - await streamFinish(logFH); - } - async generateWACZ() { logger.info("Generating WACZ"); await this.crawlState.setStatus("generate-wacz"); @@ -1953,7 +1928,7 @@ self.__bx_behaviors.selectMainBehavior(); logger.debug("End of log file in WACZ, storing logs to WACZ file"); - await this.closeLog(); + await logger.closeLog(); const waczOpts: WACZInitOpts = { input: warcFileList.map((x) => path.join(this.archivesDir, x)), @@ -2002,9 +1977,17 @@ self.__bx_behaviors.selectMainBehavior(); } catch (e) { logger.error("Error creating WACZ", e); if (!streaming) { - logger.fatal("Unable to write WACZ successfully"); + await logger.interrupt( + "Unable to write WACZ successfully", + formatErr(e), + ExitCodes.GenericError, + ); } else if (this.params.restartsOnError) { - await this.setStatusAndExit(ExitCodes.UploadFailed, "interrupted"); + await logger.interrupt( + "Unable to upload WACZ successfully", + formatErr(e), + ExitCodes.UploadFailed, + ); } } } diff --git a/src/util/browser.ts b/src/util/browser.ts index 8a249a4c..5e3727f9 100644 --- a/src/util/browser.ts +++ b/src/util/browser.ts @@ -12,6 +12,7 @@ import { initStorage, S3StorageSync, UploadResult } from "./storage.js"; import { DISPLAY, + ExitCodes, PAGE_OP_TIMEOUT_SECS, type ServiceWorkerOpt, } from "./constants.js"; @@ -236,10 +237,10 @@ export class Browser { this.removeSingletons(); return true; } catch (e) { - logger.fatal( - `Profile filename ${profileFilename} not a valid tar.gz, can not load profile, exiting`, - {}, - "browser", + await logger.interrupt( + `Profile not a valid tar.gz, can not load profile, exiting`, + { profileFilename }, + ExitCodes.InvalidInput, ); } } diff --git a/src/util/constants.ts b/src/util/constants.ts index 15b00bd7..0ec3a710 100644 --- a/src/util/constants.ts +++ b/src/util/constants.ts @@ -82,6 +82,7 @@ export enum ExitCodes { Fatal = 17, ProxyError = 21, UploadFailed = 22, + InvalidInput = 23, } export enum InterruptReason { diff --git a/src/util/file_reader.ts b/src/util/file_reader.ts index e7172b45..e1871c10 100644 --- a/src/util/file_reader.ts +++ b/src/util/file_reader.ts @@ -9,6 +9,7 @@ import { exec as execCallback } from "child_process"; import { formatErr, logger } from "./logger.js"; import { getProxyDispatcher } from "./proxy.js"; import { parseRecorderFlowJson } from "./flowbehavior.js"; +import { ExitCodes } from "./constants.js"; const exec = util.promisify(execCallback); @@ -61,10 +62,14 @@ export async function collectOnlineSeedFile(url: string): Promise { logger.info("Seed file downloaded", { url, path: filepath }); return filepath; } catch (e) { - logger.fatal("Error downloading seed file from URL", { - url, - ...formatErr(e), - }); + await logger.interrupt( + "Error downloading seed file from URL", + { + url, + ...formatErr(e), + }, + ExitCodes.InvalidInput, + ); throw e; } } @@ -122,10 +127,10 @@ async function collectGitBehaviors(gitUrl: string): Promise { ); return await collectLocalPathBehaviors(pathToCollect); } catch (e) { - logger.fatal( + await logger.interrupt( "Error downloading custom behaviors from Git repo", { url: urlStripped, ...formatErr(e) }, - "behavior", + ExitCodes.InvalidInput, ); } return []; @@ -145,10 +150,10 @@ async function collectOnlineBehavior(url: string): Promise { ); return await collectLocalPathBehaviors(behaviorFilepath, 0, url); } catch (e) { - logger.fatal( + await logger.interrupt( "Error downloading custom behavior from URL", { url, ...formatErr(e) }, - "behavior", + ExitCodes.InvalidInput, ); } return []; diff --git a/src/util/logger.ts b/src/util/logger.ts index 7d10939e..20dc7870 100644 --- a/src/util/logger.ts +++ b/src/util/logger.ts @@ -2,6 +2,7 @@ // to fix serialization of regexes for logging purposes import { Writable } from "node:stream"; +import fs from "node:fs"; import { RedisCrawlState } from "./state.js"; import { ExitCodes } from "./constants.js"; @@ -80,13 +81,24 @@ class Logger { excludeContexts: LogContext[] = []; crawlState?: RedisCrawlState | null = null; fatalExitCode: ExitCodes = ExitCodes.Fatal; + logFH: Writable | null = null; setDefaultFatalExitCode(exitCode: number) { this.fatalExitCode = exitCode; } - setExternalLogStream(logFH: Writable | null) { - this.logStream = logFH; + openLog(filename: string) { + this.logFH = fs.createWriteStream(filename, { flags: "a" }); + } + + async closeLog(): Promise { + // close file-based log + if (!this.logFH) { + return; + } + const logFH = this.logFH; + this.logFH = null; + await streamFinish(logFH); } setDebugLogging(debugLog: boolean) { @@ -220,6 +232,35 @@ class Logger { process.exit(exitCode); } } + + async interrupt( + message: string, + data = {}, + exitCode: ExitCodes, + status = "interrupted", + ) { + if (message) { + this.error(`${message}: exiting, crawl status: ${status}`, data); + } else { + this.info(`exiting, crawl status: ${status}`); + } + + await this.closeLog(); + + if (this.crawlState && status) { + await this.crawlState.setStatus(status); + } + process.exit(exitCode); + } +} + +// ================================================================= +export function streamFinish(fh: Writable) { + const p = new Promise((resolve) => { + fh.once("finish", () => resolve()); + }); + fh.end(); + return p; } export const logger = new Logger(); diff --git a/src/util/state.ts b/src/util/state.ts index 8960d9b6..4ab884d0 100644 --- a/src/util/state.ts +++ b/src/util/state.ts @@ -1106,4 +1106,10 @@ return inx; result.modified = this._timestamp(); await this.redis.set(`${this.key}:profileUploaded`, JSON.stringify(result)); } + + async markFailedIfEmpty() { + if ((await this.numDone()) === 0) { + await this.setStatus("failed"); + } + } } diff --git a/src/util/storage.ts b/src/util/storage.ts index 7ff054be..370d40ac 100644 --- a/src/util/storage.ts +++ b/src/util/storage.ts @@ -176,10 +176,9 @@ export class S3StorageSync { await sleep(5); logger.warn("Retry downloading profile", {}, "storage"); } else { - logger.fatal( + await logger.interrupt( "Could not download profile, exiting", {}, - "storage", ExitCodes.Failed, ); } diff --git a/src/util/wacz.ts b/src/util/wacz.ts index fcf4eabc..362b257a 100644 --- a/src/util/wacz.ts +++ b/src/util/wacz.ts @@ -13,8 +13,7 @@ import { gzip } from "node:zlib"; import { ReadableStream } from "node:stream/web"; import { makeZip, InputWithoutMeta } from "client-zip"; -import { logger, formatErr } from "./logger.js"; -import { streamFinish } from "./warcwriter.js"; +import { logger, formatErr, streamFinish } from "./logger.js"; import { getDirSize } from "./storage.js"; const DATAPACKAGE_JSON = "datapackage.json"; diff --git a/src/util/warcwriter.ts b/src/util/warcwriter.ts index 7c26b60c..af714d6b 100644 --- a/src/util/warcwriter.ts +++ b/src/util/warcwriter.ts @@ -4,7 +4,13 @@ import path from "path"; import { CDXIndexer, WARCRecord, DEFAULT_CDX_FIELDS } from "warcio"; import { WARCSerializer } from "warcio/node"; -import { logger, formatErr, LogDetails, LogContext } from "./logger.js"; +import { + logger, + formatErr, + LogDetails, + LogContext, + streamFinish, +} from "./logger.js"; import type { IndexerOffsetLength } from "warcio"; import { timestampNow } from "./timing.js"; import PQueue from "p-queue"; @@ -373,12 +379,3 @@ export async function createWARCInfo(filename: string) { }); return buffer; } - -// ================================================================= -export function streamFinish(fh: Writable) { - const p = new Promise((resolve) => { - fh.once("finish", () => resolve()); - }); - fh.end(); - return p; -}