migration: Add thread pool of optional load threads

Some drivers might want to make use of auxiliary helper threads during VM
state loading, for example to make sure that their blocking (sync) I/O
operations don't block the rest of the migration process.

Add a migration core managed thread pool to facilitate this use case.

The migration core will wait for these threads to finish before
(re)starting the VM at destination.

Reviewed-by: Fabiano Rosas <farosas@suse.de>
Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
Link: https://lore.kernel.org/qemu-devel/b09fd70369b6159c75847e69f235cb908b02570c.1741124640.git.maciej.szmigiero@oracle.com
Signed-off-by: Cédric Le Goater <clg@redhat.com>
This commit is contained in:
Maciej S. Szmigiero 2025-03-04 23:03:36 +01:00 committed by Cédric Le Goater
parent 18eb55546a
commit b1937fd1eb
6 changed files with 105 additions and 4 deletions

View file

@ -54,6 +54,7 @@
#include "qemu/job.h"
#include "qemu/main-loop.h"
#include "block/snapshot.h"
#include "block/thread-pool.h"
#include "qemu/cutils.h"
#include "io/channel-buffer.h"
#include "io/channel-file.h"
@ -131,6 +132,35 @@ static struct mig_cmd_args {
* generic extendable format with an exception for two old entities.
*/
/***********************************************************/
/* Optional load threads pool support */
static void qemu_loadvm_thread_pool_create(MigrationIncomingState *mis)
{
assert(!mis->load_threads);
mis->load_threads = thread_pool_new();
mis->load_threads_abort = false;
}
static void qemu_loadvm_thread_pool_destroy(MigrationIncomingState *mis)
{
qatomic_set(&mis->load_threads_abort, true);
bql_unlock(); /* Load threads might be waiting for BQL */
g_clear_pointer(&mis->load_threads, thread_pool_free);
bql_lock();
}
static bool qemu_loadvm_thread_pool_wait(MigrationState *s,
MigrationIncomingState *mis)
{
bql_unlock(); /* Let load threads do work requiring BQL */
thread_pool_wait(mis->load_threads);
bql_lock();
return !migrate_has_error(s);
}
/***********************************************************/
/* savevm/loadvm support */
@ -2783,16 +2813,68 @@ static int qemu_loadvm_state_setup(QEMUFile *f, Error **errp)
return 0;
}
void qemu_loadvm_state_cleanup(void)
struct LoadThreadData {
MigrationLoadThread function;
void *opaque;
};
static int qemu_loadvm_load_thread(void *thread_opaque)
{
struct LoadThreadData *data = thread_opaque;
MigrationIncomingState *mis = migration_incoming_get_current();
g_autoptr(Error) local_err = NULL;
if (!data->function(data->opaque, &mis->load_threads_abort, &local_err)) {
MigrationState *s = migrate_get_current();
/*
* Can't set load_threads_abort here since processing of main migration
* channel data could still be happening, resulting in launching of new
* load threads.
*/
assert(local_err);
/*
* In case of multiple load threads failing which thread error
* return we end setting is purely arbitrary.
*/
migrate_set_error(s, local_err);
}
return 0;
}
void qemu_loadvm_start_load_thread(MigrationLoadThread function,
void *opaque)
{
MigrationIncomingState *mis = migration_incoming_get_current();
struct LoadThreadData *data;
/* We only set it from this thread so it's okay to read it directly */
assert(!mis->load_threads_abort);
data = g_new(struct LoadThreadData, 1);
data->function = function;
data->opaque = opaque;
thread_pool_submit_immediate(mis->load_threads, qemu_loadvm_load_thread,
data, g_free);
}
void qemu_loadvm_state_cleanup(MigrationIncomingState *mis)
{
SaveStateEntry *se;
trace_loadvm_state_cleanup();
QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
if (se->ops && se->ops->load_cleanup) {
se->ops->load_cleanup(se->opaque);
}
}
qemu_loadvm_thread_pool_destroy(mis);
}
/* Return true if we should continue the migration, or false. */
@ -2943,6 +3025,7 @@ out:
int qemu_loadvm_state(QEMUFile *f)
{
MigrationState *s = migrate_get_current();
MigrationIncomingState *mis = migration_incoming_get_current();
Error *local_err = NULL;
int ret;
@ -2952,6 +3035,8 @@ int qemu_loadvm_state(QEMUFile *f)
return -EINVAL;
}
qemu_loadvm_thread_pool_create(mis);
ret = qemu_loadvm_state_header(f);
if (ret) {
return ret;
@ -2983,12 +3068,18 @@ int qemu_loadvm_state(QEMUFile *f)
/* When reaching here, it must be precopy */
if (ret == 0) {
if (migrate_has_error(migrate_get_current())) {
if (migrate_has_error(migrate_get_current()) ||
!qemu_loadvm_thread_pool_wait(s, mis)) {
ret = -EINVAL;
} else {
ret = qemu_file_get_error(f);
}
}
/*
* Set this flag unconditionally so we'll catch further attempts to
* start additional threads via an appropriate assert()
*/
qatomic_set(&mis->load_threads_abort, true);
/*
* Try to read in the VMDESC section as well, so that dumping tools that