From 493271697f2aff1b54bdae62c27588e2db614161 Mon Sep 17 00:00:00 2001 From: Timofey Titovets Date: Fri, 19 Sep 2025 01:28:14 +0200 Subject: [PATCH] serialqueue: decouple serialhdl receive lock Signed-off-by: Timofey Titovets --- klippy/chelper/serialqueue.c | 121 +++++++++++++++++++++-------------- 1 file changed, 74 insertions(+), 47 deletions(-) diff --git a/klippy/chelper/serialqueue.c b/klippy/chelper/serialqueue.c index 724064bae..0403b7a80 100644 --- a/klippy/chelper/serialqueue.c +++ b/klippy/chelper/serialqueue.c @@ -38,6 +38,14 @@ struct command_queue { struct message_sub_queue ready, upcoming; }; +struct receiver { + pthread_mutex_t lock; + pthread_cond_t cond; + int waiting; + struct list_head queue; + struct list_head old_receive; +}; + struct serialqueue { // Input reading struct pollreactor *pr; @@ -49,9 +57,9 @@ struct serialqueue { // Threading char name[16]; pthread_t tid; + // SerialHDL reader + struct receiver receiver; pthread_mutex_t lock; // protects variables below - pthread_cond_t cond; - int receive_waiting; // Baud / clock tracking int receive_window; double bittime_adjust, idle_time; @@ -70,13 +78,11 @@ struct serialqueue { uint64_t need_kick_clock; struct list_head notify_queue; double last_write_fail_time; - // Received messages - struct list_head receive_queue; // Fastreader support pthread_mutex_t fast_reader_dispatch_lock; struct list_head fast_readers; // Debugging - struct list_head old_sent, old_receive; + struct list_head old_sent; // Stats uint32_t bytes_write, bytes_read, bytes_retransmit, bytes_invalid; }; @@ -115,23 +121,30 @@ debug_queue_alloc(struct list_head *root, int count) } // Copy a message to a debug queue and free old debug messages -static void -debug_queue_add(struct list_head *root, struct queue_message *qm) +static struct queue_message * +_debug_queue_add(struct list_head *root, struct queue_message *qm) { list_add_tail(&qm->node, root); struct queue_message *old = list_first_entry( root, struct queue_message, node); list_del(&old->node); + return old; +} + +static void +debug_queue_add(struct list_head *root, struct queue_message *qm) +{ + struct queue_message *old = _debug_queue_add(root, qm); message_free(old); } // Wake up the receiver thread if it is waiting static void -check_wake_receive(struct serialqueue *sq) +check_wake_receive(struct receiver *receiver) { - if (sq->receive_waiting) { - sq->receive_waiting = 0; - pthread_cond_signal(&sq->cond); + if (receiver->waiting) { + receiver->waiting = 0; + pthread_cond_signal(&receiver->cond); } } @@ -245,7 +258,8 @@ handle_message(struct serialqueue *sq, double eventtime, int len) sq->bytes_read += len; // Check for pending messages on notify_queue - int must_wake = 0; + struct list_head received; + list_init(&received); while (!list_empty(&sq->notify_queue)) { struct queue_message *qm = list_first_entry( &sq->notify_queue, struct queue_message, node); @@ -257,8 +271,7 @@ handle_message(struct serialqueue *sq, double eventtime, int len) qm->len = 0; qm->sent_time = sq->last_receive_sent_time; qm->receive_time = eventtime; - list_add_tail(&qm->node, &sq->receive_queue); - must_wake = 1; + list_add_tail(&qm->node, &received); } // Process message @@ -276,8 +289,14 @@ handle_message(struct serialqueue *sq, double eventtime, int len) ? sq->last_receive_sent_time : 0.); qm->receive_time = get_monotonic(); // must be time post read() qm->receive_time -= calculate_bittime(sq, len); - list_add_tail(&qm->node, &sq->receive_queue); - must_wake = 1; + list_add_tail(&qm->node, &received); + } + + if (!list_empty(&received)) { + pthread_mutex_lock(&sq->receiver.lock); + list_join_tail(&received, &sq->receiver.queue); + check_wake_receive(&sq->receiver); + pthread_mutex_unlock(&sq->receiver.lock); } // Check fast readers @@ -289,16 +308,11 @@ handle_message(struct serialqueue *sq, double eventtime, int len) continue; // Release main lock and invoke callback pthread_mutex_lock(&sq->fast_reader_dispatch_lock); - if (must_wake) - check_wake_receive(sq); pthread_mutex_unlock(&sq->lock); fr->func(fr, sq->input_buf, len); pthread_mutex_unlock(&sq->fast_reader_dispatch_lock); return; } - - if (must_wake) - check_wake_receive(sq); pthread_mutex_unlock(&sq->lock); } @@ -628,9 +642,9 @@ background_thread(void *data) set_thread_name(sq->name); pollreactor_run(sq->pr); - pthread_mutex_lock(&sq->lock); - check_wake_receive(sq); - pthread_mutex_unlock(&sq->lock); + pthread_mutex_lock(&sq->receiver.lock); + check_wake_receive(&sq->receiver); + pthread_mutex_unlock(&sq->receiver.lock); return NULL; } @@ -679,21 +693,24 @@ serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id list_init(&sq->upcoming_queues); list_init(&sq->ready_queues); list_init(&sq->sent_queue); - list_init(&sq->receive_queue); + list_init(&sq->receiver.queue); list_init(&sq->notify_queue); list_init(&sq->fast_readers); // Debugging list_init(&sq->old_sent); - list_init(&sq->old_receive); + list_init(&sq->receiver.old_receive); debug_queue_alloc(&sq->old_sent, DEBUG_QUEUE_SENT); - debug_queue_alloc(&sq->old_receive, DEBUG_QUEUE_RECEIVE); + debug_queue_alloc(&sq->receiver.old_receive, DEBUG_QUEUE_RECEIVE); // Thread setup ret = pthread_mutex_init(&sq->lock, NULL); if (ret) goto fail; - ret = pthread_cond_init(&sq->cond, NULL); + ret = pthread_mutex_init(&sq->receiver.lock, NULL); + if (ret) + goto fail; + ret = pthread_cond_init(&sq->receiver.cond, NULL); if (ret) goto fail; ret = pthread_mutex_init(&sq->fast_reader_dispatch_lock, NULL); @@ -731,10 +748,12 @@ serialqueue_free(struct serialqueue *sq) serialqueue_exit(sq); pthread_mutex_lock(&sq->lock); message_queue_free(&sq->sent_queue); - message_queue_free(&sq->receive_queue); + pthread_mutex_lock(&sq->receiver.lock); + message_queue_free(&sq->receiver.queue); + message_queue_free(&sq->receiver.old_receive); + pthread_mutex_unlock(&sq->receiver.lock); message_queue_free(&sq->notify_queue); message_queue_free(&sq->old_sent); - message_queue_free(&sq->old_receive); while (!list_empty(&sq->ready_queues)) { struct command_queue* cq = list_first_entry( &sq->ready_queues, struct command_queue, ready.node); @@ -864,20 +883,21 @@ serialqueue_send(struct serialqueue *sq, struct command_queue *cq, uint8_t *msg void __visible serialqueue_pull(struct serialqueue *sq, struct pull_queue_message *pqm) { - pthread_mutex_lock(&sq->lock); + struct receiver *receiver = &sq->receiver; + pthread_mutex_lock(&receiver->lock); // Wait for message to be available - while (list_empty(&sq->receive_queue)) { + while (list_empty(&receiver->queue)) { if (pollreactor_is_exit(sq->pr)) goto exit; - sq->receive_waiting = 1; - int ret = pthread_cond_wait(&sq->cond, &sq->lock); + receiver->waiting = 1; + int ret = pthread_cond_wait(&receiver->cond, &receiver->lock); if (ret) report_errno("pthread_cond_wait", ret); } // Remove message from queue struct queue_message *qm = list_first_entry( - &sq->receive_queue, struct queue_message, node); + &receiver->queue, struct queue_message, node); list_del(&qm->node); // Copy message @@ -887,16 +907,14 @@ serialqueue_pull(struct serialqueue *sq, struct pull_queue_message *pqm) pqm->receive_time = qm->receive_time; pqm->notify_id = qm->notify_id; if (qm->len) - debug_queue_add(&sq->old_receive, qm); - else - message_free(qm); - - pthread_mutex_unlock(&sq->lock); + qm = _debug_queue_add(&receiver->old_receive, qm); + pthread_mutex_unlock(&receiver->lock); + message_free(qm); return; exit: pqm->len = -1; - pthread_mutex_unlock(&sq->lock); + pthread_mutex_unlock(&receiver->lock); } void __visible @@ -969,18 +987,27 @@ serialqueue_extract_old(struct serialqueue *sq, int sentq , struct pull_queue_message *q, int max) { int count = sentq ? DEBUG_QUEUE_SENT : DEBUG_QUEUE_RECEIVE; - struct list_head *rootp = sentq ? &sq->old_sent : &sq->old_receive; + struct list_head *rootp; + rootp = sentq ? &sq->old_sent : &sq->receiver.old_receive; struct list_head replacement, current; list_init(&replacement); debug_queue_alloc(&replacement, count); list_init(¤t); // Atomically replace existing debug list with new zero'd list - pthread_mutex_lock(&sq->lock); - list_join_tail(rootp, ¤t); - list_init(rootp); - list_join_tail(&replacement, rootp); - pthread_mutex_unlock(&sq->lock); + if (rootp == &sq->receiver.old_receive) { + pthread_mutex_lock(&sq->receiver.lock); + list_join_tail(rootp, ¤t); + list_init(rootp); + list_join_tail(&replacement, rootp); + pthread_mutex_unlock(&sq->receiver.lock); + } else { + pthread_mutex_lock(&sq->lock); + list_join_tail(rootp, ¤t); + list_init(rootp); + list_join_tail(&replacement, rootp); + pthread_mutex_unlock(&sq->lock); + } // Walk the debug list int pos = 0;