stream: turn into a ring buffer, make size configurable

In some corner cases (see #6802), it can be beneficial to use a larger
stream buffer size. Use this as an argument to rewrite everything for no
reason.

Turn stream.c itself into a ring buffer, with configurable size. The
latter would have been easily achievable with minimal changes, and the
ring buffer is the hard part. There is no reason to have a ring buffer
at all, except possibly if ffmpeg don't fix their awful mp4 demuxer, and
some subtle issues with demux_mkv.c wanting to seek back by small
offsets (the latter was handled with small stream_peek() calls, which
are unneeded now).

In addition, this turns small forward seeks into reads (where data is
simply skipped). Before this commit, only stream_skip() did this (which
also means that stream_skip() simply calls stream_seek() now).

Replace all stream_peek() calls with something else (usually
stream_read_peek()). The function was a problem, because it returned a
pointer to the internal buffer, which is now a ring buffer with
wrapping. The new function just copies the data into a buffer, and in
some cases requires callers to dynamically allocate memory. (The most
common case, demux_lavf.c, required a separate buffer allocation anyway
due to FFmpeg "idiosyncrasies".) This is the bulk of the demuxer_*
changes.

I'm not happy with this. There still isn't a good reason why there
should be a ring buffer, which is complex and, most of the time, just
wastes half of the available memory. Maybe another rewrite soon.

It also contains bugs; you're an alpha tester now.
This commit is contained in:
wm4
2019-11-06 21:36:02 +01:00
parent abb089431d
commit f37f4de849
13 changed files with 314 additions and 175 deletions

View File

@@ -4196,6 +4196,32 @@ Cache
Currently, this is used for ``--cache-on-disk`` only. Currently, this is used for ``--cache-on-disk`` only.
``--stream-buffer-size=<bytesize>``
Size of the low level stream byte buffer (default: 4KB). This is used as
buffer between demuxer and low level I/O (e.g. sockets). Generally, this
can be very small, and the main purpose is similar to that of the internal
buffer that a FILE in the C standard library has.
Half of the buffer is always used for guaranteed seek back, which is
important for unseekable input.
There are known cases where setting a large buffer can help performance:
1. mp4 files. libavformat may trigger many small seeks in both
directions, depending on how the file was muxed.
2. Certain network filesystems, which do not have a cache, and where
small reads can be inefficient.
In other cases, setting this to a large value can reduce performance.
Usually, read accesses are done at half the buffer size, but accesses may
alternate between smaller and larger sizes (this is due to the internal
ring buffer wrap-around).
See ``--list-options`` for defaults and value range. ``<bytesize>`` options
accept suffixes such as ``KiB`` and ``MiB``.
Network Network
------- -------

View File

@@ -265,8 +265,9 @@ static int try_open_file(struct demuxer *demuxer, enum demux_check check)
struct stream *s = demuxer->stream; struct stream *s = demuxer->stream;
if (check >= DEMUX_CHECK_UNSAFE) { if (check >= DEMUX_CHECK_UNSAFE) {
bstr d = stream_peek(s, PROBE_SIZE); char probe[PROBE_SIZE];
if (d.len < 1 || !mp_probe_cue(d)) int len = stream_read_peek(s, probe, sizeof(probe));
if (len < 1 || !mp_probe_cue((bstr){probe, len}))
return -1; return -1;
} }
struct priv *p = talloc_zero(demuxer, struct priv); struct priv *p = talloc_zero(demuxer, struct priv);

View File

@@ -321,7 +321,7 @@ static int d_open(demuxer_t *demuxer, enum demux_check check)
// Initialize the playback time. We need to read _some_ data to get the // Initialize the playback time. We need to read _some_ data to get the
// correct stream-layer time (at least with libdvdnav). // correct stream-layer time (at least with libdvdnav).
stream_peek(demuxer->stream, 1); stream_read_peek(demuxer->stream, &(char){0}, 1);
reset_pts(demuxer); reset_pts(demuxer);
p->slave = demux_open_url("-", &params, demuxer->cancel, demuxer->global); p->slave = demux_open_url("-", &params, demuxer->cancel, demuxer->global);

View File

@@ -459,7 +459,9 @@ static int try_open_file(struct demuxer *demuxer, enum demux_check check)
return 0; return 0;
} }
if (check >= DEMUX_CHECK_UNSAFE) { if (check >= DEMUX_CHECK_UNSAFE) {
if (!bstr_equals0(stream_peek(s, strlen(HEADER)), HEADER)) char header[sizeof(HEADER) - 1];
int len = stream_read_peek(s, header, sizeof(header));
if (len != strlen(HEADER) || memcmp(header, HEADER, len) != 0)
return -1; return -1;
} }
p->data = stream_read_complete(s, demuxer, 1000000); p->data = stream_read_complete(s, demuxer, 1000000);

View File

@@ -458,11 +458,10 @@ static int lavf_check_file(demuxer_t *demuxer, enum demux_check check)
} else { } else {
int nsize = av_clip(avpd.buf_size * 2, INITIAL_PROBE_SIZE, int nsize = av_clip(avpd.buf_size * 2, INITIAL_PROBE_SIZE,
PROBE_BUF_SIZE); PROBE_BUF_SIZE);
bstr buf = stream_peek(s, nsize); nsize = stream_read_peek(s, avpd.buf, nsize);
if (buf.len <= avpd.buf_size) if (nsize <= avpd.buf_size)
final_probe = true; final_probe = true;
memcpy(avpd.buf, buf.start, buf.len); avpd.buf_size = nsize;
avpd.buf_size = buf.len;
priv->avif = av_probe_input_format2(&avpd, avpd.buf_size > 0, &score); priv->avif = av_probe_input_format2(&avpd, avpd.buf_size > 0, &score);
} }

View File

@@ -43,15 +43,17 @@ static int open_file(struct demuxer *demuxer, enum demux_check check)
probe_size *= 100; probe_size *= 100;
} }
bstr probe = stream_peek(demuxer->stream, probe_size); void *probe = ta_alloc_size(NULL, probe_size);
if (probe.len == 0) if (!probe)
return -1; return -1;
int probe_got = stream_read_peek(demuxer->stream, probe, probe_size);
struct stream *probe_stream = struct stream *probe_stream =
stream_memory_open(demuxer->global, probe.start, probe.len); stream_memory_open(demuxer->global, probe, probe_got);
struct mp_archive *mpa = mp_archive_new(mp_null_log, probe_stream, flags); struct mp_archive *mpa = mp_archive_new(mp_null_log, probe_stream, flags);
bool ok = !!mpa; bool ok = !!mpa;
free_stream(probe_stream); free_stream(probe_stream);
mp_archive_free(mpa); mp_archive_free(mpa);
ta_free(probe);
if (!ok) if (!ok)
return -1; return -1;

View File

@@ -1999,13 +1999,9 @@ static int demux_mkv_open(demuxer_t *demuxer, enum demux_check check)
if (demuxer->params) if (demuxer->params)
mkv_d->probably_webm_dash_init = demuxer->params->init_fragment.len > 0; mkv_d->probably_webm_dash_init = demuxer->params->init_fragment.len > 0;
bstr start = stream_peek(s, 4); // Make sure you can seek back after read_ebml_header() if no EBML ID.
uint32_t start_id = 0; if (stream_read_peek(s, &(char[4]){0}, 4) != 4)
for (int n = 0; n < start.len; n++)
start_id = (start_id << 8) | start.start[n];
if (start_id != EBML_ID_EBML)
return -1; return -1;
if (!read_ebml_header(demuxer)) if (!read_ebml_header(demuxer))
return -1; return -1;
MP_DBG(demuxer, "Found the head...\n"); MP_DBG(demuxer, "Found the head...\n");
@@ -2027,7 +2023,6 @@ static int demux_mkv_open(demuxer_t *demuxer, enum demux_check check)
while (1) { while (1) {
start_pos = stream_tell(s); start_pos = stream_tell(s);
stream_peek(s, 4); // make sure we can always seek back
uint32_t id = ebml_read_id(s); uint32_t id = ebml_read_id(s);
if (s->eof) { if (s->eof) {
if (!mkv_d->probably_webm_dash_init) if (!mkv_d->probably_webm_dash_init)
@@ -2836,7 +2831,6 @@ static int read_next_block_into_queue(demuxer_t *demuxer)
find_next_cluster: find_next_cluster:
mkv_d->cluster_end = 0; mkv_d->cluster_end = 0;
for (;;) { for (;;) {
stream_peek(s, 4); // guarantee we can undo ebml_read_id() below
mkv_d->cluster_start = stream_tell(s); mkv_d->cluster_start = stream_tell(s);
uint32_t id = ebml_read_id(s); uint32_t id = ebml_read_id(s);
if (id == MATROSKA_ID_CLUSTER) if (id == MATROSKA_ID_CLUSTER)

View File

@@ -92,12 +92,13 @@ static int read_characters(stream_t *s, uint8_t *dst, int dstsize, int utf16)
} }
return cur - dst; return cur - dst;
} else { } else {
bstr buf = stream_peek_buffer(s); uint8_t buf[1024];
uint8_t *end = memchr(buf.start, '\n', buf.len); int buf_len = stream_read_peek(s, buf, sizeof(buf));
int len = end ? end - buf.start + 1 : buf.len; uint8_t *end = memchr(buf, '\n', buf_len);
int len = end ? end - buf + 1 : buf_len;
if (len > dstsize) if (len > dstsize)
return -1; // line too long return -1; // line too long
memcpy(dst, buf.start, len); memcpy(dst, buf, len);
stream_skip(s, len); stream_skip(s, len);
return len; return len;
} }
@@ -178,7 +179,9 @@ static int parse_m3u(struct pl_parser *p)
// Last resort: if the file extension is m3u, it might be headerless. // Last resort: if the file extension is m3u, it might be headerless.
if (p->check_level == DEMUX_CHECK_UNSAFE) { if (p->check_level == DEMUX_CHECK_UNSAFE) {
char *ext = mp_splitext(p->real_stream->url, NULL); char *ext = mp_splitext(p->real_stream->url, NULL);
bstr data = stream_peek(p->real_stream, PROBE_SIZE); char probe[PROBE_SIZE];
int len = stream_read_peek(p->real_stream, probe, sizeof(probe));
bstr data = {probe, len};
if (ext && data.len > 10 && maybe_text(data)) { if (ext && data.len > 10 && maybe_text(data)) {
const char *exts[] = {"m3u", "m3u8", NULL}; const char *exts[] = {"m3u", "m3u8", NULL};
for (int n = 0; exts[n]; n++) { for (int n = 0; exts[n]; n++) {
@@ -437,8 +440,9 @@ static int open_file(struct demuxer *demuxer, enum demux_check check)
p->real_stream = demuxer->stream; p->real_stream = demuxer->stream;
p->add_base = true; p->add_base = true;
bstr probe_buf = stream_peek(demuxer->stream, PROBE_SIZE); char probe[PROBE_SIZE];
p->s = stream_memory_open(demuxer->global, probe_buf.start, probe_buf.len); int probe_len = stream_read_peek(p->real_stream, probe, sizeof(probe));
p->s = stream_memory_open(demuxer->global, probe, probe_len);
p->s->mime_type = demuxer->stream->mime_type; p->s->mime_type = demuxer->stream->mime_type;
p->utf16 = stream_skip_bom(p->s); p->utf16 = stream_skip_bom(p->s);
p->force = force; p->force = force;

View File

@@ -711,6 +711,7 @@ const m_option_t mp_opts[] = {
OPT_SUBSTRUCT("", vo, vo_sub_opts, 0), OPT_SUBSTRUCT("", vo, vo_sub_opts, 0),
OPT_SUBSTRUCT("", demux_opts, demux_conf, 0), OPT_SUBSTRUCT("", demux_opts, demux_conf, 0),
OPT_SUBSTRUCT("", demux_cache_opts, demux_cache_conf, 0), OPT_SUBSTRUCT("", demux_cache_opts, demux_cache_conf, 0),
OPT_SUBSTRUCT("", stream_opts, stream_conf, 0),
OPT_SUBSTRUCT("", gl_video_opts, gl_video_conf, 0), OPT_SUBSTRUCT("", gl_video_opts, gl_video_conf, 0),
OPT_SUBSTRUCT("", spirv_opts, spirv_conf, 0), OPT_SUBSTRUCT("", spirv_opts, spirv_conf, 0),

View File

@@ -307,6 +307,7 @@ typedef struct MPOpts {
struct demux_opts *demux_opts; struct demux_opts *demux_opts;
struct demux_cache_opts *demux_cache_opts; struct demux_cache_opts *demux_cache_opts;
struct stream_opts *stream_opts;
struct vd_lavc_params *vd_lavc_params; struct vd_lavc_params *vd_lavc_params;
struct ad_lavc_params *ad_lavc_params; struct ad_lavc_params *ad_lavc_params;
@@ -360,5 +361,6 @@ extern const struct m_sub_options mp_subtitle_sub_opts;
extern const struct m_sub_options mp_osd_render_sub_opts; extern const struct m_sub_options mp_osd_render_sub_opts;
extern const struct m_sub_options filter_conf; extern const struct m_sub_options filter_conf;
extern const struct m_sub_options resample_conf; extern const struct m_sub_options resample_conf;
extern const struct m_sub_options stream_conf;
#endif #endif

View File

@@ -33,6 +33,7 @@
#include "misc/bstr.h" #include "misc/bstr.h"
#include "misc/thread_tools.h" #include "misc/thread_tools.h"
#include "common/msg.h" #include "common/msg.h"
#include "options/m_config.h"
#include "options/options.h" #include "options/options.h"
#include "options/path.h" #include "options/path.h"
#include "osdep/timer.h" #include "osdep/timer.h"
@@ -94,7 +95,23 @@ static const stream_info_t *const stream_list[] = {
NULL NULL
}; };
static bool stream_seek_unbuffered(stream_t *s, int64_t newpos); struct stream_opts {
int64_t buffer_size;
};
#define OPT_BASE_STRUCT struct stream_opts
const struct m_sub_options stream_conf = {
.opts = (const struct m_option[]){
OPT_BYTE_SIZE("stream-buffer-size", buffer_size, 0,
STREAM_FIXED_BUFFER_SIZE, 512 * 1024 * 1024),
{0}
},
.size = sizeof(struct stream_opts),
.defaults = &(const struct stream_opts){
.buffer_size = STREAM_FIXED_BUFFER_SIZE,
},
};
// return -1 if not hex char // return -1 if not hex char
static int hex2dec(char c) static int hex2dec(char c)
@@ -181,35 +198,91 @@ static const char *match_proto(const char *url, const char *proto)
return NULL; return NULL;
} }
// Resize the current stream buffer, or do nothing if the size is adequate. // Read len bytes from the start position, and wrap around as needed. Limit the
// Caller must ensure the used buffer is not less than the new buffer size. // actually read data to the size of the buffer. Return amount of copied bytes.
// Calling this with 0 ensures it uses the default buffer size. // len: max bytes to copy to dst
static void stream_resize_buffer(struct stream *s, int new) // pos: index into s->buffer[], e.g. s->buf_start is byte 0
// returns: bytes copied to dst (limited by len and available buffered data)
static int ring_copy(struct stream *s, void *dst, int len, int pos)
{ {
new = MPMAX(new, STREAM_BUFFER_SIZE); assert(len >= 0);
if (new == s->buffer_alloc) if (pos < s->buf_start || pos > s->buf_end)
return; return 0;
int buffer_used = s->buf_len - s->buf_pos; int copied = 0;
assert(buffer_used <= new); len = MPMIN(len, s->buf_end - pos);
void *nbuf = s->buffer_inline; if (len && pos <= s->buffer_mask) {
if (new > STREAM_BUFFER_SIZE) int copy = MPMIN(len, s->buffer_mask + 1 - pos);
nbuf = ta_alloc_size(s, new); memcpy(dst, &s->buffer[pos], copy);
copied += copy;
if (nbuf) { len -= copy;
if (s->buffer) pos += copy;
memmove(nbuf, &s->buffer[s->buf_pos], buffer_used);
s->buf_pos = 0;
s->buf_len = buffer_used;
if (s->buffer != s->buffer_inline)
ta_free(s->buffer);
s->buffer = nbuf;
s->buffer_alloc = new;
} }
if (len) {
memcpy((char *)dst + copied, &s->buffer[pos & s->buffer_mask], len);
copied += len;
}
return copied;
}
// Resize the current stream buffer. Uses a larger size if needed to keep data.
// Does nothing if the size is adequate. Calling this with 0 ensures it uses the
// default buffer size if possible.
// The caller must check whether enough data was really allocated.
// Returns false if buffer allocation failed.
static bool stream_resize_buffer(struct stream *s, uint32_t new)
{
// Keep all valid buffer.
int old_used_len = s->buf_end - s->buf_start;
int old_pos = s->buf_cur - s->buf_start;
new = MPMAX(new, old_used_len);
new = MPMAX(new, s->requested_buffer_size);
// This much is always required.
new = MPMAX(new, STREAM_FIXED_BUFFER_SIZE);
new = mp_round_next_power_of_2(new);
if (!new || new > INT_MAX / 8)
return false;
if (new == s->buffer_mask + 1)
return true;
MP_DBG(s, "resize stream to %d bytes\n", new);
uint8_t *nbuf = s->buffer_inline;
if (new > STREAM_FIXED_BUFFER_SIZE) {
nbuf = ta_alloc_size(s, new);
} else {
static_assert(MP_IS_POWER_OF_2(STREAM_FIXED_BUFFER_SIZE), "");
assert(new == STREAM_FIXED_BUFFER_SIZE);
}
assert(nbuf != s->buffer);
if (!nbuf)
return false; // oom; tolerate it, caller needs to check if required
int new_len = 0;
if (s->buffer)
new_len = ring_copy(s, nbuf, new, s->buf_start);
assert(new_len == old_used_len);
assert(old_pos <= old_used_len);
s->buf_start = 0;
s->buf_cur = old_pos;
s->buf_end = new_len;
if (s->buffer != s->buffer_inline)
ta_free(s->buffer);
s->buffer = nbuf;
s->buffer_mask = new - 1;
return true;
} }
static int stream_create_instance(const stream_info_t *sinfo, static int stream_create_instance(const stream_info_t *sinfo,
@@ -236,6 +309,9 @@ static int stream_create_instance(const stream_info_t *sinfo,
if (!path) if (!path)
return STREAM_NO_MATCH; return STREAM_NO_MATCH;
struct stream_opts *opts =
mp_get_config_group(NULL, args->global, &stream_conf);
stream_t *s = talloc_zero(NULL, stream_t); stream_t *s = talloc_zero(NULL, stream_t);
s->global = args->global; s->global = args->global;
if (flags & STREAM_SILENT) { if (flags & STREAM_SILENT) {
@@ -249,11 +325,14 @@ static int stream_create_instance(const stream_info_t *sinfo,
s->path = talloc_strdup(s, path); s->path = talloc_strdup(s, path);
s->is_network = sinfo->is_network; s->is_network = sinfo->is_network;
s->mode = flags & (STREAM_READ | STREAM_WRITE); s->mode = flags & (STREAM_READ | STREAM_WRITE);
s->requested_buffer_size = opts->buffer_size;
int opt; int opt;
mp_read_option_raw(s->global, "access-references", &m_option_type_flag, &opt); mp_read_option_raw(s->global, "access-references", &m_option_type_flag, &opt);
s->access_references = opt; s->access_references = opt;
talloc_free(opts);
MP_VERBOSE(s, "Opening %s\n", url); MP_VERBOSE(s, "Opening %s\n", url);
if (strlen(url) > INT_MAX / 8) { if (strlen(url) > INT_MAX / 8) {
@@ -282,8 +361,10 @@ static int stream_create_instance(const stream_info_t *sinfo,
if (!s->read_chunk) if (!s->read_chunk)
s->read_chunk = 4 * STREAM_BUFFER_SIZE; s->read_chunk = 4 * STREAM_BUFFER_SIZE;
stream_resize_buffer(s, 0); if (!stream_resize_buffer(s, 0)) {
MP_HANDLE_OOM(s->buffer); free_stream(s);
return STREAM_ERROR;
}
assert(s->seekable == !!s->seek); assert(s->seekable == !!s->seek);
@@ -391,78 +472,99 @@ static int stream_read_unbuffered(stream_t *s, void *buf, int len)
return res; return res;
} }
// Ask for having "total" bytes ready to read in the stream buffer. This can do // Ask for having at most "forward" bytes ready to read in the buffer.
// a partial read if requested, so it can actually read less.
// To read everything, you may have to call this in a loop. // To read everything, you may have to call this in a loop.
// total: desired amount of bytes in buffer // forward: desired amount of bytes in buffer after s->buf_cur
// allow_short: if true, attempt at most once to read more if needed // returns: progress (false on EOF or memory allocation failure)
// returns: actual bytes in buffer (can be smaller or larger than total) static bool stream_read_more(struct stream *s, int forward)
static int stream_extend_buffer(struct stream *s, int total, bool allow_short)
{ {
assert(total >= 0); assert(forward >= 0);
if (s->buf_len - s->buf_pos < total) { int forward_avail = s->buf_end - s->buf_cur;
// Move to front to guarantee we really can read up to max size. if (forward_avail >= forward)
s->buf_len = s->buf_len - s->buf_pos; return true;
memmove(s->buffer, &s->buffer[s->buf_pos], s->buf_len);
s->buf_pos = 0;
// Read ahead by about as much as stream_fill_buffer() would, to avoid // Avoid that many small reads will lead to many low-level read calls.
// that many small stream_peek() calls will read the buffer at these forward = MPMAX(forward, s->requested_buffer_size / 2);
// quantities.
total = MPMAX(total, STREAM_BUFFER_SIZE);
// Allocate more if the buffer is too small. Also, if the buffer is // Keep guaranteed seek-back.
// larger than needed, resize it to smaller. This assumes stream_peek() int buf_old = MPMIN(s->buf_cur - s->buf_start, s->requested_buffer_size / 2);
// calls are rare or done with small sizes.
stream_resize_buffer(s, total);
// Read less if allocation above failed. if (!stream_resize_buffer(s, buf_old + forward))
total = MPMIN(total, s->buffer_alloc); return false;
// Fill rest of the buffer. Can be partial. int buf_alloc = s->buffer_mask + 1;
while (total > s->buf_len) {
int read = stream_read_unbuffered(s, &s->buffer[s->buf_len], assert(s->buf_start <= s->buf_cur);
total - s->buf_len); assert(s->buf_cur <= s->buf_end);
s->buf_len += read; assert(s->buf_cur < buf_alloc * 2);
if (allow_short || !read) assert(s->buf_end < buf_alloc * 2);
break; assert(s->buf_start < buf_alloc);
// Note: read as much as possible, even if forward is much smaller. Do
// this because the stream buffer is supposed to set an approx. minimum
// read size on it.
int read = buf_alloc - buf_old - forward_avail; // free buffer past end
int pos = s->buf_end & s->buffer_mask;
read = MPMIN(read, buf_alloc - pos);
// Note: if wrap-around happens, we need to make two calls. This may
// affect latency (e.g. waiting for new data on a socket), so do only
// 1 read call always.
read = stream_read_unbuffered(s, &s->buffer[pos], read);
s->buf_end += read;
// May have overwritten old data.
if (s->buf_end - s->buf_start >= buf_alloc) {
assert(s->buf_end >= buf_alloc);
s->buf_start = s->buf_end - buf_alloc;
assert(s->buf_start <= s->buf_cur);
assert(s->buf_cur <= s->buf_end);
if (s->buf_start >= buf_alloc) {
s->buf_start -= buf_alloc;
s->buf_cur -= buf_alloc;
s->buf_end -= buf_alloc;
} }
if (s->buf_len)
s->eof = 0;
} }
return s->buf_len - s->buf_pos; // Must not have overwritten guaranteed old data.
} assert(s->buf_cur - s->buf_start >= buf_old);
int stream_fill_buffer(stream_t *s) if (s->buf_cur < s->buf_end)
{ s->eof = 0;
return stream_extend_buffer(s, STREAM_BUFFER_SIZE, true);
return !!read;
} }
// Read between 1..buf_size bytes of data, return how much data has been read. // Read between 1..buf_size bytes of data, return how much data has been read.
// Return 0 on EOF, error, or if buf_size was 0. // Return 0 on EOF, error, or if buf_size was 0.
int stream_read_partial(stream_t *s, char *buf, int buf_size) int stream_read_partial(stream_t *s, char *buf, int buf_size)
{ {
assert(s->buf_pos <= s->buf_len); assert(s->buf_cur <= s->buf_end);
assert(buf_size >= 0); assert(buf_size >= 0);
if (s->buf_pos == s->buf_len && buf_size > 0) { if (s->buf_cur == s->buf_end && buf_size > 0) {
s->buf_pos = s->buf_len = 0; if (buf_size > (s->buffer_mask + 1) / 2) {
stream_resize_buffer(s, 0); // Direct read if the buffer is too small anyway.
// Do a direct read stream_drop_buffers(s);
// Also, small reads will be more efficient with buffering & copying
if (buf_size >= STREAM_BUFFER_SIZE)
return stream_read_unbuffered(s, buf, buf_size); return stream_read_unbuffered(s, buf, buf_size);
if (!stream_fill_buffer(s)) }
return 0; stream_read_more(s, 1);
} }
int len = MPMIN(buf_size, s->buf_len - s->buf_pos); int res = ring_copy(s, buf, buf_size, s->buf_cur);
memcpy(buf, &s->buffer[s->buf_pos], len); s->buf_cur += res;
s->buf_pos += len; return res;
if (len > 0) }
s->eof = 0;
return len; // Slow version of stream_read_char(); called by it if the buffer is empty.
int stream_read_char_fallback(stream_t *s)
{
uint8_t c;
return stream_read_partial(s, &c, 1) ? c : -256;
} }
int stream_read(stream_t *s, char *mem, int total) int stream_read(stream_t *s, char *mem, int total)
@@ -476,33 +578,18 @@ int stream_read(stream_t *s, char *mem, int total)
len -= read; len -= read;
} }
total -= len; total -= len;
if (total > 0)
s->eof = 0;
return total; return total;
} }
// Read ahead at most len bytes without changing the read position. Return a // Like stream_read(), but do not advance the current position. This may resize
// pointer to the internal buffer, starting from the current read position. // the buffer to satisfy the read request.
// Reading ahead may require memory allocation. If allocation fails, read ahead int stream_read_peek(stream_t *s, void* buf, int buf_size)
// is silently limited to the last successful allocation.
// The returned buffer becomes invalid on the next stream call, and you must
// not write to it.
struct bstr stream_peek(stream_t *s, int len)
{ {
assert(len >= 0); while (s->buf_end - s->buf_cur < buf_size) {
if (!stream_read_more(s, buf_size))
int avail = stream_extend_buffer(s, len, false); break;
return (bstr){.start = &s->buffer[s->buf_pos], .len = MPMIN(len, avail)}; }
} return ring_copy(s, buf, buf_size, s->buf_cur);
// Peek the current buffer. This will return at least 1 byte, unless EOF was
// reached. If data is returned, the length is essentially random.
struct bstr stream_peek_buffer(stream_t *s)
{
if (s->buf_len - s->buf_pos < 1)
stream_fill_buffer(s);
return (bstr){.start = &s->buffer[s->buf_pos],
.len = s->buf_len - s->buf_pos};
} }
int stream_write_buffer(stream_t *s, unsigned char *buf, int len) int stream_write_buffer(stream_t *s, unsigned char *buf, int len)
@@ -527,14 +614,14 @@ int stream_write_buffer(stream_t *s, unsigned char *buf, int len)
static bool stream_skip_read(struct stream *s, int64_t len) static bool stream_skip_read(struct stream *s, int64_t len)
{ {
while (len > 0) { while (len > 0) {
unsigned int left = s->buf_len - s->buf_pos; unsigned int left = s->buf_end - s->buf_cur;
if (!left) { if (!left) {
if (!stream_fill_buffer(s)) if (!stream_read_more(s, 1))
return false; return false;
continue; continue;
} }
unsigned skip = MPMIN(len, left); unsigned skip = MPMIN(len, left);
s->buf_pos += skip; s->buf_cur += skip;
len -= skip; len -= skip;
} }
return true; return true;
@@ -546,7 +633,7 @@ static bool stream_skip_read(struct stream *s, int64_t len)
void stream_drop_buffers(stream_t *s) void stream_drop_buffers(stream_t *s)
{ {
s->pos = stream_tell(s); s->pos = stream_tell(s);
s->buf_pos = s->buf_len = 0; s->buf_start = s->buf_cur = s->buf_end = 0;
s->eof = 0; s->eof = 0;
stream_resize_buffer(s, 0); stream_resize_buffer(s, 0);
} }
@@ -555,6 +642,9 @@ void stream_drop_buffers(stream_t *s)
static bool stream_seek_unbuffered(stream_t *s, int64_t newpos) static bool stream_seek_unbuffered(stream_t *s, int64_t newpos)
{ {
if (newpos != s->pos) { if (newpos != s->pos) {
MP_VERBOSE(s, "stream level seek from %" PRId64 " to %" PRId64 "\n",
s->pos, newpos);
if (newpos > s->pos && !s->seekable) { if (newpos > s->pos && !s->seekable) {
MP_ERR(s, "Cannot seek forward in this stream\n"); MP_ERR(s, "Cannot seek forward in this stream\n");
return false; return false;
@@ -577,7 +667,8 @@ static bool stream_seek_unbuffered(stream_t *s, int64_t newpos)
bool stream_seek(stream_t *s, int64_t pos) bool stream_seek(stream_t *s, int64_t pos)
{ {
MP_TRACE(s, "seek to %lld\n", (long long)pos); MP_TRACE(s, "seek request from %" PRId64 " to %" PRId64 "\n",
stream_tell(s), pos);
s->eof = 0; // eof should be set only on read; seeking always clears it s->eof = 0; // eof should be set only on read; seeking always clears it
@@ -586,14 +677,12 @@ bool stream_seek(stream_t *s, int64_t pos)
pos = 0; pos = 0;
} }
if (pos == stream_tell(s)) if (pos <= s->pos) {
return true; int64_t x = pos - (s->pos - (int)s->buf_end);
if (x >= (int)s->buf_start) {
if (pos < s->pos) { s->buf_cur = x;
int64_t x = pos - (s->pos - (int)s->buf_len); assert(s->buf_cur >= s->buf_start);
if (x >= 0) { assert(s->buf_cur <= s->buf_end);
s->buf_pos = x;
assert(s->buf_pos <= s->buf_len);
return true; return true;
} }
} }
@@ -601,35 +690,24 @@ bool stream_seek(stream_t *s, int64_t pos)
if (s->mode == STREAM_WRITE) if (s->mode == STREAM_WRITE)
return s->seekable && s->seek(s, pos); return s->seekable && s->seek(s, pos);
int64_t newpos = pos; // Skip data instead of performing a seek in some cases.
if (pos >= s->pos &&
MP_TRACE(s, "Seek from %" PRId64 " to %" PRId64 ((!s->seekable && s->fast_skip) ||
" (with offset %d)\n", s->pos, pos, (int)(pos - newpos)); pos - s->pos <= s->requested_buffer_size))
{
if (pos >= s->pos && !s->seekable && s->fast_skip) { return stream_skip_read(s, pos - stream_tell(s));
// skipping is handled by generic code below
} else if (!stream_seek_unbuffered(s, newpos)) {
return false;
} }
return stream_skip_read(s, pos - stream_tell(s)); return stream_seek_unbuffered(s, pos);
} }
bool stream_skip(stream_t *s, int64_t len) bool stream_skip(stream_t *s, int64_t len)
{ {
int64_t target = stream_tell(s) + len; if (!stream_seek(s, stream_tell(s) + len))
if (len < 0) return false;
return stream_seek(s, target); // Make sure s->eof is set correctly, even if a seek happened.
if (len > 2 * STREAM_BUFFER_SIZE && s->seekable) { stream_read_more(s, 1);
// Seek to 1 byte before target - this is the only way to distinguish return true;
// skip-to-EOF and skip-past-EOF in general. Successful seeking means
// absolutely nothing, so test by doing a real read of the last byte.
if (!stream_seek(s, target - 1))
return false;
stream_read_char(s);
return !stream_eof(s) && stream_tell(s) == target;
}
return stream_skip_read(s, len);
} }
int stream_control(stream_t *s, int cmd, void *arg) int stream_control(stream_t *s, int cmd, void *arg)
@@ -661,7 +739,9 @@ static const char *const bom[3] = {"\xEF\xBB\xBF", "\xFF\xFE", "\xFE\xFF"};
// Return utf16 argument for stream_read_line // Return utf16 argument for stream_read_line
int stream_skip_bom(struct stream *s) int stream_skip_bom(struct stream *s)
{ {
bstr data = stream_peek(s, 4); char buf[4];
int len = stream_read_peek(s, buf, sizeof(buf));
bstr data = {buf, len};
for (int n = 0; n < 3; n++) { for (int n = 0; n < 3; n++) {
if (bstr_startswith0(data, bom[n])) { if (bstr_startswith0(data, bom[n])) {
stream_skip(s, strlen(bom[n])); stream_skip(s, strlen(bom[n]));

View File

@@ -28,7 +28,11 @@
#include "misc/bstr.h" #include "misc/bstr.h"
// Minimum guaranteed buffer and seek-back size. For any reads <= of this size,
// it's guaranteed that you can seek back by <= of this size again.
#define STREAM_BUFFER_SIZE 2048 #define STREAM_BUFFER_SIZE 2048
// (Half of this is typically reserved for seeking back.)
#define STREAM_FIXED_BUFFER_SIZE (STREAM_BUFFER_SIZE * 2)
// stream->mode // stream->mode
#define STREAM_READ 0 #define STREAM_READ 0
@@ -119,7 +123,6 @@ typedef struct stream {
void (*close)(struct stream *s); void (*close)(struct stream *s);
int read_chunk; // maximum amount of data to read at once to limit latency int read_chunk; // maximum amount of data to read at once to limit latency
unsigned int buf_pos, buf_len;
int64_t pos; int64_t pos;
int eof; int eof;
int mode; //STREAM_READ or STREAM_WRITE int mode; //STREAM_READ or STREAM_WRITE
@@ -145,20 +148,46 @@ typedef struct stream {
// added to this. The user can reset this as needed. // added to this. The user can reset this as needed.
uint64_t total_unbuffered_read_bytes; uint64_t total_unbuffered_read_bytes;
// Buffer size requested by user; s->buffer may have a different size
int requested_buffer_size;
// This is a ring buffer. It is reset only on seeks (or when buffers are
// dropped). Otherwise old contents always stay valid.
// The valid buffer is from buf_start to buf_end; buf_end can be larger
// than the buffer size (requires wrap around). buf_cur is a value in the
// range [buf_start, buf_end].
// When reading more data from the stream, buf_start is advanced as old
// data is overwritten with new data.
// Example:
// 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// +===========================+---------------------------+
// + 05 06 07 08 | 01 02 03 04 + 05 06 07 08 | 01 02 03 04 +
// +===========================+---------------------------+
// ^ buf_start (4) | |
// | ^ buf_end (12 % 8 => 4)
// ^ buf_cur (9 % 8 => 1)
// Here, the entire 8 byte buffer is filled, i.e. buf_end - buf_start = 8.
// buffer_mask == 7, so (x & buffer_mask) == (x % buffer_size)
unsigned int buf_start; // index of oldest byte in buffer (is <= buffer_mask)
unsigned int buf_cur; // current read pos (can be > buffer_mask)
unsigned int buf_end; // end position (can be > buffer_mask)
unsigned int buffer_mask; // buffer_size-1, where buffer_size == 2**n
uint8_t *buffer; uint8_t *buffer;
int buffer_alloc; uint8_t buffer_inline[STREAM_FIXED_BUFFER_SIZE];
uint8_t buffer_inline[STREAM_BUFFER_SIZE];
} stream_t; } stream_t;
int stream_fill_buffer(stream_t *s); // Non-inline version with stream_read_char().
int stream_read_char_fallback(stream_t *s);
int stream_write_buffer(stream_t *s, unsigned char *buf, int len); int stream_write_buffer(stream_t *s, unsigned char *buf, int len);
inline static int stream_read_char(stream_t *s) inline static int stream_read_char(stream_t *s)
{ {
return (s->buf_pos < s->buf_len) ? s->buffer[s->buf_pos++] : return s->buf_cur < s->buf_end
(stream_fill_buffer(s) ? s->buffer[s->buf_pos++] : -256); ? s->buffer[(s->buf_cur++) & s->buffer_mask]
: stream_read_char_fallback(s);
} }
int stream_skip_bom(struct stream *s); int stream_skip_bom(struct stream *s);
@@ -170,15 +199,14 @@ inline static int stream_eof(stream_t *s)
inline static int64_t stream_tell(stream_t *s) inline static int64_t stream_tell(stream_t *s)
{ {
return s->pos + s->buf_pos - s->buf_len; return s->pos + s->buf_cur - s->buf_end;
} }
bool stream_skip(stream_t *s, int64_t len); bool stream_skip(stream_t *s, int64_t len);
bool stream_seek(stream_t *s, int64_t pos); bool stream_seek(stream_t *s, int64_t pos);
int stream_read(stream_t *s, char *mem, int total); int stream_read(stream_t *s, char *mem, int total);
int stream_read_partial(stream_t *s, char *buf, int buf_size); int stream_read_partial(stream_t *s, char *buf, int buf_size);
struct bstr stream_peek(stream_t *s, int len); int stream_read_peek(stream_t *s, void* buf, int buf_size);
struct bstr stream_peek_buffer(stream_t *s);
void stream_drop_buffers(stream_t *s); void stream_drop_buffers(stream_t *s);
int64_t stream_get_size(stream_t *s); int64_t stream_get_size(stream_t *s);

View File

@@ -215,7 +215,7 @@ main_dependencies = [
'atomic_int_least64_t test = ATOMIC_VAR_INIT(123);' 'atomic_int_least64_t test = ATOMIC_VAR_INIT(123);'
'atomic_fetch_add(&test, 1)')) 'atomic_fetch_add(&test, 1)'))
}, { }, {
# C11; technically we still support C99 # C11; technically we require C11, but aligned_alloc() is not in MinGW
'name': 'aligned_alloc', 'name': 'aligned_alloc',
'desc': 'C11 aligned_alloc()', 'desc': 'C11 aligned_alloc()',
'func': check_statement('stdlib.h', 'aligned_alloc(1, 1)'), 'func': check_statement('stdlib.h', 'aligned_alloc(1, 1)'),