ffmpeg/libavformat/shared.c
Andreas Rheinhardt 5b9d8901a9 avformat/shared: use av_fallthrough to mark fallthroughs
Reviewed-by: Kacper Michajłow <kasper93@gmail.com>
Signed-off-by: Andreas Rheinhardt <andreas.rheinhardt@outlook.com>
2026-06-04 19:43:15 +02:00

882 lines
29 KiB
C

/*
* Shared file cache protocol.
* Copyright (c) 2026 Niklas Haas
*
* This file is part of FFmpeg.
*
* FFmpeg is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* FFmpeg is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with FFmpeg; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
* Based on cache.c by Michael Niedermayer
*/
#include "libavutil/attributes.h"
#include "libavutil/avassert.h"
#include "libavutil/avstring.h"
#include "libavutil/crc.h"
#include "libavutil/hash.h"
#include "libavutil/file_open.h"
#include "libavutil/mem.h"
#include "libavutil/opt.h"
#include "libavutil/time.h"
#include "url.h"
#include <errno.h>
#include <fcntl.h>
#include <stdatomic.h>
#include <sys/file.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
/**
* This hash should be resistant against collision attacks, so that an
* attacker could not generate e.g. two different URIs that map to the same
* cache file. This requires at least 64 bits of collision resistance in
* practice (i.e. 128 bits = 16 bytes of hash size). However, we can be
* conservative by computing e.g. a 256 bit hash and storing it inside the
* file header for verification.
*
* Note that due to the way we use atomics, we should avoid zero bytes in
* the resulting hash; hence we tweak the input slightly to avoid this.
* The resulting loss in hash strength is negligible, since 32 bytes is
* already much more than needed.
*/
#define HASH_METHOD "SHA512/256"
#define HASH_SIZE 32
static int hash_uri(uint8_t hash[HASH_SIZE], const char *uri)
{
struct AVHashContext *ctx = NULL;
int ret = av_hash_alloc(&ctx, HASH_METHOD);
if (ret < 0)
return ret;
av_assert0(av_hash_get_size(ctx) == HASH_SIZE);
av_hash_init(ctx);
av_hash_update(ctx, (const uint8_t *) uri, strlen(uri));
av_hash_final(ctx, hash);
av_hash_freep(&ctx);
for (int i = 0; i < HASH_SIZE; i++)
hash[i] = hash[i] ? hash[i] : ~hash[i]; /* prevent zero bytes */
return 0;
}
#define HEADER_MAGIC MKTAG(u'\xFF', 'S', 'h', '$')
#define HEADER_VERSION 2
enum BlockState {
/* Reserved block state values */
BLOCK_NONE = 0, ///< block is not cached
BLOCK_PENDING, ///< a thread is currently trying to write this block
BLOCK_FAILED, ///< the underlying I/O source failed to read this block
/**
* All other block states represent valid cached blocks, with the value
* being the CRC of the block data.
*/
};
static uint16_t get_block_crc(const uint8_t *block, size_t block_size)
{
uint16_t crc = av_crc(av_crc_get_table(AV_CRC_16_ANSI), 0, block, block_size);
switch (crc) {
case BLOCK_NONE:
case BLOCK_FAILED:
case BLOCK_PENDING:
return ~crc; /* avoid reserved block states */
default:
return crc;
}
}
typedef struct Block {
atomic_ushort state; /* enum BlockState */
} Block;
typedef struct Spacemap {
atomic_uint header_magic;
atomic_ushort version;
atomic_ushort block_shift;
atomic_ullong filesize; /* byte offset of true EOF, or 0 if unknown */
atomic_uchar hash[HASH_SIZE]; /* hash of resource URI / filename */
char reserved[80];
Block blocks[];
} Spacemap;
/* Set to value iff the current value is unset (zero) */
#define DEF_SET_ONCE(ctype, atype) \
static int set_once_##atype(atomic_##atype *const ptr, const ctype value) \
{ \
ctype prev = 0; \
av_assert1(value != 0); \
if (atomic_compare_exchange_strong_explicit( \
ptr, &prev, value, memory_order_release, memory_order_relaxed)) \
return 1; \
else if (prev == value) \
return 0; \
else \
return AVERROR(EINVAL); \
}
DEF_SET_ONCE(unsigned char, uchar)
DEF_SET_ONCE(unsigned int, uint)
DEF_SET_ONCE(unsigned short, ushort)
DEF_SET_ONCE(unsigned long long, ullong)
typedef struct SharedContext {
AVClass *class;
URLContext *inner;
int64_t inner_pos;
/* options */
char *cache_dir;
int block_shift; ///< requested shift; may disagree with actual
int read_only;
int64_t timeout;
int retry_errors;
int verify;
/* misc state */
int64_t pos; ///< current logical position
uint8_t *tmp_buf;
int block_size;
int write_err; ///< write error occurred
/* cache file */
uint8_t *cache_data; ///< optional mmap of the cache file
char *cache_path;
off_t cache_size; ///< size of mapped memory region (for munmap)
int fd;
/* space map */
Spacemap *spacemap;
char *map_path;
off_t map_size;
int mapfd;
/* statistics */
int64_t nb_hit;
int64_t nb_miss;
} SharedContext;
static int shared_close(URLContext *h)
{
SharedContext *s = h->priv_data;
ffurl_close(s->inner);
if (s->cache_data)
munmap(s->cache_data, s->cache_size);
if (s->spacemap)
munmap(s->spacemap, s->map_size);
if (s->fd != -1)
close(s->fd);
if (s->mapfd != -1)
close(s->mapfd);
av_freep(&s->cache_path);
av_freep(&s->map_path);
av_freep(&s->tmp_buf);
av_log(h, AV_LOG_DEBUG, "Cache statistics: %"PRId64" hits, %"PRId64" misses\n",
s->nb_hit, s->nb_miss);
return 0;
}
static int cache_map(URLContext *h, int64_t filesize);
static int spacemap_init(URLContext *h, const uint8_t hash[HASH_SIZE]);
static int spacemap_grow(URLContext *h, int64_t block);
static int64_t get_filesize(URLContext *h)
{
SharedContext *s = h->priv_data;
return atomic_load_explicit(&s->spacemap->filesize, memory_order_relaxed);
}
static int set_filesize(URLContext *h, int64_t new_size)
{
SharedContext *s = h->priv_data;
int ret;
if (!new_size)
return 0;
ret = set_once_ullong(&s->spacemap->filesize, new_size);
if (ret < 0) {
av_log(h, AV_LOG_ERROR, "Cached file size mismatch, expected: "
"%"PRId64", got: %"PRIu64"!\n", new_size,
(uint64_t) atomic_load(&s->spacemap->filesize));
return ret;
} else if (ret) {
/* Opportunistically map the file; this also sets the correct filesize.
* Ignore errors as this is not critical to the cache logic. */
cache_map(h, new_size);
}
return ret;
}
static int shared_open(URLContext *h, const char *arg, int flags, AVDictionary **options)
{
SharedContext *s = h->priv_data;
int ret;
if (!s->cache_dir || !s->cache_dir[0]) {
av_log(h, AV_LOG_ERROR, "Missing path for shared cache! Specify a "
"directory using the -cache_dir option.\n");
return AVERROR(EINVAL);
}
s->fd = s->mapfd = -1; /* Set these early for shared_close() failure path */
/* Open underlying protocol */
av_strstart(arg, "shared:", &arg);
ret = ffurl_open_whitelist(&s->inner, arg, flags, &h->interrupt_callback,
options, h->protocol_whitelist, h->protocol_blacklist, h);
if (ret < 0)
goto fail;
uint8_t hash[HASH_SIZE];
ret = hash_uri(hash, arg);
if (ret < 0)
goto fail;
/* 128 bits is enough for collision resistance; we already store the full
* hash inside the header for verification */
char filename[2 * 16 + 1];
for (int i = 0; i < FF_ARRAY_ELEMS(filename) / 2; i++)
sprintf(&filename[i * 2], "%02X", hash[i]);
s->cache_path = av_asprintf("%s/%s.cache", s->cache_dir, filename);
s->map_path = av_asprintf("%s/%s.spacemap", s->cache_dir, filename);
if (!s->cache_path || !s->map_path) {
ret = AVERROR(ENOMEM);
goto fail;
}
av_log(h, AV_LOG_VERBOSE, "Opening cache file '%s' for URI: '%s'\n",
s->cache_path, s->inner->filename);
s->fd = avpriv_open(s->cache_path, O_RDWR | O_CREAT, 0660);
s->mapfd = avpriv_open(s->map_path, O_RDWR | O_CREAT, 0660);
if (s->fd < 0 || s->mapfd < 0) {
ret = AVERROR(errno);
av_log(h, AV_LOG_ERROR, "Failed to open '%s': %s\n",
s->fd < 0 ? s->cache_path : s->map_path, av_err2str(ret));
goto fail;
}
ret = spacemap_init(h, hash);
if (ret < 0)
goto fail;
s->block_size = 1 << atomic_load(&s->spacemap->block_shift);
int64_t filesize = get_filesize(h);
if (!filesize) {
/* Filesize is not yet known, try to get it from the underlying URL */
filesize = ffurl_size(s->inner);
if (filesize < 0 && filesize != AVERROR(ENOSYS)) {
ret = (int) filesize;
goto fail;
} else if (filesize > 0)
set_filesize(h, filesize);
}
if (filesize > 0) {
int64_t last_pos = filesize - 1;
int64_t last_block = last_pos >> atomic_load(&s->spacemap->block_shift);
ret = spacemap_grow(h, last_block);
if (ret < 0)
goto fail;
/* If filesize is known, we can directly mmap() the cache file */
ret = cache_map(h, filesize);
if (ret < 0) {
av_log(h, AV_LOG_WARNING, "Failed to map cache file: %s. Falling "
"back to normal read/write\n", av_err2str(ret));
ret = 0;
}
}
if (!s->cache_data) {
/* Temporary buffer needed for pread/pwrite() fallback */
s->tmp_buf = av_malloc(s->block_size);
if (!s->tmp_buf) {
ret = AVERROR(ENOMEM);
goto fail;
}
}
h->max_packet_size = s->block_size;
h->min_packet_size = s->block_size;
fail:
if (ret < 0)
shared_close(h);
return ret;
}
static int cache_map(URLContext *h, int64_t filesize)
{
SharedContext *s = h->priv_data;
if (s->cache_size >= filesize || filesize > SIZE_MAX)
return 0;
if (s->cache_data) {
munmap(s->cache_data, s->cache_size);
s->cache_data = NULL;
s->cache_size = 0;
}
struct stat st;
int ret = fstat(s->fd, &st);
if (ret < 0)
return AVERROR(errno);
if (st.st_size != filesize) {
/* Ensure the file size is correct before mapping; this can happen if
* another process wrote the correct filesize to the header but
* crashed right before actually successfully resizing the file. */
ret = ftruncate(s->fd, filesize);
if (ret < 0)
return AVERROR(errno);
}
s->cache_data = mmap(NULL, filesize, PROT_READ | PROT_WRITE, MAP_SHARED, s->fd, 0);
if (s->cache_data == MAP_FAILED) {
s->cache_data = NULL;
return AVERROR(errno);
}
s->cache_size = filesize;
return 0;
}
static int spacemap_remap(URLContext *h, size_t map_size)
{
SharedContext *s = h->priv_data;
struct flock fl = { .l_type = F_WRLCK };
int ret, did_grow = 0;
if (map_size <= s->map_size)
return 0;
/* Opportunistically get current filesize before attempting to lock */
struct stat st;
ret = fstat(s->mapfd, &st);
if (ret < 0) {
ret = AVERROR(errno);
goto fail;
}
if (st.st_size >= map_size)
goto skip_resize;
/* Lock the spacemap to ensure nobody else is currently resizing it */
ret = fcntl(s->mapfd, F_SETLKW, &fl);
if (ret < 0) {
ret = AVERROR(errno);
goto fail;
}
fl.l_type = F_UNLCK;
/* Refresh filesize after acquiring the lock */
ret = fstat(s->mapfd, &st);
if (ret < 0) {
ret = AVERROR(errno);
goto fail;
}
if (st.st_size >= map_size)
goto skip_resize;
ret = ftruncate(s->mapfd, map_size);
if (ret < 0) {
ret = AVERROR(errno);
goto fail;
}
st.st_size = map_size;
did_grow = 1;
skip_resize:
if (s->spacemap)
munmap(s->spacemap, s->map_size);
s->map_size = st.st_size;
s->spacemap = mmap(NULL, s->map_size, PROT_READ | PROT_WRITE, MAP_SHARED, s->mapfd, 0);
if (s->spacemap == MAP_FAILED) {
s->spacemap = NULL; /* for munmap check */
s->map_size = 0;
ret = AVERROR(errno);
goto fail;
}
/* fl.l_type is set to F_UNLCK only after successful lock */
if (fl.l_type == F_UNLCK)
fcntl(s->mapfd, F_SETLK, &fl);
return did_grow;
fail:
if (fl.l_type == F_UNLCK)
fcntl(s->mapfd, F_SETLK, &fl);
av_log(h, AV_LOG_ERROR, "Failed to resize space map: %s\n", av_err2str(ret));
return ret;
}
static int spacemap_grow(URLContext *h, int64_t block)
{
SharedContext *s = h->priv_data;
int64_t num_blocks = block + 1;
size_t map_bytes = sizeof(Spacemap) + num_blocks * sizeof(Block);
/* When streaming files without known size, round up the number of blocks
* to the nearest multiple of the block size to reduce the rate of resizes */
if (!get_filesize(h)) {
av_assert0(s->block_size > 0);
map_bytes = FFALIGN(map_bytes, (int64_t) s->block_size);
}
if (map_bytes < num_blocks)
return AVERROR(EINVAL); /* overflow */
const off_t old_size = s->map_size;
int ret = spacemap_remap(h, map_bytes);
if (ret < 0)
return ret;
/* Report new size after successful grow */
if (s->map_size > old_size) {
num_blocks = (s->map_size - sizeof(Spacemap)) / sizeof(Block);
av_log(h, AV_LOG_DEBUG,
"%s %zu bytes, capacity: %"PRId64" blocks = %zu MB\n",
ret ? "Resized spacemap to" : "Mapped spacemap with",
(size_t) s->map_size, num_blocks,
(num_blocks * (int64_t) s->block_size) >> 20);
}
return 0;
}
static int spacemap_init(URLContext *h, const uint8_t hash[HASH_SIZE])
{
SharedContext *s = h->priv_data;
int ret;
ret = spacemap_remap(h, sizeof(Spacemap));
if (ret < 0)
return ret;
if ((ret = set_once_uint(&s->spacemap->header_magic, HEADER_MAGIC)) < 0 ||
(ret = set_once_ushort(&s->spacemap->version, HEADER_VERSION)) < 0)
{
av_log(h, AV_LOG_ERROR, "Shared cache spacemap header mismatch!\n");
av_log(h, AV_LOG_ERROR, " Expected magic: 0x%X, version: %d\n",
HEADER_MAGIC, HEADER_VERSION);
av_log(h, AV_LOG_ERROR, " Got magic: 0x%X, version: %d\n",
atomic_load(&s->spacemap->header_magic),
atomic_load(&s->spacemap->version));
return ret;
}
ret = set_once_ushort(&s->spacemap->block_shift, s->block_shift);
if (ret < 0) {
const int shift = atomic_load(&s->spacemap->block_shift);
av_log(h, AV_LOG_WARNING, "Shared cache uses block shift %d, "
"but requested block shift is %d.\n", shift, s->block_shift);
if (shift < 9 || shift > 30) {
av_log(h, AV_LOG_ERROR, "Invalid block shift %d in cache file!\n", shift);
return AVERROR(EINVAL);
}
}
for (int i = 0; i < HASH_SIZE; i++) {
ret = set_once_uchar(&s->spacemap->hash[i], hash[i]);
if (ret < 0) {
av_log(h, AV_LOG_ERROR, "Shared cache spacemap hash mismatch!\n");
av_log(h, AV_LOG_ERROR, " Expected hash: ");
for (int j = 0; j < 32; j++)
av_log(h, AV_LOG_ERROR, "%02X", hash[j]);
av_log(h, AV_LOG_ERROR, "\n Got hash: ");
for (int j = 0; j < 32; j++)
av_log(h, AV_LOG_ERROR, "%02X", atomic_load(&s->spacemap->hash[j]));
av_log(h, AV_LOG_ERROR, "\n");
return ret;
}
}
if (ret) /* set_once() return 1 if this is the first time setting the value */
av_log(h, AV_LOG_DEBUG, "Initialized new cache spacemap.\n");
return ret;
}
static int read_cache(SharedContext *s, uint8_t *buf, size_t size, off_t offset)
{
if (s->cache_data) {
av_assert1(offset + size <= s->cache_size);
memcpy(buf, s->cache_data + offset, size);
return 0;
}
while (size) {
ssize_t ret = pread(s->fd, buf, size, offset);
if (ret <= 0)
return ret ? AVERROR(errno) : AVERROR(EIO);
buf += ret;
offset += ret;
size -= ret;
}
return 0;
}
static int write_cache(SharedContext *s, const uint8_t *buf, size_t size, off_t offset)
{
if (s->cache_data) {
av_assert1(offset + size <= s->cache_size);
memcpy(s->cache_data + offset, buf, size);
return 0;
}
while (size) {
ssize_t ret = pwrite(s->fd, buf, size, offset);
if (ret <= 0)
return ret ? AVERROR(errno) : AVERROR(EIO);
buf += ret;
offset += ret;
size -= ret;
}
return 0;
}
static size_t clamp_size(URLContext *h, size_t size, int64_t pos)
{
const int64_t filesize = get_filesize(h);
if (!filesize)
return size;
else if (pos > filesize)
return 0;
else
return FFMIN(filesize - pos, size);
}
static int shared_read(URLContext *h, unsigned char *buf, int size)
{
SharedContext *s = h->priv_data;
uint8_t *tmp;
int ret;
if (size <= 0)
return 0;
size = clamp_size(h, size, s->pos);
if (size <= 0)
return AVERROR_EOF;
const int shift = atomic_load_explicit(&s->spacemap->block_shift, memory_order_relaxed);
const int64_t block_id = s->pos >> shift;
const int64_t offset = s->pos & (s->block_size - 1);
const int64_t block_pos = block_id * s->block_size;
int block_size = clamp_size(h, s->block_size, block_pos);
ret = spacemap_grow(h, block_id);
if (ret < 0)
return ret;
Block *const block = &s->spacemap->blocks[block_id];
unsigned short state = atomic_load_explicit(&block->state, memory_order_acquire);
int64_t pending_since = 0;
int verify_read = 0;
retry:
switch (state) {
default:
/* We always need to read the entire block to verify integrity */
block_size = clamp_size(h, block_size, block_pos); /* filesize may have changed */
if (s->cache_data) {
av_assert1(block_pos + block_size <= s->cache_size);
tmp = s->cache_data + block_pos;
} else {
tmp = s->tmp_buf;
ret = read_cache(s, tmp, block_size, block_pos);
if (ret < 0) {
av_log(h, AV_LOG_ERROR, "Failed to read from cache file: %s\n", av_err2str(ret));
return ret;
}
}
uint16_t crc = get_block_crc(tmp, block_size);
if (crc != state) {
av_log(h, AV_LOG_ERROR, "Cache corruption detected for block 0x%"PRIx64" at "
"offset 0x%"PRIx64": expected CRC: 0x%04X, got: 0x%04X\n",
block_id, block_pos, state, crc);
return AVERROR(EIO);
}
tmp += (ptrdiff_t) offset;
size = FFMIN(size, block_size - offset);
if (s->verify) {
verify_read = 1;
break; /* fall through to the cache miss logic */
}
memcpy(buf, tmp, size);
s->nb_hit++;
s->pos += size;
return size;
case BLOCK_FAILED:
if (!s->retry_errors)
return AVERROR(EIO);
av_fallthrough;
case BLOCK_NONE:
if (s->read_only)
break; /* don't mark block as pending */
if (atomic_compare_exchange_weak_explicit(&block->state, &state,
BLOCK_PENDING,
memory_order_acquire,
memory_order_acquire))
{
/* Acquired pending state, proceed to fetch the block */
state = BLOCK_PENDING;
break;
}
/* CAS failed, another thread changed the state; reload it */
goto retry;
case BLOCK_PENDING:
/* Another thread is busy fetching this block, wait for it to finish */
if (!s->timeout) {
break; /* no timeout requested, immediately race to fetch block */
} else if (pending_since) {
int64_t new = av_gettime_relative();
if (new - pending_since >= s->timeout)
break; /* timeout expired, try to fetch the block ourselves */
} else {
pending_since = av_gettime_relative();
}
/* Make sure we try a few times before giving up */
av_usleep(s->timeout >> 4);
state = atomic_load_explicit(&block->state, memory_order_acquire);
goto retry;
}
/* Cache miss, fetch this block from underlying protocol */
s->nb_miss++;
const int read_only = s->read_only || s->write_err || verify_read;
int64_t inner_pos = read_only ? s->pos : block_pos;
if (s->inner_pos != inner_pos) {
inner_pos = ffurl_seek(s->inner, inner_pos, SEEK_SET);
if (inner_pos < 0) {
av_log(h, AV_LOG_ERROR, "Failed to seek underlying protocol: %s\n",
av_err2str(inner_pos));
if (!read_only) {
/* Release pending state to avoid stalling other threads. Don't
* mark this as failed, since the seek error may be unrelated to
* the block and should probably be tried again. */
atomic_compare_exchange_strong_explicit(&block->state, &state,
BLOCK_NONE,
memory_order_relaxed,
memory_order_relaxed);
}
return inner_pos;
}
av_log(h, AV_LOG_DEBUG, "Inner seek to 0x%"PRIx64"\n", inner_pos);
s->inner_pos = inner_pos;
}
if (read_only) {
/* Directly defer to the underlying protocol */
ret = ffurl_read(s->inner, buf, size);
if (ret < 0)
return ret;
/* Verify the read data against the cached data if requested */
if (verify_read && memcmp(buf, tmp, ret)) {
av_log(h, AV_LOG_ERROR, "Cache verification failed for %d bytes "
"in block 0x%"PRIx64" at offset 0x%"PRIx64" + %"PRId64"!\n",
ret, block_id, block_pos, offset);
}
s->pos = s->inner_pos = inner_pos + ret;
return ret;
}
int write_back = 1;
if (s->cache_data) {
/* Read directly into memory mapped cache file */
tmp = s->cache_data + block_pos;
write_back = 0;
} else if (size >= block_size && !offset) {
/* Read directly into output buffer if aligned and large enough */
tmp = buf;
} else {
/* Read into temporary buffer and copy later */
tmp = s->tmp_buf;
}
/* Try and fetch the entire block */
av_assert0(inner_pos == block_pos);
int bytes_read = 0;
while (bytes_read < block_size) {
ret = ffurl_read(s->inner, &tmp[bytes_read], block_size - bytes_read);
if (!ret || ret == AVERROR_EOF)
break;
else if (ret < 0) {
av_log(h, AV_LOG_ERROR, "Failed to read block 0x%"PRIx64": %s\n",
block_id, av_err2str(ret));
/* Try to mark block as failed; ignore errors - any mismatch
* here will mean that either another thread already marked it
* as failed, or successfully cached it in the meantime */
atomic_compare_exchange_strong_explicit(&block->state, &state,
BLOCK_FAILED,
memory_order_relaxed,
memory_order_relaxed);
return ret;
}
bytes_read += ret;
s->inner_pos += ret;
}
if (bytes_read < block_size) {
/* Learned location of true EOF, update filesize */
ret = set_filesize(h, inner_pos + bytes_read);
if (ret < 0)
return ret;
}
if (bytes_read > 0) {
ret = write_back ? write_cache(s, tmp, bytes_read, block_pos) : 0;
if (ret < 0) {
av_log(h, AV_LOG_ERROR, "Failed to write to cache file: %s\n",
av_err2str(ret));
s->write_err = 1;
/* Mark as NONE, not FAILED, since the block itself is fine -
* just absent from the cache. */
atomic_compare_exchange_strong_explicit(&block->state, &state,
BLOCK_NONE,
memory_order_relaxed,
memory_order_relaxed);
} else {
uint16_t crc = get_block_crc(tmp, bytes_read);
av_log(h, AV_LOG_TRACE, "Cached %d bytes to block 0x%"PRIx64" at "
"offset 0x%"PRIx64", CRC 0x%04X\n", bytes_read, block_id,
block_pos, crc);
atomic_store_explicit(&block->state, crc, memory_order_release);
}
} else {
return AVERROR_EOF;
}
size = FFMIN(bytes_read - offset, size);
av_assert0(size > 0);
if (tmp != buf)
memcpy(buf, &tmp[offset], size);
s->pos += size;
return size;
}
static int64_t shared_seek(URLContext *h, int64_t pos, int whence)
{
SharedContext *s = h->priv_data;
const int64_t filesize = get_filesize(h);
int64_t res;
switch (whence) {
case AVSEEK_SIZE:
if (filesize)
return filesize;
res = ffurl_seek(s->inner, pos, whence);
if (res > 0)
set_filesize(h, res);
return res;
case SEEK_SET:
break;
case SEEK_CUR:
pos += s->pos;
break;
case SEEK_END:
if (filesize) {
pos += filesize;
break;
}
/* Defer to underlying protocol if filesize is unknown */
res = ffurl_seek(s->inner, pos, whence);
if (res < 0)
return res;
set_filesize(h, res - pos); /* Opportunistically update known filesize */
av_log(h, AV_LOG_DEBUG, "Inner seek to 0x%"PRIx64"\n", res);
return s->pos = s->inner_pos = res;
default:
return AVERROR(EINVAL);
}
if (pos < 0)
return AVERROR(EINVAL);
av_log(h, AV_LOG_DEBUG, "Virtual seek to 0x%"PRIx64"\n", pos);
return s->pos = pos;
}
static int shared_get_file_handle(URLContext *h)
{
SharedContext *s = h->priv_data;
return ffurl_get_file_handle(s->inner);
}
static int shared_get_short_seek(URLContext *h)
{
SharedContext *s = h->priv_data;
int ret = ffurl_get_short_seek(s->inner);
if (ret < 0)
return ret;
return FFMAX(ret, s->block_size);
}
#define OFFSET(x) offsetof(SharedContext, x)
#define D AV_OPT_FLAG_DECODING_PARAM
static const AVOption options[] = {
{ "cache_dir", "Directory path for shared file cache", OFFSET(cache_dir), AV_OPT_TYPE_STRING, {.str = NULL}, .flags = D },
{ "block_shift", "Set the base 2 logarithm of the block size", OFFSET(block_shift), AV_OPT_TYPE_INT, {.i64 = 15}, 9, 30, .flags = D },
{ "read_only", "Don't write data to the cache, only read from it", OFFSET(read_only), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, .flags = D },
{ "cache_verify", "Verify correctness of the cache against the source", OFFSET(verify), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, .flags = D },
{ "cache_timeout", "Time in us to wait before re-fetching pending blocks", OFFSET(timeout), AV_OPT_TYPE_INT64, {.i64 = 0}, 0, INT64_MAX, .flags = D },
{ "retry_errors", "Re-request blocks even if they previously failed", OFFSET(retry_errors), AV_OPT_TYPE_BOOL, {.i64 = 1}, 0, 1, .flags = D },
{0},
};
static const AVClass shared_context_class = {
.class_name = "shared",
.item_name = av_default_item_name,
.option = options,
.version = LIBAVUTIL_VERSION_INT,
};
const URLProtocol ff_shared_protocol = {
.name = "shared",
.url_open2 = shared_open,
.url_read = shared_read,
.url_seek = shared_seek,
.url_close = shared_close,
.url_get_file_handle = shared_get_file_handle,
.url_get_short_seek = shared_get_short_seek,
.priv_data_size = sizeof(SharedContext),
.priv_data_class = &shared_context_class,
};