async fetch: allow retrying async fetch if interrupted (#863)

- retry if 'truncated' is set, if there is a size mismatch, or if another exception occurs
- retry only for network load and async fetch, not for response fetch
- set max retries to 2 (same as the current default for pages)
- fixes #831
Ilya Kreymer 2025-07-08 10:02:09 -07:00 committed by GitHub
parent c84f58f539
commit 6244515818

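To illustrate the behavior described above, here is a minimal sketch of the retry pattern this commit introduces. It is simplified and uses assumed placeholder names (fetchWithRetries, doFetch, and AttemptResult are not part of the codebase); the actual implementation lives in the AsyncFetcher.load() loop in the diff below, which additionally purges the partially buffered WARC data, clears the truncated flag, and removes the URL from the dedupe set before retrying.

// Simplified sketch (TypeScript) of the retry loop added in this commit.
// Placeholder names; not the real recorder code.
const DEFAULT_MAX_RETRIES = 2;

type AttemptResult = { truncated: boolean; readSize: number };

async function fetchWithRetries(
  doFetch: () => Promise<AttemptResult>, // stands in for the fetch + WARC serialization step
  expectedSize: number,
  maxRetries: number = DEFAULT_MAX_RETRIES,
): Promise<"fetched" | "notfetched"> {
  let retries = 0;

  while (retries <= maxRetries) {
    try {
      const { truncated, readSize } = await doFetch();
      const sizeMismatch = expectedSize >= 0 && readSize !== expectedSize;

      // retry while attempts remain if the payload was truncated or the size mismatched
      if ((truncated || sizeMismatch) && retries < maxRetries) {
        // the real code also purges partially buffered data before the next attempt
        retries++;
        continue;
      }
      return truncated || sizeMismatch ? "notfetched" : "fetched";
    } catch (e) {
      // any other exception: retry if attempts remain, otherwise give up
      if (retries < maxRetries) {
        retries++;
        continue;
      }
      return "notfetched";
    }
  }
  return "notfetched";
}
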
@@ -26,6 +26,7 @@ import { Crawler } from "../crawler.js";
 import { getProxyDispatcher } from "./proxy.js";
 import { ScopedSeed } from "./seeds.js";
 import EventEmitter from "events";
+import { DEFAULT_MAX_RETRIES } from "./constants.js";
 
 const MAX_BROWSER_DEFAULT_FETCH_SIZE = 5_000_000;
 const MAX_TEXT_REWRITE_SIZE = 25_000_000;
@@ -1510,6 +1511,8 @@ class AsyncFetcher {
   manualRedirect = false;
 
+  maxRetries = DEFAULT_MAX_RETRIES;
+
   constructor({
     reqresp,
     expectedSize = -1,
@@ -1555,122 +1558,165 @@ class AsyncFetcher {
         }
       }
 
-      const body = await this._doFetch();
-      fetched = "fetched";
-
-      const responseRecord = createResponse(reqresp, pageid, body);
-      const requestRecord = createRequest(reqresp, responseRecord, pageid);
-
-      const serializer = new WARCSerializer(responseRecord, {
-        gzip,
-        maxMemSize: this.maxFetchSize,
-      });
-
-      try {
-        let readSize = await serializer.digestRecord();
-        if (serializer.httpHeadersBuff) {
-          readSize -= serializer.httpHeadersBuff.length;
-        }
-        reqresp.readSize = readSize;
-        // set truncated field and recompute header buff
-        if (reqresp.truncated) {
-          responseRecord.warcHeaders.headers.set(
-            "WARC-Truncated",
-            reqresp.truncated,
-          );
-          // todo: keep this internal in warcio after adding new header
-          serializer.warcHeadersBuff = encoder.encode(
-            responseRecord.warcHeaders.toString(),
-          );
-        }
-      } catch (e) {
-        logger.error(
-          "Error reading + digesting payload",
-          { url, ...formatErr(e), ...logDetails },
-          "recorder",
-        );
-      }
-
-      if (
-        reqresp.readSize === reqresp.expectedSize ||
-        reqresp.expectedSize < 0
-      ) {
-        logger.debug(
-          "Async fetch: streaming done",
-          {
-            size: reqresp.readSize,
-            expected: reqresp.expectedSize,
-            networkId,
-            url,
-            ...logDetails,
-          },
-          "recorder",
-        );
-      } else {
-        logger.warn(
-          "Async fetch: possible response size mismatch",
-          {
-            type: this.constructor.name,
-            size: reqresp.readSize,
-            expected: reqresp.expectedSize,
-            url,
-            ...logDetails,
-          },
-          "recorder",
-        );
-        if (status === 206 || status === 200) {
-          void serializer.externalBuffer?.purge();
-          await crawlState.removeDupe(ASYNC_FETCH_DUPE_KEY, url, status);
-          return "notfetched";
-        }
-      }
-
-      const externalBuffer: TempFileBuffer =
-        serializer.externalBuffer as TempFileBuffer;
-
-      if (externalBuffer) {
-        const { currSize, buffers, fh } = externalBuffer;
-
-        // if fully buffered in memory, then populate the payload to return to browser
-        if (buffers && buffers.length && !fh) {
-          reqresp.payload = Buffer.concat(buffers, currSize);
-          externalBuffer.buffers = [reqresp.payload];
-        } else if (fh) {
-          logger.debug(
-            "Large payload written to WARC, but not returned to browser (would require rereading into memory)",
-            { url, actualSize: reqresp.readSize, maxSize: this.maxFetchSize },
-            "recorder",
-          );
-        }
-      }
-
-      if (Object.keys(reqresp.extraOpts).length) {
-        responseRecord.warcHeaders.headers.set(
-          "WARC-JSON-Metadata",
-          JSON.stringify(reqresp.extraOpts),
-        );
-      }
-
-      recorder.writer.writeRecordPair(
-        responseRecord,
-        requestRecord,
-        serializer,
-      );
-      // eslint-disable-next-line @typescript-eslint/no-explicit-any
-    } catch (e: any) {
-      await crawlState.removeDupe(ASYNC_FETCH_DUPE_KEY, url!, status);
-      if (e.message === "response-filtered-out") {
-        throw e;
-      }
-      logger.debug(
-        "Streaming Fetch Error",
-        { url, networkId, ...formatErr(e), ...logDetails },
-        "recorder",
-      );
-      // indicate response is ultimately not valid
-      reqresp.status = 0;
-      reqresp.errorText = e.message;
+      let retries = 0;
+
+      while (retries <= this.maxRetries) {
+        try {
+          reqresp.truncated = undefined;
+          const body = await this._doFetch();
+          fetched = "fetched";
+
+          const responseRecord = createResponse(reqresp, pageid, body);
+          const requestRecord = createRequest(reqresp, responseRecord, pageid);
+
+          const serializer = new WARCSerializer(responseRecord, {
+            gzip,
+            maxMemSize: this.maxFetchSize,
+          });
+
+          try {
+            let readSize = await serializer.digestRecord();
+            if (serializer.httpHeadersBuff) {
+              readSize -= serializer.httpHeadersBuff.length;
+            }
+            reqresp.readSize = readSize;
+            // set truncated field and recompute header buff
+            if (reqresp.truncated) {
+              const retry = retries < this.maxRetries;
+              logger.warn(
+                "Response truncated",
+                { url, retry, ...logDetails },
+                "recorder",
+              );
+              // if retries available, just retry
+              if (retry) {
+                void serializer.externalBuffer?.purge();
+                retries++;
+                continue;
+              }
+              responseRecord.warcHeaders.headers.set(
+                "WARC-Truncated",
+                reqresp.truncated,
+              );
+              // todo: keep this internal in warcio after adding new header
+              serializer.warcHeadersBuff = encoder.encode(
+                responseRecord.warcHeaders.toString(),
+              );
+            }
+          } catch (e) {
+            const retry = retries < this.maxRetries;
+            logger.error(
+              "Error reading + digesting payload",
+              { url, retry, ...formatErr(e), ...logDetails },
+              "recorder",
+            );
+            if (retry) {
+              void serializer.externalBuffer?.purge();
+              retries++;
+              continue;
+            }
+          }
+
+          if (
+            reqresp.readSize === reqresp.expectedSize ||
+            reqresp.expectedSize < 0
+          ) {
+            logger.debug(
+              "Async fetch: streaming done",
+              {
+                size: reqresp.readSize,
+                expected: reqresp.expectedSize,
+                networkId,
+                url,
+                ...logDetails,
+              },
+              "recorder",
+            );
+          } else {
+            logger.warn(
+              "Async fetch: possible response size mismatch",
+              {
+                type: this.constructor.name,
+                size: reqresp.readSize,
+                expected: reqresp.expectedSize,
+                url,
+                retry:
+                  retries < this.maxRetries &&
+                  (status === 206 || status === 200),
+                ...logDetails,
+              },
+              "recorder",
+            );
+            if (status === 206 || status === 200) {
+              void serializer.externalBuffer?.purge();
+              await crawlState.removeDupe(ASYNC_FETCH_DUPE_KEY, url, status);
+              if (retries < this.maxRetries) {
+                retries++;
+                continue;
+              }
+              return "notfetched";
+            }
+          }
+
+          const externalBuffer: TempFileBuffer =
+            serializer.externalBuffer as TempFileBuffer;
+
+          if (externalBuffer) {
+            const { currSize, buffers, fh } = externalBuffer;
+
+            // if fully buffered in memory, then populate the payload to return to browser
+            if (buffers && buffers.length && !fh) {
+              reqresp.payload = Buffer.concat(buffers, currSize);
+              externalBuffer.buffers = [reqresp.payload];
+            } else if (fh) {
+              logger.debug(
+                "Large payload written to WARC, but not returned to browser (would require rereading into memory)",
+                {
+                  url,
+                  actualSize: reqresp.readSize,
+                  maxSize: this.maxFetchSize,
+                },
+                "recorder",
+              );
+            }
+          }
+
+          if (Object.keys(reqresp.extraOpts).length) {
+            responseRecord.warcHeaders.headers.set(
+              "WARC-JSON-Metadata",
+              JSON.stringify(reqresp.extraOpts),
+            );
+          }
+
+          recorder.writer.writeRecordPair(
+            responseRecord,
+            requestRecord,
+            serializer,
+          );
+          // eslint-disable-next-line @typescript-eslint/no-explicit-any
+        } catch (e: any) {
+          await crawlState.removeDupe(ASYNC_FETCH_DUPE_KEY, url!, status);
+          if (e.message === "response-filtered-out") {
+            throw e;
+          }
+          const retry = retries < this.maxRetries;
+          logger.debug(
+            "Streaming Fetch Error",
+            { url, networkId, retry, ...formatErr(e), ...logDetails },
+            "recorder",
+          );
+          if (retry) {
+            retries++;
+            continue;
+          }
+          // indicate response is ultimately not valid
+          reqresp.status = 0;
+          reqresp.errorText = e.message;
+        }
+        // if we get here, successful (or out of retries), break out of loop
+        break;
+      }
     } finally {
       recorder.addPageRecord(reqresp);
       // exclude direct fetch request with fake id
@@ -1811,6 +1857,8 @@ class ResponseStreamAsyncFetcher extends AsyncFetcher {
     super(opts);
     this.cdp = opts.cdp;
     this.requestId = opts.requestId;
+    // can't retry this type of fetch
+    this.maxRetries = 0;
   }
 
   async _doFetch() {