archiver: port to repository.SaveBlobAsync

Michael Eischer 2025-10-13 22:01:40 +02:00
parent 046b0e711d
commit dd6cb0dd8e
8 changed files with 89 additions and 302 deletions
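The core of the change, as the diff below shows: the archiver's private blobSaver worker pool (internal/archiver/blob_saver.go) is deleted, and fileSaver and treeSaver hand blobs straight to the repository's asynchronous uploader. A minimal sketch of the before/after interfaces, inferred from the signatures in this diff (types simplified, not copied verbatim from restic):

package sketch

import "context"

// Simplified stand-ins for restic.ID and restic.BlobType, for illustration only.
type ID [32]byte
type BlobType uint8

// Before: a synchronous SaveBlob; the archiver wrapped it in its own
// worker pool sized by Options.SaveBlobConcurrency.
type BlobSaver interface {
	SaveBlob(ctx context.Context, t BlobType, buf []byte, id ID,
		storeDuplicate bool) (newID ID, known bool, sizeInRepo int, err error)
}

// After: the repository accepts a completion callback and schedules the
// hashing/encryption itself, so the archiver-side pool and the
// SaveBlobConcurrency option can go away.
type BlobSaverAsync interface {
	SaveBlobAsync(ctx context.Context, t BlobType, buf []byte, id ID,
		storeDuplicate bool, cb func(newID ID, known bool, sizeInRepo int, err error))
}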

internal/archiver/archiver.go

@@ -96,7 +96,6 @@ type Archiver struct {
FS fs.FS
Options Options
blobSaver *blobSaver
fileSaver *fileSaver
treeSaver *treeSaver
mu sync.Mutex
@@ -145,11 +144,6 @@ type Options struct {
// turned out to be a good default for most situations).
ReadConcurrency uint
// SaveBlobConcurrency sets how many blobs are hashed and saved
// concurrently. If it's set to zero, the default is the number of CPUs
// available in the system.
SaveBlobConcurrency uint
// SaveTreeConcurrency sets how many trees are marshalled and saved to the
// repo concurrently.
SaveTreeConcurrency uint
@@ -165,12 +159,6 @@ func (o Options) ApplyDefaults() Options {
o.ReadConcurrency = 2
}
if o.SaveBlobConcurrency == 0 {
// blob saving is CPU bound due to hash checking and encryption
// the actual upload is handled by the repository itself
o.SaveBlobConcurrency = uint(runtime.GOMAXPROCS(0))
}
if o.SaveTreeConcurrency == 0 {
// can either wait for a file, wait for a tree, serialize a tree or wait for saveblob
// the last two are cpu-bound and thus mutually exclusive.
@@ -834,24 +822,20 @@ func (arch *Archiver) loadParentTree(ctx context.Context, sn *data.Snapshot) *da
}
// runWorkers starts the worker pools, which are stopped when the context is cancelled.
func (arch *Archiver) runWorkers(ctx context.Context, wg *errgroup.Group, uploader restic.BlobSaver) {
arch.blobSaver = newBlobSaver(ctx, wg, uploader, arch.Options.SaveBlobConcurrency)
func (arch *Archiver) runWorkers(ctx context.Context, wg *errgroup.Group, uploader restic.BlobSaverAsync) {
arch.fileSaver = newFileSaver(ctx, wg,
arch.blobSaver.Save,
uploader,
arch.Repo.Config().ChunkerPolynomial,
arch.Options.ReadConcurrency, arch.Options.SaveBlobConcurrency)
arch.Options.ReadConcurrency)
arch.fileSaver.CompleteBlob = arch.CompleteBlob
arch.fileSaver.NodeFromFileInfo = arch.nodeFromFileInfo
arch.treeSaver = newTreeSaver(ctx, wg, arch.Options.SaveTreeConcurrency, arch.blobSaver.Save, arch.Error)
arch.treeSaver = newTreeSaver(ctx, wg, arch.Options.SaveTreeConcurrency, uploader, arch.Error)
}
func (arch *Archiver) stopWorkers() {
arch.blobSaver.TriggerShutdown()
arch.fileSaver.TriggerShutdown()
arch.treeSaver.TriggerShutdown()
arch.blobSaver = nil
arch.fileSaver = nil
arch.treeSaver = nil
}
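For orientation, here is roughly how the reworked workers get wired up — a hypothetical sketch based on the WithBlobUploader callback shape visible in the test changes below; the actual Snapshot code path is not part of this diff:

func snapshotSketch(ctx context.Context, arch *Archiver, repo archiverRepo) error {
	return repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
		wg, wgCtx := errgroup.WithContext(ctx)
		// workers stop when wgCtx is cancelled or stopWorkers closes their queues
		arch.runWorkers(wgCtx, wg, uploader)
		// ... save files and trees through arch.fileSaver / arch.treeSaver ...
		arch.stopWorkers()
		return wg.Wait()
	})
}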

internal/archiver/archiver_test.go

@@ -2084,8 +2084,6 @@ func TestArchiverContextCanceled(t *testing.T) {
type TrackFS struct {
fs.FS
errorOn map[string]error
opened map[string]uint
m sync.Mutex
}
@@ -2101,33 +2099,53 @@ func (m *TrackFS) OpenFile(name string, flag int, metadataOnly bool) (fs.File, e
type failSaveRepo struct {
archiverRepo
failAfter int32
cnt int32
cnt atomic.Int32
err error
}
func (f *failSaveRepo) WithBlobUploader(ctx context.Context, fn func(ctx context.Context, uploader restic.BlobSaverWithAsync) error) error {
return f.archiverRepo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
return fn(ctx, &failSaveSaver{saver: uploader, failSaveRepo: f})
return fn(ctx, &failSaveSaver{saver: uploader, failSaveRepo: f, semaphore: make(chan struct{}, 1)})
})
}
type failSaveSaver struct {
saver restic.BlobSaver
saver restic.BlobSaverWithAsync
failSaveRepo *failSaveRepo
semaphore chan struct{}
}
func (f *failSaveSaver) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool) (restic.ID, bool, int, error) {
val := atomic.AddInt32(&f.failSaveRepo.cnt, 1)
val := f.failSaveRepo.cnt.Add(1)
if val >= f.failSaveRepo.failAfter {
return restic.Hash(buf), false, 0, f.failSaveRepo.err
return restic.ID{}, false, 0, f.failSaveRepo.err
}
return f.saver.SaveBlob(ctx, t, buf, id, storeDuplicate)
}
func (f *failSaveSaver) SaveBlobAsync(ctx context.Context, tpe restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool, cb func(newID restic.ID, known bool, sizeInRepo int, err error)) {
newID, known, sizeInRepo, err := f.SaveBlob(ctx, tpe, buf, id, storeDuplicate)
cb(newID, known, sizeInRepo, err)
func (f *failSaveSaver) SaveBlobAsync(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool, cb func(newID restic.ID, known bool, size int, err error)) {
// limit concurrency to make test reliable
f.semaphore <- struct{}{}
val := f.failSaveRepo.cnt.Add(1)
if val >= f.failSaveRepo.failAfter {
// use a canceled context to make SaveBlobAsync fail
var cancel context.CancelCauseFunc
ctx, cancel = context.WithCancelCause(ctx)
cancel(f.failSaveRepo.err)
}
f.saver.SaveBlobAsync(ctx, t, buf, id, storeDuplicate, func(newID restic.ID, known bool, size int, err error) {
if val >= f.failSaveRepo.failAfter {
if err == nil {
panic("expected error")
}
err = f.failSaveRepo.err
}
cb(newID, known, size, err)
<-f.semaphore
})
}
func TestArchiverAbortEarlyOnError(t *testing.T) {
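The one-slot semaphore in failSaveSaver above is what keeps the abort test deterministic: with asynchronous saves completing in arbitrary order, the counter could otherwise pass failAfter at an unpredictable call. A self-contained sketch of the same serialization trick (asyncOp is a hypothetical stand-in for SaveBlobAsync):

package main

import "fmt"

// asyncOp completes on another goroutine and reports through cb,
// mimicking the callback style of SaveBlobAsync.
func asyncOp(n int, cb func(int)) { go cb(n * n) }

func main() {
	sem := make(chan struct{}, 1) // capacity 1: at most one operation in flight
	done := make(chan int, 3)     // buffered so callbacks never block
	for i := 1; i <= 3; i++ {
		sem <- struct{}{} // acquire; waits until the previous callback ran
		asyncOp(i, func(v int) {
			done <- v
			<-sem // release inside the callback, as failSaveSaver does
		})
	}
	for i := 0; i < 3; i++ {
		fmt.Println(<-done) // prints 1, 4, 9 in order: the ops were serialized
	}
}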
@@ -2152,6 +2170,7 @@ func TestArchiverAbortEarlyOnError(t *testing.T) {
filepath.FromSlash("dir/baz"): 1,
filepath.FromSlash("dir/foo"): 1,
},
err: testErr,
},
{
src: TestDir{
@@ -2177,7 +2196,7 @@ func TestArchiverAbortEarlyOnError(t *testing.T) {
filepath.FromSlash("dir/file9"): 0,
},
// fails after four to seven files were opened, as the ReadConcurrency allows for
// two queued files and SaveBlobConcurrency for one blob queued for saving.
// two queued files and one blob queued for saving.
failAfter: 4,
err: testErr,
},
@@ -2198,10 +2217,6 @@ func TestArchiverAbortEarlyOnError(t *testing.T) {
opened: make(map[string]uint),
}
if testFS.errorOn == nil {
testFS.errorOn = make(map[string]error)
}
testRepo := &failSaveRepo{
archiverRepo: repo,
failAfter: int32(test.failAfter),
@@ -2210,9 +2225,12 @@ func TestArchiverAbortEarlyOnError(t *testing.T) {
// at most two files may be queued
arch := New(testRepo, testFS, Options{
ReadConcurrency: 2,
SaveBlobConcurrency: 1,
ReadConcurrency: 2,
})
arch.Error = func(item string, err error) error {
t.Logf("archiver error for %q: %v", item, err)
return err
}
_, _, _, err := arch.Snapshot(ctx, []string{"."}, SnapshotOptions{Time: time.Now()})
if !errors.Is(err, test.err) {

internal/archiver/blob_saver.go

@@ -1,105 +0,0 @@
package archiver
import (
"context"
"fmt"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/restic"
"golang.org/x/sync/errgroup"
)
// saver allows saving a blob.
type saver interface {
SaveBlob(ctx context.Context, t restic.BlobType, data []byte, id restic.ID, storeDuplicate bool) (restic.ID, bool, int, error)
}
// blobSaver concurrently saves incoming blobs to the repo.
type blobSaver struct {
repo saver
ch chan<- saveBlobJob
}
// newBlobSaver returns a new blob saver. A worker pool is started, it is stopped
// when ctx is cancelled.
func newBlobSaver(ctx context.Context, wg *errgroup.Group, repo saver, workers uint) *blobSaver {
ch := make(chan saveBlobJob)
s := &blobSaver{
repo: repo,
ch: ch,
}
for i := uint(0); i < workers; i++ {
wg.Go(func() error {
return s.worker(ctx, ch)
})
}
return s
}
func (s *blobSaver) TriggerShutdown() {
close(s.ch)
}
// Save stores a blob in the repo. It checks the index and the known blobs
// before saving anything. It takes ownership of the buffer passed in.
func (s *blobSaver) Save(ctx context.Context, t restic.BlobType, buf *buffer, filename string, cb func(res saveBlobResponse)) {
select {
case s.ch <- saveBlobJob{BlobType: t, buf: buf, fn: filename, cb: cb}:
case <-ctx.Done():
debug.Log("not sending job, context is cancelled")
}
}
type saveBlobJob struct {
restic.BlobType
buf *buffer
fn string
cb func(res saveBlobResponse)
}
type saveBlobResponse struct {
id restic.ID
length int
sizeInRepo int
known bool
}
func (s *blobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte) (saveBlobResponse, error) {
id, known, sizeInRepo, err := s.repo.SaveBlob(ctx, t, buf, restic.ID{}, false)
if err != nil {
return saveBlobResponse{}, err
}
return saveBlobResponse{
id: id,
length: len(buf),
sizeInRepo: sizeInRepo,
known: known,
}, nil
}
func (s *blobSaver) worker(ctx context.Context, jobs <-chan saveBlobJob) error {
for {
var job saveBlobJob
var ok bool
select {
case <-ctx.Done():
return nil
case job, ok = <-jobs:
if !ok {
return nil
}
}
res, err := s.saveBlob(ctx, job.BlobType, job.buf.Data)
if err != nil {
debug.Log("saveBlob returned error, exiting: %v", err)
return fmt.Errorf("failed to save blob from file %q: %w", job.fn, err)
}
job.cb(res)
job.buf.Release()
}
}

internal/archiver/blob_saver_test.go

@@ -1,116 +0,0 @@
package archiver
import (
"context"
"fmt"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
"golang.org/x/sync/errgroup"
)
var errTest = errors.New("test error")
type saveFail struct {
cnt int32
failAt int32
}
func (b *saveFail) SaveBlob(_ context.Context, _ restic.BlobType, _ []byte, id restic.ID, _ bool) (restic.ID, bool, int, error) {
val := atomic.AddInt32(&b.cnt, 1)
if val == b.failAt {
return restic.ID{}, false, 0, errTest
}
return id, false, 0, nil
}
func TestBlobSaver(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg, ctx := errgroup.WithContext(ctx)
saver := &saveFail{}
b := newBlobSaver(ctx, wg, saver, uint(runtime.NumCPU()))
var wait sync.WaitGroup
var results []saveBlobResponse
var lock sync.Mutex
wait.Add(20)
for i := 0; i < 20; i++ {
buf := &buffer{Data: []byte(fmt.Sprintf("foo%d", i))}
idx := i
lock.Lock()
results = append(results, saveBlobResponse{})
lock.Unlock()
b.Save(ctx, restic.DataBlob, buf, "file", func(res saveBlobResponse) {
lock.Lock()
results[idx] = res
lock.Unlock()
wait.Done()
})
}
wait.Wait()
for i, sbr := range results {
if sbr.known {
t.Errorf("blob %v is known, that should not be the case", i)
}
}
b.TriggerShutdown()
err := wg.Wait()
if err != nil {
t.Fatal(err)
}
}
func TestBlobSaverError(t *testing.T) {
var tests = []struct {
blobs int
failAt int
}{
{20, 2},
{20, 5},
{20, 15},
{200, 150},
}
for _, test := range tests {
t.Run("", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg, ctx := errgroup.WithContext(ctx)
saver := &saveFail{
failAt: int32(test.failAt),
}
b := newBlobSaver(ctx, wg, saver, uint(runtime.NumCPU()))
for i := 0; i < test.blobs; i++ {
buf := &buffer{Data: []byte(fmt.Sprintf("foo%d", i))}
b.Save(ctx, restic.DataBlob, buf, "errfile", func(res saveBlobResponse) {})
}
b.TriggerShutdown()
err := wg.Wait()
if err == nil {
t.Errorf("expected error not found")
}
rtest.Assert(t, errors.Is(err, errTest), "unexpected error %v", err)
rtest.Assert(t, strings.Contains(err.Error(), "errfile"), "expected error to contain 'errfile' got: %v", err)
})
}
}
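These deleted tests pinned down two things: a failing save aborts the worker pool, and the returned error carries both the sentinel and the offending filename. The abort behavior is now exercised through failSaveRepo in archiver_test.go above; the error-wrapping idiom the assertions relied on, for reference (a standalone snippet using the standard library):

package main

import (
	"errors"
	"fmt"
	"strings"
)

func main() {
	errTest := errors.New("test error")
	err := fmt.Errorf("failed to save blob from file %q: %w", "errfile", errTest)
	fmt.Println(errors.Is(err, errTest))                  // true: %w keeps the chain intact
	fmt.Println(strings.Contains(err.Error(), "errfile")) // true: filename stays in the message
}

Whether the new code path still attaches filenames to save errors is not visible in this diff.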

internal/archiver/file_saver.go

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"runtime"
"sync"
"github.com/restic/chunker"
@@ -15,13 +16,10 @@ import (
"golang.org/x/sync/errgroup"
)
// saveBlobFn saves a blob to a repo.
type saveBlobFn func(context.Context, restic.BlobType, *buffer, string, func(res saveBlobResponse))
// fileSaver concurrently saves incoming files to the repo.
type fileSaver struct {
saveFilePool *bufferPool
saveBlob saveBlobFn
uploader restic.BlobSaverAsync
pol chunker.Pol
@@ -34,15 +32,17 @@ type fileSaver struct {
// newFileSaver returns a new file saver. A worker pool with fileWorkers is
// started, it is stopped when ctx is cancelled.
func newFileSaver(ctx context.Context, wg *errgroup.Group, save saveBlobFn, pol chunker.Pol, fileWorkers, blobWorkers uint) *fileSaver {
func newFileSaver(ctx context.Context, wg *errgroup.Group, uploader restic.BlobSaverAsync, pol chunker.Pol, fileWorkers uint) *fileSaver {
ch := make(chan saveFileJob)
// TODO find a way to get rid of this parameter
blobWorkers := uint(runtime.GOMAXPROCS(0))
debug.Log("new file saver with %v file workers and %v blob workers", fileWorkers, blobWorkers)
poolSize := fileWorkers + blobWorkers
s := &fileSaver{
saveBlob: save,
uploader: uploader,
saveFilePool: newBufferPool(int(poolSize), chunker.MaxSize),
pol: pol,
ch: ch,
@@ -203,15 +203,20 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
node.Content = append(node.Content, restic.ID{})
lock.Unlock()
s.saveBlob(ctx, restic.DataBlob, buf, target, func(sbr saveBlobResponse) {
lock.Lock()
if !sbr.known {
fnr.stats.DataBlobs++
fnr.stats.DataSize += uint64(sbr.length)
fnr.stats.DataSizeInRepo += uint64(sbr.sizeInRepo)
s.uploader.SaveBlobAsync(ctx, restic.DataBlob, buf.Data, restic.ID{}, false, func(newID restic.ID, known bool, sizeInRepo int, err error) {
defer buf.Release()
if err != nil {
completeError(err)
return
}
node.Content[pos] = sbr.id
lock.Lock()
if !known {
fnr.stats.DataBlobs++
fnr.stats.DataSize += uint64(len(buf.Data))
fnr.stats.DataSizeInRepo += uint64(sizeInRepo)
}
node.Content[pos] = newID
lock.Unlock()
completeBlob()
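Note how the hunk above keeps chunk order stable even though saves now complete asynchronously: a placeholder ID is appended under the lock to reserve index pos, and the callback later fills exactly that slot. Stripped to its essentials (a sketch reusing the identifiers above, not the literal restic code):

// inside the per-chunk loop of saveFile
lock.Lock()
pos := len(node.Content)
node.Content = append(node.Content, restic.ID{}) // reserve this chunk's slot
lock.Unlock()

uploader.SaveBlobAsync(ctx, restic.DataBlob, buf.Data, restic.ID{}, false,
	func(newID restic.ID, known bool, sizeInRepo int, err error) {
		defer buf.Release()
		lock.Lock()
		node.Content[pos] = newID // each callback writes only its own index
		lock.Unlock()
	})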

internal/archiver/file_saver_test.go

@@ -11,7 +11,6 @@ import (
"github.com/restic/chunker"
"github.com/restic/restic/internal/data"
"github.com/restic/restic/internal/fs"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/test"
"golang.org/x/sync/errgroup"
)
@@ -34,22 +33,13 @@ func createTestFiles(t testing.TB, num int) (files []string) {
func startFileSaver(ctx context.Context, t testing.TB, _ fs.FS) (*fileSaver, context.Context, *errgroup.Group) {
wg, ctx := errgroup.WithContext(ctx)
saveBlob := func(ctx context.Context, tpe restic.BlobType, buf *buffer, _ string, cb func(saveBlobResponse)) {
cb(saveBlobResponse{
id: restic.Hash(buf.Data),
length: len(buf.Data),
sizeInRepo: len(buf.Data),
known: false,
})
}
workers := uint(runtime.NumCPU())
pol, err := chunker.RandomPolynomial()
if err != nil {
t.Fatal(err)
}
s := newFileSaver(ctx, wg, saveBlob, pol, workers, workers)
s := newFileSaver(ctx, wg, &noopSaver{}, pol, workers)
s.NodeFromFileInfo = func(snPath, filename string, meta ToNoder, ignoreXattrListError bool) (*data.Node, error) {
return meta.ToNode(ignoreXattrListError, t.Logf)
}

internal/archiver/tree_saver.go

@@ -12,7 +12,7 @@ import (
// treeSaver concurrently saves incoming trees to the repo.
type treeSaver struct {
saveBlob saveBlobFn
uploader restic.BlobSaverAsync
errFn ErrorFunc
ch chan<- saveTreeJob
@@ -20,12 +20,12 @@ type treeSaver struct {
// newTreeSaver returns a new tree saver. A worker pool with treeWorkers is
// started, it is stopped when ctx is cancelled.
func newTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, saveBlob saveBlobFn, errFn ErrorFunc) *treeSaver {
func newTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, uploader restic.BlobSaverAsync, errFn ErrorFunc) *treeSaver {
ch := make(chan saveTreeJob)
s := &treeSaver{
ch: ch,
saveBlob: saveBlob,
uploader: uploader,
errFn: errFn,
}
@@ -129,21 +129,35 @@ func (s *treeSaver) save(ctx context.Context, job *saveTreeJob) (*data.Node, Ite
return nil, stats, err
}
b := &buffer{Data: buf}
ch := make(chan saveBlobResponse, 1)
s.saveBlob(ctx, restic.TreeBlob, b, job.target, func(res saveBlobResponse) {
ch <- res
var (
known bool
length int
sizeInRepo int
id restic.ID
)
ch := make(chan struct{}, 1)
s.uploader.SaveBlobAsync(ctx, restic.TreeBlob, buf, restic.ID{}, false, func(newID restic.ID, cbKnown bool, cbSizeInRepo int, cbErr error) {
known = cbKnown
length = len(buf)
sizeInRepo = cbSizeInRepo
id = newID
err = cbErr
ch <- struct{}{}
})
select {
case sbr := <-ch:
if !sbr.known {
case <-ch:
if err != nil {
return nil, stats, err
}
if !known {
stats.TreeBlobs++
stats.TreeSize += uint64(sbr.length)
stats.TreeSizeInRepo += uint64(sbr.sizeInRepo)
stats.TreeSize += uint64(length)
stats.TreeSizeInRepo += uint64(sizeInRepo)
}
node.Subtree = &sbr.id
node.Subtree = &id
return node, stats, nil
case <-ctx.Done():
return nil, stats, ctx.Err()
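Unlike data blobs, a tree's ID must be known before its parent tree can be serialized, so treeSaver.save above turns the asynchronous call back into a blocking one. The shape of that pattern, with the two details that matter (a sketch):

done := make(chan struct{}, 1) // buffered: the completion callback must never block
var id restic.ID
var saveErr error
uploader.SaveBlobAsync(ctx, restic.TreeBlob, buf, restic.ID{}, false,
	func(newID restic.ID, known bool, sizeInRepo int, err error) {
		id, saveErr = newID, err
		done <- struct{}{}
	})
select {
case <-done: // the channel receive orders the callback's writes before these reads
	if saveErr != nil {
		return nil, stats, saveErr
	}
	// ... update stats, set node.Subtree = &id ...
case <-ctx.Done(): // avoid hanging if the uploader shut down without calling back
	return nil, stats, ctx.Err()
}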

internal/archiver/tree_saver_test.go

@@ -13,13 +13,10 @@ import (
"golang.org/x/sync/errgroup"
)
func treeSaveHelper(_ context.Context, _ restic.BlobType, buf *buffer, _ string, cb func(res saveBlobResponse)) {
cb(saveBlobResponse{
id: restic.NewRandomID(),
known: false,
length: len(buf.Data),
sizeInRepo: len(buf.Data),
})
type noopSaver struct{}
func (n *noopSaver) SaveBlobAsync(_ context.Context, _ restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool, cb func(newID restic.ID, known bool, sizeInRepo int, err error)) {
cb(restic.Hash(buf), false, len(buf), nil)
}
func setupTreeSaver() (context.Context, context.CancelFunc, *treeSaver, func() error) {
@@ -30,7 +27,7 @@ func setupTreeSaver() (context.Context, context.CancelFunc, *treeSaver, func() e
return err
}
b := newTreeSaver(ctx, wg, uint(runtime.NumCPU()), treeSaveHelper, errFn)
b := newTreeSaver(ctx, wg, uint(runtime.NumCPU()), &noopSaver{}, errFn)
shutdown := func() error {
b.TriggerShutdown()