dotfiles/curacao/desk/desk_mqtt.py
2025-05-09 00:28:33 +02:00

397 lines
13 KiB
Python

import logging
import struct
import time
import typing
import ha_mqtt_discoverable
import ha_mqtt_discoverable.sensors
import paho.mqtt.client
import usb.core
import usb.util
class Desk:
"""
Controls my Linak desk.
It is a CBD4P controller connected via USB2LIN06.
This particular combination doesn't seem to report desk height,
so it is estimated from the physical controller that does work.
"""
# Source of data:
# https://github.com/UrbanskiDawid/usb2lin06-HID-in-linux-for-LINAK-Desk-Control-Cable
# https://github.com/monofox/python-linak-desk-control
# https://github.com/gryf/linak-ctrl
# Desk Control Basic Software
# https://www.linak-us.com/products/controls/desk-control-basic-software/
# Says it's connected but doesn't report height and buttons do nothing
# Expected, as manual says it only works with CBD4A or CBD6
# Decompiled with ILSpy (easy), doesn't offer much though
# CBD4+5 Configurator
# https://www.linak.nl/technische-ondersteuning/#/cbd4-cbd6s-configurator
# Connects, and settings can be changed.
# Don't think there's much that would help with our problem.
# Tried to decompile with Ghidra (hard), didn't go super far
VEND = 0x12D3
PROD = 0x0002
# Official apps use HID library, although only managed to barely make
# pyhidapi read manufacturer and product once after device reset
BUF_LEN = 64
MOVE_CMD_REPEAT_INTERVAL = 0.2 # s
STOP_CMD_INTERVAL = 1 # s
MAX_EST_INTERVAL = 10 # s
# Theoritical height values
VALUE_MIN = 0x0000
VALUE_MAX = 0x7FFE
VALUE_DOWN = 0x7FFF
VALUE_UP = 0x8000
VALUE_STOP = 0x8001
# Measured values
VALUE_BOT = 0x0001
VALUE_TOP = 0x1A50
HEIGHT_BOT = 68
HEIGHT_TOP = 135
FULL_RISE_TIME = 17.13 # s
FULL_FALL_TIME = 16.64 # s
# Computed values
HEIGHT_OFFSET = HEIGHT_BOT # cm
HEIGHT_MULT = VALUE_TOP / (HEIGHT_TOP - HEIGHT_BOT) # unit / cm
# Should be 100 in theory (1 unit = 0.1 mm)
FULL_TIME = (FULL_FALL_TIME + FULL_RISE_TIME) / 2 # s
SPEED_MARGIN = 0.9
# Better estimate a bit slower
SPEED = (VALUE_TOP - VALUE_BOT) / FULL_TIME * SPEED_MARGIN # unit / s
HEADER_STRUCT = struct.Struct("<BB")
REPORT_STRUCT = struct.Struct("<HH14xH36x")
def _cm_to_unit(self, height: float) -> int:
return round((height - self.HEIGHT_OFFSET) * self.HEIGHT_MULT)
def _unit_to_cm(self, height: int) -> float:
return height / self.HEIGHT_MULT + self.HEIGHT_OFFSET
def _get(self, typ: int) -> bytes:
# Magic numbers: get class interface, HID get report
raw = self._dev.ctrl_transfer(
0xA1, 0x01, 0x300 + typ, 0, self.BUF_LEN
).tobytes()
self.log.debug("Received %s", raw.hex())
typ, size = self.HEADER_STRUCT.unpack(raw[: self.HEADER_STRUCT.size])
start = self.HEADER_STRUCT.size
end = self.HEADER_STRUCT.size + size
if end >= self.BUF_LEN:
raise OverflowError
return raw[start:end]
# Non-implemented types:
# 1, 7: some kind of stream when the device isn't initialized?
# size reduces the faster you poll, increases when buttons are held
# 9: unknown, always report 0
def _set(self, typ: int, buf: bytes) -> None:
buf = bytes([typ]) + buf
# The official apps pad, not that it doesn't seem to work without
buf = buf + b"\x00" * (self.BUF_LEN - len(buf))
self.log.debug("Sending %s", buf.hex())
# Magic numbers: set class interface, HID set report
self._dev.ctrl_transfer(0x21, 0x09, 0x300 + typ, 0, buf)
# Non-implemented types:
# Some stuff < 10
def _reset_estimations(self) -> None:
self.est_value: None | int = None
self.est_value_bot = float(self.VALUE_BOT)
self.est_value_top = float(self.VALUE_TOP)
self.last_est: float = 0.0
def _initialize(self) -> None:
"""
Seems to take the USB2LIN06 out of "boot mode".
It is like that after reset.
Permits control and reading the report.
(name according to CBD4 Controller)
"""
buf = bytes([0x04, 0x00, 0xFB])
self._set(3, buf)
time.sleep(0.5)
def __init__(self) -> None:
self.log = logging.getLogger("Desk")
self._dev = usb.core.find(idVendor=Desk.VEND, idProduct=Desk.PROD)
if not self._dev:
msg = f"Device {Desk.VEND}: {Desk.PROD:04d} not found!"
raise ValueError(msg)
if self._dev.is_kernel_driver_active(0):
self._dev.detach_kernel_driver(0)
self._initialize()
self._reset_estimations()
self.last_destination = None
self.fetch_callback: typing.Callable[[Desk], None] | None = None
def _get_report(self) -> bytes:
return self._get(4)
def _update_estimations(self) -> None:
now = time.time()
delta_s = now - self.last_est
if delta_s > self.MAX_EST_INTERVAL:
# Attempt at fixing the issue of
# the service not working after the night
self._initialize()
self.log.warning(
"Too long without getting a report, "
"assuming the desk might be anywhere now."
)
self._reset_estimations()
else:
delta_u = delta_s * self.SPEED
if self.destination == self.VALUE_STOP:
pass
elif self.destination == self.VALUE_UP:
self.est_value_bot += delta_u
self.est_value_top += delta_u
elif self.destination == self.VALUE_DOWN:
self.est_value_bot -= delta_u
self.est_value_top -= delta_u
else:
def move_closer(start_val: float) -> float:
if start_val < self.destination:
end_val = start_val + delta_u
return min(end_val, self.destination)
end_val = start_val - delta_u
return max(end_val, self.destination)
self.est_value_bot = move_closer(self.est_value_bot)
self.est_value_top = move_closer(self.est_value_top)
# Clamp
self.est_value_bot = max(self.VALUE_BOT, self.est_value_bot)
self.est_value_top = min(self.VALUE_TOP, self.est_value_top)
if self.est_value_top == self.est_value_bot:
if self.est_value is None:
self.log.info("Height estimation converged")
self.est_value = int(self.est_value_top)
self.last_est = now
def fetch(self) -> None:
for _ in range(3):
try:
raw = self._get_report()
break
except usb.USBError:
self.log.exception("USB issue")
else:
raw = self._get_report()
# Most values are from examining the basic software.
# Never reports anything in practice.
# Destination is from observation.
self.value, unk, self.destination = self.REPORT_STRUCT.unpack(raw)
self.initalized = (unk & 0xF) != 0
if self.destination != self.last_destination:
self.log.info("Destination changed to %04x", self.destination)
self.last_destination = self.destination
self._update_estimations()
if self.fetch_callback is not None:
self.fetch_callback(self)
def _move(self, position: int) -> None:
buf = struct.pack("<H", position) * 4
self._set(5, buf)
def _move_to(self, position: int) -> None:
# Clamp
position = max(self.VALUE_BOT, position)
position = min(self.VALUE_TOP, position)
self.log.info("Start moving to %04x", position)
self.fetch()
while self.est_value != position:
self._move(position)
time.sleep(self.MOVE_CMD_REPEAT_INTERVAL)
self.fetch()
self.stop()
def move_to(self, position: float) -> None:
"""
Move the desk to the given position in cm.
If any button is held during movement, the desk will stop moving,
yet this will think it's still moving, throwing off the estimates.
It's not a bug, it's a safety feature.
Also if you try to make it move when it's already moving,
it's going to keep moving while desyncing.
That one is a bug.
"""
# Would to stop for a while before reversing course, without being able
# to read the actual height it's just too annoying to implement
return self._move_to(self._cm_to_unit(position))
def stop(self) -> None:
self.log.info("Stop moving")
self._move(self.VALUE_STOP)
time.sleep(0.5)
def get_height_bounds(self) -> tuple[float, float]:
return (
self._unit_to_cm(int(self.est_value_bot)),
self._unit_to_cm(int(self.est_value_top)),
)
def get_height(self) -> float | None:
if self.est_value is None:
return None
return self._unit_to_cm(self.est_value)
class App:
NDIGITS = 1
def __init__(self) -> None:
logging.basicConfig()
self.log = logging.getLogger(__name__)
self.desk = Desk()
self.desk.fetch_callback = self.fetch_callback
serial = "000C-34E7"
# Configure the required parameters for the MQTT broker
mqtt_settings = ha_mqtt_discoverable.Settings.MQTT(host="192.168.7.53")
self.target_height: float | None = None
device_info = ha_mqtt_discoverable.DeviceInfo(
name="Desk",
identifiers=["Linak", serial],
manufacturer="Linak",
model="CBD4P",
suggested_area="Desk",
hw_version="77402",
sw_version="1.91",
serial_number=serial,
)
common_opts = {
"device": device_info,
"icon": "mdi:desk",
"unit_of_measurement": "cm",
"device_class": "distance",
"expire_after": 10,
}
# TODO Implement proper availability in hq-mqtt-discoverable
height_info = ha_mqtt_discoverable.sensors.NumberInfo(
name="Height ",
min=self.desk.HEIGHT_BOT,
max=self.desk.HEIGHT_TOP,
mode="slider",
step=10 ** (-self.NDIGITS),
unique_id="desk_height",
**common_opts,
)
height_settings = ha_mqtt_discoverable.Settings(
mqtt=mqtt_settings, entity=height_info
)
self.height = ha_mqtt_discoverable.sensors.Number(
height_settings, self.height_callback
)
height_max_info = ha_mqtt_discoverable.sensors.SensorInfo(
name="Estimated height max",
unique_id="desk_height_max",
entity_category="diagnostic",
**common_opts,
)
height_max_settings = ha_mqtt_discoverable.Settings(
mqtt=mqtt_settings, entity=height_max_info
)
self.height_max = ha_mqtt_discoverable.sensors.Sensor(
height_max_settings
)
height_min_info = ha_mqtt_discoverable.sensors.SensorInfo(
name="Estimated height min",
unique_id="desk_height_min",
entity_category="diagnostic",
**common_opts,
)
height_min_settings = ha_mqtt_discoverable.Settings(
mqtt=mqtt_settings, entity=height_min_info
)
self.height_min = ha_mqtt_discoverable.sensors.Sensor(
height_min_settings
)
self.last_published_state: tuple[float | None, float, float] | None = (
None
)
def fetch_callback(self, desk: Desk) -> None:
hcur = desk.get_height()
hmin, hmax = desk.get_height_bounds()
state = hcur, hmin, hmax
if state == self.last_published_state:
return
self.last_published_state = state
# If none this will set as unknown
# Also readings can be a bit outside the boundaries,
# so this skips verification
if isinstance(hcur, float):
hcur = round(hcur, ndigits=self.NDIGITS)
self.height._update_state(hcur) # noqa: SLF001
self.height_max._update_state( # noqa: SLF001
round(hmax, ndigits=self.NDIGITS),
)
self.height_min._update_state( # noqa: SLF001
round(hmin, ndigits=self.NDIGITS),
)
def height_callback(
self,
_client: paho.mqtt.client.Client,
_user_data: None,
message: paho.mqtt.client.MQTTMessage,
) -> None:
self.target_height = float(message.payload.decode())
self.log.info("Requested height to %.1f", self.target_height)
def run(self) -> None:
interval = 0.2
# Need to be rective to catch
while True:
if self.target_height:
temp_target_height = self.target_height
# Allows queuing of other instructions while moving
self.target_height = None
self.desk.move_to(temp_target_height)
else:
time.sleep(interval)
self.desk.fetch()
if __name__ == "__main__":
app = App()
app.run()