1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
#include "threadpool.h"
#include <pthread.h>
#include "defines.h"
#include "log.h"
#include "ring_queue.h"
static void *worker_factory(void *arg) {
threadpool_worker *worker = arg;
// INFO("Starting job thread %d", worker->id);
// Run forever, waiting for jobs.
while (true) {
pthread_mutex_lock(&worker->pool->mutex);
pthread_cond_wait(&worker->pool->has_tasks, &worker->pool->mutex); // wait for work to be ready
task t;
if (ring_queue_dequeue(worker->pool->task_queue, &t)) {
DEBUG("Job thread %d picked up task %d", worker->id, t.task_id);
} else {
WARN("Job thread %d didnt pick up a task as queue was empty", worker->id);
pthread_mutex_unlock(&worker->pool->mutex);
break;
}
pthread_mutex_unlock(&worker->pool->mutex);
// Do the work
bool result = t.do_task(t.params, t.result_data);
// INFO("Task result was %s", result ? "success" : "failure");
if (result) {
pthread_mutex_lock(&worker->pool->mutex);
if (t.buffer_result_for_main_thread) {
deferred_task_result dtr = { .task_id = t.task_id,
.callback = t.on_success,
.result_data = t.result_data,
.result_data_size = t.result_data_size };
deferred_task_result_darray_push(worker->pool->results, dtr);
} else {
// call on complete from here.
}
} else {
// TODO
}
pthread_mutex_unlock(&worker->pool->mutex);
}
return NULL;
}
bool threadpool_create(threadpool *pool, u8 thread_count, u32 queue_size) {
INFO("Threadpool init");
pool->next_task_id = 0;
pool->context = NULL;
u8 num_worker_threads = thread_count;
if (thread_count > MAX_NUM_THREADS) {
ERROR_EXIT("Threadpool has a hard limit of %d threads, you tried to start one with %d",
MAX_NUM_THREADS, thread_count)
num_worker_threads = MAX_NUM_THREADS;
}
DEBUG("creating task queue with max length %d", queue_size);
pool->task_queue = ring_queue_new(sizeof(task), queue_size, NULL);
DEBUG("creating mutex and condition");
pthread_mutex_init(&pool->mutex, NULL);
pthread_cond_init(&pool->has_tasks, NULL);
pool->results = deferred_task_result_darray_new(256);
DEBUG("Spawning %d threads for the threadpool", thread_count);
for (u8 i = 0; i < num_worker_threads; i++) {
pool->workers[i].id = i;
pool->workers[i].pool = pool;
if (pthread_create(&pool->workers[i].thread, NULL, worker_factory, &pool->workers[i]) != 0) {
FATAL("OS error creating job thread");
return false;
};
}
return true;
}
bool threadpool_add_task(threadpool *pool, tpool_task_start do_task,
tpool_task_on_complete on_success, tpool_task_on_complete on_fail,
bool buffer_result_for_main_thread, void *param_data, u32 param_data_size,
u32 result_data_size) {
void *result_data = malloc(result_data_size);
task *work_task = malloc(sizeof(task));
work_task->task_id = 0;
work_task->do_task = do_task;
work_task->on_success = on_success;
work_task->on_failure = on_fail;
work_task->buffer_result_for_main_thread = buffer_result_for_main_thread;
work_task->param_size = param_data_size;
work_task->params = param_data;
work_task->result_data_size = result_data_size;
work_task->result_data = result_data;
// START critical section
if (pthread_mutex_lock(&pool->mutex) != 0) {
ERROR("Unable to get threadpool lock.");
return false;
}
work_task->task_id = pool->next_task_id;
pool->next_task_id++;
ring_queue_enqueue(pool->task_queue, work_task);
DEBUG("Enqueued job");
pthread_cond_broadcast(&pool->has_tasks);
if (pthread_mutex_unlock(&pool->mutex) != 0) {
ERROR("couldnt unlock threadpool after adding task.");
return false; // ?
}
// END critical section
return true;
}
void threadpool_process_results(threadpool *pool, int _num_to_process) {
pthread_mutex_lock(&pool->mutex);
size_t num_results = deferred_task_result_darray_len(pool->results);
if (num_results > 0) {
u32 _size = ((deferred_task_result *)pool->results->data)[num_results].result_data_size;
deferred_task_result res;
deferred_task_result_darray_pop(pool->results, &res);
pthread_mutex_unlock(&pool->mutex);
task_globals globals = { .pool = pool, .ctx = pool->context };
res.callback(&globals, res.result_data);
} else {
pthread_mutex_unlock(&pool->mutex);
}
}
void threadpool_set_ctx(threadpool *pool, void *ctx) { pool->context = ctx; }
|