desk: Logging improvements

This commit is contained in:
Geoffrey Frogeye 2024-07-02 15:50:37 +02:00
parent a6becdae70
commit 77ec4574f1
Signed by: geoffrey
GPG key ID: C72403E7F82E6AD8

View file

@ -1,3 +1,4 @@
import logging
import struct
import time
import typing
@ -79,6 +80,7 @@ class Desk:
raw = self._dev.ctrl_transfer(
0xA1, 0x01, 0x300 + typ, 0, self.BUF_LEN
).tobytes()
self.log.debug(f"Received {raw.hex()}")
assert raw[0] == typ
size = raw[1]
end = 2 + size
@ -94,6 +96,7 @@ class Desk:
buf = bytes([typ]) + buf
# The official apps pad, not that it doesn't seem to work without
buf = buf + b"\x00" * (self.BUF_LEN - len(buf))
self.log.debug(f"Sending {buf.hex()}")
# Magic numbers: set class interface, HID set report
self._dev.ctrl_transfer(0x21, 0x09, 0x300 + typ, 0, buf)
# Non-implemented types:
@ -116,6 +119,7 @@ class Desk:
time.sleep(0.5)
def __init__(self) -> None:
self.log = logging.getLogger("Desk")
self._dev = usb.core.find(idVendor=Desk.VEND, idProduct=Desk.PROD)
if not self._dev:
raise ValueError(
@ -127,6 +131,7 @@ class Desk:
self._initialize()
self._reset_estimations()
self.last_destination = None
self.fetch_callback: typing.Callable[["Desk"], None] | None = None
@ -140,6 +145,10 @@ class Desk:
delta_s = now - self.last_est
if delta_s > self.MAX_EST_INTERVAL:
self.log.warning(
"Too long without getting a report, "
"assuming the desk might be anywhere now."
)
self._reset_estimations()
else:
delta_u = delta_s * self.SPEED
@ -170,6 +179,8 @@ class Desk:
self.est_value_top = min(self.VALUE_TOP, self.est_value_top)
if self.est_value_top == self.est_value_bot:
if self.est_value is None:
self.log.info("Height estimation converged")
self.est_value = int(self.est_value_top)
self.last_est = now
@ -180,8 +191,7 @@ class Desk:
raw = self._get_report()
break
except usb.USBError as e:
print(e)
pass
self.log.error(e)
else:
raw = self._get_report()
@ -195,6 +205,10 @@ class Desk:
# From observation. Reliable
self.destination = (struct.unpack("<H", raw[18:20])[0],)[0]
if self.destination != self.last_destination:
self.log.info(f"Destination changed to {self.destination:04x}")
self.last_destination = self.destination
self._update_estimations()
if self.fetch_callback is not None:
self.fetch_callback(self)
@ -208,6 +222,7 @@ class Desk:
position = max(self.VALUE_BOT, position)
position = min(self.VALUE_TOP, position)
self.log.info(f"Start moving to {position:04x}")
self.fetch()
while self.est_value != position:
self._move(position)
@ -229,6 +244,7 @@ class Desk:
return self._move_to(self._cmToUnit(position))
def stop(self) -> None:
self.log.info("Stop moving")
self._move(self.VALUE_STOP)
time.sleep(0.5)
@ -246,13 +262,16 @@ class Desk:
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
desk = Desk()
target_height: float | None = None
serial = "000C-34E7"
# Configure the required parameters for the MQTT broker
mqtt_settings = ha_mqtt_discoverable.Settings.MQTT(host="192.168.7.53")
serial = "000C-34E7"
ndigits = 1
target_height: float | None = None
device_info = ha_mqtt_discoverable.DeviceInfo(
name="Desk",
@ -265,8 +284,6 @@ if __name__ == "__main__":
serial_number=serial,
)
ndigits = 1
common_opts = {
"device": device_info,
"icon": "mdi:desk",
@ -295,9 +312,8 @@ if __name__ == "__main__":
message: paho.mqtt.client.MQTTMessage,
) -> None:
global target_height
number = float(message.payload.decode())
print(f"HA wants to move to: {number}")
target_height = number
target_height = float(message.payload.decode())
log.info(f"Requested height to {target_height:.1f}")
height = ha_mqtt_discoverable.sensors.Number(
height_settings, height_callback
@ -326,6 +342,7 @@ if __name__ == "__main__":
height_min = ha_mqtt_discoverable.sensors.Sensor(height_min_settings)
def fetch_callback(desk: Desk) -> None:
log.debug("Received state, sending")
hcur = desk.get_height()
hmin, hmax = desk.get_height_bounds()
@ -345,11 +362,10 @@ if __name__ == "__main__":
# Need to be rective to catch
while True:
if target_height:
print("Start moving")
temp_target_height = target_height
# Allows queuing of other instructions while moving
target_height = None
desk.move_to(temp_target_height)
print("End moving")
else:
time.sleep(interval)
desk.fetch()