From b1a058deca9852a980443673d6825e790961780c Mon Sep 17 00:00:00 2001 From: Hendrik Sokolowski Date: Sat, 6 Apr 2024 15:44:45 +0200 Subject: [PATCH] split up files, drop Sensor class --- src/device.py | 54 +++++++++++++++ src/display.py | 116 +++++++++++++++++++++++++++++++ src/ha-kiosk-agent | 125 +++------------------------------- src/{sensors.py => sensor.py} | 67 +++--------------- src/setup.py | 6 +- 5 files changed, 192 insertions(+), 176 deletions(-) create mode 100644 src/device.py create mode 100644 src/display.py rename src/{sensors.py => sensor.py} (79%) diff --git a/src/device.py b/src/device.py new file mode 100644 index 0000000..709314e --- /dev/null +++ b/src/device.py @@ -0,0 +1,54 @@ +import logging +import psutil +import socket +import subprocess +import time + +import ha_mqtt_discoverable +import ha_mqtt_discoverable.sensors +from paho.mqtt.client import Client, MQTTMessage + +from enum import Enum + +class Device: + name = "" + device_id = "" + mqtt_settings = None + mqtt_device = None + + def __init__(self, name, device_id, mqtt_settings: ha_mqtt_discoverable.Settings.MQTT): + self.name = name + self.device_id = device_id + self.mqtt_settings = mqtt_settings + + device_manufacturer = self._read_device_manufacturer() + device_model = self._read_device_model() + device_hw_version = self._read_device_hw_version() + + self.mqtt_device = ha_mqtt_discoverable.DeviceInfo( + name=name, + identifiers=device_id, + manufacturer=device_manufacturer, + model=device_model, + hw_version=device_hw_version) + + def _read_device_manufacturer(self) -> str: + with open('/sys/devices/virtual/dmi/id/chassis_vendor') as f: + return f.readline() + + def _read_device_model(self) -> str: + with open('/sys/devices/virtual/dmi/id/product_family') as f: + return f.readline() + + def _read_device_hw_version(self) -> str: + with open('/sys/devices/virtual/dmi/id/product_name') as f: + return f.readline() + + def get_device_id(self): + return self.device_id + + def get_mqtt_device(self): + return self.mqtt_device + + def get_mqtt_settings(self): + return self.mqtt_settings diff --git a/src/display.py b/src/display.py new file mode 100644 index 0000000..bce88d1 --- /dev/null +++ b/src/display.py @@ -0,0 +1,116 @@ +import asyncio +import enum +import logging +import time + +import ha_mqtt_discoverable + +from paho.mqtt.client import Client, MQTTMessage + +class BrightnessMode(enum.Enum): + MODE_MAN = 0 + MODE_AUTO = 1 + +class BrightnessLevel(enum.Enum): + LEVEL_OFF = 0 + LEVEL_DIM = 1 + LEVEL_BRIGHT = 2 + +class DisplayBrightnessManager(): + last_input = 0 + brightness_mode = BrightnessMode.MODE_AUTO + brightness_mode_new = BrightnessMode.MODE_AUTO + brightness_level = BrightnessLevel.LEVEL_OFF + mqtt_entity = None + + def __init__(self, device): + force_display_on_info = ha_mqtt_discoverable.sensors.SwitchInfo( + name="Force Display on", + device=device.get_mqtt_device(), + unique_id=f'{device.get_device_id()}_force_display_on') + switch_settings = ha_mqtt_discoverable.Settings(mqtt=device.get_mqtt_settings(), entity=force_display_on_info, manual_availability=True) + self.mqtt_entity = ha_mqtt_discoverable.sensors.Switch(switch_settings, self.switch_callback) + self.mqtt_entity.set_availability(True) + + def switch_callback(self, client: Client, user_data, message: MQTTMessage): + payload = message.payload.decode() + if payload == "ON": + self.set_mode(BrightnessMode.MODE_MAN) + self.mqtt_entity.on() + elif payload == "OFF": + self.set_mode(BrightnessMode.MODE_AUTO) + self.mqtt_entity.off() + + def get_last_input(self): + return self.last_input + + def get_level(self) -> BrightnessLevel: + return self.brightness_level + + def get_mode(self) -> BrightnessMode: + return self.brightness_mode + + def set_mode(self, mode: BrightnessMode): + self.brightness_mode_new = mode + logging.info(f'brightness mode: {self.brightness_mode}, new mode: {self.brightness_mode_new}') + + async def read_stdout(self, logger, stdout): + while True: + buf = await stdout.readline() + if not buf: + break + + now = time.time() + if self.last_input + 5 <= now: + continue + + self.last_input = now + + async def dim_screen(self, to_value): + from_value = 0 + if self.brightness_level == BrightnessLevel.LEVEL_DIM: + from_value = 5 + elif self.brightness_level == BrightnessLevel.LEVEL_BRIGHT: + from_value = 50 + + step_val=(to_value-from_value)/25 + for i in range(1, 25): + new_val = "{:.0f}".format(from_value + i*step_val) + await asyncio.create_subprocess_exec('brightnessctl', 'set', f'{new_val}%', stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL) + await asyncio.sleep(0.005) + + async def brightness_loop(self, logger): + logger.info('starting brightness loop') + while True: + if self.brightness_mode != self.brightness_mode_new: + if self.brightness_mode_new == BrightnessMode.MODE_MAN: + await self.dim_screen(50) + self.brightness_level = BrightnessLevel.LEVEL_BRIGHT + else: + self.last_input = time.time() + self.brightness_mode = self.brightness_mode_new + + if self.brightness_mode != BrightnessMode.MODE_MAN: + if self.last_input + 30 < time.time(): + if self.brightness_level != BrightnessLevel.LEVEL_OFF: + await self.dim_screen(0) + self.brightness_level = BrightnessLevel.LEVEL_OFF + elif self.last_input + 15 < time.time(): + if self.brightness_level != BrightnessLevel.LEVEL_DIM: + await self.dim_screen(5) + self.brightness_level = BrightnessLevel.LEVEL_DIM + elif self.brightness_level != BrightnessLevel.LEVEL_BRIGHT: + await self.dim_screen(50) + self.brightness_level = BrightnessLevel.LEVEL_BRIGHT + + await asyncio.sleep(0.1) + + async def run(self, logger, device: str): + logger.info('starting display event dm loop') + proc = await asyncio.create_subprocess_exec( + 'libinput', 'record', device, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.DEVNULL) + + asyncio.gather(self.read_stdout(logger, proc.stdout), self.brightness_loop(logger)) + diff --git a/src/ha-kiosk-agent b/src/ha-kiosk-agent index 3bc4859..48f386a 100755 --- a/src/ha-kiosk-agent +++ b/src/ha-kiosk-agent @@ -3,123 +3,14 @@ import argparse import asyncio import coloredlogs -import enum import logging import os -import subprocess -import time -from paho.mqtt.client import Client, MQTTMessage import ha_mqtt_discoverable -import sensors - -class BrightnessMode(enum.Enum): - MODE_MAN = 0 - MODE_AUTO = 1 - -class BrightnessLevel(enum.Enum): - LEVEL_OFF = 0 - LEVEL_DIM = 1 - LEVEL_BRIGHT = 2 - -class DisplayBrightnessManager(): - last_input = 0 - brightness_mode = BrightnessMode.MODE_AUTO - brightness_mode_new = BrightnessMode.MODE_AUTO - brightness_level = BrightnessLevel.LEVEL_OFF - mqtt_entity = None - - def __init__(self, device): - force_display_on_info = ha_mqtt_discoverable.sensors.SwitchInfo( - name="Force Display on", - device=device.get_mqtt_device(), - unique_id=f'{device.get_device_id()}_force_display_on') - switch_settings = ha_mqtt_discoverable.Settings(mqtt=device.get_mqtt_settings(), entity=force_display_on_info, manual_availability=True) - self.mqtt_entity = ha_mqtt_discoverable.sensors.Switch(switch_settings, self.switch_callback) - self.mqtt_entity.set_availability(True) - - def switch_callback(self, client: Client, user_data, message: MQTTMessage): - payload = message.payload.decode() - if payload == "ON": - self.set_mode(BrightnessMode.MODE_MAN) - self.mqtt_entity.on() - elif payload == "OFF": - self.set_mode(BrightnessMode.MODE_AUTO) - self.mqtt_entity.off() - - def get_last_input(self): - return self.last_input - - def get_level(self) -> BrightnessLevel: - return self.brightness_level - - def get_mode(self) -> BrightnessMode: - return self.brightness_mode - - def set_mode(self, mode: BrightnessMode): - self.brightness_mode_new = mode - logging.info(f'brightness mode: {self.brightness_mode}, new mode: {self.brightness_mode_new}') - - async def read_stdout(self, logger, stdout): - while True: - buf = await stdout.readline() - if not buf: - break - - now = time.time() - if self.last_input + 5 <= now: - continue - - self.last_input = now - - async def dim_screen(self, to_value): - from_value = 0 - if self.brightness_level == BrightnessLevel.LEVEL_DIM: - from_value = 5 - elif self.brightness_level == BrightnessLevel.LEVEL_BRIGHT: - from_value = 50 - - step_val=(to_value-from_value)/25 - for i in range(1, 25): - new_val = "{:.0f}".format(from_value + i*step_val) - await asyncio.create_subprocess_exec('brightnessctl', 'set', f'{new_val}%', stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL) - await asyncio.sleep(0.005) - - async def brightness_loop(self, logger): - logger.info('starting brightness loop') - while True: - if self.brightness_mode != self.brightness_mode_new: - if self.brightness_mode_new == BrightnessMode.MODE_MAN: - await self.dim_screen(50) - self.brightness_level = BrightnessLevel.LEVEL_BRIGHT - else: - self.last_input = time.time() - self.brightness_mode = self.brightness_mode_new - - if self.brightness_mode != BrightnessMode.MODE_MAN: - if self.last_input + 30 < time.time(): - if self.brightness_level != BrightnessLevel.LEVEL_OFF: - await self.dim_screen(0) - self.brightness_level = BrightnessLevel.LEVEL_OFF - elif self.last_input + 15 < time.time(): - if self.brightness_level != BrightnessLevel.LEVEL_DIM: - await self.dim_screen(5) - self.brightness_level = BrightnessLevel.LEVEL_DIM - elif self.brightness_level != BrightnessLevel.LEVEL_BRIGHT: - await self.dim_screen(50) - self.brightness_level = BrightnessLevel.LEVEL_BRIGHT - - await asyncio.sleep(0.1) - - async def run(self, logger, device: str): - logger.info('starting display event dm loop') - proc = await asyncio.create_subprocess_exec( - 'libinput', 'record', device, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.DEVNULL) - - asyncio.gather(self.read_stdout(logger, proc.stdout), self.brightness_loop(logger)) +import sensor +from device import Device +from display import DisplayBrightnessManager async def sensor_loop(logger, sensor_entities): logger.info('starting sensor loop') @@ -161,12 +52,12 @@ async def main(): mqtt_args['password'] = args.mqtt_pass mqtt_settings = ha_mqtt_discoverable.Settings.MQTT(**mqtt_args) - device = sensors.Device(args.device_name, args.device_id, mqtt_settings) + device = Device(args.device_name, args.device_id, mqtt_settings) sensor_entities = { - 'battery': sensors.BatterySensor(device), - 'display_brightness': sensors.DisplayBrightnessSensor(device), - 'ip_address': sensors.IPAddressSensor(device), - 'temperature': sensors.TemperatureSensor(device) + 'battery': sensor.BatterySensor(device), + 'display_brightness': sensor.DisplayBrightnessSensor(device), + 'ip_address': sensor.IPAddressSensor(device), + 'temperature': sensor.TemperatureSensor(device) }; diff --git a/src/sensors.py b/src/sensor.py similarity index 79% rename from src/sensors.py rename to src/sensor.py index 1f53a04..4b3156b 100644 --- a/src/sensors.py +++ b/src/sensor.py @@ -4,65 +4,16 @@ import socket import subprocess import time -import ha_mqtt_discoverable -import ha_mqtt_discoverable.sensors -from paho.mqtt.client import Client, MQTTMessage - from enum import Enum -class Device: - name = "" - device_id = "" - mqtt_settings = None - mqtt_device = None +import ha_mqtt_discoverable +import ha_mqtt_discoverable.sensors - def __init__(self, name, device_id, mqtt_settings: ha_mqtt_discoverable.Settings.MQTT): - self.name = name - self.device_id = device_id - self.mqtt_settings = mqtt_settings - - device_manufacturer = self._read_device_manufacturer() - device_model = self._read_device_model() - device_hw_version = self._read_device_hw_version() - - self.mqtt_device = ha_mqtt_discoverable.DeviceInfo( - name=name, - identifiers=device_id, - manufacturer=device_manufacturer, - model=device_model, - hw_version=device_hw_version) - - def _read_device_manufacturer(self) -> str: - with open('/sys/devices/virtual/dmi/id/chassis_vendor') as f: - return f.readline() - - def _read_device_model(self) -> str: - with open('/sys/devices/virtual/dmi/id/product_family') as f: - return f.readline() - - def _read_device_hw_version(self) -> str: - with open('/sys/devices/virtual/dmi/id/product_name') as f: - return f.readline() - - def get_device_id(self): - return self.device_id - - def get_mqtt_device(self): - return self.mqtt_device - - def get_mqtt_settings(self): - return self.mqtt_settings +from device import Device +from paho.mqtt.client import Client, MQTTMessage -class Sensor: - def __init__(self, device: Device): - pass - - def update_value(self): - pass - - -class BatterySensor(Sensor): +class BatterySensor(): charging_status = False level = -1 @@ -106,12 +57,12 @@ class BatterySensor(Sensor): newLevel = "{:.0f}".format(blevel.percent) if self.level != newLevel: self.level_entity.set_state(newLevel) - + self.charging_status = blevel.power_plugged self.level = newLevel -class DisplayBrightnessSensor(Sensor): +class DisplayBrightnessSensor(): value = -1 mqtt_sensor = None @@ -160,7 +111,7 @@ class DisplayBrightnessSensor(Sensor): self.mqtt_sensor.set_value(new_brightness) -class IPAddressSensor(Sensor): +class IPAddressSensor(): value = "" mqtt_sensor = None @@ -191,7 +142,7 @@ class IPAddressSensor(Sensor): self.value = IP -class TemperatureSensor(Sensor): +class TemperatureSensor(): value = -1 mqtt_sensor = None diff --git a/src/setup.py b/src/setup.py index 575e557..3442fb0 100644 --- a/src/setup.py +++ b/src/setup.py @@ -5,6 +5,10 @@ from setuptools import setup, find_packages setup(name='ha-kiosk-agent', version='0.1.0', packages=find_packages(), - py_modules=["sensors"], + py_modules=[ + "device" + "display" + "sensor" + ], scripts=["ha-kiosk-agent"], )