diff options
Diffstat (limited to 'archive/src/logos')
-rw-r--r-- | archive/src/logos/README.md | 6 | ||||
-rw-r--r-- | archive/src/logos/tasks.h | 74 | ||||
-rw-r--r-- | archive/src/logos/threadpool.c | 141 | ||||
-rw-r--r-- | archive/src/logos/threadpool.h | 96 |
4 files changed, 317 insertions, 0 deletions
diff --git a/archive/src/logos/README.md b/archive/src/logos/README.md new file mode 100644 index 0000000..25b7bef --- /dev/null +++ b/archive/src/logos/README.md @@ -0,0 +1,6 @@ +# Logos + +Logos is the namespace for threadpool & job system code. This is not a 'system' as it is underlying core unit of the engine. + +Threadpool currently gets initialised at core bringup with a set number of threads and results are processed once per frame +on the main thread. This is subject to change but multithreading is not the highest priority right now.
\ No newline at end of file diff --git a/archive/src/logos/tasks.h b/archive/src/logos/tasks.h new file mode 100644 index 0000000..2e3dc53 --- /dev/null +++ b/archive/src/logos/tasks.h @@ -0,0 +1,74 @@ +/** + * Common jobs that get run + */ +#pragma once +#include "defines.h" +#include "logos/threadpool.h" +#include "render_types.h" +#include "str.h" + +typedef enum TaskLifetime { + /** ephemeral tasks must be finished by the end of the frame and thus we use a leak and clear + allocation strategy */ + TASK_EPHEMERAL, + /** multi-frame tasks have a more complex lifetime and must be cleaned up by the caller or in a + separate cleanup callback */ + TASK_MULTIFRAME, + TASK_COUNT +} TaskLifetime; + +typedef enum TaskKind { + TASK_RENDER, + TASK_PHYSICS, + TASK_GAMEPLAY, + TASK_ASSET, + TASK_USERLAND, + TASKKIND_COUNT +} TaskKind; + +typedef struct Task { + char* debug_name; + void* params; + bool is_done; +} Task; + +// Macro : give Params and Result structs and it creates a function that knows +// correct sizes + +typedef struct Task_ModelLoad_Params { + Str8 filepath; // filepath to the model on disk +} Task_ModelLoad_Params; +typedef struct Task_ModelLoad_Result { + Model model; +} Task_ModelLoad_Result; + +// Internally it will allocate data for each + +static bool Task_ModelLoad_Typed(Task_ModelLoad_Params* params, Task_ModelLoad_Result* result, + tpool_task_start run_task, tpool_task_on_complete on_success, + tpool_task_on_complete on_failure) { + threadpool_add_task(pool, , tpool_task_on_complete on_success, tpool_task_on_complete on_fail, + bool buffer_result_for_main_thread, void* param_data, u32 param_data_size, + u32 result_data_size) +} + +// do task +// success +void model_load_success(task_globals* globals, void* result) { + Task_ModelLoad_Result* load_res = result; + + // push into render -> renderables ? +} +// fail + +// we can define our custom task here that wraps the more verbose function pointers +static Task Task_ModelLoad(Task_ModelLoad_Params* params, Task_ModelLoad_Result* result) { + Task task; + task.debug_name = "Load Model"; + task.params = params; + + Task_ModelLoad_Typed(params, result, tpool_task_start run_task, tpool_task_on_complete on_success, + tpool_task_on_complete on_failure) + + return task; +} diff --git a/archive/src/logos/threadpool.c b/archive/src/logos/threadpool.c new file mode 100644 index 0000000..0e82d98 --- /dev/null +++ b/archive/src/logos/threadpool.c @@ -0,0 +1,141 @@ +#include "threadpool.h" + +#include <pthread.h> + +#include "defines.h" +#include "log.h" +#include "ring_queue.h" + +static void* worker_factory(void* arg) { + threadpool_worker* worker = arg; + // INFO("Starting job thread %d", worker->id); + + // Run forever, waiting for jobs. + while (true) { + pthread_mutex_lock(&worker->pool->mutex); + pthread_cond_wait(&worker->pool->has_tasks, &worker->pool->mutex); // wait for work to be ready + + task t; + if (ring_queue_dequeue(worker->pool->task_queue, &t)) { + DEBUG("Job thread %d picked up task %d", worker->id, t.task_id); + } else { + WARN("Job thread %d didnt pick up a task as queue was empty", worker->id); + pthread_mutex_unlock(&worker->pool->mutex); + break; + } + + pthread_mutex_unlock(&worker->pool->mutex); + + // Do the work + bool result = t.do_task(t.params, t.result_data); + + // INFO("Task result was %s", result ? "success" : "failure"); + if (result) { + pthread_mutex_lock(&worker->pool->mutex); + if (t.buffer_result_for_main_thread) { + deferred_task_result dtr = { .task_id = t.task_id, + .callback = t.on_success, + .result_data = t.result_data, + .result_data_size = t.result_data_size }; + deferred_task_result_darray_push(worker->pool->results, dtr); + } else { + // call on complete from here. + } + } else { + // TODO + } + pthread_mutex_unlock(&worker->pool->mutex); + } + + return NULL; +} + +bool threadpool_create(threadpool* pool, u8 thread_count, u32 queue_size) { + INFO("Threadpool init"); + pool->next_task_id = 0; + pool->context = NULL; + + u8 num_worker_threads = thread_count; + if (thread_count > MAX_NUM_THREADS) { + ERROR_EXIT("Threadpool has a hard limit of %d threads, you tried to start one with %d", + MAX_NUM_THREADS, thread_count) + num_worker_threads = MAX_NUM_THREADS; + } + + DEBUG("creating task queue with max length %d", queue_size); + pool->task_queue = ring_queue_new(sizeof(task), queue_size, NULL); + + DEBUG("creating mutex and condition"); + pthread_mutex_init(&pool->mutex, NULL); + pthread_cond_init(&pool->has_tasks, NULL); + + pool->results = deferred_task_result_darray_new(256); + + DEBUG("Spawning %d threads for the threadpool", thread_count); + for (u8 i = 0; i < num_worker_threads; i++) { + pool->workers[i].id = i; + pool->workers[i].pool = pool; + if (pthread_create(&pool->workers[i].thread, NULL, worker_factory, &pool->workers[i]) != 0) { + FATAL("OS error creating job thread"); + return false; + }; + } + + return true; +} + +bool threadpool_add_task(threadpool* pool, tpool_task_start do_task, + tpool_task_on_complete on_success, tpool_task_on_complete on_fail, + bool buffer_result_for_main_thread, void* param_data, u32 param_data_size, + u32 result_data_size) { + void* result_data = malloc(result_data_size); + + task* work_task = malloc(sizeof(task)); + work_task->task_id = 0; + work_task->do_task = do_task; + work_task->on_success = on_success; + work_task->on_failure = on_fail; + work_task->buffer_result_for_main_thread = buffer_result_for_main_thread; + work_task->param_size = param_data_size; + work_task->params = param_data; + work_task->result_data_size = result_data_size; + work_task->result_data = result_data; + + // START critical section + if (pthread_mutex_lock(&pool->mutex) != 0) { + ERROR("Unable to get threadpool lock."); + return false; + } + + work_task->task_id = pool->next_task_id; + pool->next_task_id++; + + ring_queue_enqueue(pool->task_queue, work_task); + DEBUG("Enqueued job"); + pthread_cond_broadcast(&pool->has_tasks); + + if (pthread_mutex_unlock(&pool->mutex) != 0) { + ERROR("couldnt unlock threadpool after adding task."); + return false; // ? + } + // END critical section + + return true; +} + +void threadpool_process_results(threadpool* pool, int _num_to_process) { + pthread_mutex_lock(&pool->mutex); + size_t num_results = deferred_task_result_darray_len(pool->results); + if (num_results > 0) { + u32 _size = ((deferred_task_result*)pool->results->data)[num_results].result_data_size; + deferred_task_result res; + deferred_task_result_darray_pop(pool->results, &res); + pthread_mutex_unlock(&pool->mutex); + task_globals globals = { .pool = pool, .ctx = pool->context }; + res.callback(&globals, res.result_data); + } else { + pthread_mutex_unlock(&pool->mutex); + } +} + +void threadpool_set_ctx(threadpool* pool, void* ctx) { pool->context = ctx; }
\ No newline at end of file diff --git a/archive/src/logos/threadpool.h b/archive/src/logos/threadpool.h new file mode 100644 index 0000000..6390a38 --- /dev/null +++ b/archive/src/logos/threadpool.h @@ -0,0 +1,96 @@ +/** + A Threadpool has a number of "workers", each which process "tasks" +*/ +#pragma once + +#include <pthread.h> + +#include "darray.h" +#include "defines.h" +#include "ring_queue.h" + +#define MAX_NUM_THREADS 16 + +struct threadpool; +typedef struct threadpool threadpool; + +typedef struct task_globals { + threadpool* pool; + void* ctx; +} task_globals; + +/* function pointer */ +typedef bool (*tpool_task_start)(void*, void*); + +/* function pointer */ +typedef void (*tpool_task_on_complete)(task_globals*, void*); + +typedef struct threadpool_worker { + u16 id; + pthread_t thread; + threadpool* pool; // pointer back to the pool so we can get the mutex and cond +} threadpool_worker; + +typedef enum tpool_task_status { + TASK_STATUS_READY, +} task_status; + +typedef struct tpool_task { + u64 task_id; + tpool_task_start do_task; + tpool_task_on_complete on_success; + tpool_task_on_complete on_failure; + bool buffer_result_for_main_thread; + /** @brief a pointer to the parameters data that will be passed into the task. */ + void* params; + u32 param_size; + void* result_data; + u32 result_data_size; +} task; + +typedef struct deferred_task_result { + u64 task_id; + tpool_task_on_complete callback; + u32 result_data_size; + // this gets passed to the void* argument of `tpool_task_on_complete` + void* result_data; +} deferred_task_result; + +#ifndef TYPED_TASK_RESULT_ARRAY +KITC_DECL_TYPED_ARRAY(deferred_task_result) // creates "deferred_task_result_darray" +#define TYPED_TASK_RESULT_ARRAY +#endif + +struct threadpool { + ring_queue* task_queue; + pthread_mutex_t mutex; + pthread_cond_t has_tasks; + threadpool_worker workers[MAX_NUM_THREADS]; + deferred_task_result_darray* results; + u64 next_task_id; + + void* context; +}; + +/** + * @param pool where to store the created threadpool + * @param thread_count how many threads to spawn + * @param queue_size max size of task queue + */ +bool threadpool_create(threadpool* pool, u8 thread_count, u32 queue_size); +void threadpool_destroy(threadpool* pool); + +/** @brief set a context variable for the threadpool that task data has access to */ +void threadpool_set_ctx(threadpool* pool, void* ctx); + +/** + * @brief Add a task to the threadpool + */ +bool threadpool_add_task(threadpool* pool, tpool_task_start do_task, + tpool_task_on_complete on_success, tpool_task_on_complete on_fail, + bool buffer_result_for_main_thread, void* param_data, u32 param_data_size, + u32 result_data_size); + +void threadpool_process_results(threadpool* pool, int num_to_process); + +u32 Tpool_GetNumWorkers(); // how many threads are we using
\ No newline at end of file |