mirror of
https://github.com/Klipper3d/klipper.git
synced 2025-07-22 22:24:02 -06:00
bulk_sensor: Rework APIDumpHelper() to BatchBulkHelper()
The APIDumpHelper class is mainly intended to help process messages in batches. Rework the class methods to make that more clear. Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
parent
95c753292d
commit
3370134593
6 changed files with 112 additions and 124 deletions
|
@ -5,20 +5,23 @@
|
|||
# This file may be distributed under the terms of the GNU GPLv3 license.
|
||||
import threading
|
||||
|
||||
API_UPDATE_INTERVAL = 0.500
|
||||
BATCH_INTERVAL = 0.500
|
||||
|
||||
# Helper to periodically transmit data to a set of API clients
|
||||
class APIDumpHelper:
|
||||
def __init__(self, printer, data_cb, startstop_cb=None,
|
||||
update_interval=API_UPDATE_INTERVAL):
|
||||
# Helper to process accumulated messages in periodic batches
|
||||
class BatchBulkHelper:
|
||||
def __init__(self, printer, batch_cb, start_cb=None, stop_cb=None,
|
||||
batch_interval=BATCH_INTERVAL):
|
||||
self.printer = printer
|
||||
self.data_cb = data_cb
|
||||
if startstop_cb is None:
|
||||
startstop_cb = (lambda is_start: None)
|
||||
self.startstop_cb = startstop_cb
|
||||
self.batch_cb = batch_cb
|
||||
if start_cb is None:
|
||||
start_cb = (lambda: None)
|
||||
self.start_cb = start_cb
|
||||
if stop_cb is None:
|
||||
stop_cb = (lambda: None)
|
||||
self.stop_cb = stop_cb
|
||||
self.is_started = False
|
||||
self.update_interval = update_interval
|
||||
self.update_timer = None
|
||||
self.batch_interval = batch_interval
|
||||
self.batch_timer = None
|
||||
self.clients = {}
|
||||
self.webhooks_start_resp = {}
|
||||
# Periodic batch processing
|
||||
|
@ -27,40 +30,40 @@ class APIDumpHelper:
|
|||
return
|
||||
self.is_started = True
|
||||
try:
|
||||
self.startstop_cb(True)
|
||||
self.start_cb()
|
||||
except self.printer.command_error as e:
|
||||
logging.exception("API Dump Helper start callback error")
|
||||
logging.exception("BatchBulkHelper start callback error")
|
||||
self.is_started = False
|
||||
self.clients.clear()
|
||||
raise
|
||||
reactor = self.printer.get_reactor()
|
||||
systime = reactor.monotonic()
|
||||
waketime = systime + self.update_interval
|
||||
self.update_timer = reactor.register_timer(self._update, waketime)
|
||||
waketime = systime + self.batch_interval
|
||||
self.batch_timer = reactor.register_timer(self._proc_batch, waketime)
|
||||
def _stop(self):
|
||||
self.clients.clear()
|
||||
self.printer.get_reactor().unregister_timer(self.update_timer)
|
||||
self.update_timer = None
|
||||
self.printer.get_reactor().unregister_timer(self.batch_timer)
|
||||
self.batch_timer = None
|
||||
if not self.is_started:
|
||||
return
|
||||
try:
|
||||
self.startstop_cb(False)
|
||||
self.stop_cb()
|
||||
except self.printer.command_error as e:
|
||||
logging.exception("API Dump Helper stop callback error")
|
||||
logging.exception("BatchBulkHelper stop callback error")
|
||||
self.clients.clear()
|
||||
self.is_started = False
|
||||
if self.clients:
|
||||
# New client started while in process of stopping
|
||||
self._start()
|
||||
def _update(self, eventtime):
|
||||
def _proc_batch(self, eventtime):
|
||||
try:
|
||||
msg = self.data_cb(eventtime)
|
||||
msg = self.batch_cb(eventtime)
|
||||
except self.printer.command_error as e:
|
||||
logging.exception("API Dump Helper data callback error")
|
||||
logging.exception("BatchBulkHelper batch callback error")
|
||||
self._stop()
|
||||
return self.printer.get_reactor().NEVER
|
||||
if not msg:
|
||||
return eventtime + self.update_interval
|
||||
return eventtime + self.batch_interval
|
||||
for cconn, template in list(self.clients.items()):
|
||||
if cconn.is_closed():
|
||||
del self.clients[cconn]
|
||||
|
@ -71,7 +74,7 @@ class APIDumpHelper:
|
|||
tmp = dict(template)
|
||||
tmp['params'] = msg
|
||||
cconn.send(tmp)
|
||||
return eventtime + self.update_interval
|
||||
return eventtime + self.batch_interval
|
||||
# Internal clients
|
||||
def add_internal_client(self):
|
||||
cconn = InternalDumpClient()
|
||||
|
@ -90,7 +93,7 @@ class APIDumpHelper:
|
|||
wh = self.printer.lookup_object('webhooks')
|
||||
wh.register_mux_endpoint(path, key, value, self._add_api_client)
|
||||
|
||||
# An "internal webhooks" wrapper for using APIDumpHelper internally
|
||||
# An "internal webhooks" wrapper for using BatchBulkHelper internally
|
||||
class InternalDumpClient:
|
||||
def __init__(self):
|
||||
self.msgs = []
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue