bulk_sensor: Add new BulkDataQueue class

Move the bulk sample queue collection to a new helper class in
bulk_sensor.py.

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
Kevin O'Connor 2023-12-16 11:30:51 -05:00
parent 978c294741
commit e67cbbe5c1
5 changed files with 41 additions and 65 deletions

View file

@ -3,6 +3,26 @@
# Copyright (C) 2020-2023 Kevin O'Connor <kevin@koconnor.net>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import threading
# Helper class to store incoming messages in a queue
class BulkDataQueue:
def __init__(self, mcu, msg_name, oid):
# Measurement storage (accessed from background thread)
self.lock = threading.Lock()
self.raw_samples = []
# Register callback with mcu
mcu.register_response(self._handle_data, msg_name, oid)
def _handle_data(self, params):
with self.lock:
self.raw_samples.append(params)
def pull_samples(self):
with self.lock:
raw_samples = self.raw_samples
self.raw_samples = []
return raw_samples
def clear_samples(self):
self.pull_samples()
# Helper class for chip clock synchronization via linear regression
class ClockSyncRegression: