1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
/**
A Threadpool has a number of "workers", each which process "tasks"
*/
#pragma once
#include <pthread.h>
#include "darray.h"
#include "defines.h"
#include "ring_queue.h"
#define MAX_NUM_THREADS 16
struct threadpool;
typedef struct threadpool threadpool;
typedef struct task_globals {
threadpool *pool;
void *ctx;
} task_globals;
/* function pointer */
typedef bool (*tpool_task_start)(void *, void *);
/* function pointer */
typedef void (*tpool_task_on_complete)(task_globals *, void *);
typedef struct threadpool_worker {
u16 id;
pthread_t thread;
threadpool *pool; // pointer back to the pool so we can get the mutex and cond
} threadpool_worker;
typedef enum tpool_task_status {
TASK_STATUS_READY,
} task_status;
typedef struct tpool_task {
u64 task_id;
tpool_task_start do_task;
tpool_task_on_complete on_success;
tpool_task_on_complete on_failure;
bool buffer_result_for_main_thread;
/** @brief a pointer to the parameters data that will be passed into the task. */
void *params;
u32 param_size;
void *result_data;
u32 result_data_size;
} task;
typedef struct deferred_task_result {
u64 task_id;
tpool_task_on_complete callback;
u32 result_data_size;
// this gets passed to the void* argument of `tpool_task_on_complete`
void *result_data;
} deferred_task_result;
#ifndef TYPED_TASK_RESULT_ARRAY
KITC_DECL_TYPED_ARRAY(deferred_task_result) // creates "deferred_task_result_darray"
#define TYPED_TASK_RESULT_ARRAY
#endif
struct threadpool {
ring_queue *task_queue;
pthread_mutex_t mutex;
pthread_cond_t has_tasks;
threadpool_worker workers[MAX_NUM_THREADS];
deferred_task_result_darray *results;
u64 next_task_id;
void *context;
};
/**
* @param pool where to store the created threadpool
* @param thread_count how many threads to spawn
* @param queue_size max size of task queue
*/
bool threadpool_create(threadpool *pool, u8 thread_count, u32 queue_size);
void threadpool_destroy(threadpool *pool);
/** @brief set a context variable for the threadpool that task data has access to */
void threadpool_set_ctx(threadpool *pool, void *ctx);
/**
* @brief Add a task to the threadpool
*/
bool threadpool_add_task(threadpool *pool, tpool_task_start do_task,
tpool_task_on_complete on_success, tpool_task_on_complete on_fail,
bool buffer_result_for_main_thread, void *param_data, u32 param_data_size,
u32 result_data_size);
void threadpool_process_results(threadpool *pool, int num_to_process);
u32 Tpool_GetNumWorkers(); // how many threads are we using
|