From 7a723bdc1cb5e292ca0cd7b708035a7249b5e9a7 Mon Sep 17 00:00:00 2001 From: Kevin O'Connor Date: Sat, 18 Oct 2025 19:46:23 -0400 Subject: [PATCH] serialqueue: Revert recent serialqueue locking changes This reverts commit aea8d8e0a1f3a8655eab3ff0c51d5d7c7950f553. This reverts commit 493271697f2aff1b54bdae62c27588e2db614161. This reverts commit d7da45e152995e57123983e05f5faa9974e5df7a. There are reports of a regression since making this change. Revert for now until the root cause can be found. Signed-off-by: Kevin O'Connor --- klippy/chelper/serialqueue.c | 291 ++++++++++++++--------------------- 1 file changed, 112 insertions(+), 179 deletions(-) diff --git a/klippy/chelper/serialqueue.c b/klippy/chelper/serialqueue.c index b6d274a91..914d4c395 100644 --- a/klippy/chelper/serialqueue.c +++ b/klippy/chelper/serialqueue.c @@ -29,44 +29,25 @@ #include "pyhelper.h" // get_monotonic #include "serialqueue.h" // struct queue_message -struct message_sub_queue { - struct list_head msg_queue; - struct list_node node; -}; - struct command_queue { - struct message_sub_queue ready, upcoming; -}; - -struct receiver { - pthread_mutex_t lock; - pthread_cond_t cond; - int waiting; - struct list_head queue; - struct list_head old_receive; -}; - -struct transmit_requests { - int pipe_fds[2]; - pthread_mutex_t lock; // protects variables below - struct list_head upcoming_queues; - int upcoming_bytes; - uint64_t need_kick_clock; + struct list_head upcoming_queue, ready_queue; + struct list_node node; }; struct serialqueue { // Input reading struct pollreactor *pr; int serial_fd, serial_fd_type, client_id; + int pipe_fds[2]; uint8_t input_buf[4096]; uint8_t need_sync; int input_pos; // Threading char name[16]; pthread_t tid; - // SerialHDL reader - struct receiver receiver; pthread_mutex_t lock; // protects variables below + pthread_cond_t cond; + int receive_waiting; // Baud / clock tracking int receive_window; double bittime_adjust, idle_time; @@ -78,16 +59,18 @@ struct serialqueue { struct list_head sent_queue; double srtt, rttvar, rto; // Pending transmission message queues - struct transmit_requests transmit_requests; - struct list_head ready_queues; - int ready_bytes, need_ack_bytes, last_ack_bytes; + struct list_head pending_queues; + int ready_bytes, upcoming_bytes, need_ack_bytes, last_ack_bytes; + uint64_t need_kick_clock; struct list_head notify_queue; double last_write_fail_time; + // Received messages + struct list_head receive_queue; // Fastreader support pthread_mutex_t fast_reader_dispatch_lock; struct list_head fast_readers; // Debugging - struct list_head old_sent; + struct list_head old_sent, old_receive; // Stats uint32_t bytes_write, bytes_read, bytes_retransmit, bytes_invalid; }; @@ -126,30 +109,23 @@ debug_queue_alloc(struct list_head *root, int count) } // Copy a message to a debug queue and free old debug messages -static struct queue_message * -_debug_queue_add(struct list_head *root, struct queue_message *qm) +static void +debug_queue_add(struct list_head *root, struct queue_message *qm) { list_add_tail(&qm->node, root); struct queue_message *old = list_first_entry( root, struct queue_message, node); list_del(&old->node); - return old; -} - -static void -debug_queue_add(struct list_head *root, struct queue_message *qm) -{ - struct queue_message *old = _debug_queue_add(root, qm); message_free(old); } // Wake up the receiver thread if it is waiting static void -check_wake_receive(struct receiver *receiver) +check_wake_receive(struct serialqueue *sq) { - if (receiver->waiting) { - receiver->waiting = 0; - pthread_cond_signal(&receiver->cond); + if (sq->receive_waiting) { + sq->receive_waiting = 0; + pthread_cond_signal(&sq->cond); } } @@ -157,7 +133,7 @@ check_wake_receive(struct receiver *receiver) static void kick_bg_thread(struct serialqueue *sq) { - int ret = write(sq->transmit_requests.pipe_fds[1], ".", 1); + int ret = write(sq->pipe_fds[1], ".", 1); if (ret < 0) report_errno("pipe write", ret); } @@ -263,8 +239,7 @@ handle_message(struct serialqueue *sq, double eventtime, int len) sq->bytes_read += len; // Check for pending messages on notify_queue - struct list_head received; - list_init(&received); + int must_wake = 0; while (!list_empty(&sq->notify_queue)) { struct queue_message *qm = list_first_entry( &sq->notify_queue, struct queue_message, node); @@ -276,7 +251,8 @@ handle_message(struct serialqueue *sq, double eventtime, int len) qm->len = 0; qm->sent_time = sq->last_receive_sent_time; qm->receive_time = eventtime; - list_add_tail(&qm->node, &received); + list_add_tail(&qm->node, &sq->receive_queue); + must_wake = 1; } // Process message @@ -294,14 +270,8 @@ handle_message(struct serialqueue *sq, double eventtime, int len) ? sq->last_receive_sent_time : 0.); qm->receive_time = get_monotonic(); // must be time post read() qm->receive_time -= calculate_bittime(sq, len); - list_add_tail(&qm->node, &received); - } - - if (!list_empty(&received)) { - pthread_mutex_lock(&sq->receiver.lock); - list_join_tail(&received, &sq->receiver.queue); - check_wake_receive(&sq->receiver); - pthread_mutex_unlock(&sq->receiver.lock); + list_add_tail(&qm->node, &sq->receive_queue); + must_wake = 1; } // Check fast readers @@ -313,11 +283,16 @@ handle_message(struct serialqueue *sq, double eventtime, int len) continue; // Release main lock and invoke callback pthread_mutex_lock(&sq->fast_reader_dispatch_lock); + if (must_wake) + check_wake_receive(sq); pthread_mutex_unlock(&sq->lock); fr->func(fr, sq->input_buf, len); pthread_mutex_unlock(&sq->fast_reader_dispatch_lock); return; } + + if (must_wake) + check_wake_receive(sq); pthread_mutex_unlock(&sq->lock); } @@ -376,7 +351,7 @@ static void kick_event(struct serialqueue *sq, double eventtime) { char dummy[4096]; - int ret = read(sq->transmit_requests.pipe_fds[0], dummy, sizeof(dummy)); + int ret = read(sq->pipe_fds[0], dummy, sizeof(dummy)); if (ret < 0) report_errno("pipe read", ret); pollreactor_update_timer(sq->pr, SQPT_COMMAND, PR_NOW); @@ -477,21 +452,23 @@ build_and_send_command(struct serialqueue *sq, uint8_t *buf, int pending uint64_t min_clock = MAX_CLOCK; struct command_queue *q, *cq = NULL; struct queue_message *qm = NULL; - list_for_each_entry(q, &sq->ready_queues, ready.node) { - struct queue_message *m = list_first_entry( - &q->ready.msg_queue, struct queue_message, node); - if (m->req_clock < min_clock) { - min_clock = m->req_clock; - cq = q; - qm = m; + list_for_each_entry(q, &sq->pending_queues, node) { + if (!list_empty(&q->ready_queue)) { + struct queue_message *m = list_first_entry( + &q->ready_queue, struct queue_message, node); + if (m->req_clock < min_clock) { + min_clock = m->req_clock; + cq = q; + qm = m; + } } } // Append message to outgoing command if (len + qm->len > MESSAGE_MAX - MESSAGE_TRAILER_SIZE) break; list_del(&qm->node); - if (list_empty(&cq->ready.msg_queue)) - list_del(&cq->ready.node); + if (list_empty(&cq->ready_queue) && list_empty(&cq->upcoming_queue)) + list_del(&cq->node); memcpy(&buf[len], qm->msg, qm->len); len += qm->len; sq->ready_bytes -= qm->len; @@ -553,68 +530,53 @@ check_send_command(struct serialqueue *sq, int pending, double eventtime) idletime += calculate_bittime(sq, pending + MESSAGE_MIN); uint64_t ack_clock = clock_from_time(&sq->ce, idletime); uint64_t min_stalled_clock = MAX_CLOCK, min_ready_clock = MAX_CLOCK; - struct command_queue *cq, *_ncq; - pthread_mutex_lock(&sq->transmit_requests.lock); - list_for_each_entry_safe(cq, _ncq, &sq->transmit_requests.upcoming_queues, - upcoming.node) { - int not_in_ready_queues = list_empty(&cq->ready.msg_queue); - // Move messages from the upcoming.msg_queue to the ready.msg_queue - while (!list_empty(&cq->upcoming.msg_queue)) { + struct command_queue *cq; + list_for_each_entry(cq, &sq->pending_queues, node) { + // Move messages from the upcoming_queue to the ready_queue + while (!list_empty(&cq->upcoming_queue)) { struct queue_message *qm = list_first_entry( - &cq->upcoming.msg_queue, struct queue_message, node); + &cq->upcoming_queue, struct queue_message, node); if (ack_clock < qm->min_clock) { if (qm->min_clock < min_stalled_clock) min_stalled_clock = qm->min_clock; break; } list_del(&qm->node); - list_add_tail(&qm->node, &cq->ready.msg_queue); - sq->transmit_requests.upcoming_bytes -= qm->len; + list_add_tail(&qm->node, &cq->ready_queue); + sq->upcoming_bytes -= qm->len; sq->ready_bytes += qm->len; } - // Remove cq from the list if it is now empty - if (list_empty(&cq->upcoming.msg_queue)) - list_del(&cq->upcoming.node); - // Add to ready queues - if (not_in_ready_queues && !list_empty(&cq->ready.msg_queue)) - list_add_tail(&cq->ready.node, &sq->ready_queues); - } - // Check if it is still needed to send messages from the ready_queues - list_for_each_entry(cq, &sq->ready_queues, ready.node) { // Update min_ready_clock - struct queue_message *qm = list_first_entry( - &cq->ready.msg_queue, struct queue_message, node); - uint64_t req_clock = qm->req_clock; - double bgtime = pending ? idletime : sq->idle_time; - double bgoffset = MIN_REQTIME_DELTA + MIN_BACKGROUND_DELTA; - if (req_clock == BACKGROUND_PRIORITY_CLOCK) - req_clock = clock_from_time(&sq->ce, bgtime + bgoffset); - if (req_clock < min_ready_clock) - min_ready_clock = req_clock; + if (!list_empty(&cq->ready_queue)) { + struct queue_message *qm = list_first_entry( + &cq->ready_queue, struct queue_message, node); + uint64_t req_clock = qm->req_clock; + double bgtime = pending ? idletime : sq->idle_time; + double bgoffset = MIN_REQTIME_DELTA + MIN_BACKGROUND_DELTA; + if (req_clock == BACKGROUND_PRIORITY_CLOCK) + req_clock = clock_from_time(&sq->ce, bgtime + bgoffset); + if (req_clock < min_ready_clock) + min_ready_clock = req_clock; + } } // Check for messages to send if (sq->ready_bytes >= MESSAGE_PAYLOAD_MAX) - goto now; + return PR_NOW; if (! sq->ce.est_freq) { if (sq->ready_bytes) - goto now; - sq->transmit_requests.need_kick_clock = MAX_CLOCK; - pthread_mutex_unlock(&sq->transmit_requests.lock); + return PR_NOW; + sq->need_kick_clock = MAX_CLOCK; return PR_NEVER; } uint64_t reqclock_delta = MIN_REQTIME_DELTA * sq->ce.est_freq; if (min_ready_clock <= ack_clock + reqclock_delta) - goto now; + return PR_NOW; uint64_t wantclock = min_ready_clock - reqclock_delta; if (min_stalled_clock < wantclock) wantclock = min_stalled_clock; - sq->transmit_requests.need_kick_clock = wantclock; - pthread_mutex_unlock(&sq->transmit_requests.lock); + sq->need_kick_clock = wantclock; return idletime + (wantclock - ack_clock) / sq->ce.est_freq; -now: - pthread_mutex_unlock(&sq->transmit_requests.lock); - return PR_NOW; } // Callback timer to send data to the serial port @@ -654,9 +616,9 @@ background_thread(void *data) set_thread_name(sq->name); pollreactor_run(sq->pr); - pthread_mutex_lock(&sq->receiver.lock); - check_wake_receive(&sq->receiver); - pthread_mutex_unlock(&sq->receiver.lock); + pthread_mutex_lock(&sq->lock); + check_wake_receive(sq); + pthread_mutex_unlock(&sq->lock); return NULL; } @@ -674,7 +636,7 @@ serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id strncpy(sq->name, name, sizeof(sq->name)); sq->name[sizeof(sq->name)-1] = '\0'; - int ret = pipe(sq->transmit_requests.pipe_fds); + int ret = pipe(sq->pipe_fds); if (ret) goto fail; @@ -682,13 +644,12 @@ serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id sq->pr = pollreactor_alloc(SQPF_NUM, SQPT_NUM, sq); pollreactor_add_fd(sq->pr, SQPF_SERIAL, serial_fd, input_event , serial_fd_type==SQT_DEBUGFILE); - pollreactor_add_fd(sq->pr, SQPF_PIPE, sq->transmit_requests.pipe_fds[0] - , kick_event, 0); + pollreactor_add_fd(sq->pr, SQPF_PIPE, sq->pipe_fds[0], kick_event, 0); pollreactor_add_timer(sq->pr, SQPT_RETRANSMIT, retransmit_event); pollreactor_add_timer(sq->pr, SQPT_COMMAND, command_event); fd_set_non_blocking(serial_fd); - fd_set_non_blocking(sq->transmit_requests.pipe_fds[0]); - fd_set_non_blocking(sq->transmit_requests.pipe_fds[1]); + fd_set_non_blocking(sq->pipe_fds[0]); + fd_set_non_blocking(sq->pipe_fds[1]); // Retransmit setup sq->send_seq = 1; @@ -702,29 +663,24 @@ serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id } // Queues - sq->transmit_requests.need_kick_clock = MAX_CLOCK; - list_init(&sq->transmit_requests.upcoming_queues); - pthread_mutex_init(&sq->transmit_requests.lock, NULL); - list_init(&sq->ready_queues); + sq->need_kick_clock = MAX_CLOCK; + list_init(&sq->pending_queues); list_init(&sq->sent_queue); - list_init(&sq->receiver.queue); + list_init(&sq->receive_queue); list_init(&sq->notify_queue); list_init(&sq->fast_readers); // Debugging list_init(&sq->old_sent); - list_init(&sq->receiver.old_receive); + list_init(&sq->old_receive); debug_queue_alloc(&sq->old_sent, DEBUG_QUEUE_SENT); - debug_queue_alloc(&sq->receiver.old_receive, DEBUG_QUEUE_RECEIVE); + debug_queue_alloc(&sq->old_receive, DEBUG_QUEUE_RECEIVE); // Thread setup ret = pthread_mutex_init(&sq->lock, NULL); if (ret) goto fail; - ret = pthread_mutex_init(&sq->receiver.lock, NULL); - if (ret) - goto fail; - ret = pthread_cond_init(&sq->receiver.cond, NULL); + ret = pthread_cond_init(&sq->cond, NULL); if (ret) goto fail; ret = pthread_mutex_init(&sq->fast_reader_dispatch_lock, NULL); @@ -762,27 +718,17 @@ serialqueue_free(struct serialqueue *sq) serialqueue_exit(sq); pthread_mutex_lock(&sq->lock); message_queue_free(&sq->sent_queue); - pthread_mutex_lock(&sq->receiver.lock); - message_queue_free(&sq->receiver.queue); - message_queue_free(&sq->receiver.old_receive); - pthread_mutex_unlock(&sq->receiver.lock); + message_queue_free(&sq->receive_queue); message_queue_free(&sq->notify_queue); message_queue_free(&sq->old_sent); - while (!list_empty(&sq->ready_queues)) { - struct command_queue* cq = list_first_entry( - &sq->ready_queues, struct command_queue, ready.node); - list_del(&cq->ready.node); - message_queue_free(&cq->ready.msg_queue); - } - pthread_mutex_lock(&sq->transmit_requests.lock); - while (!list_empty(&sq->transmit_requests.upcoming_queues)) { + message_queue_free(&sq->old_receive); + while (!list_empty(&sq->pending_queues)) { struct command_queue *cq = list_first_entry( - &sq->transmit_requests.upcoming_queues, - struct command_queue, upcoming.node); - list_del(&cq->upcoming.node); - message_queue_free(&cq->upcoming.msg_queue); + &sq->pending_queues, struct command_queue, node); + list_del(&cq->node); + message_queue_free(&cq->ready_queue); + message_queue_free(&cq->upcoming_queue); } - pthread_mutex_unlock(&sq->transmit_requests.lock); pthread_mutex_unlock(&sq->lock); pollreactor_free(sq->pr); free(sq); @@ -794,8 +740,8 @@ serialqueue_alloc_commandqueue(void) { struct command_queue *cq = malloc(sizeof(*cq)); memset(cq, 0, sizeof(*cq)); - list_init(&cq->ready.msg_queue); - list_init(&cq->upcoming.msg_queue); + list_init(&cq->ready_queue); + list_init(&cq->upcoming_queue); return cq; } @@ -805,8 +751,7 @@ serialqueue_free_commandqueue(struct command_queue *cq) { if (!cq) return; - if (!list_empty(&cq->ready.msg_queue) || - !list_empty(&cq->upcoming.msg_queue)) { + if (!list_empty(&cq->ready_queue) || !list_empty(&cq->upcoming_queue)) { errorf("Memory leak! Can't free non-empty commandqueue"); return; } @@ -853,19 +798,17 @@ serialqueue_send_batch(struct serialqueue *sq, struct command_queue *cq qm = list_first_entry(msgs, struct queue_message, node); // Add list to cq->upcoming_queue - pthread_mutex_lock(&sq->transmit_requests.lock); - if (list_empty(&cq->upcoming.msg_queue)) - list_add_tail(&cq->upcoming.node, - &sq->transmit_requests.upcoming_queues); - list_join_tail(msgs, &cq->upcoming.msg_queue); - sq->transmit_requests.upcoming_bytes += len; - + pthread_mutex_lock(&sq->lock); + if (list_empty(&cq->ready_queue) && list_empty(&cq->upcoming_queue)) + list_add_tail(&cq->node, &sq->pending_queues); + list_join_tail(msgs, &cq->upcoming_queue); + sq->upcoming_bytes += len; int mustwake = 0; - if (qm->min_clock < sq->transmit_requests.need_kick_clock) { - sq->transmit_requests.need_kick_clock = 0; + if (qm->min_clock < sq->need_kick_clock) { + sq->need_kick_clock = 0; mustwake = 1; } - pthread_mutex_unlock(&sq->transmit_requests.lock); + pthread_mutex_unlock(&sq->lock); // Wake the background thread if necessary if (mustwake) @@ -902,21 +845,20 @@ serialqueue_send(struct serialqueue *sq, struct command_queue *cq, uint8_t *msg void __visible serialqueue_pull(struct serialqueue *sq, struct pull_queue_message *pqm) { - struct receiver *receiver = &sq->receiver; - pthread_mutex_lock(&receiver->lock); + pthread_mutex_lock(&sq->lock); // Wait for message to be available - while (list_empty(&receiver->queue)) { + while (list_empty(&sq->receive_queue)) { if (pollreactor_is_exit(sq->pr)) goto exit; - receiver->waiting = 1; - int ret = pthread_cond_wait(&receiver->cond, &receiver->lock); + sq->receive_waiting = 1; + int ret = pthread_cond_wait(&sq->cond, &sq->lock); if (ret) report_errno("pthread_cond_wait", ret); } // Remove message from queue struct queue_message *qm = list_first_entry( - &receiver->queue, struct queue_message, node); + &sq->receive_queue, struct queue_message, node); list_del(&qm->node); // Copy message @@ -926,14 +868,16 @@ serialqueue_pull(struct serialqueue *sq, struct pull_queue_message *pqm) pqm->receive_time = qm->receive_time; pqm->notify_id = qm->notify_id; if (qm->len) - qm = _debug_queue_add(&receiver->old_receive, qm); - pthread_mutex_unlock(&receiver->lock); - message_free(qm); + debug_queue_add(&sq->old_receive, qm); + else + message_free(qm); + + pthread_mutex_unlock(&sq->lock); return; exit: pqm->len = -1; - pthread_mutex_unlock(&receiver->lock); + pthread_mutex_unlock(&sq->lock); } void __visible @@ -984,9 +928,7 @@ serialqueue_get_stats(struct serialqueue *sq, char *buf, int len) { struct serialqueue stats; pthread_mutex_lock(&sq->lock); - pthread_mutex_lock(&sq->transmit_requests.lock); memcpy(&stats, sq, sizeof(stats)); - pthread_mutex_unlock(&sq->transmit_requests.lock); pthread_mutex_unlock(&sq->lock); snprintf(buf, len, "bytes_write=%u bytes_read=%u" @@ -999,7 +941,7 @@ serialqueue_get_stats(struct serialqueue *sq, char *buf, int len) , (int)stats.send_seq, (int)stats.receive_seq , (int)stats.retransmit_seq , stats.srtt, stats.rttvar, stats.rto - , stats.ready_bytes, stats.transmit_requests.upcoming_bytes); + , stats.ready_bytes, stats.upcoming_bytes); } // Extract old messages stored in the debug queues @@ -1008,27 +950,18 @@ serialqueue_extract_old(struct serialqueue *sq, int sentq , struct pull_queue_message *q, int max) { int count = sentq ? DEBUG_QUEUE_SENT : DEBUG_QUEUE_RECEIVE; - struct list_head *rootp; - rootp = sentq ? &sq->old_sent : &sq->receiver.old_receive; + struct list_head *rootp = sentq ? &sq->old_sent : &sq->old_receive; struct list_head replacement, current; list_init(&replacement); debug_queue_alloc(&replacement, count); list_init(¤t); // Atomically replace existing debug list with new zero'd list - if (rootp == &sq->receiver.old_receive) { - pthread_mutex_lock(&sq->receiver.lock); - list_join_tail(rootp, ¤t); - list_init(rootp); - list_join_tail(&replacement, rootp); - pthread_mutex_unlock(&sq->receiver.lock); - } else { - pthread_mutex_lock(&sq->lock); - list_join_tail(rootp, ¤t); - list_init(rootp); - list_join_tail(&replacement, rootp); - pthread_mutex_unlock(&sq->lock); - } + pthread_mutex_lock(&sq->lock); + list_join_tail(rootp, ¤t); + list_init(rootp); + list_join_tail(&replacement, rootp); + pthread_mutex_unlock(&sq->lock); // Walk the debug list int pos = 0;