ha-kiosk-agent/src/sensor.py

164 lines
5.9 KiB
Python

import logging
import psutil
import socket
import subprocess
import time
from enum import Enum
import ha_mqtt_discoverable
import ha_mqtt_discoverable.sensors
from device import Device
from paho.mqtt.client import Client, MQTTMessage
class BatterySensor():
charging_status = False
level = -1
charging_entity = None
level_entity = None
def __init__(self, device: Device):
# Battery charging
battery_charging_info = ha_mqtt_discoverable.sensors.BinarySensorInfo(
name="Battery charging",
device=device.get_mqtt_device(),
unique_id=f'{device.get_device_id()}_battery_charging',
device_class='battery_charging')
battery_charging_settings = ha_mqtt_discoverable.Settings(mqtt=device.get_mqtt_settings(), entity=battery_charging_info, manual_availability=True)
self.charging_entity = ha_mqtt_discoverable.sensors.BinarySensor(battery_charging_settings)
self.charging_entity.set_availability(True)
# Battery level
battery_level_info = ha_mqtt_discoverable.sensors.SensorInfo(
name="Battery Level",
device=device.get_mqtt_device(),
unique_id=f'{device.get_device_id()}_battery_level',
device_class='battery',
unit_of_measurement='%')
battery_level_settings = ha_mqtt_discoverable.Settings(mqtt=device.get_mqtt_settings(), entity=battery_level_info, manual_availability=True)
self.level_entity = ha_mqtt_discoverable.sensors.Sensor(battery_level_settings)
self.level_entity.set_availability(True)
def update_value(self):
blevel = psutil.sensors_battery()
if not blevel:
logger.warn(f'failed to get battery state')
return
if self.charging_status != blevel.power_plugged:
if blevel.power_plugged:
self.charging_entity.on()
else:
self.charging_entity.off()
newLevel = "{:.0f}".format(blevel.percent)
if self.level != newLevel:
self.level_entity.set_state(newLevel)
self.charging_status = blevel.power_plugged
self.level = newLevel
class DisplayBrightnessSensor():
value = -1
mqtt_sensor = None
def __init__(self, device: Device):
display_brightness_info = ha_mqtt_discoverable.sensors.NumberInfo(
name="Display Brightness",
min=0,
max=100,
mode="slider",
device=device.get_mqtt_device(),
unique_id=f'{device.get_device_id()}_display_brightness',
unit_of_measurement='%')
display_brightness_settings = ha_mqtt_discoverable.Settings(mqtt=device.get_mqtt_settings(), entity=display_brightness_info, manual_availability=True)
self.mqtt_sensor = ha_mqtt_discoverable.sensors.Number(display_brightness_settings, self.display_brightness_callback)
self.mqtt_sensor.set_availability(True)
def update_value(self):
res = subprocess.run(['light', '-G'], capture_output=True)
if res.returncode != 0:
logger.warn(f'failed to get brightness: exit code { res.returncode }')
return
brightPerc = float(res.stdout.decode("utf-8"))
if brightPerc != self.value:
self.mqtt_sensor.set_value(brightPerc)
self.value = brightPerc
def set_display_brightness(self, number: float):
res = subprocess.run(['light', '-S', f'{ number }%'])
if res.returncode != 0:
logger.warn(f'failed to set brightness: exit code { res.returncode }')
# To receive number updates from HA, define a callback function:
def display_brightness_callback(self, client: Client, user_data, message: MQTTMessage):
number = int(message.payload.decode())
logging.info(f"received new value {number} for display brightness")
self.set_display_brightness(number)
self.mqtt_sensor.set_value(number)
class IPAddressSensor():
value = ""
mqtt_sensor = None
def __init__(self, device: Device):
ip_address_info = ha_mqtt_discoverable.sensors.SensorInfo(
name="IP Address",
device=device.get_mqtt_device(),
unique_id=f'{device.get_device_id()}_ip_address')
ip_address_settings = ha_mqtt_discoverable.Settings(mqtt=device.get_mqtt_settings(), entity=ip_address_info, manual_availability=True)
self.mqtt_sensor = ha_mqtt_discoverable.sensors.Sensor(ip_address_settings)
self.mqtt_sensor.set_availability(True)
def update_value(self):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(0)
try:
# doesn't even have to be reachable
s.connect(('10.254.254.254', 1))
IP = s.getsockname()[0]
except Exception:
IP = '127.0.0.1'
finally:
s.close()
if IP != self.value:
self.mqtt_sensor.set_state(IP)
self.value = IP
class TemperatureSensor():
value = -1
mqtt_sensor = None
def __init__(self, device: Device):
temperature_info = ha_mqtt_discoverable.sensors.SensorInfo(
name="Temperature",
device=device.get_mqtt_device(),
unique_id=f'{device.get_device_id()}_temperature',
device_class='temperature',
unit_of_measurement='°C')
temperature_settings = ha_mqtt_discoverable.Settings(mqtt=device.get_mqtt_settings(), entity=temperature_info, manual_availability=True)
self.mqtt_sensor = ha_mqtt_discoverable.sensors.Sensor(temperature_settings)
self.mqtt_sensor.set_availability(True)
def update_value(self):
temp = psutil.sensors_temperatures()
if not temp or not temp['acpitz']:
logger.warn(f'failed to get temperature')
return
newValue = "{:.1f}".format(temp['acpitz'][0].current)
if newValue != self.value:
self.mqtt_sensor.set_state(newValue)
self.value = newValue