From 5bb4527de22aadcbf24f881379c24a8975b6cbee Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Thu, 27 Nov 2025 21:00:24 -0800 Subject: [PATCH] (backport for 1.9.3 release) fix connection leaks in aborted fetch() requests (#924) (#925) - in doCancel(), use abort controller and call abort(), instead of body.cancel() - ensure doCancel() is called when a WARC record is not written, eg. is a dupe, as stream is likely not consumed - also call IO.close() when uses browser network reader - fixes #923 - also adds missing dupe check to async resources queued from behaviors (were being deduped on write, but were still fetched unnecessarily) - backport of #924 for 1.9.3 --- src/util/recorder.ts | 58 ++++++++++++++++++++++++++++++++------------ 1 file changed, 42 insertions(+), 16 deletions(-) diff --git a/src/util/recorder.ts b/src/util/recorder.ts index 96f95106..419c1d89 100644 --- a/src/util/recorder.ts +++ b/src/util/recorder.ts @@ -862,17 +862,29 @@ export class Recorder extends EventEmitter { } addExternalFetch(url: string, cdp: CDPSession) { - logger.debug( - "Handling fetch from behavior", - { url, ...this.logDetails }, - "recorder", - ); const reqresp = new RequestResponseInfo("0"); reqresp.url = url; reqresp.method = "GET"; reqresp.frameId = this.mainFrameId || undefined; - this.addAsyncFetch({ reqresp, recorder: this, cdp }); - // return true if successful + + const details = { url, ...this.logDetails }; + + const fetchIfNotDupe = async () => { + if (await this.isDupeFetch(reqresp)) { + logger.debug("Skipping dupe fetch from behavior", details, "recorder"); + return false; + } + + logger.debug("Handling fetch from behavior", details, "recorder"); + + this.addAsyncFetch({ reqresp, recorder: this, cdp }); + }; + + void fetchIfNotDupe().catch(() => + logger.warn("Error fetching URL from behavior", details, "recorder"), + ); + + // return true to indicate no need for in-browser fetch return true; } @@ -1441,6 +1453,16 @@ export class Recorder extends EventEmitter { "recorder", ); reqresp.truncated = "disconnect"; + } finally { + try { + await cdp.send("IO.close", { handle: stream }); + } catch (e) { + logger.warn( + "takeStream close failed", + { url: reqresp.url, ...this.logDetails }, + "recorder", + ); + } } } @@ -1652,6 +1674,7 @@ class AsyncFetcher { stream?: string; resp?: Response; + abort?: AbortController; maxFetchSize: number; @@ -1743,7 +1766,11 @@ class AsyncFetcher { throw new Error("resp body missing"); } - return await recorder.serializeToWARC(reqresp, iter); + if (!(await recorder.serializeToWARC(reqresp, iter))) { + await this.doCancel(); + return false; + } + return true; } catch (e) { logger.warn( "Async load body failed", @@ -1755,14 +1782,10 @@ class AsyncFetcher { } async doCancel() { - const { resp, useBrowserNetwork } = this; - if (!useBrowserNetwork && resp) { - if (resp.status >= 300 && resp.status < 400) { - await resp.arrayBuffer(); - } else { - // otherwise, just cancel - resp.body?.cancel().catch(() => {}); - } + const { abort } = this; + if (abort) { + abort.abort(); + this.abort = undefined; } } @@ -1786,12 +1809,15 @@ class AsyncFetcher { }); } + this.abort = new AbortController(); + const resp = await fetch(url!, { method, headers, body: reqresp.postData || undefined, redirect: this.manualRedirect ? "manual" : "follow", dispatcher, + signal: this.abort.signal, }); if (