| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | package archiver | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"context" | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-05-08 22:28:37 +02:00
										 |  |  | 	"github.com/restic/restic/internal/debug" | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | 	"github.com/restic/restic/internal/restic" | 
					
						
							| 
									
										
										
										
											2018-05-12 21:40:31 +02:00
										 |  |  | 	tomb "gopkg.in/tomb.v2" | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Saver allows saving a blob. | 
					
						
							|  |  |  | type Saver interface { | 
					
						
							| 
									
										
										
										
											2020-06-06 22:20:44 +02:00
										 |  |  | 	SaveBlob(ctx context.Context, t restic.BlobType, data []byte, id restic.ID, storeDuplicate bool) (restic.ID, bool, error) | 
					
						
							| 
									
										
										
										
											2020-07-25 21:19:46 +02:00
										 |  |  | 	Index() restic.MasterIndex | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // BlobSaver concurrently saves incoming blobs to the repo. | 
					
						
							|  |  |  | type BlobSaver struct { | 
					
						
							|  |  |  | 	repo Saver | 
					
						
							| 
									
										
										
										
											2020-06-06 22:20:44 +02:00
										 |  |  | 	ch   chan<- saveBlobJob | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // NewBlobSaver returns a new blob. A worker pool is started, it is stopped | 
					
						
							|  |  |  | // when ctx is cancelled. | 
					
						
							| 
									
										
										
										
											2018-05-12 21:40:31 +02:00
										 |  |  | func NewBlobSaver(ctx context.Context, t *tomb.Tomb, repo Saver, workers uint) *BlobSaver { | 
					
						
							| 
									
										
										
										
											2018-04-30 15:13:03 +02:00
										 |  |  | 	ch := make(chan saveBlobJob) | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | 	s := &BlobSaver{ | 
					
						
							| 
									
										
										
										
											2020-06-06 22:20:44 +02:00
										 |  |  | 		repo: repo, | 
					
						
							|  |  |  | 		ch:   ch, | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for i := uint(0); i < workers; i++ { | 
					
						
							| 
									
										
										
										
											2018-05-12 21:40:31 +02:00
										 |  |  | 		t.Go(func() error { | 
					
						
							|  |  |  | 			return s.worker(t.Context(ctx), ch) | 
					
						
							| 
									
										
										
										
											2018-05-08 22:28:37 +02:00
										 |  |  | 		}) | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return s | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Save stores a blob in the repo. It checks the index and the known blobs | 
					
						
							| 
									
										
										
										
											2020-06-11 13:34:05 +02:00
										 |  |  | // before saving anything. It takes ownership of the buffer passed in. | 
					
						
							| 
									
										
										
										
											2018-04-29 15:34:41 +02:00
										 |  |  | func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob { | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | 	ch := make(chan saveBlobResponse, 1) | 
					
						
							| 
									
										
										
										
											2018-05-08 22:28:37 +02:00
										 |  |  | 	select { | 
					
						
							|  |  |  | 	case s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch}: | 
					
						
							|  |  |  | 	case <-ctx.Done(): | 
					
						
							|  |  |  | 		debug.Log("not sending job, context is cancelled") | 
					
						
							|  |  |  | 		close(ch) | 
					
						
							|  |  |  | 		return FutureBlob{ch: ch} | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	return FutureBlob{ch: ch, length: len(buf.Data)} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // FutureBlob is returned by SaveBlob and will return the data once it has been processed. | 
					
						
							|  |  |  | type FutureBlob struct { | 
					
						
							|  |  |  | 	ch     <-chan saveBlobResponse | 
					
						
							|  |  |  | 	length int | 
					
						
							|  |  |  | 	res    saveBlobResponse | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-05-08 22:28:37 +02:00
										 |  |  | // Wait blocks until the result is available or the context is cancelled. | 
					
						
							|  |  |  | func (s *FutureBlob) Wait(ctx context.Context) { | 
					
						
							|  |  |  | 	select { | 
					
						
							|  |  |  | 	case <-ctx.Done(): | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	case res, ok := <-s.ch: | 
					
						
							|  |  |  | 		if ok { | 
					
						
							|  |  |  | 			s.res = res | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // ID returns the ID of the blob after it has been saved. | 
					
						
							|  |  |  | func (s *FutureBlob) ID() restic.ID { | 
					
						
							|  |  |  | 	return s.res.id | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Known returns whether or not the blob was already known. | 
					
						
							|  |  |  | func (s *FutureBlob) Known() bool { | 
					
						
							|  |  |  | 	return s.res.known | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Length returns the length of the blob. | 
					
						
							|  |  |  | func (s *FutureBlob) Length() int { | 
					
						
							|  |  |  | 	return s.length | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type saveBlobJob struct { | 
					
						
							|  |  |  | 	restic.BlobType | 
					
						
							| 
									
										
										
										
											2018-04-29 15:34:41 +02:00
										 |  |  | 	buf *Buffer | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | 	ch  chan<- saveBlobResponse | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type saveBlobResponse struct { | 
					
						
							|  |  |  | 	id    restic.ID | 
					
						
							|  |  |  | 	known bool | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-05-08 22:28:37 +02:00
										 |  |  | func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte) (saveBlobResponse, error) { | 
					
						
							| 
									
										
										
										
											2020-06-06 22:20:44 +02:00
										 |  |  | 	id, known, err := s.repo.SaveBlob(ctx, t, buf, restic.ID{}, false) | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-05-08 22:28:37 +02:00
										 |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2018-05-12 21:40:31 +02:00
										 |  |  | 		return saveBlobResponse{}, err | 
					
						
							| 
									
										
										
										
											2018-05-08 22:28:37 +02:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | 	return saveBlobResponse{ | 
					
						
							|  |  |  | 		id:    id, | 
					
						
							| 
									
										
										
										
											2020-06-06 22:20:44 +02:00
										 |  |  | 		known: known, | 
					
						
							| 
									
										
										
										
											2018-05-08 22:28:37 +02:00
										 |  |  | 	}, nil | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-05-08 22:28:37 +02:00
										 |  |  | func (s *BlobSaver) worker(ctx context.Context, jobs <-chan saveBlobJob) error { | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | 	for { | 
					
						
							|  |  |  | 		var job saveBlobJob | 
					
						
							|  |  |  | 		select { | 
					
						
							|  |  |  | 		case <-ctx.Done(): | 
					
						
							| 
									
										
										
										
											2018-05-08 22:28:37 +02:00
										 |  |  | 			return nil | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | 		case job = <-jobs: | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-05-08 22:28:37 +02:00
										 |  |  | 		res, err := s.saveBlob(ctx, job.BlobType, job.buf.Data) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							| 
									
										
										
										
											2018-05-12 21:40:31 +02:00
										 |  |  | 			debug.Log("saveBlob returned error, exiting: %v", err) | 
					
						
							| 
									
										
										
										
											2018-05-08 22:28:37 +02:00
										 |  |  | 			close(job.ch) | 
					
						
							|  |  |  | 			return err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		job.ch <- res | 
					
						
							| 
									
										
										
										
											2018-03-30 22:43:18 +02:00
										 |  |  | 		close(job.ch) | 
					
						
							|  |  |  | 		job.buf.Release() | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } |