From 3f9a4e3062bdc0313b366e7722910610d37b26d9 Mon Sep 17 00:00:00 2001
Message-Id: <3f9a4e3062bdc0313b366e7722910610d37b26d9.1427300678.git.jen@redhat.com>
In-Reply-To: <cd1e5c640073fe9f6f79125f2cbb3f434f1c7897.1427300678.git.jen@redhat.com>
References: <cd1e5c640073fe9f6f79125f2cbb3f434f1c7897.1427300678.git.jen@redhat.com>
From: Vlad Yasevich <vyasevic@redhat.com>
Date: Thu, 12 Mar 2015 19:13:03 -0500
Subject: [CHANGE 07/33] aio: introduce AioContext, move bottom halves there
To: rhvirt-patches@redhat.com,
    jen@redhat.com

RH-Author: Vlad Yasevich <vyasevic@redhat.com>
Message-id: <1426187601-21396-8-git-send-email-vyasevic@redhat.com>
Patchwork-id: 64344
O-Subject: [RHEL6.7 qemu-kvm PATCH v2 07/25] aio: introduce AioContext, move bottom halves there
Bugzilla: 1005016
RH-Acked-by: Juan Quintela <quintela@redhat.com>
RH-Acked-by: Michael S. Tsirkin <mst@redhat.com>
RH-Acked-by: Paolo Bonzini <pbonzini@redhat.com>

commit f627aab1ccea119fd94ca9e9df120cea6aab0c67
Author: Paolo Bonzini <pbonzini@redhat.com>
Date:   Mon Oct 29 23:45:23 2012 +0100

    aio: introduce AioContext, move bottom halves there

    Start introducing AioContext, which will let us remove globals from
    aio.c/async.c, and introduce multiple I/O threads.

    The bottom half functions now take an additional AioContext
    argument.  A bottom half is created with a specific AioContext
    that remains the same throughout its lifetime.  qemu_bh_new is
    just a wrapper that uses a global context.

    Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>

Changes:
  Since we do not have main-loop.c in our code stream, the qemu_bh_*
wrapper functions were placed into async.c.  The global
qemu_aio_context was placed there as well.  Most of the calling code
was left calling the qemu_bh_* variants, and therefore using the
global context.  I had to do it this way to limit the changes and
get nbd to build.
  There is also an include dependency chain between qemu-aio.h,
qemu-char.h and qerror.h.  When the original patch removed
qemu-char.h from qemu-aio.h, it caused compilation errors all over
the place due to undefined symbols that are declared in qerror.h.
Instead of fixing all of those individually, I've included qerror.h
in qemu-aio.h.
  The rest of the patch was applied mostly as is.
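
  For reference only (not part of the patch), here is a minimal
sketch of how the old and new bottom-half APIs fit together after
this backport.  The callback and helper names (my_cb, example) are
made up for the illustration; the functions called are the ones
introduced or kept by the diff below:

    #include "qemu-common.h"
    #include "qemu-aio.h"

    static void my_cb(void *opaque)      /* hypothetical callback */
    {
        /* runs once the bottom half is dispatched */
    }

    static void example(void)            /* hypothetical helper */
    {
        /* Old style: the qemu_bh_* wrappers bind to the global
         * AioContext that async.c now creates on first use. */
        QEMUBH *bh = qemu_bh_new(my_cb, NULL);
        qemu_bh_schedule(bh);

        /* New style: the AioContext is explicit and stays the same
         * for the whole lifetime of the bottom half. */
        AioContext *ctx = aio_context_new();
        QEMUBH *bh2 = aio_bh_new(ctx, my_cb, NULL);
        qemu_bh_schedule(bh2);
        aio_bh_poll(ctx);                /* dispatch scheduled bhs */

        qemu_bh_delete(bh);
        qemu_bh_delete(bh2);
    }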

Signed-off-by: Vladislav Yasevich <vyasevic@redhat.com>
Signed-off-by: Jeff E. Nelson <jen@redhat.com>
---
 aio.c                 |   2 -
 async.c               |  58 ++++++++++++++++++++--------
 block.h               |   2 -
 block_int.h           |  20 +---------
 hw/hw.h               |   1 +
 iohandler.c           |   1 +
 linux-aio.c           |   2 +-
 qemu-aio.h            | 105 +++++++++++++++++++++++++++++++++++++++++++++++++-
 qemu-char.h           |   1 +
 qemu-common.h         |   6 +--
 qemu-coroutine-lock.c |   1 +
 vl.c                  |   2 +-
 12 files changed, 156 insertions(+), 45 deletions(-)

diff --git a/aio.c b/aio.c
index d9c044c..aba0d9b 100644
--- a/aio.c
+++ b/aio.c
@@ -16,8 +16,6 @@
 #include "qemu-queue.h"
 #include "qemu_socket.h"
 
-typedef struct AioHandler AioHandler;
-
 /* The list of registered AIO handlers */
 static QLIST_HEAD(, AioHandler) aio_handlers;
 
diff --git a/async.c b/async.c
index a8b4ef6..db614a0 100644
--- a/async.c
+++ b/async.c
@@ -25,9 +25,6 @@
 #include "qemu-common.h"
 #include "qemu-aio.h"
 
-/* Anchor of the list of Bottom Halves belonging to the context */
-static struct QEMUBH *first_bh;
-
 /***********************************************************/
 /* bottom halves (can be seen as timers which expire ASAP) */
 
@@ -40,27 +37,26 @@ struct QEMUBH {
     QEMUBH *next;
 };
 
-QEMUBH *qemu_bh_new(QEMUBHFunc *cb, void *opaque)
+QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
 {
     QEMUBH *bh;
     bh = qemu_mallocz(sizeof(QEMUBH));
     bh->cb = cb;
     bh->opaque = opaque;
-    bh->next = first_bh;
-    first_bh = bh;
+    bh->next = ctx->first_bh;
+    ctx->first_bh = bh;
     return bh;
 }
 
-int qemu_bh_poll(void)
+int aio_bh_poll(AioContext *ctx)
 {
     QEMUBH *bh, **bhp, *next;
     int ret;
-    static int nesting = 0;
 
-    nesting++;
+    ctx->walking_bh++;
 
     ret = 0;
-    for (bh = first_bh; bh; bh = next) {
+    for (bh = ctx->first_bh; bh; bh = next) {
         next = bh->next;
         if (!bh->deleted && bh->scheduled) {
             bh->scheduled = 0;
@@ -71,11 +67,11 @@ int qemu_bh_poll(void)
         }
     }
 
-    nesting--;
+    ctx->walking_bh--;
 
     /* remove deleted bhs */
-    if (!nesting) {
-        bhp = &first_bh;
+    if (!ctx->walking_bh) {
+        bhp = &ctx->first_bh;
         while (*bhp) {
             bh = *bhp;
             if (bh->deleted) {
@@ -119,11 +115,11 @@ void qemu_bh_delete(QEMUBH *bh)
     bh->deleted = 1;
 }
 
-void qemu_bh_update_timeout(int *timeout)
+void aio_bh_update_timeout(AioContext *ctx, uint32_t *timeout)
 {
     QEMUBH *bh;
 
-    for (bh = first_bh; bh; bh = bh->next) {
+    for (bh = ctx->first_bh; bh; bh = bh->next) {
         if (!bh->deleted && bh->scheduled) {
             if (bh->idle) {
                 /* idle bottom halves will be polled at least
@@ -139,3 +135,35 @@ void qemu_bh_update_timeout(int *timeout)
     }
 }
 
+AioContext *aio_context_new(void)
+{
+    return g_new0(AioContext, 1);
+}
+
+/*
+ * Wrappers using a static global context.
+ */
+static AioContext *__qemu_aio_context;
+
+static AioContext *qemu_aio_context(void)
+{
+    if (!__qemu_aio_context)
+        __qemu_aio_context = aio_context_new();
+
+    return __qemu_aio_context;
+}
+
+QEMUBH *qemu_bh_new(QEMUBHFunc *cb, void *opaque)
+{
+    return aio_bh_new(qemu_aio_context(), cb, opaque);
+}
+
+int qemu_bh_poll(void)
+{
+     return aio_bh_poll(qemu_aio_context());
+}
+
+void qemu_bh_update_timeout(uint32_t *timeout)
+{
+    aio_bh_update_timeout(qemu_aio_context(), timeout);
+}
diff --git a/block.h b/block.h
index f53d153..5c055f4 100644
--- a/block.h
+++ b/block.h
@@ -264,8 +264,6 @@ typedef enum {
 int bdrv_check(BlockDriverState *bs, BdrvCheckResult *res, BdrvCheckMode fix);
 
 /* async block I/O */
-typedef struct BlockDriverAIOCB BlockDriverAIOCB;
-typedef void BlockDriverCompletionFunc(void *opaque, int ret);
 typedef void BlockDriverDirtyHandler(BlockDriverState *bs, int64_t sector,
 				     int sector_num);
 BlockDriverAIOCB *bdrv_aio_readv(BlockDriverState *bs, int64_t sector_num,
diff --git a/block_int.h b/block_int.h
index 0417931..6b650d5 100644
--- a/block_int.h
+++ b/block_int.h
@@ -30,6 +30,8 @@
 #include "qemu-coroutine.h"
 #include "qemu-timer.h"
 #include "qemu/throttle.h"
+#include "qemu-aio.h"
+#include "qemu-config.h"
 #include "hbitmap.h"
 
 #define BLOCK_FLAG_ENCRYPT	1
@@ -48,12 +50,6 @@
 
 typedef struct BdrvTrackedRequest BdrvTrackedRequest;
 
-typedef struct AIOPool {
-    void (*cancel)(BlockDriverAIOCB *acb);
-    int aiocb_size;
-    BlockDriverAIOCB *free_aiocb;
-} AIOPool;
-
 typedef void BlockJobCancelFunc(void *opaque);
 typedef struct BlockJob BlockJob;
 typedef struct BlockJobType {
@@ -310,20 +306,8 @@ struct BlockDriverState {
 
 };
 
-struct BlockDriverAIOCB {
-    AIOPool *pool;
-    BlockDriverState *bs;
-    BlockDriverCompletionFunc *cb;
-    void *opaque;
-    BlockDriverAIOCB *next;
-};
-
 int get_tmp_filename(char *filename, int size);
 
-void *qemu_aio_get(AIOPool *pool, BlockDriverState *bs,
-                   BlockDriverCompletionFunc *cb, void *opaque);
-void qemu_aio_release(void *p);
-
 void *qemu_blockalign(BlockDriverState *bs, size_t size);
 void bdrv_set_io_limits(BlockDriverState *bs,
                         ThrottleConfig *cfg);
diff --git a/hw/hw.h b/hw/hw.h
index d866141..55a3190 100644
--- a/hw/hw.h
+++ b/hw/hw.h
@@ -12,6 +12,7 @@
 
 #include "ioport.h"
 #include "irq.h"
+#include "qemu-aio.h"
 
 /* VM Load/Save */
 
diff --git a/iohandler.c b/iohandler.c
index e326729..6c40d20 100644
--- a/iohandler.c
+++ b/iohandler.c
@@ -26,6 +26,7 @@
 #include "qemu-common.h"
 #include "qemu-char.h"
 #include "qemu-queue.h"
+#include "qemu-aio.h"
 
 #ifndef _WIN32
 #include <sys/wait.h>
diff --git a/linux-aio.c b/linux-aio.c
index 73c7e47..c52a683 100644
--- a/linux-aio.c
+++ b/linux-aio.c
@@ -9,7 +9,7 @@
  */
 #include "qemu-common.h"
 #include "qemu-aio.h"
-#include "block_int.h"
+#include "qemu-queue.h"
 #include "block/raw-posix-aio.h"
 
 #include <sys/eventfd.h>
diff --git a/qemu-aio.h b/qemu-aio.h
index a3e6bb3..5282778 100644
--- a/qemu-aio.h
+++ b/qemu-aio.h
@@ -15,11 +15,114 @@
 #define QEMU_AIO_H
 
 #include "qemu-common.h"
-#include "qemu-char.h"
+#include "qerror.h"
+
+typedef struct BlockDriverAIOCB BlockDriverAIOCB;
+typedef void BlockDriverCompletionFunc(void *opaque, int ret);
+
+typedef struct AIOPool {
+    void (*cancel)(BlockDriverAIOCB *acb);
+    int aiocb_size;
+    BlockDriverAIOCB *free_aiocb;
+} AIOPool;
+
+struct BlockDriverAIOCB {
+    AIOPool *pool;
+    BlockDriverState *bs;
+    BlockDriverCompletionFunc *cb;
+    void *opaque;
+    BlockDriverAIOCB *next;
+};
+
+void *qemu_aio_get(AIOPool *pool, BlockDriverState *bs,
+                   BlockDriverCompletionFunc *cb, void *opaque);
+void qemu_aio_release(void *p);
+
+typedef struct AioHandler AioHandler;
+typedef void IOHandler(void *opaque);
+
+typedef struct AioContext {
+    /* Anchor of the list of Bottom Halves belonging to the context */
+    struct QEMUBH *first_bh;
+
+    /* A simple lock used to protect the first_bh list, and ensure that
+     * no callbacks are removed while we're walking and dispatching callbacks.
+     */
+    int walking_bh;
+} AioContext;
 
 /* Returns 1 if there are still outstanding AIO requests; 0 otherwise */
 typedef int (AioFlushHandler)(void *opaque);
 
+/* Runs all currently allowed AIO callbacks of completed requests in the
+ * respective AIO backend. Returns 0 if no requests was handled, non-zero
+ * if at least one queued request was handled. */
+typedef int (AioProcessQueue)(void *opaque);
+
+/**
+ * aio_context_new: Allocate a new AioContext.
+ *
+ * AioContext provide a mini event-loop that can be waited on synchronously.
+ * They also provide bottom halves, a service to execute a piece of code
+ * as soon as possible.
+ */
+AioContext *aio_context_new(void);
+
+/**
+ * aio_bh_new: Allocate a new bottom half structure.
+ *
+ * Bottom halves are lightweight callbacks whose invocation is guaranteed
+ * to be wait-free, thread-safe and signal-safe.  The #QEMUBH structure
+ * is opaque and must be allocated prior to its use.
+ */
+QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque);
+
+/**
+ * aio_bh_poll: Poll bottom halves for an AioContext.
+ *
+ * These are internal functions used by the QEMU main loop.
+ */
+int aio_bh_poll(AioContext *ctx);
+void aio_bh_update_timeout(AioContext *ctx, uint32_t *timeout);
+
+/**
+ * qemu_bh_schedule: Schedule a bottom half.
+ *
+ * Scheduling a bottom half interrupts the main loop and causes the
+ * execution of the callback that was passed to qemu_bh_new.
+ *
+ * Bottom halves that are scheduled from a bottom half handler are instantly
+ * invoked.  This can create an infinite loop if a bottom half handler
+ * schedules itself.
+ *
+ * @bh: The bottom half to be scheduled.
+ */
+void qemu_bh_schedule(QEMUBH *bh);
+
+/**
+ * qemu_bh_cancel: Cancel execution of a bottom half.
+ *
+ * Canceling execution of a bottom half undoes the effect of calls to
+ * qemu_bh_schedule without freeing its resources yet.  While cancellation
+ * itself is also wait-free and thread-safe, it can of course race with the
+ * loop that executes bottom halves unless you are holding the iothread
+ * mutex.  This makes it mostly useless if you are not holding the mutex.
+ *
+ * @bh: The bottom half to be canceled.
+ */
+void qemu_bh_cancel(QEMUBH *bh);
+
+/**
+ *qemu_bh_delete: Cancel execution of a bottom half and free its resources.
+ *
+ * Deleting a bottom half frees the memory that was allocated for it by
+ * qemu_bh_new.  It also implies canceling the bottom half if it was
+ * scheduled.
+ *
+ * @bh: The bottom half to be deleted.
+ */
+void qemu_bh_delete(QEMUBH *bh);
+
 /* Flush any pending AIO operation. This function will block until all
  * outstanding AIO operations have been completed or cancelled. */
 void qemu_aio_flush(void);
diff --git a/qemu-char.h b/qemu-char.h
index 7a034c5..7a0675d 100644
--- a/qemu-char.h
+++ b/qemu-char.h
@@ -5,6 +5,7 @@
 #include "qemu-queue.h"
 #include "qemu-option.h"
 #include "qemu-config.h"
+#include "qemu-aio.h"
 #include "qobject.h"
 #include "qstring.h"
 #include "qerror.h"
diff --git a/qemu-common.h b/qemu-common.h
index 54fb151..2e1a0d4 100644
--- a/qemu-common.h
+++ b/qemu-common.h
@@ -103,7 +103,6 @@ typedef struct QEMUBH QEMUBH;
 typedef void QEMUBHFunc(void *opaque);
 
 QEMUBH *qemu_bh_new(QEMUBHFunc *cb, void *opaque);
-void qemu_bh_schedule(QEMUBH *bh);
 /* Bottom halfs that are scheduled from a bottom half handler are instantly
  * invoked.  This can create an infinite loop if a bottom half handler
  * schedules itself.  qemu_bh_schedule_idle() avoids this infinite loop by
@@ -111,10 +110,8 @@ void qemu_bh_schedule(QEMUBH *bh);
  * iteration.
  */
 void qemu_bh_schedule_idle(QEMUBH *bh);
-void qemu_bh_cancel(QEMUBH *bh);
-void qemu_bh_delete(QEMUBH *bh);
 int qemu_bh_poll(void);
-void qemu_bh_update_timeout(int *timeout);
+void qemu_bh_update_timeout(uint32_t *timeout);
 
 uint64_t muldiv64(uint64_t a, uint32_t b, uint32_t c);
 
@@ -189,7 +186,6 @@ void QEMU_NORETURN hw_error(const char *fmt, ...)
 /* IO callbacks.  */
 typedef void IOReadHandler(void *opaque, const uint8_t *buf, int size);
 typedef int IOCanReadHandler(void *opaque);
-typedef void IOHandler(void *opaque);
 
 struct ParallelIOArg {
     void *buffer;
diff --git a/qemu-coroutine-lock.c b/qemu-coroutine-lock.c
index 84c8a07..abbd7bc 100644
--- a/qemu-coroutine-lock.c
+++ b/qemu-coroutine-lock.c
@@ -26,6 +26,7 @@
 #include "qemu-coroutine.h"
 #include "qemu-coroutine-int.h"
 #include "qemu-queue.h"
+#include "qemu-aio.h"
 #include "trace.h"
 
 static QTAILQ_HEAD(, Coroutine) unlock_bh_queue =
diff --git a/vl.c b/vl.c
index fe7e3a7..9715433 100644
--- a/vl.c
+++ b/vl.c
@@ -4025,7 +4025,7 @@ void main_loop_wait(int timeout)
     int ret, nfds;
     struct timeval tv;
 
-    qemu_bh_update_timeout(&timeout);
+    qemu_bh_update_timeout((uint32_t *)&timeout);
 
     os_host_main_loop_wait(&timeout);
 
-- 
2.1.0

