Merge pull request #142 from MichaelEischer/atomic-upload

Atomic file upload and directory sync
This commit is contained in:
Alexander Neumann 2021-08-17 20:32:33 +02:00 committed by GitHub
commit 05773795dd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 187 additions and 11 deletions

View file

@ -0,0 +1,14 @@
Bugfix: Fix possible data loss due to interrupted network connections
When rest-server was run without `--append-only` it was possible to lose uploaded
files in a specific scenario in which a network connection was interrupted. For the
data loss to occur a file upload by restic would have to be interrupted such that
restic notices the interrupted network connection before the rest-server. Then
restic would have to retry the file upload and finish it before the rest-server
notices that the initial upload has failed. Then the uploaded file would be
accidentally removed by rest-server when trying to clean up the failed upload.
This has been fixed by always uploading to a temporary file first, which is moved
into position only once it has been transferred completely.
https://github.com/restic/rest-server/pull/142

View file

@ -5,6 +5,7 @@ import (
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"io"
"io/ioutil"
@ -14,6 +15,7 @@ import (
"path/filepath"
"reflect"
"strings"
"sync"
"testing"
)
@ -66,6 +68,7 @@ func newRequest(t testing.TB, method, path string, body io.Reader) *http.Request
// wantCode returns a function which checks that the response has the correct HTTP status code.
func wantCode(code int) wantFunc {
return func(t testing.TB, res *httptest.ResponseRecorder) {
t.Helper()
if res.Code != code {
t.Errorf("wrong response code, want %v, got %v", code, res.Code)
}
@ -75,6 +78,7 @@ func wantCode(code int) wantFunc {
// wantBody returns a function which checks that the response has the data in the body.
func wantBody(body string) wantFunc {
return func(t testing.TB, res *httptest.ResponseRecorder) {
t.Helper()
if res.Body == nil {
t.Errorf("body is nil, want %q", body)
return
@ -88,6 +92,7 @@ func wantBody(body string) wantFunc {
// checkRequest uses f to process the request and runs the checker functions on the result.
func checkRequest(t testing.TB, f http.HandlerFunc, req *http.Request, want []wantFunc) {
t.Helper()
rr := httptest.NewRecorder()
f(rr, req)
@ -221,8 +226,8 @@ func TestResticHandler(t *testing.T) {
{createOverwriteDeleteSeq(t, "/parent2/data/"+fileID, data)},
}
// setup rclone with a local backend in a temporary directory
tempdir, err := ioutil.TempDir("", "rclone-restic-test-")
// setup the server with a local backend in a temporary directory
tempdir, err := ioutil.TempDir("", "rest-server-test-")
if err != nil {
t.Fatal(err)
}
@ -324,3 +329,121 @@ func TestSplitURLPath(t *testing.T) {
})
}
}
// delayErrorReader is a reader for tests that never yields any data. The first
// call to Read closes FirstRead; every call then blocks until Continue has
// been closed and finally returns zero bytes together with Err.
type delayErrorReader struct {
	// FirstRead is closed as soon as Read is called for the first time.
	FirstRead chan struct{}
	// firstReadOnce makes sure FirstRead is closed at most once.
	firstReadOnce sync.Once
	// Err is the error handed back by every call to Read.
	Err error
	// Continue holds back every call to Read until it is closed.
	Continue chan struct{}
}

// newDelayedErrorReader returns a delayErrorReader which reports err once it
// has been unblocked. Both signalling channels are created open and unbuffered.
func newDelayedErrorReader(err error) *delayErrorReader {
	r := new(delayErrorReader)
	r.FirstRead = make(chan struct{})
	r.Continue = make(chan struct{})
	r.Err = err
	return r
}

// Read signals the first invocation via FirstRead, waits until Continue is
// closed and then returns 0 and the configured error. p is never written to.
func (r *delayErrorReader) Read(p []byte) (int, error) {
	r.firstReadOnce.Do(r.announceFirstRead)

	// park here until the test allows the read to proceed
	<-r.Continue

	return 0, r.Err
}

// announceFirstRead closes FirstRead; it must only be run through firstReadOnce.
func (r *delayErrorReader) announceFirstRead() {
	close(r.FirstRead)
}
// TestAbortedRequest runs tests with concurrent upload requests for the same file.
func TestAbortedRequest(t *testing.T) {
// setup the server with a local backend in a temporary directory
tempdir, err := ioutil.TempDir("", "rest-server-test-")
if err != nil {
t.Fatal(err)
}
// make sure the tempdir is properly removed
defer func() {
err := os.RemoveAll(tempdir)
if err != nil {
t.Fatal(err)
}
}()
// configure path, the race condition doesn't happen for append-only repositories
mux, err := NewHandler(&Server{
AppendOnly: false,
Path: tempdir,
NoAuth: true,
Debug: true,
PanicOnError: true,
})
if err != nil {
t.Fatalf("error from NewHandler: %v", err)
}
// create the repo
checkRequest(t, mux.ServeHTTP,
newRequest(t, "POST", "/?create=true", nil),
[]wantFunc{wantCode(http.StatusOK)})
var (
id = "b5bb9d8014a0f9b1d61e21e796d78dccdf1352f23cd32812f4850b878ae4944c"
wg sync.WaitGroup
)
// the first request is an upload to a file which blocks while reading the
// body and then after some data returns an error
rd := newDelayedErrorReader(errors.New("injected"))
wg.Add(1)
go func() {
defer wg.Done()
// first, read some string, then read from rd (which blocks and then
// returns an error)
dataReader := io.MultiReader(strings.NewReader("invalid data from aborted request\n"), rd)
t.Logf("start first upload")
req := newRequest(t, "POST", "/data/"+id, dataReader)
rr := httptest.NewRecorder()
mux.ServeHTTP(rr, req)
t.Logf("first upload done, response %v (%v)", rr.Code, rr.Result().Status)
}()
// wait until the first request starts reading from the body
<-rd.FirstRead
// then while the first request is blocked we send a second request to
// delete the file and a third request to upload to the file again, only
// then the first request is unblocked.
t.Logf("delete file")
checkRequest(t, mux.ServeHTTP,
newRequest(t, "DELETE", "/data/"+id, nil),
nil) // don't check anything, restic also ignores errors here
t.Logf("upload again")
checkRequest(t, mux.ServeHTTP,
newRequest(t, "POST", "/data/"+id, strings.NewReader("foo\n")),
[]wantFunc{wantCode(http.StatusOK)})
// unblock the reader for the first request now so it can continue
close(rd.Continue)
// wait for the first request to continue
wg.Wait()
// request the file again, it must exist and contain the string from the
// second request
checkRequest(t, mux.ServeHTTP,
newRequest(t, "GET", "/data/"+id, nil),
[]wantFunc{
wantCode(http.StatusOK),
wantBody("foo\n"),
},
)
}

View file

@ -12,6 +12,7 @@ import (
"os"
"path/filepath"
"regexp"
"runtime"
"strings"
"time"
@ -542,7 +543,18 @@ func (h *Handler) saveBlob(w http.ResponseWriter, r *http.Request) {
}
path := h.getObjectPath(objectType, objectID)
tf, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_EXCL, h.opt.FileMode)
_, err := os.Stat(path)
if err == nil {
httpDefaultError(w, http.StatusForbidden)
return
}
if !os.IsNotExist(err) {
h.internalServerError(w, err)
return
}
tmpFn := objectID + ".rest-server-temp"
tf, err := ioutil.TempFile(filepath.Dir(path), tmpFn)
if os.IsNotExist(err) {
// the error is caused by a missing directory, create it and retry
mkdirErr := os.MkdirAll(filepath.Dir(path), h.opt.DirMode)
@ -550,13 +562,9 @@ func (h *Handler) saveBlob(w http.ResponseWriter, r *http.Request) {
log.Print(mkdirErr)
} else {
// try again
tf, err = os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_EXCL, h.opt.FileMode)
tf, err = ioutil.TempFile(filepath.Dir(path), tmpFn)
}
}
if os.IsExist(err) {
httpDefaultError(w, http.StatusForbidden)
return
}
if err != nil {
h.internalServerError(w, err)
return
@ -590,7 +598,7 @@ func (h *Handler) saveBlob(w http.ResponseWriter, r *http.Request) {
if err != nil {
_ = tf.Close()
_ = os.Remove(path)
_ = os.Remove(tf.Name())
h.incrementRepoSpaceUsage(-written)
if h.opt.Debug {
log.Print(err)
@ -601,22 +609,53 @@ func (h *Handler) saveBlob(w http.ResponseWriter, r *http.Request) {
if err := tf.Sync(); err != nil {
_ = tf.Close()
_ = os.Remove(path)
_ = os.Remove(tf.Name())
h.incrementRepoSpaceUsage(-written)
h.internalServerError(w, err)
return
}
if err := tf.Close(); err != nil {
_ = os.Remove(path)
_ = os.Remove(tf.Name())
h.incrementRepoSpaceUsage(-written)
h.internalServerError(w, err)
return
}
if err := os.Rename(tf.Name(), path); err != nil {
_ = os.Remove(tf.Name())
h.incrementRepoSpaceUsage(-written)
h.internalServerError(w, err)
return
}
if err := syncDir(filepath.Dir(path)); err != nil {
// Don't call os.Remove(path) as this is prone to race conditions with parallel upload retries
h.internalServerError(w, err)
return
}
h.sendMetric(objectType, BlobWrite, uint64(written))
}
// syncDir opens the directory dirname and calls Sync on it. On Windows it does
// nothing and reports success. An error from opening or syncing the directory
// is returned as is; if syncing succeeded, the result of closing the directory
// handle is returned.
func syncDir(dirname string) error {
	// syncing a directory is not possible on windows
	if runtime.GOOS == "windows" {
		return nil
	}

	d, err := os.Open(dirname)
	if err != nil {
		return err
	}

	// The handle is closed in either case. A failed sync takes precedence,
	// the close error only matters when the sync itself went through.
	syncErr := d.Sync()
	closeErr := d.Close()
	if syncErr != nil {
		return syncErr
	}
	return closeErr
}
// deleteBlob deletes a blob from the repository.
func (h *Handler) deleteBlob(w http.ResponseWriter, r *http.Request) {
if h.opt.Debug {