demux: add a per stream wakeup callback

This is supposed to help making data flow easier and wakeup handling
more efficient. Once that change is done, reading a packet on any
stream won't have to wakeup and poll all decoders (which helps reducing
the mess even if all decoders are on the same thread).

This also improves the accuracy of wakeups by tracking better whether
a wakeup is needed.
This commit is contained in:
wm4
2018-01-29 13:51:47 +01:00
committed by Kevin Mitchell
parent 1d5991ef30
commit eaced0ebb0
2 changed files with 56 additions and 13 deletions

View File

@@ -268,6 +268,9 @@ struct demux_stream {
int index; // equals to sh->index
// --- all fields are protected by in->lock
void (*wakeup_cb)(void *ctx);
void *wakeup_cb_ctx;
// demuxer state
bool selected; // user wants packets from this stream
bool eager; // try to keep at least 1 packet queued
@@ -293,6 +296,7 @@ struct demux_stream {
struct demux_packet *reader_head; // points at current decoder position
bool skip_to_keyframe;
bool attached_picture_added;
bool need_wakeup; // call wakeup_cb on next reader_head state change
// for refresh seeks: pos/dts of last packet returned to reader
int64_t last_ret_pos;
@@ -580,6 +584,7 @@ static void ds_clear_reader_queue_state(struct demux_stream *ds)
ds->fw_bytes = 0;
ds->fw_packs = 0;
ds->eof = false;
ds->need_wakeup = true;
}
static void ds_clear_reader_state(struct demux_stream *ds)
@@ -595,6 +600,22 @@ static void ds_clear_reader_state(struct demux_stream *ds)
ds->last_ret_dts = MP_NOPTS_VALUE;
}
// Call if the observed reader state on this stream somehow changes. The wakeup
// is skipped if the reader successfully read a packet, because that means we
// expect it to come back and ask for more.
static void wakeup_ds(struct demux_stream *ds)
{
if (ds->need_wakeup) {
if (ds->wakeup_cb) {
ds->wakeup_cb(ds->wakeup_cb_ctx);
} else if (ds->in->wakeup_cb) {
ds->in->wakeup_cb(ds->in->wakeup_cb_ctx);
}
ds->need_wakeup = false;
pthread_cond_signal(&ds->in->wakeup);
}
}
static void update_stream_selection_state(struct demux_internal *in,
struct demux_stream *ds)
{
@@ -646,6 +667,8 @@ static void update_stream_selection_state(struct demux_internal *in,
}
free_empty_cached_ranges(in);
wakeup_ds(ds);
}
void demux_set_ts_offset(struct demuxer *demuxer, double offset)
@@ -1273,10 +1296,7 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp)
}
}
// Wake up if this was the first packet after start/possible underrun.
if (ds->in->wakeup_cb && ds->reader_head && !ds->reader_head->next)
ds->in->wakeup_cb(ds->in->wakeup_cb_ctx);
pthread_cond_signal(&in->wakeup);
wakeup_ds(ds);
pthread_mutex_unlock(&in->lock);
}
@@ -1325,11 +1345,8 @@ static bool read_packet(struct demux_internal *in)
for (int n = 0; n < in->num_streams; n++) {
struct demux_stream *ds = in->streams[n]->ds;
bool eof = !ds->reader_head;
if (eof && !ds->eof) {
if (in->wakeup_cb)
in->wakeup_cb(in->wakeup_cb_ctx);
pthread_cond_signal(&in->wakeup);
}
if (eof && ds->eof)
wakeup_ds(ds);
ds->eof |= eof;
}
return false;
@@ -1365,6 +1382,8 @@ static bool read_packet(struct demux_internal *in)
if (!ds->eof)
adjust_seek_range_on_packet(ds, NULL);
ds->eof = true;
if (!in->last_eof && ds->wakeup_cb)
wakeup_ds(ds);
}
// If we had EOF previously, then don't wakeup (avoids wakeup loop)
if (!in->last_eof) {
@@ -1629,6 +1648,7 @@ struct demux_packet *demux_read_packet(struct sh_stream *sh)
const char *t = stream_type_name(ds->type);
MP_DBG(in, "reading packet for %s\n", t);
in->eof = false; // force retry
ds->need_wakeup = true;
while (ds->selected && !ds->reader_head && !in->blocked) {
in->reading = true;
// Note: the following code marks EOF if it can't continue
@@ -1679,12 +1699,16 @@ int demux_read_packet_async(struct sh_stream *sh, struct demux_packet **out_pkt)
} else {
r = *out_pkt ? 1 : -1;
}
ds->need_wakeup = r != 1;
pthread_mutex_unlock(&ds->in->lock);
} else if (ds->in->blocked) {
r = 0;
} else {
*out_pkt = demux_read_packet(sh);
r = *out_pkt ? 1 : -1;
if (ds->in->blocked) {
r = 0;
} else {
*out_pkt = demux_read_packet(sh);
r = *out_pkt ? 1 : -1;
}
ds->need_wakeup = r != 1;
}
return r;
}
@@ -2515,6 +2539,9 @@ int demux_seek(demuxer_t *demuxer, double seek_pts, int flags)
in->seek_pts = seek_pts;
}
for (int n = 0; n < in->num_streams; n++)
wakeup_ds(in->streams[n]->ds);
if (!in->threading && in->seeking)
execute_seek(in);
@@ -2662,6 +2689,16 @@ bool demux_stream_is_selected(struct sh_stream *stream)
return r;
}
void demux_set_stream_wakeup_cb(struct sh_stream *sh,
void (*cb)(void *ctx), void *ctx)
{
pthread_mutex_lock(&sh->ds->in->lock);
sh->ds->wakeup_cb = cb;
sh->ds->wakeup_cb_ctx = ctx;
sh->ds->need_wakeup = true;
pthread_mutex_unlock(&sh->ds->in->lock);
}
int demuxer_add_attachment(demuxer_t *demuxer, char *name, char *type,
void *data, size_t data_size)
{
@@ -2738,6 +2775,10 @@ void demux_block_reading(struct demuxer *demuxer, bool block)
pthread_mutex_lock(&in->lock);
in->blocked = block;
for (int n = 0; n < in->num_streams; n++) {
in->streams[n]->ds->need_wakeup = true;
wakeup_ds(in->streams[n]->ds);
}
pthread_cond_signal(&in->wakeup);
pthread_mutex_unlock(&in->lock);
}

View File

@@ -255,6 +255,8 @@ struct demux_packet *demux_read_packet(struct sh_stream *sh);
int demux_read_packet_async(struct sh_stream *sh, struct demux_packet **out_pkt);
bool demux_stream_is_selected(struct sh_stream *stream);
bool demux_has_packet(struct sh_stream *sh);
void demux_set_stream_wakeup_cb(struct sh_stream *sh,
void (*cb)(void *ctx), void *ctx);
struct demux_packet *demux_read_any_packet(struct demuxer *demuxer);
struct sh_stream *demux_get_stream(struct demuxer *demuxer, int index);