(backport for 1.9.3 release) fix connection leaks in aborted fetch() requests (#924) (#925)

- in doCancel(), use abort controller and call abort(), instead of
body.cancel()
- ensure doCancel() is called when a WARC record is not written, e.g. when it
is a dupe, as the stream is likely not consumed
- also call IO.close() when using the browser network reader
- fixes #923
- also adds missing dupe check to async resources queued from behaviors
(these were being deduped on write, but were still fetched unnecessarily)
- backport of #924 for 1.9.3
This commit is contained in:
Ilya Kreymer 2025-11-27 21:00:24 -08:00 committed by GitHub
parent 6a163ddc47
commit 5bb4527de2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -862,17 +862,29 @@ export class Recorder extends EventEmitter {
} }
addExternalFetch(url: string, cdp: CDPSession) { addExternalFetch(url: string, cdp: CDPSession) {
logger.debug(
"Handling fetch from behavior",
{ url, ...this.logDetails },
"recorder",
);
const reqresp = new RequestResponseInfo("0"); const reqresp = new RequestResponseInfo("0");
reqresp.url = url; reqresp.url = url;
reqresp.method = "GET"; reqresp.method = "GET";
reqresp.frameId = this.mainFrameId || undefined; reqresp.frameId = this.mainFrameId || undefined;
this.addAsyncFetch({ reqresp, recorder: this, cdp });
// return true if successful const details = { url, ...this.logDetails };
const fetchIfNotDupe = async () => {
if (await this.isDupeFetch(reqresp)) {
logger.debug("Skipping dupe fetch from behavior", details, "recorder");
return false;
}
logger.debug("Handling fetch from behavior", details, "recorder");
this.addAsyncFetch({ reqresp, recorder: this, cdp });
};
void fetchIfNotDupe().catch(() =>
logger.warn("Error fetching URL from behavior", details, "recorder"),
);
// return true to indicate no need for in-browser fetch
return true; return true;
} }
@ -1441,6 +1453,16 @@ export class Recorder extends EventEmitter {
"recorder", "recorder",
); );
reqresp.truncated = "disconnect"; reqresp.truncated = "disconnect";
} finally {
try {
await cdp.send("IO.close", { handle: stream });
} catch (e) {
logger.warn(
"takeStream close failed",
{ url: reqresp.url, ...this.logDetails },
"recorder",
);
}
} }
} }
@ -1652,6 +1674,7 @@ class AsyncFetcher {
stream?: string; stream?: string;
resp?: Response; resp?: Response;
abort?: AbortController;
maxFetchSize: number; maxFetchSize: number;
@ -1743,7 +1766,11 @@ class AsyncFetcher {
throw new Error("resp body missing"); throw new Error("resp body missing");
} }
return await recorder.serializeToWARC(reqresp, iter); if (!(await recorder.serializeToWARC(reqresp, iter))) {
await this.doCancel();
return false;
}
return true;
} catch (e) { } catch (e) {
logger.warn( logger.warn(
"Async load body failed", "Async load body failed",
@ -1755,14 +1782,10 @@ class AsyncFetcher {
} }
async doCancel() { async doCancel() {
const { resp, useBrowserNetwork } = this; const { abort } = this;
if (!useBrowserNetwork && resp) { if (abort) {
if (resp.status >= 300 && resp.status < 400) { abort.abort();
await resp.arrayBuffer(); this.abort = undefined;
} else {
// otherwise, just cancel
resp.body?.cancel().catch(() => {});
}
} }
} }
@ -1786,12 +1809,15 @@ class AsyncFetcher {
}); });
} }
this.abort = new AbortController();
const resp = await fetch(url!, { const resp = await fetch(url!, {
method, method,
headers, headers,
body: reqresp.postData || undefined, body: reqresp.postData || undefined,
redirect: this.manualRedirect ? "manual" : "follow", redirect: this.manualRedirect ? "manual" : "follow",
dispatcher, dispatcher,
signal: this.abort.signal,
}); });
if ( if (