split up files, drop Sensor class
parent
da5c921f80
commit
b1a058deca
|
@ -0,0 +1,54 @@
|
|||
import logging
|
||||
import psutil
|
||||
import socket
|
||||
import subprocess
|
||||
import time
|
||||
|
||||
import ha_mqtt_discoverable
|
||||
import ha_mqtt_discoverable.sensors
|
||||
from paho.mqtt.client import Client, MQTTMessage
|
||||
|
||||
from enum import Enum
|
||||
|
||||
class Device:
|
||||
name = ""
|
||||
device_id = ""
|
||||
mqtt_settings = None
|
||||
mqtt_device = None
|
||||
|
||||
def __init__(self, name, device_id, mqtt_settings: ha_mqtt_discoverable.Settings.MQTT):
|
||||
self.name = name
|
||||
self.device_id = device_id
|
||||
self.mqtt_settings = mqtt_settings
|
||||
|
||||
device_manufacturer = self._read_device_manufacturer()
|
||||
device_model = self._read_device_model()
|
||||
device_hw_version = self._read_device_hw_version()
|
||||
|
||||
self.mqtt_device = ha_mqtt_discoverable.DeviceInfo(
|
||||
name=name,
|
||||
identifiers=device_id,
|
||||
manufacturer=device_manufacturer,
|
||||
model=device_model,
|
||||
hw_version=device_hw_version)
|
||||
|
||||
def _read_device_manufacturer(self) -> str:
|
||||
with open('/sys/devices/virtual/dmi/id/chassis_vendor') as f:
|
||||
return f.readline()
|
||||
|
||||
def _read_device_model(self) -> str:
|
||||
with open('/sys/devices/virtual/dmi/id/product_family') as f:
|
||||
return f.readline()
|
||||
|
||||
def _read_device_hw_version(self) -> str:
|
||||
with open('/sys/devices/virtual/dmi/id/product_name') as f:
|
||||
return f.readline()
|
||||
|
||||
def get_device_id(self):
|
||||
return self.device_id
|
||||
|
||||
def get_mqtt_device(self):
|
||||
return self.mqtt_device
|
||||
|
||||
def get_mqtt_settings(self):
|
||||
return self.mqtt_settings
|
|
@ -0,0 +1,116 @@
|
|||
import asyncio
|
||||
import enum
|
||||
import logging
|
||||
import time
|
||||
|
||||
import ha_mqtt_discoverable
|
||||
|
||||
from paho.mqtt.client import Client, MQTTMessage
|
||||
|
||||
class BrightnessMode(enum.Enum):
|
||||
MODE_MAN = 0
|
||||
MODE_AUTO = 1
|
||||
|
||||
class BrightnessLevel(enum.Enum):
|
||||
LEVEL_OFF = 0
|
||||
LEVEL_DIM = 1
|
||||
LEVEL_BRIGHT = 2
|
||||
|
||||
class DisplayBrightnessManager():
|
||||
last_input = 0
|
||||
brightness_mode = BrightnessMode.MODE_AUTO
|
||||
brightness_mode_new = BrightnessMode.MODE_AUTO
|
||||
brightness_level = BrightnessLevel.LEVEL_OFF
|
||||
mqtt_entity = None
|
||||
|
||||
def __init__(self, device):
|
||||
force_display_on_info = ha_mqtt_discoverable.sensors.SwitchInfo(
|
||||
name="Force Display on",
|
||||
device=device.get_mqtt_device(),
|
||||
unique_id=f'{device.get_device_id()}_force_display_on')
|
||||
switch_settings = ha_mqtt_discoverable.Settings(mqtt=device.get_mqtt_settings(), entity=force_display_on_info, manual_availability=True)
|
||||
self.mqtt_entity = ha_mqtt_discoverable.sensors.Switch(switch_settings, self.switch_callback)
|
||||
self.mqtt_entity.set_availability(True)
|
||||
|
||||
def switch_callback(self, client: Client, user_data, message: MQTTMessage):
|
||||
payload = message.payload.decode()
|
||||
if payload == "ON":
|
||||
self.set_mode(BrightnessMode.MODE_MAN)
|
||||
self.mqtt_entity.on()
|
||||
elif payload == "OFF":
|
||||
self.set_mode(BrightnessMode.MODE_AUTO)
|
||||
self.mqtt_entity.off()
|
||||
|
||||
def get_last_input(self):
|
||||
return self.last_input
|
||||
|
||||
def get_level(self) -> BrightnessLevel:
|
||||
return self.brightness_level
|
||||
|
||||
def get_mode(self) -> BrightnessMode:
|
||||
return self.brightness_mode
|
||||
|
||||
def set_mode(self, mode: BrightnessMode):
|
||||
self.brightness_mode_new = mode
|
||||
logging.info(f'brightness mode: {self.brightness_mode}, new mode: {self.brightness_mode_new}')
|
||||
|
||||
async def read_stdout(self, logger, stdout):
|
||||
while True:
|
||||
buf = await stdout.readline()
|
||||
if not buf:
|
||||
break
|
||||
|
||||
now = time.time()
|
||||
if self.last_input + 5 <= now:
|
||||
continue
|
||||
|
||||
self.last_input = now
|
||||
|
||||
async def dim_screen(self, to_value):
|
||||
from_value = 0
|
||||
if self.brightness_level == BrightnessLevel.LEVEL_DIM:
|
||||
from_value = 5
|
||||
elif self.brightness_level == BrightnessLevel.LEVEL_BRIGHT:
|
||||
from_value = 50
|
||||
|
||||
step_val=(to_value-from_value)/25
|
||||
for i in range(1, 25):
|
||||
new_val = "{:.0f}".format(from_value + i*step_val)
|
||||
await asyncio.create_subprocess_exec('brightnessctl', 'set', f'{new_val}%', stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL)
|
||||
await asyncio.sleep(0.005)
|
||||
|
||||
async def brightness_loop(self, logger):
|
||||
logger.info('starting brightness loop')
|
||||
while True:
|
||||
if self.brightness_mode != self.brightness_mode_new:
|
||||
if self.brightness_mode_new == BrightnessMode.MODE_MAN:
|
||||
await self.dim_screen(50)
|
||||
self.brightness_level = BrightnessLevel.LEVEL_BRIGHT
|
||||
else:
|
||||
self.last_input = time.time()
|
||||
self.brightness_mode = self.brightness_mode_new
|
||||
|
||||
if self.brightness_mode != BrightnessMode.MODE_MAN:
|
||||
if self.last_input + 30 < time.time():
|
||||
if self.brightness_level != BrightnessLevel.LEVEL_OFF:
|
||||
await self.dim_screen(0)
|
||||
self.brightness_level = BrightnessLevel.LEVEL_OFF
|
||||
elif self.last_input + 15 < time.time():
|
||||
if self.brightness_level != BrightnessLevel.LEVEL_DIM:
|
||||
await self.dim_screen(5)
|
||||
self.brightness_level = BrightnessLevel.LEVEL_DIM
|
||||
elif self.brightness_level != BrightnessLevel.LEVEL_BRIGHT:
|
||||
await self.dim_screen(50)
|
||||
self.brightness_level = BrightnessLevel.LEVEL_BRIGHT
|
||||
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
async def run(self, logger, device: str):
|
||||
logger.info('starting display event dm loop')
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
'libinput', 'record', device,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.DEVNULL)
|
||||
|
||||
asyncio.gather(self.read_stdout(logger, proc.stdout), self.brightness_loop(logger))
|
||||
|
|
@ -3,123 +3,14 @@
|
|||
import argparse
|
||||
import asyncio
|
||||
import coloredlogs
|
||||
import enum
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import time
|
||||
|
||||
from paho.mqtt.client import Client, MQTTMessage
|
||||
import ha_mqtt_discoverable
|
||||
import sensors
|
||||
|
||||
class BrightnessMode(enum.Enum):
|
||||
MODE_MAN = 0
|
||||
MODE_AUTO = 1
|
||||
|
||||
class BrightnessLevel(enum.Enum):
|
||||
LEVEL_OFF = 0
|
||||
LEVEL_DIM = 1
|
||||
LEVEL_BRIGHT = 2
|
||||
|
||||
class DisplayBrightnessManager():
|
||||
last_input = 0
|
||||
brightness_mode = BrightnessMode.MODE_AUTO
|
||||
brightness_mode_new = BrightnessMode.MODE_AUTO
|
||||
brightness_level = BrightnessLevel.LEVEL_OFF
|
||||
mqtt_entity = None
|
||||
|
||||
def __init__(self, device):
|
||||
force_display_on_info = ha_mqtt_discoverable.sensors.SwitchInfo(
|
||||
name="Force Display on",
|
||||
device=device.get_mqtt_device(),
|
||||
unique_id=f'{device.get_device_id()}_force_display_on')
|
||||
switch_settings = ha_mqtt_discoverable.Settings(mqtt=device.get_mqtt_settings(), entity=force_display_on_info, manual_availability=True)
|
||||
self.mqtt_entity = ha_mqtt_discoverable.sensors.Switch(switch_settings, self.switch_callback)
|
||||
self.mqtt_entity.set_availability(True)
|
||||
|
||||
def switch_callback(self, client: Client, user_data, message: MQTTMessage):
|
||||
payload = message.payload.decode()
|
||||
if payload == "ON":
|
||||
self.set_mode(BrightnessMode.MODE_MAN)
|
||||
self.mqtt_entity.on()
|
||||
elif payload == "OFF":
|
||||
self.set_mode(BrightnessMode.MODE_AUTO)
|
||||
self.mqtt_entity.off()
|
||||
|
||||
def get_last_input(self):
|
||||
return self.last_input
|
||||
|
||||
def get_level(self) -> BrightnessLevel:
|
||||
return self.brightness_level
|
||||
|
||||
def get_mode(self) -> BrightnessMode:
|
||||
return self.brightness_mode
|
||||
|
||||
def set_mode(self, mode: BrightnessMode):
|
||||
self.brightness_mode_new = mode
|
||||
logging.info(f'brightness mode: {self.brightness_mode}, new mode: {self.brightness_mode_new}')
|
||||
|
||||
async def read_stdout(self, logger, stdout):
|
||||
while True:
|
||||
buf = await stdout.readline()
|
||||
if not buf:
|
||||
break
|
||||
|
||||
now = time.time()
|
||||
if self.last_input + 5 <= now:
|
||||
continue
|
||||
|
||||
self.last_input = now
|
||||
|
||||
async def dim_screen(self, to_value):
|
||||
from_value = 0
|
||||
if self.brightness_level == BrightnessLevel.LEVEL_DIM:
|
||||
from_value = 5
|
||||
elif self.brightness_level == BrightnessLevel.LEVEL_BRIGHT:
|
||||
from_value = 50
|
||||
|
||||
step_val=(to_value-from_value)/25
|
||||
for i in range(1, 25):
|
||||
new_val = "{:.0f}".format(from_value + i*step_val)
|
||||
await asyncio.create_subprocess_exec('brightnessctl', 'set', f'{new_val}%', stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL)
|
||||
await asyncio.sleep(0.005)
|
||||
|
||||
async def brightness_loop(self, logger):
|
||||
logger.info('starting brightness loop')
|
||||
while True:
|
||||
if self.brightness_mode != self.brightness_mode_new:
|
||||
if self.brightness_mode_new == BrightnessMode.MODE_MAN:
|
||||
await self.dim_screen(50)
|
||||
self.brightness_level = BrightnessLevel.LEVEL_BRIGHT
|
||||
else:
|
||||
self.last_input = time.time()
|
||||
self.brightness_mode = self.brightness_mode_new
|
||||
|
||||
if self.brightness_mode != BrightnessMode.MODE_MAN:
|
||||
if self.last_input + 30 < time.time():
|
||||
if self.brightness_level != BrightnessLevel.LEVEL_OFF:
|
||||
await self.dim_screen(0)
|
||||
self.brightness_level = BrightnessLevel.LEVEL_OFF
|
||||
elif self.last_input + 15 < time.time():
|
||||
if self.brightness_level != BrightnessLevel.LEVEL_DIM:
|
||||
await self.dim_screen(5)
|
||||
self.brightness_level = BrightnessLevel.LEVEL_DIM
|
||||
elif self.brightness_level != BrightnessLevel.LEVEL_BRIGHT:
|
||||
await self.dim_screen(50)
|
||||
self.brightness_level = BrightnessLevel.LEVEL_BRIGHT
|
||||
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
async def run(self, logger, device: str):
|
||||
logger.info('starting display event dm loop')
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
'libinput', 'record', device,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.DEVNULL)
|
||||
|
||||
asyncio.gather(self.read_stdout(logger, proc.stdout), self.brightness_loop(logger))
|
||||
|
||||
import sensor
|
||||
from device import Device
|
||||
from display import DisplayBrightnessManager
|
||||
|
||||
async def sensor_loop(logger, sensor_entities):
|
||||
logger.info('starting sensor loop')
|
||||
|
@ -161,12 +52,12 @@ async def main():
|
|||
mqtt_args['password'] = args.mqtt_pass
|
||||
|
||||
mqtt_settings = ha_mqtt_discoverable.Settings.MQTT(**mqtt_args)
|
||||
device = sensors.Device(args.device_name, args.device_id, mqtt_settings)
|
||||
device = Device(args.device_name, args.device_id, mqtt_settings)
|
||||
sensor_entities = {
|
||||
'battery': sensors.BatterySensor(device),
|
||||
'display_brightness': sensors.DisplayBrightnessSensor(device),
|
||||
'ip_address': sensors.IPAddressSensor(device),
|
||||
'temperature': sensors.TemperatureSensor(device)
|
||||
'battery': sensor.BatterySensor(device),
|
||||
'display_brightness': sensor.DisplayBrightnessSensor(device),
|
||||
'ip_address': sensor.IPAddressSensor(device),
|
||||
'temperature': sensor.TemperatureSensor(device)
|
||||
};
|
||||
|
||||
|
||||
|
|
|
@ -4,65 +4,16 @@ import socket
|
|||
import subprocess
|
||||
import time
|
||||
|
||||
import ha_mqtt_discoverable
|
||||
import ha_mqtt_discoverable.sensors
|
||||
from paho.mqtt.client import Client, MQTTMessage
|
||||
|
||||
from enum import Enum
|
||||
|
||||
class Device:
|
||||
name = ""
|
||||
device_id = ""
|
||||
mqtt_settings = None
|
||||
mqtt_device = None
|
||||
import ha_mqtt_discoverable
|
||||
import ha_mqtt_discoverable.sensors
|
||||
|
||||
def __init__(self, name, device_id, mqtt_settings: ha_mqtt_discoverable.Settings.MQTT):
|
||||
self.name = name
|
||||
self.device_id = device_id
|
||||
self.mqtt_settings = mqtt_settings
|
||||
|
||||
device_manufacturer = self._read_device_manufacturer()
|
||||
device_model = self._read_device_model()
|
||||
device_hw_version = self._read_device_hw_version()
|
||||
|
||||
self.mqtt_device = ha_mqtt_discoverable.DeviceInfo(
|
||||
name=name,
|
||||
identifiers=device_id,
|
||||
manufacturer=device_manufacturer,
|
||||
model=device_model,
|
||||
hw_version=device_hw_version)
|
||||
|
||||
def _read_device_manufacturer(self) -> str:
|
||||
with open('/sys/devices/virtual/dmi/id/chassis_vendor') as f:
|
||||
return f.readline()
|
||||
|
||||
def _read_device_model(self) -> str:
|
||||
with open('/sys/devices/virtual/dmi/id/product_family') as f:
|
||||
return f.readline()
|
||||
|
||||
def _read_device_hw_version(self) -> str:
|
||||
with open('/sys/devices/virtual/dmi/id/product_name') as f:
|
||||
return f.readline()
|
||||
|
||||
def get_device_id(self):
|
||||
return self.device_id
|
||||
|
||||
def get_mqtt_device(self):
|
||||
return self.mqtt_device
|
||||
|
||||
def get_mqtt_settings(self):
|
||||
return self.mqtt_settings
|
||||
from device import Device
|
||||
from paho.mqtt.client import Client, MQTTMessage
|
||||
|
||||
|
||||
class Sensor:
|
||||
def __init__(self, device: Device):
|
||||
pass
|
||||
|
||||
def update_value(self):
|
||||
pass
|
||||
|
||||
|
||||
class BatterySensor(Sensor):
|
||||
class BatterySensor():
|
||||
charging_status = False
|
||||
level = -1
|
||||
|
||||
|
@ -106,12 +57,12 @@ class BatterySensor(Sensor):
|
|||
newLevel = "{:.0f}".format(blevel.percent)
|
||||
if self.level != newLevel:
|
||||
self.level_entity.set_state(newLevel)
|
||||
|
||||
|
||||
self.charging_status = blevel.power_plugged
|
||||
self.level = newLevel
|
||||
|
||||
|
||||
class DisplayBrightnessSensor(Sensor):
|
||||
class DisplayBrightnessSensor():
|
||||
value = -1
|
||||
mqtt_sensor = None
|
||||
|
||||
|
@ -160,7 +111,7 @@ class DisplayBrightnessSensor(Sensor):
|
|||
self.mqtt_sensor.set_value(new_brightness)
|
||||
|
||||
|
||||
class IPAddressSensor(Sensor):
|
||||
class IPAddressSensor():
|
||||
value = ""
|
||||
mqtt_sensor = None
|
||||
|
||||
|
@ -191,7 +142,7 @@ class IPAddressSensor(Sensor):
|
|||
self.value = IP
|
||||
|
||||
|
||||
class TemperatureSensor(Sensor):
|
||||
class TemperatureSensor():
|
||||
value = -1
|
||||
mqtt_sensor = None
|
||||
|
|
@ -5,6 +5,10 @@ from setuptools import setup, find_packages
|
|||
setup(name='ha-kiosk-agent',
|
||||
version='0.1.0',
|
||||
packages=find_packages(),
|
||||
py_modules=["sensors"],
|
||||
py_modules=[
|
||||
"device"
|
||||
"display"
|
||||
"sensor"
|
||||
],
|
||||
scripts=["ha-kiosk-agent"],
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue