improvements to support pausing: (#919)

- clear size to 0 immediately after wacz is uploaded
- if crawler is paused, ensure upload of any data on startup
- fetcher queue: stop queuing async requests if recorder is marked for
stopping
This commit is contained in:
Ilya Kreymer 2025-11-25 19:17:39 -08:00 committed by GitHub
parent 565ba54454
commit b9b804e660
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 23 additions and 5 deletions

View file

@ -1554,7 +1554,10 @@ self.__bx_behaviors.selectMainBehavior();
if (interrupt) { if (interrupt) {
this.uploadAndDeleteLocal = true; this.uploadAndDeleteLocal = true;
this.gracefulFinishOnInterrupt(interrupt); this.gracefulFinishOnInterrupt(interrupt);
return true;
} }
return false;
} }
gracefulFinishOnInterrupt(interruptReason: InterruptReason) { gracefulFinishOnInterrupt(interruptReason: InterruptReason) {
@ -1691,7 +1694,11 @@ self.__bx_behaviors.selectMainBehavior();
return; return;
} }
await this.checkLimits(); if (await this.checkLimits()) {
// if interrupted
await this.postCrawl();
return;
}
await this.crawlState.setStatus("running"); await this.crawlState.setStatus("running");
@ -1869,6 +1876,7 @@ self.__bx_behaviors.selectMainBehavior();
const uploaded = await this.generateWACZ(); const uploaded = await this.generateWACZ();
if (uploaded && this.uploadAndDeleteLocal) { if (uploaded && this.uploadAndDeleteLocal) {
await this.crawlState.setArchiveSize(0);
logger.info( logger.info(
`Uploaded WACZ, deleting local data to free up space: ${this.collDir}`, `Uploaded WACZ, deleting local data to free up space: ${this.collDir}`,
); );

View file

@ -145,6 +145,8 @@ export class Recorder extends EventEmitter {
shouldSaveStorage = false; shouldSaveStorage = false;
stopping = false;
constructor({ constructor({
workerid, workerid,
writer, writer,
@ -857,9 +859,11 @@ export class Recorder extends EventEmitter {
} }
addAsyncFetch(opts: AsyncFetchOptions) { addAsyncFetch(opts: AsyncFetchOptions) {
if (!this.stopping) {
const fetcher = new AsyncFetcher(opts); const fetcher = new AsyncFetcher(opts);
void this.fetcherQ.add(() => fetcher.load()); void this.fetcherQ.add(() => fetcher.load());
} }
}
addExternalFetch(url: string, cdp: CDPSession) { addExternalFetch(url: string, cdp: CDPSession) {
logger.debug( logger.debug(
@ -1046,6 +1050,8 @@ export class Recorder extends EventEmitter {
} }
async onDone(timeout: number) { async onDone(timeout: number) {
this.stopping = true;
await this.crawlState.setStatus("pending-wait"); await this.crawlState.setStatus("pending-wait");
const finishFetch = async () => { const finishFetch = async () => {
@ -1063,6 +1069,8 @@ export class Recorder extends EventEmitter {
); );
} }
this.fetcherQ.clear();
logger.debug("Finishing WARC writing", this.logDetails, "recorder"); logger.debug("Finishing WARC writing", this.logDetails, "recorder");
await this.writer.flush(); await this.writer.flush();
@ -1356,8 +1364,10 @@ export class Recorder extends EventEmitter {
await fetcher.doCancel(); await fetcher.doCancel();
return false; return false;
} }
if (!this.stopping) {
state.asyncLoading = true; state.asyncLoading = true;
void this.fetcherQ.add(() => fetcher.loadDirectPage(state, crawler)); void this.fetcherQ.add(() => fetcher.loadDirectPage(state, crawler));
}
return true; return true;
} }