add switch entity to toggle always on mode

main
Hendrik Sokolowski 2024-03-25 19:03:35 +01:00
parent 0fe090ae12
commit da5c921f80
1 changed files with 73 additions and 26 deletions

View File

@ -9,23 +9,58 @@ import os
import subprocess
import time
from paho.mqtt.client import Client, MQTTMessage
import ha_mqtt_discoverable
import sensors
class BrightnessMode(enum.Enum):
MODE_AUTO_OFF = 0
MODE_AUTO_DIM = 1
MODE_AUTO_BRIGHT = 2
MODE_MAN = 0
MODE_AUTO = 1
class InputListener():
class BrightnessLevel(enum.Enum):
LEVEL_OFF = 0
LEVEL_DIM = 1
LEVEL_BRIGHT = 2
class DisplayBrightnessManager():
last_input = 0
brightness_mode = BrightnessMode.MODE_AUTO_OFF
brightness_mode = BrightnessMode.MODE_AUTO
brightness_mode_new = BrightnessMode.MODE_AUTO
brightness_level = BrightnessLevel.LEVEL_OFF
mqtt_entity = None
def __init__(self, device):
force_display_on_info = ha_mqtt_discoverable.sensors.SwitchInfo(
name="Force Display on",
device=device.get_mqtt_device(),
unique_id=f'{device.get_device_id()}_force_display_on')
switch_settings = ha_mqtt_discoverable.Settings(mqtt=device.get_mqtt_settings(), entity=force_display_on_info, manual_availability=True)
self.mqtt_entity = ha_mqtt_discoverable.sensors.Switch(switch_settings, self.switch_callback)
self.mqtt_entity.set_availability(True)
def switch_callback(self, client: Client, user_data, message: MQTTMessage):
payload = message.payload.decode()
if payload == "ON":
self.set_mode(BrightnessMode.MODE_MAN)
self.mqtt_entity.on()
elif payload == "OFF":
self.set_mode(BrightnessMode.MODE_AUTO)
self.mqtt_entity.off()
def get_last_input(self):
return self.last_input
def get_level(self) -> BrightnessLevel:
return self.brightness_level
def get_mode(self) -> BrightnessMode:
return self.brightness_mode
def set_mode(self, mode: BrightnessMode):
self.brightness_mode_new = mode
logging.info(f'brightness mode: {self.brightness_mode}, new mode: {self.brightness_mode_new}')
async def read_stdout(self, logger, stdout):
logger.info('reading stdout')
while True:
buf = await stdout.readline()
if not buf:
@ -33,15 +68,15 @@ class InputListener():
now = time.time()
if self.last_input + 5 <= now:
logger.info('display input received')
continue
self.last_input = now
async def dim_screen(self, to_value):
from_value = 0
if self.brightness_mode == BrightnessMode.MODE_AUTO_DIM:
if self.brightness_level == BrightnessLevel.LEVEL_DIM:
from_value = 5
elif self.brightness_mode == BrightnessMode.MODE_AUTO_BRIGHT:
elif self.brightness_level == BrightnessLevel.LEVEL_BRIGHT:
from_value = 50
step_val=(to_value-from_value)/25
@ -51,24 +86,33 @@ class InputListener():
await asyncio.sleep(0.005)
async def brightness_loop(self, logger):
logger.info('starting brightess loop')
logger.info('starting brightness loop')
while True:
if self.last_input + 30 < time.time():
if self.brightness_mode != BrightnessMode.MODE_AUTO_OFF:
await self.dim_screen(0)
self.brightness_mode = BrightnessMode.MODE_AUTO_OFF
elif self.last_input + 15 < time.time():
if self.brightness_mode != BrightnessMode.MODE_AUTO_DIM:
await self.dim_screen(5)
self.brightness_mode = BrightnessMode.MODE_AUTO_DIM
elif self.brightness_mode != BrightnessMode.MODE_AUTO_BRIGHT:
await self.dim_screen(50)
self.brightness_mode = BrightnessMode.MODE_AUTO_BRIGHT
if self.brightness_mode != self.brightness_mode_new:
if self.brightness_mode_new == BrightnessMode.MODE_MAN:
await self.dim_screen(50)
self.brightness_level = BrightnessLevel.LEVEL_BRIGHT
else:
self.last_input = time.time()
self.brightness_mode = self.brightness_mode_new
if self.brightness_mode != BrightnessMode.MODE_MAN:
if self.last_input + 30 < time.time():
if self.brightness_level != BrightnessLevel.LEVEL_OFF:
await self.dim_screen(0)
self.brightness_level = BrightnessLevel.LEVEL_OFF
elif self.last_input + 15 < time.time():
if self.brightness_level != BrightnessLevel.LEVEL_DIM:
await self.dim_screen(5)
self.brightness_level = BrightnessLevel.LEVEL_DIM
elif self.brightness_level != BrightnessLevel.LEVEL_BRIGHT:
await self.dim_screen(50)
self.brightness_level = BrightnessLevel.LEVEL_BRIGHT
await asyncio.sleep(0.1)
async def run(self, logger, device: str):
logger.info('starting display event listener loop')
logger.info('starting display event dm loop')
proc = await asyncio.create_subprocess_exec(
'libinput', 'record', device,
stdout=asyncio.subprocess.PIPE,
@ -76,6 +120,7 @@ class InputListener():
asyncio.gather(self.read_stdout(logger, proc.stdout), self.brightness_loop(logger))
async def sensor_loop(logger, sensor_entities):
logger.info('starting sensor loop')
while True:
@ -124,13 +169,15 @@ async def main():
'temperature': sensors.TemperatureSensor(device)
};
logger.info('finished setup')
tasks = [sensor_loop(logger, sensor_entities)]
tasks = []
if args.display_device:
listener = InputListener()
tasks.append(listener.run(logger, args.display_device))
dbm = DisplayBrightnessManager(device)
tasks.append(dbm.run(logger, args.display_device))
tasks.append(sensor_loop(logger, sensor_entities))
logger.info('finished setup, starting tasks')
await asyncio.gather(*tasks)
if __name__ == "__main__":