ha-mqtt-agent/ha_mqtt_agent/device.py

138 lines
4.1 KiB
Python

import re
import subprocess
from functools import cache, cached_property
from typing import List, Optional

import ha_mqtt_discoverable
import ha_mqtt_discoverable.sensors
import psutil
class Device:
    """Describe the local machine as a Home Assistant MQTT device.

    Hardware and OS details are probed lazily from Linux pseudo-files (DMI
    entries under ``/sys/devices/virtual/dmi/id``, ``/proc/cpuinfo`` and
    ``/etc/os-release``).  Every probe is best-effort: when the information
    cannot be determined the corresponding property evaluates to ``None``.
    """

    name = ""
    device_id = ""
    mqtt_settings = None
    _mqtt_device = None

    def __init__(
        self, name, device_id, mqtt_settings: "ha_mqtt_discoverable.Settings.MQTT"
    ):
        """Store the device name, its identifier and the MQTT settings."""
        self.name = name
        self.device_id = device_id
        self.mqtt_settings = mqtt_settings

    def _read_file(self, path: str) -> List[str]:
        """Return the lines of *path*, or ``[]`` if it cannot be read.

        Results (including the empty result for unreadable files) are
        memoized per instance, so each path is opened at most once.  The
        cache lives on the instance rather than in ``functools.cache`` so
        that it does not keep the instance alive or require it to be hashable.
        """
        store = self.__dict__.setdefault("_file_cache", {})
        if path not in store:
            try:
                with open(path, encoding="utf-8", errors="replace") as handle:
                    store[path] = handle.readlines()
            except OSError:
                # Missing, unreadable or non-regular file: treat as "no data".
                store[path] = []
        # Hand out a copy so callers cannot mutate the cached lines.
        return list(store[path])

    def _read_first_file(self, paths: List[str]) -> List[str]:
        """Return the lines of the first file in *paths* that has content."""
        for candidate in paths:
            lines = self._read_file(candidate)
            if lines:
                return lines
        return []

    def _first_value(self, paths: List[str]) -> Optional[str]:
        """Return the stripped first line of the first file with a non-blank one."""
        for candidate in paths:
            lines = self._read_file(candidate)
            if lines:
                value = lines[0].strip()
                if value:
                    return value
        return None

    def _cpuinfo_model(self) -> Optional[str]:
        """Return the value of the ``Model`` line in ``/proc/cpuinfo``, if any."""
        for line in self._read_file("/proc/cpuinfo"):
            if not line.startswith("Model"):
                continue
            _, sep, value = line.partition(":")
            if sep:
                return value.strip()
        return None

    @staticmethod
    def _unquote(value: str) -> str:
        """Strip one pair of matching surrounding quotes from *value*, if present.

        Escape sequences inside the quotes are left untouched.
        """
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            return value[1:-1]
        return value

    @property
    def mqtt_device(self):
        """Return the ``DeviceInfo`` for this device, building it on first use."""
        if self._mqtt_device is None:
            self._mqtt_device = ha_mqtt_discoverable.DeviceInfo(
                name=self.name,
                identifiers=self.device_id,
                manufacturer=self.manufacturer,
                model=self.model,
                hw_version=self.hardware_version,
                sw_version=self.software_version,
            )
        return self._mqtt_device

    @cached_property
    def manufacturer(self) -> Optional[str]:
        """Return the hardware vendor, or ``None`` if it cannot be determined."""
        # read dmi data
        vendor = self._first_value(
            [
                "/sys/devices/virtual/dmi/id/sys_vendor",
                "/sys/devices/virtual/dmi/id/chassis_vendor",
                "/sys/devices/virtual/dmi/id/board_vendor",
            ]
        )
        if vendor:
            return vendor
        # try to get manufacturer from cpu info, works with rpi kernel
        cpu_model = self._cpuinfo_model()
        if cpu_model and cpu_model.startswith("Raspberry"):
            return "Raspberry Pi Foundation"
        return None

    @cached_property
    def model(self) -> Optional[str]:
        """Return the product model name, or ``None`` if it cannot be determined."""
        # read dmi data
        product = self._first_value(
            [
                "/sys/devices/virtual/dmi/id/product_family",
                "/sys/devices/virtual/dmi/id/product_version",
            ]
        )
        if product:
            return product
        # try to get model from cpu info, works with rpi kernel
        cpu_model = self._cpuinfo_model()
        if cpu_model:
            # Everything before "Rev" is the model name; drop the space that
            # separated it from the revision.
            return cpu_model.split("Rev")[0].strip() or None
        return None

    @cached_property
    def software_version(self) -> Optional[str]:
        """Return ``PRETTY_NAME`` from ``/etc/os-release``, or ``None``."""
        for line in self._read_file("/etc/os-release"):
            # Split on the first "=" only, so values containing "=" survive.
            key, sep, value = line.partition("=")
            if sep and key.strip() == "PRETTY_NAME":
                return self._unquote(value.strip())
        return None

    @cached_property
    def hardware_version(self) -> Optional[str]:
        """Return the hardware version, or ``None`` if it cannot be determined."""
        # read dmi data, works with many x86_64 devices
        product = self._first_value(
            [
                "/sys/devices/virtual/dmi/id/product_name",
                "/sys/devices/virtual/dmi/id/board_name",
            ]
        )
        if product:
            return product
        # try to get hardware version from cpu info, works with rpi kernel
        # TODO: find way of retrieving hardware data on rpi with upstream kernel
        cpu_model = self._cpuinfo_model()
        if cpu_model:
            pieces = cpu_model.split("Rev")
            # Only trust the revision when "Rev" occurs exactly once.
            if len(pieces) == 2:
                return pieces[1].strip()
        return None

    @cached_property
    def has_battery(self) -> bool:
        """Return whether ``psutil`` reports a battery on this machine."""
        return psutil.sensors_battery() is not None

    @cached_property
    def has_wifi(self) -> Optional[str]:
        """Return the name of the first active wireless interface, or ``None``.

        Despite the ``has_`` prefix the value is the interface name (truthy)
        rather than a boolean.  An interface counts as wireless when it is up
        and its name starts with ``wl`` (which also covers ``wlan*``).
        """
        for ifname, stats in psutil.net_if_stats().items():
            if stats.isup and ifname.startswith("wl"):
                # TODO: check if default gateway is configured on this interface
                return ifname
        return None