| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | package archiver | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"context" | 
					
						
							|  |  |  | 	"io" | 
					
						
							|  |  |  | 	"os" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	"github.com/restic/chunker" | 
					
						
							|  |  |  | 	"github.com/restic/restic/internal/debug" | 
					
						
							|  |  |  | 	"github.com/restic/restic/internal/errors" | 
					
						
							|  |  |  | 	"github.com/restic/restic/internal/fs" | 
					
						
							|  |  |  | 	"github.com/restic/restic/internal/restic" | 
					
						
							| 
									
										
										
										
											2018-05-12 21:40:31 +02:00
										 |  |  | 	tomb "gopkg.in/tomb.v2" | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-04-29 13:42:23 +02:00
										 |  |  | // FutureFile is returned by Save and will return the data once it | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | // has been processed. | 
					
						
							|  |  |  | type FutureFile struct { | 
					
						
							|  |  |  | 	ch  <-chan saveFileResponse | 
					
						
							|  |  |  | 	res saveFileResponse | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-05-12 21:40:31 +02:00
										 |  |  | // Wait blocks until the result of the save operation is received or ctx is | 
					
						
							|  |  |  | // cancelled. | 
					
						
							|  |  |  | func (s *FutureFile) Wait(ctx context.Context) { | 
					
						
							|  |  |  | 	select { | 
					
						
							|  |  |  | 	case res, ok := <-s.ch: | 
					
						
							|  |  |  | 		if ok { | 
					
						
							|  |  |  | 			s.res = res | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	case <-ctx.Done(): | 
					
						
							|  |  |  | 		return | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Node returns the node once it is available. | 
					
						
							|  |  |  | func (s *FutureFile) Node() *restic.Node { | 
					
						
							|  |  |  | 	return s.res.node | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Stats returns the stats for the file once they are available. | 
					
						
							|  |  |  | func (s *FutureFile) Stats() ItemStats { | 
					
						
							|  |  |  | 	return s.res.stats | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Err returns the error in case an error occurred. | 
					
						
							|  |  |  | func (s *FutureFile) Err() error { | 
					
						
							|  |  |  | 	return s.res.err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-05-12 21:40:31 +02:00
										 |  |  | // SaveBlobFn saves a blob to a repo. | 
					
						
							|  |  |  | type SaveBlobFn func(context.Context, restic.BlobType, *Buffer) FutureBlob | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | // FileSaver concurrently saves incoming files to the repo. | 
					
						
							|  |  |  | type FileSaver struct { | 
					
						
							|  |  |  | 	saveFilePool *BufferPool | 
					
						
							| 
									
										
										
										
											2018-05-12 21:40:31 +02:00
										 |  |  | 	saveBlob     SaveBlobFn | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	pol chunker.Pol | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-04-13 16:20:59 +02:00
										 |  |  | 	ch chan<- saveFileJob | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	CompleteBlob func(filename string, bytes uint64) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	NodeFromFileInfo func(filename string, fi os.FileInfo) (*restic.Node, error) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-04-29 15:34:41 +02:00
										 |  |  | // NewFileSaver returns a new file saver. A worker pool with fileWorkers is | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | // started, it is stopped when ctx is cancelled. | 
					
						
							| 
									
										
										
										
											2020-02-17 00:21:37 +01:00
										 |  |  | func NewFileSaver(ctx context.Context, t *tomb.Tomb, save SaveBlobFn, pol chunker.Pol, fileWorkers, blobWorkers uint) *FileSaver { | 
					
						
							| 
									
										
										
										
											2018-04-29 15:34:41 +02:00
										 |  |  | 	ch := make(chan saveFileJob) | 
					
						
							| 
									
										
										
										
											2018-04-29 13:20:12 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-04-29 15:34:41 +02:00
										 |  |  | 	debug.Log("new file saver with %v file workers and %v blob workers", fileWorkers, blobWorkers) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	poolSize := fileWorkers + blobWorkers | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	s := &FileSaver{ | 
					
						
							| 
									
										
										
										
											2018-05-12 21:40:31 +02:00
										 |  |  | 		saveBlob:     save, | 
					
						
							| 
									
										
										
										
											2018-04-29 15:34:41 +02:00
										 |  |  | 		saveFilePool: NewBufferPool(ctx, int(poolSize), chunker.MaxSize), | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | 		pol:          pol, | 
					
						
							|  |  |  | 		ch:           ch, | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		CompleteBlob: func(string, uint64) {}, | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-04-29 13:20:12 +02:00
										 |  |  | 	for i := uint(0); i < fileWorkers; i++ { | 
					
						
							| 
									
										
										
										
											2018-05-12 21:40:31 +02:00
										 |  |  | 		t.Go(func() error { | 
					
						
							|  |  |  | 			s.worker(t.Context(ctx), ch) | 
					
						
							| 
									
										
										
										
											2018-05-08 22:28:37 +02:00
										 |  |  | 			return nil | 
					
						
							|  |  |  | 		}) | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return s | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // CompleteFunc is called when the file has been saved. | 
					
						
							|  |  |  | type CompleteFunc func(*restic.Node, ItemStats) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Save stores the file f and returns the data once it has been completed. The | 
					
						
							|  |  |  | // file is closed by Save. | 
					
						
							|  |  |  | func (s *FileSaver) Save(ctx context.Context, snPath string, file fs.File, fi os.FileInfo, start func(), complete CompleteFunc) FutureFile { | 
					
						
							|  |  |  | 	ch := make(chan saveFileResponse, 1) | 
					
						
							| 
									
										
										
										
											2018-05-08 22:28:37 +02:00
										 |  |  | 	job := saveFileJob{ | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | 		snPath:   snPath, | 
					
						
							|  |  |  | 		file:     file, | 
					
						
							|  |  |  | 		fi:       fi, | 
					
						
							|  |  |  | 		start:    start, | 
					
						
							|  |  |  | 		complete: complete, | 
					
						
							|  |  |  | 		ch:       ch, | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-05-08 22:28:37 +02:00
										 |  |  | 	select { | 
					
						
							|  |  |  | 	case s.ch <- job: | 
					
						
							|  |  |  | 	case <-ctx.Done(): | 
					
						
							|  |  |  | 		debug.Log("not sending job, context is cancelled: %v", ctx.Err()) | 
					
						
							| 
									
										
										
										
											2018-05-12 23:08:00 +02:00
										 |  |  | 		_ = file.Close() | 
					
						
							| 
									
										
										
										
											2018-05-12 21:40:31 +02:00
										 |  |  | 		close(ch) | 
					
						
							|  |  |  | 		return FutureFile{ch: ch} | 
					
						
							| 
									
										
										
										
											2018-05-08 22:28:37 +02:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | 	return FutureFile{ch: ch} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type saveFileJob struct { | 
					
						
							|  |  |  | 	snPath   string | 
					
						
							|  |  |  | 	file     fs.File | 
					
						
							|  |  |  | 	fi       os.FileInfo | 
					
						
							|  |  |  | 	ch       chan<- saveFileResponse | 
					
						
							|  |  |  | 	complete CompleteFunc | 
					
						
							|  |  |  | 	start    func() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type saveFileResponse struct { | 
					
						
							|  |  |  | 	node  *restic.Node | 
					
						
							|  |  |  | 	stats ItemStats | 
					
						
							|  |  |  | 	err   error | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // saveFile stores the file f in the repo, then closes it. | 
					
						
							|  |  |  | func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPath string, f fs.File, fi os.FileInfo, start func()) saveFileResponse { | 
					
						
							|  |  |  | 	start() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	stats := ItemStats{} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	debug.Log("%v", snPath) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	node, err := s.NodeFromFileInfo(f.Name(), fi) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		_ = f.Close() | 
					
						
							|  |  |  | 		return saveFileResponse{err: err} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if node.Type != "file" { | 
					
						
							|  |  |  | 		_ = f.Close() | 
					
						
							|  |  |  | 		return saveFileResponse{err: errors.Errorf("node type %q is wrong", node.Type)} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// reuse the chunker | 
					
						
							|  |  |  | 	chnker.Reset(f, s.pol) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	var results []FutureBlob | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	node.Content = []restic.ID{} | 
					
						
							|  |  |  | 	var size uint64 | 
					
						
							|  |  |  | 	for { | 
					
						
							|  |  |  | 		buf := s.saveFilePool.Get() | 
					
						
							|  |  |  | 		chunk, err := chnker.Next(buf.Data) | 
					
						
							|  |  |  | 		if errors.Cause(err) == io.EOF { | 
					
						
							|  |  |  | 			buf.Release() | 
					
						
							|  |  |  | 			break | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2018-04-29 15:34:41 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | 		buf.Data = chunk.Data | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		size += uint64(chunk.Length) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			_ = f.Close() | 
					
						
							|  |  |  | 			return saveFileResponse{err: err} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// test if the context has been cancelled, return the error | 
					
						
							|  |  |  | 		if ctx.Err() != nil { | 
					
						
							|  |  |  | 			_ = f.Close() | 
					
						
							|  |  |  | 			return saveFileResponse{err: ctx.Err()} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-05-12 21:40:31 +02:00
										 |  |  | 		res := s.saveBlob(ctx, restic.DataBlob, buf) | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | 		results = append(results, res) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// test if the context has been cancelled, return the error | 
					
						
							|  |  |  | 		if ctx.Err() != nil { | 
					
						
							|  |  |  | 			_ = f.Close() | 
					
						
							|  |  |  | 			return saveFileResponse{err: ctx.Err()} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		s.CompleteBlob(f.Name(), uint64(len(chunk.Data))) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	err = f.Close() | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return saveFileResponse{err: err} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for _, res := range results { | 
					
						
							| 
									
										
										
										
											2018-05-08 22:28:37 +02:00
										 |  |  | 		res.Wait(ctx) | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | 		if !res.Known() { | 
					
						
							|  |  |  | 			stats.DataBlobs++ | 
					
						
							|  |  |  | 			stats.DataSize += uint64(res.Length()) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		node.Content = append(node.Content, res.ID()) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	node.Size = size | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return saveFileResponse{ | 
					
						
							|  |  |  | 		node:  node, | 
					
						
							|  |  |  | 		stats: stats, | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-05-08 22:28:37 +02:00
										 |  |  | func (s *FileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) { | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | 	// a worker has one chunker which is reused for each file (because it contains a rather large buffer) | 
					
						
							|  |  |  | 	chnker := chunker.New(nil, s.pol) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for { | 
					
						
							|  |  |  | 		var job saveFileJob | 
					
						
							|  |  |  | 		select { | 
					
						
							|  |  |  | 		case <-ctx.Done(): | 
					
						
							|  |  |  | 			return | 
					
						
							|  |  |  | 		case job = <-jobs: | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		res := s.saveFile(ctx, chnker, job.snPath, job.file, job.fi, job.start) | 
					
						
							|  |  |  | 		if job.complete != nil { | 
					
						
							|  |  |  | 			job.complete(res.node, res.stats) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		job.ch <- res | 
					
						
							|  |  |  | 		close(job.ch) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } |