Migartion pull request for 20240304

- Bryan's fix on multifd compression level API
 - Fabiano's mapped-ram series (base + multifd only)
 - Steve's amend on cpr document in qapi/
 -----BEGIN PGP SIGNATURE-----
 
 iIgEABYKADAWIQS5GE3CDMRX2s990ak7X8zN86vXBgUCZeUjKhIccGV0ZXJ4QHJl
 ZGhhdC5jb20ACgkQO1/MzfOr1wbv5QD/ZexBUsmZA5qyxgGvZ2yvlUBEGNOvtmKY
 kRdiYPU7khMA/0N43rn4LcqKCoq4+T+EAnYizGjIyhH/7BRUyn4DUxgO
 =AeEn
 -----END PGP SIGNATURE-----

Merge tag 'migration-next-pull-request' of https://gitlab.com/peterx/qemu into staging

Migartion pull request for 20240304

- Bryan's fix on multifd compression level API
- Fabiano's mapped-ram series (base + multifd only)
- Steve's amend on cpr document in qapi/

# -----BEGIN PGP SIGNATURE-----
#
# iIgEABYKADAWIQS5GE3CDMRX2s990ak7X8zN86vXBgUCZeUjKhIccGV0ZXJ4QHJl
# ZGhhdC5jb20ACgkQO1/MzfOr1wbv5QD/ZexBUsmZA5qyxgGvZ2yvlUBEGNOvtmKY
# kRdiYPU7khMA/0N43rn4LcqKCoq4+T+EAnYizGjIyhH/7BRUyn4DUxgO
# =AeEn
# -----END PGP SIGNATURE-----
# gpg: Signature made Mon 04 Mar 2024 01:26:02 GMT
# gpg:                using EDDSA key B9184DC20CC457DACF7DD1A93B5FCCCDF3ABD706
# gpg:                issuer "peterx@redhat.com"
# gpg: Good signature from "Peter Xu <xzpeter@gmail.com>" [marginal]
# gpg:                 aka "Peter Xu <peterx@redhat.com>" [marginal]
# gpg: WARNING: This key is not certified with sufficiently trusted signatures!
# gpg:          It is not certain that the signature belongs to the owner.
# Primary key fingerprint: B918 4DC2 0CC4 57DA CF7D  D1A9 3B5F CCCD F3AB D706

* tag 'migration-next-pull-request' of https://gitlab.com/peterx/qemu: (27 commits)
  migration/multifd: Document two places for mapped-ram
  tests/qtest/migration: Add a multifd + mapped-ram migration test
  migration/multifd: Add mapped-ram support to fd: URI
  migration/multifd: Support incoming mapped-ram stream format
  migration/multifd: Support outgoing mapped-ram stream format
  migration/multifd: Prepare multifd sync for mapped-ram migration
  migration/multifd: Add incoming QIOChannelFile support
  migration/multifd: Add outgoing QIOChannelFile support
  migration/multifd: Add a wrapper for channels_created
  migration/multifd: Allow receiving pages without packets
  migration/multifd: Allow multifd without packets
  migration/multifd: Decouple recv method from pages
  migration/multifd: Rename MultiFDSend|RecvParams::data to compress_data
  tests/qtest/migration: Add tests for mapped-ram file-based migration
  migration/ram: Add incoming 'mapped-ram' migration
  migration/ram: Add outgoing 'mapped-ram' migration
  migration: Add mapped-ram URI compatibility check
  migration/ram: Introduce 'mapped-ram' migration capability
  migration/qemu-file: add utility methods for working with seekable channels
  io: fsync before closing a file channel
  ...

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>

# Conflicts:
#	migration/ram.c
This commit is contained in:
Peter Maydell 2024-03-05 11:19:58 +00:00
commit c90cfb5294
27 changed files with 1666 additions and 160 deletions

View file

@ -15,18 +15,41 @@
*/
#include "qemu/osdep.h"
#include "qapi/error.h"
#include "channel.h"
#include "fd.h"
#include "migration.h"
#include "monitor/monitor.h"
#include "io/channel-file.h"
#include "io/channel-util.h"
#include "options.h"
#include "trace.h"
static struct FdOutgoingArgs {
int fd;
} outgoing_args;
int fd_args_get_fd(void)
{
return outgoing_args.fd;
}
void fd_cleanup_outgoing_migration(void)
{
if (outgoing_args.fd > 0) {
close(outgoing_args.fd);
outgoing_args.fd = -1;
}
}
void fd_start_outgoing_migration(MigrationState *s, const char *fdname, Error **errp)
{
QIOChannel *ioc;
int fd = monitor_get_fd(monitor_cur(), fdname, errp);
outgoing_args.fd = -1;
if (fd == -1) {
return;
}
@ -38,6 +61,8 @@ void fd_start_outgoing_migration(MigrationState *s, const char *fdname, Error **
return;
}
outgoing_args.fd = fd;
qio_channel_set_name(ioc, "migration-fd-outgoing");
migration_channel_connect(s, ioc, NULL, NULL);
object_unref(OBJECT(ioc));
@ -73,4 +98,23 @@ void fd_start_incoming_migration(const char *fdname, Error **errp)
fd_accept_incoming_migration,
NULL, NULL,
g_main_context_get_thread_default());
if (migrate_multifd()) {
int channels = migrate_multifd_channels();
while (channels--) {
ioc = QIO_CHANNEL(qio_channel_file_new_fd(dup(fd)));
if (QIO_CHANNEL_FILE(ioc)->fd == -1) {
error_setg(errp, "Failed to duplicate fd %d", fd);
return;
}
qio_channel_set_name(ioc, "migration-fd-incoming");
qio_channel_add_watch_full(ioc, G_IO_IN,
fd_accept_incoming_migration,
NULL, NULL,
g_main_context_get_thread_default());
}
}
}

View file

@ -20,4 +20,6 @@ void fd_start_incoming_migration(const char *fdname, Error **errp);
void fd_start_outgoing_migration(MigrationState *s, const char *fdname,
Error **errp);
void fd_cleanup_outgoing_migration(void);
int fd_args_get_fd(void);
#endif

View file

@ -6,17 +6,25 @@
*/
#include "qemu/osdep.h"
#include "exec/ramblock.h"
#include "qemu/cutils.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "channel.h"
#include "fd.h"
#include "file.h"
#include "migration.h"
#include "io/channel-file.h"
#include "io/channel-util.h"
#include "options.h"
#include "trace.h"
#define OFFSET_OPTION ",offset="
static struct FileOutgoingArgs {
char *fname;
} outgoing_args;
/* Remove the offset option from @filespec and return it in @offsetp. */
int file_parse_offset(char *filespec, uint64_t *offsetp, Error **errp)
@ -36,6 +44,41 @@ int file_parse_offset(char *filespec, uint64_t *offsetp, Error **errp)
return 0;
}
void file_cleanup_outgoing_migration(void)
{
g_free(outgoing_args.fname);
outgoing_args.fname = NULL;
}
bool file_send_channel_create(gpointer opaque, Error **errp)
{
QIOChannelFile *ioc;
int flags = O_WRONLY;
bool ret = false;
int fd = fd_args_get_fd();
if (fd && fd != -1) {
ioc = qio_channel_file_new_fd(dup(fd));
} else {
ioc = qio_channel_file_new_path(outgoing_args.fname, flags, 0, errp);
if (!ioc) {
goto out;
}
}
multifd_channel_connect(opaque, QIO_CHANNEL(ioc));
ret = true;
out:
/*
* File channel creation is synchronous. However posting this
* semaphore here is simpler than adding a special case.
*/
multifd_send_channel_created();
return ret;
}
void file_start_outgoing_migration(MigrationState *s,
FileMigrationArgs *file_args, Error **errp)
{
@ -52,6 +95,8 @@ void file_start_outgoing_migration(MigrationState *s,
return;
}
outgoing_args.fname = g_strdup(filename);
ioc = QIO_CHANNEL(fioc);
if (offset && qio_channel_io_seek(ioc, offset, SEEK_SET, errp) < 0) {
return;
@ -74,7 +119,8 @@ void file_start_incoming_migration(FileMigrationArgs *file_args, Error **errp)
g_autofree char *filename = g_strdup(file_args->filename);
QIOChannelFile *fioc = NULL;
uint64_t offset = file_args->offset;
QIOChannel *ioc;
int channels = 1;
int i = 0;
trace_migration_file_incoming(filename);
@ -83,13 +129,100 @@ void file_start_incoming_migration(FileMigrationArgs *file_args, Error **errp)
return;
}
ioc = QIO_CHANNEL(fioc);
if (offset && qio_channel_io_seek(ioc, offset, SEEK_SET, errp) < 0) {
if (offset &&
qio_channel_io_seek(QIO_CHANNEL(fioc), offset, SEEK_SET, errp) < 0) {
return;
}
qio_channel_set_name(QIO_CHANNEL(ioc), "migration-file-incoming");
qio_channel_add_watch_full(ioc, G_IO_IN,
file_accept_incoming_migration,
NULL, NULL,
g_main_context_get_thread_default());
if (migrate_multifd()) {
channels += migrate_multifd_channels();
}
do {
QIOChannel *ioc = QIO_CHANNEL(fioc);
qio_channel_set_name(ioc, "migration-file-incoming");
qio_channel_add_watch_full(ioc, G_IO_IN,
file_accept_incoming_migration,
NULL, NULL,
g_main_context_get_thread_default());
fioc = qio_channel_file_new_fd(dup(fioc->fd));
if (!fioc || fioc->fd == -1) {
error_setg(errp, "Error creating migration incoming channel");
break;
}
} while (++i < channels);
}
int file_write_ramblock_iov(QIOChannel *ioc, const struct iovec *iov,
int niov, RAMBlock *block, Error **errp)
{
ssize_t ret = -1;
int i, slice_idx, slice_num;
uintptr_t base, next, offset;
size_t len;
slice_idx = 0;
slice_num = 1;
/*
* If the iov array doesn't have contiguous elements, we need to
* split it in slices because we only have one file offset for the
* whole iov. Do this here so callers don't need to break the iov
* array themselves.
*/
for (i = 0; i < niov; i++, slice_num++) {
base = (uintptr_t) iov[i].iov_base;
if (i != niov - 1) {
len = iov[i].iov_len;
next = (uintptr_t) iov[i + 1].iov_base;
if (base + len == next) {
continue;
}
}
/*
* Use the offset of the first element of the segment that
* we're sending.
*/
offset = (uintptr_t) iov[slice_idx].iov_base - (uintptr_t) block->host;
if (offset >= block->used_length) {
error_setg(errp, "offset " RAM_ADDR_FMT
"outside of ramblock %s range", offset, block->idstr);
ret = -1;
break;
}
ret = qio_channel_pwritev(ioc, &iov[slice_idx], slice_num,
block->pages_offset + offset, errp);
if (ret < 0) {
break;
}
slice_idx += slice_num;
slice_num = 0;
}
return (ret < 0) ? ret : 0;
}
int multifd_file_recv_data(MultiFDRecvParams *p, Error **errp)
{
MultiFDRecvData *data = p->data;
size_t ret;
ret = qio_channel_pread(p->c, (char *) data->opaque,
data->size, data->file_offset, errp);
if (ret != data->size) {
error_prepend(errp,
"multifd recv (%u): read 0x%zx, expected 0x%zx",
p->id, ret, data->size);
return -1;
}
return 0;
}

View file

@ -9,10 +9,18 @@
#define QEMU_MIGRATION_FILE_H
#include "qapi/qapi-types-migration.h"
#include "io/task.h"
#include "channel.h"
#include "multifd.h"
void file_start_incoming_migration(FileMigrationArgs *file_args, Error **errp);
void file_start_outgoing_migration(MigrationState *s,
FileMigrationArgs *file_args, Error **errp);
int file_parse_offset(char *filespec, uint64_t *offsetp, Error **errp);
void file_cleanup_outgoing_migration(void);
bool file_send_channel_create(gpointer opaque, Error **errp);
int file_write_ramblock_iov(QIOChannel *ioc, const struct iovec *iov,
int niov, RAMBlock *block, Error **errp);
int multifd_file_recv_data(MultiFDRecvParams *p, Error **errp);
#endif

View file

@ -140,9 +140,38 @@ static bool transport_supports_multi_channels(MigrationAddress *addr)
if (addr->transport == MIGRATION_ADDRESS_TYPE_SOCKET) {
SocketAddress *saddr = &addr->u.socket;
return saddr->type == SOCKET_ADDRESS_TYPE_INET ||
saddr->type == SOCKET_ADDRESS_TYPE_UNIX ||
saddr->type == SOCKET_ADDRESS_TYPE_VSOCK;
if (saddr->type == SOCKET_ADDRESS_TYPE_FD) {
return migrate_mapped_ram();
}
return (saddr->type == SOCKET_ADDRESS_TYPE_INET ||
saddr->type == SOCKET_ADDRESS_TYPE_UNIX ||
saddr->type == SOCKET_ADDRESS_TYPE_VSOCK);
} else if (addr->transport == MIGRATION_ADDRESS_TYPE_FILE) {
return migrate_mapped_ram();
} else {
return false;
}
}
static bool migration_needs_seekable_channel(void)
{
return migrate_mapped_ram();
}
static bool transport_supports_seeking(MigrationAddress *addr)
{
if (addr->transport == MIGRATION_ADDRESS_TYPE_FILE) {
return true;
}
/*
* At this point, the user might not yet have passed the file
* descriptor to QEMU, so we cannot know for sure whether it
* refers to a plain file or a socket. Let it through anyway.
*/
if (addr->transport == MIGRATION_ADDRESS_TYPE_SOCKET) {
return addr->u.socket.type == SOCKET_ADDRESS_TYPE_FD;
}
return false;
@ -152,6 +181,12 @@ static bool
migration_channels_and_transport_compatible(MigrationAddress *addr,
Error **errp)
{
if (migration_needs_seekable_channel() &&
!transport_supports_seeking(addr)) {
error_setg(errp, "Migration requires seekable transport (e.g. file)");
return false;
}
if (migration_needs_multiple_sockets() &&
!transport_supports_multi_channels(addr)) {
error_setg(errp, "Migration requires multi-channel URIs (e.g. tcp)");
@ -881,7 +916,8 @@ void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp)
uint32_t channel_magic = 0;
int ret = 0;
if (migrate_multifd() && !migrate_postcopy_ram() &&
if (migrate_multifd() && !migrate_mapped_ram() &&
!migrate_postcopy_ram() &&
qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_READ_MSG_PEEK)) {
/*
* With multiple channels, it is possible that we receive channels
@ -1950,6 +1986,18 @@ static bool migrate_prepare(MigrationState *s, bool blk, bool blk_inc,
return false;
}
if (migrate_mapped_ram()) {
if (migrate_tls()) {
error_setg(errp, "Cannot use TLS with mapped-ram");
return false;
}
if (migrate_multifd_compression()) {
error_setg(errp, "Cannot use compression with mapped-ram");
return false;
}
}
if (migrate_mode_is_cpr(s)) {
const char *conflict = NULL;

View file

@ -69,7 +69,7 @@ static int zlib_send_setup(MultiFDSendParams *p, Error **errp)
err_msg = "out of memory for buf";
goto err_free_zbuff;
}
p->data = z;
p->compress_data = z;
return 0;
err_free_zbuff:
@ -92,15 +92,15 @@ err_free_z:
*/
static void zlib_send_cleanup(MultiFDSendParams *p, Error **errp)
{
struct zlib_data *z = p->data;
struct zlib_data *z = p->compress_data;
deflateEnd(&z->zs);
g_free(z->zbuff);
z->zbuff = NULL;
g_free(z->buf);
z->buf = NULL;
g_free(p->data);
p->data = NULL;
g_free(p->compress_data);
p->compress_data = NULL;
}
/**
@ -117,7 +117,7 @@ static void zlib_send_cleanup(MultiFDSendParams *p, Error **errp)
static int zlib_send_prepare(MultiFDSendParams *p, Error **errp)
{
MultiFDPages_t *pages = p->pages;
struct zlib_data *z = p->data;
struct zlib_data *z = p->compress_data;
z_stream *zs = &z->zs;
uint32_t out_size = 0;
int ret;
@ -194,7 +194,7 @@ static int zlib_recv_setup(MultiFDRecvParams *p, Error **errp)
struct zlib_data *z = g_new0(struct zlib_data, 1);
z_stream *zs = &z->zs;
p->data = z;
p->compress_data = z;
zs->zalloc = Z_NULL;
zs->zfree = Z_NULL;
zs->opaque = Z_NULL;
@ -224,17 +224,17 @@ static int zlib_recv_setup(MultiFDRecvParams *p, Error **errp)
*/
static void zlib_recv_cleanup(MultiFDRecvParams *p)
{
struct zlib_data *z = p->data;
struct zlib_data *z = p->compress_data;
inflateEnd(&z->zs);
g_free(z->zbuff);
z->zbuff = NULL;
g_free(p->data);
p->data = NULL;
g_free(p->compress_data);
p->compress_data = NULL;
}
/**
* zlib_recv_pages: read the data from the channel into actual pages
* zlib_recv: read the data from the channel into actual pages
*
* Read the compressed buffer, and uncompress it into the actual
* pages.
@ -244,9 +244,9 @@ static void zlib_recv_cleanup(MultiFDRecvParams *p)
* @p: Params for the channel that we are using
* @errp: pointer to an error
*/
static int zlib_recv_pages(MultiFDRecvParams *p, Error **errp)
static int zlib_recv(MultiFDRecvParams *p, Error **errp)
{
struct zlib_data *z = p->data;
struct zlib_data *z = p->compress_data;
z_stream *zs = &z->zs;
uint32_t in_size = p->next_packet_size;
/* we measure the change of total_out */
@ -319,7 +319,7 @@ static MultiFDMethods multifd_zlib_ops = {
.send_prepare = zlib_send_prepare,
.recv_setup = zlib_recv_setup,
.recv_cleanup = zlib_recv_cleanup,
.recv_pages = zlib_recv_pages
.recv = zlib_recv
};
static void multifd_zlib_register(void)

View file

@ -52,7 +52,7 @@ static int zstd_send_setup(MultiFDSendParams *p, Error **errp)
struct zstd_data *z = g_new0(struct zstd_data, 1);
int res;
p->data = z;
p->compress_data = z;
z->zcs = ZSTD_createCStream();
if (!z->zcs) {
g_free(z);
@ -90,14 +90,14 @@ static int zstd_send_setup(MultiFDSendParams *p, Error **errp)
*/
static void zstd_send_cleanup(MultiFDSendParams *p, Error **errp)
{
struct zstd_data *z = p->data;
struct zstd_data *z = p->compress_data;
ZSTD_freeCStream(z->zcs);
z->zcs = NULL;
g_free(z->zbuff);
z->zbuff = NULL;
g_free(p->data);
p->data = NULL;
g_free(p->compress_data);
p->compress_data = NULL;
}
/**
@ -114,7 +114,7 @@ static void zstd_send_cleanup(MultiFDSendParams *p, Error **errp)
static int zstd_send_prepare(MultiFDSendParams *p, Error **errp)
{
MultiFDPages_t *pages = p->pages;
struct zstd_data *z = p->data;
struct zstd_data *z = p->compress_data;
int ret;
uint32_t i;
@ -183,7 +183,7 @@ static int zstd_recv_setup(MultiFDRecvParams *p, Error **errp)
struct zstd_data *z = g_new0(struct zstd_data, 1);
int ret;
p->data = z;
p->compress_data = z;
z->zds = ZSTD_createDStream();
if (!z->zds) {
g_free(z);
@ -221,18 +221,18 @@ static int zstd_recv_setup(MultiFDRecvParams *p, Error **errp)
*/
static void zstd_recv_cleanup(MultiFDRecvParams *p)
{
struct zstd_data *z = p->data;
struct zstd_data *z = p->compress_data;
ZSTD_freeDStream(z->zds);
z->zds = NULL;
g_free(z->zbuff);
z->zbuff = NULL;
g_free(p->data);
p->data = NULL;
g_free(p->compress_data);
p->compress_data = NULL;
}
/**
* zstd_recv_pages: read the data from the channel into actual pages
* zstd_recv: read the data from the channel into actual pages
*
* Read the compressed buffer, and uncompress it into the actual
* pages.
@ -242,13 +242,13 @@ static void zstd_recv_cleanup(MultiFDRecvParams *p)
* @p: Params for the channel that we are using
* @errp: pointer to an error
*/
static int zstd_recv_pages(MultiFDRecvParams *p, Error **errp)
static int zstd_recv(MultiFDRecvParams *p, Error **errp)
{
uint32_t in_size = p->next_packet_size;
uint32_t out_size = 0;
uint32_t expected_size = p->normal_num * p->page_size;
uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
struct zstd_data *z = p->data;
struct zstd_data *z = p->compress_data;
int ret;
int i;
@ -310,7 +310,7 @@ static MultiFDMethods multifd_zstd_ops = {
.send_prepare = zstd_send_prepare,
.recv_setup = zstd_recv_setup,
.recv_cleanup = zstd_recv_cleanup,
.recv_pages = zstd_recv_pages
.recv = zstd_recv
};
static void multifd_zstd_register(void)

View file

@ -17,7 +17,8 @@
#include "exec/ramblock.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "ram.h"
#include "fd.h"
#include "file.h"
#include "migration.h"
#include "migration-stats.h"
#include "socket.h"
@ -28,6 +29,7 @@
#include "threadinfo.h"
#include "options.h"
#include "qemu/yank.h"
#include "io/channel-file.h"
#include "io/channel-socket.h"
#include "yank_functions.h"
@ -81,9 +83,13 @@ struct {
struct {
MultiFDRecvParams *params;
MultiFDRecvData *data;
/* number of created threads */
int count;
/* syncs main thread and channels */
/*
* This is always posted by the recv threads, the migration thread
* uses it to wait for recv threads to finish assigned tasks.
*/
QemuSemaphore sem_sync;
/* global number of generated multifd packets */
uint64_t packet_num;
@ -92,6 +98,27 @@ struct {
MultiFDMethods *ops;
} *multifd_recv_state;
static bool multifd_use_packets(void)
{
return !migrate_mapped_ram();
}
void multifd_send_channel_created(void)
{
qemu_sem_post(&multifd_send_state->channels_created);
}
static void multifd_set_file_bitmap(MultiFDSendParams *p)
{
MultiFDPages_t *pages = p->pages;
assert(pages->block);
for (int i = 0; i < p->pages->num; i++) {
ramblock_set_file_bmap_atomic(pages->block, pages->offset[i]);
}
}
/* Multifd without compression */
/**
@ -122,6 +149,19 @@ static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
return;
}
static void multifd_send_prepare_iovs(MultiFDSendParams *p)
{
MultiFDPages_t *pages = p->pages;
for (int i = 0; i < pages->num; i++) {
p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
p->iov[p->iovs_num].iov_len = p->page_size;
p->iovs_num++;
}
p->next_packet_size = pages->num * p->page_size;
}
/**
* nocomp_send_prepare: prepare date to be able to send
*
@ -136,9 +176,15 @@ static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
{
bool use_zero_copy_send = migrate_zero_copy_send();
MultiFDPages_t *pages = p->pages;
int ret;
if (!multifd_use_packets()) {
multifd_send_prepare_iovs(p);
multifd_set_file_bitmap(p);
return 0;
}
if (!use_zero_copy_send) {
/*
* Only !zerocopy needs the header in IOV; zerocopy will
@ -147,13 +193,7 @@ static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
multifd_send_prepare_header(p);
}
for (int i = 0; i < pages->num; i++) {
p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
p->iov[p->iovs_num].iov_len = p->page_size;
p->iovs_num++;
}
p->next_packet_size = pages->num * p->page_size;
multifd_send_prepare_iovs(p);
p->flags |= MULTIFD_FLAG_NOCOMP;
multifd_send_fill_packet(p);
@ -197,7 +237,7 @@ static void nocomp_recv_cleanup(MultiFDRecvParams *p)
}
/**
* nocomp_recv_pages: read the data from the channel into actual pages
* nocomp_recv: read the data from the channel
*
* For no compression we just need to read things into the correct place.
*
@ -206,9 +246,15 @@ static void nocomp_recv_cleanup(MultiFDRecvParams *p)
* @p: Params for the channel that we are using
* @errp: pointer to an error
*/
static int nocomp_recv_pages(MultiFDRecvParams *p, Error **errp)
static int nocomp_recv(MultiFDRecvParams *p, Error **errp)
{
uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
uint32_t flags;
if (!multifd_use_packets()) {
return multifd_file_recv_data(p, errp);
}
flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
if (flags != MULTIFD_FLAG_NOCOMP) {
error_setg(errp, "multifd %u: flags received %x flags expected %x",
@ -228,7 +274,7 @@ static MultiFDMethods multifd_nocomp_ops = {
.send_prepare = nocomp_send_prepare,
.recv_setup = nocomp_recv_setup,
.recv_cleanup = nocomp_recv_cleanup,
.recv_pages = nocomp_recv_pages
.recv = nocomp_recv
};
static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
@ -663,6 +709,19 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
{
if (p->c) {
migration_ioc_unregister_yank(p->c);
/*
* An explicit close() on the channel here is normally not
* required, but can be helpful for "file:" iochannels, where it
* will include fdatasync() to make sure the data is flushed to the
* disk backend.
*
* The object_unref() cannot guarantee that because: (1) finalize()
* of the iochannel is only triggered on the last reference, and
* it's not guaranteed that we always hold the last refcount when
* reaching here, and, (2) even if finalize() is invoked, it only
* does a close(fd) without data flush.
*/
qio_channel_close(p->c, &error_abort);
object_unref(OBJECT(p->c));
p->c = NULL;
}
@ -684,6 +743,8 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
static void multifd_send_cleanup_state(void)
{
file_cleanup_outgoing_migration();
fd_cleanup_outgoing_migration();
socket_cleanup_outgoing_migration();
qemu_sem_destroy(&multifd_send_state->channels_created);
qemu_sem_destroy(&multifd_send_state->channels_ready);
@ -795,15 +856,18 @@ static void *multifd_send_thread(void *opaque)
MigrationThread *thread = NULL;
Error *local_err = NULL;
int ret = 0;
bool use_packets = multifd_use_packets();
thread = migration_threads_add(p->name, qemu_get_thread_id());
trace_multifd_send_thread_start(p->id);
rcu_register_thread();
if (multifd_send_initial_packet(p, &local_err) < 0) {
ret = -1;
goto out;
if (use_packets) {
if (multifd_send_initial_packet(p, &local_err) < 0) {
ret = -1;
goto out;
}
}
while (true) {
@ -829,8 +893,15 @@ static void *multifd_send_thread(void *opaque)
break;
}
ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
0, p->write_flags, &local_err);
if (migrate_mapped_ram()) {
ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
p->pages->block, &local_err);
} else {
ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
NULL, 0, p->write_flags,
&local_err);
}
if (ret != 0) {
break;
}
@ -854,16 +925,20 @@ static void *multifd_send_thread(void *opaque)
* it doesn't require explicit memory barriers.
*/
assert(qatomic_read(&p->pending_sync));
p->flags = MULTIFD_FLAG_SYNC;
multifd_send_fill_packet(p);
ret = qio_channel_write_all(p->c, (void *)p->packet,
p->packet_len, &local_err);
if (ret != 0) {
break;
if (use_packets) {
p->flags = MULTIFD_FLAG_SYNC;
multifd_send_fill_packet(p);
ret = qio_channel_write_all(p->c, (void *)p->packet,
p->packet_len, &local_err);
if (ret != 0) {
break;
}
/* p->next_packet_size will always be zero for a SYNC packet */
stat64_add(&mig_stats.multifd_bytes, p->packet_len);
p->flags = 0;
}
/* p->next_packet_size will always be zero for a SYNC packet */
stat64_add(&mig_stats.multifd_bytes, p->packet_len);
p->flags = 0;
qatomic_set(&p->pending_sync, false);
qemu_sem_post(&p->sem_sync);
}
@ -939,7 +1014,7 @@ static bool multifd_tls_channel_connect(MultiFDSendParams *p,
return true;
}
static void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
{
qio_channel_set_delay(ioc, false);
@ -990,7 +1065,7 @@ out:
* Here we're not interested whether creation succeeded, only that
* it happened at all.
*/
qemu_sem_post(&multifd_send_state->channels_created);
multifd_send_channel_created();
if (ret) {
return;
@ -1007,9 +1082,14 @@ out:
error_free(local_err);
}
static void multifd_new_send_channel_create(gpointer opaque)
static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
{
if (!multifd_use_packets()) {
return file_send_channel_create(opaque, errp);
}
socket_send_channel_create(multifd_new_send_channel_async, opaque);
return true;
}
bool multifd_send_setup(void)
@ -1018,6 +1098,7 @@ bool multifd_send_setup(void)
Error *local_err = NULL;
int thread_count, ret = 0;
uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
bool use_packets = multifd_use_packets();
uint8_t i;
if (!migrate_multifd()) {
@ -1040,18 +1121,27 @@ bool multifd_send_setup(void)
qemu_sem_init(&p->sem_sync, 0);
p->id = i;
p->pages = multifd_pages_init(page_count);
p->packet_len = sizeof(MultiFDPacket_t)
+ sizeof(uint64_t) * page_count;
p->packet = g_malloc0(p->packet_len);
p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
p->packet->version = cpu_to_be32(MULTIFD_VERSION);
if (use_packets) {
p->packet_len = sizeof(MultiFDPacket_t)
+ sizeof(uint64_t) * page_count;
p->packet = g_malloc0(p->packet_len);
p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
p->packet->version = cpu_to_be32(MULTIFD_VERSION);
/* We need one extra place for the packet header */
p->iov = g_new0(struct iovec, page_count + 1);
} else {
p->iov = g_new0(struct iovec, page_count);
}
p->name = g_strdup_printf("multifdsend_%d", i);
/* We need one extra place for the packet header */
p->iov = g_new0(struct iovec, page_count + 1);
p->page_size = qemu_target_page_size();
p->page_count = page_count;
p->write_flags = 0;
multifd_new_send_channel_create(p);
if (!multifd_new_send_channel_create(p, &local_err)) {
return false;
}
}
/*
@ -1083,6 +1173,57 @@ bool multifd_send_setup(void)
return true;
}
bool multifd_recv(void)
{
int i;
static int next_recv_channel;
MultiFDRecvParams *p = NULL;
MultiFDRecvData *data = multifd_recv_state->data;
/*
* next_channel can remain from a previous migration that was
* using more channels, so ensure it doesn't overflow if the
* limit is lower now.
*/
next_recv_channel %= migrate_multifd_channels();
for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
if (multifd_recv_should_exit()) {
return false;
}
p = &multifd_recv_state->params[i];
if (qatomic_read(&p->pending_job) == false) {
next_recv_channel = (i + 1) % migrate_multifd_channels();
break;
}
}
/*
* Order pending_job read before manipulating p->data below. Pairs
* with qatomic_store_release() at multifd_recv_thread().
*/
smp_mb_acquire();
assert(!p->data->size);
multifd_recv_state->data = p->data;
p->data = data;
/*
* Order p->data update before setting pending_job. Pairs with
* qatomic_load_acquire() at multifd_recv_thread().
*/
qatomic_store_release(&p->pending_job, true);
qemu_sem_post(&p->sem);
return true;
}
MultiFDRecvData *multifd_get_recv_data(void)
{
return multifd_recv_state->data;
}
static void multifd_recv_terminate_threads(Error *err)
{
int i;
@ -1107,10 +1248,27 @@ static void multifd_recv_terminate_threads(Error *err)
MultiFDRecvParams *p = &multifd_recv_state->params[i];
/*
* multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
* however try to wakeup it without harm in cleanup phase.
* The migration thread and channels interact differently
* depending on the presence of packets.
*/
qemu_sem_post(&p->sem_sync);
if (multifd_use_packets()) {
/*
* The channel receives as long as there are packets. When
* packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
* channel waits for the migration thread to sync. If the
* sync never happens, do it here.
*/
qemu_sem_post(&p->sem_sync);
} else {
/*
* The channel waits for the migration thread to give it
* work. When the migration thread runs out of work, it
* releases the channel and waits for any pending work to
* finish. If we reach here (e.g. due to error) before the
* work runs out, release the channel.
*/
qemu_sem_post(&p->sem);
}
/*
* We could arrive here for two reasons:
@ -1138,6 +1296,7 @@ static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
p->c = NULL;
qemu_mutex_destroy(&p->mutex);
qemu_sem_destroy(&p->sem_sync);
qemu_sem_destroy(&p->sem);
g_free(p->name);
p->name = NULL;
p->packet_len = 0;
@ -1155,6 +1314,8 @@ static void multifd_recv_cleanup_state(void)
qemu_sem_destroy(&multifd_recv_state->sem_sync);
g_free(multifd_recv_state->params);
multifd_recv_state->params = NULL;
g_free(multifd_recv_state->data);
multifd_recv_state->data = NULL;
g_free(multifd_recv_state);
multifd_recv_state = NULL;
}
@ -1182,18 +1343,53 @@ void multifd_recv_cleanup(void)
void multifd_recv_sync_main(void)
{
int thread_count = migrate_multifd_channels();
bool file_based = !multifd_use_packets();
int i;
if (!migrate_multifd()) {
return;
}
for (i = 0; i < migrate_multifd_channels(); i++) {
MultiFDRecvParams *p = &multifd_recv_state->params[i];
trace_multifd_recv_sync_main_wait(p->id);
/*
* File-based channels don't use packets and therefore need to
* wait for more work. Release them to start the sync.
*/
if (file_based) {
for (i = 0; i < thread_count; i++) {
MultiFDRecvParams *p = &multifd_recv_state->params[i];
trace_multifd_recv_sync_main_signal(p->id);
qemu_sem_post(&p->sem);
}
}
/*
* Initiate the synchronization by waiting for all channels.
*
* For socket-based migration this means each channel has received
* the SYNC packet on the stream.
*
* For file-based migration this means each channel is done with
* the work (pending_job=false).
*/
for (i = 0; i < thread_count; i++) {
trace_multifd_recv_sync_main_wait(i);
qemu_sem_wait(&multifd_recv_state->sem_sync);
}
for (i = 0; i < migrate_multifd_channels(); i++) {
if (file_based) {
/*
* For file-based loading is done in one iteration. We're
* done.
*/
return;
}
/*
* Sync done. Release the channels for the next iteration.
*/
for (i = 0; i < thread_count; i++) {
MultiFDRecvParams *p = &multifd_recv_state->params[i];
WITH_QEMU_LOCK_GUARD(&p->mutex) {
@ -1211,46 +1407,87 @@ static void *multifd_recv_thread(void *opaque)
{
MultiFDRecvParams *p = opaque;
Error *local_err = NULL;
bool use_packets = multifd_use_packets();
int ret;
trace_multifd_recv_thread_start(p->id);
rcu_register_thread();
while (true) {
uint32_t flags;
uint32_t flags = 0;
bool has_data = false;
p->normal_num = 0;
if (multifd_recv_should_exit()) {
break;
}
if (use_packets) {
if (multifd_recv_should_exit()) {
break;
}
ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
p->packet_len, &local_err);
if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */
break;
}
ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
p->packet_len, &local_err);
if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */
break;
}
qemu_mutex_lock(&p->mutex);
ret = multifd_recv_unfill_packet(p, &local_err);
if (ret) {
qemu_mutex_lock(&p->mutex);
ret = multifd_recv_unfill_packet(p, &local_err);
if (ret) {
qemu_mutex_unlock(&p->mutex);
break;
}
flags = p->flags;
/* recv methods don't know how to handle the SYNC flag */
p->flags &= ~MULTIFD_FLAG_SYNC;
has_data = !!p->normal_num;
qemu_mutex_unlock(&p->mutex);
break;
} else {
/*
* No packets, so we need to wait for the vmstate code to
* give us work.
*/
qemu_sem_wait(&p->sem);
if (multifd_recv_should_exit()) {
break;
}
/* pairs with qatomic_store_release() at multifd_recv() */
if (!qatomic_load_acquire(&p->pending_job)) {
/*
* Migration thread did not send work, this is
* equivalent to pending_sync on the sending
* side. Post sem_sync to notify we reached this
* point.
*/
qemu_sem_post(&multifd_recv_state->sem_sync);
continue;
}
has_data = !!p->data->size;
}
flags = p->flags;
/* recv methods don't know how to handle the SYNC flag */
p->flags &= ~MULTIFD_FLAG_SYNC;
qemu_mutex_unlock(&p->mutex);
if (p->normal_num) {
ret = multifd_recv_state->ops->recv_pages(p, &local_err);
if (has_data) {
ret = multifd_recv_state->ops->recv(p, &local_err);
if (ret != 0) {
break;
}
}
if (flags & MULTIFD_FLAG_SYNC) {
qemu_sem_post(&multifd_recv_state->sem_sync);
qemu_sem_wait(&p->sem_sync);
if (use_packets) {
if (flags & MULTIFD_FLAG_SYNC) {
qemu_sem_post(&multifd_recv_state->sem_sync);
qemu_sem_wait(&p->sem_sync);
}
} else {
p->total_normal_pages += p->data->size / qemu_target_page_size();
p->data->size = 0;
/*
* Order data->size update before clearing
* pending_job. Pairs with smp_mb_acquire() at
* multifd_recv().
*/
qatomic_store_release(&p->pending_job, false);
}
}
@ -1269,6 +1506,7 @@ int multifd_recv_setup(Error **errp)
{
int thread_count;
uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
bool use_packets = multifd_use_packets();
uint8_t i;
/*
@ -1282,6 +1520,10 @@ int multifd_recv_setup(Error **errp)
thread_count = migrate_multifd_channels();
multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
multifd_recv_state->data->size = 0;
qatomic_set(&multifd_recv_state->count, 0);
qatomic_set(&multifd_recv_state->exiting, 0);
qemu_sem_init(&multifd_recv_state->sem_sync, 0);
@ -1292,10 +1534,18 @@ int multifd_recv_setup(Error **errp)
qemu_mutex_init(&p->mutex);
qemu_sem_init(&p->sem_sync, 0);
qemu_sem_init(&p->sem, 0);
p->pending_job = false;
p->id = i;
p->packet_len = sizeof(MultiFDPacket_t)
+ sizeof(uint64_t) * page_count;
p->packet = g_malloc0(p->packet_len);
p->data = g_new0(MultiFDRecvData, 1);
p->data->size = 0;
if (use_packets) {
p->packet_len = sizeof(MultiFDPacket_t)
+ sizeof(uint64_t) * page_count;
p->packet = g_malloc0(p->packet_len);
}
p->name = g_strdup_printf("multifdrecv_%d", i);
p->iov = g_new0(struct iovec, page_count);
p->normal = g_new0(ram_addr_t, page_count);
@ -1339,18 +1589,23 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
{
MultiFDRecvParams *p;
Error *local_err = NULL;
bool use_packets = multifd_use_packets();
int id;
id = multifd_recv_initial_packet(ioc, &local_err);
if (id < 0) {
multifd_recv_terminate_threads(local_err);
error_propagate_prepend(errp, local_err,
"failed to receive packet"
" via multifd channel %d: ",
qatomic_read(&multifd_recv_state->count));
return;
if (use_packets) {
id = multifd_recv_initial_packet(ioc, &local_err);
if (id < 0) {
multifd_recv_terminate_threads(local_err);
error_propagate_prepend(errp, local_err,
"failed to receive packet"
" via multifd channel %d: ",
qatomic_read(&multifd_recv_state->count));
return;
}
trace_multifd_recv_new_channel(id);
} else {
id = qatomic_read(&multifd_recv_state->count);
}
trace_multifd_recv_new_channel(id);
p = &multifd_recv_state->params[id];
if (p->c != NULL) {

View file

@ -13,8 +13,13 @@
#ifndef QEMU_MIGRATION_MULTIFD_H
#define QEMU_MIGRATION_MULTIFD_H
#include "ram.h"
typedef struct MultiFDRecvData MultiFDRecvData;
bool multifd_send_setup(void);
void multifd_send_shutdown(void);
void multifd_send_channel_created(void);
int multifd_recv_setup(Error **errp);
void multifd_recv_cleanup(void);
void multifd_recv_shutdown(void);
@ -23,6 +28,8 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
void multifd_recv_sync_main(void);
int multifd_send_sync_main(void);
bool multifd_queue_page(RAMBlock *block, ram_addr_t offset);
bool multifd_recv(void);
MultiFDRecvData *multifd_get_recv_data(void);
/* Multifd Compression flags */
#define MULTIFD_FLAG_SYNC (1 << 0)
@ -63,6 +70,13 @@ typedef struct {
RAMBlock *block;
} MultiFDPages_t;
struct MultiFDRecvData {
void *opaque;
size_t size;
/* for preadv */
off_t file_offset;
};
typedef struct {
/* Fields are only written at creating/deletion time */
/* No lock required for them, they are read only */
@ -127,7 +141,7 @@ typedef struct {
/* number of iovs used */
uint32_t iovs_num;
/* used for compression methods */
void *data;
void *compress_data;
} MultiFDSendParams;
typedef struct {
@ -152,6 +166,8 @@ typedef struct {
/* syncs main thread and channels */
QemuSemaphore sem_sync;
/* sem where to wait for more work */
QemuSemaphore sem;
/* this mutex protects the following parameters */
QemuMutex mutex;
@ -161,6 +177,8 @@ typedef struct {
uint32_t flags;
/* global number of generated multifd packets */
uint64_t packet_num;
int pending_job;
MultiFDRecvData *data;
/* thread local variables. No locking required */
@ -183,7 +201,7 @@ typedef struct {
/* num of non zero pages */
uint32_t normal_num;
/* used for de-compression methods */
void *data;
void *compress_data;
} MultiFDRecvParams;
typedef struct {
@ -197,8 +215,8 @@ typedef struct {
int (*recv_setup)(MultiFDRecvParams *p, Error **errp);
/* Cleanup for receiving side */
void (*recv_cleanup)(MultiFDRecvParams *p);
/* Read all pages */
int (*recv_pages)(MultiFDRecvParams *p, Error **errp);
/* Read all data */
int (*recv)(MultiFDRecvParams *p, Error **errp);
} MultiFDMethods;
void multifd_register_ops(int method, MultiFDMethods *ops);
@ -211,5 +229,6 @@ static inline void multifd_send_prepare_header(MultiFDSendParams *p)
p->iovs_num++;
}
void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc);
#endif

View file

@ -204,6 +204,7 @@ Property migration_properties[] = {
DEFINE_PROP_MIG_CAP("x-switchover-ack",
MIGRATION_CAPABILITY_SWITCHOVER_ACK),
DEFINE_PROP_MIG_CAP("x-dirty-limit", MIGRATION_CAPABILITY_DIRTY_LIMIT),
DEFINE_PROP_MIG_CAP("mapped-ram", MIGRATION_CAPABILITY_MAPPED_RAM),
DEFINE_PROP_END_OF_LIST(),
};
@ -263,6 +264,13 @@ bool migrate_events(void)
return s->capabilities[MIGRATION_CAPABILITY_EVENTS];
}
bool migrate_mapped_ram(void)
{
MigrationState *s = migrate_get_current();
return s->capabilities[MIGRATION_CAPABILITY_MAPPED_RAM];
}
bool migrate_ignore_shared(void)
{
MigrationState *s = migrate_get_current();
@ -645,6 +653,26 @@ bool migrate_caps_check(bool *old_caps, bool *new_caps, Error **errp)
}
}
if (new_caps[MIGRATION_CAPABILITY_MAPPED_RAM]) {
if (new_caps[MIGRATION_CAPABILITY_XBZRLE]) {
error_setg(errp,
"Mapped-ram migration is incompatible with xbzrle");
return false;
}
if (new_caps[MIGRATION_CAPABILITY_COMPRESS]) {
error_setg(errp,
"Mapped-ram migration is incompatible with compression");
return false;
}
if (new_caps[MIGRATION_CAPABILITY_POSTCOPY_RAM]) {
error_setg(errp,
"Mapped-ram migration is incompatible with postcopy");
return false;
}
}
return true;
}
@ -1218,6 +1246,13 @@ bool migrate_params_check(MigrationParameters *params, Error **errp)
}
#endif
if (migrate_mapped_ram() &&
(migrate_multifd_compression() || migrate_tls())) {
error_setg(errp,
"Mapped-ram only available for non-compressed non-TLS multifd migration");
return false;
}
if (params->has_x_vcpu_dirty_limit_period &&
(params->x_vcpu_dirty_limit_period < 1 ||
params->x_vcpu_dirty_limit_period > 1000)) {
@ -1312,6 +1347,12 @@ static void migrate_params_test_apply(MigrateSetParameters *params,
if (params->has_multifd_compression) {
dest->multifd_compression = params->multifd_compression;
}
if (params->has_multifd_zlib_level) {
dest->multifd_zlib_level = params->multifd_zlib_level;
}
if (params->has_multifd_zstd_level) {
dest->multifd_zstd_level = params->multifd_zstd_level;
}
if (params->has_xbzrle_cache_size) {
dest->xbzrle_cache_size = params->xbzrle_cache_size;
}
@ -1447,6 +1488,12 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
if (params->has_multifd_compression) {
s->parameters.multifd_compression = params->multifd_compression;
}
if (params->has_multifd_zlib_level) {
s->parameters.multifd_zlib_level = params->multifd_zlib_level;
}
if (params->has_multifd_zstd_level) {
s->parameters.multifd_zstd_level = params->multifd_zstd_level;
}
if (params->has_xbzrle_cache_size) {
s->parameters.xbzrle_cache_size = params->xbzrle_cache_size;
xbzrle_cache_resize(params->xbzrle_cache_size, errp);

View file

@ -31,6 +31,7 @@ bool migrate_compress(void);
bool migrate_dirty_bitmaps(void);
bool migrate_dirty_limit(void);
bool migrate_events(void);
bool migrate_mapped_ram(void);
bool migrate_ignore_shared(void);
bool migrate_late_block_activate(void);
bool migrate_multifd(void);

View file

@ -33,6 +33,7 @@
#include "options.h"
#include "qapi/error.h"
#include "rdma.h"
#include "io/channel-file.h"
#define IO_BUF_SIZE 32768
#define MAX_IOV_SIZE MIN_CONST(IOV_MAX, 64)
@ -255,6 +256,10 @@ static void qemu_iovec_release_ram(QEMUFile *f)
memset(f->may_free, 0, sizeof(f->may_free));
}
bool qemu_file_is_seekable(QEMUFile *f)
{
return qio_channel_has_feature(f->ioc, QIO_CHANNEL_FEATURE_SEEKABLE);
}
/**
* Flushes QEMUFile buffer
@ -447,6 +452,107 @@ void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, size_t size)
}
}
void qemu_put_buffer_at(QEMUFile *f, const uint8_t *buf, size_t buflen,
off_t pos)
{
Error *err = NULL;
size_t ret;
if (f->last_error) {
return;
}
qemu_fflush(f);
ret = qio_channel_pwrite(f->ioc, (char *)buf, buflen, pos, &err);
if (err) {
qemu_file_set_error_obj(f, -EIO, err);
return;
}
if ((ssize_t)ret == QIO_CHANNEL_ERR_BLOCK) {
qemu_file_set_error_obj(f, -EAGAIN, NULL);
return;
}
if (ret != buflen) {
error_setg(&err, "Partial write of size %zu, expected %zu", ret,
buflen);
qemu_file_set_error_obj(f, -EIO, err);
return;
}
stat64_add(&mig_stats.qemu_file_transferred, buflen);
return;
}
size_t qemu_get_buffer_at(QEMUFile *f, const uint8_t *buf, size_t buflen,
off_t pos)
{
Error *err = NULL;
size_t ret;
if (f->last_error) {
return 0;
}
ret = qio_channel_pread(f->ioc, (char *)buf, buflen, pos, &err);
if ((ssize_t)ret == -1 || err) {
qemu_file_set_error_obj(f, -EIO, err);
return 0;
}
if ((ssize_t)ret == QIO_CHANNEL_ERR_BLOCK) {
qemu_file_set_error_obj(f, -EAGAIN, NULL);
return 0;
}
if (ret != buflen) {
error_setg(&err, "Partial read of size %zu, expected %zu", ret, buflen);
qemu_file_set_error_obj(f, -EIO, err);
return 0;
}
return ret;
}
void qemu_set_offset(QEMUFile *f, off_t off, int whence)
{
Error *err = NULL;
off_t ret;
if (qemu_file_is_writable(f)) {
qemu_fflush(f);
} else {
/* Drop all cached buffers if existed; will trigger a re-fill later */
f->buf_index = 0;
f->buf_size = 0;
}
ret = qio_channel_io_seek(f->ioc, off, whence, &err);
if (ret == (off_t)-1) {
qemu_file_set_error_obj(f, -EIO, err);
}
}
off_t qemu_get_offset(QEMUFile *f)
{
Error *err = NULL;
off_t ret;
qemu_fflush(f);
ret = qio_channel_io_seek(f->ioc, 0, SEEK_CUR, &err);
if (ret == (off_t)-1) {
qemu_file_set_error_obj(f, -EIO, err);
}
return ret;
}
void qemu_put_byte(QEMUFile *f, int v)
{
if (f->last_error) {

View file

@ -75,6 +75,12 @@ QEMUFile *qemu_file_get_return_path(QEMUFile *f);
int qemu_fflush(QEMUFile *f);
void qemu_file_set_blocking(QEMUFile *f, bool block);
int qemu_file_get_to_fd(QEMUFile *f, int fd, size_t size);
void qemu_set_offset(QEMUFile *f, off_t off, int whence);
off_t qemu_get_offset(QEMUFile *f);
void qemu_put_buffer_at(QEMUFile *f, const uint8_t *buf, size_t buflen,
off_t pos);
size_t qemu_get_buffer_at(QEMUFile *f, const uint8_t *buf, size_t buflen,
off_t pos);
QIOChannel *qemu_file_get_ioc(QEMUFile *file);

View file

@ -94,6 +94,24 @@
#define RAM_SAVE_FLAG_MULTIFD_FLUSH 0x200
/* We can't use any flag that is bigger than 0x200 */
/*
* mapped-ram migration supports O_DIRECT, so we need to make sure the
* userspace buffer, the IO operation size and the file offset are
* aligned according to the underlying device's block size. The first
* two are already aligned to page size, but we need to add padding to
* the file to align the offset. We cannot read the block size
* dynamically because the migration file can be moved between
* different systems, so use 1M to cover most block sizes and to keep
* the file offset aligned at page size as well.
*/
#define MAPPED_RAM_FILE_OFFSET_ALIGNMENT 0x100000
/*
* When doing mapped-ram migration, this is the amount we read from
* the pages region in the migration file at a time.
*/
#define MAPPED_RAM_LOAD_BUF_SIZE 0x100000
XBZRLECacheStats xbzrle_counters;
/* used by the search for pages to send */
@ -1126,12 +1144,18 @@ static int save_zero_page(RAMState *rs, PageSearchStatus *pss,
return 0;
}
stat64_add(&mig_stats.zero_pages, 1);
if (migrate_mapped_ram()) {
/* zero pages are not transferred with mapped-ram */
clear_bit_atomic(offset >> TARGET_PAGE_BITS, pss->block->file_bmap);
return 1;
}
len += save_page_header(pss, file, pss->block, offset | RAM_SAVE_FLAG_ZERO);
qemu_put_byte(file, 0);
len += 1;
ram_release_page(pss->block->idstr, offset);
stat64_add(&mig_stats.zero_pages, 1);
ram_transferred_add(len);
/*
@ -1189,14 +1213,20 @@ static int save_normal_page(PageSearchStatus *pss, RAMBlock *block,
{
QEMUFile *file = pss->pss_channel;
ram_transferred_add(save_page_header(pss, pss->pss_channel, block,
offset | RAM_SAVE_FLAG_PAGE));
if (async) {
qemu_put_buffer_async(file, buf, TARGET_PAGE_SIZE,
migrate_release_ram() &&
migration_in_postcopy());
if (migrate_mapped_ram()) {
qemu_put_buffer_at(file, buf, TARGET_PAGE_SIZE,
block->pages_offset + offset);
set_bit(offset >> TARGET_PAGE_BITS, block->file_bmap);
} else {
qemu_put_buffer(file, buf, TARGET_PAGE_SIZE);
ram_transferred_add(save_page_header(pss, pss->pss_channel, block,
offset | RAM_SAVE_FLAG_PAGE));
if (async) {
qemu_put_buffer_async(file, buf, TARGET_PAGE_SIZE,
migrate_release_ram() &&
migration_in_postcopy());
} else {
qemu_put_buffer(file, buf, TARGET_PAGE_SIZE);
}
}
ram_transferred_add(TARGET_PAGE_SIZE);
stat64_add(&mig_stats.normal_pages, 1);
@ -1332,14 +1362,18 @@ static int find_dirty_block(RAMState *rs, PageSearchStatus *pss)
pss->block = QLIST_NEXT_RCU(pss->block, next);
if (!pss->block) {
if (migrate_multifd() &&
!migrate_multifd_flush_after_each_section()) {
(!migrate_multifd_flush_after_each_section() ||
migrate_mapped_ram())) {
QEMUFile *f = rs->pss[RAM_CHANNEL_PRECOPY].pss_channel;
int ret = multifd_send_sync_main();
if (ret < 0) {
return ret;
}
qemu_put_be64(f, RAM_SAVE_FLAG_MULTIFD_FLUSH);
qemu_fflush(f);
if (!migrate_mapped_ram()) {
qemu_put_be64(f, RAM_SAVE_FLAG_MULTIFD_FLUSH);
qemu_fflush(f);
}
}
/*
* If memory migration starts over, we will meet a dirtied page
@ -2778,6 +2812,9 @@ static void ram_list_init_bitmaps(void)
*/
block->bmap = bitmap_new(pages);
bitmap_set(block->bmap, 0, pages);
if (migrate_mapped_ram()) {
block->file_bmap = bitmap_new(pages);
}
block->clear_bmap_shift = shift;
block->clear_bmap = bitmap_new(clear_bmap_size(pages, shift));
}
@ -2915,6 +2952,89 @@ void qemu_guest_free_page_hint(void *addr, size_t len)
}
}
#define MAPPED_RAM_HDR_VERSION 1
struct MappedRamHeader {
uint32_t version;
/*
* The target's page size, so we know how many pages are in the
* bitmap.
*/
uint64_t page_size;
/*
* The offset in the migration file where the pages bitmap is
* stored.
*/
uint64_t bitmap_offset;
/*
* The offset in the migration file where the actual pages (data)
* are stored.
*/
uint64_t pages_offset;
} QEMU_PACKED;
typedef struct MappedRamHeader MappedRamHeader;
static void mapped_ram_setup_ramblock(QEMUFile *file, RAMBlock *block)
{
g_autofree MappedRamHeader *header = NULL;
size_t header_size, bitmap_size;
long num_pages;
header = g_new0(MappedRamHeader, 1);
header_size = sizeof(MappedRamHeader);
num_pages = block->used_length >> TARGET_PAGE_BITS;
bitmap_size = BITS_TO_LONGS(num_pages) * sizeof(unsigned long);
/*
* Save the file offsets of where the bitmap and the pages should
* go as they are written at the end of migration and during the
* iterative phase, respectively.
*/
block->bitmap_offset = qemu_get_offset(file) + header_size;
block->pages_offset = ROUND_UP(block->bitmap_offset +
bitmap_size,
MAPPED_RAM_FILE_OFFSET_ALIGNMENT);
header->version = cpu_to_be32(MAPPED_RAM_HDR_VERSION);
header->page_size = cpu_to_be64(TARGET_PAGE_SIZE);
header->bitmap_offset = cpu_to_be64(block->bitmap_offset);
header->pages_offset = cpu_to_be64(block->pages_offset);
qemu_put_buffer(file, (uint8_t *) header, header_size);
/* prepare offset for next ramblock */
qemu_set_offset(file, block->pages_offset + block->used_length, SEEK_SET);
}
static bool mapped_ram_read_header(QEMUFile *file, MappedRamHeader *header,
Error **errp)
{
size_t ret, header_size = sizeof(MappedRamHeader);
ret = qemu_get_buffer(file, (uint8_t *)header, header_size);
if (ret != header_size) {
error_setg(errp, "Could not read whole mapped-ram migration header "
"(expected %zd, got %zd bytes)", header_size, ret);
return false;
}
/* migration stream is big-endian */
header->version = be32_to_cpu(header->version);
if (header->version > MAPPED_RAM_HDR_VERSION) {
error_setg(errp, "Migration mapped-ram capability version not "
"supported (expected <= %d, got %d)", MAPPED_RAM_HDR_VERSION,
header->version);
return false;
}
header->page_size = be64_to_cpu(header->page_size);
header->bitmap_offset = be64_to_cpu(header->bitmap_offset);
header->pages_offset = be64_to_cpu(header->pages_offset);
return true;
}
/*
* Each of ram_save_setup, ram_save_iterate and ram_save_complete has
* long-running RCU critical section. When rcu-reclaims in the code
@ -2970,6 +3090,10 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
if (migrate_ignore_shared()) {
qemu_put_be64(f, block->mr->addr);
}
if (migrate_mapped_ram()) {
mapped_ram_setup_ramblock(f, block);
}
}
}
@ -2995,7 +3119,8 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
return ret;
}
if (migrate_multifd() && !migrate_multifd_flush_after_each_section()) {
if (migrate_multifd() && !migrate_multifd_flush_after_each_section()
&& !migrate_mapped_ram()) {
qemu_put_be64(f, RAM_SAVE_FLAG_MULTIFD_FLUSH);
}
@ -3003,6 +3128,33 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
return qemu_fflush(f);
}
static void ram_save_file_bmap(QEMUFile *f)
{
RAMBlock *block;
RAMBLOCK_FOREACH_MIGRATABLE(block) {
long num_pages = block->used_length >> TARGET_PAGE_BITS;
long bitmap_size = BITS_TO_LONGS(num_pages) * sizeof(unsigned long);
qemu_put_buffer_at(f, (uint8_t *)block->file_bmap, bitmap_size,
block->bitmap_offset);
ram_transferred_add(bitmap_size);
/*
* Free the bitmap here to catch any synchronization issues
* with multifd channels. No channels should be sending pages
* after we've written the bitmap to file.
*/
g_free(block->file_bmap);
block->file_bmap = NULL;
}
}
void ramblock_set_file_bmap_atomic(RAMBlock *block, ram_addr_t offset)
{
set_bit_atomic(offset >> TARGET_PAGE_BITS, block->file_bmap);
}
/**
* ram_save_iterate: iterative stage for migration
*
@ -3112,7 +3264,8 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
out:
if (ret >= 0
&& migration_is_setup_or_active(migrate_get_current()->state)) {
if (migrate_multifd() && migrate_multifd_flush_after_each_section()) {
if (migrate_multifd() && migrate_multifd_flush_after_each_section() &&
!migrate_mapped_ram()) {
ret = multifd_send_sync_main();
if (ret < 0) {
return ret;
@ -3192,7 +3345,20 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
return ret;
}
if (migrate_multifd() && !migrate_multifd_flush_after_each_section()) {
if (migrate_mapped_ram()) {
ram_save_file_bmap(f);
if (qemu_file_get_error(f)) {
Error *local_err = NULL;
int err = qemu_file_get_error_obj(f, &local_err);
error_reportf_err(local_err, "Failed to write bitmap to file: ");
return -err;
}
}
if (migrate_multifd() && !migrate_multifd_flush_after_each_section() &&
!migrate_mapped_ram()) {
qemu_put_be64(f, RAM_SAVE_FLAG_MULTIFD_FLUSH);
}
qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
@ -3792,23 +3958,149 @@ void colo_flush_ram_cache(void)
trace_colo_flush_ram_cache_end();
}
static size_t ram_load_multifd_pages(void *host_addr, size_t size,
uint64_t offset)
{
MultiFDRecvData *data = multifd_get_recv_data();
data->opaque = host_addr;
data->file_offset = offset;
data->size = size;
if (!multifd_recv()) {
return 0;
}
return size;
}
static bool read_ramblock_mapped_ram(QEMUFile *f, RAMBlock *block,
long num_pages, unsigned long *bitmap,
Error **errp)
{
ERRP_GUARD();
unsigned long set_bit_idx, clear_bit_idx;
ram_addr_t offset;
void *host;
size_t read, unread, size;
for (set_bit_idx = find_first_bit(bitmap, num_pages);
set_bit_idx < num_pages;
set_bit_idx = find_next_bit(bitmap, num_pages, clear_bit_idx + 1)) {
clear_bit_idx = find_next_zero_bit(bitmap, num_pages, set_bit_idx + 1);
unread = TARGET_PAGE_SIZE * (clear_bit_idx - set_bit_idx);
offset = set_bit_idx << TARGET_PAGE_BITS;
while (unread > 0) {
host = host_from_ram_block_offset(block, offset);
if (!host) {
error_setg(errp, "page outside of ramblock %s range",
block->idstr);
return false;
}
size = MIN(unread, MAPPED_RAM_LOAD_BUF_SIZE);
if (migrate_multifd()) {
read = ram_load_multifd_pages(host, size,
block->pages_offset + offset);
} else {
read = qemu_get_buffer_at(f, host, size,
block->pages_offset + offset);
}
if (!read) {
goto err;
}
offset += read;
unread -= read;
}
}
return true;
err:
qemu_file_get_error_obj(f, errp);
error_prepend(errp, "(%s) failed to read page " RAM_ADDR_FMT
"from file offset %" PRIx64 ": ", block->idstr, offset,
block->pages_offset + offset);
return false;
}
static void parse_ramblock_mapped_ram(QEMUFile *f, RAMBlock *block,
ram_addr_t length, Error **errp)
{
g_autofree unsigned long *bitmap = NULL;
MappedRamHeader header;
size_t bitmap_size;
long num_pages;
if (!mapped_ram_read_header(f, &header, errp)) {
return;
}
block->pages_offset = header.pages_offset;
/*
* Check the alignment of the file region that contains pages. We
* don't enforce MAPPED_RAM_FILE_OFFSET_ALIGNMENT to allow that
* value to change in the future. Do only a sanity check with page
* size alignment.
*/
if (!QEMU_IS_ALIGNED(block->pages_offset, TARGET_PAGE_SIZE)) {
error_setg(errp,
"Error reading ramblock %s pages, region has bad alignment",
block->idstr);
return;
}
num_pages = length / header.page_size;
bitmap_size = BITS_TO_LONGS(num_pages) * sizeof(unsigned long);
bitmap = g_malloc0(bitmap_size);
if (qemu_get_buffer_at(f, (uint8_t *)bitmap, bitmap_size,
header.bitmap_offset) != bitmap_size) {
error_setg(errp, "Error reading dirty bitmap");
return;
}
if (!read_ramblock_mapped_ram(f, block, num_pages, bitmap, errp)) {
return;
}
/* Skip pages array */
qemu_set_offset(f, block->pages_offset + length, SEEK_SET);
return;
}
static int parse_ramblock(QEMUFile *f, RAMBlock *block, ram_addr_t length)
{
int ret = 0;
/* ADVISE is earlier, it shows the source has the postcopy capability on */
bool postcopy_advised = migration_incoming_postcopy_advised();
int max_hg_page_size;
Error *local_err = NULL;
assert(block);
if (migrate_mapped_ram()) {
parse_ramblock_mapped_ram(f, block, length, &local_err);
if (local_err) {
error_report_err(local_err);
return -EINVAL;
}
return 0;
}
if (!qemu_ram_is_migratable(block)) {
error_report("block %s should not be migrated !", block->idstr);
return -EINVAL;
}
if (length != block->used_length) {
Error *local_err = NULL;
ret = qemu_ram_resize(block, length, &local_err);
if (local_err) {
error_report_err(local_err);
@ -3899,6 +4191,12 @@ static int ram_load_precopy(QEMUFile *f)
invalid_flags |= RAM_SAVE_FLAG_COMPRESS_PAGE;
}
if (migrate_mapped_ram()) {
invalid_flags |= (RAM_SAVE_FLAG_HOOK | RAM_SAVE_FLAG_MULTIFD_FLUSH |
RAM_SAVE_FLAG_PAGE | RAM_SAVE_FLAG_XBZRLE |
RAM_SAVE_FLAG_ZERO);
}
while (!ret && !(flags & RAM_SAVE_FLAG_EOS)) {
ram_addr_t addr;
void *host = NULL, *host_bak = NULL;
@ -3920,6 +4218,8 @@ static int ram_load_precopy(QEMUFile *f)
addr &= TARGET_PAGE_MASK;
if (flags & invalid_flags) {
error_report("Unexpected RAM flags: %d", flags & invalid_flags);
if (flags & invalid_flags & RAM_SAVE_FLAG_COMPRESS_PAGE) {
error_report("Received an unexpected compressed page");
}
@ -3972,6 +4272,16 @@ static int ram_load_precopy(QEMUFile *f)
switch (flags & ~RAM_SAVE_FLAG_CONTINUE) {
case RAM_SAVE_FLAG_MEM_SIZE:
ret = parse_ramblocks(f, addr);
/*
* For mapped-ram migration (to a file) using multifd, we sync
* once and for all here to make sure all tasks we queued to
* multifd threads are completed, so that all the ramblocks
* (including all the guest memory pages within) are fully
* loaded after this sync returns.
*/
if (migrate_mapped_ram()) {
multifd_recv_sync_main();
}
break;
case RAM_SAVE_FLAG_ZERO:
@ -4012,7 +4322,12 @@ static int ram_load_precopy(QEMUFile *f)
case RAM_SAVE_FLAG_EOS:
/* normal exit */
if (migrate_multifd() &&
migrate_multifd_flush_after_each_section()) {
migrate_multifd_flush_after_each_section() &&
/*
* Mapped-ram migration flushes once and for all after
* parsing ramblocks. Always ignore EOS for it.
*/
!migrate_mapped_ram()) {
multifd_recv_sync_main();
}
break;

View file

@ -75,6 +75,7 @@ bool ram_dirty_bitmap_reload(MigrationState *s, RAMBlock *rb, Error **errp);
bool ramblock_page_is_discarded(RAMBlock *rb, ram_addr_t start);
void postcopy_preempt_shutdown_file(MigrationState *s);
void *postcopy_preempt_thread(void *opaque);
void ramblock_set_file_bmap_atomic(RAMBlock *block, ram_addr_t offset);
/* ram cache */
int colo_init_ram_cache(void);

View file

@ -245,6 +245,7 @@ static bool should_validate_capability(int capability)
/* Validate only new capabilities to keep compatibility. */
switch (capability) {
case MIGRATION_CAPABILITY_X_IGNORE_SHARED:
case MIGRATION_CAPABILITY_MAPPED_RAM:
return true;
default:
return false;

View file

@ -132,7 +132,7 @@ multifd_recv(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags, uin
multifd_recv_new_channel(uint8_t id) "channel %u"
multifd_recv_sync_main(long packet_num) "packet num %ld"
multifd_recv_sync_main_signal(uint8_t id) "channel %u"
multifd_recv_sync_main_wait(uint8_t id) "channel %u"
multifd_recv_sync_main_wait(uint8_t id) "iter %u"
multifd_recv_terminate_threads(bool error) "error %d"
multifd_recv_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %u packets %" PRIu64 " pages %" PRIu64
multifd_recv_thread_start(uint8_t id) "%u"