Mirror of https://github.com/webrecorder/browsertrix-crawler.git, synced 2025-12-07 21:59:48 +00:00
sitemapper refactor to fix concurrency (#930)

- original implementation did not actually wait for a sitemap to complete before queuing new ones, resulting in a concurrency resource leak
- refactor to await completion of the sitemap parser, replacing the pending list with a counter
- also, don't parse the sitemap if single-page and no extra hops
- fixes issues in #928
Parent: 59df6bbd3f
Commit: 2914e93152

2 changed files with 102 additions and 49 deletions
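
Before the hunks themselves, the shape of the fix in isolation: each parse is wrapped in a promise that the caller awaits, and a plain counter tracks in-flight parses, so "end" can only fire once everything has drained. The sketch below is illustrative only; names like PendingTracker and run do not appear in the diff.

import { EventEmitter } from "node:events";

// Illustrative sketch of the counter-based completion pattern adopted by
// this refactor; not code from the diff. Because each task is awaited,
// a caller can no longer queue unbounded new work while one is in flight.
class PendingTracker extends EventEmitter {
  pending = 0;

  async run(task: () => Promise<void>): Promise<void> {
    this.pending++;
    try {
      await task(); // the original bug: this await was missing
    } finally {
      this.pending--;
      if (this.pending === 0) {
        this.emit("end"); // only fires once all tasks have settled
      }
    }
  }
}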
@@ -2714,8 +2714,17 @@ self.__bx_behaviors.selectMainBehavior();
       return;
     }
 
+    if (
+      (this.params.scopeType === "page" ||
+        this.params.scopeType === "page-spa") &&
+      !this.params.extraHops
+    ) {
+      logger.info("Single page crawl, skipping sitemap", {}, "sitemap");
+      return;
+    }
+
     if (await this.crawlState.isSitemapDone()) {
-      logger.info("Sitemap already processed, skipping", "sitemap");
+      logger.info("Sitemap already processed, skipping", {}, "sitemap");
       return;
     }
 
@@ -2739,23 +2748,12 @@ self.__bx_behaviors.selectMainBehavior();
       limit: this.pageLimit,
     });
 
-    try {
-      await sitemapper.parse(sitemap, url);
-    } catch (e) {
-      logger.warn(
-        "Sitemap for seed failed",
-        { url, sitemap, ...formatErr(e) },
-        "sitemap",
-      );
-      return;
-    }
-
     let power = 1;
-    let resolved = false;
+    let finished = false;
 
-    await new Promise<void>((resolve) => {
+    const p = new Promise<void>((resolve) => {
       sitemapper.on("end", () => {
-        resolve();
+        if (!finished) {
@@ -2778,9 +2776,10 @@ self.__bx_behaviors.selectMainBehavior();
           power++;
         }
         const sitemapsQueued = sitemapper.getSitemapsQueued();
+        const pending = sitemapper.getNumPending();
         logger.debug(
           "Sitemap URLs processed so far",
-          { count, sitemapsQueued },
+          { count, sitemapsQueued, pending },
           "sitemap",
         );
       }
@@ -2798,6 +2797,19 @@ self.__bx_behaviors.selectMainBehavior();
         }
       });
     });
+
+    try {
+      await sitemapper.parse(sitemap, url);
+    } catch (e) {
+      logger.warn(
+        "Sitemap for seed failed",
+        { url, sitemap, ...formatErr(e) },
+        "sitemap",
+      );
+      return;
+    }
+
+    await p;
   }
 
   async combineWARC() {
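
A detail worth noting in the crawler-side hunks above: the completion promise p is created, with its "end" listener attached, before sitemapper.parse() runs, and is only awaited afterwards. Attaching the listener only after parse() resolved could miss an "end" emitted in the meantime. A condensed sketch of the same ordering, with a hypothetical reader standing in for the sitemapper:

// Hypothetical stand-in with the same event contract as SitemapReader;
// the names and signature here are illustrative, not from the diff.
declare const reader: {
  on(event: "end", cb: () => void): void;
  parse(url: string): Promise<void>;
};

// 1. capture completion first, so the "end" event cannot be missed
const done = new Promise<void>((resolve) => reader.on("end", resolve));

// 2. run the parse itself (errors handled by the caller's try/catch)
await reader.parse("https://example.com/sitemap.xml");

// 3. now wait for the reader to signal it has fully drained
await done;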
@@ -35,7 +35,7 @@ export class SitemapReader extends EventEmitter {
   queue: PQueue;
 
   seenSitemapSet: Set<string>;
-  pending: Set<string>;
+  pending = 0;
 
   count = 0;
   limit: number;
@@ -52,8 +52,6 @@ export class SitemapReader extends EventEmitter {
     this.seenSitemapSet = new Set<string>();
 
     this.limit = opts.limit || 0;
-
-    this.pending = new Set<string>();
   }
 
   getCT(headers: Headers) {
@@ -191,8 +189,10 @@ export class SitemapReader extends EventEmitter {
         { fullUrl, seedUrl },
         "sitemap",
       );
-      this._parseSitemapFromResponse(fullUrl, resp);
+      await this.parseSitemapFromResponse(fullUrl, resp);
     }
+
+    await this.checkIfDone();
   }
 
   async parseFromRobots(url: string) {
@@ -218,23 +218,61 @@ export class SitemapReader extends EventEmitter {
 
   async parseSitemap(url: string) {
     this.seenSitemapSet.add(url);
-    this.pending.add(url);
 
     const resp = await this._fetchWithRetry(url, "Sitemap parse failed");
     if (!resp) {
       return;
     }
 
-    this._parseSitemapFromResponse(url, resp);
+    await this.parseSitemapFromResponse(url, resp);
+
+    await this.checkIfDone();
   }
 
-  private _parseSitemapFromResponse(url: string, resp: Response) {
+  private async parseSitemapFromResponse(url: string, resp: Response) {
+    let resolve: () => void;
+    let reject: () => void;
+
+    const promise = new Promise<void>((res, rej) => {
+      resolve = res;
+      reject = rej;
+    });
+
+    this.pending++;
+
+    try {
+      this.doParseSitemapFromResponse(url, resp, resolve!, reject!);
+
+      await promise;
+    } finally {
+      this.pending--;
+    }
+  }
+
+  async checkIfDone() {
+    if (!this.pending) {
+      // this needs to happen async since if its in the current task,
+      // queue won't be idle until this completes
+      setTimeout(async () => {
+        await this.queue.onIdle();
+        this.emit("end");
+      }, 100);
+    }
+  }
+
+  private doParseSitemapFromResponse(
+    url: string,
+    resp: Response,
+    resolve: () => void,
+    reject: () => void,
+  ) {
     let stream;
 
     const { body } = resp;
     if (!body) {
-      void this.closeSitemap(url);
-      throw new Error("missing response body");
+      logger.warn("Sitemap missing response body", {}, "sitemap");
+      reject();
+      return;
     }
     // decompress .gz sitemaps
     // if content-encoding is gzip, then likely already being decompressed by fetch api
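
The setTimeout inside checkIfDone above is load-bearing, as its inline comment hints: checkIfDone can run from inside a task the PQueue is still executing, and p-queue's onIdle() only settles once no tasks are running or queued, so awaiting it synchronously there would deadlock. Deferring to a later tick lets the current task finish first. A minimal standalone reproduction of the idea, using p-queue directly rather than diff code:

import PQueue from "p-queue";

const queue = new PQueue({ concurrency: 1 });

void queue.add(async () => {
  // Awaiting queue.onIdle() directly here would never settle: the
  // queue is not idle while this very task is still running, and the
  // task cannot finish because it is waiting on idleness. Deferring
  // the wait to a later tick breaks the cycle.
  setTimeout(async () => {
    await queue.onIdle();
    console.log("queue drained; safe to emit 'end'");
  }, 100);
});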
@@ -255,23 +293,18 @@ export class SitemapReader extends EventEmitter {
 
     readableNodeStream.on("error", (e: Error) => {
       logger.warn("Error parsing sitemap", formatErr(e), "sitemap");
-      void this.closeSitemap(url);
+      reject();
     });
 
-    this.initSaxParser(url, readableNodeStream);
+    this.initSaxParser(url, readableNodeStream, resolve, reject);
   }
 
-  private async closeSitemap(url: string) {
-    this.pending.delete(url);
-    if (!this.pending.size) {
-      await this.queue.onIdle();
-      this.emit("end");
-    }
-  }
-
-  initSaxParser(url: string, sourceStream: Readable) {
-    this.pending.add(url);
-
+  private initSaxParser(
+    url: string,
+    sourceStream: Readable,
+    resolve: () => void,
+    reject: () => void,
+  ) {
     const parserStream = sax.createStream(false, {
       trim: true,
       normalize: true,
@@ -315,7 +348,7 @@ export class SitemapReader extends EventEmitter {
       }
     };
 
-    parserStream.on("end", () => this.closeSitemap(url));
+    parserStream.on("end", () => resolve());
 
     parserStream.on("opentag", (node: sax.Tag) => {
       switch (node.name) {
@@ -398,19 +431,23 @@ export class SitemapReader extends EventEmitter {
     parserStream.on("cdata", (text: string) => processText(text));
 
     parserStream.on("error", (err: Error) => {
+      const msg = { url, err, errCount };
       if (this.atLimit()) {
-        this.pending.delete(url);
-        return;
-      }
-      logger.warn(
-        "Sitemap error parsing XML",
-        { url, err, errCount },
-        "sitemap",
-      );
-      if (errCount++ < 3) {
-        // eslint-disable-next-line @typescript-eslint/no-explicit-any
-        (parserStream._parser as any).error = null;
-        parserStream._parser.resume();
+        logger.warn(
+          "Sitemap parsing aborting, page limit reached",
+          msg,
+          "sitemap",
+        );
+        resolve();
+      } else {
+        logger.warn("Sitemap error parsing XML", msg, "sitemap");
+        if (errCount++ < 3) {
+          // eslint-disable-next-line @typescript-eslint/no-explicit-any
+          (parserStream._parser as any).error = null;
+          parserStream._parser.resume();
+        } else {
+          reject();
+        }
       }
     });
 
@@ -488,4 +525,8 @@ export class SitemapReader extends EventEmitter {
   getSitemapsQueued() {
     return this.queue.size;
   }
+
+  getNumPending() {
+    return this.pending;
+  }
 }
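
The two getters at the end give callers a cheap progress signal; the crawler-side debug hunk above uses them for its periodic "Sitemap URLs processed so far" log. A hypothetical standalone poller over the same surface:

// Hypothetical handle exposing the same public surface as SitemapReader;
// assumes `sitemapper` has already been constructed and started.
declare const sitemapper: {
  getSitemapsQueued(): number;
  getNumPending(): number;
  on(event: "end", cb: () => void): void;
};

// poll queue depth and in-flight parse count every 5 seconds
const interval = setInterval(() => {
  console.log(
    `sitemaps queued: ${sitemapper.getSitemapsQueued()}, parses pending: ${sitemapper.getNumPending()}`,
  );
}, 5000);

// stop polling once the reader signals completion
sitemapper.on("end", () => clearInterval(interval));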