use evdev instead of libinput, fix debug logging, fix logic
parent
b1a058deca
commit
66f9be9fd2
|
@ -14,10 +14,10 @@
|
|||
src = ./src;
|
||||
propagatedBuildInputs = with pkgs.python311Packages; [
|
||||
pkgs.brightnessctl
|
||||
pkgs.libinput
|
||||
|
||||
coloredlogs
|
||||
configargparse
|
||||
evdev
|
||||
ha-mqtt-discoverable
|
||||
psutil
|
||||
];
|
||||
|
@ -27,10 +27,10 @@
|
|||
name = "development shell";
|
||||
nativeBuildInputs = with pkgs.python311Packages; [
|
||||
pkgs.brightnessctl
|
||||
pkgs.libinput
|
||||
|
||||
coloredlogs
|
||||
configargparse
|
||||
evdev
|
||||
ha-mqtt-discoverable
|
||||
psutil
|
||||
];
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import asyncio
|
||||
import enum
|
||||
import evdev
|
||||
import logging
|
||||
import time
|
||||
|
||||
|
@ -17,13 +18,19 @@ class BrightnessLevel(enum.Enum):
|
|||
LEVEL_BRIGHT = 2
|
||||
|
||||
class DisplayBrightnessManager():
|
||||
last_input = 0
|
||||
last_input = time.time()
|
||||
brightness_mode = BrightnessMode.MODE_AUTO
|
||||
brightness_mode_new = BrightnessMode.MODE_AUTO
|
||||
brightness_level = BrightnessLevel.LEVEL_OFF
|
||||
brightness_level = BrightnessLevel.LEVEL_BRIGHT
|
||||
mqtt_entity = None
|
||||
|
||||
def __init__(self, device):
|
||||
touch_device = None
|
||||
logger = None
|
||||
|
||||
def __init__(self, device, touch_device, logger: logging.Logger):
|
||||
self.logger = logger
|
||||
self.touch_device = self.get_touch_device(touch_device)
|
||||
|
||||
force_display_on_info = ha_mqtt_discoverable.sensors.SwitchInfo(
|
||||
name="Force Display on",
|
||||
device=device.get_mqtt_device(),
|
||||
|
@ -32,6 +39,19 @@ class DisplayBrightnessManager():
|
|||
self.mqtt_entity = ha_mqtt_discoverable.sensors.Switch(switch_settings, self.switch_callback)
|
||||
self.mqtt_entity.set_availability(True)
|
||||
|
||||
def get_touch_device(self, touch_device: str) -> evdev.InputDevice:
|
||||
if '/dev/input' in touch_device:
|
||||
return evdev.InputDevice(touch_device)
|
||||
|
||||
for dev in evdev.list_devices():
|
||||
d = evdev.InputDevice(dev)
|
||||
if touch_device in d.name:
|
||||
self.logger.info(f'using {dev} as input device')
|
||||
return d
|
||||
|
||||
self.logger.error(f'no input device found matching {touch_device}')
|
||||
return None
|
||||
|
||||
def switch_callback(self, client: Client, user_data, message: MQTTMessage):
|
||||
payload = message.payload.decode()
|
||||
if payload == "ON":
|
||||
|
@ -54,19 +74,15 @@ class DisplayBrightnessManager():
|
|||
self.brightness_mode_new = mode
|
||||
logging.info(f'brightness mode: {self.brightness_mode}, new mode: {self.brightness_mode_new}')
|
||||
|
||||
async def read_stdout(self, logger, stdout):
|
||||
while True:
|
||||
buf = await stdout.readline()
|
||||
if not buf:
|
||||
break
|
||||
|
||||
async def input_device_loop(self):
|
||||
async for ev in self.touch_device.async_read_loop():
|
||||
now = time.time()
|
||||
if self.last_input + 5 <= now:
|
||||
if self.last_input + 5 >= now:
|
||||
continue
|
||||
|
||||
self.last_input = now
|
||||
|
||||
async def dim_screen(self, to_value):
|
||||
self.logger.debug(f'dimming screen to {to_value}')
|
||||
from_value = 0
|
||||
if self.brightness_level == BrightnessLevel.LEVEL_DIM:
|
||||
from_value = 5
|
||||
|
@ -79,10 +95,11 @@ class DisplayBrightnessManager():
|
|||
await asyncio.create_subprocess_exec('brightnessctl', 'set', f'{new_val}%', stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL)
|
||||
await asyncio.sleep(0.005)
|
||||
|
||||
async def brightness_loop(self, logger):
|
||||
logger.info('starting brightness loop')
|
||||
async def brightness_loop(self):
|
||||
self.logger.info('starting brightness loop')
|
||||
while True:
|
||||
if self.brightness_mode != self.brightness_mode_new:
|
||||
self.logger.info(f'brightness mode changede to {self.brightness_mode_new}')
|
||||
if self.brightness_mode_new == BrightnessMode.MODE_MAN:
|
||||
await self.dim_screen(50)
|
||||
self.brightness_level = BrightnessLevel.LEVEL_BRIGHT
|
||||
|
@ -90,7 +107,7 @@ class DisplayBrightnessManager():
|
|||
self.last_input = time.time()
|
||||
self.brightness_mode = self.brightness_mode_new
|
||||
|
||||
if self.brightness_mode != BrightnessMode.MODE_MAN:
|
||||
if self.brightness_mode == BrightnessMode.MODE_AUTO:
|
||||
if self.last_input + 30 < time.time():
|
||||
if self.brightness_level != BrightnessLevel.LEVEL_OFF:
|
||||
await self.dim_screen(0)
|
||||
|
@ -105,12 +122,12 @@ class DisplayBrightnessManager():
|
|||
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
async def run(self, logger, device: str):
|
||||
logger.info('starting display event dm loop')
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
'libinput', 'record', device,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.DEVNULL)
|
||||
async def run(self):
|
||||
if not self.touch_device:
|
||||
self.logger.warn('exiting display event loop: no touch device set')
|
||||
return
|
||||
|
||||
asyncio.gather(self.read_stdout(logger, proc.stdout), self.brightness_loop(logger))
|
||||
self.logger.info('starting loops')
|
||||
|
||||
asyncio.gather(self.input_device_loop(), self.brightness_loop())
|
||||
|
||||
|
|
|
@ -21,10 +21,6 @@ async def sensor_loop(logger, sensor_entities):
|
|||
await asyncio.sleep(5)
|
||||
|
||||
async def main():
|
||||
coloredlogs.install()
|
||||
logger = logging.getLogger("HA kiosk agent")
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--debug", action='store_true', default=False)
|
||||
parser.add_argument("--device-id", dest='device_id', default=os.environ.get('DEVICE_ID'), required=True)
|
||||
|
@ -39,8 +35,13 @@ async def main():
|
|||
|
||||
mqtt_args = {'host': args.mqtt_host}
|
||||
if args.debug:
|
||||
logger.setLevel(logging.DEBUG)
|
||||
coloredlogs.install(level='DEBUG')
|
||||
logger = logging.getLogger("HA Kiosk Agent")
|
||||
logger.debug('debug')
|
||||
mqtt_args['debug'] = args.debug
|
||||
else:
|
||||
coloredlogs.install()
|
||||
logger = logging.getLogger("HA Kiosk Agent")
|
||||
|
||||
if args.mqtt_port:
|
||||
mqtt_args['port'] = args.mqtt_port
|
||||
|
@ -63,8 +64,8 @@ async def main():
|
|||
|
||||
tasks = []
|
||||
if args.display_device:
|
||||
dbm = DisplayBrightnessManager(device)
|
||||
tasks.append(dbm.run(logger, args.display_device))
|
||||
dbm = DisplayBrightnessManager(device, args.display_device, logger)
|
||||
tasks.append(dbm.run())
|
||||
|
||||
tasks.append(sensor_loop(logger, sensor_entities))
|
||||
|
||||
|
|
Loading…
Reference in New Issue