reorganize code, add support for display touch entity

main
Hendrik Sokolowski 2024-03-22 15:42:47 +01:00
parent ffad3ae3dd
commit 8c83eefcaa
4 changed files with 296 additions and 170 deletions

2
.gitignore vendored
View File

@ -1,2 +1,4 @@
/tags /tags
/.direnv /.direnv
__pycache__

View File

@ -14,6 +14,7 @@
src = ./src; src = ./src;
propagatedBuildInputs = with pkgs.python311Packages; [ propagatedBuildInputs = with pkgs.python311Packages; [
pkgs.brightnessctl pkgs.brightnessctl
pkgs.libinput
coloredlogs coloredlogs
configargparse configargparse
@ -25,6 +26,9 @@
devShells.default = pkgs.mkShell { devShells.default = pkgs.mkShell {
name = "development shell"; name = "development shell";
nativeBuildInputs = with pkgs.python311Packages; [ nativeBuildInputs = with pkgs.python311Packages; [
pkgs.brightnessctl
pkgs.libinput
coloredlogs coloredlogs
configargparse configargparse
ha-mqtt-discoverable ha-mqtt-discoverable

View File

@ -1,199 +1,100 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import argparse import argparse
import asyncio
import coloredlogs import coloredlogs
import logging import logging
import os import os
import psutil
import socket
import subprocess import subprocess
import time import time
from ha_mqtt_discoverable import Settings, DeviceInfo import ha_mqtt_discoverable
from ha_mqtt_discoverable.sensors import BinarySensor, BinarySensorInfo, Number, NumberInfo, Sensor, SensorInfo import sensors
from paho.mqtt.client import Client, MQTTMessage
coloredlogs.install() class InputListener():
logger = logging.getLogger("HA kiosk agent") last_input = 0
logger.setLevel(logging.INFO)
def get_ip_address() -> str: def get_last_input(self):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) return self.last_input
s.settimeout(0)
try:
# doesn't even have to be reachable
s.connect(('10.254.254.254', 1))
IP = s.getsockname()[0]
except Exception:
IP = '127.0.0.1'
finally:
s.close()
return IP
def get_temperature() -> float: async def read_stdout(self, logger, stdout):
temp = psutil.sensors_temperatures() logger.info('reading stdout')
if not temp or not temp['acpitz']: while True:
return -1 buf = await stdout.readline()
if not buf:
break
return ("{:.1f}".format(temp['acpitz'][0].current)) now = time.time()
if self.last_input + 5 <= now:
logger.info('display input received')
self.last_input = now
def get_battery_level() -> float: async def run(self, logger, device: str):
blevel = psutil.sensors_battery() logger.info('starting display event listener loop')
if not blevel: proc = await asyncio.create_subprocess_exec(
return -1 'libinput', 'record', device,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL)
return ("{:.0f}".format(blevel.percent), blevel.power_plugged) asyncio.gather(self.read_stdout(logger, proc.stdout))
def get_display_brightness() -> float: async def sensor_loop(logger, sensor_entities):
res = subprocess.run(['brightnessctl', '-m', 'info'], capture_output=True) logger.info('starting sensor loop')
if res.returncode != 0: while True:
logger.warn(f'failed to get brightness: exit code { res.returncode }') for sensor in sensor_entities.values():
return -1 sensor.update_value()
stdout = res.stdout.decode("utf-8") await asyncio.sleep(0.1)
brightPerc = stdout.split(',')[3].split('%')[0]
return float(brightPerc)
def set_display_brightness(number: float) -> float: async def main():
res = subprocess.run(['brightnessctl', '-m', 'set', f'{ number }%'], capture_output=True) coloredlogs.install()
if res.returncode != 0: logger = logging.getLogger("HA kiosk agent")
logger.warn(f'failed to set brightness: exit code { res.returncode }') logger.setLevel(logging.INFO)
return -1
stdout = res.stdout.decode("utf-8") parser = argparse.ArgumentParser()
brightPerc = stdout.split(',')[3].split('%')[0] parser.add_argument("--debug", action='store_true', default=False)
return float(brightPerc) parser.add_argument("--device-id", dest='device_id', default=os.environ.get('DEVICE_ID'), required=True)
parser.add_argument("--device-name", dest='device_name', default=os.environ.get('DEVICE_NAME'), required=True)
parser.add_argument("--display-device", dest='display_device', default=os.environ.get('DISPLAY_DEVICE'))
parser.add_argument("--mqtt-host", dest='mqtt_host', default=os.environ.get('MQTT_HOST', 'localhost'))
parser.add_argument("--mqtt-port", dest='mqtt_port', default=os.environ.get('MQTT_PORT', 1883))
parser.add_argument("--mqtt-user", dest='mqtt_user', default=os.environ.get('MQTT_USER'))
parser.add_argument("--mqtt-pass", dest='mqtt_pass', default=os.environ.get('MQTT_PASS'))
# To receive number updates from HA, define a callback function: args = parser.parse_args()
def display_brightness_callback(client: Client, user_data, message: MQTTMessage):
number = int(message.payload.decode())
logging.info(f"received new value {number} for display brightness")
new_brightness = set_display_brightness(number)
display_brightness.set_value(new_brightness)
parser = argparse.ArgumentParser() mqtt_args = {'host': args.mqtt_host}
parser.add_argument("--debug", action='store_true', default=False) if args.debug:
parser.add_argument("--device-id", dest='device_id', default=os.environ.get('DEVICE_ID'), required=True) logger.setLevel(logging.DEBUG)
parser.add_argument("--device-name", dest='device_name', default=os.environ.get('DEVICE_NAME'), required=True) mqtt_args['debug'] = args.debug
parser.add_argument("--mqtt-host", dest='mqtt_host', default=os.environ.get('MQTT_HOST', 'localhost'))
parser.add_argument("--mqtt-port", dest='mqtt_port', default=os.environ.get('MQTT_PORT', 1883))
parser.add_argument("--mqtt-user", dest='mqtt_user', default=os.environ.get('MQTT_USER'))
parser.add_argument("--mqtt-pass", dest='mqtt_pass', default=os.environ.get('MQTT_PASS'))
args = parser.parse_args() if args.mqtt_port:
mqtt_args['port'] = args.mqtt_port
mqtt_args = {'host': args.mqtt_host} if args.mqtt_user:
if args.debug: mqtt_args['username'] = args.mqtt_user
logger.setLevel(logging.DEBUG)
mqtt_args['debug'] = args.debug
if args.mqtt_port: if args.mqtt_pass:
mqtt_args['port'] = args.mqtt_port mqtt_args['password'] = args.mqtt_pass
if args.mqtt_user: mqtt_settings = ha_mqtt_discoverable.Settings.MQTT(**mqtt_args)
mqtt_args['username'] = args.mqtt_user device = sensors.Device(args.device_name, args.device_id, mqtt_settings)
sensor_entities = {
'battery': sensors.BatterySensor(device),
'display_brightness': sensors.DisplayBrightnessSensor(device),
'temperature': sensors.TemperatureSensor(device)
};
if args.mqtt_pass: logger.info('finished setup')
mqtt_args['password'] = args.mqtt_pass
# Configure the required parameters for the MQTT host tasks = [sensor_loop(logger, sensor_entities)]
mqtt_settings = Settings.MQTT(**mqtt_args) if args.display_device:
listener = InputListener()
sensor_entities['display_touched'] = sensors.DisplayTouchedSensor(device, listener)
tasks.append(listener.run(logger, args.display_device))
# Define the device. At least one of `identifiers` or `connections` must be supplied await asyncio.gather(*tasks)
device_info = DeviceInfo(name=args.device_name, identifiers=args.device_id)
# Battery charging if __name__ == "__main__":
battery_charging_info = BinarySensorInfo( asyncio.run(main())
name="Battery charging",
device=device_info,
unique_id=f'{args.device_id}_battery_charging',
device_class='battery_charging')
battery_charging_settings = Settings(mqtt=mqtt_settings, entity=battery_charging_info)
battery_charging = BinarySensor(battery_charging_settings)
# Battery level
battery_level_info = SensorInfo(
name="Battery Level",
device=device_info,
unique_id=f'{args.device_id}_battery_level',
device_class='battery',
unit_of_measurement='%')
battery_level_settings = Settings(mqtt=mqtt_settings, entity=battery_level_info)
battery_level = Sensor(battery_level_settings)
# Display brightness
display_brightness_info = NumberInfo(
name="Display Brightness",
min=0,
max=100,
mode="slider",
device=device_info,
unique_id=f'{args.device_id}_display_brightness',
unit_of_measurement='%')
display_brightness_settings = Settings(mqtt=mqtt_settings, entity=display_brightness_info)
display_brightness = Number(display_brightness_settings, display_brightness_callback)
# IP Address
ip_address_info = SensorInfo(
name="IP Address",
device=device_info,
unique_id=f'{args.device_id}_ip_address')
ip_address_settings = Settings(mqtt=mqtt_settings, entity=ip_address_info)
ip_address = Sensor(ip_address_settings)
ip_address.set_state(get_ip_address())
# Temperature
temperature_info = SensorInfo(
name="Temperature",
device=device_info,
unique_id=f'{args.device_id}_temperature',
device_class='temperature',
unit_of_measurement='°C')
temperature_settings = Settings(mqtt=mqtt_settings, entity=temperature_info)
temperature = Sensor(temperature_settings)
logger.info('finished setup')
lastBattery = get_battery_level()
logger.info(f'current battery level: { lastBattery }')
battery_level.set_state(lastBattery[0])
if lastBattery[1] or lastBattery[1] is None:
battery_charging.on()
else:
battery_charging.off()
lastBrightness = get_display_brightness()
logger.info(f'current brightness: { lastBrightness }')
display_brightness.set_value(lastBrightness)
lastTemperature = get_temperature()
logger.info(f'current temperature: { temperature }')
temperature.set_state(lastTemperature)
while True:
newBrightness = get_display_brightness()
if newBrightness != lastBrightness:
logger.info(f'updating value for display brightness to { newBrightness }')
display_brightness.set_value(newBrightness)
lastBrightness = newBrightness
newBattery = get_battery_level()
if newBattery[0] != lastBattery[0]:
logger.info(f'updating value for battery level to { newBattery[0] }')
battery_level.set_state(newBattery[0])
if newBattery[1] != lastBattery[1]:
logger.info(f'updating value for battery charger plugged to { newBattery[1] }')
if newBattery[1] or newBattery[1] is None:
battery_charging.on()
else:
battery_charging.off()
lastBattery = newBattery
newTemperature = get_temperature()
if newTemperature != lastTemperature:
logger.info(f'updating value for temperature to { newTemperature }')
temperature.set_state(newTemperature)
lastTemperature = newTemperature
time.sleep(5)

219
src/sensors.py Normal file
View File

@ -0,0 +1,219 @@
import logging
import psutil
import socket
import subprocess
import time
import ha_mqtt_discoverable
import ha_mqtt_discoverable.sensors
from paho.mqtt.client import Client, MQTTMessage
from enum import Enum
class Device:
name = ""
device_id = ""
mqtt_settings = None
mqtt_device = None
def __init__(self, name, device_id, mqtt_settings: ha_mqtt_discoverable.Settings.MQTT):
self.name = name
self.device_id = device_id
self.mqtt_device = ha_mqtt_discoverable.DeviceInfo(name=name, identifiers=device_id)
self.mqtt_settings = mqtt_settings
def get_device_id(self):
return self.device_id
def get_mqtt_device(self):
return self.mqtt_device
def get_mqtt_settings(self):
return self.mqtt_settings
class Sensor:
def __init__(self, device: Device):
pass
def update_value(self):
pass
class BatterySensor(Sensor):
charging_status = False
level = -1
charging_entity = None
level_entity = None
def __init__(self, device: Device):
# Battery charging
battery_charging_info = ha_mqtt_discoverable.sensors.BinarySensorInfo(
name="Battery charging",
device=device.get_mqtt_device(),
unique_id=f'{device.get_device_id()}_battery_charging',
device_class='battery_charging')
battery_charging_settings = ha_mqtt_discoverable.Settings(mqtt=device.get_mqtt_settings(), entity=battery_charging_info)
self.charging_entity = ha_mqtt_discoverable.sensors.BinarySensor(battery_charging_settings)
# Battery level
battery_level_info = ha_mqtt_discoverable.sensors.SensorInfo(
name="Battery Level",
device=device.get_mqtt_device(),
unique_id=f'{device.get_device_id()}_battery_level',
device_class='battery',
unit_of_measurement='%')
battery_level_settings = ha_mqtt_discoverable.Settings(mqtt=device.get_mqtt_settings(), entity=battery_level_info)
self.level_entity = ha_mqtt_discoverable.sensors.Sensor(battery_level_settings)
def update_value(self):
blevel = psutil.sensors_battery()
if not blevel:
logger.warn(f'failed to get battery state')
return
if self.charging_status != blevel.power_plugged:
if blevel.power_plugged:
self.charging_entity.on()
else:
self.charging_entity.off()
newLevel = "{:.0f}".format(blevel.percent)
if self.level != newLevel:
self.level_entity.set_state(newLevel)
self.charging_status = blevel.power_plugged
self.level = newLevel
class DisplayBrightnessSensor(Sensor):
value = -1
mqtt_sensor = None
def __init__(self, device: Device):
display_brightness_info = ha_mqtt_discoverable.sensors.NumberInfo(
name="Display Brightness",
min=0,
max=100,
mode="slider",
device=device.get_mqtt_device(),
unique_id=f'{device.get_device_id()}_display_brightness',
unit_of_measurement='%')
display_brightness_settings = ha_mqtt_discoverable.Settings(mqtt=device.get_mqtt_settings(), entity=display_brightness_info)
self.mqtt_sensor = ha_mqtt_discoverable.sensors.Number(display_brightness_settings, self.display_brightness_callback)
def update_value(self):
res = subprocess.run(['brightnessctl', '-m', 'info'], capture_output=True)
if res.returncode != 0:
logger.warn(f'failed to get brightness: exit code { res.returncode }')
return
stdout = res.stdout.decode("utf-8")
brightPerc = float(stdout.split(',')[3].split('%')[0])
if brightPerc != self.value:
self.mqtt_sensor.set_value(brightPerc)
self.value = brightPerc
def set_display_brightness(self, number: float) -> float:
res = subprocess.run(['brightnessctl', '-m', 'set', f'{ number }%'], capture_output=True)
if res.returncode != 0:
logger.warn(f'failed to set brightness: exit code { res.returncode }')
return -1
stdout = res.stdout.decode("utf-8")
brightPerc = float(stdout.split(',')[3].split('%')[0])
return brightPerc
# To receive number updates from HA, define a callback function:
def display_brightness_callback(self, client: Client, user_data, message: MQTTMessage):
number = int(message.payload.decode())
logging.info(f"received new value {number} for display brightness")
new_brightness = self.set_display_brightness(number)
self.mqtt_sensor.set_value(new_brightness)
class DisplayTouchedSensor(Sensor):
device = None
listener = None
touched_entity = None
value = False
def __init__(self, device: Device, listener):
self.device = device
self.listener = listener
display_touched_info = ha_mqtt_discoverable.sensors.BinarySensorInfo(
name="Display touched",
device=device.get_mqtt_device(),
unique_id=f'{device.get_device_id()}_display_touched')
display_touched_settings = ha_mqtt_discoverable.Settings(mqtt=device.get_mqtt_settings(), entity=display_touched_info)
self.touched_entity = ha_mqtt_discoverable.sensors.BinarySensor(display_touched_settings)
def update_value(self):
newValue = self.listener.get_last_input() + 5 >= time.time()
if newValue != self.value:
if newValue:
self.touched_entity.on()
else:
self.touched_entity.off()
self.value = newValue
class IPAddressSensor(Sensor):
value = ""
mqtt_sensor = None
def __init__(self, device: Device):
ip_address_info = ha_mqtt_discoverable.sensors.SensorInfo(
name="IP Address",
device=device.get_mqtt_device(),
unique_id=f'{device.get_device_id()}_ip_address')
ip_address_settings = ha_mqtt_discoverable.Settings(mqtt=device.get_mqtt_settings(), entity=ip_address_info)
ip_address = ha_mqtt_discoverable.sensors.Sensor(ip_address_settings)
def update_value(self):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(0)
try:
# doesn't even have to be reachable
s.connect(('10.254.254.254', 1))
IP = s.getsockname()[0]
except Exception:
IP = '127.0.0.1'
finally:
s.close()
if IP != self.value:
self.mqtt_sensor.set_state(IP)
self.value = IP
class TemperatureSensor(Sensor):
value = -1
mqtt_sensor = None
def __init__(self, device: Device):
temperature_info = ha_mqtt_discoverable.sensors.SensorInfo(
name="Temperature",
device=device.get_mqtt_device(),
unique_id=f'{device.get_device_id()}_temperature',
device_class='temperature',
unit_of_measurement='°C')
temperature_settings = ha_mqtt_discoverable.Settings(mqtt=device.get_mqtt_settings(), entity=temperature_info)
self.mqtt_sensor = ha_mqtt_discoverable.sensors.Sensor(temperature_settings)
def update_value(self):
temp = psutil.sensors_temperatures()
if not temp or not temp['acpitz']:
logger.warn(f'failed to get temperature')
return
newValue = "{:.1f}".format(temp['acpitz'][0].current)
if newValue != self.value:
self.mqtt_sensor.set_state(newValue)
self.value = newValue