State Save + Restore State from Config + Redis State + Scope Fix 0.5.0 (#78)
* save state work:
- support interrupting and saving crawl
- support loading crawl state (frontier queue, pending, done) from YAML (see the state sketch after this list)
- support scope check when loading to apply new scoping rules when restarting crawl
- failed urls added to done as failed, can be retried if crawl is stopped and restarted
- save state to crawls/crawl-<ts>-<id>.yaml when interrupted
- --saveState option controls when crawl state is saved: defaults to 'partial' (save only when interrupted); 'always' and 'never' are also supported
- support in-memory or redis based crawl state, using fork of puppeteer-cluster
- --redisStore used to enable redis-based state
* signals/crawl interruption:
- crawl state set to drain, so that no more urls are provided to crawl
- graceful stop of crawl in response to sigint/sigterm
- initial sigint/sigterm waits for graceful end of current pages, second terminates immediately
- initial sigabrt followed by sigterm terminates immediately
- puppeteer disable handleSIGTERM, handleSIGHUP, handleSIGINT
* redis state support:
- use lua scripts for atomic move from queue -> pending, and pending -> done
- pending key expiry set to page timeout
- add numPending() and numSeen() to support better puppeteer-cluster semantics for early termination
- drainMax returns the numPending() + numSeen() to work with cluster stats
* arg improvements:
- add --crawlId param, also settable via CRAWL_ID env var, defaulting to os.hostname() (used for redis key and crawl state file)
- support setting cmdline args via env var CRAWL_ARGS
- use 'choices' in args when possible
* build update:
- switch base browser image to new webrecorder/browsertrix-browser-base, simple image with .deb files only for amd64 and arm64 builds
- use setuptools<58.0
* misc crawl/scoping rule fixes:
- scoping rules fix when external is used with scopeType
state:
- limit: ensure no urls, including initial seeds, are added past the limit
- signals: fix immediate shutdown on second signal
- tests: add scope test for default scope + excludes
* py-wacz update
- add 'seed': true to pages that are seeds for optimized wacz creation, keeping non-seeds separate (supported via wacz 0.3.2)
- pywb: use latest pywb branch for improved twitter video capture
* update to latest browsertrix-behaviors
* fix setuptools dependency #88
* update README for 0.5.0 beta
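
For reference, here is a minimal sketch of the saved-state shape that the code below round-trips: serialize() produces it and load() consumes it when a crawl is restarted. The URLs, timestamps, and values are made up for illustration, and the exact YAML layout of the crawls/crawl-&lt;ts&gt;-&lt;id&gt;.yaml file that wraps this state is not shown here.

// illustrative only: the state object produced by serialize() / consumed by load()
const exampleSavedState = {
  // frontier queue: urls not yet crawled, as JSON-encoded page records
  queued:  ["{\"url\":\"https://example.com/page2\",\"seedId\":0,\"depth\":1,\"extraHops\":0}"],
  // pending: urls that were in progress when the crawl was interrupted
  pending: ["{\"url\":\"https://example.com/page1\",\"seedId\":0,\"depth\":1,\"started\":\"2021-09-28T16:40:00.000Z\"}"],
  // done: finished urls; entries marked failed are re-queued for retry on restart
  done:    ["{\"url\":\"https://example.com/\",\"seedId\":0,\"depth\":0,\"finished\":\"2021-09-28T16:39:00.000Z\"}"]
};
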
const Job = require("puppeteer-cluster/dist/Job").default;


// ============================================================================
class BaseState
{
  constructor() {
    this.drainMax = 0;
  }

  // once draining, size() reports 0 so no more urls are handed out, while
  // numSeen() stays fixed at the pending + done total captured here, which
  // keeps the cluster's stats consistent and allows early termination
  async setDrain() {
    this.drainMax = (await this.numPending()) + (await this.numDone());
  }

  async size() {
    return this.drainMax ? 0 : await this.realSize();
  }

  async finished() {
    return await this.realSize() == 0;
  }

  async numSeen() {
    return this.drainMax ? this.drainMax : await this.numRealSeen();
  }

  recheckScope(data, seeds) {
    const seed = seeds[data.seedId];
    return seed.isIncluded(data.url, data.depth, data.extraHops);
  }
}
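
// Hypothetical sketch (not part of this module): how the crawler's signal
// handling described above might use setDrain() - the first sigint/sigterm
// drains the state so in-progress pages can finish gracefully, and a second
// signal terminates immediately. The function and handler names are made up.
function exampleInstallSignalHandlers(state) {
  let interrupted = false;

  const handler = async () => {
    if (!interrupted) {
      interrupted = true;
      // stop providing new urls; pages already in progress are allowed to finish
      await state.setDrain();
    } else {
      // second signal: terminate immediately
      process.exit(1);
    }
  };

  process.on("SIGINT", handler);
  process.on("SIGTERM", handler);
}
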

// ============================================================================
class MemoryCrawlState extends BaseState
{
  constructor() {
    super();
    this.seenList = new Set();
    this.queue = [];
    this.pending = new Set();
    this.done = [];
  }

  push(job) {
    this.queue.unshift(job.data);
  }

  realSize() {
    return this.queue.length;
  }

  shift() {
    const data = this.queue.pop();
    data.started = new Date().toISOString();
    const str = JSON.stringify(data);
    this.pending.add(str);

    const callback = {
      resolve: () => {
        this.pending.delete(str);
        data.finished = new Date().toISOString();
        this.done.unshift(data);
      },

      reject: (e) => {
        this.pending.delete(str);
        console.warn(`URL Load Failed: ${data.url}, Reason: ${e}`);
        data.failed = true;
        this.done.unshift(data);
      }
    };

    return new Job(data, undefined, callback);
  }

  has(url) {
    return this.seenList.has(url);
  }

  add(url) {
    return this.seenList.add(url);
  }

  async serialize() {
    const queued = this.queue.map(x => JSON.stringify(x));
    const pending = Array.from(this.pending.values());
    const done = this.done.map(x => JSON.stringify(x));

    return {queued, pending, done};
  }

  async load(state, seeds, checkScope = false) {
    for (const json of state.queued) {
      const data = JSON.parse(json);
      if (checkScope && !this.recheckScope(data, seeds)) {
        continue;
      }
      this.queue.push(data);
      this.seenList.add(data.url);
    }

    // pending urls were never finished, so they are simply re-queued
    for (const json of state.pending) {
      const data = JSON.parse(json);
      if (checkScope && !this.recheckScope(data, seeds)) {
        continue;
      }
      this.queue.push(data);
      this.seenList.add(data.url);
    }

    // failed urls are re-queued for retry, successfully finished urls stay done
    for (const json of state.done) {
      const data = JSON.parse(json);
      if (data.failed) {
        this.queue.push(data);
      } else {
        this.done.push(data);
      }
      this.seenList.add(data.url);
    }

    return this.seenList.size;
  }

  async numDone() {
    return this.done.length;
  }

  async numRealSeen() {
    return this.seenList.size;
  }

  async numPending() {
    return this.pending.size;
  }
}

// ============================================================================
class RedisCrawlState extends BaseState
{
  constructor(redis, key, pageTimeout) {
    super();
    this.redis = redis;

    this.key = key;
    this.pageTimeout = pageTimeout / 1000;

    this.qkey = this.key + ":q";
    this.pkey = this.key + ":p";
    this.skey = this.key + ":s";
    this.dkey = this.key + ":d";

    redis.defineCommand("movestarted", {
      numberOfKeys: 2,
      lua: "local val = redis.call('rpop', KEYS[1]); if (val) then local json = cjson.decode(val); json['started'] = ARGV[1]; val = cjson.encode(json); redis.call('sadd', KEYS[2], val); redis.call('expire', KEYS[2], ARGV[2]); end; return val"
    });

    redis.defineCommand("movefinished", {
      numberOfKeys: 2,
      lua: "local val = ARGV[1]; if (redis.call('srem', KEYS[1], val)) then local json = cjson.decode(val); json[ARGV[3]] = ARGV[2]; val = cjson.encode(json); redis.call('lpush', KEYS[2], val); end; return val"
    });
  }
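
  // The two Lua scripts above perform the atomic moves described in the
  // commit message:
  //   movestarted:  rpop a url off the queue list, stamp it with 'started',
  //                 sadd it into the pending set, and reset the pending set's
  //                 expiry to the page timeout
  //   movefinished: srem the url from the pending set and, if it was present,
  //                 stamp it with 'finished' (or 'failed') and lpush it onto
  //                 the done list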

  async push(job) {
    await this.redis.lpush(this.qkey, JSON.stringify(job.data));
  }

  async realSize() {
    return await this.redis.llen(this.qkey);
  }

  async shift() {
    const started = new Date().toISOString();

    // atomically move from queue list -> pending set while adding started timestamp
    // set pending set expire to page timeout
    const json = await this.redis.movestarted(this.qkey, this.pkey, started, this.pageTimeout);

    const data = JSON.parse(json);

    const callback = {
      resolve: async () => {
        const finished = new Date().toISOString();
        // atomically move from pending set -> done list while adding finished timestamp
        await this.redis.movefinished(this.pkey, this.dkey, json, finished, "finished");
      },

      reject: async (e) => {
        console.warn(`URL Load Failed: ${data.url}, Reason: ${e}`);
        // atomically move from pending set -> done list, marking the url as failed
        await this.redis.movefinished(this.pkey, this.dkey, json, true, "failed");
      }
    };

    return new Job(data, undefined, callback);
  }

  async has(url) {
    return !!await this.redis.sismember(this.skey, url);
  }

  async add(url) {
    return await this.redis.sadd(this.skey, url);
  }

  async serialize() {
    const queued = await this.redis.lrange(this.qkey, 0, -1);
    const pending = await this.redis.smembers(this.pkey);
    const done = await this.redis.lrange(this.dkey, 0, -1);

    return {queued, pending, done};
  }

  async load(state, seeds, checkScope) {
    const seen = [];

    // need to delete existing keys, if any exist, to fully reset state
    await this.redis.del(this.qkey);
    await this.redis.del(this.pkey);
    await this.redis.del(this.dkey);
    await this.redis.del(this.skey);

    for (const json of state.queued) {
      const data = JSON.parse(json);
      if (checkScope) {
        if (!this.recheckScope(data, seeds)) {
          continue;
        }
      }

      await this.redis.rpush(this.qkey, json);
      seen.push(data.url);
    }

    // pending urls were never finished, so they are re-queued
    for (const json of state.pending) {
      const data = JSON.parse(json);
      if (checkScope) {
        if (!this.recheckScope(data, seeds)) {
          continue;
        }
      }

      await this.redis.rpush(this.qkey, json);
      seen.push(data.url);
    }

    // failed urls are re-queued for retry, successfully finished urls stay done
    for (const json of state.done) {
      const data = JSON.parse(json);
      if (data.failed) {
        await this.redis.rpush(this.qkey, json);
      } else {
        await this.redis.rpush(this.dkey, json);
      }
      seen.push(data.url);
    }

    await this.redis.sadd(this.skey, seen);
    return seen.length;
  }

  async numDone() {
    return await this.redis.llen(this.dkey);
  }

  async numRealSeen() {
    return await this.redis.scard(this.skey);
  }

  async numPending() {
    return await this.redis.scard(this.pkey);
  }
}

module.exports.RedisCrawlState = RedisCrawlState;
module.exports.MemoryCrawlState = MemoryCrawlState;
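
// ============================================================================
// Illustrative usage sketch only, not part of the crawler itself: shows how a
// caller might pick a state backend, restore a saved state, and serialize it
// again. The redis client argument (e.g. an ioredis instance, which is what
// provides the defineCommand() used above), the "example-crawl" key, and the
// 90-second page timeout are assumptions made up for this example.
async function exampleSaveAndRestore(redis, seeds, savedState) {
  // redis-backed state when a client is given (--redisStore), else in-memory
  const state = redis ?
    new RedisCrawlState(redis, "example-crawl", 90 * 1000) :
    new MemoryCrawlState();

  if (savedState) {
    // restore a previously saved state; checkScope = true re-applies the
    // (possibly changed) seed scoping rules to queued and pending urls
    await state.load(savedState, seeds, true);
  }

  // ... crawl runs, urls move queue -> pending -> done ...

  // {queued, pending, done} arrays of JSON strings, which the crawler writes
  // out to crawls/crawl-<ts>-<id>.yaml when the crawl is interrupted
  return await state.serialize();
}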