Mirror of https://github.com/Motorhead1991/qemu.git (synced 2025-08-05 00:33:55 -06:00)
thread-pool: Implement generic (non-AIO) pool support
Migration code wants to manage device data sending threads in one place.

QEMU has an existing thread pool implementation, however it is limited to queuing AIO operations only and essentially has a 1:1 mapping between the current AioContext and the AIO ThreadPool in use.

Implement a generic (non-AIO) ThreadPool by essentially wrapping GLib's GThreadPool.

This brings a few new operations on a pool:
* thread_pool_wait() waits until all the submitted work requests have finished.
* thread_pool_set_max_threads() explicitly sets the maximum thread count in the pool.
* thread_pool_adjust_max_threads_to_work() adjusts the maximum thread count in the pool to equal the number of work items still waiting in the queue or not yet finished.

Reviewed-by: Fabiano Rosas <farosas@suse.de>
Reviewed-by: Peter Xu <peterx@redhat.com>
Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
Link: https://lore.kernel.org/qemu-devel/b1efaebdbea7cb7068b8fb74148777012383e12b.1741124640.git.maciej.szmigiero@oracle.com
Signed-off-by: Cédric Le Goater <clg@redhat.com>
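For orientation, a minimal caller-side sketch of the new API follows. It is not part of the commit: send_one_buffer(), send_all_buffers(), and the buffer array are hypothetical names for illustration, the header path is assumed from the tree this patch targets, and the worker's int return type assumes the existing ThreadPoolFunc signature (the generic pool ignores the return value). Only the thread_pool_*() calls come from this patch.

/*
 * Hypothetical usage sketch (not part of the commit): drain a set of
 * caller-owned buffers through the generic pool.
 */
#include "qemu/osdep.h"
#include "block/thread-pool.h"

/* Matches ThreadPoolFunc; the generic pool ignores the return value. */
static int send_one_buffer(void *opaque)
{
    /* ... transfer the buffer described by opaque ... */
    return 0;
}

static void send_all_buffers(void **buffers, size_t n)
{
    ThreadPool *pool = thread_pool_new();

    for (size_t i = 0; i < n; i++) {
        /* No destroy notify needed for these caller-owned buffers. */
        thread_pool_submit(pool, send_one_buffer, buffers[i], NULL);
    }

    /* Grow the pool to one thread per outstanding work item (n assumed > 0). */
    thread_pool_adjust_max_threads_to_work(pool);

    /* Block until every submitted request has finished. */
    thread_pool_wait(pool);

    thread_pool_free(pool);
}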
commit b5aa74968b
parent dc67daeed5
2 changed files with 170 additions and 0 deletions
util/thread-pool.c:

@@ -374,3 +374,122 @@ void thread_pool_free_aio(ThreadPoolAio *pool)
     qemu_mutex_destroy(&pool->lock);
     g_free(pool);
 }
+
+struct ThreadPool {
+    GThreadPool *t;
+    size_t cur_work;
+    QemuMutex cur_work_lock;
+    QemuCond all_finished_cond;
+};
+
+typedef struct {
+    ThreadPoolFunc *func;
+    void *opaque;
+    GDestroyNotify opaque_destroy;
+} ThreadPoolElement;
+
+static void thread_pool_func(gpointer data, gpointer user_data)
+{
+    ThreadPool *pool = user_data;
+    g_autofree ThreadPoolElement *el = data;
+
+    el->func(el->opaque);
+
+    if (el->opaque_destroy) {
+        el->opaque_destroy(el->opaque);
+    }
+
+    QEMU_LOCK_GUARD(&pool->cur_work_lock);
+
+    assert(pool->cur_work > 0);
+    pool->cur_work--;
+
+    if (pool->cur_work == 0) {
+        qemu_cond_signal(&pool->all_finished_cond);
+    }
+}
+
+ThreadPool *thread_pool_new(void)
+{
+    ThreadPool *pool = g_new(ThreadPool, 1);
+
+    pool->cur_work = 0;
+    qemu_mutex_init(&pool->cur_work_lock);
+    qemu_cond_init(&pool->all_finished_cond);
+
+    pool->t = g_thread_pool_new(thread_pool_func, pool, 0, TRUE, NULL);
+    /*
+     * g_thread_pool_new() can only return errors if initial thread(s)
+     * creation fails but we ask for 0 initial threads above.
+     */
+    assert(pool->t);
+
+    return pool;
+}
+
+void thread_pool_free(ThreadPool *pool)
+{
+    /*
+     * With _wait = TRUE this effectively waits for all
+     * previously submitted work to complete first.
+     */
+    g_thread_pool_free(pool->t, FALSE, TRUE);
+
+    qemu_cond_destroy(&pool->all_finished_cond);
+    qemu_mutex_destroy(&pool->cur_work_lock);
+
+    g_free(pool);
+}
+
+void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
+                        void *opaque, GDestroyNotify opaque_destroy)
+{
+    ThreadPoolElement *el = g_new(ThreadPoolElement, 1);
+
+    el->func = func;
+    el->opaque = opaque;
+    el->opaque_destroy = opaque_destroy;
+
+    WITH_QEMU_LOCK_GUARD(&pool->cur_work_lock) {
+        pool->cur_work++;
+    }
+
+    /*
+     * Ignore the return value since this function can only return errors
+     * if creation of an additional thread fails but even in this case the
+     * provided work is still getting queued (just for the existing threads).
+     */
+    g_thread_pool_push(pool->t, el, NULL);
+}
+
+void thread_pool_submit_immediate(ThreadPool *pool, ThreadPoolFunc *func,
+                                  void *opaque, GDestroyNotify opaque_destroy)
+{
+    thread_pool_submit(pool, func, opaque, opaque_destroy);
+    thread_pool_adjust_max_threads_to_work(pool);
+}
+
+void thread_pool_wait(ThreadPool *pool)
+{
+    QEMU_LOCK_GUARD(&pool->cur_work_lock);
+
+    while (pool->cur_work > 0) {
+        qemu_cond_wait(&pool->all_finished_cond,
+                       &pool->cur_work_lock);
+    }
+}
+
+bool thread_pool_set_max_threads(ThreadPool *pool,
+                                 int max_threads)
+{
+    assert(max_threads > 0);
+
+    return g_thread_pool_set_max_threads(pool->t, max_threads, NULL);
+}
+
+bool thread_pool_adjust_max_threads_to_work(ThreadPool *pool)
+{
+    QEMU_LOCK_GUARD(&pool->cur_work_lock);
+
+    return thread_pool_set_max_threads(pool, pool->cur_work);
+}