mirror of
https://github.com/Motorhead1991/qemu.git
synced 2025-08-02 15:23:53 -06:00

Until now, FUA was always emulated with a separate flush after the write for file-posix. The overhead of processing a second request can reduce performance significantly for a guest disk that has disabled the write cache, especially if the host disk is already write through, too, and the flush isn't actually doing anything. Advertise support for REQ_FUA in write requests and implement it for Linux AIO and io_uring using the RWF_DSYNC flag for write requests. The thread pool still performs a separate fdatasync() call. This can be improved later by using the pwritev2() syscall if available. As an example, this is how fio numbers can be improved in some scenarios with this patch (all using virtio-blk with cache=directsync on an nvme block device for the VM, fio with ioengine=libaio,direct=1,sync=1): | old | with FUA support ------------------------------+---------------+------------------- bs=4k, iodepth=1, numjobs=1 | 45.6k iops | 56.1k iops bs=4k, iodepth=1, numjobs=16 | 183.3k iops | 236.0k iops bs=4k, iodepth=16, numjobs=1 | 258.4k iops | 311.1k iops However, not all scenarios are clear wins. On another slower disk I saw little to no improvment. In fact, in two corner case scenarios, I even observed a regression, which I however consider acceptable: 1. On slow host disks in a write through cache mode, when the guest is using virtio-blk in a separate iothread so that polling can be enabled, and each completion is quickly followed up with a new request (so that polling gets it), it can happen that enabling FUA makes things slower - the additional very fast no-op flush we used to have gave the adaptive polling algorithm a success so that it kept polling. Without it, we only have the slow write request, which disables polling. This is a problem in the polling algorithm that will be fixed later in this series. 2. With a high queue depth, it can be beneficial to have flush requests for another reason: The optimisation in bdrv_co_flush() that flushes only once per write generation acts as a synchronisation mechanism that lets all requests complete at the same time. This can result in better batching and if the disk is very fast (I only saw this with a null_blk backend), this can make up for the overhead of the flush and improve throughput. In theory, we could optionally introduce a similar artificial latency in the normal completion path to achieve the same kind of completion batching. This is not implemented in this series. Compatibility is not a concern for the kernel side of io_uring, it has supported RWF_DSYNC from the start. However, io_uring_prep_writev2() is not available before liburing 2.2. Linux AIO started supporting it in Linux 4.13 and libaio 0.3.111. The kernel is not a problem for any supported build platform, so it's not necessary to add runtime checks. However, openSUSE is still stuck with an older libaio version that would break the build. We must detect the presence of the writev2 functions in the user space libraries at build time to avoid build failures. Signed-off-by: Kevin Wolf <kwolf@redhat.com> Message-ID: <20250307221634.71951-2-kwolf@redhat.com> Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com> Signed-off-by: Kevin Wolf <kwolf@redhat.com>
526 lines
14 KiB
C
526 lines
14 KiB
C
/*
|
|
* Linux native AIO support.
|
|
*
|
|
* Copyright (C) 2009 IBM, Corp.
|
|
* Copyright (C) 2009 Red Hat, Inc.
|
|
*
|
|
* This work is licensed under the terms of the GNU GPL, version 2 or later.
|
|
* See the COPYING file in the top-level directory.
|
|
*/
|
|
#include "qemu/osdep.h"
|
|
#include "block/aio.h"
|
|
#include "qemu/queue.h"
|
|
#include "block/block.h"
|
|
#include "block/raw-aio.h"
|
|
#include "qemu/event_notifier.h"
|
|
#include "qemu/coroutine.h"
|
|
#include "qemu/defer-call.h"
|
|
#include "qapi/error.h"
|
|
#include "system/block-backend.h"
|
|
|
|
/* Only used for assertions. */
|
|
#include "qemu/coroutine_int.h"
|
|
|
|
#include <libaio.h>
|
|
|
|
/*
|
|
* Queue size (per-device).
|
|
*
|
|
* XXX: eventually we need to communicate this to the guest and/or make it
|
|
* tunable by the guest. If we get more outstanding requests at a time
|
|
* than this we will get EAGAIN from io_submit which is communicated to
|
|
* the guest as an I/O error.
|
|
*/
|
|
#define MAX_EVENTS 1024
|
|
|
|
/* Maximum number of requests in a batch. (default value) */
|
|
#define DEFAULT_MAX_BATCH 32
|
|
|
|
struct qemu_laiocb {
|
|
Coroutine *co;
|
|
LinuxAioState *ctx;
|
|
struct iocb iocb;
|
|
ssize_t ret;
|
|
size_t nbytes;
|
|
QEMUIOVector *qiov;
|
|
bool is_read;
|
|
QSIMPLEQ_ENTRY(qemu_laiocb) next;
|
|
};
|
|
|
|
typedef struct {
|
|
unsigned int in_queue;
|
|
unsigned int in_flight;
|
|
bool blocked;
|
|
QSIMPLEQ_HEAD(, qemu_laiocb) pending;
|
|
} LaioQueue;
|
|
|
|
struct LinuxAioState {
|
|
AioContext *aio_context;
|
|
|
|
io_context_t ctx;
|
|
EventNotifier e;
|
|
|
|
/* No locking required, only accessed from AioContext home thread */
|
|
LaioQueue io_q;
|
|
QEMUBH *completion_bh;
|
|
int event_idx;
|
|
int event_max;
|
|
};
|
|
|
|
static void ioq_submit(LinuxAioState *s);
|
|
|
|
static inline ssize_t io_event_ret(struct io_event *ev)
|
|
{
|
|
return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
|
|
}
|
|
|
|
/*
|
|
* Completes an AIO request.
|
|
*/
|
|
static void qemu_laio_process_completion(struct qemu_laiocb *laiocb)
|
|
{
|
|
int ret;
|
|
|
|
ret = laiocb->ret;
|
|
if (ret != -ECANCELED) {
|
|
if (ret == laiocb->nbytes) {
|
|
ret = 0;
|
|
} else if (ret >= 0) {
|
|
/* Short reads mean EOF, pad with zeros. */
|
|
if (laiocb->is_read) {
|
|
qemu_iovec_memset(laiocb->qiov, ret, 0,
|
|
laiocb->qiov->size - ret);
|
|
} else {
|
|
ret = -ENOSPC;
|
|
}
|
|
}
|
|
}
|
|
|
|
laiocb->ret = ret;
|
|
|
|
/*
|
|
* If the coroutine is already entered it must be in ioq_submit() and
|
|
* will notice laio->ret has been filled in when it eventually runs
|
|
* later. Coroutines cannot be entered recursively so avoid doing
|
|
* that!
|
|
*/
|
|
assert(laiocb->co->ctx == laiocb->ctx->aio_context);
|
|
if (!qemu_coroutine_entered(laiocb->co)) {
|
|
aio_co_wake(laiocb->co);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* aio_ring buffer which is shared between userspace and kernel.
|
|
*
|
|
* This copied from linux/fs/aio.c, common header does not exist
|
|
* but AIO exists for ages so we assume ABI is stable.
|
|
*/
|
|
struct aio_ring {
|
|
unsigned id; /* kernel internal index number */
|
|
unsigned nr; /* number of io_events */
|
|
unsigned head; /* Written to by userland or by kernel. */
|
|
unsigned tail;
|
|
|
|
unsigned magic;
|
|
unsigned compat_features;
|
|
unsigned incompat_features;
|
|
unsigned header_length; /* size of aio_ring */
|
|
|
|
struct io_event io_events[];
|
|
};
|
|
|
|
/**
|
|
* io_getevents_peek:
|
|
* @ctx: AIO context
|
|
* @events: pointer on events array, output value
|
|
|
|
* Returns the number of completed events and sets a pointer
|
|
* on events array. This function does not update the internal
|
|
* ring buffer, only reads head and tail. When @events has been
|
|
* processed io_getevents_commit() must be called.
|
|
*/
|
|
static inline unsigned int io_getevents_peek(io_context_t ctx,
|
|
struct io_event **events)
|
|
{
|
|
struct aio_ring *ring = (struct aio_ring *)ctx;
|
|
unsigned int head = ring->head, tail = ring->tail;
|
|
unsigned int nr;
|
|
|
|
nr = tail >= head ? tail - head : ring->nr - head;
|
|
*events = ring->io_events + head;
|
|
/* To avoid speculative loads of s->events[i] before observing tail.
|
|
Paired with smp_wmb() inside linux/fs/aio.c: aio_complete(). */
|
|
smp_rmb();
|
|
|
|
return nr;
|
|
}
|
|
|
|
/**
|
|
* io_getevents_commit:
|
|
* @ctx: AIO context
|
|
* @nr: the number of events on which head should be advanced
|
|
*
|
|
* Advances head of a ring buffer.
|
|
*/
|
|
static inline void io_getevents_commit(io_context_t ctx, unsigned int nr)
|
|
{
|
|
struct aio_ring *ring = (struct aio_ring *)ctx;
|
|
|
|
if (nr) {
|
|
ring->head = (ring->head + nr) % ring->nr;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* io_getevents_advance_and_peek:
|
|
* @ctx: AIO context
|
|
* @events: pointer on events array, output value
|
|
* @nr: the number of events on which head should be advanced
|
|
*
|
|
* Advances head of a ring buffer and returns number of elements left.
|
|
*/
|
|
static inline unsigned int
|
|
io_getevents_advance_and_peek(io_context_t ctx,
|
|
struct io_event **events,
|
|
unsigned int nr)
|
|
{
|
|
io_getevents_commit(ctx, nr);
|
|
return io_getevents_peek(ctx, events);
|
|
}
|
|
|
|
/**
|
|
* qemu_laio_process_completions:
|
|
* @s: AIO state
|
|
*
|
|
* Fetches completed I/O requests and invokes their callbacks.
|
|
*
|
|
* The function is somewhat tricky because it supports nested event loops, for
|
|
* example when a request callback invokes aio_poll(). In order to do this,
|
|
* indices are kept in LinuxAioState. Function schedules BH completion so it
|
|
* can be called again in a nested event loop. When there are no events left
|
|
* to complete the BH is being canceled.
|
|
*/
|
|
static void qemu_laio_process_completions(LinuxAioState *s)
|
|
{
|
|
struct io_event *events;
|
|
|
|
defer_call_begin();
|
|
|
|
/* Reschedule so nested event loops see currently pending completions */
|
|
qemu_bh_schedule(s->completion_bh);
|
|
|
|
while ((s->event_max = io_getevents_advance_and_peek(s->ctx, &events,
|
|
s->event_idx))) {
|
|
for (s->event_idx = 0; s->event_idx < s->event_max; ) {
|
|
struct iocb *iocb = events[s->event_idx].obj;
|
|
struct qemu_laiocb *laiocb =
|
|
container_of(iocb, struct qemu_laiocb, iocb);
|
|
|
|
laiocb->ret = io_event_ret(&events[s->event_idx]);
|
|
|
|
/* Change counters one-by-one because we can be nested. */
|
|
s->io_q.in_flight--;
|
|
s->event_idx++;
|
|
qemu_laio_process_completion(laiocb);
|
|
}
|
|
}
|
|
|
|
qemu_bh_cancel(s->completion_bh);
|
|
|
|
/* If we are nested we have to notify the level above that we are done
|
|
* by setting event_max to zero, upper level will then jump out of it's
|
|
* own `for` loop. If we are the last all counters dropped to zero. */
|
|
s->event_max = 0;
|
|
s->event_idx = 0;
|
|
|
|
defer_call_end();
|
|
}
|
|
|
|
static void qemu_laio_process_completions_and_submit(LinuxAioState *s)
|
|
{
|
|
qemu_laio_process_completions(s);
|
|
|
|
if (!QSIMPLEQ_EMPTY(&s->io_q.pending)) {
|
|
ioq_submit(s);
|
|
}
|
|
}
|
|
|
|
static void qemu_laio_completion_bh(void *opaque)
|
|
{
|
|
LinuxAioState *s = opaque;
|
|
|
|
qemu_laio_process_completions_and_submit(s);
|
|
}
|
|
|
|
static void qemu_laio_completion_cb(EventNotifier *e)
|
|
{
|
|
LinuxAioState *s = container_of(e, LinuxAioState, e);
|
|
|
|
if (event_notifier_test_and_clear(&s->e)) {
|
|
qemu_laio_process_completions_and_submit(s);
|
|
}
|
|
}
|
|
|
|
static bool qemu_laio_poll_cb(void *opaque)
|
|
{
|
|
EventNotifier *e = opaque;
|
|
LinuxAioState *s = container_of(e, LinuxAioState, e);
|
|
struct io_event *events;
|
|
|
|
return io_getevents_peek(s->ctx, &events);
|
|
}
|
|
|
|
static void qemu_laio_poll_ready(EventNotifier *opaque)
|
|
{
|
|
EventNotifier *e = opaque;
|
|
LinuxAioState *s = container_of(e, LinuxAioState, e);
|
|
|
|
qemu_laio_process_completions_and_submit(s);
|
|
}
|
|
|
|
static void ioq_init(LaioQueue *io_q)
|
|
{
|
|
QSIMPLEQ_INIT(&io_q->pending);
|
|
io_q->in_queue = 0;
|
|
io_q->in_flight = 0;
|
|
io_q->blocked = false;
|
|
}
|
|
|
|
static void ioq_submit(LinuxAioState *s)
|
|
{
|
|
int ret, len;
|
|
struct qemu_laiocb *aiocb;
|
|
struct iocb *iocbs[MAX_EVENTS];
|
|
QSIMPLEQ_HEAD(, qemu_laiocb) completed;
|
|
|
|
do {
|
|
if (s->io_q.in_flight >= MAX_EVENTS) {
|
|
break;
|
|
}
|
|
len = 0;
|
|
QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) {
|
|
iocbs[len++] = &aiocb->iocb;
|
|
if (s->io_q.in_flight + len >= MAX_EVENTS) {
|
|
break;
|
|
}
|
|
}
|
|
|
|
ret = io_submit(s->ctx, len, iocbs);
|
|
if (ret == -EAGAIN) {
|
|
break;
|
|
}
|
|
if (ret < 0) {
|
|
/* Fail the first request, retry the rest */
|
|
aiocb = QSIMPLEQ_FIRST(&s->io_q.pending);
|
|
QSIMPLEQ_REMOVE_HEAD(&s->io_q.pending, next);
|
|
s->io_q.in_queue--;
|
|
aiocb->ret = ret;
|
|
qemu_laio_process_completion(aiocb);
|
|
continue;
|
|
}
|
|
|
|
s->io_q.in_flight += ret;
|
|
s->io_q.in_queue -= ret;
|
|
aiocb = container_of(iocbs[ret - 1], struct qemu_laiocb, iocb);
|
|
QSIMPLEQ_SPLIT_AFTER(&s->io_q.pending, aiocb, next, &completed);
|
|
} while (ret == len && !QSIMPLEQ_EMPTY(&s->io_q.pending));
|
|
s->io_q.blocked = (s->io_q.in_queue > 0);
|
|
|
|
if (s->io_q.in_flight) {
|
|
/* We can try to complete something just right away if there are
|
|
* still requests in-flight. */
|
|
qemu_laio_process_completions(s);
|
|
/*
|
|
* Even we have completed everything (in_flight == 0), the queue can
|
|
* have still pended requests (in_queue > 0). We do not attempt to
|
|
* repeat submission to avoid IO hang. The reason is simple: s->e is
|
|
* still set and completion callback will be called shortly and all
|
|
* pended requests will be submitted from there.
|
|
*/
|
|
}
|
|
}
|
|
|
|
static uint64_t laio_max_batch(LinuxAioState *s, uint64_t dev_max_batch)
|
|
{
|
|
uint64_t max_batch = s->aio_context->aio_max_batch ?: DEFAULT_MAX_BATCH;
|
|
|
|
/*
|
|
* AIO context can be shared between multiple block devices, so
|
|
* `dev_max_batch` allows reducing the batch size for latency-sensitive
|
|
* devices.
|
|
*/
|
|
max_batch = MIN_NON_ZERO(dev_max_batch, max_batch);
|
|
|
|
/* limit the batch with the number of available events */
|
|
max_batch = MIN_NON_ZERO(MAX_EVENTS - s->io_q.in_flight, max_batch);
|
|
|
|
return max_batch;
|
|
}
|
|
|
|
static void laio_deferred_fn(void *opaque)
|
|
{
|
|
LinuxAioState *s = opaque;
|
|
|
|
if (!s->io_q.blocked && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
|
|
ioq_submit(s);
|
|
}
|
|
}
|
|
|
|
static int laio_do_submit(int fd, struct qemu_laiocb *laiocb, off_t offset,
|
|
int type, BdrvRequestFlags flags,
|
|
uint64_t dev_max_batch)
|
|
{
|
|
LinuxAioState *s = laiocb->ctx;
|
|
struct iocb *iocbs = &laiocb->iocb;
|
|
QEMUIOVector *qiov = laiocb->qiov;
|
|
|
|
switch (type) {
|
|
case QEMU_AIO_WRITE:
|
|
#ifdef HAVE_IO_PREP_PWRITEV2
|
|
{
|
|
int laio_flags = (flags & BDRV_REQ_FUA) ? RWF_DSYNC : 0;
|
|
io_prep_pwritev2(iocbs, fd, qiov->iov, qiov->niov, offset, laio_flags);
|
|
}
|
|
#else
|
|
assert(flags == 0);
|
|
io_prep_pwritev(iocbs, fd, qiov->iov, qiov->niov, offset);
|
|
#endif
|
|
break;
|
|
case QEMU_AIO_ZONE_APPEND:
|
|
io_prep_pwritev(iocbs, fd, qiov->iov, qiov->niov, offset);
|
|
break;
|
|
case QEMU_AIO_READ:
|
|
io_prep_preadv(iocbs, fd, qiov->iov, qiov->niov, offset);
|
|
break;
|
|
case QEMU_AIO_FLUSH:
|
|
io_prep_fdsync(iocbs, fd);
|
|
break;
|
|
/* Currently Linux kernel does not support other operations */
|
|
default:
|
|
fprintf(stderr, "%s: invalid AIO request type 0x%x.\n",
|
|
__func__, type);
|
|
return -EIO;
|
|
}
|
|
io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));
|
|
|
|
QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next);
|
|
s->io_q.in_queue++;
|
|
if (!s->io_q.blocked) {
|
|
if (s->io_q.in_queue >= laio_max_batch(s, dev_max_batch)) {
|
|
ioq_submit(s);
|
|
} else {
|
|
defer_call(laio_deferred_fn, s);
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
int coroutine_fn laio_co_submit(int fd, uint64_t offset, QEMUIOVector *qiov,
|
|
int type, BdrvRequestFlags flags,
|
|
uint64_t dev_max_batch)
|
|
{
|
|
int ret;
|
|
AioContext *ctx = qemu_get_current_aio_context();
|
|
struct qemu_laiocb laiocb = {
|
|
.co = qemu_coroutine_self(),
|
|
.nbytes = qiov ? qiov->size : 0,
|
|
.ctx = aio_get_linux_aio(ctx),
|
|
.ret = -EINPROGRESS,
|
|
.is_read = (type == QEMU_AIO_READ),
|
|
.qiov = qiov,
|
|
};
|
|
|
|
ret = laio_do_submit(fd, &laiocb, offset, type, flags, dev_max_batch);
|
|
if (ret < 0) {
|
|
return ret;
|
|
}
|
|
|
|
if (laiocb.ret == -EINPROGRESS) {
|
|
qemu_coroutine_yield();
|
|
}
|
|
return laiocb.ret;
|
|
}
|
|
|
|
void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context)
|
|
{
|
|
aio_set_event_notifier(old_context, &s->e, NULL, NULL, NULL);
|
|
qemu_bh_delete(s->completion_bh);
|
|
s->aio_context = NULL;
|
|
}
|
|
|
|
void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context)
|
|
{
|
|
s->aio_context = new_context;
|
|
s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
|
|
aio_set_event_notifier(new_context, &s->e,
|
|
qemu_laio_completion_cb,
|
|
qemu_laio_poll_cb,
|
|
qemu_laio_poll_ready);
|
|
}
|
|
|
|
LinuxAioState *laio_init(Error **errp)
|
|
{
|
|
int rc;
|
|
LinuxAioState *s;
|
|
|
|
s = g_malloc0(sizeof(*s));
|
|
rc = event_notifier_init(&s->e, false);
|
|
if (rc < 0) {
|
|
error_setg_errno(errp, -rc, "failed to initialize event notifier");
|
|
goto out_free_state;
|
|
}
|
|
|
|
rc = io_setup(MAX_EVENTS, &s->ctx);
|
|
if (rc < 0) {
|
|
error_setg_errno(errp, -rc, "failed to create linux AIO context");
|
|
goto out_close_efd;
|
|
}
|
|
|
|
ioq_init(&s->io_q);
|
|
|
|
return s;
|
|
|
|
out_close_efd:
|
|
event_notifier_cleanup(&s->e);
|
|
out_free_state:
|
|
g_free(s);
|
|
return NULL;
|
|
}
|
|
|
|
void laio_cleanup(LinuxAioState *s)
|
|
{
|
|
event_notifier_cleanup(&s->e);
|
|
|
|
if (io_destroy(s->ctx) != 0) {
|
|
fprintf(stderr, "%s: destroy AIO context %p failed\n",
|
|
__func__, &s->ctx);
|
|
}
|
|
g_free(s);
|
|
}
|
|
|
|
bool laio_has_fdsync(int fd)
|
|
{
|
|
struct iocb cb;
|
|
struct iocb *cbs[] = {&cb, NULL};
|
|
|
|
io_context_t ctx = 0;
|
|
io_setup(1, &ctx);
|
|
|
|
/* check if host kernel supports IO_CMD_FDSYNC */
|
|
io_prep_fdsync(&cb, fd);
|
|
int ret = io_submit(ctx, 1, cbs);
|
|
|
|
io_destroy(ctx);
|
|
return (ret == -EINVAL) ? false : true;
|
|
}
|
|
|
|
bool laio_has_fua(void)
|
|
{
|
|
#ifdef HAVE_IO_PREP_PWRITEV2
|
|
return true;
|
|
#else
|
|
return false;
|
|
#endif
|
|
}
|