diff --git a/src/util/recorder.ts b/src/util/recorder.ts
index 360f8153..5c2c96e0 100644
--- a/src/util/recorder.ts
+++ b/src/util/recorder.ts
@@ -26,6 +26,7 @@ import { Crawler } from "../crawler.js";
 import { getProxyDispatcher } from "./proxy.js";
 import { ScopedSeed } from "./seeds.js";
 import EventEmitter from "events";
+import { DEFAULT_MAX_RETRIES } from "./constants.js";
 
 const MAX_BROWSER_DEFAULT_FETCH_SIZE = 5_000_000;
 const MAX_TEXT_REWRITE_SIZE = 25_000_000;
@@ -1510,6 +1511,8 @@ class AsyncFetcher {
 
   manualRedirect = false;
 
+  maxRetries = DEFAULT_MAX_RETRIES;
+
   constructor({
     reqresp,
     expectedSize = -1,
@@ -1555,122 +1558,165 @@ class AsyncFetcher {
         }
       }
 
-      const body = await this._doFetch();
-      fetched = "fetched";
+      let retries = 0;
 
-      const responseRecord = createResponse(reqresp, pageid, body);
-      const requestRecord = createRequest(reqresp, responseRecord, pageid);
+      while (retries <= this.maxRetries) {
+        try {
+          reqresp.truncated = undefined;
+          const body = await this._doFetch();
+          fetched = "fetched";
 
-      const serializer = new WARCSerializer(responseRecord, {
-        gzip,
-        maxMemSize: this.maxFetchSize,
-      });
+          const responseRecord = createResponse(reqresp, pageid, body);
+          const requestRecord = createRequest(reqresp, responseRecord, pageid);
 
-      try {
-        let readSize = await serializer.digestRecord();
-        if (serializer.httpHeadersBuff) {
-          readSize -= serializer.httpHeadersBuff.length;
-        }
-        reqresp.readSize = readSize;
-        // set truncated field and recompute header buff
-        if (reqresp.truncated) {
-          responseRecord.warcHeaders.headers.set(
-            "WARC-Truncated",
-            reqresp.truncated,
+          const serializer = new WARCSerializer(responseRecord, {
+            gzip,
+            maxMemSize: this.maxFetchSize,
+          });
+
+          try {
+            let readSize = await serializer.digestRecord();
+            if (serializer.httpHeadersBuff) {
+              readSize -= serializer.httpHeadersBuff.length;
+            }
+            reqresp.readSize = readSize;
+            // set truncated field and recompute header buff
+            if (reqresp.truncated) {
+              const retry = retries < this.maxRetries;
+              logger.warn(
+                "Response truncated",
+                { url, retry, ...logDetails },
+                "recorder",
+              );
+              // if retries available, just retry
+              if (retry) {
+                void serializer.externalBuffer?.purge();
+                retries++;
+                continue;
+              }
+              responseRecord.warcHeaders.headers.set(
+                "WARC-Truncated",
+                reqresp.truncated,
+              );
+              // todo: keep this internal in warcio after adding new header
+              serializer.warcHeadersBuff = encoder.encode(
+                responseRecord.warcHeaders.toString(),
+              );
+            }
+          } catch (e) {
+            const retry = retries < this.maxRetries;
+            logger.error(
+              "Error reading + digesting payload",
+              { url, retry, ...formatErr(e), ...logDetails },
+              "recorder",
+            );
+            if (retry) {
+              void serializer.externalBuffer?.purge();
+              retries++;
+              continue;
+            }
+          }
+
+          if (
+            reqresp.readSize === reqresp.expectedSize ||
+            reqresp.expectedSize < 0
+          ) {
+            logger.debug(
+              "Async fetch: streaming done",
+              {
+                size: reqresp.readSize,
+                expected: reqresp.expectedSize,
+                networkId,
+                url,
+                ...logDetails,
+              },
+              "recorder",
+            );
+          } else {
+            logger.warn(
+              "Async fetch: possible response size mismatch",
+              {
+                type: this.constructor.name,
+                size: reqresp.readSize,
+                expected: reqresp.expectedSize,
+                url,
+                retry:
+                  retries < this.maxRetries &&
+                  (status === 206 || status === 200),
+                ...logDetails,
+              },
+              "recorder",
+            );
+            if (status === 206 || status === 200) {
+              void serializer.externalBuffer?.purge();
+              await crawlState.removeDupe(ASYNC_FETCH_DUPE_KEY, url, status);
+              if (retries < this.maxRetries) {
+                retries++;
+                continue;
+              }
+              return "notfetched";
+            }
+          }
+
+          const externalBuffer: TempFileBuffer =
+            serializer.externalBuffer as TempFileBuffer;
+
+          if (externalBuffer) {
+            const { currSize, buffers, fh } = externalBuffer;
+
+            // if fully buffered in memory, then populate the payload to return to browser
+            if (buffers && buffers.length && !fh) {
+              reqresp.payload = Buffer.concat(buffers, currSize);
+              externalBuffer.buffers = [reqresp.payload];
+            } else if (fh) {
+              logger.debug(
+                "Large payload written to WARC, but not returned to browser (would require rereading into memory)",
+                {
+                  url,
+                  actualSize: reqresp.readSize,
+                  maxSize: this.maxFetchSize,
+                },
+                "recorder",
+              );
+            }
+          }
+
+          if (Object.keys(reqresp.extraOpts).length) {
+            responseRecord.warcHeaders.headers.set(
+              "WARC-JSON-Metadata",
+              JSON.stringify(reqresp.extraOpts),
+            );
+          }
+
+          recorder.writer.writeRecordPair(
+            responseRecord,
+            requestRecord,
+            serializer,
           );
-          // todo: keep this internal in warcio after adding new header
-          serializer.warcHeadersBuff = encoder.encode(
-            responseRecord.warcHeaders.toString(),
-          );
-        }
-      } catch (e) {
-        logger.error(
-          "Error reading + digesting payload",
-          { url, ...formatErr(e), ...logDetails },
-          "recorder",
-        );
-      }
 
-      if (
-        reqresp.readSize === reqresp.expectedSize ||
-        reqresp.expectedSize < 0
-      ) {
-        logger.debug(
-          "Async fetch: streaming done",
-          {
-            size: reqresp.readSize,
-            expected: reqresp.expectedSize,
-            networkId,
-            url,
-            ...logDetails,
-          },
-          "recorder",
-        );
-      } else {
-        logger.warn(
-          "Async fetch: possible response size mismatch",
-          {
-            type: this.constructor.name,
-            size: reqresp.readSize,
-            expected: reqresp.expectedSize,
-            url,
-            ...logDetails,
-          },
-          "recorder",
-        );
-        if (status === 206 || status === 200) {
-          void serializer.externalBuffer?.purge();
-          await crawlState.removeDupe(ASYNC_FETCH_DUPE_KEY, url, status);
-          return "notfetched";
-        }
-      }
-
-      const externalBuffer: TempFileBuffer =
-        serializer.externalBuffer as TempFileBuffer;
-
-      if (externalBuffer) {
-        const { currSize, buffers, fh } = externalBuffer;
-
-        // if fully buffered in memory, then populate the payload to return to browser
-        if (buffers && buffers.length && !fh) {
-          reqresp.payload = Buffer.concat(buffers, currSize);
-          externalBuffer.buffers = [reqresp.payload];
-        } else if (fh) {
+          // eslint-disable-next-line @typescript-eslint/no-explicit-any
+        } catch (e: any) {
+          await crawlState.removeDupe(ASYNC_FETCH_DUPE_KEY, url!, status);
+          if (e.message === "response-filtered-out") {
+            throw e;
+          }
+          const retry = retries < this.maxRetries;
           logger.debug(
-            "Large payload written to WARC, but not returned to browser (would require rereading into memory)",
-            { url, actualSize: reqresp.readSize, maxSize: this.maxFetchSize },
+            "Streaming Fetch Error",
+            { url, networkId, retry, ...formatErr(e), ...logDetails },
             "recorder",
           );
+          if (retry) {
+            retries++;
+            continue;
+          }
+          // indicate response is ultimately not valid
+          reqresp.status = 0;
+          reqresp.errorText = e.message;
         }
+        // if we get here, successful (or out of retries), break out of loop
+        break;
       }
-
-      if (Object.keys(reqresp.extraOpts).length) {
-        responseRecord.warcHeaders.headers.set(
-          "WARC-JSON-Metadata",
-          JSON.stringify(reqresp.extraOpts),
-        );
-      }
-
-      recorder.writer.writeRecordPair(
-        responseRecord,
-        requestRecord,
-        serializer,
-      );
-
-      // eslint-disable-next-line @typescript-eslint/no-explicit-any
-    } catch (e: any) {
-      await crawlState.removeDupe(ASYNC_FETCH_DUPE_KEY, url!, status);
-      if (e.message === "response-filtered-out") {
-        throw e;
-      }
-      logger.debug(
-        "Streaming Fetch Error",
-        { url, networkId, ...formatErr(e), ...logDetails },
-        "recorder",
-      );
-      // indicate response is ultimately not valid
-      reqresp.status = 0;
-      reqresp.errorText = e.message;
     } finally {
       recorder.addPageRecord(reqresp);
       // exclude direct fetch request with fake id
@@ -1811,6 +1857,8 @@ class ResponseStreamAsyncFetcher extends AsyncFetcher {
     super(opts);
     this.cdp = opts.cdp;
     this.requestId = opts.requestId;
+    // can't retry this type of fetch
+    this.maxRetries = 0;
   }
 
   async _doFetch() {
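For readers skimming the diff: the change wraps the fetch-and-digest step in a bounded retry loop, retrying on truncated reads, digest errors, and streaming failures, and falling back to the previous behavior (keep the truncated record, or mark the response as failed) once retries are exhausted. The sketch below is illustrative only, not the recorder.ts implementation; doFetch, digest, and the default of 2 retries are assumed stand-ins for AsyncFetcher._doFetch(), WARCSerializer.digestRecord(), and DEFAULT_MAX_RETRIES.

// Minimal sketch of the retry control flow introduced above (assumptions noted in the lead-in).
type FetchResult = "fetched" | "notfetched";

async function fetchWithRetries(
  doFetch: () => Promise<Uint8Array>,
  digest: (body: Uint8Array) => Promise<{ truncated: boolean }>,
  maxRetries = 2, // assumed placeholder; the real default comes from constants.js
): Promise<FetchResult> {
  let retries = 0;
  while (retries <= maxRetries) {
    try {
      const body = await doFetch();
      const { truncated } = await digest(body);
      if (truncated && retries < maxRetries) {
        // truncated read: discard the partial result and try again
        retries++;
        continue;
      }
      // complete, or out of retries: keep the (possibly truncated) record
      return "fetched";
    } catch {
      if (retries < maxRetries) {
        // streaming or digest error: retry while attempts remain
        retries++;
        continue;
      }
      // out of retries: give up on this response
      return "notfetched";
    }
  }
  return "notfetched";
}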