coroutine: move into libqemuutil.a library

The coroutine files are currently referenced by the block-obj-y
variable. The coroutine functionality though is already used by
more than just the block code. eg migration code uses coroutine
yield. In the future the I/O channel code will also use the
coroutine yield functionality. Since the coroutine code is nicely
self-contained it can be easily built as part of the libqemuutil.a
library, making it widely available.

The headers are also moved into include/qemu, instead of the
include/block directory, since they are now part of the util
codebase, and the impl was never in the block/ directory
either.

Signed-off-by: Daniel P. Berrange <berrange@redhat.com>
This commit is contained in:
Daniel P. Berrange 2015-09-01 14:48:02 +01:00
parent 57cb38b383
commit 10817bf09d
36 changed files with 45 additions and 39 deletions

View file

@ -18,3 +18,6 @@ util-obj-y += getauxval.o
util-obj-y += readline.o
util-obj-y += rfifolock.o
util-obj-y += rcu.o
util-obj-y += qemu-coroutine.o qemu-coroutine-lock.o qemu-coroutine-io.o
util-obj-y += qemu-coroutine-sleep.o
util-obj-y += coroutine-$(CONFIG_COROUTINE_BACKEND).o

198
util/coroutine-gthread.c Normal file
View file

@ -0,0 +1,198 @@
/*
* GThread coroutine initialization code
*
* Copyright (C) 2006 Anthony Liguori <anthony@codemonkey.ws>
* Copyright (C) 2011 Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.0 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, see <http://www.gnu.org/licenses/>.
*/
#include <glib.h>
#include "qemu-common.h"
#include "qemu/coroutine_int.h"
typedef struct {
Coroutine base;
GThread *thread;
bool runnable;
bool free_on_thread_exit;
CoroutineAction action;
} CoroutineGThread;
static CompatGMutex coroutine_lock;
static CompatGCond coroutine_cond;
/* GLib 2.31 and beyond deprecated various parts of the thread API,
* but the new interfaces are not available in older GLib versions
* so we have to cope with both.
*/
#if GLIB_CHECK_VERSION(2, 31, 0)
/* Awkwardly, the GPrivate API doesn't provide a way to update the
* GDestroyNotify handler for the coroutine key dynamically. So instead
* we track whether or not the CoroutineGThread should be freed on
* thread exit / coroutine key update using the free_on_thread_exit
* field.
*/
static void coroutine_destroy_notify(gpointer data)
{
CoroutineGThread *co = data;
if (co && co->free_on_thread_exit) {
g_free(co);
}
}
static GPrivate coroutine_key = G_PRIVATE_INIT(coroutine_destroy_notify);
static inline CoroutineGThread *get_coroutine_key(void)
{
return g_private_get(&coroutine_key);
}
static inline void set_coroutine_key(CoroutineGThread *co,
bool free_on_thread_exit)
{
/* Unlike g_static_private_set() this does not call the GDestroyNotify
* if the previous value of the key was NULL. Fortunately we only need
* the GDestroyNotify in the non-NULL key case.
*/
co->free_on_thread_exit = free_on_thread_exit;
g_private_replace(&coroutine_key, co);
}
static inline GThread *create_thread(GThreadFunc func, gpointer data)
{
return g_thread_new("coroutine", func, data);
}
#else
/* Handle older GLib versions */
static GStaticPrivate coroutine_key = G_STATIC_PRIVATE_INIT;
static inline CoroutineGThread *get_coroutine_key(void)
{
return g_static_private_get(&coroutine_key);
}
static inline void set_coroutine_key(CoroutineGThread *co,
bool free_on_thread_exit)
{
g_static_private_set(&coroutine_key, co,
free_on_thread_exit ? (GDestroyNotify)g_free : NULL);
}
static inline GThread *create_thread(GThreadFunc func, gpointer data)
{
return g_thread_create_full(func, data, 0, TRUE, TRUE,
G_THREAD_PRIORITY_NORMAL, NULL);
}
#endif
static void __attribute__((constructor)) coroutine_init(void)
{
#if !GLIB_CHECK_VERSION(2, 31, 0)
if (!g_thread_supported()) {
g_thread_init(NULL);
}
#endif
}
static void coroutine_wait_runnable_locked(CoroutineGThread *co)
{
while (!co->runnable) {
g_cond_wait(&coroutine_cond, &coroutine_lock);
}
}
static void coroutine_wait_runnable(CoroutineGThread *co)
{
g_mutex_lock(&coroutine_lock);
coroutine_wait_runnable_locked(co);
g_mutex_unlock(&coroutine_lock);
}
static gpointer coroutine_thread(gpointer opaque)
{
CoroutineGThread *co = opaque;
set_coroutine_key(co, false);
coroutine_wait_runnable(co);
co->base.entry(co->base.entry_arg);
qemu_coroutine_switch(&co->base, co->base.caller, COROUTINE_TERMINATE);
return NULL;
}
Coroutine *qemu_coroutine_new(void)
{
CoroutineGThread *co;
co = g_malloc0(sizeof(*co));
co->thread = create_thread(coroutine_thread, co);
if (!co->thread) {
g_free(co);
return NULL;
}
return &co->base;
}
void qemu_coroutine_delete(Coroutine *co_)
{
CoroutineGThread *co = DO_UPCAST(CoroutineGThread, base, co_);
g_thread_join(co->thread);
g_free(co);
}
CoroutineAction qemu_coroutine_switch(Coroutine *from_,
Coroutine *to_,
CoroutineAction action)
{
CoroutineGThread *from = DO_UPCAST(CoroutineGThread, base, from_);
CoroutineGThread *to = DO_UPCAST(CoroutineGThread, base, to_);
g_mutex_lock(&coroutine_lock);
from->runnable = false;
from->action = action;
to->runnable = true;
to->action = action;
g_cond_broadcast(&coroutine_cond);
if (action != COROUTINE_TERMINATE) {
coroutine_wait_runnable_locked(from);
}
g_mutex_unlock(&coroutine_lock);
return from->action;
}
Coroutine *qemu_coroutine_self(void)
{
CoroutineGThread *co = get_coroutine_key();
if (!co) {
co = g_malloc0(sizeof(*co));
co->runnable = true;
set_coroutine_key(co, true);
}
return &co->base;
}
bool qemu_in_coroutine(void)
{
CoroutineGThread *co = get_coroutine_key();
return co && co->base.caller;
}

View file

@ -0,0 +1,293 @@
/*
* sigaltstack coroutine initialization code
*
* Copyright (C) 2006 Anthony Liguori <anthony@codemonkey.ws>
* Copyright (C) 2011 Kevin Wolf <kwolf@redhat.com>
* Copyright (C) 2012 Alex Barcelo <abarcelo@ac.upc.edu>
** This file is partly based on pth_mctx.c, from the GNU Portable Threads
** Copyright (c) 1999-2006 Ralf S. Engelschall <rse@engelschall.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, see <http://www.gnu.org/licenses/>.
*/
/* XXX Is there a nicer way to disable glibc's stack check for longjmp? */
#ifdef _FORTIFY_SOURCE
#undef _FORTIFY_SOURCE
#endif
#include <stdlib.h>
#include <setjmp.h>
#include <stdint.h>
#include <pthread.h>
#include <signal.h>
#include "qemu-common.h"
#include "qemu/coroutine_int.h"
typedef struct {
Coroutine base;
void *stack;
sigjmp_buf env;
} CoroutineUContext;
/**
* Per-thread coroutine bookkeeping
*/
typedef struct {
/** Currently executing coroutine */
Coroutine *current;
/** The default coroutine */
CoroutineUContext leader;
/** Information for the signal handler (trampoline) */
sigjmp_buf tr_reenter;
volatile sig_atomic_t tr_called;
void *tr_handler;
} CoroutineThreadState;
static pthread_key_t thread_state_key;
static CoroutineThreadState *coroutine_get_thread_state(void)
{
CoroutineThreadState *s = pthread_getspecific(thread_state_key);
if (!s) {
s = g_malloc0(sizeof(*s));
s->current = &s->leader.base;
pthread_setspecific(thread_state_key, s);
}
return s;
}
static void qemu_coroutine_thread_cleanup(void *opaque)
{
CoroutineThreadState *s = opaque;
g_free(s);
}
static void __attribute__((constructor)) coroutine_init(void)
{
int ret;
ret = pthread_key_create(&thread_state_key, qemu_coroutine_thread_cleanup);
if (ret != 0) {
fprintf(stderr, "unable to create leader key: %s\n", strerror(errno));
abort();
}
}
/* "boot" function
* This is what starts the coroutine, is called from the trampoline
* (from the signal handler when it is not signal handling, read ahead
* for more information).
*/
static void coroutine_bootstrap(CoroutineUContext *self, Coroutine *co)
{
/* Initialize longjmp environment and switch back the caller */
if (!sigsetjmp(self->env, 0)) {
siglongjmp(*(sigjmp_buf *)co->entry_arg, 1);
}
while (true) {
co->entry(co->entry_arg);
qemu_coroutine_switch(co, co->caller, COROUTINE_TERMINATE);
}
}
/*
* This is used as the signal handler. This is called with the brand new stack
* (thanks to sigaltstack). We have to return, given that this is a signal
* handler and the sigmask and some other things are changed.
*/
static void coroutine_trampoline(int signal)
{
CoroutineUContext *self;
Coroutine *co;
CoroutineThreadState *coTS;
/* Get the thread specific information */
coTS = coroutine_get_thread_state();
self = coTS->tr_handler;
coTS->tr_called = 1;
co = &self->base;
/*
* Here we have to do a bit of a ping pong between the caller, given that
* this is a signal handler and we have to do a return "soon". Then the
* caller can reestablish everything and do a siglongjmp here again.
*/
if (!sigsetjmp(coTS->tr_reenter, 0)) {
return;
}
/*
* Ok, the caller has siglongjmp'ed back to us, so now prepare
* us for the real machine state switching. We have to jump
* into another function here to get a new stack context for
* the auto variables (which have to be auto-variables
* because the start of the thread happens later). Else with
* PIC (i.e. Position Independent Code which is used when PTH
* is built as a shared library) most platforms would
* horrible core dump as experience showed.
*/
coroutine_bootstrap(self, co);
}
Coroutine *qemu_coroutine_new(void)
{
const size_t stack_size = 1 << 20;
CoroutineUContext *co;
CoroutineThreadState *coTS;
struct sigaction sa;
struct sigaction osa;
stack_t ss;
stack_t oss;
sigset_t sigs;
sigset_t osigs;
sigjmp_buf old_env;
/* The way to manipulate stack is with the sigaltstack function. We
* prepare a stack, with it delivering a signal to ourselves and then
* put sigsetjmp/siglongjmp where needed.
* This has been done keeping coroutine-ucontext as a model and with the
* pth ideas (GNU Portable Threads). See coroutine-ucontext for the basics
* of the coroutines and see pth_mctx.c (from the pth project) for the
* sigaltstack way of manipulating stacks.
*/
co = g_malloc0(sizeof(*co));
co->stack = g_malloc(stack_size);
co->base.entry_arg = &old_env; /* stash away our jmp_buf */
coTS = coroutine_get_thread_state();
coTS->tr_handler = co;
/*
* Preserve the SIGUSR2 signal state, block SIGUSR2,
* and establish our signal handler. The signal will
* later transfer control onto the signal stack.
*/
sigemptyset(&sigs);
sigaddset(&sigs, SIGUSR2);
pthread_sigmask(SIG_BLOCK, &sigs, &osigs);
sa.sa_handler = coroutine_trampoline;
sigfillset(&sa.sa_mask);
sa.sa_flags = SA_ONSTACK;
if (sigaction(SIGUSR2, &sa, &osa) != 0) {
abort();
}
/*
* Set the new stack.
*/
ss.ss_sp = co->stack;
ss.ss_size = stack_size;
ss.ss_flags = 0;
if (sigaltstack(&ss, &oss) < 0) {
abort();
}
/*
* Now transfer control onto the signal stack and set it up.
* It will return immediately via "return" after the sigsetjmp()
* was performed. Be careful here with race conditions. The
* signal can be delivered the first time sigsuspend() is
* called.
*/
coTS->tr_called = 0;
pthread_kill(pthread_self(), SIGUSR2);
sigfillset(&sigs);
sigdelset(&sigs, SIGUSR2);
while (!coTS->tr_called) {
sigsuspend(&sigs);
}
/*
* Inform the system that we are back off the signal stack by
* removing the alternative signal stack. Be careful here: It
* first has to be disabled, before it can be removed.
*/
sigaltstack(NULL, &ss);
ss.ss_flags = SS_DISABLE;
if (sigaltstack(&ss, NULL) < 0) {
abort();
}
sigaltstack(NULL, &ss);
if (!(oss.ss_flags & SS_DISABLE)) {
sigaltstack(&oss, NULL);
}
/*
* Restore the old SIGUSR2 signal handler and mask
*/
sigaction(SIGUSR2, &osa, NULL);
pthread_sigmask(SIG_SETMASK, &osigs, NULL);
/*
* Now enter the trampoline again, but this time not as a signal
* handler. Instead we jump into it directly. The functionally
* redundant ping-pong pointer arithmetic is necessary to avoid
* type-conversion warnings related to the `volatile' qualifier and
* the fact that `jmp_buf' usually is an array type.
*/
if (!sigsetjmp(old_env, 0)) {
siglongjmp(coTS->tr_reenter, 1);
}
/*
* Ok, we returned again, so now we're finished
*/
return &co->base;
}
void qemu_coroutine_delete(Coroutine *co_)
{
CoroutineUContext *co = DO_UPCAST(CoroutineUContext, base, co_);
g_free(co->stack);
g_free(co);
}
CoroutineAction qemu_coroutine_switch(Coroutine *from_, Coroutine *to_,
CoroutineAction action)
{
CoroutineUContext *from = DO_UPCAST(CoroutineUContext, base, from_);
CoroutineUContext *to = DO_UPCAST(CoroutineUContext, base, to_);
CoroutineThreadState *s = coroutine_get_thread_state();
int ret;
s->current = to_;
ret = sigsetjmp(from->env, 0);
if (ret == 0) {
siglongjmp(to->env, action);
}
return ret;
}
Coroutine *qemu_coroutine_self(void)
{
CoroutineThreadState *s = coroutine_get_thread_state();
return s->current;
}
bool qemu_in_coroutine(void)
{
CoroutineThreadState *s = pthread_getspecific(thread_state_key);
return s && s->current->caller;
}

194
util/coroutine-ucontext.c Normal file
View file

@ -0,0 +1,194 @@
/*
* ucontext coroutine initialization code
*
* Copyright (C) 2006 Anthony Liguori <anthony@codemonkey.ws>
* Copyright (C) 2011 Kevin Wolf <kwolf@redhat.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.0 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, see <http://www.gnu.org/licenses/>.
*/
/* XXX Is there a nicer way to disable glibc's stack check for longjmp? */
#ifdef _FORTIFY_SOURCE
#undef _FORTIFY_SOURCE
#endif
#include <stdlib.h>
#include <setjmp.h>
#include <stdint.h>
#include <ucontext.h>
#include "qemu-common.h"
#include "qemu/coroutine_int.h"
#ifdef CONFIG_VALGRIND_H
#include <valgrind/valgrind.h>
#endif
typedef struct {
Coroutine base;
void *stack;
sigjmp_buf env;
#ifdef CONFIG_VALGRIND_H
unsigned int valgrind_stack_id;
#endif
} CoroutineUContext;
/**
* Per-thread coroutine bookkeeping
*/
static __thread CoroutineUContext leader;
static __thread Coroutine *current;
/*
* va_args to makecontext() must be type 'int', so passing
* the pointer we need may require several int args. This
* union is a quick hack to let us do that
*/
union cc_arg {
void *p;
int i[2];
};
static void coroutine_trampoline(int i0, int i1)
{
union cc_arg arg;
CoroutineUContext *self;
Coroutine *co;
arg.i[0] = i0;
arg.i[1] = i1;
self = arg.p;
co = &self->base;
/* Initialize longjmp environment and switch back the caller */
if (!sigsetjmp(self->env, 0)) {
siglongjmp(*(sigjmp_buf *)co->entry_arg, 1);
}
while (true) {
co->entry(co->entry_arg);
qemu_coroutine_switch(co, co->caller, COROUTINE_TERMINATE);
}
}
Coroutine *qemu_coroutine_new(void)
{
const size_t stack_size = 1 << 20;
CoroutineUContext *co;
ucontext_t old_uc, uc;
sigjmp_buf old_env;
union cc_arg arg = {0};
/* The ucontext functions preserve signal masks which incurs a
* system call overhead. sigsetjmp(buf, 0)/siglongjmp() does not
* preserve signal masks but only works on the current stack.
* Since we need a way to create and switch to a new stack, use
* the ucontext functions for that but sigsetjmp()/siglongjmp() for
* everything else.
*/
if (getcontext(&uc) == -1) {
abort();
}
co = g_malloc0(sizeof(*co));
co->stack = g_malloc(stack_size);
co->base.entry_arg = &old_env; /* stash away our jmp_buf */
uc.uc_link = &old_uc;
uc.uc_stack.ss_sp = co->stack;
uc.uc_stack.ss_size = stack_size;
uc.uc_stack.ss_flags = 0;
#ifdef CONFIG_VALGRIND_H
co->valgrind_stack_id =
VALGRIND_STACK_REGISTER(co->stack, co->stack + stack_size);
#endif
arg.p = co;
makecontext(&uc, (void (*)(void))coroutine_trampoline,
2, arg.i[0], arg.i[1]);
/* swapcontext() in, siglongjmp() back out */
if (!sigsetjmp(old_env, 0)) {
swapcontext(&old_uc, &uc);
}
return &co->base;
}
#ifdef CONFIG_VALGRIND_H
#ifdef CONFIG_PRAGMA_DIAGNOSTIC_AVAILABLE
/* Work around an unused variable in the valgrind.h macro... */
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-but-set-variable"
#endif
static inline void valgrind_stack_deregister(CoroutineUContext *co)
{
VALGRIND_STACK_DEREGISTER(co->valgrind_stack_id);
}
#ifdef CONFIG_PRAGMA_DIAGNOSTIC_AVAILABLE
#pragma GCC diagnostic pop
#endif
#endif
void qemu_coroutine_delete(Coroutine *co_)
{
CoroutineUContext *co = DO_UPCAST(CoroutineUContext, base, co_);
#ifdef CONFIG_VALGRIND_H
valgrind_stack_deregister(co);
#endif
g_free(co->stack);
g_free(co);
}
/* This function is marked noinline to prevent GCC from inlining it
* into coroutine_trampoline(). If we allow it to do that then it
* hoists the code to get the address of the TLS variable "current"
* out of the while() loop. This is an invalid transformation because
* the sigsetjmp() call may be called when running thread A but
* return in thread B, and so we might be in a different thread
* context each time round the loop.
*/
CoroutineAction __attribute__((noinline))
qemu_coroutine_switch(Coroutine *from_, Coroutine *to_,
CoroutineAction action)
{
CoroutineUContext *from = DO_UPCAST(CoroutineUContext, base, from_);
CoroutineUContext *to = DO_UPCAST(CoroutineUContext, base, to_);
int ret;
current = to_;
ret = sigsetjmp(from->env, 0);
if (ret == 0) {
siglongjmp(to->env, action);
}
return ret;
}
Coroutine *qemu_coroutine_self(void)
{
if (!current) {
current = &leader.base;
}
return current;
}
bool qemu_in_coroutine(void)
{
return current && current->caller;
}

101
util/coroutine-win32.c Normal file
View file

@ -0,0 +1,101 @@
/*
* Win32 coroutine initialization code
*
* Copyright (c) 2011 Kevin Wolf <kwolf@redhat.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include "qemu-common.h"
#include "qemu/coroutine_int.h"
typedef struct
{
Coroutine base;
LPVOID fiber;
CoroutineAction action;
} CoroutineWin32;
static __thread CoroutineWin32 leader;
static __thread Coroutine *current;
/* This function is marked noinline to prevent GCC from inlining it
* into coroutine_trampoline(). If we allow it to do that then it
* hoists the code to get the address of the TLS variable "current"
* out of the while() loop. This is an invalid transformation because
* the SwitchToFiber() call may be called when running thread A but
* return in thread B, and so we might be in a different thread
* context each time round the loop.
*/
CoroutineAction __attribute__((noinline))
qemu_coroutine_switch(Coroutine *from_, Coroutine *to_,
CoroutineAction action)
{
CoroutineWin32 *from = DO_UPCAST(CoroutineWin32, base, from_);
CoroutineWin32 *to = DO_UPCAST(CoroutineWin32, base, to_);
current = to_;
to->action = action;
SwitchToFiber(to->fiber);
return from->action;
}
static void CALLBACK coroutine_trampoline(void *co_)
{
Coroutine *co = co_;
while (true) {
co->entry(co->entry_arg);
qemu_coroutine_switch(co, co->caller, COROUTINE_TERMINATE);
}
}
Coroutine *qemu_coroutine_new(void)
{
const size_t stack_size = 1 << 20;
CoroutineWin32 *co;
co = g_malloc0(sizeof(*co));
co->fiber = CreateFiber(stack_size, coroutine_trampoline, &co->base);
return &co->base;
}
void qemu_coroutine_delete(Coroutine *co_)
{
CoroutineWin32 *co = DO_UPCAST(CoroutineWin32, base, co_);
DeleteFiber(co->fiber);
g_free(co);
}
Coroutine *qemu_coroutine_self(void)
{
if (!current) {
current = &leader.base;
leader.fiber = ConvertThreadToFiber(NULL);
}
return current;
}
bool qemu_in_coroutine(void)
{
return current && current->caller;
}

91
util/qemu-coroutine-io.c Normal file
View file

@ -0,0 +1,91 @@
/*
* Coroutine-aware I/O functions
*
* Copyright (C) 2009-2010 Nippon Telegraph and Telephone Corporation.
* Copyright (c) 2011, Red Hat, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include "qemu-common.h"
#include "qemu/sockets.h"
#include "qemu/coroutine.h"
#include "qemu/iov.h"
#include "qemu/main-loop.h"
ssize_t coroutine_fn
qemu_co_sendv_recvv(int sockfd, struct iovec *iov, unsigned iov_cnt,
size_t offset, size_t bytes, bool do_send)
{
size_t done = 0;
ssize_t ret;
int err;
while (done < bytes) {
ret = iov_send_recv(sockfd, iov, iov_cnt,
offset + done, bytes - done, do_send);
if (ret > 0) {
done += ret;
} else if (ret < 0) {
err = socket_error();
if (err == EAGAIN || err == EWOULDBLOCK) {
qemu_coroutine_yield();
} else if (done == 0) {
return -err;
} else {
break;
}
} else if (ret == 0 && !do_send) {
/* write (send) should never return 0.
* read (recv) returns 0 for end-of-file (-data).
* In both cases there's little point retrying,
* but we do for write anyway, just in case */
break;
}
}
return done;
}
ssize_t coroutine_fn
qemu_co_send_recv(int sockfd, void *buf, size_t bytes, bool do_send)
{
struct iovec iov = { .iov_base = buf, .iov_len = bytes };
return qemu_co_sendv_recvv(sockfd, &iov, 1, 0, bytes, do_send);
}
typedef struct {
Coroutine *co;
int fd;
} FDYieldUntilData;
static void fd_coroutine_enter(void *opaque)
{
FDYieldUntilData *data = opaque;
qemu_set_fd_handler(data->fd, NULL, NULL, NULL);
qemu_coroutine_enter(data->co, NULL);
}
void coroutine_fn yield_until_fd_readable(int fd)
{
FDYieldUntilData data;
assert(qemu_in_coroutine());
data.co = qemu_coroutine_self();
data.fd = fd;
qemu_set_fd_handler(fd, fd_coroutine_enter, NULL, &data);
qemu_coroutine_yield();
}

186
util/qemu-coroutine-lock.c Normal file
View file

@ -0,0 +1,186 @@
/*
* coroutine queues and locks
*
* Copyright (c) 2011 Kevin Wolf <kwolf@redhat.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include "qemu-common.h"
#include "qemu/coroutine.h"
#include "qemu/coroutine_int.h"
#include "qemu/queue.h"
#include "trace.h"
void qemu_co_queue_init(CoQueue *queue)
{
QTAILQ_INIT(&queue->entries);
}
void coroutine_fn qemu_co_queue_wait(CoQueue *queue)
{
Coroutine *self = qemu_coroutine_self();
QTAILQ_INSERT_TAIL(&queue->entries, self, co_queue_next);
qemu_coroutine_yield();
assert(qemu_in_coroutine());
}
/**
* qemu_co_queue_run_restart:
*
* Enter each coroutine that was previously marked for restart by
* qemu_co_queue_next() or qemu_co_queue_restart_all(). This function is
* invoked by the core coroutine code when the current coroutine yields or
* terminates.
*/
void qemu_co_queue_run_restart(Coroutine *co)
{
Coroutine *next;
trace_qemu_co_queue_run_restart(co);
while ((next = QTAILQ_FIRST(&co->co_queue_wakeup))) {
QTAILQ_REMOVE(&co->co_queue_wakeup, next, co_queue_next);
qemu_coroutine_enter(next, NULL);
}
}
static bool qemu_co_queue_do_restart(CoQueue *queue, bool single)
{
Coroutine *self = qemu_coroutine_self();
Coroutine *next;
if (QTAILQ_EMPTY(&queue->entries)) {
return false;
}
while ((next = QTAILQ_FIRST(&queue->entries)) != NULL) {
QTAILQ_REMOVE(&queue->entries, next, co_queue_next);
QTAILQ_INSERT_TAIL(&self->co_queue_wakeup, next, co_queue_next);
trace_qemu_co_queue_next(next);
if (single) {
break;
}
}
return true;
}
bool coroutine_fn qemu_co_queue_next(CoQueue *queue)
{
assert(qemu_in_coroutine());
return qemu_co_queue_do_restart(queue, true);
}
void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue)
{
assert(qemu_in_coroutine());
qemu_co_queue_do_restart(queue, false);
}
bool qemu_co_enter_next(CoQueue *queue)
{
Coroutine *next;
next = QTAILQ_FIRST(&queue->entries);
if (!next) {
return false;
}
QTAILQ_REMOVE(&queue->entries, next, co_queue_next);
qemu_coroutine_enter(next, NULL);
return true;
}
bool qemu_co_queue_empty(CoQueue *queue)
{
return QTAILQ_FIRST(&queue->entries) == NULL;
}
void qemu_co_mutex_init(CoMutex *mutex)
{
memset(mutex, 0, sizeof(*mutex));
qemu_co_queue_init(&mutex->queue);
}
void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
{
Coroutine *self = qemu_coroutine_self();
trace_qemu_co_mutex_lock_entry(mutex, self);
while (mutex->locked) {
qemu_co_queue_wait(&mutex->queue);
}
mutex->locked = true;
trace_qemu_co_mutex_lock_return(mutex, self);
}
void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
{
Coroutine *self = qemu_coroutine_self();
trace_qemu_co_mutex_unlock_entry(mutex, self);
assert(mutex->locked == true);
assert(qemu_in_coroutine());
mutex->locked = false;
qemu_co_queue_next(&mutex->queue);
trace_qemu_co_mutex_unlock_return(mutex, self);
}
void qemu_co_rwlock_init(CoRwlock *lock)
{
memset(lock, 0, sizeof(*lock));
qemu_co_queue_init(&lock->queue);
}
void qemu_co_rwlock_rdlock(CoRwlock *lock)
{
while (lock->writer) {
qemu_co_queue_wait(&lock->queue);
}
lock->reader++;
}
void qemu_co_rwlock_unlock(CoRwlock *lock)
{
assert(qemu_in_coroutine());
if (lock->writer) {
lock->writer = false;
qemu_co_queue_restart_all(&lock->queue);
} else {
lock->reader--;
assert(lock->reader >= 0);
/* Wakeup only one waiting writer */
if (!lock->reader) {
qemu_co_queue_next(&lock->queue);
}
}
}
void qemu_co_rwlock_wrlock(CoRwlock *lock)
{
while (lock->writer || lock->reader) {
qemu_co_queue_wait(&lock->queue);
}
lock->writer = true;
}

View file

@ -0,0 +1,41 @@
/*
* QEMU coroutine sleep
*
* Copyright IBM, Corp. 2011
*
* Authors:
* Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
*
* This work is licensed under the terms of the GNU LGPL, version 2 or later.
* See the COPYING.LIB file in the top-level directory.
*
*/
#include "qemu/coroutine.h"
#include "qemu/timer.h"
#include "block/aio.h"
typedef struct CoSleepCB {
QEMUTimer *ts;
Coroutine *co;
} CoSleepCB;
static void co_sleep_cb(void *opaque)
{
CoSleepCB *sleep_cb = opaque;
qemu_coroutine_enter(sleep_cb->co, NULL);
}
void coroutine_fn co_aio_sleep_ns(AioContext *ctx, QEMUClockType type,
int64_t ns)
{
CoSleepCB sleep_cb = {
.co = qemu_coroutine_self(),
};
sleep_cb.ts = aio_timer_new(ctx, type, SCALE_NS, co_sleep_cb, &sleep_cb);
timer_mod(sleep_cb.ts, qemu_clock_get_ns(type) + ns);
qemu_coroutine_yield();
timer_del(sleep_cb.ts);
timer_free(sleep_cb.ts);
}

146
util/qemu-coroutine.c Normal file
View file

@ -0,0 +1,146 @@
/*
* QEMU coroutines
*
* Copyright IBM, Corp. 2011
*
* Authors:
* Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
* Kevin Wolf <kwolf@redhat.com>
*
* This work is licensed under the terms of the GNU LGPL, version 2 or later.
* See the COPYING.LIB file in the top-level directory.
*
*/
#include "trace.h"
#include "qemu-common.h"
#include "qemu/thread.h"
#include "qemu/atomic.h"
#include "qemu/coroutine.h"
#include "qemu/coroutine_int.h"
enum {
POOL_BATCH_SIZE = 64,
};
/** Free list to speed up creation */
static QSLIST_HEAD(, Coroutine) release_pool = QSLIST_HEAD_INITIALIZER(pool);
static unsigned int release_pool_size;
static __thread QSLIST_HEAD(, Coroutine) alloc_pool = QSLIST_HEAD_INITIALIZER(pool);
static __thread unsigned int alloc_pool_size;
static __thread Notifier coroutine_pool_cleanup_notifier;
static void coroutine_pool_cleanup(Notifier *n, void *value)
{
Coroutine *co;
Coroutine *tmp;
QSLIST_FOREACH_SAFE(co, &alloc_pool, pool_next, tmp) {
QSLIST_REMOVE_HEAD(&alloc_pool, pool_next);
qemu_coroutine_delete(co);
}
}
Coroutine *qemu_coroutine_create(CoroutineEntry *entry)
{
Coroutine *co = NULL;
if (CONFIG_COROUTINE_POOL) {
co = QSLIST_FIRST(&alloc_pool);
if (!co) {
if (release_pool_size > POOL_BATCH_SIZE) {
/* Slow path; a good place to register the destructor, too. */
if (!coroutine_pool_cleanup_notifier.notify) {
coroutine_pool_cleanup_notifier.notify = coroutine_pool_cleanup;
qemu_thread_atexit_add(&coroutine_pool_cleanup_notifier);
}
/* This is not exact; there could be a little skew between
* release_pool_size and the actual size of release_pool. But
* it is just a heuristic, it does not need to be perfect.
*/
alloc_pool_size = atomic_xchg(&release_pool_size, 0);
QSLIST_MOVE_ATOMIC(&alloc_pool, &release_pool);
co = QSLIST_FIRST(&alloc_pool);
}
}
if (co) {
QSLIST_REMOVE_HEAD(&alloc_pool, pool_next);
alloc_pool_size--;
}
}
if (!co) {
co = qemu_coroutine_new();
}
co->entry = entry;
QTAILQ_INIT(&co->co_queue_wakeup);
return co;
}
static void coroutine_delete(Coroutine *co)
{
co->caller = NULL;
if (CONFIG_COROUTINE_POOL) {
if (release_pool_size < POOL_BATCH_SIZE * 2) {
QSLIST_INSERT_HEAD_ATOMIC(&release_pool, co, pool_next);
atomic_inc(&release_pool_size);
return;
}
if (alloc_pool_size < POOL_BATCH_SIZE) {
QSLIST_INSERT_HEAD(&alloc_pool, co, pool_next);
alloc_pool_size++;
return;
}
}
qemu_coroutine_delete(co);
}
void qemu_coroutine_enter(Coroutine *co, void *opaque)
{
Coroutine *self = qemu_coroutine_self();
CoroutineAction ret;
trace_qemu_coroutine_enter(self, co, opaque);
if (co->caller) {
fprintf(stderr, "Co-routine re-entered recursively\n");
abort();
}
co->caller = self;
co->entry_arg = opaque;
ret = qemu_coroutine_switch(self, co, COROUTINE_ENTER);
qemu_co_queue_run_restart(co);
switch (ret) {
case COROUTINE_YIELD:
return;
case COROUTINE_TERMINATE:
trace_qemu_coroutine_terminate(co);
coroutine_delete(co);
return;
default:
abort();
}
}
void coroutine_fn qemu_coroutine_yield(void)
{
Coroutine *self = qemu_coroutine_self();
Coroutine *to = self->caller;
trace_qemu_coroutine_yield(self, to);
if (!to) {
fprintf(stderr, "Co-routine is yielding to no one\n");
abort();
}
self->caller = NULL;
qemu_coroutine_switch(self, to, COROUTINE_YIELD);
}