remove util

Ilya Kreymer 2023-10-31 23:14:56 -07:00
parent 84f210e0b4
commit 8cc1a8c015
20 changed files with 1 addition and 4289 deletions


@@ -8,7 +8,7 @@
"license": "AGPL-3.0-or-later",
"scripts": {
"tsc": "tsc",
"lint": "eslint *.js util/*.js tests/*.test.js",
"lint": "eslint *.js tests/*.test.js",
"test": "yarn node --experimental-vm-modules $(yarn bin jest --bail 1)",
"prepare": "husky install"
},


@@ -1,575 +0,0 @@
import path from "path";
import fs from "fs";
import os from "os";
import yaml from "js-yaml";
import { KnownDevices as devices } from "puppeteer-core";
import yargs from "yargs";
import { hideBin } from "yargs/helpers";
import { BEHAVIOR_LOG_FUNC, WAIT_UNTIL_OPTS } from "./constants.js";
import { ScopedSeed } from "./seeds.js";
import { interpolateFilename } from "./storage.js";
import { screenshotTypes } from "./screenshots.js";
import { logger } from "./logger.js";
// ============================================================================
class ArgParser {
get cliOpts() {
return {
"seeds": {
alias: "url",
describe: "The URL to start crawling from",
type: "array",
default: [],
},
"seedFile": {
alias: ["urlFile"],
describe: "If set, read a list of seed urls, one per line, from the specified",
type: "string",
},
"workers": {
alias: "w",
describe: "The number of workers to run in parallel",
default: 1,
type: "number",
},
"crawlId": {
alias: "id",
describe: "A user provided ID for this crawl or crawl configuration (can also be set via CRAWL_ID env var)",
type: "string",
default: process.env.CRAWL_ID || os.hostname(),
},
"newContext": {
describe: "Deprecated as of 0.8.0, any values passed will be ignored",
default: null,
type: "string"
},
"waitUntil": {
describe: "Puppeteer page.goto() condition to wait for before continuing, can be multiple separated by ','",
default: "load,networkidle2",
},
"depth": {
describe: "The depth of the crawl for all seeds",
default: -1,
type: "number",
},
"extraHops": {
describe: "Number of extra 'hops' to follow, beyond the current scope",
default: 0,
type: "number"
},
"pageLimit": {
alias: "limit",
describe: "Limit crawl to this number of pages",
default: 0,
type: "number",
},
"maxPageLimit": {
describe: "Maximum pages to crawl, overriding pageLimit if both are set",
default: 0,
type: "number",
},
"pageLoadTimeout": {
alias: "timeout",
describe: "Timeout for each page to load (in seconds)",
default: 90,
type: "number",
},
"scopeType": {
describe: "A predefined scope of the crawl. For more customization, use 'custom' and set scopeIncludeRx regexes",
type: "string",
choices: ["page", "page-spa", "prefix", "host", "domain", "any", "custom"]
},
"scopeIncludeRx": {
alias: "include",
describe: "Regex of page URLs that should be included in the crawl (defaults to the immediate directory of URL)",
},
"scopeExcludeRx": {
alias: "exclude",
describe: "Regex of page URLs that should be excluded from the crawl."
},
"allowHashUrls": {
describe: "Allow Hashtag URLs, useful for single-page-application crawling or when different hashtags load dynamic content",
},
"blockRules": {
describe: "Additional rules for blocking certain URLs from being loaded, by URL regex and optionally via text match in an iframe",
type: "array",
default: [],
},
"blockMessage": {
describe: "If specified, when a URL is blocked, a record with this error message is added instead",
type: "string",
},
"blockAds": {
alias: "blockads",
describe: "If set, block advertisements from being loaded (based on Stephen Black's blocklist)",
type: "boolean",
default: false,
},
"adBlockMessage": {
describe: "If specified, when an ad is blocked, a record with this error message is added instead",
type: "string",
},
"collection": {
alias: "c",
describe: "Collection name to crawl to (replay will be accessible under this name in pywb preview)",
type: "string",
default: "crawl-@ts"
},
"headless": {
describe: "Run in headless mode, otherwise start xvfb",
type: "boolean",
default: false,
},
"driver": {
describe: "JS driver for the crawler",
type: "string",
default: "./defaultDriver.js",
},
"generateCDX": {
alias: ["generatecdx", "generateCdx"],
describe: "If set, generate index (CDXJ) for use with pywb after crawl is done",
type: "boolean",
default: false,
},
"combineWARC": {
alias: ["combinewarc", "combineWarc"],
describe: "If set, combine the warcs",
type: "boolean",
default: false,
},
"rolloverSize": {
describe: "If set, declare the rollover size",
default: 1000000000,
type: "number",
},
"generateWACZ": {
alias: ["generatewacz", "generateWacz"],
describe: "If set, generate wacz",
type: "boolean",
default: false,
},
"logging": {
describe: "Logging options for crawler, can include: stats (enabled by default), jserrors, pywb, debug",
type: "string",
default: "stats",
},
"logLevel": {
describe: "Comma-separated list of log levels to include in logs",
type: "string",
default: "",
},
"context": {
describe: "Comma-separated list of contexts to include in logs",
type: "string",
default: "",
},
"text": {
describe: "If set, extract text to the pages.jsonl file",
type: "boolean",
default: false,
},
"cwd": {
describe: "Crawl working directory for captures (pywb root). If not set, defaults to process.cwd()",
type: "string",
default: process.cwd(),
},
"mobileDevice": {
describe: "Emulate mobile device by name from: https://github.com/puppeteer/puppeteer/blob/main/src/common/DeviceDescriptors.ts",
type: "string",
},
"userAgent": {
describe: "Override user-agent with specified string",
type: "string",
},
"userAgentSuffix": {
describe: "Append suffix to existing browser user-agent (ex: +MyCrawler, info@example.com)",
type: "string",
},
"useSitemap": {
alias: "sitemap",
describe: "If enabled, check for sitemaps at /sitemap.xml, or custom URL if URL is specified",
},
"sitemapFromDate": {
alias: "sitemapFrom",
describe: "If set, filter URLs from sitemaps to those greater than or equal to provided ISO Date string (YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS or partial date)",
},
"statsFilename": {
describe: "If set, output stats as JSON to this file. (Relative filename resolves to crawl working directory)"
},
"behaviors": {
describe: "Which background behaviors to enable on each page",
default: "autoplay,autofetch,autoscroll,siteSpecific",
type: "string",
},
"behaviorTimeout": {
describe: "If >0, timeout (in seconds) for in-page behavior will run on each page. If 0, a behavior can run until finish.",
default: 90,
type: "number",
},
"pageExtraDelay": {
alias: "delay",
describe: "If >0, amount of time to sleep (in seconds) after behaviors before moving on to next page",
default: 0,
type: "number",
},
"dedupPolicy": {
describe: "Deduplication policy",
default: "skip",
type: "string",
choices: ["skip", "revisit", "keep"],
},
"profile": {
describe: "Path to tar.gz file which will be extracted and used as the browser profile",
type: "string",
},
"screenshot": {
describe: "Screenshot options for crawler, can include: view, thumbnail, fullPage (comma-separated list)",
type: "string",
default: "",
},
"screencastPort": {
describe: "If set to a non-zero value, starts an HTTP server with screencast accessible on this port",
type: "number",
default: 0
},
"screencastRedis": {
describe: "If set, will use the state store redis pubsub for screencasting. Requires --redisStoreUrl to be set",
type: "boolean",
default: false
},
"warcInfo": {
alias: ["warcinfo"],
describe: "Optional fields added to the warcinfo record in combined WARCs",
type: "object"
},
"redisStoreUrl": {
describe: "If set, url for remote redis server to store state. Otherwise, using in-memory store",
type: "string",
default: "redis://localhost:6379/0"
},
"saveState": {
describe: "If the crawl state should be serialized to the crawls/ directory. Defaults to 'partial', only saved when crawl is interrupted",
type: "string",
default: "partial",
choices: ["never", "partial", "always"]
},
"saveStateInterval": {
describe: "If save state is set to 'always', also save state during the crawl at this interval (in seconds)",
type: "number",
default: 300,
},
"saveStateHistory": {
describe: "Number of save states to keep during the duration of a crawl",
type: "number",
default: 5,
},
"sizeLimit": {
describe: "If set, save state and exit if size limit exceeds this value",
type: "number",
default: 0,
},
"diskUtilization": {
describe: "If set, save state and exit if disk utilization exceeds this percentage value",
type: "number",
default: 90,
},
"timeLimit": {
describe: "If set, save state and exit after time limit, in seconds",
type: "number",
default: 0,
},
"healthCheckPort": {
describe: "port to run healthcheck on",
type: "number",
default: 0,
},
"overwrite": {
describe: "overwrite current crawl data: if set, existing collection directory will be deleted before crawl is started",
type: "boolean",
default: false
},
"waitOnDone": {
describe: "if set, wait for interrupt signal when finished instead of exiting",
type: "boolean",
default: false
},
"restartsOnError": {
describe: "if set, assume will be restarted if interrupted, don't run post-crawl processes on interrupt",
type: "boolean",
default: false
},
"netIdleWait": {
describe: "if set, wait for network idle after page load and after behaviors are done (in seconds). if -1 (default), determine based on scope",
type: "number",
default: -1
},
"lang": {
describe: "if set, sets the language used by the browser, should be ISO 639 language[-country] code",
type: "string"
},
"title": {
describe: "If set, write supplied title into WACZ datapackage.json metadata",
type: "string"
},
"description": {
alias: ["desc"],
describe: "If set, write supplied description into WACZ datapackage.json metadata",
type: "string"
},
"originOverride": {
describe: "if set, will redirect requests from each origin in key to origin in the value, eg. --originOverride https://host:port=http://alt-host:alt-port",
type: "array",
default: [],
},
"logErrorsToRedis": {
describe: "If set, write error messages to redis",
type: "boolean",
default: false,
},
"failOnFailedSeed": {
describe: "If set, crawler will fail with exit code 1 if any seed fails",
type: "boolean",
default: false
},
"failOnFailedLimit": {
describe: "If set, save state and exit if number of failed pages exceeds this value",
type: "number",
default: 0,
},
"customBehaviors": {
describe: "injects a custom behavior file or set of behavior files in a directory",
type: ["string"]
},
};
}
parseArgs(argv) {
argv = argv || process.argv;
if (process.env.CRAWL_ARGS) {
argv = argv.concat(this.splitCrawlArgsQuoteSafe(process.env.CRAWL_ARGS));
}
let origConfig = {};
const parsed = yargs(hideBin(argv))
.usage("crawler [options]")
.option(this.cliOpts)
.config("config", "Path to YAML config file", (configPath) => {
if (configPath === "/crawls/stdin") {
configPath = process.stdin.fd;
}
origConfig = yaml.load(fs.readFileSync(configPath, "utf8"));
return origConfig;
})
.check((argv) => this.validateArgs(argv))
.argv;
return {parsed, origConfig};
}
splitCrawlArgsQuoteSafe(crawlArgs) {
// Split process.env.CRAWL_ARGS on spaces but retaining spaces within double quotes
const regex = /"[^"]+"|[^\s]+/g;
return crawlArgs.match(regex).map(e => e.replace(/"(.+)"/, "$1"));
}
validateArgs(argv) {
argv.collection = interpolateFilename(argv.collection, argv.crawlId);
// Check that the collection name is valid.
if (argv.collection.search(/^[\w][\w-]*$/) === -1){
logger.fatal(`\n${argv.collection} is an invalid collection name. Please supply a collection name only using alphanumeric characters and the following characters [_ - ]\n`);
}
// waitUntil condition must be: load, domcontentloaded, networkidle0, networkidle2
// can be multiple separate by comma
// (see: https://github.com/puppeteer/puppeteer/blob/main/docs/api.md#pagegotourl-options)
if (typeof argv.waitUntil != "object"){
argv.waitUntil = argv.waitUntil.split(",");
}
for (const opt of argv.waitUntil) {
if (!WAIT_UNTIL_OPTS.includes(opt)) {
logger.fatal("Invalid waitUntil option, must be one of: " + WAIT_UNTIL_OPTS.join(","));
}
}
// validate screenshot options
if (argv.screenshot) {
const passedScreenshotTypes = argv.screenshot.split(",");
argv.screenshot = [];
passedScreenshotTypes.forEach((element) => {
if (element in screenshotTypes) {
argv.screenshot.push(element);
} else {
logger.warn(`${element} not found in screenshot types: ${Object.keys(screenshotTypes).join(", ")}`);
}
});
}
// log options
argv.logging = argv.logging.split(",");
argv.logLevel = argv.logLevel ? argv.logLevel.split(",") : [];
argv.context = argv.context ? argv.context.split(",") : [];
// background behaviors to apply
const behaviorOpts = {};
if (typeof argv.behaviors != "object"){
argv.behaviors = argv.behaviors.split(",");
}
argv.behaviors.forEach((x) => behaviorOpts[x] = true);
behaviorOpts.log = BEHAVIOR_LOG_FUNC;
argv.behaviorOpts = JSON.stringify(behaviorOpts);
if (argv.newContext) {
logger.info("Note: The newContext argument is deprecated in 0.8.0. Values passed to this option will be ignored");
}
if (argv.mobileDevice) {
argv.emulateDevice = devices[argv.mobileDevice.replace(/-/g, " ")];
if (!argv.emulateDevice) {
logger.fatal("Unknown device: " + argv.mobileDevice);
}
} else {
argv.emulateDevice = {viewport: null};
}
if (argv.seedFile) {
const urlSeedFile = fs.readFileSync(argv.seedFile, "utf8");
const urlSeedFileList = urlSeedFile.split("\n");
if (typeof(argv.seeds) === "string") {
argv.seeds = [argv.seeds];
}
for (const seed of urlSeedFileList) {
if (seed) {
argv.seeds.push(seed);
}
}
}
if (argv.netIdleWait === -1) {
if (argv.scopeType === "page" || argv.scopeType === "page-spa") {
argv.netIdleWait = 15;
} else {
argv.netIdleWait = 2;
}
//logger.debug(`Set netIdleWait to ${argv.netIdleWait} seconds`);
}
const scopeOpts = {
scopeType: argv.scopeType,
sitemap: argv.sitemap,
include: argv.include,
exclude: argv.exclude,
depth: argv.depth,
extraHops: argv.extraHops,
};
argv.scopedSeeds = [];
for (let seed of argv.seeds) {
if (typeof(seed) === "string") {
seed = {url: seed};
}
try {
argv.scopedSeeds.push(new ScopedSeed({...scopeOpts, ...seed}));
} catch (e) {
if (argv.failOnFailedSeed) {
logger.fatal(`Invalid Seed "${seed.url}" specified, aborting crawl.`);
}
}
}
if (!argv.scopedSeeds.length) {
logger.fatal("No valid seeds specified, aborting crawl.");
}
// Resolve statsFilename
if (argv.statsFilename) {
argv.statsFilename = path.resolve(argv.cwd, argv.statsFilename);
}
if ((argv.diskUtilization < 0 || argv.diskUtilization > 99)) {
argv.diskUtilization = 90;
}
return true;
}
}
export function parseArgs(argv) {
return new ArgParser().parseArgs(argv);
}
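For reference, a sketch of the quote-safe CRAWL_ARGS tokenization performed by splitCrawlArgsQuoteSafe above (the argument values are hypothetical):

// tokens keep spaces inside double quotes; the quotes themselves are stripped
const regex = /"[^"]+"|[^\s]+/g;
const args = '--url https://example.com/ --title "My Crawl"'
.match(regex)
.map((e) => e.replace(/"(.+)"/, "$1"));
console.log(args); // ["--url", "https://example.com/", "--title", "My Crawl"]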


@@ -1,243 +0,0 @@
import fs from "fs";
import { logger, errJSON } from "./logger.js";
const RULE_TYPES = ["block", "allowOnly"];
const ALWAYS_ALLOW = ["https://pywb.proxy/", "http://pywb.proxy/"];
const BlockState = {
ALLOW: null,
BLOCK_PAGE_NAV: "page",
BLOCK_IFRAME_NAV: "iframe",
BLOCK_OTHER: "resource",
BLOCK_AD: "advertisement"
};
// ===========================================================================
class BlockRule
{
constructor(data) {
if (typeof(data) === "string") {
this.url = new RegExp(data);
this.type = "block";
} else {
this.url = data.url ? new RegExp(data.url) : null;
this.frameTextMatch = data.frameTextMatch ? new RegExp(data.frameTextMatch) : null;
this.inFrameUrl = data.inFrameUrl ? new RegExp(data.inFrameUrl) : null;
this.type = data.type || "block";
}
if (!RULE_TYPES.includes(this.type)) {
logger.fatal("Rule \"type\" must be: " + RULE_TYPES.join(", "));
}
}
toString() {
return `\
* Rule for URL Regex: ${this.url}
Type: ${this.type}
In Frame Regex: ${this.inFrameUrl ? this.inFrameUrl : "any"}
Resource Type: ${this.frameTextMatch ? "frame" : "any"}
${this.frameTextMatch ? "Frame Text Regex: " + this.frameTextMatch : ""}
`;
}
}
// ===========================================================================
export class BlockRules
{
constructor(blockRules, blockPutUrl, blockErrMsg) {
this.rules = [];
this.blockPutUrl = blockPutUrl;
this.blockErrMsg = blockErrMsg;
this.blockedUrlSet = new Set();
for (const ruleData of blockRules) {
this.rules.push(new BlockRule(ruleData));
}
if (this.rules.length) {
logger.debug("URL Block Rules:\n", {}, "blocking");
for (const rule of this.rules) {
logger.debug(rule.toString(), {}, "blocking");
}
}
}
async initPage(browser, page) {
const onRequest = async (request) => {
const logDetails = {page: page.url()};
try {
await this.handleRequest(request, logDetails);
} catch (e) {
logger.warn("Error handling request", {...errJSON(e), ...logDetails}, "blocking");
}
};
await browser.interceptRequest(page, onRequest);
}
async handleRequest(request, logDetails) {
const url = request.url();
let blockState;
try {
blockState = await this.shouldBlock(request, url, logDetails);
if (blockState === BlockState.ALLOW) {
await request.continue({}, 1);
} else {
await request.abort("blockedbyclient", 1);
}
} catch (e) {
logger.debug(`Block: (${blockState}) Failed On: ${url}`, {...errJSON(e), ...logDetails}, "blocking");
}
}
async shouldBlock(request, url, logDetails) {
if (!url.startsWith("http:") && !url.startsWith("https:")) {
return BlockState.ALLOW;
}
const isNavReq = request.isNavigationRequest();
const frame = request.frame();
let frameUrl = "";
let blockState;
if (isNavReq) {
const parentFrame = frame.parentFrame();
if (parentFrame) {
frameUrl = parentFrame.url();
blockState = BlockState.BLOCK_IFRAME_NAV;
} else {
frameUrl = frame.url();
blockState = BlockState.BLOCK_PAGE_NAV;
}
} else {
frameUrl = frame ? frame.url() : "";
blockState = BlockState.BLOCK_OTHER;
}
// ignore initial page
if (frameUrl === "about:blank") {
return BlockState.ALLOW;
}
// always allow special pywb proxy script
for (const allowUrl of ALWAYS_ALLOW) {
if (url.startsWith(allowUrl)) {
return BlockState.ALLOW;
}
}
for (const rule of this.rules) {
const {done, block} = await this.ruleCheck(rule, request, url, frameUrl, isNavReq, logDetails);
if (block) {
if (blockState === BlockState.BLOCK_PAGE_NAV) {
logger.warn("Block rule match for page request ignored, set --exclude to block full pages", {url, ...logDetails}, "blocking");
return BlockState.ALLOW;
}
logger.debug("URL Blocked in iframe", {url, frameUrl, ...logDetails}, "blocking");
await this.recordBlockMsg(url);
return blockState;
}
if (done) {
break;
}
}
return BlockState.ALLOW;
}
async ruleCheck(rule, request, reqUrl, frameUrl, isNavReq, logDetails) {
const {url, inFrameUrl, frameTextMatch} = rule;
const type = rule.type || "block";
const allowOnly = (type === "allowOnly");
// not a frame match, skip rule
if (inFrameUrl && !frameUrl.match(inFrameUrl)) {
return {block: false, done: false};
}
const urlMatched = (url && reqUrl.match(url));
// if frame text-based rule: if url matched and a frame request
// frame text-based match: only applies to nav requests, never block otherwise
if (frameTextMatch) {
if (!urlMatched || !isNavReq) {
return {block: false, done: false};
}
const block = await this.isTextMatch(request, reqUrl, frameTextMatch, logDetails) ? !allowOnly : allowOnly;
logger.debug("URL Conditional rule in iframe", {...logDetails, url, rule: block ? "BLOCKED" : "ALLOWED", frameUrl}, "blocking");
return {block, done: true};
}
// for non frame text rule, simply match by URL
const block = urlMatched ? !allowOnly : allowOnly;
return {block, done: false};
}
async isTextMatch(request, reqUrl, frameTextMatch, logDetails) {
try {
const res = await fetch(reqUrl);
const text = await res.text();
return !!text.match(frameTextMatch);
} catch (e) {
logger.debug("Error determining text match", {...errJSON(e), ...logDetails}, "blocking");
}
}
async recordBlockMsg(url) {
if (this.blockedUrlSet.has(url)) {
return;
}
this.blockedUrlSet.add(url);
if (!this.blockErrMsg || !this.blockPutUrl) {
return;
}
const body = this.blockErrMsg;
const putUrl = new URL(this.blockPutUrl);
putUrl.searchParams.set("url", url);
await fetch(putUrl.href, {method: "PUT", headers: {"Content-Type": "text/html"}, body});
}
}
// ===========================================================================
export class AdBlockRules extends BlockRules
{
constructor(blockPutUrl, blockErrMsg, adhostsFilePath = "../ad-hosts.json") {
super([], blockPutUrl, blockErrMsg);
this.adhosts = JSON.parse(fs.readFileSync(new URL(adhostsFilePath, import.meta.url)));
}
isAdUrl(url) {
const fragments = url.split("/");
const domain = fragments.length > 2 ? fragments[2] : null;
return this.adhosts.includes(domain);
}
async shouldBlock(request, url, logDetails) {
if (this.isAdUrl(url)) {
logger.debug("URL blocked for being an ad", {url, ...logDetails}, "blocking");
await this.recordBlockMsg(url);
return BlockState.BLOCK_AD;
}
return BlockState.ALLOW;
}
}
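A sketch of the rule shapes BlockRule accepts, wired up via BlockRules (the URL patterns and error message are hypothetical; blockPutUrl, browser, and page are assumed to be provided by the crawler):

const blockRules = new BlockRules([
"https?://ads\\.example\\.com/", // plain string: block by URL regex
{url: "embed\\.example\\.com", inFrameUrl: "example\\.org"}, // only inside matching frames
{url: "player", frameTextMatch: "allowed-embed", type: "allowOnly"}, // conditional on frame text
], blockPutUrl, "Blocked by crawl config");
await blockRules.initPage(browser, page);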


@@ -1,416 +0,0 @@
import * as child_process from "child_process";
import fs from "fs";
import { pipeline } from "node:stream/promises";
import { Readable } from "node:stream";
import os from "os";
import path from "path";
import { logger } from "./logger.js";
import { initStorage } from "./storage.js";
import puppeteer from "puppeteer-core";
// ==================================================================
export class BaseBrowser
{
constructor() {
this.profileDir = fs.mkdtempSync(path.join(os.tmpdir(), "profile-"));
this.customProfile = false;
this.emulateDevice = null;
this.recorders = [];
}
async launch({profileUrl, chromeOptions, signals = false, headless = false, emulateDevice = {}, ondisconnect = null} = {}) {
if (this.isLaunched()) {
return;
}
if (profileUrl) {
this.customProfile = await this.loadProfile(profileUrl);
}
this.emulateDevice = emulateDevice;
const args = this.chromeArgs(chromeOptions);
let defaultViewport = null;
if (process.env.GEOMETRY) {
const geom = process.env.GEOMETRY.split("x");
defaultViewport = {width: Number(geom[0]), height: Number(geom[1])};
}
const launchOpts = {
args,
headless: headless ? "new" : false,
executablePath: this.getBrowserExe(),
ignoreDefaultArgs: ["--enable-automation", "--hide-scrollbars"],
ignoreHTTPSErrors: true,
handleSIGHUP: signals,
handleSIGINT: signals,
handleSIGTERM: signals,
protocolTimeout: 0,
defaultViewport,
waitForInitialPage: false,
userDataDir: this.profileDir
};
await this._init(launchOpts, ondisconnect);
}
async setupPage({page}) {
await this.addInitScript(page, "Object.defineProperty(navigator, \"webdriver\", {value: false});");
if (this.customProfile) {
logger.info("Disabling Service Workers for profile", {}, "browser");
await page.setBypassServiceWorker(true);
}
}
async loadProfile(profileFilename) {
const targetFilename = "/tmp/profile.tar.gz";
if (profileFilename &&
(profileFilename.startsWith("http:") || profileFilename.startsWith("https:"))) {
logger.info(`Downloading ${profileFilename} to ${targetFilename}`, {}, "browserProfile");
const resp = await fetch(profileFilename);
await pipeline(
Readable.fromWeb(resp.body),
fs.createWriteStream(targetFilename)
);
profileFilename = targetFilename;
} else if (profileFilename && profileFilename.startsWith("@")) {
const storage = initStorage();
if (!storage) {
logger.fatal("Profile specified relative to s3 storage, but no S3 storage defined");
}
await storage.downloadFile(profileFilename.slice(1), targetFilename);
profileFilename = targetFilename;
}
if (profileFilename) {
try {
child_process.execSync("tar xvfz " + profileFilename, {cwd: this.profileDir});
return true;
} catch (e) {
logger.error(`Profile filename ${profileFilename} not a valid tar.gz`);
}
}
return false;
}
saveProfile(profileFilename) {
child_process.execFileSync("tar", ["cvfz", profileFilename, "./"], {cwd: this.profileDir});
}
chromeArgs({proxy=true, userAgent=null, extraArgs=[]} = {}) {
// Chrome Flags, including proxy server
const args = [
// eslint-disable-next-line no-use-before-define
...defaultArgs,
...(process.env.CHROME_FLAGS ?? "").split(" ").filter(Boolean),
//"--no-xshm", // needed for Chrome >80 (check if puppeteer adds automatically)
"--no-sandbox",
"--disable-background-media-suspend",
"--remote-debugging-port=9221",
"--remote-allow-origins=*",
"--autoplay-policy=no-user-gesture-required",
"--disable-site-isolation-trials",
`--user-agent=${userAgent || this.getDefaultUA()}`,
...extraArgs,
];
if (proxy) {
args.push("--ignore-certificate-errors");
args.push(`--proxy-server=http://${process.env.PROXY_HOST}:${process.env.PROXY_PORT}`);
}
return args;
}
getDefaultUA() {
let version = process.env.BROWSER_VERSION;
try {
version = child_process.execFileSync(this.getBrowserExe(), ["--version"], {encoding: "utf8"});
version = version.match(/[\d.]+/)[0];
} catch(e) {
console.error(e);
}
return `Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/${version} Safari/537.36`;
}
getBrowserExe() {
const files = [process.env.BROWSER_BIN, "/usr/bin/google-chrome", "/usr/bin/chromium-browser"];
for (const file of files) {
if (file && fs.existsSync(file)) {
return file;
}
}
return null;
}
async evaluateWithCLI_(cdp, frame, cdpContextId, funcString, logData, contextName) {
const frameUrl = frame.url();
let details = {frameUrl, ...logData};
if (!frameUrl || frame.isDetached()) {
logger.info("Run Script Skipped, frame no longer attached or has no URL", details, contextName);
return false;
}
logger.info("Run Script Started", details, contextName);
// from puppeteer _evaluateInternal() but with includeCommandLineAPI: true
//const contextId = context._contextId;
const expression = funcString + "\n//# sourceURL=__evaluation_script__";
const { exceptionDetails, result } = await cdp
.send("Runtime.evaluate", {
expression,
contextId: cdpContextId,
returnByValue: true,
awaitPromise: true,
userGesture: true,
includeCommandLineAPI: true,
});
if (exceptionDetails) {
if (exceptionDetails.stackTrace) {
details = {...exceptionDetails.stackTrace, text: exceptionDetails.text, ...details};
}
logger.error("Run Script Failed", details, contextName);
} else {
logger.info("Run Script Finished", details, contextName);
}
return result.value;
}
}
// ==================================================================
export class Browser extends BaseBrowser
{
constructor() {
super();
this.browser = null;
this.firstCDP = null;
}
isLaunched() {
if (this.browser) {
logger.warn("Context already inited", {}, "browser");
return true;
}
return false;
}
async close() {
if (this.browser) {
this.browser.removeAllListeners("disconnected");
await this.browser.close();
this.browser = null;
}
}
addInitScript(page, script) {
return page.evaluateOnNewDocument(script);
}
async _init(launchOpts, ondisconnect = null) {
this.browser = await puppeteer.launch(launchOpts);
const target = this.browser.target();
this.firstCDP = await target.createCDPSession();
await this.serviceWorkerFetch();
if (ondisconnect) {
this.browser.on("disconnected", (err) => ondisconnect(err));
}
this.browser.on("disconnected", () => {
this.browser = null;
});
}
async newWindowPageWithCDP() {
// unique url to detect new pages
const startPage = "about:blank?_browsertrix" + Math.random().toString(36).slice(2);
const p = new Promise((resolve) => {
const listener = (target) => {
if (target.url() === startPage) {
resolve(target);
this.browser.removeListener("targetcreated", listener);
}
};
this.browser.on("targetcreated", listener);
});
try {
await this.firstCDP.send("Target.createTarget", {url: startPage, newWindow: true});
} catch (e) {
if (!this.browser) {
throw e;
}
const target = this.browser.target();
this.firstCDP = await target.createCDPSession();
await this.firstCDP.send("Target.createTarget", {url: startPage, newWindow: true});
}
const target = await p;
const page = await target.page();
const device = this.emulateDevice;
if (device) {
if (device.viewport && device.userAgent) {
await page.emulate(device);
} else if (device.userAgent) {
await page.setUserAgent(device.userAgent);
}
}
const cdp = await target.createCDPSession();
return {page, cdp};
}
async serviceWorkerFetch() {
this.firstCDP.on("Fetch.requestPaused", async (params) => {
const { frameId, requestId, networkId, request } = params;
if (networkId) {
try {
await this.firstCDP.send("Fetch.continueResponse", {requestId});
} catch (e) {
logger.warn("continueResponse failed", {url: request.url}, "recorder");
}
return;
}
let foundRecorder = null;
for (const recorder of this.recorders) {
if (recorder.swUrls.has(request.url)) {
//console.log(`*** found sw ${request.url} in recorder for worker ${recorder.workerid}`);
recorder.swFrameIds.add(frameId);
}
if (recorder.swFrameIds && recorder.swFrameIds.has(frameId)) {
foundRecorder = recorder;
break;
}
}
if (!foundRecorder) {
logger.warn("Skipping URL from unknown frame", {url: request.url, frameId}, "recorder");
try {
await this.firstCDP.send("Fetch.continueResponse", {requestId});
} catch (e) {
logger.warn("continueResponse failed", {url: request.url}, "recorder");
}
return;
}
await foundRecorder.handleRequestPaused(params, this.firstCDP, true);
});
await this.firstCDP.send("Fetch.enable", {patterns: [{urlPattern: "*", requestStage: "Response"}]});
}
async evaluateWithCLI(_, frame, cdp, funcString, logData, contextName) {
const context = await frame.executionContext();
cdp = context._client;
const cdpContextId = context._contextId;
return await this.evaluateWithCLI_(cdp, frame, cdpContextId, funcString, logData, contextName);
}
interceptRequest(page, callback) {
page.on("request", callback);
}
async waitForNetworkIdle(page, params) {
return await page.waitForNetworkIdle(params);
}
async setViewport(page, params) {
await page.setViewport(params);
}
async getCookies(page) {
return await page.cookies();
}
async setCookies(page, cookies) {
return await page.setCookie(...cookies);
}
}
// ==================================================================
// Default Chromium args from playwright
export const defaultArgs = [
"--disable-field-trial-config", // https://source.chromium.org/chromium/chromium/src/+/main:testing/variations/README.md
"--disable-background-networking",
"--enable-features=NetworkService,NetworkServiceInProcess",
"--disable-background-timer-throttling",
"--disable-backgrounding-occluded-windows",
"--disable-back-forward-cache", // Avoids surprises like main request not being intercepted during page.goBack().
"--disable-breakpad",
"--disable-client-side-phishing-detection",
"--disable-component-extensions-with-background-pages",
"--disable-component-update", // Avoids unneeded network activity after startup.
"--no-default-browser-check",
"--disable-default-apps",
"--disable-dev-shm-usage",
"--disable-extensions",
// AvoidUnnecessaryBeforeUnloadCheckSync - https://github.com/microsoft/playwright/issues/14047
// Translate - https://github.com/microsoft/playwright/issues/16126
// Optimization* - https://bugs.chromium.org/p/chromium/issues/detail?id=1311753
"--disable-features=ImprovedCookieControls,LazyFrameLoading,GlobalMediaControls,DestroyProfileOnBrowserClose,MediaRouter,DialMediaRouteProvider,AcceptCHFrame,AutoExpandDetailsElement,CertificateTransparencyComponentUpdater,AvoidUnnecessaryBeforeUnloadCheckSync,Translate,OptimizationGuideModelDownloading,OptimizationHintsFetching,OptimizationTargetPrediction,OptimizationHints",
"--allow-pre-commit-input",
"--disable-hang-monitor",
"--disable-ipc-flooding-protection",
"--disable-popup-blocking",
"--disable-prompt-on-repost",
"--disable-renderer-backgrounding",
"--disable-sync",
"--force-color-profile=srgb",
"--metrics-recording-only",
"--no-first-run",
"--enable-automation",
"--password-store=basic",
"--use-mock-keychain",
// See https://chromium-review.googlesource.com/c/chromium/src/+/2436773
"--no-service-autorun",
"--export-tagged-pdf",
"--apps-keep-chrome-alive-in-tests",
"--apps-gallery-url=https://invalid.webstore.example.com/",
"--apps-gallery-update-url=https://invalid.webstore.example.com/"
];
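A minimal launch sketch using a subset of the options handled above (assumes BROWSER_BIN or a system Chrome is available; proxy is disabled here so PROXY_HOST/PROXY_PORT are not required):

const browser = new Browser();
await browser.launch({
headless: true,
chromeOptions: {proxy: false, extraArgs: ["--window-size=1280,800"]},
ondisconnect: (err) => console.error("browser disconnected", err),
});
const {page, cdp} = await browser.newWindowPageWithCDP();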


@@ -1,13 +0,0 @@
export const HTML_TYPES = ["text/html", "application/xhtml", "application/xhtml+xml"];
export const WAIT_UNTIL_OPTS = ["load", "domcontentloaded", "networkidle0", "networkidle2"];
export const BEHAVIOR_LOG_FUNC = "__bx_log";
export const ADD_LINK_FUNC = "__bx_addLink";
export const MAX_DEPTH = 1000000;
export const DEFAULT_SELECTORS = [{
selector: "a[href]",
extract: "href",
isAttribute: false
}];


@@ -1,33 +0,0 @@
import fs from "fs";
import path from "path";
const MAX_DEPTH = 2;
export function collectAllFileSources(fileOrDir, ext = null, depth = 0) {
const resolvedPath = path.resolve(fileOrDir);
if (depth >= MAX_DEPTH) {
console.warn(`WARN: MAX_DEPTH of ${MAX_DEPTH} reached traversing "${resolvedPath}"`);
return [];
}
const stat = fs.statSync(resolvedPath);
if (stat.isFile() && (ext === null || path.extname(resolvedPath) === ext)) {
const contents = fs.readFileSync(resolvedPath);
return [`/* src: ${resolvedPath} */\n\n${contents}`];
}
if (stat.isDirectory()) {
const files = fs.readdirSync(resolvedPath);
return files.reduce((acc, next) => {
const nextPath = path.join(fileOrDir, next);
return [...acc, ...collectAllFileSources(nextPath, ext, depth + 1)];
}, []);
}
if (depth === 0) {
console.warn(`WARN: The provided path "${resolvedPath}" is not a .js file or directory.`);
}
return [];
}
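Usage sketch (the directory path is hypothetical): collect every .js source under a directory, one string per file, each prefixed with a /* src: ... */ banner; traversal stops at MAX_DEPTH:

const sources = collectAllFileSources("/custom-behaviors", ".js");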


@@ -1,47 +0,0 @@
import http from "http";
import url from "url";
import { logger } from "./logger.js";
// ===========================================================================
export class HealthChecker
{
constructor(port, errorThreshold) {
this.port = port;
this.errorCount = 0;
this.errorThreshold = errorThreshold;
this.healthServer = http.createServer((...args) => this.healthCheck(...args));
logger.info(`Healthcheck server started on ${port}`, {}, "healthcheck");
this.healthServer.listen(port);
}
async healthCheck(req, res) {
const pathname = url.parse(req.url).pathname;
switch (pathname) {
case "/healthz":
if (this.errorCount < this.errorThreshold) {
logger.debug(`health check ok, num errors ${this.errorCount} < ${this.errorThreshold}`, {}, "healthcheck");
res.writeHead(200);
res.end();
return;
}
}
logger.error(`health check failed: ${this.errorCount} >= ${this.errorThreshold}`, {}, "healthcheck");
res.writeHead(503);
res.end();
}
resetErrors() {
if (this.errorCount > 0) {
logger.info(`Page loaded, resetting error count ${this.errorCount} to 0`, {}, "healthcheck");
this.errorCount = 0;
}
}
incError() {
this.errorCount++;
}
}
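Usage sketch (port and threshold hypothetical): /healthz answers 200 while the error count stays below the threshold, and 503 once it is reached:

const healthchecker = new HealthChecker(6065, 3);
// GET http://localhost:6065/healthz → 200 OK
healthchecker.incError(); healthchecker.incError(); healthchecker.incError();
// GET http://localhost:6065/healthz → 503 until resetErrors() runs on a page load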


@@ -1,120 +0,0 @@
// ===========================================================================
// to fix serialization of regexes for logging purposes
RegExp.prototype.toJSON = RegExp.prototype.toString;
// ===========================================================================
export function errJSON(e) {
return {"type": "exception", "message": e.message, "stack": e.stack};
}
// ===========================================================================
class Logger
{
constructor() {
this.logStream = null;
this.debugLogging = null;
this.logErrorsToRedis = false;
this.logLevels = [];
this.contexts = [];
this.crawlState = null;
}
setExternalLogStream(logFH) {
this.logStream = logFH;
}
setDebugLogging(debugLog) {
this.debugLogging = debugLog;
}
setLogErrorsToRedis(logErrorsToRedis) {
this.logErrorsToRedis = logErrorsToRedis;
}
setLogLevel(logLevels) {
this.logLevels = logLevels;
}
setContext(contexts) {
this.contexts = contexts;
}
setCrawlState(crawlState) {
this.crawlState = crawlState;
}
logAsJSON(message, data, context, logLevel="info") {
if (data instanceof Error) {
data = errJSON(data);
} else if (typeof data !== "object") {
data = {"message": data.toString()};
}
if (this.logLevels.length) {
if (this.logLevels.indexOf(logLevel) < 0) {
return;
}
}
if (this.contexts.length) {
if (this.contexts.indexOf(context) < 0) {
return;
}
}
let dataToLog = {
"timestamp": new Date().toISOString(),
"logLevel": logLevel,
"context": context,
"message": message,
"details": data ? data : {}
};
const string = JSON.stringify(dataToLog);
console.log(string);
if (this.logStream) {
this.logStream.write(string + "\n");
}
const toLogToRedis = ["error", "fatal"];
if (this.logErrorsToRedis && toLogToRedis.includes(logLevel)) {
this.crawlState.logError(string);
}
}
info(message, data={}, context="general") {
this.logAsJSON(message, data, context);
}
error(message, data={}, context="general") {
this.logAsJSON(message, data, context, "error");
}
warn(message, data={}, context="general") {
this.logAsJSON(message, data, context, "warn");
}
debug(message, data={}, context="general") {
if (this.debugLogging) {
this.logAsJSON(message, data, context, "debug");
}
}
fatal(message, data={}, context="general", exitCode=17) {
this.logAsJSON(`${message}. Quitting`, data, context, "fatal");
async function markFailedAndEnd(crawlState) {
await crawlState.setStatus("failed");
await crawlState.setEndTime();
}
if (this.crawlState) {
markFailedAndEnd(this.crawlState).finally(() => process.exit(exitCode));
} else {
process.exit(exitCode);
}
}
}
export const logger = new Logger();
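Usage sketch: each entry becomes one JSON line on stdout (the context name and details here are hypothetical):

logger.setDebugLogging(true);
logger.debug("queue drained", {pending: 0}, "worker");
// → {"timestamp":"...","logLevel":"debug","context":"worker","message":"queue drained","details":{"pending":0}}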


@@ -1,59 +0,0 @@
import { errJSON, logger } from "./logger.js";
export class OriginOverride
{
constructor(originOverride) {
this.originOverride = originOverride.map((override) => {
let [orig, dest] = override.split("=");
const origUrl = new URL(orig);
const destUrl = new URL(dest);
return {origUrl, destUrl};
});
}
async initPage(browser, page) {
const onRequest = async (request) => {
try {
const url = request.url();
let newUrl = null;
let orig = null;
for (const {origUrl, destUrl} of this.originOverride) {
if (url.startsWith(origUrl.origin)) {
newUrl = destUrl.origin + url.slice(origUrl.origin.length);
orig = origUrl;
break;
}
}
if (!newUrl) {
request.continue({}, -1);
return;
}
const headers = new Headers(request.headers());
headers.set("host", orig.host);
if (headers.get("origin")) {
headers.set("origin", orig.origin);
}
const resp = await fetch(newUrl, {headers});
const body = Buffer.from(await resp.arrayBuffer());
const respHeaders = Object.fromEntries(resp.headers);
const status = resp.status;
logger.debug("Origin overridden", {orig: url, dest: newUrl, status, body: body.length}, "originoverride");
request.respond({body, headers: respHeaders, status}, -1);
} catch (e) {
logger.warn("Error overriding origin", {...errJSON(e), url: page.url()}, "originoverride");
request.continue({}, -1);
}
};
await browser.interceptRequest(page, onRequest);
}
}
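Usage sketch, mirroring the --originOverride format documented in the CLI options above (hosts hypothetical; browser and page assumed from the crawler):

const originOverride = new OriginOverride(["https://example.com=http://localhost:8080"]);
await originOverride.initPage(browser, page);
// a request for https://example.com/app.js is now fetched from http://localhost:8080/app.js,
// with the Host and Origin request headers kept as the original origin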


@@ -1,952 +0,0 @@
import fs from "fs";
import path from "path";
import os from "os";
import { v4 as uuidv4 } from "uuid";
import PQueue from "p-queue";
import { logger, errJSON } from "./logger.js";
import { sleep, timestampNow } from "./timing.js";
import { RequestResponseInfo } from "./reqresp.js";
import { baseRules as baseDSRules } from "@webrecorder/wabac/src/rewrite/index.js";
import { rewriteDASH, rewriteHLS } from "@webrecorder/wabac/src/rewrite/rewriteVideo.js";
import { WARCRecord } from "warcio";
import { WARCSerializer } from "warcio/node";
import { WARCWriter } from "./warcwriter.js";
const MAX_BROWSER_FETCH_SIZE = 2_000_000;
const MAX_NETWORK_LOAD_SIZE = 200_000_000;
const ASYNC_FETCH_DUPE_KEY = "s:fetchdupe";
const WRITE_DUPE_KEY = "s:writedupe";
const encoder = new TextEncoder();
// =================================================================
function logNetwork(/*msg, data*/) {
// logger.debug(msg, data, "recorderNetwork");
}
// =================================================================
export class Recorder
{
constructor({workerid, collDir, crawler}) {
this.workerid = workerid;
this.crawler = crawler;
this.crawlState = crawler.crawlState;
this.warcQ = new PQueue({concurrency: 1});
this.fetcherQ = new PQueue({concurrency: 1});
this.pendingRequests = null;
this.skipIds = null;
this.swSessionId = null;
this.swFrameIds = new Set();
this.swUrls = new Set();
this.logDetails = {};
this.skipping = false;
this.allowFull206 = true;
this.collDir = collDir;
this.archivesDir = path.join(this.collDir, "archive");
this.tempdir = path.join(this.collDir, "tmp-dl");
this.tempCdxDir = path.join(this.collDir, "tmp-cdx");
fs.mkdirSync(this.tempdir, {recursive: true});
fs.mkdirSync(this.archivesDir, {recursive: true});
fs.mkdirSync(this.tempCdxDir, {recursive: true});
const crawlId = process.env.CRAWL_ID || os.hostname();
const filename = `rec-${crawlId}-${timestampNow()}-${this.workerid}.warc`;
this.gzip = true;
this.writer = new WARCWriter({
archivesDir: this.archivesDir,
tempCdxDir: this.tempCdxDir,
filename,
gzip: this.gzip,
logDetails: this.logDetails
});
}
async onCreatePage({cdp}) {
// Fetch
cdp.on("Fetch.requestPaused", async (params) => {
this.handleRequestPaused(params, cdp);
});
await cdp.send("Fetch.enable", {patterns: [{urlPattern: "*", requestStage: "Response"}]});
// Response
cdp.on("Network.responseReceived", (params) => {
// handling to fill in security details
logNetwork("Network.responseReceived", {requestId: params.requestId, ...this.logDetails});
this.handleResponseReceived(params);
});
cdp.on("Network.responseReceivedExtraInfo", (params) => {
logNetwork("Network.responseReceivedExtraInfo", {requestId: params.requestId, ...this.logDetails});
const reqresp = this.pendingReqResp(params.requestId, true);
if (reqresp) {
reqresp.fillResponseReceivedExtraInfo(params);
}
});
// Request
cdp.on("Network.requestWillBeSent", (params) => {
// only handling redirect here, committing last response in redirect chain
// request data stored from requestPaused
if (params.redirectResponse) {
logNetwork("Network.requestWillBeSent after redirect", {requestId: params.requestId, ...this.logDetails});
this.handleRedirectResponse(params);
}
});
cdp.on("Network.requestServedFromCache", (params) => {
logNetwork("Network.requestServedFromCache", {requestId: params.requestId, ...this.logDetails});
this.removeReqResp(params.requestId);
});
cdp.on("Network.requestWillBeSentExtraInfo", (params) => {
logNetwork("Network.requestWillBeSentExtraInfo", {requestId: params.requestId, ...this.logDetails});
this.handleRequestExtraInfo(params);
});
// Loading
cdp.on("Network.loadingFinished", (params) => {
logNetwork("Network.loadingFinished", {requestId: params.requestId, ...this.logDetails});
this.handleLoadingFinished(params);
});
cdp.on("Network.loadingFailed", (params) => {
logNetwork("Network.loadingFailed", {requestId: params.requestId, ...this.logDetails});
this.handleLoadingFailed(params);
});
await cdp.send("Network.enable");
// Target
cdp.on("Target.attachedToTarget", async (params) => {
const { url, type, sessionId } = params.targetInfo;
if (type === "service_worker") {
this.swSessionId = sessionId;
this.swUrls.add(url);
}
});
cdp.on("Target.detachedFromTarget", async (params) => {
const { sessionId } = params;
if (this.swSessionId && sessionId === this.swSessionId) {
this.swUrls.clear();
this.swFrameIds.clear();
this.swSessionId = null;
}
});
await cdp.send("Target.setAutoAttach", {autoAttach: true, waitForDebuggerOnStart: false, flatten: true});
}
handleResponseReceived(params) {
const { requestId, response } = params;
const reqresp = this.pendingReqResp(requestId);
if (!reqresp) {
return;
}
reqresp.fillResponse(response);
}
handleRequestExtraInfo(params) {
if (!this.shouldSkip(params.headers)) {
const reqresp = this.pendingReqResp(params.requestId, true);
if (reqresp) {
reqresp.fillRequestExtraInfo(params);
}
}
}
handleRedirectResponse(params) {
const { requestId, redirectResponse } = params;
// remove and serialize, but allow reusing requestId
// as redirect chain may reuse same requestId for subsequent request
const reqresp = this.removeReqResp(requestId, true);
if (!reqresp) {
return;
}
reqresp.fillResponse(redirectResponse);
if (reqresp.isSelfRedirect()) {
logger.warn("Skipping self redirect", {url: reqresp. url, status: reqresp.status, ...this.logDetails}, "recorder");
return;
}
this.serializeToWARC(reqresp);
}
handleLoadingFailed(params) {
const { errorText, type, requestId } = params;
const reqresp = this.pendingReqResp(requestId, true);
if (!reqresp) {
return;
}
const { url } = reqresp;
switch (errorText) {
case "net::ERR_BLOCKED_BY_CLIENT":
logNetwork("Request blocked", {url, errorText, ...this.logDetails}, "recorder");
break;
case "net::ERR_ABORTED":
// check if this is a false positive -- a valid download that's already been fetched
// the abort is just for page, but download will succeed
if (url && type === "Document" && reqresp.isValidBinary()) {
this.serializeToWARC(reqresp);
//} else if (url) {
} else if (url && reqresp.requestHeaders && reqresp.requestHeaders["x-browsertrix-fetch"]) {
delete reqresp.requestHeaders["x-browsertrix-fetch"];
logger.warn("Attempt direct fetch of failed request", {url, ...this.logDetails}, "recorder");
const fetcher = new AsyncFetcher({tempdir: this.tempdir, reqresp, recorder: this, networkId: requestId});
this.fetcherQ.add(() => fetcher.load());
return;
}
break;
default:
logger.warn("Request failed", {url, errorText, ...this.logDetails}, "recorder");
}
this.removeReqResp(requestId);
}
handleLoadingFinished(params) {
const reqresp = this.pendingReqResp(params.requestId, true);
if (!reqresp || reqresp.asyncLoading) {
return;
}
this.removeReqResp(params.requestId);
if (!this.isValidUrl(reqresp.url)) {
return;
}
this.serializeToWARC(reqresp);
}
async handleRequestPaused(params, cdp, isSWorker = false) {
const { requestId, request, responseStatusCode, responseErrorReason, resourceType, networkId } = params;
const { method, headers, url } = request;
logNetwork("Fetch.requestPaused", {requestId, networkId, url, ...this.logDetails});
let continued = false;
try {
if (responseStatusCode && !responseErrorReason && !this.shouldSkip(headers, url, method, resourceType) && !(isSWorker && networkId)) {
continued = await this.handleFetchResponse(params, cdp, isSWorker);
}
} catch (e) {
logger.error("Error handling response, probably skipping URL", {url, ...errJSON(e), ...this.logDetails}, "recorder");
}
if (!continued) {
try {
await cdp.send("Fetch.continueResponse", {requestId});
} catch (e) {
logger.debug("continueResponse failed", {requestId, networkId, url, ...errJSON(e), ...this.logDetails}, "recorder");
}
}
}
async handleFetchResponse(params, cdp, isSWorker) {
const { request } = params;
const { url } = request;
const {requestId, responseErrorReason, responseStatusCode, responseHeaders} = params;
const networkId = params.networkId || requestId;
if (responseErrorReason) {
logger.warn("Skipping failed response", {url, reason: responseErrorReason, ...this.logDetails}, "recorder");
return false;
}
const contentLen = this._getContentLen(responseHeaders);
if (responseStatusCode === 206) {
const range = this._getContentRange(responseHeaders);
if (this.allowFull206 && range === `bytes 0-${contentLen - 1}/${contentLen}`) {
logger.debug("Keep 206 Response, Full Range", {range, contentLen, url, networkId, ...this.logDetails}, "recorder");
} else {
logger.debug("Skip 206 Response", {range, contentLen, url, ...this.logDetails}, "recorder");
this.removeReqResp(networkId);
return false;
}
}
const reqresp = this.pendingReqResp(networkId);
if (!reqresp) {
return false;
}
reqresp.fillFetchRequestPaused(params);
if (this.noResponseForStatus(responseStatusCode)) {
reqresp.payload = new Uint8Array();
if (isSWorker) {
this.removeReqResp(networkId);
await this.serializeToWARC(reqresp);
}
return false;
}
let streamingConsume = false;
if (contentLen < 0 || contentLen > MAX_BROWSER_FETCH_SIZE) {
const opts = {tempdir: this.tempdir, reqresp, expectedSize: contentLen, recorder: this, networkId, cdp};
// fetching using response stream, await here and then either call Fetch.fulfillRequest, or if not started, return false
if (contentLen < 0) {
const fetcher = new ResponseStreamAsyncFetcher({...opts, requestId, cdp });
const res = await fetcher.load();
switch (res) {
case "dupe":
this.removeReqResp(networkId);
return false;
case "fetched":
streamingConsume = true;
break;
}
}
// if not consumed via takeStream, attempt async loading
if (!streamingConsume) {
let fetcher = null;
if (reqresp.method !== "GET" || contentLen > MAX_NETWORK_LOAD_SIZE) {
fetcher = new AsyncFetcher(opts);
} else {
fetcher = new NetworkLoadStreamAsyncFetcher(opts);
}
this.fetcherQ.add(() => fetcher.load());
return false;
}
} else {
try {
logNetwork("Fetching response", {sizeExpected: this._getContentLen(responseHeaders), url, networkId, ...this.logDetails});
const { body, base64Encoded } = await cdp.send("Fetch.getResponseBody", {requestId});
reqresp.payload = Buffer.from(body, base64Encoded ? "base64" : "utf-8");
logNetwork("Fetch done", {size: reqresp.payload.length, url, networkId, ...this.logDetails});
} catch (e) {
logger.warn("Failed to load response body", {url, networkId, ...errJSON(e), ...this.logDetails}, "recorder");
return false;
}
}
const rewritten = await this.rewriteResponse(reqresp);
// if in service worker, serialize here
// as won't be getting a loadingFinished message
if (isSWorker && reqresp.payload) {
this.removeReqResp(networkId);
await this.serializeToWARC(reqresp);
}
// not rewritten, and not streaming, return false to continue
if (!rewritten && !streamingConsume) {
if (!reqresp.payload) {
logger.error("Unable to get payload skipping recording", {url, ...this.logDetails}, "recorder");
this.removeReqResp(networkId);
}
return false;
}
// if has payload, encode it, otherwise return empty string
const body = reqresp.payload && reqresp.payload.length ? Buffer.from(reqresp.payload).toString("base64") : "";
try {
await cdp.send("Fetch.fulfillRequest", {
requestId,
responseCode: responseStatusCode,
responseHeaders,
body
});
} catch (e) {
const type = reqresp.type;
if (type === "Document") {
logger.debug("document not loaded in browser, possibly other URLs missing", {url, type: reqresp.resourceType}, "recorder");
} else {
logger.debug("URL not loaded in browser", {url, type: reqresp.resourceType}, "recorder");
}
}
return true;
}
startPage({pageid, url}) {
this.pageid = pageid;
this.logDetails = {page: url, workerid: this.workerid};
// if (this.pendingRequests && this.pendingRequests.size) {
// logger.warn("Interrupting timed out requests, moving to next page", this.logDetails, "recorder");
// }
this.pendingRequests = new Map();
this.skipIds = new Set();
this.skipping = false;
}
async finishPage() {
//this.skipping = true;
for (const [requestId, reqresp] of this.pendingRequests.entries()) {
if (reqresp.payload) {
this.removeReqResp(requestId);
await this.serializeToWARC(reqresp);
// no url, likely invalid
} else if (!reqresp.url) {
this.removeReqResp(requestId);
}
}
let numPending = this.pendingRequests.size;
while (numPending && !this.crawler.interrupted) {
const pending = [];
for (const [requestId, reqresp] of this.pendingRequests.entries()) {
const url = reqresp.url;
const entry = {requestId, url};
if (reqresp.expectedSize) {
entry.expectedSize = reqresp.expectedSize;
}
if (reqresp.readSize) {
entry.readSize = reqresp.readSize;
}
pending.push(entry);
}
logger.debug("Finishing pending requests for page", {numPending, pending, ...this.logDetails}, "recorder");
await sleep(5.0);
numPending = this.pendingRequests.size;
}
}
async onClosePage() {
}
async onDone() {
await this.crawlState.setStatus("pending-wait");
logger.debug("Finishing Fetcher Queue", this.logDetails, "recorder");
await this.fetcherQ.onIdle();
logger.debug("Finishing WARC writing", this.logDetails, "recorder");
await this.warcQ.onIdle();
await this.writer.flush();
}
shouldSkip(headers, url, method, resourceType) {
if (headers && !method) {
method = headers[":method"];
}
if (!this.isValidUrl(url)) {
return true;
}
if (method === "OPTIONS" || method === "HEAD") {
return true;
}
if (["EventSource", "WebSocket", "Ping"].includes(resourceType)) {
return true;
}
// beacon
if (resourceType === "Other" && method === "POST") {
return true;
}
// skip eventsource, resourceType may not be set correctly
if (headers && (headers["accept"] === "text/event-stream" || headers["Accept"] === "text/event-stream")) {
return true;
}
return false;
}
async rewriteResponse(reqresp) {
const { url, responseHeadersList, extraOpts, payload } = reqresp;
if (!payload || !payload.length) {
return false;
}
let newString = null;
let string = null;
const ct = this._getContentType(responseHeadersList);
switch (ct) {
case "application/x-mpegURL":
case "application/vnd.apple.mpegurl":
string = payload.toString("utf-8");
newString = rewriteHLS(string, {save: extraOpts});
break;
case "application/dash+xml":
string = payload.toString("utf-8");
newString = rewriteDASH(string, {save: extraOpts});
break;
case "text/html":
case "application/json":
case "text/javascript":
case "application/javascript":
case "application/x-javascript": {
const rw = baseDSRules.getRewriter(url);
if (rw !== baseDSRules.defaultRewriter) {
string = payload.toString("utf-8");
newString = rw.rewrite(string, {live: true, save: extraOpts});
}
break;
}
}
if (!newString) {
return false;
}
if (newString !== string) {
extraOpts.rewritten = 1;
logger.debug("Content Rewritten", {url, ...this.logDetails}, "recorder");
reqresp.payload = encoder.encode(newString);
return true;
} else {
return false;
}
//return Buffer.from(newString).toString("base64");
}
_getContentType(headers) {
for (let header of headers) {
if (header.name.toLowerCase() === "content-type") {
return header.value.split(";")[0];
}
}
return null;
}
_getContentLen(headers) {
for (let header of headers) {
if (header.name.toLowerCase() === "content-length") {
return Number(header.value);
}
}
return -1;
}
_getContentRange(headers) {
for (let header of headers) {
if (header.name.toLowerCase() === "content-range") {
return header.value;
}
}
return null;
}
noResponseForStatus(status) {
return (!status || status === 204 || (status >= 300 && status < 400));
}
isValidUrl(url) {
return url && (url.startsWith("https:") || url.startsWith("http:"));
}
pendingReqResp(requestId, reuseOnly = false) {
if (!this.pendingRequests.has(requestId)) {
if (reuseOnly || !requestId) {
return null;
}
if (this.skipIds.has(requestId)) {
logNetwork("Skipping ignored id", {requestId});
return null;
}
if (this.skipping) {
//logger.debug("Skipping request, page already finished", this.logDetails, "recorder");
return null;
}
const reqresp = new RequestResponseInfo(requestId);
this.pendingRequests.set(requestId, reqresp);
return reqresp;
} else {
const reqresp = this.pendingRequests.get(requestId);
if (requestId !== reqresp.requestId) {
logger.warn("Invalid request id", {requestId, actualRequestId: reqresp.requestId}, "recorder");
}
return reqresp;
}
}
removeReqResp(requestId, allowReuse=false) {
const reqresp = this.pendingRequests.get(requestId);
this.pendingRequests.delete(requestId);
if (!allowReuse) {
this.skipIds.add(requestId);
}
return reqresp;
}
async serializeToWARC(reqresp) {
if (!reqresp.payload) {
logNetwork("Not writing, no payload", {url: reqresp.url});
return;
}
if (reqresp.method === "GET" && !await this.crawlState.addIfNoDupe(WRITE_DUPE_KEY, reqresp.url)) {
logNetwork("Skipping dupe", {url: reqresp.url});
return;
}
const responseRecord = createResponse(reqresp, this.pageid);
const requestRecord = createRequest(reqresp, responseRecord, this.pageid);
this.warcQ.add(() => this.writer.writeRecordPair(responseRecord, requestRecord));
}
async directFetchCapture(url) {
const reqresp = new RequestResponseInfo(0);
reqresp.url = url;
reqresp.method = "GET";
logger.debug("Directly fetching page URL without browser", {url, ...this.logDetails}, "recorder");
const filter = (resp) => resp.status === 200 && !resp.headers.get("set-cookie");
// ignore dupes: if previous URL was not a page, still load as page. if previous was page,
// should not get here, as dupe pages tracked via seen list
const fetcher = new AsyncFetcher({tempdir: this.tempdir, reqresp, recorder: this, networkId: 0, filter, ignoreDupe: true});
const res = await fetcher.load();
const mime = reqresp && reqresp.responseHeaders["content-type"] && reqresp.responseHeaders["content-type"].split(";")[0];
return {fetched: res === "fetched", mime};
}
}
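// Lifecycle sketch for the Recorder above (wiring assumed from the crawler):
//   const recorder = new Recorder({workerid, collDir, crawler});
//   await recorder.onCreatePage({cdp});  // attach Fetch/Network/Target handlers
//   recorder.startPage({pageid, url});   // reset per-page pending request state
//   ... page is crawled ...
//   await recorder.finishPage();         // wait for pending requests to settle
//   await recorder.onDone();             // drain fetcher and WARC queues, flush writer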
// =================================================================
class AsyncFetcher
{
constructor({tempdir, reqresp, expectedSize = -1, recorder, networkId, filter = null, ignoreDupe = false}) {
//super();
this.reqresp = reqresp;
this.reqresp.expectedSize = expectedSize;
this.reqresp.asyncLoading = true;
this.networkId = networkId;
this.filter = filter;
this.ignoreDupe = ignoreDupe;
this.recorder = recorder;
this.tempdir = tempdir;
this.filename = path.join(this.tempdir, `${timestampNow()}-${uuidv4()}.data`);
}
async load() {
const { reqresp, recorder, networkId, filename } = this;
const { url } = reqresp;
const { pageid, crawlState, gzip, logDetails } = recorder;
let fetched = "notfetched";
try {
if (reqresp.method === "GET" && !await crawlState.addIfNoDupe(ASYNC_FETCH_DUPE_KEY, url)) {
if (!this.ignoreDupe) {
this.reqresp.asyncLoading = false;
return "dupe";
}
}
const body = await this._doFetch();
fetched = "fetched";
const responseRecord = createResponse(reqresp, pageid, body);
const requestRecord = createRequest(reqresp, responseRecord, pageid);
const serializer = new WARCSerializer(responseRecord, {gzip, maxMemSize: MAX_BROWSER_FETCH_SIZE});
try {
let readSize = await serializer.digestRecord();
if (serializer.httpHeadersBuff) {
readSize -= serializer.httpHeadersBuff.length;
}
reqresp.readSize = readSize;
} catch (e) {
logger.error("Error reading + digesting payload", {url, filename, ...errJSON(e), ...logDetails}, "recorder");
}
if (reqresp.readSize === reqresp.expectedSize || reqresp.expectedSize < 0) {
logger.debug("Async fetch: streaming done", {size: reqresp.readSize, expected: reqresp.expectedSize, networkId, url, ...logDetails}, "recorder");
} else {
logger.warn("Async fetch: possible response size mismatch", {size: reqresp.readSize, expected: reqresp.expectedSize, url, ...logDetails}, "recorder");
//await crawlState.removeDupe(ASYNC_FETCH_DUPE_KEY, url);
//return fetched;
}
const externalBuffer = serializer.externalBuffer;
if (externalBuffer) {
const { currSize, buffers, fh } = externalBuffer;
if (buffers && buffers.length && !fh) {
reqresp.payload = Buffer.concat(buffers, currSize);
externalBuffer.buffers = [reqresp.payload];
}
}
if (Object.keys(reqresp.extraOpts).length) {
responseRecord.warcHeaders["WARC-JSON-Metadata"] = JSON.stringify(reqresp.extraOpts);
}
recorder.warcQ.add(() => recorder.writer.writeRecordPair(responseRecord, requestRecord, serializer));
} catch (e) {
logger.error("Streaming Fetch Error", {url, networkId, filename, ...errJSON(e), ...logDetails}, "recorder");
await crawlState.removeDupe(ASYNC_FETCH_DUPE_KEY, url);
} finally {
recorder.removeReqResp(networkId);
}
return fetched;
}
async _doFetch() {
const { reqresp } = this;
const { method, url } = reqresp;
logger.debug("Async started: fetch", {url}, "recorder");
const headers = reqresp.getRequestHeadersDict();
let signal = null;
let abort = null;
if (this.filter) {
abort = new AbortController();
signal = abort.signal;
}
const resp = await fetch(url, {method, headers, body: reqresp.postData || undefined, signal});
if (this.filter && !this.filter(resp)) {
abort.abort();
throw new Error("invalid response, ignoring fetch");
}
if (reqresp.expectedSize < 0 && resp.headers.get("content-length") && !resp.headers.get("content-encoding")) {
reqresp.expectedSize = Number(resp.headers.get("content-length") || -1);
}
if (reqresp.expectedSize === 0) {
reqresp.payload = new Uint8Array();
return;
} else if (!resp.body) {
logger.error("Empty body, stopping fetch", {url}, "recorder");
await this.recorder.crawlState.removeDupe(ASYNC_FETCH_DUPE_KEY, url);
return;
}
reqresp.fillFetchResponse(resp);
return this.takeReader(resp.body.getReader());
}
async* takeReader(reader) {
try {
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
yield value;
}
} catch (e) {
logger.warn("takeReader interrupted", {...errJSON(e), url: this.reqresp.url, ...this.recorder.logDetails}, "recorder");
this.reqresp.truncated = "disconnect";
}
}
async* takeStreamIter(cdp, stream) {
try {
while (true) {
const {data, base64Encoded, eof} = await cdp.send("IO.read", {handle: stream});
const buff = Buffer.from(data, base64Encoded ? "base64" : "utf-8");
yield buff;
if (eof) {
break;
}
}
} catch (e) {
logger.warn("takeStream interrupted", {...errJSON(e), url: this.reqresp.url, ...this.recorder.logDetails}, "recorder");
this.reqresp.truncated = "disconnect";
}
}
}
// =================================================================
class ResponseStreamAsyncFetcher extends AsyncFetcher
{
constructor(opts) {
super(opts);
this.cdp = opts.cdp;
this.requestId = opts.requestId;
}
async _doFetch() {
const { requestId, reqresp, cdp } = this;
const { url } = reqresp;
logger.debug("Async started: takeStream", {url}, "recorder");
const { stream } = await cdp.send("Fetch.takeResponseBodyAsStream", {requestId});
return this.takeStreamIter(cdp, stream);
}
}
// =================================================================
class NetworkLoadStreamAsyncFetcher extends AsyncFetcher
{
constructor(opts) {
super(opts);
this.cdp = opts.cdp;
}
async _doFetch() {
const { reqresp, cdp } = this;
const { url } = reqresp;
logger.debug("Async started: loadNetworkResource", {url}, "recorder");
const options = {disableCache: false, includeCredentials: true};
let result = null;
try {
result = await cdp.send("Network.loadNetworkResource", {frameId: reqresp.frameId, url, options});
} catch (e) {
logger.debug("Network.loadNetworkResource failed, attempting node fetch", {url, ...errJSON(e), ...this.recorder.logDetails}, "recorder");
return await super._doFetch();
}
const { stream, headers, httpStatusCode, success, netError, netErrorName } = result.resource;
if (!success || !stream) {
//await this.recorder.crawlState.removeDupe(ASYNC_FETCH_DUPE_KEY, url);
logger.debug("Network.loadNetworkResource failed, attempting node fetch", {url, netErrorName, netError, httpStatusCode, ...this.recorder.logDetails}, "recorder");
return await super._doFetch();
}
if (reqresp.expectedSize < 0 && headers && headers["content-length"] && !headers["content-encoding"]) {
reqresp.expectedSize = Number(headers["content-length"] || -1);
}
if (reqresp.expectedSize === 0) {
reqresp.payload = new Uint8Array();
return;
}
reqresp.status = httpStatusCode;
reqresp.responseHeaders = headers || {};
return this.takeStreamIter(cdp, stream);
}
}
// =================================================================
// response
function createResponse(reqresp, pageid, contentIter) {
const url = reqresp.url;
const warcVersion = "WARC/1.1";
const statusline = `HTTP/1.1 ${reqresp.status} ${reqresp.statusText}`;
const date = new Date().toISOString();
const httpHeaders = reqresp.getResponseHeadersDict(reqresp.payload ? reqresp.payload.length : null);
const warcHeaders = {
"WARC-Page-ID": pageid,
};
if (reqresp.truncated) {
warcHeaders["WARC-Truncated"] = reqresp.truncated;
}
if (!contentIter) {
contentIter = [reqresp.payload];
}
if (Object.keys(reqresp.extraOpts).length) {
warcHeaders["WARC-JSON-Metadata"] = JSON.stringify(reqresp.extraOpts);
}
return WARCRecord.create({
url, date, warcVersion, type: "response", warcHeaders,
httpHeaders, statusline}, contentIter);
}
// =================================================================
// request
function createRequest(reqresp, responseRecord, pageid) {
const url = reqresp.url;
const warcVersion = "WARC/1.1";
const method = reqresp.method;
const urlParsed = new URL(url);
const statusline = `${method} ${url.slice(urlParsed.origin.length)} HTTP/1.1`;
const requestBody = reqresp.postData ? [encoder.encode(reqresp.postData)] : [];
const httpHeaders = reqresp.getRequestHeadersDict();
const warcHeaders = {
"WARC-Concurrent-To": responseRecord.warcHeader("WARC-Record-ID"),
"WARC-Page-ID": pageid,
};
const date = responseRecord.warcDate;
return WARCRecord.create({
url, date, warcVersion, type: "request", warcHeaders,
httpHeaders, statusline}, requestBody);
}

@ -1,40 +0,0 @@
import Redis from "ioredis";
import { logger } from "./logger.js";
const error = console.error;
let lastLogTime = 0;
let exitOnError = false;
// log only once every 10 seconds
const REDIS_ERROR_LOG_INTERVAL_MS = 10000;
console.error = function (...args) {
if (
typeof args[0] === "string" &&
args[0].indexOf("[ioredis] Unhandled error event") === 0
) {
let now = Date.now();
if ((now - lastLogTime) > REDIS_ERROR_LOG_INTERVAL_MS) {
if (lastLogTime && exitOnError) {
logger.fatal("Crawl interrupted, redis gone, exiting", {}, "redis");
}
logger.warn("ioredis error", {error: args[0]}, "redis");
lastLogTime = now;
}
return;
}
error.call(console, ...args);
};
export async function initRedis(url) {
const redis = new Redis(url, {lazyConnect: true});
await redis.connect();
return redis;
}
export function setExitOnRedisError() {
exitOnError = true;
}
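// A minimal usage sketch (the Redis URL is an assumption): because the client is
// created with lazyConnect, connection errors surface from the awaited connect().
// const redis = await initRedis("redis://localhost:6379/0");
// setExitOnRedisError(); // after crawling starts, treat a lost connection as fatal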

@ -1,240 +0,0 @@
import { getStatusText } from "@webrecorder/wabac/src/utils.js";
const CONTENT_LENGTH = "content-length";
const CONTENT_TYPE = "content-type";
const EXCLUDE_HEADERS = ["content-encoding", "transfer-encoding"];
// ===========================================================================
export class RequestResponseInfo
{
constructor(requestId) {
this._created = new Date();
this.requestId = requestId;
this.ts = null;
// request data
this.method = null;
this.url = null;
this.protocol = "HTTP/1.1";
this.requestHeaders = null;
this.requestHeadersText = null;
this.postData = null;
this.hasPostData = false;
// response data
this.status = 0;
this.statusText = null;
this.responseHeaders = null;
this.responseHeadersList = null;
this.responseHeadersText = null;
this.payload = null;
this.fromServiceWorker = false;
this.fetch = false;
this.resourceType = null;
this.extraOpts = {};
this.readSize = 0;
this.expectedSize = 0;
// set to true to indicate async loading in progress
this.asyncLoading = false;
// set to add truncated message
this.truncated = null;
}
fillRequest(params) {
this.url = params.request.url;
this.method = params.request.method;
if (!this.requestHeaders) {
this.requestHeaders = params.request.headers;
}
this.postData = params.request.postData;
this.hasPostData = params.request.hasPostData;
if (params.type) {
this.resourceType = params.type;
}
//this.loaderId = params.loaderId;
}
fillFetchRequestPaused(params) {
this.fillRequest(params);
this.status = params.responseStatusCode;
this.statusText = params.responseStatusText || getStatusText(this.status);
this.responseHeadersList = params.responseHeaders;
this.fetch = true;
this.resourceType = params.resourceType;
this.frameId = params.frameId;
}
fillResponse(response) {
// if the initial fetch was a 200 but this would replace it with a 304, skip it
if (response.status === 304 && this.status && this.status !== 304 && this.url) {
return;
}
this.url = response.url.split("#")[0];
this.status = response.status;
this.statusText = response.statusText || getStatusText(this.status);
this.protocol = response.protocol;
if (response.requestHeaders) {
this.requestHeaders = response.requestHeaders;
}
if (response.requestHeadersText) {
this.requestHeadersText = response.requestHeadersText;
}
this.responseHeaders = response.headers;
if (response.headersText) {
this.responseHeadersText = response.headersText;
}
this.fromServiceWorker = !!response.fromServiceWorker;
if (response.securityDetails) {
const issuer = response.securityDetails.issuer || "";
const ctc = response.securityDetails.certificateTransparencyCompliance === "compliant" ? "1" : "0";
this.extraOpts.cert = {issuer, ctc};
}
}
isSelfRedirect() {
if (this.status < 300 || this.status >= 400 || this.status === 304) {
return false;
}
try {
const headers = new Headers(this.responseHeaders);
const redirUrl = new URL(headers.get("location"), this.url).href;
return this.url === redirUrl;
} catch (e) {
return false;
}
}
fillResponseReceivedExtraInfo(params) {
// this.responseHeaders = params.headers;
// if (params.headersText) {
// this.responseHeadersText = params.headersText;
// }
this.extraOpts.ipType = params.resourceIPAddressSpace;
}
fillFetchResponse(response) {
this.responseHeaders = Object.fromEntries(response.headers);
this.status = response.status;
this.statusText = response.statusText || getStatusText(this.status);
}
fillRequestExtraInfo(params) {
this.requestHeaders = params.headers;
}
getResponseHeadersText() {
let headers = `${this.protocol} ${this.status} ${this.statusText}\r\n`;
for (const header of Object.keys(this.responseHeaders)) {
headers += `${header}: ${this.responseHeaders[header].replace(/\n/g, ", ")}\r\n`;
}
headers += "\r\n";
return headers;
}
hasRequest() {
return this.method && (this.requestHeaders || this.requestHeadersText);
}
getRequestHeadersDict() {
return this._getHeadersDict(this.requestHeaders, null);
}
getResponseHeadersDict(length) {
return this._getHeadersDict(this.responseHeaders, this.responseHeadersList, length);
}
_getHeadersDict(headersDict, headersList, actualContentLength) {
if (!headersDict && headersList) {
headersDict = {};
for (const header of headersList) {
let headerName = header.name.toLowerCase();
if (EXCLUDE_HEADERS.includes(headerName)) {
// preserve excluded headers under an x-orig- prefix, as in the dict path below
headersDict["x-orig-" + headerName] = header.value.replace(/\n/g, ", ");
continue;
}
if (actualContentLength && headerName === CONTENT_LENGTH) {
headersDict[headerName] = "" + actualContentLength;
continue;
}
headersDict[headerName] = header.value.replace(/\n/g, ", ");
}
}
if (!headersDict) {
return {};
}
for (const key of Object.keys(headersDict)) {
if (key[0] === ":") {
delete headersDict[key];
continue;
}
const keyLower = key.toLowerCase();
if (EXCLUDE_HEADERS.includes(keyLower)) {
headersDict["x-orig-" + key] = headersDict[key];
delete headersDict[key];
continue;
}
if (actualContentLength && keyLower === CONTENT_LENGTH) {
headersDict[key] = "" + actualContentLength;
continue;
}
headersDict[key] = headersDict[key].replace(/\n/g, ", ");
}
return headersDict;
}
isValidBinary() {
if (!this.payload) {
return false;
}
const length = this.payload.length;
const headers = new Headers(this.getResponseHeadersDict());
const contentType = headers.get(CONTENT_TYPE);
const contentLength = headers.get(CONTENT_LENGTH);
if (Number(contentLength) !== length) {
return false;
}
if (contentType && contentType.startsWith("text/html")) {
return false;
}
return true;
}
}
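// A worked sketch of getResponseHeadersDict(): pseudo-headers are dropped, excluded
// headers are kept under an x-orig- prefix, and content-length is rewritten to the
// actual payload size when one is passed in (the header values are assumptions):
// const info = new RequestResponseInfo("1");
// info.responseHeaders = {":status": "200", "content-encoding": "gzip", "content-length": "64", "content-type": "text/css"};
// info.getResponseHeadersDict(120);
// // → {"x-orig-content-encoding": "gzip", "content-length": "120", "content-type": "text/css"}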

@ -1,96 +0,0 @@
import fs from "fs";
import path from "path";
import * as warcio from "warcio";
import sharp from "sharp";
import { logger, errJSON } from "./logger.js";
// ============================================================================
export const screenshotTypes = {
"view": {
type: "png",
omitBackground: true,
fullPage: false
},
"thumbnail": {
type: "jpeg",
omitBackground: true,
fullPage: false
},
"fullPage": {
type: "png",
omitBackground: true,
fullPage: true
}
};
export class Screenshots {
constructor({browser, page, url, date, directory}) {
this.browser = browser;
this.page = page;
this.url = url;
this.directory = directory;
this.warcName = path.join(this.directory, "screenshots.warc.gz");
this.date = date ? date : new Date();
}
async take(screenshotType="view") {
try {
if (screenshotType !== "fullPage") {
await this.browser.setViewport(this.page, {width: 1920, height: 1080});
}
const options = screenshotTypes[screenshotType];
const screenshotBuffer = await this.page.screenshot(options);
await this.writeBufferToWARC(screenshotBuffer, screenshotType, options.type);
logger.info(`Screenshot (type: ${screenshotType}) for ${this.url} written to ${this.warcName}`);
} catch (e) {
logger.error("Taking screenshot failed", {"page": this.url, type: screenshotType, ...errJSON(e)}, "screenshots");
}
}
async takeFullPage() {
await this.take("fullPage");
}
async takeThumbnail() {
const screenshotType = "thumbnail";
try {
await this.browser.setViewport(this.page, {width: 1920, height: 1080});
const options = screenshotTypes[screenshotType];
const screenshotBuffer = await this.page.screenshot(options);
const thumbnailBuffer = await sharp(screenshotBuffer)
// 16:9 thumbnail
.resize(640, 360)
.toBuffer();
await this.writeBufferToWARC(thumbnailBuffer, screenshotType, options.type);
logger.info(`Screenshot (type: thumbnail) for ${this.url} written to ${this.warcName}`);
} catch (e) {
logger.error("Taking screenshot failed", {"page": this.url, type: screenshotType, ...errJSON(e)}, "screenshots");
}
}
async writeBufferToWARC(screenshotBuffer, screenshotType, imageType) {
const warcRecord = await this.wrap(screenshotBuffer, screenshotType, imageType);
const warcRecordBuffer = await warcio.WARCSerializer.serialize(warcRecord, {gzip: true});
fs.appendFileSync(this.warcName, warcRecordBuffer);
}
async wrap(buffer, screenshotType="screenshot", imageType="png") {
const warcVersion = "WARC/1.1";
const warcRecordType = "resource";
const warcHeaders = {"Content-Type": `image/${imageType}`};
async function* content() {
yield buffer;
}
let screenshotUrl = `urn:${screenshotType}:` + this.url;
return warcio.WARCRecord.create({
url: screenshotUrl,
date: this.date.toISOString(),
type: warcRecordType,
warcVersion,
warcHeaders}, content());
}
}
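// A usage sketch; `browser` and `page` are the crawler's objects, assumed here:
// const screenshots = new Screenshots({browser, page, url: page.url(), directory: "/crawls/collections/test"});
// await screenshots.take();          // 1920x1080 PNG of the viewport
// await screenshots.takeFullPage();  // full-page PNG
// await screenshots.takeThumbnail(); // 640x360 JPEG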

@ -1,188 +0,0 @@
import { logger } from "./logger.js";
import { MAX_DEPTH } from "./constants.js";
export class ScopedSeed
{
constructor({url, scopeType, include, exclude = [], allowHash = false, depth = -1, sitemap = false, extraHops = 0} = {}) {
const parsedUrl = this.parseUrl(url);
if (!parsedUrl) {
throw new Error("Invalid URL");
}
this.url = parsedUrl.href;
this.include = this.parseRx(include);
this.exclude = this.parseRx(exclude);
this.scopeType = scopeType;
if (!this.scopeType) {
this.scopeType = this.include.length ? "custom" : "prefix";
}
if (this.scopeType !== "custom") {
[include, allowHash] = this.scopeFromType(this.scopeType, parsedUrl);
this.include = [...include, ...this.include];
}
// for page scope, the depth is set to extraHops, as no other
// crawling is done
if (this.scopeType === "page") {
depth = extraHops;
}
this.sitemap = this.resolveSiteMap(sitemap);
this.allowHash = allowHash;
this.maxExtraHops = extraHops;
this.maxDepth = depth < 0 ? MAX_DEPTH : depth;
}
parseRx(value) {
if (value === null || value === undefined || value === "") {
return [];
} else if (!(value instanceof Array)) {
return [new RegExp(value)];
} else {
return value.map(e => (e instanceof RegExp) ? e : new RegExp(e));
}
}
parseUrl(url, logDetails = {}) {
let parsedUrl = null;
try {
parsedUrl = new URL(url.trim());
} catch (e) {
logger.warn("Invalid Page - not a valid URL", {url, ...logDetails});
return null;
}
if (parsedUrl.protocol !== "http:" && parsedUrl.protocol !== "https:") {
logger.warn("Invalid Page - URL must start with http:// or https://", {url, ...logDetails});
parsedUrl = null;
}
return parsedUrl;
}
resolveSiteMap(sitemap) {
if (sitemap === true) {
const url = new URL(this.url);
url.pathname = "/sitemap.xml";
return url.href;
}
return sitemap;
}
scopeFromType(scopeType, parsedUrl) {
let include;
let allowHash = false;
switch (scopeType) {
case "page":
include = [];
break;
case "page-spa":
// allow scheme-agnostic URLs as likely redirects
include = [new RegExp("^" + urlRxEscape(parsedUrl.href, parsedUrl) + "#.+")];
allowHash = true;
break;
case "prefix":
include = [new RegExp("^" + urlRxEscape(parsedUrl.origin + parsedUrl.pathname.slice(0, parsedUrl.pathname.lastIndexOf("/") + 1), parsedUrl))];
break;
case "host":
include = [new RegExp("^" + urlRxEscape(parsedUrl.origin + "/", parsedUrl))];
break;
case "domain":
if (parsedUrl.hostname.startsWith("www.")) {
parsedUrl.hostname = parsedUrl.hostname.replace("www.", "");
}
include = [new RegExp("^" + urlRxEscape(parsedUrl.origin + "/", parsedUrl).replace("\\/\\/", "\\/\\/([^/]+\\.)*"))];
break;
case "any":
include = [/.*/];
break;
default:
logger.fatal(`Invalid scope type "${scopeType}" specified, valid types are: page, page-spa, prefix, host, domain, any`);
}
return [include, allowHash];
}
isAtMaxDepth(depth) {
return depth >= this.maxDepth;
}
isIncluded(url, depth, extraHops = 0, logDetails = {}) {
if (depth > this.maxDepth) {
return false;
}
url = this.parseUrl(url, logDetails);
if (!url) {
return false;
}
if (!this.allowHash) {
// remove hashtag
url.hash = "";
}
url = url.href;
if (url === this.url) {
return true;
}
// skip already crawled
// if (this.seenList.has(url)) {
// return false;
//}
let inScope = false;
// check scopes
for (const s of this.include) {
if (s.test(url)) {
inScope = true;
break;
}
}
let isOOS = false;
if (!inScope) {
if (this.maxExtraHops && extraHops <= this.maxExtraHops) {
isOOS = true;
} else {
//console.log(`Not in scope ${url} ${this.include}`);
return false;
}
}
// check exclusions
for (const e of this.exclude) {
if (e.test(url)) {
//console.log(`Skipping ${url} excluded by ${e}`);
return false;
}
}
return {url, isOOS};
}
}
export function rxEscape(string) {
return string.replace(/[-/\\^$*+?.()|[\]{}]/g, "\\$&");
}
export function urlRxEscape(url, parsedUrl) {
return rxEscape(url).replace(parsedUrl.protocol, "https?:");
}
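// A worked sketch of prefix scoping (seed URL assumed): the generated include
// regex is roughly ^https?://example\.com/blog/ for the seed below.
// const seed = new ScopedSeed({url: "https://example.com/blog/post", scopeType: "prefix"});
// seed.isIncluded("https://example.com/blog/other", 1); // → {url: "...", isOOS: false}
// seed.isIncluded("https://example.com/about", 1);      // → false (out of scope, no extra hops)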

@ -1,494 +0,0 @@
import { logger } from "./logger.js";
import { MAX_DEPTH } from "./constants.js";
// ============================================================================
export const LoadState = {
FAILED: 0,
CONTENT_LOADED: 1,
FULL_PAGE_LOADED: 2,
EXTRACTION_DONE: 3,
BEHAVIORS_DONE: 4,
};
// ============================================================================
export const QueueState = {
ADDED: 0,
LIMIT_HIT: 1,
DUPE_URL: 2,
};
// ============================================================================
export class PageState
{
constructor(redisData) {
this.url = redisData.url;
this.seedId = redisData.seedId;
this.depth = redisData.depth;
this.extraHops = redisData.extraHops;
this.workerid = null;
this.pageid = null;
this.title = null;
this.isHTMLPage = null;
this.text = null;
this.skipBehaviors = false;
this.filteredFrames = [];
this.loadState = LoadState.FAILED;
this.logDetails = {};
}
}
// ============================================================================
export class RedisCrawlState
{
constructor(redis, key, maxPageTime, uid) {
this.redis = redis;
this.maxRetryPending = 1;
this._lastSize = 0;
this.uid = uid;
this.key = key;
this.maxPageTime = maxPageTime;
this.qkey = this.key + ":q";
this.pkey = this.key + ":p";
this.skey = this.key + ":s";
// done (integer)
this.dkey = this.key + ":d";
// failed
this.fkey = this.key + ":f";
// crawler errors
this.ekey = this.key + ":e";
// start and end times to compute execution minutes
this.startkey = this.key + ":start";
this.endkey = this.key + ":end";
this._initLuaCommands(this.redis);
}
_initLuaCommands(redis) {
redis.defineCommand("addqueue", {
numberOfKeys: 3,
lua: `
local size = redis.call('scard', KEYS[3]);
local limit = tonumber(ARGV[4]);
if limit > 0 and size >= limit then
return 1;
end
if redis.call('sadd', KEYS[3], ARGV[1]) == 0 then
return 2;
end
redis.call('zadd', KEYS[2], ARGV[2], ARGV[3]);
redis.call('hdel', KEYS[1], ARGV[1]);
return 0;
`
});
redis.defineCommand("getnext", {
numberOfKeys: 2,
lua: `
local res = redis.call('zpopmin', KEYS[1]);
local json = res[1]
if json then
local data = cjson.decode(json);
redis.call('hset', KEYS[2], data.url, json);
end
return json;
`
});
redis.defineCommand("markstarted", {
numberOfKeys: 2,
lua: `
local json = redis.call('hget', KEYS[1], ARGV[1]);
if json then
local data = cjson.decode(json);
data['started'] = ARGV[2];
json = cjson.encode(data);
redis.call('hset', KEYS[1], ARGV[1], json);
redis.call('setex', KEYS[2], ARGV[3], ARGV[4]);
end
`
});
redis.defineCommand("unlockpending", {
numberOfKeys: 1,
lua: `
local value = redis.call('get', KEYS[1]);
if value == ARGV[1] then
redis.call('del', KEYS[1])
end
`
});
redis.defineCommand("movefailed", {
numberOfKeys: 2,
lua: `
local json = redis.call('hget', KEYS[1], ARGV[1]);
if json then
local data = cjson.decode(json);
data[ARGV[3]] = ARGV[2];
json = cjson.encode(data);
redis.call('lpush', KEYS[2], json);
redis.call('hdel', KEYS[1], ARGV[1]);
end
`
});
redis.defineCommand("requeue", {
numberOfKeys: 3,
lua: `
local res = redis.call('get', KEYS[3]);
if not res then
local json = redis.call('hget', KEYS[1], ARGV[1]);
if json then
local data = cjson.decode(json);
data['retry'] = (data['retry'] or 0) + 1;
redis.call('hdel', KEYS[1], ARGV[1]);
if tonumber(data['retry']) <= tonumber(ARGV[2]) then
json = cjson.encode(data);
redis.call('zadd', KEYS[2], 0, json);
return 1;
else
return 2;
end
end
end
return 0;
`
});
}
async _getNext() {
return await this.redis.getnext(this.qkey, this.pkey);
}
_timestamp() {
return new Date().toISOString();
}
async setStartTime() {
const startTime = this._timestamp();
return await this.redis.rpush(`${this.startkey}:${this.uid}`, startTime);
}
async getStartTimes() {
return await this.redis.lrange(`${this.startkey}:${this.uid}`, 0, -1);
}
async setEndTime() {
// Set start time if crawler exits before it was able to set one
if (!await this.redis.llen(`${this.startkey}:${this.uid}`)) {
await this.setStartTime();
}
const endTime = this._timestamp();
return await this.redis.rpush(`${this.endkey}:${this.uid}`, endTime);
}
async markStarted(url) {
const started = this._timestamp();
return await this.redis.markstarted(this.pkey, this.pkey + ":" + url, url, started, this.maxPageTime, this.uid);
}
async markFinished(url) {
await this.redis.call("hdel", this.pkey, url);
return await this.redis.incr(this.dkey);
}
async markFailed(url) {
await this.redis.movefailed(this.pkey, this.fkey, url, "1", "failed");
return await this.redis.incr(this.dkey);
}
recheckScope(data, seeds) {
const seed = seeds[data.seedId];
return seed.isIncluded(data.url, data.depth, data.extraHops);
}
async isFinished() {
return (await this.queueSize() == 0) && (await this.numDone() > 0);
}
async setStatus(status_) {
await this.redis.hset(`${this.key}:status`, this.uid, status_);
}
async getStatus() {
return await this.redis.hget(`${this.key}:status`, this.uid);
}
async setArchiveSize(size) {
return await this.redis.hset(`${this.key}:size`, this.uid, size);
}
async isCrawlStopped() {
if (await this.redis.get(`${this.key}:stopping`) === "1") {
return true;
}
if (await this.redis.hget(`${this.key}:stopone`, this.uid) === "1") {
return true;
}
return false;
}
// note: not currently called in crawler, but could be
// crawl may be stopped by setting this elsewhere in shared redis
async stopCrawl() {
await this.redis.set(`${this.key}:stopping`, "1");
}
async incFailCount() {
const key = `${this.key}:status:failcount:${this.uid}`;
const res = await this.redis.incr(key);
// consider failed if 3 failed retries in 60 secs
await this.redis.expire(key, 60);
return (res >= 3);
}
async addToQueue({url, seedId, depth = 0, extraHops = 0} = {}, limit = 0) {
const added = this._timestamp();
const data = {added, url, seedId, depth};
if (extraHops) {
data.extraHops = extraHops;
}
// return codes
// 0 - url queued successfully
// 1 - url queue size limit reached
// 2 - url is a dupe
return await this.redis.addqueue(this.pkey, this.qkey, this.skey, url, this._getScore(data), JSON.stringify(data), limit);
}
async nextFromQueue() {
const json = await this._getNext();
let data;
try {
data = JSON.parse(json);
} catch(e) {
logger.error("Invalid queued json", json);
return null;
}
if (!data) {
return null;
}
await this.markStarted(data.url);
return new PageState(data);
}
async has(url) {
return !!await this.redis.sismember(this.skey, url);
}
async serialize() {
//const queued = await this._iterSortKey(this.qkey);
const done = await this.numDone();
const queued = await this._iterSortedKey(this.qkey);
const pending = await this.getPendingList();
const failed = await this._iterListKeys(this.fkey);
const errors = await this.getErrorList();
return {done, queued, pending, failed, errors};
}
_getScore(data) {
return (data.depth || 0) + (data.extraHops || 0) * MAX_DEPTH;
}
async _iterSortedKey(key, inc = 100) {
const results = [];
const len = await this.redis.zcard(key);
for (let i = 0; i < len; i += inc) {
const someResults = await this.redis.zrangebyscore(key, 0, "inf", "limit", i, inc);
results.push(...someResults);
}
return results;
}
async _iterListKeys(key, inc = 100) {
const results = [];
const len = await this.redis.llen(key);
for (let i = 0; i < len; i += inc) {
const someResults = await this.redis.lrange(key, i, i + inc - 1);
results.push(...someResults);
}
return results;
}
async load(state, seeds, checkScope) {
const seen = [];
// need to delete existing keys, if exist to fully reset state
await this.redis.del(this.qkey);
await this.redis.del(this.pkey);
await this.redis.del(this.dkey);
await this.redis.del(this.fkey);
await this.redis.del(this.skey);
await this.redis.del(this.ekey);
for (const json of state.queued) {
const data = JSON.parse(json);
if (checkScope) {
if (!this.recheckScope(data, seeds)) {
continue;
}
}
await this.redis.zadd(this.qkey, this._getScore(data), json);
seen.push(data.url);
}
for (const json of state.pending) {
const data = JSON.parse(json);
if (checkScope) {
if (!this.recheckScope(data, seeds)) {
continue;
}
}
await this.redis.zadd(this.qkey, this._getScore(data), json);
seen.push(data.url);
}
// retained in modified form for backwards compatibility
for (const json of state.done) {
const data = JSON.parse(json);
if (data.failed) {
await this.redis.zadd(this.qkey, this._getScore(data), json);
} else {
await this.redis.incr(this.dkey);
}
seen.push(data.url);
}
for (const json of state.failed) {
const data = JSON.parse(json);
await this.redis.zadd(this.qkey, this._getScore(data), json);
seen.push(data.url);
}
for (const json of state.errors) {
await this.logError(json);
}
await this.redis.sadd(this.skey, seen);
return seen.length;
}
async numDone() {
const done = await this.redis.get(this.dkey);
return parseInt(done);
}
async numSeen() {
return await this.redis.scard(this.skey);
}
async numPending() {
const res = await this.redis.hlen(this.pkey);
// reset pendings
if (res > 0 && !this._lastSize) {
await this.resetPendings();
}
return res;
}
async numFailed() {
return await this.redis.llen(this.fkey);
}
async getPendingList() {
const list = await this.redis.hvals(this.pkey);
return list.map(x => JSON.parse(x));
}
async getErrorList() {
return await this.redis.lrange(this.ekey, 0, -1);
}
async clearOwnPendingLocks() {
try {
const pendingUrls = await this.redis.hkeys(this.pkey);
for (const url of pendingUrls) {
await this.redis.unlockpending(this.pkey + ":" + url, this.uid);
}
} catch (e) {
logger.error("Redis Del Pending Failed", e, "state");
}
}
async resetPendings() {
const pendingUrls = await this.redis.hkeys(this.pkey);
for (const url of pendingUrls) {
const res = await this.redis.requeue(this.pkey, this.qkey, this.pkey + ":" + url, url, this.maxRetryPending);
switch (res) {
case 1:
logger.info(`Requeued: ${url}`);
break;
case 2:
logger.info(`Not requeuing anymore: ${url}`);
break;
}
}
}
async queueSize() {
this._lastSize = await this.redis.zcard(this.qkey);
return this._lastSize;
}
async addIfNoDupe(key, value) {
return await this.redis.sadd(key, value) === 1;
}
async removeDupe(key, value) {
return await this.redis.srem(key, value);
}
async logError(error) {
return await this.redis.lpush(this.ekey, error);
}
}
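// A queue lifecycle sketch (key and uid values are assumptions; `redis` as from initRedis()):
// const state = new RedisCrawlState(redis, "crawl-1", 120, "worker-uid-1");
// await state.addToQueue({url: "https://example.com/", seedId: 0}); // → 0 (queued)
// await state.addToQueue({url: "https://example.com/", seedId: 0}); // → 2 (dupe)
// const page = await state.nextFromQueue(); // pops the lowest-score URL, marks it pending
// await state.markFinished(page.url);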

@ -1,241 +0,0 @@
import child_process from "child_process";
import fs from "fs";
import fsp from "fs/promises";
import util from "util";
import os from "os";
import { createHash } from "crypto";
import Minio from "minio";
import { initRedis } from "./redis.js";
import { logger } from "./logger.js";
import getFolderSize from "get-folder-size";
// ===========================================================================
export class S3StorageSync
{
constructor(urlOrData, {webhookUrl, userId, crawlId} = {}) {
let url;
let accessKey;
let secretKey;
if (typeof(urlOrData) === "string") {
url = new URL(urlOrData);
accessKey = url.username;
secretKey = url.password;
url.username = "";
url.password = "";
this.fullPrefix = url.href;
} else {
url = new URL(urlOrData.endpointUrl);
accessKey = urlOrData.accessKey;
secretKey = urlOrData.secretKey;
this.fullPrefix = url.href;
}
this.client = new Minio.Client({
endPoint: url.hostname,
port: Number(url.port) || (url.protocol === "https:" ? 443 : 80),
useSSL: url.protocol === "https:",
accessKey,
secretKey,
partSize: 100*1024*1024
});
this.client.enableSHA256 = true;
this.bucketName = url.pathname.slice(1).split("/")[0];
this.objectPrefix = url.pathname.slice(this.bucketName.length + 2);
this.resources = [];
this.userId = userId;
this.crawlId = crawlId;
this.webhookUrl = webhookUrl;
}
async uploadFile(srcFilename, targetFilename) {
const fileUploadInfo = {
"bucket": this.bucketName,
"crawlId": this.crawlId,
"prefix": this.objectPrefix,
"targetFilename": this.targetFilename
};
logger.info("S3 file upload information", fileUploadInfo, "s3Upload");
await this.client.fPutObject(this.bucketName, this.objectPrefix + targetFilename, srcFilename);
const finalHash = await checksumFile("sha256", srcFilename);
const size = await getFileSize(srcFilename);
return {"path": targetFilename, "hash": finalHash, "bytes": size};
}
async downloadFile(srcFilename, destFilename) {
await this.client.fGetObject(this.bucketName, this.objectPrefix + srcFilename, destFilename);
}
async uploadCollWACZ(srcFilename, targetFilename, completed = true) {
const resource = await this.uploadFile(srcFilename, targetFilename);
logger.info("WACZ S3 file upload resource", resource, "s3Upload");
if (this.webhookUrl) {
const body = {
id: this.crawlId,
user: this.userId,
//filename: `s3://${this.bucketName}/${this.objectPrefix}${this.waczFilename}`,
filename: this.fullPrefix + targetFilename,
hash: resource.hash,
size: resource.bytes,
completed
};
logger.info(`Pinging Webhook: ${this.webhookUrl}`);
if (this.webhookUrl.startsWith("http://") || this.webhookUrl.startsWith("https://")) {
await fetch(this.webhookUrl, {method: "POST", body: JSON.stringify(body)});
} else if (this.webhookUrl.startsWith("redis://")) {
const parts = this.webhookUrl.split("/");
if (parts.length !== 5) {
logger.fatal("redis webhook url must be in format: redis://<host>:<port>/<db>/<key>");
}
const redis = await initRedis(parts.slice(0, 4).join("/"));
await redis.rpush(parts[4], JSON.stringify(body));
}
}
}
}
export function initStorage() {
if (!process.env.STORE_ENDPOINT_URL) {
return null;
}
const endpointUrl = process.env.STORE_ENDPOINT_URL + (process.env.STORE_PATH || "");
const storeInfo = {
endpointUrl,
accessKey: process.env.STORE_ACCESS_KEY,
secretKey: process.env.STORE_SECRET_KEY,
};
const opts = {
crawlId: process.env.CRAWL_ID || os.hostname(),
webhookUrl: process.env.WEBHOOK_URL,
userId: process.env.STORE_USER,
};
logger.info("Initing Storage...");
return new S3StorageSync(storeInfo, opts);
}
export async function getFileSize(filename) {
const stats = await fsp.stat(filename);
return stats.size;
}
export async function getDirSize(dir) {
const { size, errors } = await getFolderSize(dir);
if (errors && errors.length) {
logger.warn("Size check errors", {errors}, "sizecheck");
}
return size;
}
export async function checkDiskUtilization(params, archiveDirSize, dfOutput=null) {
const diskUsage = await getDiskUsage("/crawls", dfOutput);
const usedPercentage = parseInt(diskUsage["Use%"].slice(0, -1));
// Check that disk usage isn't already above threshold
if (usedPercentage >= params.diskUtilization) {
logger.info(`Disk utilization threshold reached ${usedPercentage}% > ${params.diskUtilization}%, stopping`);
return {
stop: true,
used: usedPercentage,
projected: null,
threshold: params.diskUtilization
};
}
// Check that disk usage isn't likely to cross threshold
const kbUsed = parseInt(diskUsage["Used"]);
const kbTotal = parseInt(diskUsage["1K-blocks"]);
let kbArchiveDirSize = Math.round(archiveDirSize/1024);
if (params.combineWARC && params.generateWACZ) {
kbArchiveDirSize *= 4;
} else if (params.combineWARC || params.generateWACZ) {
kbArchiveDirSize *= 2;
}
const projectedTotal = kbUsed + kbArchiveDirSize;
const projectedUsedPercentage = calculatePercentageUsed(projectedTotal, kbTotal);
if (projectedUsedPercentage >= params.diskUtilization) {
logger.info(`Disk utilization projected to reach threshold ${projectedUsedPercentage}% > ${params.diskUtilization}%, stopping`);
return {
stop: true,
used: usedPercentage,
projected: projectedUsedPercentage,
threshold: params.diskUtilization
};
}
return {
stop: false,
used: usedPercentage,
projected: projectedUsedPercentage,
threshold: params.diskUtilization
};
}
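// Worked example (numbers assumed): with 100 GB used of 200 GB (50%), a 10 GB
// archive dir, and both combineWARC and generateWACZ set, the projection is
// 100 + (10 * 4) = 140 GB → 70%; with params.diskUtilization = 65, the crawl stops early.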
export async function getDFOutput(path) {
const exec = util.promisify(child_process.exec);
const res = await exec(`df ${path}`);
return res.stdout;
}
export async function getDiskUsage(path="/crawls", dfOutput = null) {
const result = dfOutput || await getDFOutput(path);
const lines = result.split("\n");
const keys = lines[0].split(/\s+/ig);
const rows = lines.slice(1).map(line => {
const values = line.split(/\s+/ig);
return keys.reduce((o, k, index) => {
o[k] = values[index];
return o;
}, {});
});
return rows[0];
}
export function calculatePercentageUsed(used, total) {
return Math.round((used/total) * 100);
}
function checksumFile(hashName, path) {
return new Promise((resolve, reject) => {
const hash = createHash(hashName);
const stream = fs.createReadStream(path);
stream.on("error", err => reject(err));
stream.on("data", chunk => hash.update(chunk));
stream.on("end", () => resolve(hash.digest("hex")));
});
}
export function interpolateFilename(filename, crawlId) {
filename = filename.replace("@ts", new Date().toISOString().replace(/[:TZz.-]/g, ""));
filename = filename.replace("@hostname", os.hostname());
filename = filename.replace("@hostsuffix", os.hostname().slice(-14));
filename = filename.replace("@id", crawlId);
return filename;
}
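// An interpolateFilename() sketch (crawl id assumed): "@id-@ts.warc.gz" becomes
// "my-crawl-20231031231456789.warc.gz" (ISO timestamp with separators stripped);
// @hostsuffix keeps only the last 14 characters of the hostname.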

@ -1,58 +0,0 @@
export class TextExtract {
constructor(dom){
this.dom = dom;
}
async parseText(node, metadata, accum) {
const SKIPPED_NODES = ["head", "script", "style", "header", "footer", "banner-div", "noscript"];
const EMPTY_LIST = [];
const TEXT = "#text";
const TITLE = "title";
const name = node.nodeName.toLowerCase();
if (SKIPPED_NODES.includes(name)) {
return;
}
const children = node.children || EMPTY_LIST;
if (name === TEXT) {
const value = node.nodeValue ? node.nodeValue.trim() : "";
if (value) {
accum.push(value);
}
} else if (name === TITLE) {
const title = [];
for (let child of children) {
this.parseText(child, null, title);
}
if (metadata) {
metadata.title = title.join(" ");
} else {
accum.push(title.join(" "));
}
} else {
for (let child of children) {
this.parseText(child, metadata, accum);
}
if (node.contentDocument) {
this.parseText(node.contentDocument, null, accum);
}
}
}
async parseTextFromDom() {
const accum = [];
const metadata = {};
await this.parseText(this.dom.root, metadata, accum);
return accum.join("\n");
}
}
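// A minimal sketch with a hand-built node tree (the crawler passes a real DOM):
// const dom = {root: {nodeName: "HTML", children: [
//   {nodeName: "TITLE", children: [{nodeName: "#text", nodeValue: "Hi"}]},
//   {nodeName: "P", children: [{nodeName: "#text", nodeValue: "Hello world"}]},
// ]}};
// await new TextExtract(dom).parseTextFromDom(); // → "Hello world"; the title lands in metadata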

@ -1,37 +0,0 @@
import { logger } from "./logger.js";
export function sleep(seconds) {
return new Promise(resolve => setTimeout(resolve, seconds * 1000));
}
export function timedRun(promise, seconds, message="Promise timed out", logDetails={}, context="general", isWarn=false) {
// return Promise return value or log error if timeout is reached first
const timeout = seconds * 1000;
const rejectPromiseOnTimeout = (timeout) => {
return new Promise((resolve, reject) => {
setTimeout(() => (reject("timeout reached")), timeout);
});
};
return Promise.race([promise, rejectPromiseOnTimeout(timeout)])
.catch((err) => {
if (err == "timeout reached") {
const logFunc = isWarn ? logger.warn : logger.error;
logFunc.call(logger, message, {"seconds": seconds, ...logDetails}, context);
} else {
//logger.error("Unknown exception", {...errJSON(err), ...logDetails}, context);
throw err;
}
});
}
export function secondsElapsed(startTime, nowDate = null) {
nowDate = nowDate || new Date();
return (nowDate.getTime() - startTime) / 1000;
}
export function timestampNow() {
return new Date().toISOString().replace(/[^\d]/g, "");
}
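// A timedRun() sketch: warn (rather than error) if a page load exceeds 30 seconds;
// `page` is assumed to be a puppeteer page.
// await timedRun(page.goto(url), 30, "Page load timed out", {url}, "general", true);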

@ -1,116 +0,0 @@
import fs from "fs";
import path from "path";
import { CDXIndexer } from "warcio";
import { WARCSerializer } from "warcio/node";
import { logger, errJSON } from "./logger.js";
// =================================================================
export class WARCWriter
{
constructor({archivesDir, tempCdxDir, filename, gzip, logDetails}) {
this.archivesDir = archivesDir;
this.tempCdxDir = tempCdxDir;
this.filename = filename;
this.gzip = gzip;
this.logDetails = logDetails;
this.offset = 0;
this.recordLength = 0;
if (this.tempCdxDir) {
this.indexer = new CDXIndexer({format: "cdxj"});
} else {
this.indexer = null;
}
this.fh = null;
this.cdxFH = null;
}
async initFH() {
if (!this.fh) {
this.fh = fs.createWriteStream(path.join(this.archivesDir, this.filename));
}
if (!this.cdxFH && this.tempCdxDir) {
this.cdxFH = fs.createWriteStream(path.join(this.tempCdxDir, this.filename + ".cdx"));
}
}
async writeRecordPair(responseRecord, requestRecord, responseSerializer = null) {
const opts = {gzip: this.gzip};
if (!responseSerializer) {
responseSerializer = new WARCSerializer(responseRecord, opts);
}
await this.initFH();
this.recordLength = await this._writeRecord(responseRecord, responseSerializer);
this._writeCDX(responseRecord);
const requestSerializer = new WARCSerializer(requestRecord, opts);
this.recordLength = await this._writeRecord(requestRecord, requestSerializer);
this._writeCDX(requestRecord);
}
async _writeRecord(record, serializer) {
let total = 0;
let count = 0;
const url = record.warcTargetURI;
for await (const chunk of serializer) {
total += chunk.length;
count++;
try {
this.fh.write(chunk);
} catch (e) {
logger.error("Error writing to WARC, corruption possible", {...errJSON(e), url, ...this.logDetails}, "writer");
}
if (!(count % 10)) {
//logNetwork("Writing WARC Chunk", {total, count, url, logDetails});
}
}
return total;
}
_writeCDX(record) {
if (this.indexer) {
const cdx = this.indexer.indexRecord(record, this, this.filename);
if (this.indexer && this.cdxFH && cdx) {
this.indexer.write(cdx, this.cdxFH);
}
}
this.offset += this.recordLength;
}
async flush() {
if (this.fh) {
await streamFinish(this.fh);
this.fh = null;
}
if (this.cdxFH) {
this._writeCDX(null);
await streamFinish(this.cdxFH);
this.cdxFH = null;
}
}
}
// =================================================================
export function streamFinish(fh) {
const p = new Promise(resolve => {
fh.once("finish", () => resolve());
});
fh.end();
return p;
}
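// A usage sketch (paths assumed; records as produced by createResponse/createRequest):
// const writer = new WARCWriter({archivesDir: "/crawls/archive", tempCdxDir: null,
//   filename: "rec-0.warc.gz", gzip: true, logDetails: {}});
// await writer.writeRecordPair(responseRecord, requestRecord);
// await writer.flush(); // finishes the WARC (and CDX, if indexing) streams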

@ -1,320 +0,0 @@
import os from "os";
import { v4 as uuidv4 } from "uuid";
import { logger, errJSON } from "./logger.js";
import { sleep, timedRun } from "./timing.js";
import { Recorder } from "./recorder.js";
import { rxEscape } from "./seeds.js";
const MAX_REUSE = 5;
const NEW_WINDOW_TIMEOUT = 20;
const TEARDOWN_TIMEOUT = 10;
const FINISHED_TIMEOUT = 60;
// ===========================================================================
export function runWorkers(crawler, numWorkers, maxPageTime, collDir) {
logger.info(`Creating ${numWorkers} workers`, {}, "worker");
const workers = [];
let offset = 0;
// automatically set worker start by ordinal in k8s
// if hostname is "crawl-id-name-N"
// while CRAWL_ID is "crawl-id-name", then set starting
// worker index offset to N * numWorkers
if (process.env.CRAWL_ID) {
const rx = new RegExp(rxEscape(process.env.CRAWL_ID) + "\\-([\\d]+)$");
const m = os.hostname().match(rx);
if (m) {
offset = m[1] * numWorkers;
logger.info("Starting workerid index at " + offset, "worker");
}
}
for (let i = 0; i < numWorkers; i++) {
workers.push(new PageWorker(i + offset, crawler, maxPageTime, collDir));
}
return Promise.allSettled(workers.map((worker) => worker.run()));
}
// ===========================================================================
export class PageWorker
{
constructor(id, crawler, maxPageTime, collDir) {
this.id = id;
this.crawler = crawler;
this.maxPageTime = maxPageTime;
this.reuseCount = 0;
this.page = null;
this.cdp = null;
this.callbacks = null;
this.opts = null;
this.logDetails = {workerid: this.id};
this.crashed = false;
this.markCrashed = null;
this.crashBreak = null;
this.recorder = new Recorder({workerid: id, collDir, crawler: this.crawler});
this.crawler.browser.recorders.push(this.recorder);
}
async closePage() {
if (!this.page) {
return;
}
if (this.recorder) {
await this.recorder.onClosePage();
}
if (!this.crashed) {
try {
await timedRun(
this.crawler.teardownPage(this.opts),
TEARDOWN_TIMEOUT,
"Page Teardown Timed Out",
this.logDetails,
"worker"
);
} catch (e) {
// ignore
}
}
try {
logger.debug("Closing page", {crashed: this.crashed, workerid: this.id}, "worker");
await timedRun(
this.page.close(),
TEARDOWN_TIMEOUT,
"Page Close Timed Out",
this.logDetails,
"worker"
);
} catch (e) {
// ignore
} finally {
this.cdp = null;
this.page = null;
}
}
isSameOrigin(url) {
try {
const currURL = new URL(this.page.url());
const newURL = new URL(url);
return currURL.origin === newURL.origin;
} catch (e) {
return false;
}
}
async initPage(url) {
if (!this.crashed && this.page && ++this.reuseCount <= MAX_REUSE && this.isSameOrigin(url)) {
logger.debug("Reusing page", {reuseCount: this.reuseCount, ...this.logDetails}, "worker");
return this.opts;
} else if (this.page) {
await this.closePage();
}
this.reuseCount = 1;
const workerid = this.id;
let retry = 0;
while (await this.crawler.isCrawlRunning()) {
try {
logger.debug("Getting page in new window", {workerid}, "worker");
const result = await timedRun(
this.crawler.browser.newWindowPageWithCDP(),
NEW_WINDOW_TIMEOUT,
"New Window Timed Out",
{workerid},
"worker"
);
if (!result) {
throw new Error("timed out");
}
const { page, cdp } = result;
this.page = page;
this.cdp = cdp;
this.callbacks = {};
const directFetchCapture = this.recorder ? (x) => this.recorder.directFetchCapture(x) : null;
this.opts = {
page: this.page,
cdp: this.cdp,
workerid,
callbacks: this.callbacks,
directFetchCapture,
};
if (this.recorder) {
await this.recorder.onCreatePage(this.opts);
}
// updated per page crawl
this.crashed = false;
this.crashBreak = new Promise((resolve, reject) => this.markCrashed = reject);
this.logDetails = {page: this.page.url(), workerid};
// more serious page crash, mark as failed
this.page.on("error", (err) => {
// ensure we're still on this page, otherwise ignore!
if (this.page === page) {
logger.error("Page Crashed", {...errJSON(err), ...this.logDetails}, "worker");
this.crashed = true;
this.markCrashed("crashed");
}
});
await this.crawler.setupPage(this.opts);
return this.opts;
} catch (err) {
logger.warn("Error getting new page", {"workerid": this.id, ...errJSON(err)}, "worker");
retry++;
if (!this.crawler.browser.browser) {
break;
}
if (retry >= MAX_REUSE) {
logger.fatal("Unable to get new page, browser likely crashed", this.logDetails, "worker");
}
await sleep(0.5);
logger.warn("Retrying getting new page", this.logDetails, "worker");
if (this.crawler.healthChecker) {
this.crawler.healthChecker.incError();
}
}
}
}
async crawlPage(opts) {
const res = await this.crawler.crawlPage(opts);
if (this.recorder) {
await this.recorder.finishPage();
}
return res;
}
async timedCrawlPage(opts) {
const workerid = this.id;
const { data } = opts;
const { url } = data;
logger.info("Starting page", {workerid, "page": url}, "worker");
this.logDetails = {page: url, workerid};
// set new page id
const pageid = uuidv4();
data.pageid = pageid;
if (this.recorder) {
this.recorder.startPage({pageid, url});
}
try {
await Promise.race([
timedRun(
this.crawlPage(opts),
this.maxPageTime,
"Page Worker Timeout",
this.logDetails,
"worker"
),
this.crashBreak
]);
} catch (e) {
if (e.message !== "logged" && !this.crashed) {
logger.error("Worker Exception", {...errJSON(e), ...this.logDetails}, "worker");
}
} finally {
await timedRun(
this.crawler.pageFinished(data),
FINISHED_TIMEOUT,
"Page Finished Timed Out",
this.logDetails,
"worker"
);
}
}
async run() {
logger.info("Worker starting", {workerid: this.id}, "worker");
try {
await this.runLoop();
logger.info("Worker done, all tasks complete", {workerid: this.id}, "worker");
} catch (e) {
logger.error("Worker error, exiting", {...errJSON(e), workerid: this.id}, "worker");
} finally {
if (this.recorder) {
await this.recorder.onDone();
}
}
}
async runLoop() {
const crawlState = this.crawler.crawlState;
let loggedWaiting = false;
while (await this.crawler.isCrawlRunning()) {
const data = await crawlState.nextFromQueue();
// see if any work data in the queue
if (data) {
// init page (new or reuse)
const opts = await this.initPage(data.url);
// run timed crawl of page
await this.timedCrawlPage({...opts, data});
loggedWaiting = false;
} else {
// indicate that the worker has no more work (mostly for screencasting, status, etc...)
// depending on other workers, this worker will either get more work or the crawl will end
this.crawler.workerIdle(this.id);
// check if any pending urls
const pending = await crawlState.numPending();
// if pending, sleep and check again
if (pending) {
if (!loggedWaiting) {
logger.debug("No crawl tasks, but pending tasks remain, waiting", {pending, workerid: this.id}, "worker");
loggedWaiting = true;
}
await sleep(0.5);
} else {
// if no pending and queue size is still empty, we're done!
if (!await crawlState.queueSize()) {
break;
}
}
}
}
}
}
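// An entry-point sketch (`crawler` is a configured crawler instance, assumed here);
// maxPageTime is in seconds, matching timedRun() above.
// await runWorkers(crawler, 4, 120, "/crawls/collections/test");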