241 lines
8.9 KiB
Python
241 lines
8.9 KiB
Python
import logging
|
|
import psutil
|
|
import re
|
|
import socket
|
|
import subprocess
|
|
import time
|
|
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
|
|
import ha_mqtt_discoverable
|
|
import ha_mqtt_discoverable.sensors
|
|
|
|
from device import Device
|
|
from display import DisplayBrightnessManager
|
|
from paho.mqtt.client import Client, MQTTMessage
|
|
|
|
|
|
class BatterySensor():
    """Expose battery state as three MQTT entities via ha_mqtt_discoverable.

    The entities are "Battery charger connected" and "Battery charging"
    (binary sensors) and "Battery Level" (a percentage sensor). Each entity is
    only re-published when its value differs from the last value that was
    published by this object.
    """

    # Last published values; None means nothing has been published yet.
    charger_connected = None
    charging_status = None
    level = None

    # ha_mqtt_discoverable entities, created in __init__.
    charging_entity = None
    connected_entity = None
    level_entity = None

    def __init__(self, device: "Device"):
        """Create the three entities for *device* and mark them available.

        Args:
            device: Provides the MQTT device description, the MQTT connection
                settings and the device id used to build the unique ids.
        """

        def settings_for(info):
            # All entities share the same connection settings and manage
            # their availability manually.
            return ha_mqtt_discoverable.Settings(
                mqtt=device.get_mqtt_settings(),
                entity=info,
                manual_availability=True,
            )

        # Battery charger connected
        charger_connected_info = ha_mqtt_discoverable.sensors.BinarySensorInfo(
            name="Battery charger connected",
            device=device.get_mqtt_device(),
            unique_id=f'{device.get_device_id()}_battery_charger_connected',
            device_class='plug',
        )
        self.connected_entity = ha_mqtt_discoverable.sensors.BinarySensor(
            settings_for(charger_connected_info))
        self.connected_entity.set_availability(True)

        # Battery charging
        battery_charging_info = ha_mqtt_discoverable.sensors.BinarySensorInfo(
            name="Battery charging",
            device=device.get_mqtt_device(),
            unique_id=f'{device.get_device_id()}_battery_charging',
            device_class='battery_charging',
        )
        self.charging_entity = ha_mqtt_discoverable.sensors.BinarySensor(
            settings_for(battery_charging_info))
        self.charging_entity.set_availability(True)

        # Battery level
        battery_level_info = ha_mqtt_discoverable.sensors.SensorInfo(
            name="Battery Level",
            device=device.get_mqtt_device(),
            unique_id=f'{device.get_device_id()}_battery_level',
            device_class='battery',
            unit_of_measurement='%',
        )
        self.level_entity = ha_mqtt_discoverable.sensors.Sensor(
            settings_for(battery_level_info))
        self.level_entity.set_availability(True)

    def update_value(self):
        """Read the battery state from psutil and publish whatever changed.

        When psutil reports no battery, a warning is logged and nothing is
        published or remembered.
        """
        reading = psutil.sensors_battery()
        if reading is None:
            # The module has no ``logger`` object; log through ``logging``
            # directly, as the rest of the file does.
            logging.warning('failed to get battery state')
            return

        plugged = reading.power_plugged
        time_unknown = reading.secsleft == psutil.POWER_TIME_UNKNOWN

        if plugged:
            # On mains power: reported as charging unless the remaining time
            # is unknown.
            connected = True
            charging = not time_unknown
        elif plugged is None and time_unknown:
            # Plug state undetermined and no time estimate: reported as
            # connected but not charging.
            connected = True
            charging = False
        else:
            connected = False
            charging = False

        # A stored value of None never equals a bool, so the very first
        # reading is always published.
        if connected != self.charger_connected:
            if connected:
                self.connected_entity.on()
            else:
                self.connected_entity.off()

        if charging != self.charging_status:
            if charging:
                self.charging_entity.on()
            else:
                self.charging_entity.off()

        # The level is published as a whole-number string.
        new_level = "{:.0f}".format(reading.percent)
        if new_level != self.level:
            self.level_entity.set_state(new_level)

        # Remember the state only after every publish call has returned.
        self.charging_status = charging
        self.charger_connected = connected
        self.level = new_level
|
|
|
|
|
|
class DisplayBrightnessSensor():
    """Two-way "Display Brightness" number entity (0-100 %, slider mode).

    Brightness read from the display manager is published over MQTT, and
    values received over MQTT are applied to the display manager.
    """

    # Last brightness value published by this object.
    value = 0
    # Display manager used to read and set the brightness.
    dbm = None
    # ha_mqtt_discoverable Number entity, created in __init__.
    mqtt_sensor = None

    def __init__(self, device: "Device", dbm: "DisplayBrightnessManager"):
        """Create the number entity for *device* and mark it available.

        Args:
            device: Provides the MQTT device description, the MQTT connection
                settings and the device id used to build the unique id.
            dbm: Object whose ``get_brightness_value`` and
                ``set_brightness_value`` methods are used by this sensor.
        """
        self.dbm = dbm

        display_brightness_info = ha_mqtt_discoverable.sensors.NumberInfo(
            name="Display Brightness",
            min=0,
            max=100,
            mode="slider",
            device=device.get_mqtt_device(),
            unique_id=f'{device.get_device_id()}_display_brightness',
            unit_of_measurement='%',
        )
        display_brightness_settings = ha_mqtt_discoverable.Settings(
            mqtt=device.get_mqtt_settings(),
            entity=display_brightness_info,
            manual_availability=True,
        )
        self.mqtt_sensor = ha_mqtt_discoverable.sensors.Number(
            display_brightness_settings, self.display_brightness_callback)
        self.mqtt_sensor.set_availability(True)

    def update_value(self):
        """Publish the current brightness if it differs from the last one."""
        brightness = self.dbm.get_brightness_value()
        if brightness != self.value:
            self.mqtt_sensor.set_value(brightness)

        self.value = brightness

    def set_display_brightness(self, number: float):
        """Forward *number* to the display manager."""
        self.dbm.set_brightness_value(number)

    # To receive number updates from HA, define a callback function:
    def display_brightness_callback(self, client: "Client", user_data, message: "MQTTMessage"):
        """Apply a brightness value received over MQTT and echo it back.

        The payload is parsed as a number and rounded to the nearest integer,
        so both ``"55"`` and ``"55.0"`` are accepted. A payload that cannot be
        parsed is logged and ignored instead of raising inside the MQTT
        callback.

        Args:
            client: MQTT client that delivered the message (unused).
            user_data: User data attached to the client (unused).
            message: Message whose ``payload`` bytes carry the new value.
        """
        try:
            number = round(float(message.payload.decode()))
        except (ValueError, OverflowError):
            # ValueError also covers undecodable bytes and NaN; OverflowError
            # covers infinities.
            logging.warning("ignoring invalid display brightness payload %r", message.payload)
            return

        logging.info("received new value %s for display brightness", number)
        self.set_display_brightness(number)
        self.mqtt_sensor.set_value(number)
        # Record what was just published so update_value() does not publish
        # the same value a second time.
        self.value = number
|
|
|
|
|
|
class IPAddressSensor():
    """"IP Address" sensor entity that publishes the host's outbound IPv4 address."""

    # Last address published by this object.
    value = ""
    # ha_mqtt_discoverable Sensor entity, created in __init__.
    mqtt_sensor = None

    def __init__(self, device: Device):
        """Create the sensor entity for *device* and mark it available."""
        info = ha_mqtt_discoverable.sensors.SensorInfo(
            name="IP Address",
            device=device.get_mqtt_device(),
            unique_id=f'{device.get_device_id()}_ip_address',
        )
        settings = ha_mqtt_discoverable.Settings(
            mqtt=device.get_mqtt_settings(),
            entity=info,
            manual_availability=True,
        )
        self.mqtt_sensor = ha_mqtt_discoverable.sensors.Sensor(settings)
        self.mqtt_sensor.set_availability(True)

    def update_value(self):
        """Determine the local address and publish it when it has changed.

        A non-blocking UDP socket is connected to a fixed address and the
        socket's own address is read back. Any failure falls back to
        ``127.0.0.1``.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.settimeout(0)
            try:
                # doesn't even have to be reachable
                probe.connect(('10.254.254.254', 1))
                address = probe.getsockname()[0]
            except Exception:
                address = '127.0.0.1'

        if self.value != address:
            self.mqtt_sensor.set_state(address)

        self.value = address
|
|
|
|
|
|
class LastBootSensor():
    """"Last boot" timestamp sensor, published once at construction time."""

    # Not updated by this class; kept as a class-level default.
    value = -1
    # ha_mqtt_discoverable Sensor entity, created in __init__.
    mqtt_sensor = None

    def __init__(self, device: Device):
        """Create the sensor for *device*, mark it available and publish the boot time."""
        info = ha_mqtt_discoverable.sensors.SensorInfo(
            name="Last boot",
            device=device.get_mqtt_device(),
            unique_id=f'{device.get_device_id()}_last_boot',
            device_class='timestamp',
        )
        settings = ha_mqtt_discoverable.Settings(
            mqtt=device.get_mqtt_settings(),
            entity=info,
            manual_availability=True,
        )
        self.mqtt_sensor = ha_mqtt_discoverable.sensors.Sensor(settings)
        self.mqtt_sensor.set_availability(True)

        # Convert the boot timestamp to a timezone-aware local datetime so the
        # published ISO string carries a UTC offset.
        booted_at = datetime.fromtimestamp(psutil.boot_time()).astimezone()
        self.mqtt_sensor.set_state(booted_at.isoformat())

    def update_value(self):
        """Do nothing; the boot time is published by the constructor."""
        return None
|
|
|
|
|
|
class TemperatureSensor():
    """"Temperature" sensor entity fed from the ``acpitz`` readings of psutil."""

    # Last published value (a string with one decimal once a reading exists).
    value = -1
    # ha_mqtt_discoverable Sensor entity, created in __init__.
    mqtt_sensor = None

    def __init__(self, device: "Device"):
        """Create the sensor entity for *device* and mark it available.

        Args:
            device: Provides the MQTT device description, the MQTT connection
                settings and the device id used to build the unique id.
        """
        temperature_info = ha_mqtt_discoverable.sensors.SensorInfo(
            name="Temperature",
            device=device.get_mqtt_device(),
            unique_id=f'{device.get_device_id()}_temperature',
            device_class='temperature',
            unit_of_measurement='°C',
        )
        temperature_settings = ha_mqtt_discoverable.Settings(
            mqtt=device.get_mqtt_settings(),
            entity=temperature_info,
            manual_availability=True,
        )
        self.mqtt_sensor = ha_mqtt_discoverable.sensors.Sensor(temperature_settings)
        self.mqtt_sensor.set_availability(True)

    def update_value(self):
        """Publish the first ``acpitz`` reading when it has changed.

        When psutil reports no temperatures, or no ``acpitz`` entry, a warning
        is logged and nothing is published.
        """
        zones = psutil.sensors_temperatures()
        # .get() instead of indexing: machines that expose other zones but no
        # 'acpitz' must not raise KeyError here.
        readings = zones.get('acpitz') if zones else None
        if not readings:
            logging.warning('failed to get temperature')
            return

        new_value = "{:.1f}".format(readings[0].current)
        if new_value != self.value:
            self.mqtt_sensor.set_state(new_value)

        self.value = new_value
|
|
|
|
class WifiSignalStrengthSensor():
    """"Wifi Signal Strength" sensor entity (dBm) parsed from ``iwconfig`` output.

    The entity is marked unavailable whenever no signal level can be obtained.
    """

    # Last published signal level.
    value = -1
    # ha_mqtt_discoverable Sensor entity, created in __init__.
    mqtt_sensor = None

    # Matches the first negative dBm signal level in the iwconfig output.
    _SIGNAL_LEVEL_RE = re.compile(r'Signal level=(-[0-9]+) dBm')

    def __init__(self, device: "Device"):
        """Create the sensor entity for *device* and run a first update.

        Args:
            device: Provides the MQTT device description, the MQTT connection
                settings and the device id used to build the unique id.
        """
        wifi_strength_info = ha_mqtt_discoverable.sensors.SensorInfo(
            name="Wifi Signal Strength",
            device=device.get_mqtt_device(),
            unique_id=f'{device.get_device_id()}_wifi_signal_strength',
            device_class='signal_strength',
            unit_of_measurement='dBm',
        )
        wifi_strength_settings = ha_mqtt_discoverable.Settings(
            mqtt=device.get_mqtt_settings(),
            entity=wifi_strength_info,
            manual_availability=True,
        )
        self.mqtt_sensor = ha_mqtt_discoverable.sensors.Sensor(wifi_strength_settings)
        # Availability is decided by the first reading.
        self.update_value()

    @classmethod
    def _parse_signal_level(cls, text):
        """Return the first signal level in *text* as a float, or None if absent."""
        match = cls._SIGNAL_LEVEL_RE.search(text)
        return float(match.group(1)) if match else None

    def update_value(self):
        """Run ``iwconfig`` and publish the signal level when it has changed.

        The entity is marked unavailable when ``iwconfig`` cannot be started
        or its output contains no signal level; otherwise it is marked
        available.
        """
        try:
            result = subprocess.run(['iwconfig'], capture_output=True)
        except OSError:
            # Raised e.g. when iwconfig is not installed; this method is also
            # called from __init__, so it must not propagate.
            logging.warning('failed to run iwconfig')
            self.mqtt_sensor.set_availability(False)
            return

        # errors='replace' keeps non-UTF-8 bytes in the output from raising.
        output = result.stdout.decode('utf-8', errors='replace')
        new_value = self._parse_signal_level(output)
        if new_value is None:
            self.mqtt_sensor.set_availability(False)
            return

        self.mqtt_sensor.set_availability(True)
        if new_value != self.value:
            self.mqtt_sensor.set_state(new_value)

        self.value = new_value
|