From d7da45e152995e57123983e05f5faa9974e5df7a Mon Sep 17 00:00:00 2001 From: Timofey Titovets Date: Mon, 22 Sep 2025 18:19:55 +0200 Subject: [PATCH] serialqueue: decouple pending & ready queues Simply describe how the cmdqueue is moved between states. Signed-off-by: Timofey Titovets --- klippy/chelper/serialqueue.c | 107 +++++++++++++++++++++-------------- 1 file changed, 63 insertions(+), 44 deletions(-) diff --git a/klippy/chelper/serialqueue.c b/klippy/chelper/serialqueue.c index 914d4c395..724064bae 100644 --- a/klippy/chelper/serialqueue.c +++ b/klippy/chelper/serialqueue.c @@ -29,11 +29,15 @@ #include "pyhelper.h" // get_monotonic #include "serialqueue.h" // struct queue_message -struct command_queue { - struct list_head upcoming_queue, ready_queue; +struct message_sub_queue { + struct list_head msg_queue; struct list_node node; }; +struct command_queue { + struct message_sub_queue ready, upcoming; +}; + struct serialqueue { // Input reading struct pollreactor *pr; @@ -59,8 +63,10 @@ struct serialqueue { struct list_head sent_queue; double srtt, rttvar, rto; // Pending transmission message queues - struct list_head pending_queues; - int ready_bytes, upcoming_bytes, need_ack_bytes, last_ack_bytes; + struct list_head upcoming_queues; + int upcoming_bytes; + struct list_head ready_queues; + int ready_bytes, need_ack_bytes, last_ack_bytes; uint64_t need_kick_clock; struct list_head notify_queue; double last_write_fail_time; @@ -452,23 +458,21 @@ build_and_send_command(struct serialqueue *sq, uint8_t *buf, int pending uint64_t min_clock = MAX_CLOCK; struct command_queue *q, *cq = NULL; struct queue_message *qm = NULL; - list_for_each_entry(q, &sq->pending_queues, node) { - if (!list_empty(&q->ready_queue)) { - struct queue_message *m = list_first_entry( - &q->ready_queue, struct queue_message, node); - if (m->req_clock < min_clock) { - min_clock = m->req_clock; - cq = q; - qm = m; - } + list_for_each_entry(q, &sq->ready_queues, ready.node) { + struct queue_message *m = list_first_entry( + &q->ready.msg_queue, struct queue_message, node); + if (m->req_clock < min_clock) { + min_clock = m->req_clock; + cq = q; + qm = m; } } // Append message to outgoing command if (len + qm->len > MESSAGE_MAX - MESSAGE_TRAILER_SIZE) break; list_del(&qm->node); - if (list_empty(&cq->ready_queue) && list_empty(&cq->upcoming_queue)) - list_del(&cq->node); + if (list_empty(&cq->ready.msg_queue)) + list_del(&cq->ready.node); memcpy(&buf[len], qm->msg, qm->len); len += qm->len; sq->ready_bytes -= qm->len; @@ -530,34 +534,42 @@ check_send_command(struct serialqueue *sq, int pending, double eventtime) idletime += calculate_bittime(sq, pending + MESSAGE_MIN); uint64_t ack_clock = clock_from_time(&sq->ce, idletime); uint64_t min_stalled_clock = MAX_CLOCK, min_ready_clock = MAX_CLOCK; - struct command_queue *cq; - list_for_each_entry(cq, &sq->pending_queues, node) { - // Move messages from the upcoming_queue to the ready_queue - while (!list_empty(&cq->upcoming_queue)) { + struct command_queue *cq, *_ncq; + list_for_each_entry_safe(cq, _ncq, &sq->upcoming_queues, upcoming.node) { + int not_in_ready_queues = list_empty(&cq->ready.msg_queue); + // Move messages from the upcoming.msg_queue to the ready.msg_queue + while (!list_empty(&cq->upcoming.msg_queue)) { struct queue_message *qm = list_first_entry( - &cq->upcoming_queue, struct queue_message, node); + &cq->upcoming.msg_queue, struct queue_message, node); if (ack_clock < qm->min_clock) { if (qm->min_clock < min_stalled_clock) min_stalled_clock = qm->min_clock; break; } list_del(&qm->node); - list_add_tail(&qm->node, &cq->ready_queue); + list_add_tail(&qm->node, &cq->ready.msg_queue); sq->upcoming_bytes -= qm->len; sq->ready_bytes += qm->len; } + // Remove cq from the list if it is now empty + if (list_empty(&cq->upcoming.msg_queue)) + list_del(&cq->upcoming.node); + // Add to ready queues + if (not_in_ready_queues && !list_empty(&cq->ready.msg_queue)) + list_add_tail(&cq->ready.node, &sq->ready_queues); + } + // Check if it is still needed to send messages from the ready_queues + list_for_each_entry(cq, &sq->ready_queues, ready.node) { // Update min_ready_clock - if (!list_empty(&cq->ready_queue)) { - struct queue_message *qm = list_first_entry( - &cq->ready_queue, struct queue_message, node); - uint64_t req_clock = qm->req_clock; - double bgtime = pending ? idletime : sq->idle_time; - double bgoffset = MIN_REQTIME_DELTA + MIN_BACKGROUND_DELTA; - if (req_clock == BACKGROUND_PRIORITY_CLOCK) - req_clock = clock_from_time(&sq->ce, bgtime + bgoffset); - if (req_clock < min_ready_clock) - min_ready_clock = req_clock; - } + struct queue_message *qm = list_first_entry( + &cq->ready.msg_queue, struct queue_message, node); + uint64_t req_clock = qm->req_clock; + double bgtime = pending ? idletime : sq->idle_time; + double bgoffset = MIN_REQTIME_DELTA + MIN_BACKGROUND_DELTA; + if (req_clock == BACKGROUND_PRIORITY_CLOCK) + req_clock = clock_from_time(&sq->ce, bgtime + bgoffset); + if (req_clock < min_ready_clock) + min_ready_clock = req_clock; } // Check for messages to send @@ -664,7 +676,8 @@ serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id // Queues sq->need_kick_clock = MAX_CLOCK; - list_init(&sq->pending_queues); + list_init(&sq->upcoming_queues); + list_init(&sq->ready_queues); list_init(&sq->sent_queue); list_init(&sq->receive_queue); list_init(&sq->notify_queue); @@ -722,12 +735,17 @@ serialqueue_free(struct serialqueue *sq) message_queue_free(&sq->notify_queue); message_queue_free(&sq->old_sent); message_queue_free(&sq->old_receive); - while (!list_empty(&sq->pending_queues)) { + while (!list_empty(&sq->ready_queues)) { + struct command_queue* cq = list_first_entry( + &sq->ready_queues, struct command_queue, ready.node); + list_del(&cq->ready.node); + message_queue_free(&cq->ready.msg_queue); + } + while (!list_empty(&sq->upcoming_queues)) { struct command_queue *cq = list_first_entry( - &sq->pending_queues, struct command_queue, node); - list_del(&cq->node); - message_queue_free(&cq->ready_queue); - message_queue_free(&cq->upcoming_queue); + &sq->upcoming_queues, struct command_queue, upcoming.node); + list_del(&cq->upcoming.node); + message_queue_free(&cq->upcoming.msg_queue); } pthread_mutex_unlock(&sq->lock); pollreactor_free(sq->pr); @@ -740,8 +758,8 @@ serialqueue_alloc_commandqueue(void) { struct command_queue *cq = malloc(sizeof(*cq)); memset(cq, 0, sizeof(*cq)); - list_init(&cq->ready_queue); - list_init(&cq->upcoming_queue); + list_init(&cq->ready.msg_queue); + list_init(&cq->upcoming.msg_queue); return cq; } @@ -751,7 +769,8 @@ serialqueue_free_commandqueue(struct command_queue *cq) { if (!cq) return; - if (!list_empty(&cq->ready_queue) || !list_empty(&cq->upcoming_queue)) { + if (!list_empty(&cq->ready.msg_queue) || + !list_empty(&cq->upcoming.msg_queue)) { errorf("Memory leak! Can't free non-empty commandqueue"); return; } @@ -799,9 +818,9 @@ serialqueue_send_batch(struct serialqueue *sq, struct command_queue *cq // Add list to cq->upcoming_queue pthread_mutex_lock(&sq->lock); - if (list_empty(&cq->ready_queue) && list_empty(&cq->upcoming_queue)) - list_add_tail(&cq->node, &sq->pending_queues); - list_join_tail(msgs, &cq->upcoming_queue); + if (list_empty(&cq->upcoming.msg_queue)) + list_add_tail(&cq->upcoming.node, &sq->upcoming_queues); + list_join_tail(msgs, &cq->upcoming.msg_queue); sq->upcoming_bytes += len; int mustwake = 0; if (qm->min_clock < sq->need_kick_clock) {