From 705bc0cd9fca31d6032fae558f599e01c617fa35 Mon Sep 17 00:00:00 2001
From: Ilya Kreymer
Date: Wed, 10 Sep 2025 12:05:21 -0700
Subject: [PATCH] Async Fetch Refactor (#880)

- separate out reading the stream response while the browser is waiting
  (not really async) from actual async loading; this is now handled via
  fetchResponseBody()
- unify async fetch to first try browser networking for regular GET
  requests, falling back to regular fetch()
- load headers and body separately in async fetch, allowing a request to
  be cancelled after the headers are read
- refactor direct fetch of non-HTML pages: load the headers, then load the
  body async, allowing the worker to continue on to browser-based pages
  (should allow more parallelization in the future)
- unify WARC writing in preparation for dedup: a unified serializeWARC()
  is called for all paths, the WARC digest is computed, and additional
  payload checks are added for streaming loading
---
 src/crawler.ts               |  69 ++-
 src/util/recorder.ts         | 986 +++++++++++++++++------------------
 src/util/state.ts            |   1 +
 tests/non-html-crawl.test.js |  19 +-
 4 files changed, 524 insertions(+), 551 deletions(-)

diff --git a/src/crawler.ts b/src/crawler.ts
index 884200af..3b1805b3 100644
--- a/src/crawler.ts
+++ b/src/crawler.ts
@@ -1059,58 +1059,43 @@ self.__bx_behaviors.selectMainBehavior();
     data.logDetails = logDetails;
     data.workerid = workerid;
 
+    let result = false;
+
     if (recorder) {
       try {
         const headers = auth
           ? { Authorization: auth, ...this.headers }
           : this.headers;
 
-        const result = await timedRun(
-          recorder.directFetchCapture({ url, headers, cdp }),
+        result = await timedRun(
+          recorder.directFetchCapture({
+            url,
+            headers,
+            cdp,
+            state: data,
+            crawler: this,
+          }),
           this.params.pageLoadTimeout,
           "Direct fetch of page URL timed out",
           logDetails,
           "fetch",
         );
-
-        // fetched timed out, already logged, don't retry in browser
-        if (!result) {
-          return;
-        }
-
-        const { fetched, mime, ts } = result;
-
-        if (mime) {
-          data.mime = mime;
-          data.isHTMLPage = isHTMLMime(mime);
-        }
-        if (fetched) {
-          data.loadState = LoadState.FULL_PAGE_LOADED;
-          data.status = 200;
-          data.ts = ts || new Date();
-          logger.info(
-            "Direct fetch successful",
-            { url, mime, ...logDetails },
-            "fetch",
-          );
-          return;
-        }
       } catch (e) {
-        if (e instanceof Error && e.message === "response-filtered-out") {
-          // filtered out direct fetch
-          logger.debug(
-            "Direct fetch response not accepted, continuing with browser fetch",
-            logDetails,
-            "fetch",
-          );
-        } else {
-          logger.error(
-            "Direct fetch of page URL failed",
-            { e, ...logDetails },
-            "fetch",
-          );
-          return;
-        }
+        logger.error(
+          "Direct fetch of page URL failed",
+          { e, ...logDetails },
+          "fetch",
+        );
+      }
+
+      if (!result) {
+        logger.debug(
+          "Direct fetch response not accepted, continuing with browser fetch",
+          logDetails,
+          "fetch",
+        );
+      } else {
+        return;
       }
     }
 
@@ -1280,6 +1265,10 @@ self.__bx_behaviors.selectMainBehavior();
   }
 
   async pageFinished(data: PageState, lastErrorText = "") {
+    // not yet finished
+    if (data.asyncLoading) {
+      return;
+    }
     // if page loaded, considered page finished successfully
     // (even if behaviors timed out)
     const { loadState, logDetails, depth, url, pageSkipped } = data;
diff --git a/src/util/recorder.ts b/src/util/recorder.ts
index 477f0515..96f95106 100644
--- a/src/util/recorder.ts
+++ b/src/util/recorder.ts
@@ -20,7 +20,7 @@ import {
 import { WARCRecord, multiValueHeader } from "warcio";
 import { TempFileBuffer, WARCSerializer } from "warcio/node";
 import { WARCWriter } from "./warcwriter.js";
-import { RedisCrawlState, WorkerId } from "./state.js";
+import { LoadState, PageState, RedisCrawlState, WorkerId } from "./state.js";
 import { CDPSession, Protocol } from "puppeteer-core";
 import { Crawler } from "../crawler.js";
 import { getProxyDispatcher } from "./proxy.js";
@@ -87,11 +87,11 @@ export type AsyncFetchOptions = {
   expectedSize?: number;
   // eslint-disable-next-line no-use-before-define
   recorder: Recorder;
-  networkId: string;
-  filter?: (resp: Response) => boolean;
   ignoreDupe?: boolean;
   maxFetchSize?: number;
   manualRedirect?: boolean;
+  useBrowserNetwork?: boolean;
+  cdp?: CDPSession;
 };
 
 // =================================================================
@@ -99,23 +99,8 @@ export type DirectFetchRequest = {
   url: string;
   headers: Record<string, string>;
   cdp: CDPSession;
-};
-
-// =================================================================
-export type DirectFetchResponse = {
-  fetched: boolean;
-  mime: string;
-  ts: Date;
-};
-
-// =================================================================
-export type NetworkLoadAsyncFetchOptions = AsyncFetchOptions & {
-  cdp: CDPSession;
-};
-
-// =================================================================
-export type ResponseStreamAsyncFetchOptions = NetworkLoadAsyncFetchOptions & {
-  requestId: string;
+  state: PageState;
+  crawler: Crawler;
 };
 
 // =================================================================
@@ -471,13 +456,8 @@ export class Recorder extends EventEmitter {
           reqresp.deleteRange();
           reqresp.requestId = "0";
 
-          const fetcher = new AsyncFetcher({
-            reqresp,
-            expectedSize: reqresp.expectedSize ? reqresp.expectedSize : -1,
-            recorder: this,
-            networkId: "0",
-          });
-          void this.fetcherQ.add(() => fetcher.load());
+          const expectedSize = reqresp.expectedSize ? reqresp.expectedSize : -1;
+          this.addAsyncFetch({ reqresp, expectedSize, recorder: this });
           return;
         }
         break;
@@ -670,16 +650,12 @@ export class Recorder extends EventEmitter {
       reqrespNew.deleteRange();
       reqrespNew.frameId = params.frameId;
 
-      this.addAsyncFetch(
-        {
-          reqresp: reqrespNew,
-          expectedSize: parseInt(range.split("/")[1]),
-          recorder: this,
-          networkId: "0",
-          cdp,
-        },
-        contentLen,
-      );
+      this.addAsyncFetch({
+        reqresp: reqrespNew,
+        expectedSize: parseInt(range.split("/")[1]),
+        recorder: this,
+        cdp,
+      });
     }
 
     return false;
@@ -722,7 +698,6 @@ export class Recorder extends EventEmitter {
       this.addAsyncFetch({
         reqresp: reqrespNew,
         recorder: this,
-        networkId: "0",
         cdp,
       });
     }
@@ -790,32 +765,25 @@ export class Recorder extends EventEmitter {
           mimeType,
         )
       ) {
-        const opts: ResponseStreamAsyncFetchOptions = {
-          reqresp,
-          expectedSize: contentLen,
-          recorder: this,
-          networkId,
-          cdp,
-          requestId,
-        };
-
-        // fetching using response stream as first attempt,
-        // await here and then either call fulFill, or if dupe, return false
-        const fetcher = new ResponseStreamAsyncFetcher(opts);
-        const res = await fetcher.load();
-        switch (res) {
-          case "dupe":
-            this.removeReqResp(networkId);
-            return false;
-
-          case "fetched":
-            streamingConsume = true;
-            break;
+        if (await this.isDupeFetch(reqresp)) {
+          this.removeReqResp(networkId);
+          return false;
         }
+        streamingConsume = await this.fetchResponseBody(networkId, reqresp, cdp);
 
         // if not consumed via takeStream, attempt async loading
         if (!streamingConsume) {
-          this.addAsyncFetch(opts, contentLen);
+          this.removeReqResp(networkId);
+
+          const opts: AsyncFetchOptions = {
+            reqresp,
+            expectedSize: contentLen,
+            recorder: this,
+            cdp,
+          };
+
+          this.addAsyncFetch(opts);
           return false;
         }
       } else {
@@ -888,18 +856,8 @@ export class Recorder extends EventEmitter {
     return true;
   }
 
-  addAsyncFetch(opts: NetworkLoadAsyncFetchOptions, contentLen: number = -1) {
-    let fetcher: AsyncFetcher;
-
-    if (
-      opts.reqresp.method !== "GET" ||
-      contentLen > MAX_NETWORK_LOAD_SIZE ||
-      !opts.reqresp.inPageContext
-    ) {
-      fetcher = new AsyncFetcher(opts);
-    } else {
-      fetcher = new NetworkLoadStreamAsyncFetcher(opts);
-    }
+  addAsyncFetch(opts: AsyncFetchOptions) {
+    const fetcher = new AsyncFetcher(opts);
 
     void this.fetcherQ.add(() => fetcher.load());
   }
@@ -913,13 +871,7 @@ export class Recorder extends EventEmitter {
     reqresp.url = url;
     reqresp.method = "GET";
     reqresp.frameId = this.mainFrameId || undefined;
-    const fetcher = new NetworkLoadStreamAsyncFetcher({
-      reqresp,
-      recorder: this,
-      cdp,
-      networkId: "0",
-    });
-    void this.fetcherQ.add(() => fetcher.load());
+    this.addAsyncFetch({ reqresp, recorder: this, cdp });
     // return true if successful
     return true;
   }
@@ -1364,57 +1316,13 @@ export class Recorder extends EventEmitter {
     return reqresp;
   }
 
-  async serializeToWARC(reqresp: RequestResponseInfo) {
-    // always include in pageinfo record if going to serialize to WARC
-    // even if serialization does not happen
-    this.addPageRecord(reqresp);
-
-    const { url, method, status, payload, requestId } = reqresp;
-
-    // Specifically log skipping cached resources
-    if (reqresp.isCached()) {
-      logger.debug(
-        "Skipping cached resource, should be already recorded",
-        { url, status },
-        "recorder",
-      );
-      return;
-    } else if (reqresp.shouldSkipSave()) {
-      logger.debug(
-        "Skipping writing request/response",
-        {
-          requestId,
-          url,
-          method,
-          status,
-          payloadLength: (payload && payload.length) || 0,
-        },
-        "recorder",
-      );
-      return;
-    }
-
-    if (
-      url &&
-      method === "GET" &&
-      !isRedirectStatus(status) &&
-      !(await this.crawlState.addIfNoDupe(WRITE_DUPE_KEY, url, status))
-    ) {
-      logNetwork("Skipping dupe", { url, status, ...this.logDetails });
-      return;
-    }
-
-    const responseRecord = createResponse(reqresp, this.pageid);
-    const requestRecord = createRequest(reqresp, responseRecord, this.pageid);
-
-    this.writer.writeRecordPair(responseRecord, requestRecord);
-  }
-
   async directFetchCapture({
     url,
     headers,
     cdp,
-  }: DirectFetchRequest): Promise<DirectFetchResponse> {
+    crawler,
+    state,
+  }: DirectFetchRequest): Promise<boolean> {
     const reqresp = new RequestResponseInfo("0");
 
     const ts = new Date();
@@ -1428,61 +1336,29 @@ export class Recorder extends EventEmitter {
     reqresp.requestHeaders = headers;
     reqresp.ts = ts;
 
-    let mime: string = "";
-
-    const filter = (resp: Response) => {
-      // only direct load 200 responses
-      if (resp.status !== 200) {
-        return false;
-      }
-
-      const ct = resp.headers.get("content-type");
-      if (ct) {
-        mime = ct.split(";")[0];
-      }
-
-      const result = !isHTMLMime(mime);
-
-      if (result) {
-        logger.info(
-          "Directly fetching page URL without browser",
-          { url, ...this.logDetails },
-          "fetch",
-        );
-      }
-
-      return result;
-    };
-
     // ignore dupes: if previous URL was not a page, still load as page. if previous was page,
     // should not get here, as dupe pages tracked via seen list
     const fetcher = new AsyncFetcher({
       reqresp,
       recorder: this,
-      networkId: "0",
-      filter,
       ignoreDupe: true,
       manualRedirect: true,
+      useBrowserNetwork: false,
     });
-    const res = await fetcher.load();
 
-    // if we get here, resource was not filtered out, has status code of 200
-
-    this.addPageRecord(reqresp);
-
-    const fetched = res === "fetched";
-
-    if (
-      url === this.pageUrl &&
-      fetched &&
-      (!this.pageInfo.ts || 200 < this.pageInfo.tsStatus)
-    ) {
-      logger.debug("Setting page timestamp", { ts, url, status: 200 });
-      this.pageInfo.ts = ts;
-      this.pageInfo.tsStatus = 200;
+    if (!(await fetcher.loadHeaders())) {
+      return false;
     }
 
-    return { fetched, mime, ts };
+    const mime = reqresp.getMimeType() || "";
+    // cancel if not 200 or mime is html
+    if (reqresp.status !== 200 || isHTMLMime(mime)) {
+      await fetcher.doCancel();
+      return false;
+    }
+    state.asyncLoading = true;
+    void this.fetcherQ.add(() => fetcher.loadDirectPage(state, crawler));
+    return true;
   }
 
   async getCookieString(cdp: CDPSession, url: string): Promise<string> {
@@ -1499,15 +1375,283 @@ export class Recorder extends EventEmitter {
       return "";
     }
   }
+
+  async fetchResponseBody(
+    requestId: string,
+    reqresp: RequestResponseInfo,
+    cdp: CDPSession,
+  ) {
+    const { url } = reqresp;
+    try {
+      logger.debug("Async started: takeStream", { url }, "recorder");
+
+      const { stream } = await cdp.send("Fetch.takeResponseBodyAsStream", {
+        requestId,
+      });
+
+      const iter = this.takeStreamIter(reqresp, cdp, stream);
+
+      if (!(await this.serializeToWARC(reqresp, iter))) {
+        return false;
+      }
+
+      this.removeReqResp(requestId);
+    } catch (e) {
+      logger.debug(
+        "Fetch responseBodyAsStream failed, will retry async",
+        { url, error: e, ...this.logDetails },
+        "recorder",
+      );
+      return false;
+    }
+
+    return true;
+  }
+
+  async *takeStreamIter(
+    reqresp: RequestResponseInfo,
+    cdp: CDPSession,
+    stream: Protocol.IO.StreamHandle,
+  ) {
+    let size = 0;
+    try {
+      while (true) {
+        const { data, base64Encoded, eof } = await cdp.send("IO.read", {
+          handle: stream,
+          size: TAKE_STREAM_BUFF_SIZE,
+        });
+        const buff = Buffer.from(data, base64Encoded ? "base64" : "utf-8");
"base64" : "utf-8"); + + size += buff.length; + yield buff; + + if (eof) { + break; + } + } + } catch (e) { + logger.warn( + "takeStream interrupted", + { + size, + url: reqresp.url, + ...formatErr(e), + ...this.logDetails, + }, + "recorder", + ); + reqresp.truncated = "disconnect"; + } + } + + async isDupeFetch(reqresp: RequestResponseInfo) { + const { url, method, status } = reqresp; + if ( + method === "GET" && + url && + !(await this.crawlState.addIfNoDupe(ASYNC_FETCH_DUPE_KEY, url, status)) + ) { + reqresp.asyncLoading = false; + return true; + } + + return false; + } + + async checkStreamingRecordPayload( + reqresp: RequestResponseInfo, + serializer: WARCSerializer, + canRetry: boolean, + ) { + const { url } = reqresp; + const { logDetails } = this; + try { + let readSize = await serializer.digestRecord(); + if (serializer.httpHeadersBuff) { + readSize -= serializer.httpHeadersBuff.length; + } + reqresp.readSize = readSize; + // set truncated field and recompute header buff + if (reqresp.truncated) { + logger.warn( + "Response truncated", + { url, canRetry, ...logDetails }, + "recorder", + ); + // if retries available, just retry + if (canRetry) { + return false; + } + } + } catch (e) { + logger.error( + "Error reading + digesting payload", + { url, canRetry, ...formatErr(e), ...logDetails }, + "recorder", + ); + return false; + } + + if (reqresp.readSize === reqresp.expectedSize || reqresp.expectedSize < 0) { + logger.debug( + "Async fetch: streaming done", + { + size: reqresp.readSize, + expected: reqresp.expectedSize, + url, + ...logDetails, + }, + "recorder", + ); + } else { + logger.warn( + "Async fetch: possible response size mismatch", + { + type: this.constructor.name, + size: reqresp.readSize, + expected: reqresp.expectedSize, + url, + canRetry, + ...logDetails, + }, + "recorder", + ); + return false; + } + + return true; + } + + async serializeToWARC( + reqresp: RequestResponseInfo, + iter?: AsyncIterable, + ): Promise { + // always include in pageinfo record if going to serialize to WARC + // even if serialization does not happen, indicates this URL was on the page + this.addPageRecord(reqresp); + + const { pageid, gzip } = this; + const { url, status, requestId, method, payload } = reqresp; + + // Specifically log skipping cached resources + if (reqresp.isCached()) { + logger.debug( + "Skipping cached resource, should be already recorded", + { url, status }, + "recorder", + ); + return false; + } else if (!iter && reqresp.shouldSkipSave()) { + logger.debug( + "Skipping writing request/response", + { + requestId, + url, + method, + status, + payloadLength: (payload && payload.length) || 0, + }, + "recorder", + ); + return false; + } + + if ( + url && + method === "GET" && + !isRedirectStatus(status) && + !(await this.crawlState.addIfNoDupe(WRITE_DUPE_KEY, url, status)) + ) { + logNetwork("Skipping dupe", { url, status, ...this.logDetails }); + return false; + } + + const responseRecord = createResponse(reqresp, pageid, iter); + const requestRecord = createRequest(reqresp, responseRecord, pageid); + + const serializer = new WARCSerializer(responseRecord, { + gzip, + maxMemSize: MAX_BROWSER_DEFAULT_FETCH_SIZE, + }); + + if (iter) { + if ( + !(await this.checkStreamingRecordPayload(reqresp, serializer, false)) + ) { + serializer.externalBuffer?.purge(); + await this.crawlState.removeDupe(ASYNC_FETCH_DUPE_KEY, url, status); + return false; + } + + const externalBuffer: TempFileBuffer = + serializer.externalBuffer as TempFileBuffer; + + if (externalBuffer) { + const { currSize, 
+
+        // if fully buffered in memory, then populate the payload to return to browser
+        if (buffers && buffers.length && !fh) {
+          reqresp.payload = Buffer.concat(buffers, currSize);
+          externalBuffer.buffers = [reqresp.payload];
+        } else if (fh) {
+          logger.debug(
+            "Large payload written to WARC, but not returned to browser (would require rereading into memory)",
+            {
+              url,
+              actualSize: reqresp.readSize,
+              maxSize: MAX_BROWSER_DEFAULT_FETCH_SIZE,
+            },
+            "recorder",
+          );
+        }
+      }
+    } else {
+      await serializer.digestRecord();
+    }
+
+    let modified = false;
+
+    if (reqresp.truncated) {
+      responseRecord.warcHeaders.headers.set(
+        "WARC-Truncated",
+        reqresp.truncated,
+      );
+      modified = true;
+    }
+
+    if (Object.keys(reqresp.extraOpts).length) {
+      responseRecord.warcHeaders.headers.set(
+        "WARC-JSON-Metadata",
+        JSON.stringify(reqresp.extraOpts),
+      );
+      modified = true;
+    }
+
+    if (modified) {
+      serializer.warcHeadersBuff = encoder.encode(
+        responseRecord.warcHeaders.toString(),
+      );
+    }
+
+    this.writer.writeRecordPair(responseRecord, requestRecord, serializer);
+
+    this.addPageRecord(reqresp);
+
+    return true;
+  }
 }
 
 // =================================================================
 class AsyncFetcher {
   reqresp: RequestResponseInfo;
 
-  networkId: string;
-  filter?: (resp: Response) => boolean;
   ignoreDupe = false;
+  useBrowserNetwork = true;
+
+  cdp: CDPSession | null = null;
+
+  stream?: string;
+  resp?: Response;
 
   maxFetchSize: number;
 
@@ -1521,19 +1665,17 @@ class AsyncFetcher {
     reqresp,
     expectedSize = -1,
     recorder,
-    networkId,
-    filter = undefined,
     ignoreDupe = false,
     maxFetchSize = MAX_BROWSER_DEFAULT_FETCH_SIZE,
     manualRedirect = false,
+    useBrowserNetwork = true,
   }: AsyncFetchOptions) {
     this.reqresp = reqresp;
    this.reqresp.expectedSize = expectedSize;
     this.reqresp.asyncLoading = true;
 
-    this.networkId = networkId;
-    this.filter = filter;
     this.ignoreDupe = ignoreDupe;
+    this.useBrowserNetwork = useBrowserNetwork;
 
     this.recorder = recorder;
 
@@ -1543,196 +1685,88 @@ class AsyncFetcher {
   }
 
   async load() {
-    const { reqresp, recorder, networkId } = this;
-    const { url, status } = reqresp;
-
-    const { pageid, crawlState, gzip, logDetails } = recorder;
-
-    let fetched = "notfetched";
-
-    try {
-      if (
-        reqresp.method === "GET" &&
-        url &&
-        !(await crawlState.addIfNoDupe(ASYNC_FETCH_DUPE_KEY, url, status))
-      ) {
-        if (!this.ignoreDupe) {
-          this.reqresp.asyncLoading = false;
-          return "dupe";
-        }
+    for (let i = 0; i < DEFAULT_MAX_RETRIES; i++) {
+      if (!(await this.loadHeaders())) {
+        continue;
       }
-
-      let retries = 0;
-
-      while (retries <= this.maxRetries) {
-        try {
-          reqresp.truncated = undefined;
-          const body = await this._doFetch();
-          fetched = "fetched";
-
-          const responseRecord = createResponse(reqresp, pageid, body);
-          const requestRecord = createRequest(reqresp, responseRecord, pageid);
-
-          const serializer = new WARCSerializer(responseRecord, {
-            gzip,
-            maxMemSize: this.maxFetchSize,
-          });
-
-          try {
-            let readSize = await serializer.digestRecord();
-            if (serializer.httpHeadersBuff) {
-              readSize -= serializer.httpHeadersBuff.length;
-            }
-            reqresp.readSize = readSize;
-            // set truncated field and recompute header buff
-            if (reqresp.truncated) {
-              const retry = retries < this.maxRetries;
-              logger.warn(
-                "Response truncated",
-                { url, retry, ...logDetails },
-                "recorder",
-              );
-              // if retries available, just retry
-              if (retry) {
-                void serializer.externalBuffer?.purge();
-                retries++;
-                continue;
-              }
-              responseRecord.warcHeaders.headers.set(
-                "WARC-Truncated",
-                reqresp.truncated,
-              );
-              // todo: keep this internal in warcio after adding new header
-              serializer.warcHeadersBuff = encoder.encode(
-                responseRecord.warcHeaders.toString(),
-              );
-            }
-          } catch (e) {
-            const retry = retries < this.maxRetries;
-            logger.error(
-              "Error reading + digesting payload",
-              { url, retry, ...formatErr(e), ...logDetails },
-              "recorder",
-            );
-            if (retry) {
-              void serializer.externalBuffer?.purge();
-              retries++;
-              continue;
-            }
-          }
-
-          if (
-            reqresp.readSize === reqresp.expectedSize ||
-            reqresp.expectedSize < 0
-          ) {
-            logger.debug(
-              "Async fetch: streaming done",
-              {
-                size: reqresp.readSize,
-                expected: reqresp.expectedSize,
-                networkId,
-                url,
-                ...logDetails,
-              },
-              "recorder",
-            );
-          } else {
-            logger.warn(
-              "Async fetch: possible response size mismatch",
-              {
-                type: this.constructor.name,
-                size: reqresp.readSize,
-                expected: reqresp.expectedSize,
-                url,
-                retry:
-                  retries < this.maxRetries &&
-                  (status === 206 || status === 200),
-                ...logDetails,
-              },
-              "recorder",
-            );
-            if (status === 206 || status === 200) {
-              void serializer.externalBuffer?.purge();
-              await crawlState.removeDupe(ASYNC_FETCH_DUPE_KEY, url, status);
-              if (retries < this.maxRetries) {
-                retries++;
-                continue;
-              }
-              return "notfetched";
-            }
-          }
-
-          const externalBuffer: TempFileBuffer =
-            serializer.externalBuffer as TempFileBuffer;
-
-          if (externalBuffer) {
-            const { currSize, buffers, fh } = externalBuffer;
-
-            // if fully buffered in memory, then populate the payload to return to browser
-            if (buffers && buffers.length && !fh) {
-              reqresp.payload = Buffer.concat(buffers, currSize);
-              externalBuffer.buffers = [reqresp.payload];
-            } else if (fh) {
-              logger.debug(
-                "Large payload written to WARC, but not returned to browser (would require rereading into memory)",
-                {
-                  url,
-                  actualSize: reqresp.readSize,
-                  maxSize: this.maxFetchSize,
-                },
-                "recorder",
-              );
-            }
-          }
-
-          if (Object.keys(reqresp.extraOpts).length) {
-            responseRecord.warcHeaders.headers.set(
-              "WARC-JSON-Metadata",
-              JSON.stringify(reqresp.extraOpts),
-            );
-          }
-
-          recorder.writer.writeRecordPair(
-            responseRecord,
-            requestRecord,
-            serializer,
-          );
-
-          // eslint-disable-next-line @typescript-eslint/no-explicit-any
-        } catch (e: any) {
-          await crawlState.removeDupe(ASYNC_FETCH_DUPE_KEY, url!, status);
-          if (e.message === "response-filtered-out") {
-            throw e;
-          }
-          const retry = retries < this.maxRetries;
-          logger.debug(
-            "Streaming Fetch Error",
-            { url, networkId, retry, ...formatErr(e), ...logDetails },
-            "recorder",
-          );
-          if (retry) {
-            retries++;
-            continue;
-          }
-          // indicate response is ultimately not valid
-          reqresp.status = 0;
-          reqresp.errorText = e.message;
-        }
-        // if we get here, successful (or out of retries), break out of loop
-        break;
-      }
-    } finally {
-      recorder.addPageRecord(reqresp);
-      // exclude direct fetch request with fake id
-      if (networkId !== "0") {
-        recorder.removeReqResp(networkId);
+      if (!(await this.loadBody())) {
+        continue;
       }
+      return true;
     }
-
-    return fetched;
+    return false;
   }
 
-  async _doFetch() {
+  async loadHeaders() {
+    let success = false;
+    try {
+      if (this.useBrowserNetwork) {
+        const { method, expectedSize, inPageContext } = this.reqresp;
+        if (
+          method !== "GET" ||
+          expectedSize > MAX_NETWORK_LOAD_SIZE ||
+          !inPageContext
+        ) {
+          this.useBrowserNetwork = false;
+        }
+      }
+      if (this.useBrowserNetwork) {
+        success = await this.loadHeadersNetwork();
+      }
+      if (!success) {
+        this.useBrowserNetwork = false;
+        success = await this.loadHeadersFetch();
+      }
+    } catch (e) {
+      logger.warn(
"Async load headers failed", + { ...formatErr(e), ...this.recorder.logDetails }, + "fetch", + ); + } + + return success; + } + + async loadBody() { + try { + const { reqresp, useBrowserNetwork, resp, stream, cdp, recorder } = this; + + let iter: AsyncIterable | undefined; + if (reqresp.expectedSize === 0) { + iter = undefined; + } else if (stream && useBrowserNetwork && cdp) { + iter = recorder.takeStreamIter(this.reqresp, cdp, stream); + } else if (resp && resp.body) { + iter = this.takeReader(resp.body.getReader()); + } else { + throw new Error("resp body missing"); + } + + return await recorder.serializeToWARC(reqresp, iter); + } catch (e) { + logger.warn( + "Async load body failed", + { ...formatErr(e), ...this.recorder.logDetails }, + "fetch", + ); + return false; + } + } + + async doCancel() { + const { resp, useBrowserNetwork } = this; + if (!useBrowserNetwork && resp) { + if (resp.status >= 300 && resp.status < 400) { + await resp.arrayBuffer(); + } else { + // otherwise, just cancel + resp.body?.cancel().catch(() => {}); + } + } + } + + async loadHeadersFetch() { const { reqresp } = this; const { method, url } = reqresp; logger.debug("Async started: fetch", { url }, "recorder"); @@ -1760,17 +1794,6 @@ class AsyncFetcher { dispatcher, }); - if (this.filter && !this.filter(resp)) { - // if redirect and cancelled, read whole buffer to avoid possible node error event - if (resp.status >= 300 && resp.status < 400) { - await resp.arrayBuffer(); - } else { - // otherwise, just cancel - resp.body?.cancel().catch(() => {}); - } - throw new Error("response-filtered-out"); - } - if ( reqresp.expectedSize < 0 && resp.headers.get("content-length") && @@ -1782,14 +1805,81 @@ class AsyncFetcher { if (reqresp.expectedSize === 0) { reqresp.fillFetchResponse(resp); reqresp.payload = new Uint8Array(); - return; + return true; } else if (!resp.body) { - throw new Error("fetch body missing, fetch aborted"); + return false; } reqresp.fillFetchResponse(resp); + this.resp = resp; + return true; + } - return this.takeReader(resp.body.getReader()); + async loadHeadersNetwork() { + const { reqresp, cdp } = this; + if (!cdp) { + return false; + } + const { url } = reqresp; + logger.debug("Async started: loadNetworkResource", { url }, "recorder"); + + const options = { disableCache: false, includeCredentials: true }; + + let result = null; + + try { + result = await cdp.send("Network.loadNetworkResource", { + frameId: reqresp.frameId, + url, + options, + }); + } catch (e) { + logger.debug( + "Network.loadNetworkResource failed, attempting node fetch", + { url, ...formatErr(e), ...this.recorder.logDetails }, + "recorder", + ); + return false; + } + + const { stream, headers, httpStatusCode, success, netError, netErrorName } = + result.resource; + + if (!success || !stream) { + //await this.recorder.crawlState.removeDupe(ASYNC_FETCH_DUPE_KEY, url); + logger.debug( + "Network.loadNetworkResource failed, attempting node fetch", + { + url, + netErrorName, + netError, + httpStatusCode, + ...this.recorder.logDetails, + }, + "recorder", + ); + return false; + } + + if ( + reqresp.expectedSize < 0 && + headers && + headers["content-length"] && + !headers["content-encoding"] + ) { + reqresp.expectedSize = Number(headers["content-length"] || -1); + } + + if (reqresp.expectedSize === 0) { + reqresp.payload = new Uint8Array(); + return true; + } + + reqresp.setStatus(httpStatusCode || 200); + reqresp.responseHeaders = headers || {}; + + this.stream = stream; + return true; } async *takeReader(reader: 
   async *takeReader(reader: ReadableStreamDefaultReader<Uint8Array>) {
@@ -1819,135 +1909,31 @@ class AsyncFetcher {
     }
   }
 
-  async *takeStreamIter(cdp: CDPSession, stream: Protocol.IO.StreamHandle) {
-    let size = 0;
-    try {
-      while (true) {
-        const { data, base64Encoded, eof } = await cdp.send("IO.read", {
-          handle: stream,
-          size: TAKE_STREAM_BUFF_SIZE,
-        });
-        const buff = Buffer.from(data, base64Encoded ? "base64" : "utf-8");
+  async loadDirectPage(state: PageState, crawler: Crawler) {
+    state.asyncLoading = true;
 
-        size += buff.length;
-        yield buff;
+    const success = await this.loadBody();
 
-        if (eof) {
-          break;
-        }
-      }
-    } catch (e) {
-      logger.warn(
-        "takeStream interrupted",
-        {
-          size,
-          url: this.reqresp.url,
-          ...formatErr(e),
-          ...this.recorder.logDetails,
-        },
-        "recorder",
+    this.recorder.addPageRecord(this.reqresp);
+
+    const mime = this.reqresp.getMimeType();
+
+    if (mime) {
+      state.mime = mime;
+      state.isHTMLPage = isHTMLMime(mime);
+    }
+    if (success) {
+      state.loadState = LoadState.FULL_PAGE_LOADED;
+      state.status = 200;
+      state.ts = this.reqresp.ts || new Date();
+      logger.info(
+        "Direct fetch successful",
+        { url: this.reqresp.url, mime, workerid: this.recorder.workerid },
+        "fetch",
       );
-      this.reqresp.truncated = "disconnect";
     }
-  }
-}
-
-// =================================================================
-class ResponseStreamAsyncFetcher extends AsyncFetcher {
-  cdp: CDPSession;
-  requestId: string;
-
-  constructor(opts: ResponseStreamAsyncFetchOptions) {
-    super(opts);
-    this.cdp = opts.cdp;
-    this.requestId = opts.requestId;
-    // can't retry this type of fetch
-    this.maxRetries = 0;
-  }
-
-  async _doFetch() {
-    const { requestId, reqresp, cdp } = this;
-    const { url } = reqresp;
-    logger.debug("Async started: takeStream", { url }, "recorder");
-
-    const { stream } = await cdp.send("Fetch.takeResponseBodyAsStream", {
-      requestId,
-    });
-
-    return this.takeStreamIter(cdp, stream);
-  }
-}
-
-// =================================================================
-class NetworkLoadStreamAsyncFetcher extends AsyncFetcher {
-  cdp: CDPSession;
-
-  constructor(opts: NetworkLoadAsyncFetchOptions) {
-    super(opts);
-    this.cdp = opts.cdp;
-  }
-
-  async _doFetch() {
-    const { reqresp, cdp } = this;
-    const { url } = reqresp;
-    logger.debug("Async started: loadNetworkResource", { url }, "recorder");
-
-    const options = { disableCache: false, includeCredentials: true };
-
-    let result = null;
-
-    try {
-      result = await cdp.send("Network.loadNetworkResource", {
-        frameId: reqresp.frameId,
-        url,
-        options,
-      });
-    } catch (e) {
-      logger.debug(
-        "Network.loadNetworkResource failed, attempting node fetch",
-        { url, ...formatErr(e), ...this.recorder.logDetails },
-        "recorder",
-      );
-      return await super._doFetch();
-    }
-
-    const { stream, headers, httpStatusCode, success, netError, netErrorName } =
-      result.resource;
-
-    if (!success || !stream) {
-      //await this.recorder.crawlState.removeDupe(ASYNC_FETCH_DUPE_KEY, url);
-      logger.debug(
-        "Network.loadNetworkResource failed, attempting node fetch",
-        {
-          url,
-          netErrorName,
-          netError,
-          httpStatusCode,
-          ...this.recorder.logDetails,
-        },
-        "recorder",
-      );
-      return await super._doFetch();
-    }
-
-    if (
-      reqresp.expectedSize < 0 &&
-      headers &&
-      headers["content-length"] &&
-      !headers["content-encoding"]
-    ) {
-      reqresp.expectedSize = Number(headers["content-length"] || -1);
-    }
-
-    if (reqresp.expectedSize === 0) {
-      reqresp.payload = new Uint8Array();
-      return;
-    }
-
-    reqresp.setStatus(httpStatusCode || 200);
-    reqresp.responseHeaders = headers || {};
-
-    return this.takeStreamIter(cdp, stream);
+    state.asyncLoading = false;
+    await crawler.pageFinished(state);
   }
 }
 
diff --git a/src/util/state.ts b/src/util/state.ts
index ec18145e..9309116a 100644
--- a/src/util/state.ts
+++ b/src/util/state.ts
@@ -85,6 +85,7 @@ export class PageState {
 
   skipBehaviors = false;
   pageSkipped = false;
+  asyncLoading = false;
   filteredFrames: Frame[] = [];
   loadState: LoadState = LoadState.FAILED;
   contentCheckAllowed = false;
diff --git a/tests/non-html-crawl.test.js b/tests/non-html-crawl.test.js
index 83da9335..b015cc0e 100644
--- a/tests/non-html-crawl.test.js
+++ b/tests/non-html-crawl.test.js
@@ -76,7 +76,7 @@ test("PDF: check that the pages.jsonl file entry contains status code and mime t
   expect(pageH.loadState).toBe(2);
 });
 
-test("PDF: check that CDX contains one pdf 200, one 301 and one 200, two pageinfo entries", () => {
+test("PDF: check that CDX contains data from two crawls: one pdf 200, one 301 and one 200, two pageinfo entries", () => {
   const filedata = fs.readFileSync(
     "test-crawls/collections/crawl-pdf/indexes/index.cdxj",
     { encoding: "utf-8" },
@@ -90,6 +90,7 @@ test("PDF: check that CDX contains one pdf 200, one 301 and one 200, two pageinf
   expect(cdxj[0].url).toBe(PDF_HTTP);
   expect(cdxj[0].status).toBe("301");
 
+  // this is duplicated as this is data from two crawls
   expect(cdxj[1].url).toBe(PDF);
   expect(cdxj[1].status).toBe("200");
   expect(cdxj[1].mime).toBe("application/pdf");
@@ -149,7 +150,7 @@ test("XML: check that CDX contains one xml 200, one 301 and one 200, two pageinf
   const lines = filedata.trim().split("\n");
   const cdxj = lines.map(line => JSON.parse(line.split(" ").slice(2).join(" "))).sort((a, b) => a.url < b.url ? -1 : 1);
 
-  expect(cdxj.length).toBe(6);
+  expect(cdxj.length).toBe(5);
 
   expect(cdxj[0].url).toBe("https://webrecorder.net/favicon.ico");
 
@@ -157,18 +158,14 @@ test("XML: check that CDX contains one xml 200, one 301 and one 200, two pageinf
   expect(cdxj[1].status).toBe("200");
   expect(cdxj[1].mime).toBe("application/xml");
 
-  expect(cdxj[2].url).toBe(XML);
-  expect(cdxj[2].status).toBe("200");
-  expect(cdxj[2].mime).toBe("application/xml");
+  expect(cdxj[2].url).toBe(XML_REDIR);
+  expect(cdxj[2].status).toBe("301");
 
-  expect(cdxj[3].url).toBe(XML_REDIR);
-  expect(cdxj[3].status).toBe("301");
+  expect(cdxj[3].url).toBe("urn:pageinfo:" + XML);
+  expect(cdxj[3].mime).toBe("application/json");
 
-  expect(cdxj[4].url).toBe("urn:pageinfo:" + XML);
+  expect(cdxj[4].url).toBe("urn:pageinfo:" + XML_REDIR);
   expect(cdxj[4].mime).toBe("application/json");
-
-  expect(cdxj[5].url).toBe("urn:pageinfo:" + XML);
-  expect(cdxj[5].mime).toBe("application/json");
 });
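
For review context, the new two-phase flow in directFetchCapture() condenses to the
sketch below. This is illustrative only and not part of the patch; the names match
the code above, while error handling, logging, and the timedRun() timeout wrapper
are omitted.

    // Phase 1: fetch status + headers only, so a bad fit can be cancelled cheaply.
    const fetcher = new AsyncFetcher({
      reqresp,
      recorder: this,
      ignoreDupe: true,          // dupe pages are already tracked via the seen list
      manualRedirect: true,
      useBrowserNetwork: false,  // the page URL itself is fetched outside the browser
    });

    if (!(await fetcher.loadHeaders())) {
      return false; // headers failed; caller falls back to a browser load
    }

    const mime = reqresp.getMimeType() || "";
    if (reqresp.status !== 200 || isHTMLMime(mime)) {
      await fetcher.doCancel(); // drain redirect bodies or cancel the stream
      return false;             // let the browser handle HTML and non-200 pages
    }

    // Phase 2: the body streams into the WARC on the fetcher queue while the
    // worker moves on; pageFinished() is deferred until asyncLoading clears.
    state.asyncLoading = true;
    void this.fetcherQ.add(() => fetcher.loadDirectPage(state, crawler));
    return true;

Because loadDirectPage() calls crawler.pageFinished(state) itself once the body is
written, the early asyncLoading check added to pageFinished() keeps the page from
being finalized twice.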