/**
 * @file mce-worker.c
 *
 * Mode Control Entity - Offload blocking operations to a worker thread
 *
 * Copyright (C) 2015 Jolla Ltd.
 *
 * @author Simo Piiroinen
 *
 * mce is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License
 * version 2.1 as published by the Free Software Foundation.
 *
 * mce is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with mce.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "mce-worker.h"
#include "mce-log.h"

/* NOTE(review): the header names on the seven system include lines were
 * missing; they are restored here from the APIs this file uses
 * (eventfd, calloc/free, read/write/close, strdup, errno, pthreads, GLib).
 * Confirm against the project's build.
 */
#include <sys/eventfd.h>

#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>

#include <glib.h>

/* ========================================================================= *
 * FUNCTIONALITY
 * ========================================================================= */

/* ------------------------------------------------------------------------- *
 * MISC_UTIL
 * ------------------------------------------------------------------------- */

static guint mw_add_iowatch(int fd, bool close_on_unref, GIOCondition cnd, GIOFunc io_cb, gpointer aptr);

/* ------------------------------------------------------------------------- *
 * MCE_JOB
 * ------------------------------------------------------------------------- */

typedef struct mce_job_t mce_job_t;

/** Job object */
struct mce_job_t
{
    /** Link to the next job in line */
    mce_job_t *mj_next;

    /** Validation context for this job */
    char      *mj_context;

    /** Name of this job */
    char      *mj_name;

    /** Callback for executing the job */
    void    *(*mj_handle)(void *);

    /** Callback for notifying job executed */
    void     (*mj_notify)(void *, void *);

    /** User data to be passed to the callbacks */
    void      *mj_param;

    /** Reply value from execute callback, passed to notification callback */
    void      *mj_reply;
};

static const char *mce_job_context (const mce_job_t *self);
static const char *mce_job_name    (const mce_job_t *self);
static void        mce_job_notify  (mce_job_t *self);
static void        mce_job_execute (mce_job_t *self);
static void        mce_job_delete  (mce_job_t *self);
static mce_job_t  *mce_job_create  (const char *context, const char *name, void *(*handle)(void *), void (*notify)(void *, void *), void *param);

/* ------------------------------------------------------------------------- *
 * MCE_JOBLIST
 * ------------------------------------------------------------------------- */

/** Job list object */
typedef struct
{
    /* Pointer to the first job in the queue */
    mce_job_t *mjl_head;

    /* Pointer to the last job in the queue */
    mce_job_t *mjl_tail;
} mce_joblist_t;

static mce_job_t     *mce_joblist_pull   (mce_joblist_t *self);
static void           mce_joblist_push   (mce_joblist_t *self, mce_job_t *job);
static void           mce_joblist_delete (mce_joblist_t *self);
static mce_joblist_t *mce_joblist_create (void);

/* ------------------------------------------------------------------------- *
 * MCE_WORKER
 * ------------------------------------------------------------------------- */

static gboolean  mce_worker_notify_cb  (GIOChannel *chn, GIOCondition cnd, gpointer data);
static void      mce_worker_execute    (void);
static void     *mce_worker_main       (void *aptr);
void             mce_worker_add_job    (const char *context, const char *name, void *(*handle)(void *), void (*notify)(void *, void *), void *param);
void             mce_worker_add_context(const char *context);
void             mce_worker_rem_context(const char *context);
static bool      mce_worker_has_context(const char *context);
bool             mce_worker_init       (void);
void             mce_worker_quit       (void);

/** Flag for: Worker thread is running */
static bool mw_is_ready = false;

/** List of jobs to be executed */
static mce_joblist_t *mw_req_list = 0;

/** Mutex protecting access to mw_req_list */
static pthread_mutex_t mw_req_mutex = PTHREAD_MUTEX_INITIALIZER;

/** eventfd descriptor for waking up worker thread after adding new jobs */
static int mw_req_evfd = -1;

/** Worker thread id */
static pthread_t mw_req_tid = 0;

/** List of jobs already executed */
static mce_joblist_t *mw_rsp_list = 0;

/** Mutex protecting access to mw_rsp_list */
static pthread_mutex_t mw_rsp_mutex = PTHREAD_MUTEX_INITIALIZER;

/** eventfd descriptor for waking up main thread after executing jobs */
static int mw_rsp_evfd = -1;

/** I/O watch identifier for mw_rsp_evfd */
static guint mw_rsp_wid = 0;

/** Lookup table containing valid context strings */
static GHashTable *mw_ctx_lut = 0;

/** Mutex protecting access to mw_ctx_lut */
static pthread_mutex_t mw_ctx_mutex = PTHREAD_MUTEX_INITIALIZER;

/* ========================================================================= *
 * MISC_UTIL
 * ========================================================================= */

/** Helper for creating I/O watch for file descriptor
 *
 * The error conditions G_IO_ERR, G_IO_HUP and G_IO_NVAL are always added
 * to the requested condition mask. The channel reference taken here is
 * dropped before returning, whether or not the watch could be added.
 *
 * @param fd             file descriptor to watch
 * @param close_on_unref whether the channel should close fd when released
 * @param cnd            conditions to wait for
 * @param io_cb          callback to invoke when conditions are met
 * @param aptr           user data to be passed to the callback
 *
 * @return watch identifier, or 0 on failure
 */
static guint
mw_add_iowatch(int fd, bool close_on_unref,
               GIOCondition cnd, GIOFunc io_cb, gpointer aptr)
{
    guint       wid = 0;
    GIOChannel *chn = 0;

    if( !(chn = g_io_channel_unix_new(fd)) )
        goto cleanup;

    g_io_channel_set_close_on_unref(chn, close_on_unref);

    /* Always get notified about error conditions too */
    cnd |= G_IO_ERR | G_IO_HUP | G_IO_NVAL;

    if( !(wid = g_io_add_watch(chn, cnd, io_cb, aptr)) )
        goto cleanup;

cleanup:
    if( chn != 0 )
        g_io_channel_unref(chn);

    return wid;
}

/* ========================================================================= *
 * MCE_JOB
 * ========================================================================= */

/** Get job name string
 *
 * @param self job object, or NULL
 *
 * @return name of the job, or "unknown"
 */
static const char *
mce_job_name(const mce_job_t *self)
{
    const char *name = 0;

    if( self )
        name = self->mj_name;

    return name ?: "unknown";
}

/** Get job context string
 *
 * @param self job object, or NULL
 *
 * @return context of the job, or "global"
 */
static const char *
mce_job_context(const mce_job_t *self)
{
    const char *context = 0;

    if( self )
        context = self->mj_context;

    return context ?: "global";
}

/** Job executed notification
 *
 * This must be called from the mainloop thread.
* * @param self job object, or NULL */ static void mce_job_notify(mce_job_t *self) { if( !self ) goto EXIT; if( !self->mj_notify ) goto EXIT; mce_log(LL_DEBUG, "job(%s:%s) notify", mce_job_context(self), mce_job_name(self)); pthread_mutex_lock(&mw_ctx_mutex); if( mce_worker_has_context(self->mj_context) ) self->mj_notify(self->mj_param, self->mj_reply); pthread_mutex_unlock(&mw_ctx_mutex); EXIT: return; } /** Execute job * * This must be called from the worker thread. * * @param self job object, or NULL */ static void mce_job_execute(mce_job_t *self) { if( !self ) goto EXIT; if( !self->mj_handle ) goto EXIT; mce_log(LL_DEBUG, "job(%s:%s) execute", mce_job_context(self), mce_job_name(self)); pthread_mutex_lock(&mw_ctx_mutex); if( mce_worker_has_context(self->mj_context) ) self->mj_reply = self->mj_handle(self->mj_param); pthread_mutex_unlock(&mw_ctx_mutex); EXIT: return; } /** Delete job object * * @param self job object, or NULL */ static void mce_job_delete(mce_job_t *self) { if( !self ) goto EXIT; mce_log(LL_DEBUG, "job(%s:%s) deleted", mce_job_context(self), mce_job_name(self)); free(self->mj_name); free(self->mj_context); free(self); EXIT: return; } /** Create job object * * @param context Validation context string * @param name Job name string * @param handle Execute callback (in worker thread) * @param notify Finished callback (in main thread) * @param param User data to be passed to callbacks * * @return job object */ static mce_job_t * mce_job_create(const char *context, const char *name, void *(*handle)(void *), void (*notify)(void *, void *), void *param) { mce_job_t *self = calloc(1, sizeof *self); self->mj_next = 0; self->mj_context = context ? strdup(context) : 0; self->mj_name = name ? 
strdup(name) : 0; self->mj_handle = handle; self->mj_notify = notify; self->mj_param = param; self->mj_reply = 0; mce_log(LL_DEBUG, "job(%s:%s) created", mce_job_context(self), mce_job_name(self)); return self; } /* ========================================================================= * * MCE_JOBLIST * ========================================================================= */ /** Pull a job object from a list of jobs * * Owenership of non-null job is transferred to the caller. * @param self Job list object, or NULL * * @return job object, or NULL */ static mce_job_t * mce_joblist_pull(mce_joblist_t *self) { mce_job_t *job = 0; if( !self ) goto EXIT; if( !(job = self->mjl_head) ) goto EXIT; if( !(self->mjl_head = job->mj_next) ) self->mjl_tail = 0; job->mj_next = 0; EXIT: return job; } /** Pull a job object from a list of jobs * * Owenership of non-null job is transferred to the joblist. * * @param self Job list object, or NULL * @param job Job object, or NULL */ static void mce_joblist_push(mce_joblist_t *self, mce_job_t *job) { if( !self || !job ) goto EXIT; if( self->mjl_tail ) self->mjl_tail->mj_next = job; else self->mjl_head = job; self->mjl_tail = job; job = 0; EXIT: if( job ) { mce_log(LL_ERR, "job(%s:%s) could not be queued; deleting", mce_job_context(job), mce_job_name(job)); mce_job_delete(job); } return; } /** Delete job list object and all contained jobs * * @param self Job list object, or NULL */ static void mce_joblist_delete(mce_joblist_t *self) { mce_job_t *job; if( !self ) goto EXIT; while( (job = mce_joblist_pull(self)) ) mce_job_delete(job); free(self); EXIT: return; } /** Create job list object * * @return job list object */ static mce_joblist_t * mce_joblist_create(void) { mce_joblist_t *self = calloc(1, sizeof *self); self->mjl_head = 0; self->mjl_tail = 0; return self; } /* ========================================================================= * * MCE_WORKER * ========================================================================= 
*/ /** Check validity of job context * * Note: Caller must hold mw_ctx_mutex. * * @param context Context string, or NULL for global * * @return true if context is valid, false otherwise */ static bool mce_worker_has_context(const char *context) { if( !mw_is_ready ) return false; if( !context ) return true; if( !mw_ctx_lut ) return false;; return g_hash_table_lookup(mw_ctx_lut, context) != 0; } /** Mark job context as valid * * @param context Context string, or NULL for nop */ void mce_worker_add_context(const char *context) { if( !mw_is_ready ) goto EXIT; if( !context ) goto EXIT; if( !mw_ctx_lut ) goto EXIT; pthread_mutex_lock(&mw_ctx_mutex); g_hash_table_replace(mw_ctx_lut, g_strdup(context), GINT_TO_POINTER(1)); pthread_mutex_unlock(&mw_ctx_mutex); mce_log(LL_DEBUG, "%s: context enabled", context); EXIT: return; } /** Mark job context as invalid * * @param context Context string, or NULL for nop */ void mce_worker_rem_context(const char *context) { if( !mw_ctx_lut ) goto EXIT; if( !context ) goto EXIT; pthread_mutex_lock(&mw_ctx_mutex); g_hash_table_remove(mw_ctx_lut, context); pthread_mutex_unlock(&mw_ctx_mutex); mce_log(LL_DEBUG, "%s: context disabled", context); EXIT: return; } /** Callback for: Handle job executed notifications * * Note: This is called from main thread. 
* * @param chn I/O channel for eventfd * @param cnd Wakeup reason * @param data User data (not used) * * @return FALSE if io watch should be disabled, TRUE otherwise */ static gboolean mce_worker_notify_cb(GIOChannel *chn, GIOCondition cnd, gpointer data) { (void)data; gboolean keep_going = FALSE; if( !mw_rsp_wid ) goto cleanup_nak; int fd = g_io_channel_unix_get_fd(chn); if( fd < 0 ) goto cleanup_nak; if( cnd & ~G_IO_IN ) goto cleanup_nak; if( !(cnd & G_IO_IN) ) goto cleanup_ack; uint64_t cnt = 0; int rc = read(fd, &cnt, sizeof cnt); if( rc == 0 ) { mce_log(LL_ERR, "unexpected eof"); goto cleanup_nak; } if( rc == -1 ) { if( errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK ) goto cleanup_ack; mce_log(LL_ERR, "read error: %m"); goto cleanup_nak; } if( rc != sizeof cnt ) goto cleanup_nak; for( ;; ) { pthread_mutex_lock(&mw_rsp_mutex); mce_job_t *job = mce_joblist_pull(mw_rsp_list); pthread_mutex_unlock(&mw_rsp_mutex); if( !job ) break; mce_job_notify(job); mce_job_delete(job); } cleanup_ack: keep_going = TRUE; cleanup_nak: if( !keep_going ) { mw_rsp_wid = 0; mce_log(LL_CRIT, "worker notifications disabled"); } return keep_going; } /** Execute queued jobs * * Note: This is called from worker thread */ static void mce_worker_execute(void) { for( ;; ) { pthread_mutex_lock(&mw_req_mutex); mce_job_t *job = mce_joblist_pull(mw_req_list); pthread_mutex_unlock(&mw_req_mutex); if( !job ) break; mce_job_execute(job); pthread_mutex_lock(&mw_rsp_mutex); mce_joblist_push(mw_rsp_list, job); pthread_mutex_unlock(&mw_rsp_mutex); uint64_t cnt = 1; if( write(mw_rsp_evfd, &cnt, sizeof cnt) == -1 ) { mce_log(LL_ERR, "signaling job finished failed: %m"); } } } /** Worker thread mainloop * * @param aptr user data (not used) * * @return NULL */ static void * mce_worker_main(void *aptr) { (void)aptr; /* Allow quick and dirty cancellation */ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0); pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, 0); for( ;; ) { uint64_t cnt = 0; int rc = 
read(mw_req_evfd, &cnt, sizeof cnt); if( rc == -1 ) { if( errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK ) continue; mce_log(LL_ERR, "read: %m"); goto EXIT; } if( rc != sizeof cnt ) continue; if( cnt > 0 ) mce_worker_execute(); } EXIT: return 0; } /** Queue a job to be executed in worker thread * * @param context Validation context string, or NULL for global * @param handle Execute job callback * @param notify Job finished notification callback * @param param Pointer to be passed to the callbacks */ void mce_worker_add_job(const char *context, const char *name, void *(*handle)(void *), void (*notify)(void *, void *), void *param) { if( !mw_is_ready ) { mce_log(LL_ERR, "job(%s:%s) scheduled while not ready", context, name); goto EXIT; } mce_job_t *job = mce_job_create(context, name, handle, notify, param); pthread_mutex_lock(&mw_req_mutex); mce_joblist_push(mw_req_list, job); pthread_mutex_unlock(&mw_req_mutex); uint64_t cnt = 1; if( write(mw_req_evfd, &cnt, sizeof cnt) == -1 ) { mce_log(LL_ERR, "signaling job added failed: %m"); } EXIT: return; } /** Terminate worker thread */ void mce_worker_quit(void) { /* No longer ready to accept jobs */ mw_is_ready = false; /* Stop worker thread */ if( mw_req_tid ) { if( pthread_cancel(mw_req_tid) != 0 ) { mce_log(LOG_ERR, "failed to stop worker thread"); } else { void *status = 0; pthread_join(mw_req_tid, &status); mce_log(LOG_DEBUG, "worker stopped, status = %p", status); } mw_req_tid = 0; } /* Note: The worker thread is killed asynchronously, so it is * possible that the mutexes are left in locked state * and thus must not be used after this stage. 
*/ /* Remove request pipeline */ mce_joblist_delete(mw_req_list), mw_req_list = 0; if( mw_req_evfd != -1 ) close(mw_req_evfd), mw_req_evfd = -1; /* Remove notify pipeline */ if( mw_rsp_wid ) g_source_remove(mw_rsp_wid), mw_rsp_wid = 0; mce_joblist_delete(mw_rsp_list), mw_rsp_list = 0; if( mw_rsp_evfd != -1 ) close(mw_rsp_evfd), mw_req_evfd = -1; /* Remove context lookup table */ if( mw_ctx_lut ) g_hash_table_unref(mw_ctx_lut), mw_ctx_lut = 0; } /** Start worker thread * * @return true on success, false on failure */ bool mce_worker_init(void) { bool ack = false; /* Setup context lookup table */ mw_ctx_lut = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, 0); /* Setup notify pipeline */ if( !(mw_rsp_list = mce_joblist_create()) ) goto EXIT; if( (mw_rsp_evfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)) == -1 ) goto EXIT; mw_rsp_wid = mw_add_iowatch(mw_rsp_evfd, false, G_IO_IN, mce_worker_notify_cb, 0); if( !mw_rsp_wid ) goto EXIT; /* Setup request pipeline */ if( !(mw_req_list = mce_joblist_create()) ) goto EXIT; if( (mw_req_evfd = eventfd(0, EFD_CLOEXEC)) == -1 ) goto EXIT; /* Start worker thread */ if( pthread_create(&mw_req_tid, 0, mce_worker_main, 0) != 0 ) { mw_req_tid = 0; goto EXIT; } /* Note: From now on joblist access must use mutex locking */ /* Ready to accept jobs */ mw_is_ready = true; ack = true; EXIT: /* All or nothing */ if( !ack ) mce_worker_quit(); return ack; }