diff --git a/cmd/restic/cmd_copy.go b/cmd/restic/cmd_copy.go index fa81755d2..d6a5efe57 100644 --- a/cmd/restic/cmd_copy.go +++ b/cmd/restic/cmd_copy.go @@ -203,7 +203,7 @@ func copyTreeBatched(ctx context.Context, srcRepo restic.Repository, dstRepo res startTime := time.Now() // call WithBlobUploader() once and then loop over all selectedSnapshots - err := dstRepo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error { + err := dstRepo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { for len(selectedSnapshots) > 0 && (batchSize < targetSize || time.Since(startTime) < minDuration) { sn := selectedSnapshots[0] selectedSnapshots = selectedSnapshots[1:] @@ -242,7 +242,7 @@ func copyTreeBatched(ctx context.Context, srcRepo restic.Repository, dstRepo res } func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Repository, - visitedTrees restic.AssociatedBlobSet, rootTreeID restic.ID, printer progress.Printer, uploader restic.BlobSaver) (uint64, error) { + visitedTrees restic.AssociatedBlobSet, rootTreeID restic.ID, printer progress.Printer, uploader restic.BlobSaverWithAsync) (uint64, error) { wg, wgCtx := errgroup.WithContext(ctx) diff --git a/cmd/restic/cmd_debug.go b/cmd/restic/cmd_debug.go index 48e6d58b7..2dd9d5cf0 100644 --- a/cmd/restic/cmd_debug.go +++ b/cmd/restic/cmd_debug.go @@ -353,7 +353,7 @@ func loadBlobs(ctx context.Context, opts DebugExamineOptions, repo restic.Reposi return err } - err = repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error { + err = repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { for _, blob := range list { printer.S(" loading blob %v at %v (length %v)", blob.ID, blob.Offset, blob.Length) if int(blob.Offset+blob.Length) > len(pack) { diff --git a/cmd/restic/cmd_recover.go b/cmd/restic/cmd_recover.go index ca22ee2de..fec4c44b5 100644 --- a/cmd/restic/cmd_recover.go +++ b/cmd/restic/cmd_recover.go @@ -153,7 +153,7 @@ func runRecover(ctx context.Context, gopts global.Options, term ui.Terminal) err } var treeID restic.ID - err = repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error { + err = repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { var err error treeID, err = data.SaveTree(ctx, uploader, tree) if err != nil { diff --git a/cmd/restic/cmd_rewrite.go b/cmd/restic/cmd_rewrite.go index 76a504652..9c53dcae6 100644 --- a/cmd/restic/cmd_rewrite.go +++ b/cmd/restic/cmd_rewrite.go @@ -194,7 +194,7 @@ func filterAndReplaceSnapshot(ctx context.Context, repo restic.Repository, sn *d var filteredTree restic.ID var summary *data.SnapshotSummary - err := repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error { + err := repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { var err error filteredTree, summary, err = filter(ctx, sn, uploader) return err diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index d619ad9b4..9c6462887 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -874,7 +874,7 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps var rootTreeID restic.ID - err = arch.Repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error { + err = arch.Repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { wg, wgCtx := errgroup.WithContext(ctx) start := time.Now() diff --git a/internal/archiver/archiver_test.go b/internal/archiver/archiver_test.go index adc7695cb..cefe87285 100644 --- a/internal/archiver/archiver_test.go +++ b/internal/archiver/archiver_test.go @@ -56,7 +56,7 @@ func saveFile(t testing.TB, repo archiverRepo, filename string, filesystem fs.FS return err } - err := repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + err := repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { wg, ctx := errgroup.WithContext(ctx) arch.runWorkers(ctx, wg, uploader) @@ -219,7 +219,7 @@ func TestArchiverSave(t *testing.T) { arch.summary = &Summary{} var fnr futureNodeResult - err := repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error { + err := repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { wg, ctx := errgroup.WithContext(ctx) arch.runWorkers(ctx, wg, uploader) @@ -296,7 +296,7 @@ func TestArchiverSaveReaderFS(t *testing.T) { arch.summary = &Summary{} var fnr futureNodeResult - err = repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error { + err = repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { wg, ctx := errgroup.WithContext(ctx) arch.runWorkers(ctx, wg, uploader) @@ -415,29 +415,39 @@ type blobCountingRepo struct { saved map[restic.BlobHandle]uint } -func (repo *blobCountingRepo) WithBlobUploader(ctx context.Context, fn func(ctx context.Context, uploader restic.BlobSaver) error) error { - return repo.archiverRepo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error { +func (repo *blobCountingRepo) WithBlobUploader(ctx context.Context, fn func(ctx context.Context, uploader restic.BlobSaverWithAsync) error) error { + return repo.archiverRepo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { return fn(ctx, &blobCountingSaver{saver: uploader, blobCountingRepo: repo}) }) } type blobCountingSaver struct { - saver restic.BlobSaver + saver restic.BlobSaverWithAsync blobCountingRepo *blobCountingRepo } +func (repo *blobCountingSaver) count(exists bool, h restic.BlobHandle) { + if exists { + return + } + repo.blobCountingRepo.m.Lock() + repo.blobCountingRepo.saved[h]++ + repo.blobCountingRepo.m.Unlock() +} + func (repo *blobCountingSaver) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool) (restic.ID, bool, int, error) { id, exists, size, err := repo.saver.SaveBlob(ctx, t, buf, id, storeDuplicate) - if exists { - return id, exists, size, err - } - h := restic.BlobHandle{ID: id, Type: t} - repo.blobCountingRepo.m.Lock() - repo.blobCountingRepo.saved[h]++ - repo.blobCountingRepo.m.Unlock() + repo.count(exists, restic.BlobHandle{ID: id, Type: t}) return id, exists, size, err } +func (repo *blobCountingSaver) SaveBlobAsync(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool, cb func(newID restic.ID, known bool, size int, err error)) { + repo.saver.SaveBlobAsync(ctx, t, buf, id, storeDuplicate, func(newID restic.ID, known bool, size int, err error) { + repo.count(known, restic.BlobHandle{ID: newID, Type: t}) + cb(newID, known, size, err) + }) +} + func appendToFile(t testing.TB, filename string, data []byte) { f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) if err != nil { @@ -840,7 +850,7 @@ func TestArchiverSaveDir(t *testing.T) { defer back() var treeID restic.ID - err := repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + err := repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { wg, ctx := errgroup.WithContext(ctx) arch.runWorkers(ctx, wg, uploader) meta, err := testFS.OpenFile(test.target, fs.O_NOFOLLOW, true) @@ -906,7 +916,7 @@ func TestArchiverSaveDirIncremental(t *testing.T) { arch.summary = &Summary{} var fnr futureNodeResult - err := repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + err := repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { wg, ctx := errgroup.WithContext(ctx) arch.runWorkers(ctx, wg, uploader) meta, err := testFS.OpenFile(tempdir, fs.O_NOFOLLOW, true) @@ -1096,7 +1106,7 @@ func TestArchiverSaveTree(t *testing.T) { } var treeID restic.ID - err := repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + err := repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { wg, ctx := errgroup.WithContext(ctx) arch.runWorkers(ctx, wg, uploader) @@ -2095,8 +2105,8 @@ type failSaveRepo struct { err error } -func (f *failSaveRepo) WithBlobUploader(ctx context.Context, fn func(ctx context.Context, uploader restic.BlobSaver) error) error { - return f.archiverRepo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error { +func (f *failSaveRepo) WithBlobUploader(ctx context.Context, fn func(ctx context.Context, uploader restic.BlobSaverWithAsync) error) error { + return f.archiverRepo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { return fn(ctx, &failSaveSaver{saver: uploader, failSaveRepo: f}) }) } @@ -2115,6 +2125,11 @@ func (f *failSaveSaver) SaveBlob(ctx context.Context, t restic.BlobType, buf []b return f.saver.SaveBlob(ctx, t, buf, id, storeDuplicate) } +func (f *failSaveSaver) SaveBlobAsync(ctx context.Context, tpe restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool, cb func(newID restic.ID, known bool, sizeInRepo int, err error)) { + newID, known, sizeInRepo, err := f.SaveBlob(ctx, tpe, buf, id, storeDuplicate) + cb(newID, known, sizeInRepo, err) +} + func TestArchiverAbortEarlyOnError(t *testing.T) { var testErr = errors.New("test error") @@ -2425,7 +2440,7 @@ func TestRacyFileTypeSwap(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - _ = repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error { + _ = repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { wg, ctx := errgroup.WithContext(ctx) arch := New(repo, fs.Track{FS: statfs}, Options{}) diff --git a/internal/checker/checker_test.go b/internal/checker/checker_test.go index 960942d80..8c78f4395 100644 --- a/internal/checker/checker_test.go +++ b/internal/checker/checker_test.go @@ -527,7 +527,7 @@ func TestCheckerBlobTypeConfusion(t *testing.T) { } var id restic.ID - test.OK(t, repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error { + test.OK(t, repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { var err error id, err = data.SaveTree(ctx, uploader, damagedTree) return err @@ -536,7 +536,7 @@ func TestCheckerBlobTypeConfusion(t *testing.T) { buf, err := repo.LoadBlob(ctx, restic.TreeBlob, id, nil) test.OK(t, err) - test.OK(t, repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error { + test.OK(t, repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { var err error _, _, _, err = uploader.SaveBlob(ctx, restic.DataBlob, buf, id, false) return err @@ -561,7 +561,7 @@ func TestCheckerBlobTypeConfusion(t *testing.T) { } var rootID restic.ID - test.OK(t, repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error { + test.OK(t, repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { var err error rootID, err = data.SaveTree(ctx, uploader, rootTree) return err diff --git a/internal/data/testing.go b/internal/data/testing.go index be4ab4edb..8187833a6 100644 --- a/internal/data/testing.go +++ b/internal/data/testing.go @@ -136,7 +136,7 @@ func TestCreateSnapshot(t testing.TB, repo restic.Repository, at time.Time, dept } var treeID restic.ID - test.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + test.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { treeID = fs.saveTree(ctx, uploader, seed, depth) return nil })) diff --git a/internal/data/tree_test.go b/internal/data/tree_test.go index 9164f4da1..054cf7c0a 100644 --- a/internal/data/tree_test.go +++ b/internal/data/tree_test.go @@ -107,7 +107,7 @@ func TestEmptyLoadTree(t *testing.T) { tree := data.NewTree(0) var id restic.ID - rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { var err error // save tree id, err = data.SaveTree(ctx, uploader, tree) diff --git a/internal/repository/fuzz_test.go b/internal/repository/fuzz_test.go index 16155f3a4..62dbd167e 100644 --- a/internal/repository/fuzz_test.go +++ b/internal/repository/fuzz_test.go @@ -20,7 +20,7 @@ func FuzzSaveLoadBlob(f *testing.F) { id := restic.Hash(blob) repo, _, _ := TestRepositoryWithVersion(t, 2) - rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { _, _, _, err := uploader.SaveBlob(ctx, restic.DataBlob, blob, id, false) return err })) diff --git a/internal/repository/prune.go b/internal/repository/prune.go index 772765129..843837617 100644 --- a/internal/repository/prune.go +++ b/internal/repository/prune.go @@ -563,7 +563,7 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) er if len(plan.repackPacks) != 0 { printer.P("repacking packs\n") bar := printer.NewCounter("packs repacked") - err := repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error { + err := repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { return CopyBlobs(ctx, repo, repo, uploader, plan.repackPacks, plan.keepBlobs, bar, printer.P) }) if err != nil { diff --git a/internal/repository/prune_internal_test.go b/internal/repository/prune_internal_test.go index 49a876884..640ab061b 100644 --- a/internal/repository/prune_internal_test.go +++ b/internal/repository/prune_internal_test.go @@ -47,7 +47,7 @@ func TestPruneMaxUnusedDuplicate(t *testing.T) { {bufs[1], bufs[3]}, {bufs[2], bufs[3]}, } { - rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { for _, blob := range blobs { id, _, _, err := uploader.SaveBlob(ctx, restic.DataBlob, blob, restic.ID{}, true) keep.Insert(restic.BlobHandle{Type: restic.DataBlob, ID: id}) diff --git a/internal/repository/prune_test.go b/internal/repository/prune_test.go index 744de0b14..a363acd41 100644 --- a/internal/repository/prune_test.go +++ b/internal/repository/prune_test.go @@ -25,7 +25,7 @@ func testPrune(t *testing.T, opts repository.PruneOptions, errOnUnused bool) { createRandomBlobs(t, random, repo, 5, 0.5, true) keep, _ := selectBlobs(t, random, repo, 0.5) - rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { // duplicate a few blobs to exercise those code paths for blob := range keep { buf, err := repo.LoadBlob(ctx, blob.Type, blob.ID, nil) @@ -133,7 +133,7 @@ func TestPruneSmall(t *testing.T) { const numBlobsCreated = 55 keep := restic.NewBlobSet() - rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { // we need a minum of 11 packfiles, each packfile will be about 5 Mb long for i := 0; i < numBlobsCreated; i++ { buf := make([]byte, blobSize) diff --git a/internal/repository/repack.go b/internal/repository/repack.go index ca0a8a48b..c2eaa8f41 100644 --- a/internal/repository/repack.go +++ b/internal/repository/repack.go @@ -32,7 +32,7 @@ func CopyBlobs( ctx context.Context, repo restic.Repository, dstRepo restic.Repository, - dstUploader restic.BlobSaver, + dstUploader restic.BlobSaverWithAsync, packs restic.IDSet, keepBlobs repackBlobSet, p *progress.Counter, @@ -57,7 +57,7 @@ func repack( ctx context.Context, repo restic.Repository, dstRepo restic.Repository, - uploader restic.BlobSaver, + uploader restic.BlobSaverWithAsync, packs restic.IDSet, keepBlobs repackBlobSet, p *progress.Counter, diff --git a/internal/repository/repack_test.go b/internal/repository/repack_test.go index bedacaa7e..0c1095301 100644 --- a/internal/repository/repack_test.go +++ b/internal/repository/repack_test.go @@ -20,7 +20,7 @@ func randomSize(random *rand.Rand, min, max int) int { func createRandomBlobs(t testing.TB, random *rand.Rand, repo restic.Repository, blobs int, pData float32, smallBlobs bool) { // two loops to allow creating multiple pack files for blobs > 0 { - rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { for blobs > 0 { blobs-- var ( @@ -70,7 +70,7 @@ func createRandomWrongBlob(t testing.TB, random *rand.Rand, repo restic.Reposito // invert first data byte buf[0] ^= 0xff - rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { _, _, _, err := uploader.SaveBlob(ctx, restic.DataBlob, buf, id, false) return err })) @@ -150,7 +150,7 @@ func findPacksForBlobs(t *testing.T, repo restic.Repository, blobs restic.BlobSe } func repack(t *testing.T, repo restic.Repository, be backend.Backend, packs restic.IDSet, blobs restic.BlobSet) { - rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { return repository.CopyBlobs(ctx, repo, repo, uploader, packs, blobs, nil, nil) })) @@ -265,7 +265,7 @@ func testRepackCopy(t *testing.T, version uint) { _, keepBlobs := selectBlobs(t, random, repo, 0.2) copyPacks := findPacksForBlobs(t, repo, keepBlobs) - rtest.OK(t, repoWrapped.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + rtest.OK(t, repoWrapped.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { return repository.CopyBlobs(ctx, repoWrapped, dstRepoWrapped, uploader, copyPacks, keepBlobs, nil, nil) })) rebuildAndReloadIndex(t, dstRepo) @@ -303,7 +303,7 @@ func testRepackWrongBlob(t *testing.T, version uint) { _, keepBlobs := selectBlobs(t, random, repo, 0) rewritePacks := findPacksForBlobs(t, repo, keepBlobs) - err := repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + err := repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { return repository.CopyBlobs(ctx, repo, repo, uploader, rewritePacks, keepBlobs, nil, nil) }) if err == nil { @@ -336,7 +336,7 @@ func testRepackBlobFallback(t *testing.T, version uint) { modbuf[0] ^= 0xff // create pack with broken copy - rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { _, _, _, err := uploader.SaveBlob(ctx, restic.DataBlob, modbuf, id, false) return err })) @@ -346,13 +346,13 @@ func testRepackBlobFallback(t *testing.T, version uint) { rewritePacks := findPacksForBlobs(t, repo, keepBlobs) // create pack with valid copy - rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { _, _, _, err := uploader.SaveBlob(ctx, restic.DataBlob, buf, id, true) return err })) // repack must fallback to valid copy - rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { return repository.CopyBlobs(ctx, repo, repo, uploader, rewritePacks, keepBlobs, nil, nil) })) diff --git a/internal/repository/repair_pack.go b/internal/repository/repair_pack.go index a6f4a52b8..0c9d3a43f 100644 --- a/internal/repository/repair_pack.go +++ b/internal/repository/repair_pack.go @@ -15,7 +15,7 @@ func RepairPacks(ctx context.Context, repo *Repository, ids restic.IDSet, printe bar.SetMax(uint64(len(ids))) defer bar.Done() - err := repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error { + err := repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { // examine all data the indexes have for the pack file for b := range repo.ListPacksFromIndex(ctx, ids) { blobs := b.Blobs diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 9abdaeec2..1d5f5a505 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -42,6 +42,7 @@ type Repository struct { opts Options packerWg *errgroup.Group + blobWg *errgroup.Group uploader *packerUploader treePM *packerManager dataPM *packerManager @@ -559,11 +560,14 @@ func (r *Repository) removeUnpacked(ctx context.Context, t restic.FileType, id r return r.be.Remove(ctx, backend.Handle{Type: t, Name: id.String()}) } -func (r *Repository) WithBlobUploader(ctx context.Context, fn func(ctx context.Context, uploader restic.BlobSaver) error) error { +func (r *Repository) WithBlobUploader(ctx context.Context, fn func(ctx context.Context, uploader restic.BlobSaverWithAsync) error) error { wg, ctx := errgroup.WithContext(ctx) r.startPackUploader(ctx, wg) + saverCtx := r.startBlobSaver(ctx, wg) wg.Go(func() error { - if err := fn(ctx, &blobSaverRepo{repo: r}); err != nil { + // must use saverCtx to ensure that the ctx used for saveBlob calls is bound to it + // otherwise the blob saver could deadlock in case of an error. + if err := fn(saverCtx, &blobSaverRepo{repo: r}); err != nil { return err } if err := r.flush(ctx); err != nil { @@ -574,14 +578,6 @@ func (r *Repository) WithBlobUploader(ctx context.Context, fn func(ctx context.C return wg.Wait() } -type blobSaverRepo struct { - repo *Repository -} - -func (r *blobSaverRepo) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool) (newID restic.ID, known bool, size int, err error) { - return r.repo.saveBlob(ctx, t, buf, id, storeDuplicate) -} - func (r *Repository) startPackUploader(ctx context.Context, wg *errgroup.Group) { if r.packerWg != nil { panic("uploader already started") @@ -598,8 +594,40 @@ func (r *Repository) startPackUploader(ctx context.Context, wg *errgroup.Group) }) } +func (r *Repository) startBlobSaver(ctx context.Context, wg *errgroup.Group) context.Context { + // blob upload computations are CPU bound + blobWg, blobCtx := errgroup.WithContext(ctx) + blobWg.SetLimit(runtime.GOMAXPROCS(0)) + r.blobWg = blobWg + + wg.Go(func() error { + // As the goroutines are only spawned on demand, wait until the context is canceled. + // This will either happen on an error while saving a blob or when blobWg.Wait() is called + // by flushBlobUploader(). + <-blobCtx.Done() + return blobWg.Wait() + }) + return blobCtx +} + +type blobSaverRepo struct { + repo *Repository +} + +func (r *blobSaverRepo) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool) (newID restic.ID, known bool, size int, err error) { + return r.repo.saveBlob(ctx, t, buf, id, storeDuplicate) +} + +func (r *blobSaverRepo) SaveBlobAsync(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool, cb func(newID restic.ID, known bool, size int, err error)) { + r.repo.saveBlobAsync(ctx, t, buf, id, storeDuplicate, cb) +} + // Flush saves all remaining packs and the index func (r *Repository) flush(ctx context.Context) error { + if err := r.flushBlobUploader(); err != nil { + return err + } + if err := r.flushPacks(ctx); err != nil { return err } @@ -607,6 +635,15 @@ func (r *Repository) flush(ctx context.Context) error { return r.idx.Flush(ctx, &internalRepository{r}) } +func (r *Repository) flushBlobUploader() error { + if r.blobWg == nil { + return nil + } + err := r.blobWg.Wait() + r.blobWg = nil + return err +} + // FlushPacks saves all remaining packs. func (r *Repository) flushPacks(ctx context.Context) error { if r.packerWg == nil { @@ -994,6 +1031,19 @@ func (r *Repository) saveBlob(ctx context.Context, t restic.BlobType, buf []byte return newID, known, size, err } +func (r *Repository) saveBlobAsync(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool, cb func(newID restic.ID, known bool, size int, err error)) { + r.blobWg.Go(func() error { + if ctx.Err() != nil { + // fail fast if the context is cancelled + cb(restic.ID{}, false, 0, context.Cause(ctx)) + return context.Cause(ctx) + } + newID, known, size, err := r.saveBlob(ctx, t, buf, id, storeDuplicate) + cb(newID, known, size, err) + return err + }) +} + type backendLoadFn func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error type loadBlobFn func(ctx context.Context, t restic.BlobType, id restic.ID, buf []byte) ([]byte, error) diff --git a/internal/repository/repository_test.go b/internal/repository/repository_test.go index 2a181312c..ecd42c3db 100644 --- a/internal/repository/repository_test.go +++ b/internal/repository/repository_test.go @@ -51,7 +51,7 @@ func testSave(t *testing.T, version uint, calculateID bool) { id := restic.Hash(data) - rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { // save inputID := restic.ID{} if !calculateID { @@ -97,7 +97,7 @@ func testSavePackMerging(t *testing.T, targetPercentage int, expectedPacks int) }) var ids restic.IDs - rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { // add blobs with size targetPercentage / 100 * repo.PackSize to the repository blobSize := repository.MinPackSize / 100 for range targetPercentage { @@ -147,7 +147,7 @@ func benchmarkSaveAndEncrypt(t *testing.B, version uint) { t.ResetTimer() t.SetBytes(int64(size)) - _ = repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + _ = repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { for i := 0; i < t.N; i++ { _, _, _, err = uploader.SaveBlob(ctx, restic.DataBlob, data, id, true) rtest.OK(t, err) @@ -168,7 +168,7 @@ func testLoadBlob(t *testing.T, version uint) { rtest.OK(t, err) var id restic.ID - rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { var err error id, _, _, err = uploader.SaveBlob(ctx, restic.DataBlob, buf, restic.ID{}, false) return err @@ -196,7 +196,7 @@ func TestLoadBlobBroken(t *testing.T) { buf := rtest.Random(42, 1000) var id restic.ID - rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { var err error id, _, _, err = uploader.SaveBlob(ctx, restic.TreeBlob, buf, restic.ID{}, false) return err @@ -225,7 +225,7 @@ func benchmarkLoadBlob(b *testing.B, version uint) { rtest.OK(b, err) var id restic.ID - rtest.OK(b, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + rtest.OK(b, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { var err error id, _, _, err = uploader.SaveBlob(ctx, restic.DataBlob, buf, restic.ID{}, false) return err @@ -361,7 +361,7 @@ func TestRepositoryLoadUnpackedRetryBroken(t *testing.T) { // saveRandomDataBlobs generates random data blobs and saves them to the repository. func saveRandomDataBlobs(t testing.TB, repo restic.Repository, num int, sizeMax int) { - rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { for i := 0; i < num; i++ { size := rand.Int() % sizeMax @@ -432,7 +432,7 @@ func TestListPack(t *testing.T) { buf := rtest.Random(42, 1000) var id restic.ID - rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error { + rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { var err error id, _, _, err = uploader.SaveBlob(ctx, restic.TreeBlob, buf, restic.ID{}, false) return err diff --git a/internal/restic/repository.go b/internal/restic/repository.go index ed0c64cf0..c7f326823 100644 --- a/internal/restic/repository.go +++ b/internal/restic/repository.go @@ -42,7 +42,7 @@ type Repository interface { // WithUploader starts the necessary workers to upload new blobs. Once the callback returns, // the workers are stopped and the index is written to the repository. The callback must use // the passed context and must not keep references to any of its parameters after returning. - WithBlobUploader(ctx context.Context, fn func(ctx context.Context, uploader BlobSaver) error) error + WithBlobUploader(ctx context.Context, fn func(ctx context.Context, uploader BlobSaverWithAsync) error) error // List calls the function fn for each file of type t in the repository. // When an error is returned by fn, processing stops and List() returns the @@ -162,11 +162,23 @@ type BlobLoader interface { } type WithBlobUploader interface { - WithBlobUploader(ctx context.Context, fn func(ctx context.Context, uploader BlobSaver) error) error + WithBlobUploader(ctx context.Context, fn func(ctx context.Context, uploader BlobSaverWithAsync) error) error +} + +type BlobSaverWithAsync interface { + BlobSaver + BlobSaverAsync } type BlobSaver interface { - SaveBlob(context.Context, BlobType, []byte, ID, bool) (ID, bool, int, error) + // SaveBlob saves a blob to the repository. ctx must be derived from the context created by WithBlobUploader. + SaveBlob(ctx context.Context, tpe BlobType, buf []byte, id ID, storeDuplicate bool) (newID ID, known bool, sizeInRepo int, err error) +} + +type BlobSaverAsync interface { + // SaveBlobAsync saves a blob to the repository. ctx must be derived from the context created by WithBlobUploader. + // The callback is called asynchronously from a different goroutine. + SaveBlobAsync(ctx context.Context, tpe BlobType, buf []byte, id ID, storeDuplicate bool, cb func(newID ID, known bool, sizeInRepo int, err error)) } // Loader loads a blob from a repository. diff --git a/internal/restorer/restorer_test.go b/internal/restorer/restorer_test.go index fe9db6b33..2e419f55c 100644 --- a/internal/restorer/restorer_test.go +++ b/internal/restorer/restorer_test.go @@ -171,7 +171,7 @@ func saveSnapshot(t testing.TB, repo restic.Repository, snapshot Snapshot, getGe defer cancel() var treeID restic.ID - rtest.OK(t, repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error { + rtest.OK(t, repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { treeID = saveDir(t, uploader, snapshot.Nodes, 1000, getGenericAttributes) return nil }))