mirror of
https://codeberg.org/forgejo/forgejo.git
synced 2025-12-07 14:09:47 +00:00
fix: garbage collect lingering actions logs (#10009)
If, for any reason (e.g. server crash), a task is recorded as done in the database but the logs are still in the database instead of being in storage, they need to be collected. The log_in_storage field is only set to true after the logs have been transfered to storage and can be relied upon to reflect which tasks have lingering logs. A cron job collects lingering logs every day, 3000 at a time, sleeping one second between them. In normal circumstances there will be only a few of them, even on a large instance, and there is no need to collect them as quickly as possible. When there are a lot of them for some reason, garbage collection must happen at a rate that is not too hard on storage I/O. Refs https://codeberg.org/forgejo/forgejo/issues/9999 --- Note on backports: the v11 backport is done manually because of minor conflicts. https://codeberg.org/forgejo/forgejo/pulls/10024 ## Checklist The [contributor guide](https://forgejo.org/docs/next/contributor/) contains information that will be helpful to first time contributors. There also are a few [conditions for merging Pull Requests in Forgejo repositories](https://codeberg.org/forgejo/governance/src/branch/main/PullRequestsAgreement.md). You are also welcome to join the [Forgejo development chatroom](https://matrix.to/#/#forgejo-development:matrix.org). ### Tests - I added test coverage for Go changes... - [x] in their respective `*_test.go` for unit tests. - [x] in the `tests/integration` directory if it involves interactions with a live Forgejo server. - I added test coverage for JavaScript changes... - [ ] in `web_src/js/*.test.js` if it can be unit tested. - [ ] in `tests/e2e/*.test.e2e.js` if it requires interactions with a live Forgejo server (see also the [developer guide for JavaScript testing](https://codeberg.org/forgejo/forgejo/src/branch/forgejo/tests/e2e/README.md#end-to-end-tests)). ### Documentation - [ ] I created a pull request [to the documentation](https://codeberg.org/forgejo/docs) to explain to Forgejo users how to use this change. - [x] I did not document these changes and I do not expect someone else to do it. ### Release notes - [ ] I do not want this change to show in the release notes. - [x] I want the title to show in the release notes with a link to this pull request. - [ ] I want the content of the `release-notes/<pull request number>.md` to be be used for the release notes instead of the title. <!--start release-notes-assistant--> ## Release notes <!--URL:https://codeberg.org/forgejo/forgejo--> - Bug fixes - [PR](https://codeberg.org/forgejo/forgejo/pulls/10009): <!--number 10009 --><!--line 0 --><!--description Z2FyYmFnZSBjb2xsZWN0IGxpbmdlcmluZyBhY3Rpb25zIGxvZ3M=-->garbage collect lingering actions logs<!--description--> <!--end release-notes-assistant--> Co-authored-by: Mathieu Fenniak <mathieu@fenniak.net> Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/10009 Reviewed-by: Mathieu Fenniak <mfenniak@noreply.codeberg.org> Reviewed-by: Gusted <gusted@noreply.codeberg.org> Co-authored-by: Earl Warren <contact@earl-warren.org> Co-committed-by: Earl Warren <contact@earl-warren.org>
This commit is contained in:
parent
dea9ef6706
commit
238ecfdeb8
12 changed files with 355 additions and 24 deletions
|
|
@ -8,6 +8,7 @@ import (
|
|||
|
||||
"forgejo.org/models/db"
|
||||
"forgejo.org/modules/container"
|
||||
"forgejo.org/modules/optional"
|
||||
"forgejo.org/modules/timeutil"
|
||||
|
||||
"xorm.io/builder"
|
||||
|
|
@ -54,6 +55,8 @@ type FindTaskOptions struct {
|
|||
UpdatedBefore timeutil.TimeStamp
|
||||
StartedBefore timeutil.TimeStamp
|
||||
RunnerID int64
|
||||
LogExpired optional.Option[bool]
|
||||
LogInStorage optional.Option[bool]
|
||||
}
|
||||
|
||||
func (opts FindTaskOptions) ToConds() builder.Cond {
|
||||
|
|
@ -79,6 +82,12 @@ func (opts FindTaskOptions) ToConds() builder.Cond {
|
|||
if opts.RunnerID > 0 {
|
||||
cond = cond.And(builder.Eq{"runner_id": opts.RunnerID})
|
||||
}
|
||||
if opts.LogExpired.Has() {
|
||||
cond = cond.And(builder.Eq{"log_expired": opts.LogExpired.Value()})
|
||||
}
|
||||
if opts.LogInStorage.Has() {
|
||||
cond = cond.And(builder.Eq{"log_in_storage": opts.LogInStorage.Value()})
|
||||
}
|
||||
return cond
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -38,7 +38,7 @@ type file struct {
|
|||
|
||||
var _ File = (*file)(nil)
|
||||
|
||||
func (f *file) readAt(fileMeta *dbfsMeta, offset int64, p []byte) (n int, err error) {
|
||||
func (f *file) readAt(fileMeta *DbfsMeta, offset int64, p []byte) (n int, err error) {
|
||||
if offset >= fileMeta.FileSize {
|
||||
return 0, io.EOF
|
||||
}
|
||||
|
|
@ -56,7 +56,7 @@ func (f *file) readAt(fileMeta *dbfsMeta, offset int64, p []byte) (n int, err er
|
|||
if needRead <= 0 {
|
||||
return 0, io.EOF
|
||||
}
|
||||
var fileData dbfsData
|
||||
var fileData DbfsData
|
||||
ok, err := db.GetEngine(f.ctx).Where("meta_id = ? AND blob_offset = ?", f.metaID, blobOffset).Get(&fileData)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
|
|
@ -129,7 +129,7 @@ func (f *file) Write(p []byte) (n int, err error) {
|
|||
buf = buf[:readBytes]
|
||||
}
|
||||
|
||||
fileData := dbfsData{
|
||||
fileData := DbfsData{
|
||||
MetaID: fileMeta.ID,
|
||||
BlobOffset: blobOffset,
|
||||
BlobData: buf,
|
||||
|
|
@ -152,7 +152,7 @@ func (f *file) Write(p []byte) (n int, err error) {
|
|||
p = p[needWrite:]
|
||||
}
|
||||
|
||||
fileMetaUpdate := dbfsMeta{
|
||||
fileMetaUpdate := DbfsMeta{
|
||||
ModifyTimestamp: timeToFileTimestamp(time.Now()),
|
||||
}
|
||||
if needUpdateSize {
|
||||
|
|
@ -216,7 +216,7 @@ func fileTimestampToTime(timestamp int64) time.Time {
|
|||
}
|
||||
|
||||
func (f *file) loadMetaByPath() error {
|
||||
var fileMeta dbfsMeta
|
||||
var fileMeta DbfsMeta
|
||||
if ok, err := db.GetEngine(f.ctx).Where("full_path = ?", f.fullPath).Get(&fileMeta); err != nil {
|
||||
return err
|
||||
} else if ok {
|
||||
|
|
@ -278,7 +278,7 @@ func (f *file) createEmpty() error {
|
|||
return os.ErrExist
|
||||
}
|
||||
now := time.Now()
|
||||
_, err := db.GetEngine(f.ctx).Insert(&dbfsMeta{
|
||||
_, err := db.GetEngine(f.ctx).Insert(&DbfsMeta{
|
||||
FullPath: f.fullPath,
|
||||
BlockSize: f.blockSize,
|
||||
CreateTimestamp: timeToFileTimestamp(now),
|
||||
|
|
@ -298,7 +298,7 @@ func (f *file) truncate() error {
|
|||
if _, err := db.GetEngine(ctx).Exec("UPDATE dbfs_meta SET file_size = 0 WHERE id = ?", f.metaID); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := db.GetEngine(ctx).Delete(&dbfsData{MetaID: f.metaID}); err != nil {
|
||||
if _, err := db.GetEngine(ctx).Delete(&DbfsData{MetaID: f.metaID}); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
|
@ -323,10 +323,10 @@ func (f *file) delete() error {
|
|||
return os.ErrNotExist
|
||||
}
|
||||
return db.WithTx(f.ctx, func(ctx context.Context) error {
|
||||
if _, err := db.GetEngine(ctx).Delete(&dbfsMeta{ID: f.metaID}); err != nil {
|
||||
if _, err := db.GetEngine(ctx).Delete(&DbfsMeta{ID: f.metaID}); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := db.GetEngine(ctx).Delete(&dbfsData{MetaID: f.metaID}); err != nil {
|
||||
if _, err := db.GetEngine(ctx).Delete(&DbfsData{MetaID: f.metaID}); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
|
@ -344,8 +344,8 @@ func (f *file) size() (int64, error) {
|
|||
return fileMeta.FileSize, nil
|
||||
}
|
||||
|
||||
func findFileMetaByID(ctx context.Context, metaID int64) (*dbfsMeta, error) {
|
||||
var fileMeta dbfsMeta
|
||||
func findFileMetaByID(ctx context.Context, metaID int64) (*DbfsMeta, error) {
|
||||
var fileMeta DbfsMeta
|
||||
if ok, err := db.GetEngine(ctx).Where("id = ?", metaID).Get(&fileMeta); err != nil {
|
||||
return nil, err
|
||||
} else if ok {
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ The DBFS solution:
|
|||
The seeking and finding is not the fastest way, but it's still acceptable and won't affect the performance too much.
|
||||
*/
|
||||
|
||||
type dbfsMeta struct {
|
||||
type DbfsMeta struct { //revive:disable-line:exported
|
||||
ID int64 `xorm:"pk autoincr"`
|
||||
FullPath string `xorm:"VARCHAR(500) UNIQUE NOT NULL"`
|
||||
BlockSize int64 `xorm:"BIGINT NOT NULL"`
|
||||
|
|
@ -51,7 +51,7 @@ type dbfsMeta struct {
|
|||
ModifyTimestamp int64 `xorm:"BIGINT NOT NULL"`
|
||||
}
|
||||
|
||||
type dbfsData struct {
|
||||
type DbfsData struct { //revive:disable-line:exported
|
||||
ID int64 `xorm:"pk autoincr"`
|
||||
Revision int64 `xorm:"BIGINT NOT NULL"`
|
||||
MetaID int64 `xorm:"BIGINT index(meta_offset) NOT NULL"`
|
||||
|
|
@ -61,8 +61,8 @@ type dbfsData struct {
|
|||
}
|
||||
|
||||
func init() {
|
||||
db.RegisterModel(new(dbfsMeta))
|
||||
db.RegisterModel(new(dbfsData))
|
||||
db.RegisterModel(new(DbfsMeta))
|
||||
db.RegisterModel(new(DbfsData))
|
||||
}
|
||||
|
||||
func OpenFile(ctx context.Context, name string, flag int) (File, error) {
|
||||
|
|
@ -104,28 +104,28 @@ func Remove(ctx context.Context, name string) error {
|
|||
return f.delete()
|
||||
}
|
||||
|
||||
var _ fs.FileInfo = (*dbfsMeta)(nil)
|
||||
var _ fs.FileInfo = (*DbfsMeta)(nil)
|
||||
|
||||
func (m *dbfsMeta) Name() string {
|
||||
func (m *DbfsMeta) Name() string {
|
||||
return path.Base(m.FullPath)
|
||||
}
|
||||
|
||||
func (m *dbfsMeta) Size() int64 {
|
||||
func (m *DbfsMeta) Size() int64 {
|
||||
return m.FileSize
|
||||
}
|
||||
|
||||
func (m *dbfsMeta) Mode() fs.FileMode {
|
||||
func (m *DbfsMeta) Mode() fs.FileMode {
|
||||
return os.ModePerm
|
||||
}
|
||||
|
||||
func (m *dbfsMeta) ModTime() time.Time {
|
||||
func (m *DbfsMeta) ModTime() time.Time {
|
||||
return fileTimestampToTime(m.ModifyTimestamp)
|
||||
}
|
||||
|
||||
func (m *dbfsMeta) IsDir() bool {
|
||||
func (m *DbfsMeta) IsDir() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (m *dbfsMeta) Sys() any {
|
||||
func (m *DbfsMeta) Sys() any {
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ package actions
|
|||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
|
|
@ -29,6 +30,19 @@ const (
|
|||
defaultBufSize = MaxLineSize
|
||||
)
|
||||
|
||||
func ExistsLogs(ctx context.Context, filename string) (bool, error) {
|
||||
name := DBFSPrefix + filename
|
||||
f, err := dbfs.Open(ctx, name)
|
||||
if err == nil {
|
||||
f.Close()
|
||||
return true, nil
|
||||
}
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
|
||||
// WriteLogs appends logs to DBFS file for temporary storage.
|
||||
// It doesn't respect the file format in the filename like ".zst", since it's difficult to reopen a closed compressed file and append new content.
|
||||
// Why doesn't it store logs in object storage directly? Because it's not efficient to append content to object storage.
|
||||
|
|
@ -164,6 +178,9 @@ func RemoveLogs(ctx context.Context, inStorage bool, filename string) error {
|
|||
name := DBFSPrefix + filename
|
||||
err := dbfs.Remove(ctx, name)
|
||||
if err != nil {
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("dbfs remove %q: %w", name, err)
|
||||
}
|
||||
return nil
|
||||
|
|
|
|||
|
|
@ -152,6 +152,7 @@
|
|||
"admin.dashboard.cleanup_offline_runners": "Cleanup offline runners",
|
||||
"admin.dashboard.remove_resolved_reports": "Remove resolved reports",
|
||||
"admin.dashboard.actions_action_user": "Revoke Forgejo Actions trust for inactive users",
|
||||
"admin.dashboard.transfer_lingering_logs": "Transfer actions logs of finished actions jobs from the database to storage",
|
||||
"admin.config.security": "Security configuration",
|
||||
"admin.config.global_2fa_requirement.title": "Global two-factor requirement",
|
||||
"admin.config.global_2fa_requirement.none": "No",
|
||||
|
|
|
|||
|
|
@ -0,0 +1,80 @@
|
|||
# all entries will be used with now() == 2024-12-01
|
||||
-
|
||||
id: 1000 # lingering log
|
||||
attempt: 3
|
||||
runner_id: 1
|
||||
status: 3 # cancelled
|
||||
repo_id: 4
|
||||
owner_id: 1
|
||||
commit_sha: "1000"
|
||||
is_fork_pull_request: 0
|
||||
token_hash: "1000"
|
||||
log_filename: path1
|
||||
log_in_storage: false
|
||||
log_expired: 0
|
||||
created: 1732575600 # date +%s --date 2024-11-26
|
||||
updated: 1732575605 # a few seconds later
|
||||
|
||||
-
|
||||
id: 2000 # lingering log too new to be garbage collected
|
||||
attempt: 3
|
||||
runner_id: 1
|
||||
status: 3 # cancelled
|
||||
repo_id: 4
|
||||
owner_id: 1
|
||||
commit_sha: "2000"
|
||||
is_fork_pull_request: 0
|
||||
token_hash: "2000"
|
||||
log_filename: path2
|
||||
log_in_storage: false
|
||||
log_expired: 0
|
||||
created: 1732921200 # date +%s --date 2024-11-30
|
||||
updated: 1732921205 # a few seconds later
|
||||
|
||||
-
|
||||
id: 3000 # log already in storage
|
||||
attempt: 3
|
||||
runner_id: 1
|
||||
status: 3 # cancelled
|
||||
repo_id: 4
|
||||
owner_id: 1
|
||||
commit_sha: "3000"
|
||||
is_fork_pull_request: 0
|
||||
token_hash: "3000"
|
||||
log_filename: path3
|
||||
log_in_storage: true
|
||||
log_expired: 0
|
||||
created: 1732575600 # date +%s --date 2024-11-26
|
||||
updated: 1732575605 # a few seconds later
|
||||
|
||||
-
|
||||
id: 4000 # lingering log
|
||||
attempt: 3
|
||||
runner_id: 1
|
||||
status: 3 # cancelled
|
||||
repo_id: 4
|
||||
owner_id: 1
|
||||
commit_sha: "4000"
|
||||
is_fork_pull_request: 0
|
||||
token_hash: "4000"
|
||||
log_filename: path4
|
||||
log_in_storage: false
|
||||
log_expired: 0
|
||||
created: 1732575600 # date +%s --date 2024-11-26
|
||||
updated: 1732575605 # a few seconds later
|
||||
|
||||
-
|
||||
id: 5000 # lingering log
|
||||
attempt: 3
|
||||
runner_id: 1
|
||||
status: 3 # cancelled
|
||||
repo_id: 4
|
||||
owner_id: 1
|
||||
commit_sha: "5000"
|
||||
is_fork_pull_request: 0
|
||||
token_hash: "5000"
|
||||
log_filename: path5
|
||||
log_in_storage: false
|
||||
log_expired: 0
|
||||
created: 1732575600 # date +%s --date 2024-11-26
|
||||
updated: 1732575605 # a few seconds later
|
||||
|
|
@ -0,0 +1,21 @@
|
|||
-
|
||||
id: 1
|
||||
revision: 1
|
||||
meta_id: 1
|
||||
blob_offset: 5
|
||||
blob_size: 5
|
||||
blob_data: "12345"
|
||||
-
|
||||
id: 4
|
||||
revision: 1
|
||||
meta_id: 4
|
||||
blob_offset: 5
|
||||
blob_size: 5
|
||||
blob_data: "12345"
|
||||
-
|
||||
id: 5
|
||||
revision: 1
|
||||
meta_id: 5
|
||||
blob_offset: 5
|
||||
blob_size: 5
|
||||
blob_data: "12345"
|
||||
|
|
@ -0,0 +1,22 @@
|
|||
# all entries will be used with now() == 2024-12-01
|
||||
-
|
||||
id: 1
|
||||
full_path: 1:actions_log/path1
|
||||
block_size: 4096
|
||||
file_size: 5
|
||||
create_timestamp: 1732057200 # 2024-11-20
|
||||
modify_timestamp: 1732057205 # a few seconds after create_timestamp
|
||||
-
|
||||
id: 4
|
||||
full_path: 1:actions_log/path4
|
||||
block_size: 4096
|
||||
file_size: 5
|
||||
create_timestamp: 1732057200 # 2024-11-20
|
||||
modify_timestamp: 1732057205 # a few seconds after create_timestamp
|
||||
-
|
||||
id: 5
|
||||
full_path: 1:actions_log/path5
|
||||
block_size: 4096
|
||||
file_size: 5
|
||||
create_timestamp: 1732057200 # 2024-11-20
|
||||
modify_timestamp: 1732057205 # a few seconds after create_timestamp
|
||||
96
services/actions/log.go
Normal file
96
services/actions/log.go
Normal file
|
|
@ -0,0 +1,96 @@
|
|||
// Copyright 2025 The Forgejo Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
package actions
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
actions_model "forgejo.org/models/actions"
|
||||
"forgejo.org/models/db"
|
||||
"forgejo.org/modules/actions"
|
||||
"forgejo.org/modules/log"
|
||||
"forgejo.org/modules/optional"
|
||||
"forgejo.org/modules/timeutil"
|
||||
)
|
||||
|
||||
var (
|
||||
transferLingeringLogsMax = 3000
|
||||
transferLingeringLogsSleep = 1 * time.Second
|
||||
transferLingeringLogsOld = 24 * time.Hour
|
||||
)
|
||||
|
||||
func TransferLingeringLogs(ctx context.Context) error {
|
||||
return transferLingeringLogs(ctx, transferLingeringLogsOpts(time.Now()))
|
||||
}
|
||||
|
||||
func transferLingeringLogsOpts(now time.Time) actions_model.FindTaskOptions {
|
||||
// performance considerations: the search is linear because
|
||||
// LogInStorage has no index. But it is bounded by
|
||||
// LogExpired which is always true for older records and has an index.
|
||||
return actions_model.FindTaskOptions{
|
||||
Status: actions_model.DoneStatuses(),
|
||||
LogInStorage: optional.Some(false),
|
||||
LogExpired: optional.Some(false),
|
||||
// do it after a long delay to avoid any possibility of race with an ongoing operation
|
||||
// as it is not protected by a transaction
|
||||
UpdatedBefore: timeutil.TimeStamp(now.Add(-transferLingeringLogsOld).Unix()),
|
||||
}
|
||||
}
|
||||
|
||||
func transferLingeringLogs(ctx context.Context, opts actions_model.FindTaskOptions) error {
|
||||
count := 0
|
||||
err := db.Iterate(ctx, opts.ToConds(), func(ctx context.Context, task *actions_model.ActionTask) error {
|
||||
if err := TransferLogsAndUpdateLogInStorage(ctx, task); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debug("processed task %d", task.ID)
|
||||
count++
|
||||
if count < transferLingeringLogsMax {
|
||||
log.Debug("sleeping %v to not stress the storage", transferLingeringLogsSleep)
|
||||
time.Sleep(transferLingeringLogsSleep)
|
||||
}
|
||||
if count >= transferLingeringLogsMax {
|
||||
return fmt.Errorf("stopped after processing %v tasks and will resume later", transferLingeringLogsMax)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if count >= transferLingeringLogsMax {
|
||||
log.Info("%v", err)
|
||||
return nil
|
||||
}
|
||||
if count > 0 {
|
||||
log.Info("processed %d tasks", count)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func TransferLogsAndUpdateLogInStorage(ctx context.Context, task *actions_model.ActionTask) error {
|
||||
if task.LogInStorage {
|
||||
return nil
|
||||
}
|
||||
remove, err := TransferLogs(ctx, task.LogFilename)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
task.LogInStorage = true
|
||||
if err := actions_model.UpdateTask(ctx, task, "log_in_storage"); err != nil {
|
||||
return err
|
||||
}
|
||||
remove()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func TransferLogs(ctx context.Context, logFilename string) (func(), error) {
|
||||
exists, err := actions.ExistsLogs(ctx, logFilename)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !exists {
|
||||
return func() {}, nil
|
||||
}
|
||||
return actions.TransferLogs(ctx, logFilename)
|
||||
}
|
||||
74
services/actions/log_test.go
Normal file
74
services/actions/log_test.go
Normal file
|
|
@ -0,0 +1,74 @@
|
|||
// Copyright 2025 The Forgejo Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
package actions
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
actions_model "forgejo.org/models/actions"
|
||||
dbfs_model "forgejo.org/models/dbfs"
|
||||
"forgejo.org/models/unittest"
|
||||
"forgejo.org/modules/test"
|
||||
"forgejo.org/modules/timeutil"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"xorm.io/builder"
|
||||
)
|
||||
|
||||
func TestServicesActions_transferLingeringLogs(t *testing.T) {
|
||||
// it would be easier to dynamically create fixtures instead of injecting them
|
||||
// in the database for testing, but the dbfs API does not have what is needed to
|
||||
// create them
|
||||
defer unittest.OverrideFixtures("services/actions/TestServicesActions_TransferLingeringLogs")()
|
||||
require.NoError(t, unittest.PrepareTestDatabase())
|
||||
defer test.MockVariableValue(&transferLingeringLogsMax, 2)()
|
||||
defer test.MockVariableValue(&transferLingeringLogsOld, 2*24*time.Hour)()
|
||||
defer test.MockVariableValue(&transferLingeringLogsSleep, time.Millisecond)()
|
||||
|
||||
now, err := time.Parse("2006-01-02", "2024-12-01")
|
||||
require.NoError(t, err)
|
||||
old := timeutil.TimeStamp(now.Add(-transferLingeringLogsOld).Unix())
|
||||
|
||||
// a task has a lingering log but was updated more recently than
|
||||
// transferLingeringLogsOld
|
||||
recentID := int64(2000)
|
||||
recent := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionTask{ID: recentID}, builder.Eq{"log_in_storage": false})
|
||||
require.Greater(t, recent.Updated, old)
|
||||
|
||||
// a task has logs already in storage but would be garbage collected if it was not
|
||||
inStorageID := int64(3000)
|
||||
inStorage := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionTask{ID: inStorageID}, builder.Eq{"log_in_storage": true})
|
||||
require.Greater(t, old, inStorage.Updated)
|
||||
|
||||
taskWithLingeringLogIDs := []int64{1000, 4000, 5000}
|
||||
for _, taskWithLingeringLogID := range taskWithLingeringLogIDs {
|
||||
lingeringLog := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionTask{ID: taskWithLingeringLogID}, builder.Eq{"log_in_storage": false})
|
||||
require.Greater(t, old, lingeringLog.Updated)
|
||||
}
|
||||
lingeringLogIDs := []int64{1, 4, 5}
|
||||
|
||||
assert.True(t, unittest.BeanExists(t, &dbfs_model.DbfsMeta{}, builder.In("id", []any{lingeringLogIDs}...)))
|
||||
|
||||
// first pass transfer logs for transferLingeringLogsMax tasks
|
||||
require.NoError(t, transferLingeringLogs(t.Context(), transferLingeringLogsOpts(now)))
|
||||
assert.True(t, unittest.BeanExists(t, &dbfs_model.DbfsMeta{}, builder.In("id", []any{lingeringLogIDs[transferLingeringLogsMax:]}...)))
|
||||
for _, lingeringLogID := range lingeringLogIDs[:transferLingeringLogsMax] {
|
||||
unittest.AssertNotExistsBean(t, &dbfs_model.DbfsMeta{ID: lingeringLogID})
|
||||
}
|
||||
|
||||
// second pass transfer logs for the remainder tasks and there are none left
|
||||
require.NoError(t, transferLingeringLogs(t.Context(), transferLingeringLogsOpts(now)))
|
||||
for _, lingeringLogID := range lingeringLogIDs {
|
||||
unittest.AssertNotExistsBean(t, &dbfs_model.DbfsMeta{ID: lingeringLogID})
|
||||
}
|
||||
|
||||
// third pass is happilly doing nothing
|
||||
require.NoError(t, transferLingeringLogs(t.Context(), transferLingeringLogsOpts(now)))
|
||||
|
||||
// verify the tasks that are not to be garbage collected are still present
|
||||
assert.True(t, unittest.BeanExists(t, &actions_model.ActionTask{ID: recentID}, builder.Eq{"log_in_storage": false}))
|
||||
assert.True(t, unittest.BeanExists(t, &actions_model.ActionTask{ID: inStorageID}, builder.Eq{"log_in_storage": true}))
|
||||
}
|
||||
|
|
@ -19,6 +19,7 @@ func initActionsTasks() {
|
|||
registerStopZombieTasks()
|
||||
registerStopEndlessTasks()
|
||||
registerCancelAbandonedJobs()
|
||||
registerTransferLingeringLogs()
|
||||
registerScheduleTasks()
|
||||
registerActionsCleanup()
|
||||
registerOfflineRunnersCleanup()
|
||||
|
|
@ -55,6 +56,16 @@ func registerCancelAbandonedJobs() {
|
|||
})
|
||||
}
|
||||
|
||||
func registerTransferLingeringLogs() {
|
||||
RegisterTaskFatal("transfer_lingering_logs", &BaseConfig{
|
||||
Enabled: true,
|
||||
RunAtStart: true,
|
||||
Schedule: "@midnight",
|
||||
}, func(ctx context.Context, _ *user_model.User, cfg Config) error {
|
||||
return actions_service.TransferLingeringLogs(ctx)
|
||||
})
|
||||
}
|
||||
|
||||
// registerScheduleTasks registers a scheduled task that runs every minute to start any due schedule tasks.
|
||||
func registerScheduleTasks() {
|
||||
// Register the task with a unique name, enabled status, and schedule for every minute.
|
||||
|
|
|
|||
|
|
@ -357,11 +357,11 @@ func TestAPICron(t *testing.T) {
|
|||
AddTokenAuth(token)
|
||||
resp := MakeRequest(t, req, http.StatusOK)
|
||||
|
||||
assert.Equal(t, "30", resp.Header().Get("X-Total-Count"))
|
||||
assert.Equal(t, "31", resp.Header().Get("X-Total-Count"))
|
||||
|
||||
var crons []api.Cron
|
||||
DecodeJSON(t, resp, &crons)
|
||||
assert.Len(t, crons, 30)
|
||||
assert.Len(t, crons, 31)
|
||||
})
|
||||
|
||||
t.Run("Execute", func(t *testing.T) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue