Merge pull request #5621 from MichaelEischer/copy-stream-snapshots

copy: iterate through snapshots
This commit is contained in:
Michael Eischer 2025-12-03 21:21:05 +01:00 committed by GitHub
commit a8be8e36fa
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -3,6 +3,7 @@ package main
import ( import (
"context" "context"
"fmt" "fmt"
"iter"
"time" "time"
"github.com/restic/restic/internal/data" "github.com/restic/restic/internal/data"
@ -75,33 +76,33 @@ func (opts *CopyOptions) AddFlags(f *pflag.FlagSet) {
func collectAllSnapshots(ctx context.Context, opts CopyOptions, func collectAllSnapshots(ctx context.Context, opts CopyOptions,
srcSnapshotLister restic.Lister, srcRepo restic.Repository, srcSnapshotLister restic.Lister, srcRepo restic.Repository,
dstSnapshotByOriginal map[restic.ID][]*data.Snapshot, args []string, printer progress.Printer, dstSnapshotByOriginal map[restic.ID][]*data.Snapshot, args []string, printer progress.Printer,
) (selectedSnapshots []*data.Snapshot) { ) iter.Seq[*data.Snapshot] {
return func(yield func(*data.Snapshot) bool) {
selectedSnapshots = make([]*data.Snapshot, 0, 10) for sn := range FindFilteredSnapshots(ctx, srcSnapshotLister, srcRepo, &opts.SnapshotFilter, args, printer) {
for sn := range FindFilteredSnapshots(ctx, srcSnapshotLister, srcRepo, &opts.SnapshotFilter, args, printer) { // check whether the destination has a snapshot with the same persistent ID which has similar snapshot fields
// check whether the destination has a snapshot with the same persistent ID which has similar snapshot fields srcOriginal := *sn.ID()
srcOriginal := *sn.ID() if sn.Original != nil {
if sn.Original != nil { srcOriginal = *sn.Original
srcOriginal = *sn.Original }
} if originalSns, ok := dstSnapshotByOriginal[srcOriginal]; ok {
if originalSns, ok := dstSnapshotByOriginal[srcOriginal]; ok { isCopy := false
isCopy := false for _, originalSn := range originalSns {
for _, originalSn := range originalSns { if similarSnapshots(originalSn, sn) {
if similarSnapshots(originalSn, sn) { printer.V("\n%v", sn)
printer.V("\n%v", sn) printer.V("skipping source snapshot %s, was already copied to snapshot %s", sn.ID().Str(), originalSn.ID().Str())
printer.V("skipping source snapshot %s, was already copied to snapshot %s", sn.ID().Str(), originalSn.ID().Str()) isCopy = true
isCopy = true break
break }
}
if isCopy {
continue
} }
} }
if isCopy { if !yield(sn) {
continue return
} }
} }
selectedSnapshots = append(selectedSnapshots, sn)
} }
return selectedSnapshots
} }
func runCopy(ctx context.Context, opts CopyOptions, gopts global.Options, args []string, term ui.Terminal) error { func runCopy(ctx context.Context, opts CopyOptions, gopts global.Options, args []string, term ui.Terminal) error {
@ -189,7 +190,7 @@ func similarSnapshots(sna *data.Snapshot, snb *data.Snapshot) bool {
// copyTreeBatched copies multiple snapshots in one go. Snapshots are written after // copyTreeBatched copies multiple snapshots in one go. Snapshots are written after
// data equivalent to at least 100 packfiles was written and at least one minute has passed. // data equivalent to at least 100 packfiles was written and at least one minute has passed.
func copyTreeBatched(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Repository, func copyTreeBatched(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Repository,
selectedSnapshots []*data.Snapshot, printer progress.Printer) error { selectedSnapshots iter.Seq[*data.Snapshot], printer progress.Printer) error {
// remember already processed trees across all snapshots // remember already processed trees across all snapshots
visitedTrees := srcRepo.NewAssociatedBlobSet() visitedTrees := srcRepo.NewAssociatedBlobSet()
@ -197,16 +198,23 @@ func copyTreeBatched(ctx context.Context, srcRepo restic.Repository, dstRepo res
targetSize := uint64(dstRepo.PackSize()) * 100 targetSize := uint64(dstRepo.PackSize()) * 100
minDuration := 1 * time.Minute minDuration := 1 * time.Minute
for len(selectedSnapshots) > 0 { // use pull-based iterator to allow iteration in multiple steps
next, stop := iter.Pull(selectedSnapshots)
defer stop()
for {
var batch []*data.Snapshot var batch []*data.Snapshot
batchSize := uint64(0) batchSize := uint64(0)
startTime := time.Now() startTime := time.Now()
// call WithBlobUploader() once and then loop over all selectedSnapshots // call WithBlobUploader() once and then loop over all selectedSnapshots
err := dstRepo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { err := dstRepo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
for len(selectedSnapshots) > 0 && (batchSize < targetSize || time.Since(startTime) < minDuration) { for batchSize < targetSize || time.Since(startTime) < minDuration {
sn := selectedSnapshots[0] sn, ok := next()
selectedSnapshots = selectedSnapshots[1:] if !ok {
break
}
batch = append(batch, sn) batch = append(batch, sn)
printer.P("\n%v", sn) printer.P("\n%v", sn)
@ -225,6 +233,11 @@ func copyTreeBatched(ctx context.Context, srcRepo restic.Repository, dstRepo res
return err return err
} }
// if no snapshots were processed in this batch, we're done
if len(batch) == 0 {
break
}
// add a newline to separate saved snapshot messages from the other messages // add a newline to separate saved snapshot messages from the other messages
if len(batch) > 1 { if len(batch) > 1 {
printer.P("") printer.P("")