ALL: use new mp_thread abstraction

This commit is contained in:
Kacper Michajłow
2023-10-21 04:55:41 +02:00
committed by Dudemanguy
parent 3a8b107f62
commit 174df99ffa
81 changed files with 1255 additions and 1302 deletions

View File

@@ -16,7 +16,6 @@
*/
#include <assert.h>
#include <pthread.h>
#include <stdarg.h>
#include <stdatomic.h>
#include <stdint.h>
@@ -55,9 +54,9 @@
struct mp_log_root {
struct mpv_global *global;
pthread_mutex_t lock;
pthread_mutex_t log_file_lock;
pthread_cond_t log_file_wakeup;
mp_mutex lock;
mp_mutex log_file_lock;
mp_cond log_file_wakeup;
// --- protected by lock
char **msg_levels;
bool use_terminal; // make accesses to stderr/stdout
@@ -83,7 +82,7 @@ struct mp_log_root {
// --- owner thread only (caller of mp_msg_init() etc.)
char *log_path;
char *stats_path;
pthread_t log_file_thread;
mp_thread log_file_thread;
// --- owner thread only, but frozen while log_file_thread is running
FILE *log_file;
struct mp_log_buffer *log_file_buffer;
@@ -104,7 +103,7 @@ struct mp_log {
struct mp_log_buffer {
struct mp_log_root *root;
pthread_mutex_t lock;
mp_mutex lock;
// --- protected by lock
struct mp_log_buffer_entry **entries; // ringbuffer
int capacity; // total space in entries[]
@@ -133,7 +132,7 @@ static bool match_mod(const char *name, const char *mod)
static void update_loglevel(struct mp_log *log)
{
struct mp_log_root *root = log->root;
pthread_mutex_lock(&root->lock);
mp_mutex_lock(&root->lock);
log->level = MSGL_STATUS + root->verbose; // default log level
if (root->really_quiet)
log->level = -1;
@@ -155,7 +154,7 @@ static void update_loglevel(struct mp_log *log)
log->level = MPMAX(log->level, MSGL_STATS);
log->level = MPMIN(log->level, log->max_level);
atomic_store(&log->reload_counter, atomic_load(&log->root->reload_counter));
pthread_mutex_unlock(&root->lock);
mp_mutex_unlock(&root->lock);
}
// Set (numerically) the maximum level that should still be output for this log
@@ -164,9 +163,9 @@ void mp_msg_set_max_level(struct mp_log *log, int lev)
{
if (!log->root)
return;
pthread_mutex_lock(&log->root->lock);
mp_mutex_lock(&log->root->lock);
log->max_level = MPCLAMP(lev, -1, MSGL_MAX);
pthread_mutex_unlock(&log->root->lock);
mp_mutex_unlock(&log->root->lock);
update_loglevel(log);
}
@@ -233,9 +232,9 @@ static void flush_status_line(struct mp_log_root *root)
void mp_msg_flush_status_line(struct mp_log *log)
{
if (log->root) {
pthread_mutex_lock(&log->root->lock);
mp_mutex_lock(&log->root->lock);
flush_status_line(log->root);
pthread_mutex_unlock(&log->root->lock);
mp_mutex_unlock(&log->root->lock);
}
}
@@ -243,18 +242,18 @@ void mp_msg_set_term_title(struct mp_log *log, const char *title)
{
if (log->root && title) {
// Lock because printf to terminal is not necessarily atomic.
pthread_mutex_lock(&log->root->lock);
mp_mutex_lock(&log->root->lock);
fprintf(stderr, "\e]0;%s\007", title);
pthread_mutex_unlock(&log->root->lock);
mp_mutex_unlock(&log->root->lock);
}
}
bool mp_msg_has_status_line(struct mpv_global *global)
{
struct mp_log_root *root = global->log->root;
pthread_mutex_lock(&root->lock);
mp_mutex_lock(&root->lock);
bool r = root->status_lines > 0;
pthread_mutex_unlock(&root->lock);
mp_mutex_unlock(&root->lock);
return r;
}
@@ -352,7 +351,7 @@ static void write_msg_to_buffers(struct mp_log *log, int lev, char *text)
for (int n = 0; n < root->num_buffers; n++) {
struct mp_log_buffer *buffer = root->buffers[n];
bool wakeup = false;
pthread_mutex_lock(&buffer->lock);
mp_mutex_lock(&buffer->lock);
int buffer_level = buffer->level;
if (buffer_level == MP_LOG_BUFFER_MSGL_TERM)
buffer_level = log->terminal_level;
@@ -366,16 +365,16 @@ static void write_msg_to_buffers(struct mp_log *log, int lev, char *text)
while (buffer->num_entries == buffer->capacity && !dead) {
// Temporary unlock is OK; buffer->level is immutable, and
// buffer can't go away because the global log lock is held.
pthread_mutex_unlock(&buffer->lock);
pthread_mutex_lock(&root->log_file_lock);
mp_mutex_unlock(&buffer->lock);
mp_mutex_lock(&root->log_file_lock);
if (root->log_file_thread_active) {
pthread_cond_wait(&root->log_file_wakeup,
mp_cond_wait(&root->log_file_wakeup,
&root->log_file_lock);
} else {
dead = true;
}
pthread_mutex_unlock(&root->log_file_lock);
pthread_mutex_lock(&buffer->lock);
mp_mutex_unlock(&root->log_file_lock);
mp_mutex_lock(&buffer->lock);
}
}
if (buffer->num_entries == buffer->capacity) {
@@ -395,7 +394,7 @@ static void write_msg_to_buffers(struct mp_log *log, int lev, char *text)
if (buffer->wakeup_cb && !buffer->silent)
wakeup = true;
}
pthread_mutex_unlock(&buffer->lock);
mp_mutex_unlock(&buffer->lock);
if (wakeup)
buffer->wakeup_cb(buffer->wakeup_cb_ctx);
}
@@ -415,7 +414,7 @@ void mp_msg_va(struct mp_log *log, int lev, const char *format, va_list va)
struct mp_log_root *root = log->root;
pthread_mutex_lock(&root->lock);
mp_mutex_lock(&root->lock);
root->buffer.len = 0;
@@ -461,7 +460,7 @@ void mp_msg_va(struct mp_log *log, int lev, const char *format, va_list va)
}
}
pthread_mutex_unlock(&root->lock);
mp_mutex_unlock(&root->lock);
}
static void destroy_log(void *ptr)
@@ -527,9 +526,9 @@ void mp_msg_init(struct mpv_global *global)
.reload_counter = 1,
};
pthread_mutex_init(&root->lock, NULL);
pthread_mutex_init(&root->log_file_lock, NULL);
pthread_cond_init(&root->log_file_wakeup, NULL);
mp_mutex_init(&root->lock);
mp_mutex_init(&root->log_file_lock);
mp_cond_init(&root->log_file_wakeup);
struct mp_log dummy = { .root = root };
struct mp_log *log = mp_log_new(root, &dummy, "");
@@ -537,44 +536,44 @@ void mp_msg_init(struct mpv_global *global)
global->log = log;
}
static void *log_file_thread(void *p)
static MP_THREAD_VOID log_file_thread(void *p)
{
struct mp_log_root *root = p;
mpthread_set_name("log");
mp_thread_set_name("log");
pthread_mutex_lock(&root->log_file_lock);
mp_mutex_lock(&root->log_file_lock);
while (root->log_file_thread_active) {
struct mp_log_buffer_entry *e =
mp_msg_log_buffer_read(root->log_file_buffer);
if (e) {
pthread_mutex_unlock(&root->log_file_lock);
mp_mutex_unlock(&root->log_file_lock);
fprintf(root->log_file, "[%8.3f][%c][%s] %s",
mp_time_sec(),
mp_log_levels[e->level][0], e->prefix, e->text);
fflush(root->log_file);
pthread_mutex_lock(&root->log_file_lock);
mp_mutex_lock(&root->log_file_lock);
talloc_free(e);
// Multiple threads might be blocked if the log buffer was full.
pthread_cond_broadcast(&root->log_file_wakeup);
mp_cond_broadcast(&root->log_file_wakeup);
} else {
pthread_cond_wait(&root->log_file_wakeup, &root->log_file_lock);
mp_cond_wait(&root->log_file_wakeup, &root->log_file_lock);
}
}
pthread_mutex_unlock(&root->log_file_lock);
mp_mutex_unlock(&root->log_file_lock);
return NULL;
MP_THREAD_RETURN();
}
static void wakeup_log_file(void *p)
{
struct mp_log_root *root = p;
pthread_mutex_lock(&root->log_file_lock);
pthread_cond_broadcast(&root->log_file_wakeup);
pthread_mutex_unlock(&root->log_file_lock);
mp_mutex_lock(&root->log_file_lock);
mp_cond_broadcast(&root->log_file_wakeup);
mp_mutex_unlock(&root->log_file_lock);
}
// Only to be called from the main thread.
@@ -582,16 +581,16 @@ static void terminate_log_file_thread(struct mp_log_root *root)
{
bool wait_terminate = false;
pthread_mutex_lock(&root->log_file_lock);
mp_mutex_lock(&root->log_file_lock);
if (root->log_file_thread_active) {
root->log_file_thread_active = false;
pthread_cond_broadcast(&root->log_file_wakeup);
mp_cond_broadcast(&root->log_file_wakeup);
wait_terminate = true;
}
pthread_mutex_unlock(&root->log_file_lock);
mp_mutex_unlock(&root->log_file_lock);
if (wait_terminate)
pthread_join(root->log_file_thread, NULL);
mp_thread_join(root->log_file_thread);
mp_msg_log_buffer_destroy(root->log_file_buffer);
root->log_file_buffer = NULL;
@@ -631,7 +630,7 @@ void mp_msg_update_msglevels(struct mpv_global *global, struct MPOpts *opts)
{
struct mp_log_root *root = global->log->root;
pthread_mutex_lock(&root->lock);
mp_mutex_lock(&root->lock);
root->verbose = opts->verbose;
root->really_quiet = opts->msg_really_quiet;
@@ -645,7 +644,7 @@ void mp_msg_update_msglevels(struct mpv_global *global, struct MPOpts *opts)
m_option_type_msglevels.copy(NULL, &root->msg_levels, &opts->msg_levels);
atomic_fetch_add(&root->reload_counter, 1);
pthread_mutex_unlock(&root->lock);
mp_mutex_unlock(&root->lock);
if (check_new_path(global, opts->log_file, &root->log_path)) {
terminate_log_file_thread(root);
@@ -655,11 +654,11 @@ void mp_msg_update_msglevels(struct mpv_global *global, struct MPOpts *opts)
// if a logfile is created and the early filebuf still exists,
// flush and destroy the early buffer
pthread_mutex_lock(&root->lock);
mp_mutex_lock(&root->lock);
struct mp_log_buffer *earlybuf = root->early_filebuffer;
if (earlybuf)
root->early_filebuffer = NULL; // but it still logs msgs
pthread_mutex_unlock(&root->lock);
mp_mutex_unlock(&root->lock);
if (earlybuf) {
// flush, destroy before creating the normal logfile buf,
@@ -681,7 +680,7 @@ void mp_msg_update_msglevels(struct mpv_global *global, struct MPOpts *opts)
mp_msg_log_buffer_new(global, FILE_BUF, MP_LOG_BUFFER_MSGL_LOGFILE,
wakeup_log_file, root);
root->log_file_thread_active = true;
if (pthread_create(&root->log_file_thread, NULL, log_file_thread,
if (mp_thread_create(&root->log_file_thread, log_file_thread,
root))
{
root->log_file_thread_active = false;
@@ -697,7 +696,7 @@ void mp_msg_update_msglevels(struct mpv_global *global, struct MPOpts *opts)
if (check_new_path(global, opts->dump_stats, &root->stats_path)) {
bool open_error = false;
pthread_mutex_lock(&root->lock);
mp_mutex_lock(&root->lock);
if (root->stats_file)
fclose(root->stats_file);
root->stats_file = NULL;
@@ -705,7 +704,7 @@ void mp_msg_update_msglevels(struct mpv_global *global, struct MPOpts *opts)
root->stats_file = fopen(root->stats_path, "wb");
open_error = !root->stats_file;
}
pthread_mutex_unlock(&root->lock);
mp_mutex_unlock(&root->lock);
if (open_error) {
mp_err(global->log, "Failed to open stats file '%s'\n",
@@ -718,9 +717,9 @@ void mp_msg_force_stderr(struct mpv_global *global, bool force_stderr)
{
struct mp_log_root *root = global->log->root;
pthread_mutex_lock(&root->lock);
mp_mutex_lock(&root->lock);
root->force_stderr = force_stderr;
pthread_mutex_unlock(&root->lock);
mp_mutex_unlock(&root->lock);
}
// Only to be called from the main thread.
@@ -743,9 +742,9 @@ void mp_msg_uninit(struct mpv_global *global)
talloc_free(root->stats_path);
talloc_free(root->log_path);
m_option_type_msglevels.free(&root->msg_levels);
pthread_mutex_destroy(&root->lock);
pthread_mutex_destroy(&root->log_file_lock);
pthread_cond_destroy(&root->log_file_wakeup);
mp_mutex_destroy(&root->lock);
mp_mutex_destroy(&root->log_file_lock);
mp_cond_destroy(&root->log_file_wakeup);
talloc_free(root);
global->log = NULL;
}
@@ -773,26 +772,26 @@ static void mp_msg_set_early_logging_raw(struct mpv_global *global, bool enable,
int size, int level)
{
struct mp_log_root *root = global->log->root;
pthread_mutex_lock(&root->lock);
mp_mutex_lock(&root->lock);
if (enable != !!*root_logbuf) {
if (enable) {
pthread_mutex_unlock(&root->lock);
mp_mutex_unlock(&root->lock);
struct mp_log_buffer *buf =
mp_msg_log_buffer_new(global, size, level, NULL, NULL);
pthread_mutex_lock(&root->lock);
mp_mutex_lock(&root->lock);
assert(!*root_logbuf); // no concurrent calls to this function
*root_logbuf = buf;
} else {
struct mp_log_buffer *buf = *root_logbuf;
*root_logbuf = NULL;
pthread_mutex_unlock(&root->lock);
mp_mutex_unlock(&root->lock);
mp_msg_log_buffer_destroy(buf);
return;
}
}
pthread_mutex_unlock(&root->lock);
mp_mutex_unlock(&root->lock);
}
void mp_msg_set_early_logging(struct mpv_global *global, bool enable)
@@ -814,7 +813,7 @@ struct mp_log_buffer *mp_msg_log_buffer_new(struct mpv_global *global,
{
struct mp_log_root *root = global->log->root;
pthread_mutex_lock(&root->lock);
mp_mutex_lock(&root->lock);
if (level == MP_LOG_BUFFER_MSGL_TERM) {
size = TERM_BUF;
@@ -828,7 +827,7 @@ struct mp_log_buffer *mp_msg_log_buffer_new(struct mpv_global *global,
root->early_buffer = NULL;
buffer->wakeup_cb = wakeup_cb;
buffer->wakeup_cb_ctx = wakeup_cb_ctx;
pthread_mutex_unlock(&root->lock);
mp_mutex_unlock(&root->lock);
return buffer;
}
}
@@ -845,21 +844,21 @@ struct mp_log_buffer *mp_msg_log_buffer_new(struct mpv_global *global,
.wakeup_cb_ctx = wakeup_cb_ctx,
};
pthread_mutex_init(&buffer->lock, NULL);
mp_mutex_init(&buffer->lock);
MP_TARRAY_APPEND(root, root->buffers, root->num_buffers, buffer);
atomic_fetch_add(&root->reload_counter, 1);
pthread_mutex_unlock(&root->lock);
mp_mutex_unlock(&root->lock);
return buffer;
}
void mp_msg_log_buffer_set_silent(struct mp_log_buffer *buffer, bool silent)
{
pthread_mutex_lock(&buffer->lock);
mp_mutex_lock(&buffer->lock);
buffer->silent = silent;
pthread_mutex_unlock(&buffer->lock);
mp_mutex_unlock(&buffer->lock);
}
void mp_msg_log_buffer_destroy(struct mp_log_buffer *buffer)
@@ -869,7 +868,7 @@ void mp_msg_log_buffer_destroy(struct mp_log_buffer *buffer)
struct mp_log_root *root = buffer->root;
pthread_mutex_lock(&root->lock);
mp_mutex_lock(&root->lock);
for (int n = 0; n < root->num_buffers; n++) {
if (root->buffers[n] == buffer) {
@@ -885,11 +884,11 @@ found:
while (buffer->num_entries)
talloc_free(log_buffer_read(buffer));
pthread_mutex_destroy(&buffer->lock);
mp_mutex_destroy(&buffer->lock);
talloc_free(buffer);
atomic_fetch_add(&root->reload_counter, 1);
pthread_mutex_unlock(&root->lock);
mp_mutex_unlock(&root->lock);
}
// Return a queued message, or if the buffer is empty, NULL.
@@ -898,7 +897,7 @@ struct mp_log_buffer_entry *mp_msg_log_buffer_read(struct mp_log_buffer *buffer)
{
struct mp_log_buffer_entry *res = NULL;
pthread_mutex_lock(&buffer->lock);
mp_mutex_lock(&buffer->lock);
if (!buffer->silent && buffer->num_entries) {
if (buffer->dropped) {
@@ -916,7 +915,7 @@ struct mp_log_buffer_entry *mp_msg_log_buffer_read(struct mp_log_buffer *buffer)
}
}
pthread_mutex_unlock(&buffer->lock);
mp_mutex_unlock(&buffer->lock);
return res;
}