From 8c83eefcaa885f68e052069c638fd1e4d0a27353 Mon Sep 17 00:00:00 2001 From: Hendrik Sokolowski Date: Fri, 22 Mar 2024 15:42:47 +0100 Subject: [PATCH] reorganize code, add support for display touch entity --- .gitignore | 2 + flake.nix | 4 + src/ha-kiosk-agent | 241 +++++++++++++-------------------------------- src/sensors.py | 219 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 296 insertions(+), 170 deletions(-) create mode 100644 src/sensors.py diff --git a/.gitignore b/.gitignore index 37318a7..afb50a7 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ /tags /.direnv + +__pycache__ diff --git a/flake.nix b/flake.nix index 0e173d4..529a495 100644 --- a/flake.nix +++ b/flake.nix @@ -14,6 +14,7 @@ src = ./src; propagatedBuildInputs = with pkgs.python311Packages; [ pkgs.brightnessctl + pkgs.libinput coloredlogs configargparse @@ -25,6 +26,9 @@ devShells.default = pkgs.mkShell { name = "development shell"; nativeBuildInputs = with pkgs.python311Packages; [ + pkgs.brightnessctl + pkgs.libinput + coloredlogs configargparse ha-mqtt-discoverable diff --git a/src/ha-kiosk-agent b/src/ha-kiosk-agent index 5bc31cd..c919ed0 100755 --- a/src/ha-kiosk-agent +++ b/src/ha-kiosk-agent @@ -1,199 +1,100 @@ #!/usr/bin/env python3 import argparse +import asyncio import coloredlogs import logging import os -import psutil -import socket import subprocess import time -from ha_mqtt_discoverable import Settings, DeviceInfo -from ha_mqtt_discoverable.sensors import BinarySensor, BinarySensorInfo, Number, NumberInfo, Sensor, SensorInfo -from paho.mqtt.client import Client, MQTTMessage +import ha_mqtt_discoverable +import sensors -coloredlogs.install() -logger = logging.getLogger("HA kiosk agent") -logger.setLevel(logging.INFO) +class InputListener(): + last_input = 0 -def get_ip_address() -> str: - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - s.settimeout(0) - try: - # doesn't even have to be reachable - s.connect(('10.254.254.254', 1)) - IP = s.getsockname()[0] - except Exception: - IP = '127.0.0.1' - finally: - s.close() - return IP + def get_last_input(self): + return self.last_input -def get_temperature() -> float: - temp = psutil.sensors_temperatures() - if not temp or not temp['acpitz']: - return -1 + async def read_stdout(self, logger, stdout): + logger.info('reading stdout') + while True: + buf = await stdout.readline() + if not buf: + break - return ("{:.1f}".format(temp['acpitz'][0].current)) + now = time.time() + if self.last_input + 5 <= now: + logger.info('display input received') + + self.last_input = now -def get_battery_level() -> float: - blevel = psutil.sensors_battery() - if not blevel: - return -1 + async def run(self, logger, device: str): + logger.info('starting display event listener loop') + proc = await asyncio.create_subprocess_exec( + 'libinput', 'record', device, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.DEVNULL) - return ("{:.0f}".format(blevel.percent), blevel.power_plugged) + asyncio.gather(self.read_stdout(logger, proc.stdout)) -def get_display_brightness() -> float: - res = subprocess.run(['brightnessctl', '-m', 'info'], capture_output=True) - if res.returncode != 0: - logger.warn(f'failed to get brightness: exit code { res.returncode }') - return -1 +async def sensor_loop(logger, sensor_entities): + logger.info('starting sensor loop') + while True: + for sensor in sensor_entities.values(): + sensor.update_value() - stdout = res.stdout.decode("utf-8") - brightPerc = stdout.split(',')[3].split('%')[0] - return float(brightPerc) + await asyncio.sleep(0.1) -def set_display_brightness(number: float) -> float: - res = subprocess.run(['brightnessctl', '-m', 'set', f'{ number }%'], capture_output=True) - if res.returncode != 0: - logger.warn(f'failed to set brightness: exit code { res.returncode }') - return -1 +async def main(): + coloredlogs.install() + logger = logging.getLogger("HA kiosk agent") + logger.setLevel(logging.INFO) - stdout = res.stdout.decode("utf-8") - brightPerc = stdout.split(',')[3].split('%')[0] - return float(brightPerc) + parser = argparse.ArgumentParser() + parser.add_argument("--debug", action='store_true', default=False) + parser.add_argument("--device-id", dest='device_id', default=os.environ.get('DEVICE_ID'), required=True) + parser.add_argument("--device-name", dest='device_name', default=os.environ.get('DEVICE_NAME'), required=True) + parser.add_argument("--display-device", dest='display_device', default=os.environ.get('DISPLAY_DEVICE')) + parser.add_argument("--mqtt-host", dest='mqtt_host', default=os.environ.get('MQTT_HOST', 'localhost')) + parser.add_argument("--mqtt-port", dest='mqtt_port', default=os.environ.get('MQTT_PORT', 1883)) + parser.add_argument("--mqtt-user", dest='mqtt_user', default=os.environ.get('MQTT_USER')) + parser.add_argument("--mqtt-pass", dest='mqtt_pass', default=os.environ.get('MQTT_PASS')) -# To receive number updates from HA, define a callback function: -def display_brightness_callback(client: Client, user_data, message: MQTTMessage): - number = int(message.payload.decode()) - logging.info(f"received new value {number} for display brightness") - new_brightness = set_display_brightness(number) - display_brightness.set_value(new_brightness) + args = parser.parse_args() -parser = argparse.ArgumentParser() -parser.add_argument("--debug", action='store_true', default=False) -parser.add_argument("--device-id", dest='device_id', default=os.environ.get('DEVICE_ID'), required=True) -parser.add_argument("--device-name", dest='device_name', default=os.environ.get('DEVICE_NAME'), required=True) -parser.add_argument("--mqtt-host", dest='mqtt_host', default=os.environ.get('MQTT_HOST', 'localhost')) -parser.add_argument("--mqtt-port", dest='mqtt_port', default=os.environ.get('MQTT_PORT', 1883)) -parser.add_argument("--mqtt-user", dest='mqtt_user', default=os.environ.get('MQTT_USER')) -parser.add_argument("--mqtt-pass", dest='mqtt_pass', default=os.environ.get('MQTT_PASS')) + mqtt_args = {'host': args.mqtt_host} + if args.debug: + logger.setLevel(logging.DEBUG) + mqtt_args['debug'] = args.debug -args = parser.parse_args() + if args.mqtt_port: + mqtt_args['port'] = args.mqtt_port -mqtt_args = {'host': args.mqtt_host} -if args.debug: - logger.setLevel(logging.DEBUG) - mqtt_args['debug'] = args.debug + if args.mqtt_user: + mqtt_args['username'] = args.mqtt_user -if args.mqtt_port: - mqtt_args['port'] = args.mqtt_port + if args.mqtt_pass: + mqtt_args['password'] = args.mqtt_pass -if args.mqtt_user: - mqtt_args['username'] = args.mqtt_user + mqtt_settings = ha_mqtt_discoverable.Settings.MQTT(**mqtt_args) + device = sensors.Device(args.device_name, args.device_id, mqtt_settings) + sensor_entities = { + 'battery': sensors.BatterySensor(device), + 'display_brightness': sensors.DisplayBrightnessSensor(device), + 'temperature': sensors.TemperatureSensor(device) + }; -if args.mqtt_pass: - mqtt_args['password'] = args.mqtt_pass + logger.info('finished setup') -# Configure the required parameters for the MQTT host -mqtt_settings = Settings.MQTT(**mqtt_args) + tasks = [sensor_loop(logger, sensor_entities)] + if args.display_device: + listener = InputListener() + sensor_entities['display_touched'] = sensors.DisplayTouchedSensor(device, listener) + tasks.append(listener.run(logger, args.display_device)) -# Define the device. At least one of `identifiers` or `connections` must be supplied -device_info = DeviceInfo(name=args.device_name, identifiers=args.device_id) + await asyncio.gather(*tasks) -# Battery charging -battery_charging_info = BinarySensorInfo( - name="Battery charging", - device=device_info, - unique_id=f'{args.device_id}_battery_charging', - device_class='battery_charging') -battery_charging_settings = Settings(mqtt=mqtt_settings, entity=battery_charging_info) -battery_charging = BinarySensor(battery_charging_settings) - -# Battery level -battery_level_info = SensorInfo( - name="Battery Level", - device=device_info, - unique_id=f'{args.device_id}_battery_level', - device_class='battery', - unit_of_measurement='%') -battery_level_settings = Settings(mqtt=mqtt_settings, entity=battery_level_info) -battery_level = Sensor(battery_level_settings) - -# Display brightness -display_brightness_info = NumberInfo( - name="Display Brightness", - min=0, - max=100, - mode="slider", - device=device_info, - unique_id=f'{args.device_id}_display_brightness', - unit_of_measurement='%') -display_brightness_settings = Settings(mqtt=mqtt_settings, entity=display_brightness_info) -display_brightness = Number(display_brightness_settings, display_brightness_callback) - -# IP Address -ip_address_info = SensorInfo( - name="IP Address", - device=device_info, - unique_id=f'{args.device_id}_ip_address') -ip_address_settings = Settings(mqtt=mqtt_settings, entity=ip_address_info) -ip_address = Sensor(ip_address_settings) -ip_address.set_state(get_ip_address()) - -# Temperature -temperature_info = SensorInfo( - name="Temperature", - device=device_info, - unique_id=f'{args.device_id}_temperature', - device_class='temperature', - unit_of_measurement='°C') -temperature_settings = Settings(mqtt=mqtt_settings, entity=temperature_info) -temperature = Sensor(temperature_settings) - -logger.info('finished setup') - -lastBattery = get_battery_level() -logger.info(f'current battery level: { lastBattery }') -battery_level.set_state(lastBattery[0]) -if lastBattery[1] or lastBattery[1] is None: - battery_charging.on() -else: - battery_charging.off() - -lastBrightness = get_display_brightness() -logger.info(f'current brightness: { lastBrightness }') -display_brightness.set_value(lastBrightness) - -lastTemperature = get_temperature() -logger.info(f'current temperature: { temperature }') -temperature.set_state(lastTemperature) - -while True: - newBrightness = get_display_brightness() - if newBrightness != lastBrightness: - logger.info(f'updating value for display brightness to { newBrightness }') - display_brightness.set_value(newBrightness) - lastBrightness = newBrightness - - newBattery = get_battery_level() - if newBattery[0] != lastBattery[0]: - logger.info(f'updating value for battery level to { newBattery[0] }') - battery_level.set_state(newBattery[0]) - - if newBattery[1] != lastBattery[1]: - logger.info(f'updating value for battery charger plugged to { newBattery[1] }') - if newBattery[1] or newBattery[1] is None: - battery_charging.on() - else: - battery_charging.off() - lastBattery = newBattery - - newTemperature = get_temperature() - if newTemperature != lastTemperature: - logger.info(f'updating value for temperature to { newTemperature }') - temperature.set_state(newTemperature) - lastTemperature = newTemperature - - time.sleep(5) +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/sensors.py b/src/sensors.py new file mode 100644 index 0000000..8d73147 --- /dev/null +++ b/src/sensors.py @@ -0,0 +1,219 @@ +import logging +import psutil +import socket +import subprocess +import time + +import ha_mqtt_discoverable +import ha_mqtt_discoverable.sensors +from paho.mqtt.client import Client, MQTTMessage + +from enum import Enum + +class Device: + name = "" + device_id = "" + mqtt_settings = None + mqtt_device = None + + def __init__(self, name, device_id, mqtt_settings: ha_mqtt_discoverable.Settings.MQTT): + self.name = name + self.device_id = device_id + self.mqtt_device = ha_mqtt_discoverable.DeviceInfo(name=name, identifiers=device_id) + self.mqtt_settings = mqtt_settings + + def get_device_id(self): + return self.device_id + + def get_mqtt_device(self): + return self.mqtt_device + + def get_mqtt_settings(self): + return self.mqtt_settings + + +class Sensor: + def __init__(self, device: Device): + pass + + def update_value(self): + pass + + +class BatterySensor(Sensor): + charging_status = False + level = -1 + + charging_entity = None + level_entity = None + + def __init__(self, device: Device): + # Battery charging + battery_charging_info = ha_mqtt_discoverable.sensors.BinarySensorInfo( + name="Battery charging", + device=device.get_mqtt_device(), + unique_id=f'{device.get_device_id()}_battery_charging', + device_class='battery_charging') + battery_charging_settings = ha_mqtt_discoverable.Settings(mqtt=device.get_mqtt_settings(), entity=battery_charging_info) + self.charging_entity = ha_mqtt_discoverable.sensors.BinarySensor(battery_charging_settings) + + # Battery level + battery_level_info = ha_mqtt_discoverable.sensors.SensorInfo( + name="Battery Level", + device=device.get_mqtt_device(), + unique_id=f'{device.get_device_id()}_battery_level', + device_class='battery', + unit_of_measurement='%') + battery_level_settings = ha_mqtt_discoverable.Settings(mqtt=device.get_mqtt_settings(), entity=battery_level_info) + self.level_entity = ha_mqtt_discoverable.sensors.Sensor(battery_level_settings) + + def update_value(self): + blevel = psutil.sensors_battery() + if not blevel: + logger.warn(f'failed to get battery state') + return + + if self.charging_status != blevel.power_plugged: + if blevel.power_plugged: + self.charging_entity.on() + else: + self.charging_entity.off() + + newLevel = "{:.0f}".format(blevel.percent) + if self.level != newLevel: + self.level_entity.set_state(newLevel) + + self.charging_status = blevel.power_plugged + self.level = newLevel + + +class DisplayBrightnessSensor(Sensor): + value = -1 + mqtt_sensor = None + + def __init__(self, device: Device): + display_brightness_info = ha_mqtt_discoverable.sensors.NumberInfo( + name="Display Brightness", + min=0, + max=100, + mode="slider", + device=device.get_mqtt_device(), + unique_id=f'{device.get_device_id()}_display_brightness', + unit_of_measurement='%') + display_brightness_settings = ha_mqtt_discoverable.Settings(mqtt=device.get_mqtt_settings(), entity=display_brightness_info) + self.mqtt_sensor = ha_mqtt_discoverable.sensors.Number(display_brightness_settings, self.display_brightness_callback) + + def update_value(self): + res = subprocess.run(['brightnessctl', '-m', 'info'], capture_output=True) + if res.returncode != 0: + logger.warn(f'failed to get brightness: exit code { res.returncode }') + return + + stdout = res.stdout.decode("utf-8") + brightPerc = float(stdout.split(',')[3].split('%')[0]) + + if brightPerc != self.value: + self.mqtt_sensor.set_value(brightPerc) + + self.value = brightPerc + + def set_display_brightness(self, number: float) -> float: + res = subprocess.run(['brightnessctl', '-m', 'set', f'{ number }%'], capture_output=True) + if res.returncode != 0: + logger.warn(f'failed to set brightness: exit code { res.returncode }') + return -1 + + stdout = res.stdout.decode("utf-8") + brightPerc = float(stdout.split(',')[3].split('%')[0]) + return brightPerc + + # To receive number updates from HA, define a callback function: + def display_brightness_callback(self, client: Client, user_data, message: MQTTMessage): + number = int(message.payload.decode()) + logging.info(f"received new value {number} for display brightness") + new_brightness = self.set_display_brightness(number) + self.mqtt_sensor.set_value(new_brightness) + + +class DisplayTouchedSensor(Sensor): + device = None + listener = None + touched_entity = None + value = False + + def __init__(self, device: Device, listener): + self.device = device + self.listener = listener + + display_touched_info = ha_mqtt_discoverable.sensors.BinarySensorInfo( + name="Display touched", + device=device.get_mqtt_device(), + unique_id=f'{device.get_device_id()}_display_touched') + display_touched_settings = ha_mqtt_discoverable.Settings(mqtt=device.get_mqtt_settings(), entity=display_touched_info) + self.touched_entity = ha_mqtt_discoverable.sensors.BinarySensor(display_touched_settings) + + def update_value(self): + newValue = self.listener.get_last_input() + 5 >= time.time() + if newValue != self.value: + if newValue: + self.touched_entity.on() + else: + self.touched_entity.off() + self.value = newValue + + +class IPAddressSensor(Sensor): + value = "" + mqtt_sensor = None + + def __init__(self, device: Device): + ip_address_info = ha_mqtt_discoverable.sensors.SensorInfo( + name="IP Address", + device=device.get_mqtt_device(), + unique_id=f'{device.get_device_id()}_ip_address') + ip_address_settings = ha_mqtt_discoverable.Settings(mqtt=device.get_mqtt_settings(), entity=ip_address_info) + ip_address = ha_mqtt_discoverable.sensors.Sensor(ip_address_settings) + + def update_value(self): + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.settimeout(0) + try: + # doesn't even have to be reachable + s.connect(('10.254.254.254', 1)) + IP = s.getsockname()[0] + except Exception: + IP = '127.0.0.1' + finally: + s.close() + + if IP != self.value: + self.mqtt_sensor.set_state(IP) + + self.value = IP + + +class TemperatureSensor(Sensor): + value = -1 + mqtt_sensor = None + + def __init__(self, device: Device): + temperature_info = ha_mqtt_discoverable.sensors.SensorInfo( + name="Temperature", + device=device.get_mqtt_device(), + unique_id=f'{device.get_device_id()}_temperature', + device_class='temperature', + unit_of_measurement='°C') + temperature_settings = ha_mqtt_discoverable.Settings(mqtt=device.get_mqtt_settings(), entity=temperature_info) + self.mqtt_sensor = ha_mqtt_discoverable.sensors.Sensor(temperature_settings) + + def update_value(self): + temp = psutil.sensors_temperatures() + if not temp or not temp['acpitz']: + logger.warn(f'failed to get temperature') + return + + newValue = "{:.1f}".format(temp['acpitz'][0].current) + if newValue != self.value: + self.mqtt_sensor.set_state(newValue) + + self.value = newValue