mirror of
https://github.com/webrecorder/browsertrix-crawler.git
synced 2025-12-07 13:49:47 +00:00
fix connection leaks in aborted fetch() requests (#924)
- in doCancel(), use an abort controller and call abort() instead of body.cancel() - ensure doCancel() is called when a WARC record is not written, e.g. because it is a dupe, as the stream is likely not consumed - also call IO.close() when using the browser network reader - fixes #923 - also adds a missing dupe check to async resources queued from behaviors (they were being deduped on write, but were still fetched unnecessarily)
This commit is contained in:
parent
8658df3999
commit
2ef8e00268
1 changed file with 42 additions and 16 deletions
|
|
@ -866,17 +866,29 @@ export class Recorder extends EventEmitter {
|
|||
}
|
||||
|
||||
addExternalFetch(url: string, cdp: CDPSession) {
|
||||
logger.debug(
|
||||
"Handling fetch from behavior",
|
||||
{ url, ...this.logDetails },
|
||||
"recorder",
|
||||
);
|
||||
const reqresp = new RequestResponseInfo("0");
|
||||
reqresp.url = url;
|
||||
reqresp.method = "GET";
|
||||
reqresp.frameId = this.mainFrameId || undefined;
|
||||
this.addAsyncFetch({ reqresp, recorder: this, cdp });
|
||||
// return true if successful
|
||||
|
||||
const details = { url, ...this.logDetails };
|
||||
|
||||
const fetchIfNotDupe = async () => {
|
||||
if (await this.isDupeFetch(reqresp)) {
|
||||
logger.debug("Skipping dupe fetch from behavior", details, "recorder");
|
||||
return false;
|
||||
}
|
||||
|
||||
logger.debug("Handling fetch from behavior", details, "recorder");
|
||||
|
||||
this.addAsyncFetch({ reqresp, recorder: this, cdp });
|
||||
};
|
||||
|
||||
void fetchIfNotDupe().catch(() =>
|
||||
logger.warn("Error fetching URL from behavior", details, "recorder"),
|
||||
);
|
||||
|
||||
// return true to indicate no need for in-browser fetch
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
@ -1451,6 +1463,16 @@ export class Recorder extends EventEmitter {
|
|||
"recorder",
|
||||
);
|
||||
reqresp.truncated = "disconnect";
|
||||
} finally {
|
||||
try {
|
||||
await cdp.send("IO.close", { handle: stream });
|
||||
} catch (e) {
|
||||
logger.warn(
|
||||
"takeStream close failed",
|
||||
{ url: reqresp.url, ...this.logDetails },
|
||||
"recorder",
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1662,6 +1684,7 @@ class AsyncFetcher {
|
|||
|
||||
stream?: string;
|
||||
resp?: Response;
|
||||
abort?: AbortController;
|
||||
|
||||
maxFetchSize: number;
|
||||
|
||||
|
|
@ -1753,7 +1776,11 @@ class AsyncFetcher {
|
|||
throw new Error("resp body missing");
|
||||
}
|
||||
|
||||
return await recorder.serializeToWARC(reqresp, iter);
|
||||
if (!(await recorder.serializeToWARC(reqresp, iter))) {
|
||||
await this.doCancel();
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
} catch (e) {
|
||||
logger.warn(
|
||||
"Async load body failed",
|
||||
|
|
@ -1765,14 +1792,10 @@ class AsyncFetcher {
|
|||
}
|
||||
|
||||
async doCancel() {
|
||||
const { resp, useBrowserNetwork } = this;
|
||||
if (!useBrowserNetwork && resp) {
|
||||
if (resp.status >= 300 && resp.status < 400) {
|
||||
await resp.arrayBuffer();
|
||||
} else {
|
||||
// otherwise, just cancel
|
||||
resp.body?.cancel().catch(() => {});
|
||||
}
|
||||
const { abort } = this;
|
||||
if (abort) {
|
||||
abort.abort();
|
||||
this.abort = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1796,12 +1819,15 @@ class AsyncFetcher {
|
|||
});
|
||||
}
|
||||
|
||||
this.abort = new AbortController();
|
||||
|
||||
const resp = await fetch(url!, {
|
||||
method,
|
||||
headers,
|
||||
body: reqresp.postData || undefined,
|
||||
redirect: this.manualRedirect ? "manual" : "follow",
|
||||
dispatcher,
|
||||
signal: this.abort.signal,
|
||||
});
|
||||
|
||||
if (
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue