Collapse entity event batches in EventQueue

This commit is contained in:
ivk 2020-11-04 15:52:09 +01:00
parent f3c4cc3013
commit bf54876ac3
No known key found for this signature in database
GPG key ID: C103E7EF5463D318
39 changed files with 941 additions and 770 deletions

View file

@ -421,11 +421,11 @@ export class EntityRestCache implements EntityRestInterface {
*
* @return Promise, which resolves to the array of valid events (if response is NotFound or NotAuthorized we filter it out)
*/
entityEventsReceived(data: Array<EntityUpdate>): Promise<Array<EntityUpdate>> {
entityEventsReceived(batch: $ReadOnlyArray<EntityUpdate>): Promise<Array<EntityUpdate>> {
return Promise
.map(data, (update) => {
.map(batch, (update) => {
const {instanceListId, instanceId, operation, type, application} = update
if (application === "monitor") return
if (application === "monitor") return null
const typeRef = new TypeRef(application, type)
switch (operation) {
@ -433,7 +433,7 @@ export class EntityRestCache implements EntityRestInterface {
return this._processUpdateEvent(typeRef, update)
case OperationType.DELETE:
if (isSameTypeRef(MailTypeRef, typeRef) && containsEventOfType(data, OperationType.CREATE, instanceId)) {
if (isSameTypeRef(MailTypeRef, typeRef) && containsEventOfType(batch, OperationType.CREATE, instanceId)) {
// move for mail is handled in create event.
} else {
this._tryRemoveFromCache(typeRef, instanceListId, instanceId)
@ -441,36 +441,48 @@ export class EntityRestCache implements EntityRestInterface {
return update
case OperationType.CREATE:
return this._processCreateEvent(typeRef, update, data)
return this._processCreateEvent(typeRef, update, batch)
}
})
}, {concurrency: 1})
.filter(Boolean)
}
_processCreateEvent(typeRef: TypeRef<*>, update: EntityUpdate, batch: Array<EntityUpdate>): $Promisable<EntityUpdate | null> { // do not return undefined
_processCreateEvent(
typeRef: TypeRef<*>,
update: EntityUpdate,
batch: $ReadOnlyArray<EntityUpdate>,
): $Promisable<EntityUpdate | null> { // do not return undefined to avoid implicit returns
const {instanceListId, instanceId} = update
// We put new instances into cache only when it's a new instance in the cached range which is only for the list instances.
if (instanceListId) {
const path = typeRefToPath(typeRef)
const deleteEvent = getEventOfType(batch, OperationType.DELETE, instanceId)
if (deleteEvent && isSameTypeRef(MailTypeRef, typeRef) && this._isInCache(typeRef, deleteEvent.instanceListId, instanceId)) { // It is a move event
const path = typeRefToPath(typeRef)
if (deleteEvent && isSameTypeRef(MailTypeRef, typeRef) && this._isInCache(typeRef, deleteEvent.instanceListId, instanceId)) {
// It is a move event for cached mail
const element = this._getFromCache(typeRef, deleteEvent.instanceListId, instanceId)
this._tryRemoveFromCache(typeRef, deleteEvent.instanceListId, instanceId)
element._id = [instanceListId, instanceId]
this._putIntoCache(element)
return update
} else if (this._isInCacheRange(path, instanceListId, instanceId)) {
// No need to try to download something that's not there anymore
return this._entityRestClient.entityRequest(typeRef, HttpMethod.GET, instanceListId, instanceId)
.then(entity => this._putIntoCache(entity))
.return(update)
.catch(this._handleProcessingError)
} else {
return update
}
} else {
return update
}
return update
}
_processUpdateEvent(typeRef: TypeRef<*>, update: EntityUpdate): $Promisable<EntityUpdate | null> {
const {instanceListId, instanceId} = update
if (this._isInCache(typeRef, instanceListId, instanceId)) {
// No need to try to download something that's not there anymore
return this._entityRestClient.entityRequest(typeRef, HttpMethod.GET, instanceListId, instanceId)
.then(entity => this._putIntoCache(entity))
.return(update)
@ -479,9 +491,9 @@ export class EntityRestCache implements EntityRestInterface {
return update
}
_handleProcessingError(e: Error): ?EntityUpdate {
_handleProcessingError(e: Error): EntityUpdate | null {
// skip event if NotFoundError. May occur if an entity is removed in parallel.
// Skip event if. May occur if the user was removed from the owner group.
// Skip event if NotAuthorizedError. May occur if the user was removed from the owner group.
if (e instanceof NotFoundError || e instanceof NotAuthorizedError) {
return null
} else {
@ -568,4 +580,4 @@ export class EntityRestCache implements EntityRestInterface {
}
}
}
}
}