repository: add SaveBlobAsync method

This commit is contained in:
Michael Eischer 2025-10-13 21:56:42 +02:00
parent 4d2da63829
commit 046b0e711d
20 changed files with 145 additions and 68 deletions

View file

@ -203,7 +203,7 @@ func copyTreeBatched(ctx context.Context, srcRepo restic.Repository, dstRepo res
startTime := time.Now()
// call WithBlobUploader() once and then loop over all selectedSnapshots
err := dstRepo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error {
err := dstRepo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
for len(selectedSnapshots) > 0 && (batchSize < targetSize || time.Since(startTime) < minDuration) {
sn := selectedSnapshots[0]
selectedSnapshots = selectedSnapshots[1:]
@ -242,7 +242,7 @@ func copyTreeBatched(ctx context.Context, srcRepo restic.Repository, dstRepo res
}
func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Repository,
visitedTrees restic.AssociatedBlobSet, rootTreeID restic.ID, printer progress.Printer, uploader restic.BlobSaver) (uint64, error) {
visitedTrees restic.AssociatedBlobSet, rootTreeID restic.ID, printer progress.Printer, uploader restic.BlobSaverWithAsync) (uint64, error) {
wg, wgCtx := errgroup.WithContext(ctx)

View file

@ -353,7 +353,7 @@ func loadBlobs(ctx context.Context, opts DebugExamineOptions, repo restic.Reposi
return err
}
err = repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error {
err = repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
for _, blob := range list {
printer.S(" loading blob %v at %v (length %v)", blob.ID, blob.Offset, blob.Length)
if int(blob.Offset+blob.Length) > len(pack) {

View file

@ -153,7 +153,7 @@ func runRecover(ctx context.Context, gopts global.Options, term ui.Terminal) err
}
var treeID restic.ID
err = repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error {
err = repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
var err error
treeID, err = data.SaveTree(ctx, uploader, tree)
if err != nil {

View file

@ -194,7 +194,7 @@ func filterAndReplaceSnapshot(ctx context.Context, repo restic.Repository, sn *d
var filteredTree restic.ID
var summary *data.SnapshotSummary
err := repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error {
err := repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
var err error
filteredTree, summary, err = filter(ctx, sn, uploader)
return err

View file

@ -874,7 +874,7 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps
var rootTreeID restic.ID
err = arch.Repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error {
err = arch.Repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
wg, wgCtx := errgroup.WithContext(ctx)
start := time.Now()

View file

@ -56,7 +56,7 @@ func saveFile(t testing.TB, repo archiverRepo, filename string, filesystem fs.FS
return err
}
err := repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error {
err := repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
wg, ctx := errgroup.WithContext(ctx)
arch.runWorkers(ctx, wg, uploader)
@ -219,7 +219,7 @@ func TestArchiverSave(t *testing.T) {
arch.summary = &Summary{}
var fnr futureNodeResult
err := repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error {
err := repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
wg, ctx := errgroup.WithContext(ctx)
arch.runWorkers(ctx, wg, uploader)
@ -296,7 +296,7 @@ func TestArchiverSaveReaderFS(t *testing.T) {
arch.summary = &Summary{}
var fnr futureNodeResult
err = repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error {
err = repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
wg, ctx := errgroup.WithContext(ctx)
arch.runWorkers(ctx, wg, uploader)
@ -415,29 +415,39 @@ type blobCountingRepo struct {
saved map[restic.BlobHandle]uint
}
func (repo *blobCountingRepo) WithBlobUploader(ctx context.Context, fn func(ctx context.Context, uploader restic.BlobSaver) error) error {
return repo.archiverRepo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error {
func (repo *blobCountingRepo) WithBlobUploader(ctx context.Context, fn func(ctx context.Context, uploader restic.BlobSaverWithAsync) error) error {
return repo.archiverRepo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
return fn(ctx, &blobCountingSaver{saver: uploader, blobCountingRepo: repo})
})
}
type blobCountingSaver struct {
saver restic.BlobSaver
saver restic.BlobSaverWithAsync
blobCountingRepo *blobCountingRepo
}
// count records a blob handle in the shared saved map, unless the blob
// already existed in the repository (exists == true), in which case the
// save is not counted.
func (repo *blobCountingSaver) count(exists bool, h restic.BlobHandle) {
	if exists {
		return
	}
	parent := repo.blobCountingRepo
	parent.m.Lock()
	defer parent.m.Unlock()
	parent.saved[h]++
}
func (repo *blobCountingSaver) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool) (restic.ID, bool, int, error) {
id, exists, size, err := repo.saver.SaveBlob(ctx, t, buf, id, storeDuplicate)
if exists {
return id, exists, size, err
}
h := restic.BlobHandle{ID: id, Type: t}
repo.blobCountingRepo.m.Lock()
repo.blobCountingRepo.saved[h]++
repo.blobCountingRepo.m.Unlock()
repo.count(exists, restic.BlobHandle{ID: id, Type: t})
return id, exists, size, err
}
// SaveBlobAsync forwards the asynchronous save to the wrapped saver and
// counts the blob once the completion callback reports the result, before
// handing the result on to the caller's callback.
func (repo *blobCountingSaver) SaveBlobAsync(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool, cb func(newID restic.ID, known bool, size int, err error)) {
	countingCB := func(newID restic.ID, known bool, size int, err error) {
		repo.count(known, restic.BlobHandle{ID: newID, Type: t})
		cb(newID, known, size, err)
	}
	repo.saver.SaveBlobAsync(ctx, t, buf, id, storeDuplicate, countingCB)
}
func appendToFile(t testing.TB, filename string, data []byte) {
f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
@ -840,7 +850,7 @@ func TestArchiverSaveDir(t *testing.T) {
defer back()
var treeID restic.ID
err := repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error {
err := repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
wg, ctx := errgroup.WithContext(ctx)
arch.runWorkers(ctx, wg, uploader)
meta, err := testFS.OpenFile(test.target, fs.O_NOFOLLOW, true)
@ -906,7 +916,7 @@ func TestArchiverSaveDirIncremental(t *testing.T) {
arch.summary = &Summary{}
var fnr futureNodeResult
err := repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error {
err := repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
wg, ctx := errgroup.WithContext(ctx)
arch.runWorkers(ctx, wg, uploader)
meta, err := testFS.OpenFile(tempdir, fs.O_NOFOLLOW, true)
@ -1096,7 +1106,7 @@ func TestArchiverSaveTree(t *testing.T) {
}
var treeID restic.ID
err := repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error {
err := repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
wg, ctx := errgroup.WithContext(ctx)
arch.runWorkers(ctx, wg, uploader)
@ -2095,8 +2105,8 @@ type failSaveRepo struct {
err error
}
func (f *failSaveRepo) WithBlobUploader(ctx context.Context, fn func(ctx context.Context, uploader restic.BlobSaver) error) error {
return f.archiverRepo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error {
func (f *failSaveRepo) WithBlobUploader(ctx context.Context, fn func(ctx context.Context, uploader restic.BlobSaverWithAsync) error) error {
return f.archiverRepo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
return fn(ctx, &failSaveSaver{saver: uploader, failSaveRepo: f})
})
}
@ -2115,6 +2125,11 @@ func (f *failSaveSaver) SaveBlob(ctx context.Context, t restic.BlobType, buf []b
return f.saver.SaveBlob(ctx, t, buf, id, storeDuplicate)
}
// SaveBlobAsync implements the asynchronous interface on top of the
// synchronous SaveBlob: the blob is saved inline and the callback is
// invoked immediately with the result.
func (f *failSaveSaver) SaveBlobAsync(ctx context.Context, tpe restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool, cb func(newID restic.ID, known bool, sizeInRepo int, err error)) {
	cb(f.SaveBlob(ctx, tpe, buf, id, storeDuplicate))
}
func TestArchiverAbortEarlyOnError(t *testing.T) {
var testErr = errors.New("test error")
@ -2425,7 +2440,7 @@ func TestRacyFileTypeSwap(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_ = repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error {
_ = repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
wg, ctx := errgroup.WithContext(ctx)
arch := New(repo, fs.Track{FS: statfs}, Options{})

View file

@ -527,7 +527,7 @@ func TestCheckerBlobTypeConfusion(t *testing.T) {
}
var id restic.ID
test.OK(t, repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error {
test.OK(t, repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
var err error
id, err = data.SaveTree(ctx, uploader, damagedTree)
return err
@ -536,7 +536,7 @@ func TestCheckerBlobTypeConfusion(t *testing.T) {
buf, err := repo.LoadBlob(ctx, restic.TreeBlob, id, nil)
test.OK(t, err)
test.OK(t, repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error {
test.OK(t, repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
var err error
_, _, _, err = uploader.SaveBlob(ctx, restic.DataBlob, buf, id, false)
return err
@ -561,7 +561,7 @@ func TestCheckerBlobTypeConfusion(t *testing.T) {
}
var rootID restic.ID
test.OK(t, repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error {
test.OK(t, repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
var err error
rootID, err = data.SaveTree(ctx, uploader, rootTree)
return err

View file

@ -136,7 +136,7 @@ func TestCreateSnapshot(t testing.TB, repo restic.Repository, at time.Time, dept
}
var treeID restic.ID
test.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error {
test.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
treeID = fs.saveTree(ctx, uploader, seed, depth)
return nil
}))

View file

@ -107,7 +107,7 @@ func TestEmptyLoadTree(t *testing.T) {
tree := data.NewTree(0)
var id restic.ID
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error {
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
var err error
// save tree
id, err = data.SaveTree(ctx, uploader, tree)

View file

@ -20,7 +20,7 @@ func FuzzSaveLoadBlob(f *testing.F) {
id := restic.Hash(blob)
repo, _, _ := TestRepositoryWithVersion(t, 2)
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error {
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
_, _, _, err := uploader.SaveBlob(ctx, restic.DataBlob, blob, id, false)
return err
}))

View file

@ -563,7 +563,7 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) er
if len(plan.repackPacks) != 0 {
printer.P("repacking packs\n")
bar := printer.NewCounter("packs repacked")
err := repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error {
err := repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
return CopyBlobs(ctx, repo, repo, uploader, plan.repackPacks, plan.keepBlobs, bar, printer.P)
})
if err != nil {

View file

@ -47,7 +47,7 @@ func TestPruneMaxUnusedDuplicate(t *testing.T) {
{bufs[1], bufs[3]},
{bufs[2], bufs[3]},
} {
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error {
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
for _, blob := range blobs {
id, _, _, err := uploader.SaveBlob(ctx, restic.DataBlob, blob, restic.ID{}, true)
keep.Insert(restic.BlobHandle{Type: restic.DataBlob, ID: id})

View file

@ -25,7 +25,7 @@ func testPrune(t *testing.T, opts repository.PruneOptions, errOnUnused bool) {
createRandomBlobs(t, random, repo, 5, 0.5, true)
keep, _ := selectBlobs(t, random, repo, 0.5)
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error {
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
// duplicate a few blobs to exercise those code paths
for blob := range keep {
buf, err := repo.LoadBlob(ctx, blob.Type, blob.ID, nil)
@ -133,7 +133,7 @@ func TestPruneSmall(t *testing.T) {
const numBlobsCreated = 55
keep := restic.NewBlobSet()
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error {
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
// we need a minimum of 11 packfiles, each packfile will be about 5 Mb long
for i := 0; i < numBlobsCreated; i++ {
buf := make([]byte, blobSize)

View file

@ -32,7 +32,7 @@ func CopyBlobs(
ctx context.Context,
repo restic.Repository,
dstRepo restic.Repository,
dstUploader restic.BlobSaver,
dstUploader restic.BlobSaverWithAsync,
packs restic.IDSet,
keepBlobs repackBlobSet,
p *progress.Counter,
@ -57,7 +57,7 @@ func repack(
ctx context.Context,
repo restic.Repository,
dstRepo restic.Repository,
uploader restic.BlobSaver,
uploader restic.BlobSaverWithAsync,
packs restic.IDSet,
keepBlobs repackBlobSet,
p *progress.Counter,

View file

@ -20,7 +20,7 @@ func randomSize(random *rand.Rand, min, max int) int {
func createRandomBlobs(t testing.TB, random *rand.Rand, repo restic.Repository, blobs int, pData float32, smallBlobs bool) {
// two loops to allow creating multiple pack files
for blobs > 0 {
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error {
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
for blobs > 0 {
blobs--
var (
@ -70,7 +70,7 @@ func createRandomWrongBlob(t testing.TB, random *rand.Rand, repo restic.Reposito
// invert first data byte
buf[0] ^= 0xff
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error {
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
_, _, _, err := uploader.SaveBlob(ctx, restic.DataBlob, buf, id, false)
return err
}))
@ -150,7 +150,7 @@ func findPacksForBlobs(t *testing.T, repo restic.Repository, blobs restic.BlobSe
}
func repack(t *testing.T, repo restic.Repository, be backend.Backend, packs restic.IDSet, blobs restic.BlobSet) {
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error {
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
return repository.CopyBlobs(ctx, repo, repo, uploader, packs, blobs, nil, nil)
}))
@ -265,7 +265,7 @@ func testRepackCopy(t *testing.T, version uint) {
_, keepBlobs := selectBlobs(t, random, repo, 0.2)
copyPacks := findPacksForBlobs(t, repo, keepBlobs)
rtest.OK(t, repoWrapped.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error {
rtest.OK(t, repoWrapped.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
return repository.CopyBlobs(ctx, repoWrapped, dstRepoWrapped, uploader, copyPacks, keepBlobs, nil, nil)
}))
rebuildAndReloadIndex(t, dstRepo)
@ -303,7 +303,7 @@ func testRepackWrongBlob(t *testing.T, version uint) {
_, keepBlobs := selectBlobs(t, random, repo, 0)
rewritePacks := findPacksForBlobs(t, repo, keepBlobs)
err := repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error {
err := repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
return repository.CopyBlobs(ctx, repo, repo, uploader, rewritePacks, keepBlobs, nil, nil)
})
if err == nil {
@ -336,7 +336,7 @@ func testRepackBlobFallback(t *testing.T, version uint) {
modbuf[0] ^= 0xff
// create pack with broken copy
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error {
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
_, _, _, err := uploader.SaveBlob(ctx, restic.DataBlob, modbuf, id, false)
return err
}))
@ -346,13 +346,13 @@ func testRepackBlobFallback(t *testing.T, version uint) {
rewritePacks := findPacksForBlobs(t, repo, keepBlobs)
// create pack with valid copy
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error {
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
_, _, _, err := uploader.SaveBlob(ctx, restic.DataBlob, buf, id, true)
return err
}))
// repack must fallback to valid copy
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error {
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
return repository.CopyBlobs(ctx, repo, repo, uploader, rewritePacks, keepBlobs, nil, nil)
}))

View file

@ -15,7 +15,7 @@ func RepairPacks(ctx context.Context, repo *Repository, ids restic.IDSet, printe
bar.SetMax(uint64(len(ids)))
defer bar.Done()
err := repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error {
err := repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
// examine all data the indexes have for the pack file
for b := range repo.ListPacksFromIndex(ctx, ids) {
blobs := b.Blobs

View file

@ -42,6 +42,7 @@ type Repository struct {
opts Options
packerWg *errgroup.Group
blobWg *errgroup.Group
uploader *packerUploader
treePM *packerManager
dataPM *packerManager
@ -559,11 +560,14 @@ func (r *Repository) removeUnpacked(ctx context.Context, t restic.FileType, id r
return r.be.Remove(ctx, backend.Handle{Type: t, Name: id.String()})
}
func (r *Repository) WithBlobUploader(ctx context.Context, fn func(ctx context.Context, uploader restic.BlobSaver) error) error {
func (r *Repository) WithBlobUploader(ctx context.Context, fn func(ctx context.Context, uploader restic.BlobSaverWithAsync) error) error {
wg, ctx := errgroup.WithContext(ctx)
r.startPackUploader(ctx, wg)
saverCtx := r.startBlobSaver(ctx, wg)
wg.Go(func() error {
if err := fn(ctx, &blobSaverRepo{repo: r}); err != nil {
// must use saverCtx to ensure that the ctx used for saveBlob calls is bound to it
// otherwise the blob saver could deadlock in case of an error.
if err := fn(saverCtx, &blobSaverRepo{repo: r}); err != nil {
return err
}
if err := r.flush(ctx); err != nil {
@ -574,14 +578,6 @@ func (r *Repository) WithBlobUploader(ctx context.Context, fn func(ctx context.C
return wg.Wait()
}
type blobSaverRepo struct {
repo *Repository
}
func (r *blobSaverRepo) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool) (newID restic.ID, known bool, size int, err error) {
return r.repo.saveBlob(ctx, t, buf, id, storeDuplicate)
}
func (r *Repository) startPackUploader(ctx context.Context, wg *errgroup.Group) {
if r.packerWg != nil {
panic("uploader already started")
@ -598,8 +594,40 @@ func (r *Repository) startPackUploader(ctx context.Context, wg *errgroup.Group)
})
}
// startBlobSaver creates the worker group used for asynchronous blob saves
// (see saveBlobAsync) and registers a goroutine on wg that waits for the
// group to finish. It returns the group's context, which callers must use
// for subsequent SaveBlob/SaveBlobAsync calls so that cancellation on a
// save error propagates correctly.
func (r *Repository) startBlobSaver(ctx context.Context, wg *errgroup.Group) context.Context {
	// blob upload computations are CPU bound
	blobWg, blobCtx := errgroup.WithContext(ctx)
	blobWg.SetLimit(runtime.GOMAXPROCS(0))
	r.blobWg = blobWg

	wg.Go(func() error {
		// As the goroutines are only spawned on demand, wait until the context is canceled.
		// This will either happen on an error while saving a blob or when blobWg.Wait() is called
		// by flushBlobUploader().
		<-blobCtx.Done()
		return blobWg.Wait()
	})

	return blobCtx
}
// blobSaverRepo is a thin adapter that exposes only the blob-saving
// operations of a Repository to the callback passed to WithBlobUploader.
type blobSaverRepo struct {
	repo *Repository
}

// SaveBlob synchronously saves a blob by delegating to Repository.saveBlob.
func (r *blobSaverRepo) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool) (newID restic.ID, known bool, size int, err error) {
	return r.repo.saveBlob(ctx, t, buf, id, storeDuplicate)
}

// SaveBlobAsync saves a blob on the blob worker group by delegating to
// Repository.saveBlobAsync; cb is called with the result from a worker
// goroutine.
func (r *blobSaverRepo) SaveBlobAsync(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool, cb func(newID restic.ID, known bool, size int, err error)) {
	r.repo.saveBlobAsync(ctx, t, buf, id, storeDuplicate, cb)
}
// Flush saves all remaining packs and the index
func (r *Repository) flush(ctx context.Context) error {
if err := r.flushBlobUploader(); err != nil {
return err
}
if err := r.flushPacks(ctx); err != nil {
return err
}
@ -607,6 +635,15 @@ func (r *Repository) flush(ctx context.Context) error {
return r.idx.Flush(ctx, &internalRepository{r})
}
// flushBlobUploader waits until all pending asynchronous blob saves have
// completed and tears down the blob worker group. It is a no-op if the
// blob saver was never started.
func (r *Repository) flushBlobUploader() error {
	wg := r.blobWg
	if wg == nil {
		return nil
	}
	err := wg.Wait()
	r.blobWg = nil
	return err
}
// FlushPacks saves all remaining packs.
func (r *Repository) flushPacks(ctx context.Context) error {
if r.packerWg == nil {
@ -994,6 +1031,19 @@ func (r *Repository) saveBlob(ctx context.Context, t restic.BlobType, buf []byte
return newID, known, size, err
}
// saveBlobAsync schedules saving a blob on the worker group created by
// startBlobSaver. cb is invoked from a worker goroutine with the save
// result. A non-nil error returned from the worker cancels the group's
// context, which stops further asynchronous saves.
func (r *Repository) saveBlobAsync(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool, cb func(newID restic.ID, known bool, size int, err error)) {
	r.blobWg.Go(func() error {
		if ctx.Err() != nil {
			// fail fast if the context is cancelled; compute the cause once
			// and report the same error to both the callback and the group
			cause := context.Cause(ctx)
			cb(restic.ID{}, false, 0, cause)
			return cause
		}
		newID, known, size, err := r.saveBlob(ctx, t, buf, id, storeDuplicate)
		cb(newID, known, size, err)
		return err
	})
}
type backendLoadFn func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error
type loadBlobFn func(ctx context.Context, t restic.BlobType, id restic.ID, buf []byte) ([]byte, error)

View file

@ -51,7 +51,7 @@ func testSave(t *testing.T, version uint, calculateID bool) {
id := restic.Hash(data)
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error {
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
// save
inputID := restic.ID{}
if !calculateID {
@ -97,7 +97,7 @@ func testSavePackMerging(t *testing.T, targetPercentage int, expectedPacks int)
})
var ids restic.IDs
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error {
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
// add blobs with size targetPercentage / 100 * repo.PackSize to the repository
blobSize := repository.MinPackSize / 100
for range targetPercentage {
@ -147,7 +147,7 @@ func benchmarkSaveAndEncrypt(t *testing.B, version uint) {
t.ResetTimer()
t.SetBytes(int64(size))
_ = repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error {
_ = repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
for i := 0; i < t.N; i++ {
_, _, _, err = uploader.SaveBlob(ctx, restic.DataBlob, data, id, true)
rtest.OK(t, err)
@ -168,7 +168,7 @@ func testLoadBlob(t *testing.T, version uint) {
rtest.OK(t, err)
var id restic.ID
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error {
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
var err error
id, _, _, err = uploader.SaveBlob(ctx, restic.DataBlob, buf, restic.ID{}, false)
return err
@ -196,7 +196,7 @@ func TestLoadBlobBroken(t *testing.T) {
buf := rtest.Random(42, 1000)
var id restic.ID
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error {
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
var err error
id, _, _, err = uploader.SaveBlob(ctx, restic.TreeBlob, buf, restic.ID{}, false)
return err
@ -225,7 +225,7 @@ func benchmarkLoadBlob(b *testing.B, version uint) {
rtest.OK(b, err)
var id restic.ID
rtest.OK(b, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error {
rtest.OK(b, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
var err error
id, _, _, err = uploader.SaveBlob(ctx, restic.DataBlob, buf, restic.ID{}, false)
return err
@ -361,7 +361,7 @@ func TestRepositoryLoadUnpackedRetryBroken(t *testing.T) {
// saveRandomDataBlobs generates random data blobs and saves them to the repository.
func saveRandomDataBlobs(t testing.TB, repo restic.Repository, num int, sizeMax int) {
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error {
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
for i := 0; i < num; i++ {
size := rand.Int() % sizeMax
@ -432,7 +432,7 @@ func TestListPack(t *testing.T) {
buf := rtest.Random(42, 1000)
var id restic.ID
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaver) error {
rtest.OK(t, repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
var err error
id, _, _, err = uploader.SaveBlob(ctx, restic.TreeBlob, buf, restic.ID{}, false)
return err

View file

@ -42,7 +42,7 @@ type Repository interface {
// WithBlobUploader starts the necessary workers to upload new blobs. Once the callback returns,
// the workers are stopped and the index is written to the repository. The callback must use
// the passed context and must not keep references to any of its parameters after returning.
WithBlobUploader(ctx context.Context, fn func(ctx context.Context, uploader BlobSaver) error) error
WithBlobUploader(ctx context.Context, fn func(ctx context.Context, uploader BlobSaverWithAsync) error) error
// List calls the function fn for each file of type t in the repository.
// When an error is returned by fn, processing stops and List() returns the
@ -162,11 +162,23 @@ type BlobLoader interface {
}
type WithBlobUploader interface {
WithBlobUploader(ctx context.Context, fn func(ctx context.Context, uploader BlobSaver) error) error
WithBlobUploader(ctx context.Context, fn func(ctx context.Context, uploader BlobSaverWithAsync) error) error
}
type BlobSaverWithAsync interface {
BlobSaver
BlobSaverAsync
}
type BlobSaver interface {
SaveBlob(context.Context, BlobType, []byte, ID, bool) (ID, bool, int, error)
// SaveBlob saves a blob to the repository. ctx must be derived from the context created by WithBlobUploader.
SaveBlob(ctx context.Context, tpe BlobType, buf []byte, id ID, storeDuplicate bool) (newID ID, known bool, sizeInRepo int, err error)
}
type BlobSaverAsync interface {
// SaveBlobAsync saves a blob to the repository. ctx must be derived from the context created by WithBlobUploader.
// The callback is called asynchronously from a different goroutine.
SaveBlobAsync(ctx context.Context, tpe BlobType, buf []byte, id ID, storeDuplicate bool, cb func(newID ID, known bool, sizeInRepo int, err error))
}
// Loader loads a blob from a repository.

View file

@ -171,7 +171,7 @@ func saveSnapshot(t testing.TB, repo restic.Repository, snapshot Snapshot, getGe
defer cancel()
var treeID restic.ID
rtest.OK(t, repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error {
rtest.OK(t, repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
treeID = saveDir(t, uploader, snapshot.Nodes, 1000, getGenericAttributes)
return nil
}))