Files
meb-battery/python-reference/daly-bms-cli
Giuseppe Raffa 4d5d51c018 Una repositoy che integra un plugin custom su SignalK per ottenere tutti i dati del BMS della batteria
- Introdotta l'implementazione JavaScript per la comunicazione BMS in bmscore.js, inclusi i metodi per il recupero dati e la gestione degli errori.
- Creato errors.js per mappare i codici di errore dal formato Python a quello JavaScript.
2026-05-11 19:45:07 +02:00

255 lines
8.1 KiB
Python
Executable File

#!/usr/bin/python3
import argparse
import json
import logging
import sys
from dalybms import DalyBMS
from dalybms import DalyBMSSinowealth
parser = argparse.ArgumentParser()
parser.add_argument("-d", "--device",
help="RS485 device, e.g. /dev/ttyUSB0",
type=str, required=True)
parser.add_argument("--uart", help="UART instead of RS485", action="store_true")
parser.add_argument("--sinowealth", help="BMS with Sinowealth chip", action="store_true")
parser.add_argument("--status", help="show status", action="store_true")
parser.add_argument("--soc", help="show voltage, current, SOC", action="store_true")
parser.add_argument("--mosfet", help="show mosfet status", action="store_true")
parser.add_argument("--cell-voltages", help="show cell voltages", action="store_true")
parser.add_argument("--temperatures", help="show temperature sensor values", action="store_true")
parser.add_argument("--balancing", help="show cell balancing status", action="store_true")
parser.add_argument("--errors", help="show BMS errors", action="store_true")
parser.add_argument("--all", help="show all", action="store_true")
parser.add_argument("--check", help="Nagios style check", action="store_true")
parser.add_argument("--set-charge-mosfet", help="'on' or 'off'", type=str)
parser.add_argument("--set-discharge-mosfet", help="'on' or 'off'", type=str)
parser.add_argument("--set-soc", help="'0.0' to '100.0'", type=str)
parser.add_argument("--restart", help="restart bms", action="store_true")
parser.add_argument("--retry", help="retry X times if the request fails, default 5", type=int, default=5)
parser.add_argument("--verbose", help="Verbose output", action="store_true")
parser.add_argument("--mqtt", help="Write output to MQTT", action="store_true")
parser.add_argument("--mqtt-hass", help="MQTT Home Assistant Mode", action="store_true")
parser.add_argument("--mqtt-topic",
help="MQTT topic to write to. default daly_bms",
type=str,
default="daly_bms")
parser.add_argument("--mqtt-broker",
help="MQTT broker (server). default localhost",
type=str,
default="localhost")
parser.add_argument("--mqtt-port",
help="MQTT port. default 1883",
type=int,
default=1883)
parser.add_argument("--mqtt-user",
help="Username to authenticate MQTT with",
type=str)
parser.add_argument("--mqtt-password",
help="Password to authenticate MQTT with",
type=str)
args = parser.parse_args()
log_format = '%(levelname)-8s [%(filename)s:%(lineno)d] %(message)s'
if args.verbose:
level = logging.DEBUG
else:
level = logging.WARNING
logging.basicConfig(level=level, format=log_format, datefmt='%H:%M:%S')
logger = logging.getLogger()
if args.uart:
address = 8
else:
address = 4
if args.sinowealth:
bms = DalyBMSSinowealth(request_retries=args.retry, logger=logger)
else:
bms = DalyBMS(request_retries=args.retry, address=address, logger=logger)
bms.connect(device=args.device)
result = False
mqtt_client = None
if args.mqtt:
import paho.mqtt.client as paho
mqtt_client = paho.Client()
mqtt_client.enable_logger(logger)
mqtt_client.username_pw_set(args.mqtt_user, args.mqtt_password)
mqtt_client.connect(args.mqtt_broker, port=args.mqtt_port)
def build_mqtt_hass_config_discovery(base):
# Instead of daly_bms should be here added a proper name (unique), like serial or something
# At this point it can be used only one daly_bms system with hass discovery
hass_config_topic = f'homeassistant/sensor/daly_bms/{base.replace("/", "_")}/config'
hass_config_data = {}
hass_config_data["unique_id"] = f'daly_bms_{base.replace("/", "_")}'
hass_config_data["name"] = f'Daly BMS {base.replace("/", " ")}'
if 'soc_percent' in base:
hass_config_data["device_class"] = 'battery'
hass_config_data["unit_of_measurement"] = '%'
elif 'voltage' in base and not ('lowest_cell' in base or 'highest_cell' in base):
hass_config_data["device_class"] = 'voltage'
hass_config_data["unit_of_measurement"] = 'V'
elif 'current' in base:
hass_config_data["device_class"] = 'current'
hass_config_data["unit_of_measurement"] = 'A'
elif 'temperatures' in base:
hass_config_data["device_class"] = 'temperature'
hass_config_data["unit_of_measurement"] = '°C'
elif 'capacity' in 'base':
hass_config_data["device_class"] = 'energy'
hass_config_data["unit_of_measurement"] = 'Ah'
else:
pass
hass_config_data["json_attributes_topic"] = f'{args.mqtt_topic}{base}'
hass_config_data["state_topic"] = f'{args.mqtt_topic}{base}'
hass_device = {
"identifiers": ['daly_bms'],
"manufacturer": 'Daly',
"model": 'Currently not available',
"name": 'Daly BMS',
"sw_version": 'Currently not available'
}
hass_config_data["device"] = hass_device
return hass_config_topic, json.dumps(hass_config_data)
def mqtt_single_out(topic, data, retain=False):
logger.debug(f'Send data: {data} on topic: {topic}, retain flag: {retain}')
mqtt_client.publish(topic, data, retain=retain)
def mqtt_iterator(result, base=''):
for key in result.keys():
if type(result[key]) == dict:
mqtt_iterator(result[key], f'{base}/{key}')
else:
if args.mqtt_hass:
logger.debug('Sending out hass discovery message')
topic, output = build_mqtt_hass_config_discovery(f'{base}/{key}')
mqtt_single_out(topic, output, retain=True)
if type(result[key]) == list:
val = json.dumps(result[key])
else:
val = result[key]
mqtt_single_out(f'{args.mqtt_topic}{base}/{key}', val)
def print_result(result):
if args.mqtt:
mqtt_iterator(result)
else:
print(json.dumps(result, indent=2))
if args.status:
result = bms.get_status()
print_result(result)
if args.soc:
result = bms.get_soc()
print_result(result)
if args.mosfet:
result = bms.get_mosfet_status()
print_result(result)
if args.cell_voltages:
if not args.status:
bms.get_status()
result = bms.get_cell_voltages()
print_result(result)
if args.temperatures:
result = bms.get_temperatures()
print_result(result)
if args.balancing:
result = bms.get_balancing_status()
print_result(result)
if args.errors:
result = bms.get_errors()
print_result(result)
if args.all:
result = bms.get_all()
print_result(result)
if args.check:
status = bms.get_status()
status_code = 0 # OK
status_codes = ('OK', 'WARNING', 'CRITICAL', 'UNKNOWN')
status_line = ''
data = bms.get_soc()
perfdata = []
if data:
for key, value in data.items():
perfdata.append('%s=%s' % (key, value))
# todo: read errors
if status_code == 0:
status_line = '%0.1f volt, %0.1f amper' % (data['total_voltage'], data['current'])
print("%s - %s | %s" % (status_codes[status_code], status_line, " ".join(perfdata)))
sys.exit(status_code)
if args.set_charge_mosfet:
if args.set_charge_mosfet == 'on':
on = True
elif args.set_charge_mosfet == 'off':
on = False
else:
print("invalid value '%s', expected 'on' or 'off'" % args.set_charge_mosfet)
sys.exit(1)
result = bms.set_charge_mosfet(on=on)
if args.set_discharge_mosfet:
if args.set_discharge_mosfet == 'on':
on = True
elif args.set_discharge_mosfet == 'off':
on = False
else:
print("invalid value '%s', expected 'on' or 'off'" % args.set_discharge_mosfet)
sys.exit(1)
result = bms.set_discharge_mosfet(on=on)
if args.set_soc:
try :
v = float(args.set_soc)
except :
print("invalid value '%s', expected float value betwen 0 and 100" % args.set_soc)
sys.exit(1)
result = bms.set_soc(v)
if args.restart:
result = bms.restart()
if mqtt_client:
mqtt_client.disconnect()
bms.disconnect()
if not result:
sys.exit(1)