azure: enhanced upload with single PutBlob API and configurable upload methods

Srigovind Nayak 2025-10-01 22:39:04 +05:30
parent 481fcb9ca7
commit dd2d562b7b
No known key found for this signature in database
GPG key ID: 09006810B7263D69
2 changed files with 77 additions and 9 deletions

View file

@@ -42,6 +42,7 @@ type Backend struct {
 }
 
 const saveLargeSize = 256 * 1024 * 1024
+const singleBlobMaxSize = 5000 * 1024 * 1024 // 5000 MiB - max size for Put Blob API in service version 2019-12-12+
 const defaultListMaxItems = 5000
 
 // make sure that *Backend implements backend.Backend
@@ -53,6 +54,12 @@ func NewFactory() location.Factory {
 func open(cfg Config, rt http.RoundTripper) (*Backend, error) {
 	debug.Log("open, config %#v", cfg)
 
+	// Validate configuration
+	if err := cfg.Validate(); err != nil {
+		return nil, err
+	}
+
 	var client *azContainer.Client
 	var err error
@@ -255,12 +262,35 @@ func (be *Backend) Save(ctx context.Context, h backend.Handle, rd backend.Rewind
 	}
 
 	var err error
-	if rd.Length() < saveLargeSize {
-		// if it's smaller than 256miB, then just create the file directly from the reader
-		err = be.saveSmall(ctx, objName, rd, accessTier)
-	} else {
-		// otherwise use the more complicated method
-		err = be.saveLarge(ctx, objName, rd, accessTier)
+	uploadMethod := strings.ToLower(be.cfg.UploadMethod)
+	fileSize := rd.Length()
+
+	switch uploadMethod {
+	case "single":
+		// Always use single blob upload
+		if fileSize > singleBlobMaxSize {
+			return errors.Errorf("file size %d exceeds single blob limit of %d MiB", fileSize, singleBlobMaxSize/1024/1024)
+		}
+		err = be.saveSingleBlob(ctx, objName, rd, accessTier)
+	case "blocks":
+		// Legacy block-based upload method
+		if fileSize < saveLargeSize {
+			err = be.saveSmall(ctx, objName, rd, accessTier)
+		} else {
+			err = be.saveLarge(ctx, objName, rd, accessTier)
+		}
+	case "auto", "":
+		// Automatic selection: use single blob for files <= 5000 MiB, blocks for larger files
+		if fileSize <= singleBlobMaxSize {
+			err = be.saveSingleBlob(ctx, objName, rd, accessTier)
+		} else {
+			err = be.saveLarge(ctx, objName, rd, accessTier)
+		}
+	default:
+		return errors.Errorf("invalid upload method %q, must be 'auto', 'single', or 'blocks'", uploadMethod)
 	}
 
 	return err
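The switch above replaces the old two-way size check with a three-way, user-selectable policy. Here is a minimal, self-contained sketch of that decision table; the `pickUploadPath` helper and its `main` driver are hypothetical, written only to illustrate which branch each (method, size) pair takes, and are not part of this commit:

```go
package main

import (
	"fmt"
	"strings"
)

const (
	saveLargeSize     = 256 * 1024 * 1024  // 256 MiB threshold of the legacy path
	singleBlobMaxSize = 5000 * 1024 * 1024 // 5000 MiB Put Blob service limit
)

// pickUploadPath mirrors the switch in Save, returning the name of the
// save method each (method, size) combination would dispatch to.
func pickUploadPath(method string, size int64) (string, error) {
	switch strings.ToLower(method) {
	case "single":
		if size > singleBlobMaxSize {
			return "", fmt.Errorf("file size %d exceeds single blob limit", size)
		}
		return "saveSingleBlob", nil
	case "blocks":
		if size < saveLargeSize {
			return "saveSmall", nil
		}
		return "saveLarge", nil
	case "auto", "":
		if size <= singleBlobMaxSize {
			return "saveSingleBlob", nil
		}
		return "saveLarge", nil
	}
	return "", fmt.Errorf("invalid upload method %q, must be 'auto', 'single', or 'blocks'", method)
}

func main() {
	for _, size := range []int64{1 << 20, 300 << 20, 6000 << 20} {
		path, _ := pickUploadPath("auto", size)
		fmt.Printf("auto, %4d MiB -> %s\n", size>>20, path)
	}
}
```

Note that `auto` prefers the single Put Blob path whenever the payload fits, falling back to the block-based path only above the 5000 MiB service limit.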
@@ -348,6 +378,29 @@ func (be *Backend) saveLarge(ctx context.Context, objName string, rd backend.Rew
 	return errors.Wrap(err, "CommitBlockList")
 }
 
+// saveSingleBlob uploads data using a single Put Blob operation.
+// This method is more efficient for files under 5000 MiB as it requires only one API call
+// instead of the two calls (StageBlock + CommitBlockList) required by the block-based approach.
+func (be *Backend) saveSingleBlob(ctx context.Context, objName string, rd backend.RewindReader, accessTier blob.AccessTier) error {
+	blockBlobClient := be.container.NewBlockBlobClient(objName)
+
+	buf := make([]byte, rd.Length())
+	_, err := io.ReadFull(rd, buf)
+	if err != nil {
+		return errors.Wrap(err, "ReadFull")
+	}
+
+	reader := bytes.NewReader(buf)
+	opts := &blockblob.UploadOptions{
+		Tier:                    &accessTier,
+		TransactionalValidation: blob.TransferValidationTypeMD5(rd.Hash()),
+	}
+
+	debug.Log("Upload single blob %v with %d bytes", objName, len(buf))
+	_, err = blockBlobClient.Upload(ctx, streaming.NopCloser(reader), opts)
+	return errors.Wrap(err, "Upload")
+}
+
 // Load runs fn with a reader that yields the contents of the file at h at the
 // given offset.
 func (be *Backend) Load(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
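For reference, this is roughly what the single-shot path does against the Azure SDK for Go in isolation. A sketch assuming `github.com/Azure/azure-sdk-for-go/sdk/storage/azblob`, with a placeholder blob URL and no credentials; it is not restic's actual wiring:

```go
package main

import (
	"bytes"
	"context"
	"crypto/md5"
	"log"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
)

func main() {
	payload := []byte("hello blob")
	sum := md5.Sum(payload) // content MD5 sent for transactional validation

	// Placeholder URL; a real client would carry credentials.
	client, err := blockblob.NewClientWithNoCredential(
		"https://account.blob.core.windows.net/container/object", nil)
	if err != nil {
		log.Fatal(err)
	}

	tier := blob.AccessTierCool
	// One Put Blob request: body, access tier, and MD5 in a single call.
	_, err = client.Upload(context.Background(),
		streaming.NopCloser(bytes.NewReader(payload)),
		&blockblob.UploadOptions{
			Tier:                    &tier,
			TransactionalValidation: blob.TransferValidationTypeMD5(sum[:]),
		})
	if err != nil {
		log.Fatal(err)
	}
}
```

The visible trade-off in `saveSingleBlob` is that the payload is buffered fully in memory to obtain the seekable reader `Upload` requires; the `singleBlobMaxSize` cap bounds that buffer.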

View file

@@ -22,14 +22,16 @@ type Config struct {
 	Container string
 	Prefix    string
 
 	Connections uint   `option:"connections" help:"set a limit for the number of concurrent connections (default: 5)"`
 	AccessTier  string `option:"access-tier" help:"set the access tier for the blob storage (default: inferred from the storage account defaults)"`
+	UploadMethod string `option:"upload-method" help:"blob upload method: 'auto' (single blob for <=5000 MiB), 'single' (always single blob), or 'blocks' (legacy block-based) (default: auto)"`
 }
 
 // NewConfig returns a new Config with the default values filled in.
 func NewConfig() Config {
 	return Config{
 		Connections: 5,
+		UploadMethod: "auto",
 	}
 }
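The new field follows the existing pattern of exposing backend settings through `option` struct tags, which restic's options layer reads via reflection, so the setting should surface as `-o azure.upload-method=...` on the command line (an assumption from the tag, not shown in this diff). A small hypothetical sketch of that tag-driven mechanism, using a trimmed copy of the struct:

```go
package main

import (
	"fmt"
	"reflect"
)

// Config is a trimmed, hypothetical copy of the struct above, kept only
// to show how `option` and `help` struct tags expose backend settings.
type Config struct {
	Connections  uint   `option:"connections" help:"set a limit for the number of concurrent connections (default: 5)"`
	UploadMethod string `option:"upload-method" help:"blob upload method (default: auto)"`
}

func main() {
	t := reflect.TypeOf(Config{})
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		// Render each field the way an options listing might.
		fmt.Printf("-o azure.%s\t%s\n", f.Tag.Get("option"), f.Tag.Get("help"))
	}
}
```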
@@ -85,3 +87,16 @@ func (cfg *Config) ApplyEnvironment(prefix string) {
 		cfg.EndpointSuffix = os.Getenv(prefix + "AZURE_ENDPOINT_SUFFIX")
 	}
 }
+
+// Validate checks the configuration for errors.
+func (cfg *Config) Validate() error {
+	// Normalize upload method to lowercase
+	uploadMethod := strings.ToLower(cfg.UploadMethod)
+	if uploadMethod != "auto" && uploadMethod != "single" && uploadMethod != "blocks" && uploadMethod != "" {
+		return errors.Errorf("invalid upload method %q, must be 'auto', 'single', or 'blocks'", cfg.UploadMethod)
+	}
+	if uploadMethod != "" {
+		cfg.UploadMethod = uploadMethod
+	}
+	return nil
+}
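To see the normalization and rejection behavior of `Validate` in one place, a self-contained sketch; the standalone `Config` here is a hypothetical copy of just the relevant field:

```go
package main

import (
	"fmt"
	"strings"
)

// Config mirrors only the field Validate touches; a hypothetical copy
// for illustration, not the backend's real struct.
type Config struct {
	UploadMethod string
}

// Validate replicates the logic added above: lowercase the method,
// reject unknown values, and keep "" so Save can treat it as "auto".
func (cfg *Config) Validate() error {
	m := strings.ToLower(cfg.UploadMethod)
	if m != "auto" && m != "single" && m != "blocks" && m != "" {
		return fmt.Errorf("invalid upload method %q, must be 'auto', 'single', or 'blocks'", cfg.UploadMethod)
	}
	if m != "" {
		cfg.UploadMethod = m
	}
	return nil
}

func main() {
	for _, m := range []string{"AUTO", "Single", "", "putblob"} {
		cfg := &Config{UploadMethod: m}
		err := cfg.Validate()
		fmt.Printf("%q -> stored=%q err=%v\n", m, cfg.UploadMethod, err)
	}
}
```

Mixed-case input is accepted and stored lowercased; an empty value is left empty, which `Save` treats the same as `auto`.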