Monitor for Wöhrle CONVERT-1000 UPS written in Python
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 

227 lines
7.5 KiB

#!/usr/bin/python3
#
# Depends on:
# - pyserial
# - python-systemd
# - pyyaml
import logging
import shlex
import subprocess
import sys
import yaml
from serial import Serial
from systemd.journal import JournalHandler
from threading import Timer
from time import sleep
class ConvertUPS(Serial):
    """Monitor a Wöhrle Convert-[123]000 UPS connected via RS232.

    The class extends ``serial.Serial``.  Extra positional and keyword
    arguments given to the constructor are passed through to
    ``serial.Serial`` after the port and the baud rate.

    Every setting is resolved in this order: explicit constructor
    argument, value from the YAML config file, built-in default.
    """

    # Class-level defaults for per-instance state.  They are shadowed by
    # instance attributes as soon as a method assigns to them.
    _isOnBattery = False   # Is UPS running on battery right now?
    _powerToggled = False  # Did power state (battery / AC) toggle recently?
    timer = None           # Pending shutdown Timer, or None if none is armed.

    def __init__(self, configfile='/etc/pyupsmon.yml', port=None,
                 baudrate=None, interval=None, shutdownTimeout=None,
                 shutdownCommand=None, debug=None, *args, **kwargs):
        """Read the config file and/or set default config values.

        Args:
            configfile: Path of the YAML config file.  A file that cannot
                be opened or read is not an error; defaults are used.
            port: Serial device (default '/dev/ttyS0').
            baudrate: Serial baud rate (default 2400).
            interval: Seconds between two status queries (default 1.0).
            shutdownTimeout: Seconds on battery before the shutdown
                command is run (default 3600.0); 0 disables the timer.
            shutdownCommand: Command handed to ``subprocess.call``
                (default ``/sbin/shutdown -h +2`` split into a list).
            debug: Print diagnostics to stdout if true (default False).
            *args, **kwargs: Passed on to ``serial.Serial.__init__``.
        """
        self.serialPort = port
        self.serialBaud = baudrate
        self.interval = interval
        self.shutdownTimeout = shutdownTimeout
        self.shutdownCommand = shutdownCommand
        self.debug = debug

        try:
            with open(configfile, 'r') as ymlfile:
                # safe_load only builds plain data types; yaml.load()
                # without a Loader is unsafe and rejected by PyYAML >= 6.
                # An empty file parses to None, which is treated like an
                # empty configuration.
                cfg = yaml.safe_load(ymlfile) or {}
        except OSError:
            cfg = {}
            if debug:
                print('Could not read config file %s.' % configfile)

        # Values from the config file only fill in settings that were not
        # given explicitly as constructor arguments.
        serialCfg = cfg.get('serial')
        if serialCfg is not None:
            if self.serialPort is None:
                self.serialPort = serialCfg.get('port', '/dev/ttyS0')
            if self.serialBaud is None:
                self.serialBaud = int(serialCfg.get('baudrate', 2400))
        daemonCfg = cfg.get('daemon')
        if daemonCfg is not None:
            if self.interval is None:
                self.interval = float(daemonCfg.get('interval', 1.0))
            if self.debug is None:
                self.debug = bool(daemonCfg.get('debug', False))
        shutdownCfg = cfg.get('shutdown')
        if shutdownCfg is not None:
            if self.shutdownTimeout is None:
                self.shutdownTimeout = float(shutdownCfg.get('timeout', 3600))
            if self.shutdownCommand is None:
                self.shutdownCommand = shlex.split(
                    shutdownCfg.get('command', '/sbin/shutdown -h +2'))

        # Built-in defaults for everything that is still unset.
        if self.serialPort is None:
            self.serialPort = '/dev/ttyS0'
        if self.serialBaud is None:
            self.serialBaud = 2400
        if self.interval is None:
            self.interval = 1.0
        if self.debug is None:
            self.debug = False
        if self.shutdownTimeout is None:
            self.shutdownTimeout = 3600.0
        if self.shutdownCommand is None:
            self.shutdownCommand = shlex.split('/sbin/shutdown -h +2')

        if self.debug:
            print('Configuration:')
            print('\tSerial Port:\t\t%s' % self.serialPort)
            print('\tBaud Rate:\t\t%d' % self.serialBaud)
            print('\tQuery Interval:\t\t%.2f s' % self.interval)
            print('\tDebugging:\t\t%s' % self.debug)
            print('\tShutdown Timeout:\t%.2f' % self.shutdownTimeout)
            print('\tShutdown Command:\t%s' % self.shutdownCommand)

        # Port and baud rate go first positionally so that *args can follow
        # them; passing them as keywords together with *args would raise
        # "got multiple values for argument 'port'".
        super(ConvertUPS, self).__init__(self.serialPort, self.serialBaud,
                                         *args, **kwargs)

    def _readResponse(self):
        """Read one reply from the UPS, up to the next carriage return.

        Returns the bytes received before the carriage return, stripped of
        surrounding whitespace.  If a read returns no data (pyserial does
        this when a configured read timeout expires), reading stops and
        whatever has been received so far is returned.
        """
        received = bytearray()
        while True:
            data = self.read(1)
            if not data or b'\r' in data:
                break
            received.extend(data)
        return bytes(received).strip()

    def upsInit(self):
        """Start communicating with the UPS.

        Sends the initialization sequence followed by a status query.

        Returns:
            True if the reply to the status query starts with ``(``,
            False otherwise.

        Raises:
            NameError: in debug mode, if the serial port has no name.
        """
        if not self.name and self.debug:
            raise NameError('Please, set the serial communication parameters first.')
        if self.debug:
            print('Successfully connected to serial port %s' % (self.name))
            print('Sending initialization sequence (you should hear a short beep).')
        # The pauses between the commands are kept exactly as they were.
        # NOTE(review): presumably the UPS needs this time to process each
        # command -- confirm against the device documentation.
        self.write(b'CT\rDQ1\r')
        sleep(5.0)
        self.write(b'Rt\r')
        sleep(3.0)
        self.write(b'Q1\r')
        sleep(1.0)
        response = self._readResponse()
        if response.startswith(b'('):
            if self.debug:
                print('Initialization successful.')
                print('Initial values returned:', response)
            return True
        return False

    def upsRead(self):
        """Query the current status of the UPS.

        Returns:
            The reply to a ``Q1`` query as bytes, without the trailing
            carriage return; empty if the UPS did not answer before a
            configured read timeout expired.
        """
        self.write(b'Q1\r')
        status = self._readResponse()
        if self.debug:
            print(status)
        return status

    def isOnBattery(self):
        """Check whether the UPS is on battery.

        Queries the UPS, updates the cached power state and sets the
        toggle flag when the state differs from the previous query.

        Returns:
            True if the UPS is on battery, False if it is on AC.
        """
        status = self.upsRead()
        if not status:
            # No reply at all: keep the last known state rather than
            # mistaking a silent line for "AC power is back".
            return self._isOnBattery
        # NOTE(review): presumably the first field of the reply is the
        # input voltage, so '000.0' means mains power is absent -- confirm
        # against the UPS protocol description.
        if status.startswith(b'(000.0 '):
            if not self._isOnBattery:
                self._isOnBattery = True
                self._powerToggled = True
            if self.debug:
                print('UPS is on battery.')
        else:
            if self._isOnBattery:
                self._isOnBattery = False
                self._powerToggled = True
            if self.debug:
                print('UPS is on AC.')
        return self._isOnBattery

    def powerToggled(self):
        """Check if a power toggle (AC -> battery or vice versa) occurred
        and reset the flag.

        Returns:
            A tuple ``(toggled, onBattery)`` of two booleans.
        """
        onBattery = self.isOnBattery()
        toggled = self._powerToggled
        self._powerToggled = False
        if self.debug:
            if toggled:
                print('Power toggle has occured recently.')
            else:
                print('No power toggle has occured recently.')
        return (toggled, onBattery)

    def doShutdown(self):
        """Execute self.shutdownCommand."""
        subprocess.call(self.shutdownCommand)

    def startShutdownTimer(self):
        """Start a timer that executes self.shutdownCommand as soon as
        self.shutdownTimeout has expired.

        If self.shutdownTimeout == 0, the timer is disabled.
        """
        if self.shutdownTimeout > 0:
            # Cancel a timer that is still pending so that it cannot fire
            # after its reference has been overwritten.
            self.cancelShutdownTimer()
            self.timer = Timer(self.shutdownTimeout, self.doShutdown)
            self.timer.start()

    def cancelShutdownTimer(self):
        """Cancel a previously started shutdown timer, if there is one."""
        if self.timer is not None:
            self.timer.cancel()
            self.timer = None
if __name__ == "__main__":
    # Daemon entry point: connect to the UPS, then poll it forever and
    # arm or cancel the shutdown timer whenever the power source changes.
    log = logging.getLogger('pyupsmon')
    log.addHandler(JournalHandler())
    log.setLevel(logging.INFO)

    try:
        ups = ConvertUPS()
    except Exception:
        # "except Exception" instead of a bare "except" so that
        # KeyboardInterrupt and SystemExit are not swallowed here.
        # exc_info adds the actual cause to the journal entry.
        log.error('Error: Could not open UPS serial connection.',
                  exc_info=True)
        log.error('\tCheck the config file!')
        print('Error: Could not open UPS serial connection.')
        print('\tCheck the config file!')
        sys.exit(1)

    if not ups.upsInit():
        # Log the failure itself; the startup-success message was logged
        # on this path before.
        log.error('Error: Could not initialize UPS.')
        print('Error: Could not initialize UPS.')
        sys.exit(1)

    log.info('PyUPSMon successfully started.')
    while True:
        (toggled, onBatt) = ups.powerToggled()
        if toggled:
            if onBatt:
                log.critical('UPS AC power loss. Starting shutdown timer.')
                print('UPS AC power loss. Starting shutdown timer.')
                ups.startShutdownTimer()
            else:
                log.critical('UPS AC power restored. Cancelling shutdown timer.')
                print('UPS AC power restored. Cancelling shutdown timer.')
                ups.cancelShutdownTimer()
        sleep(ups.interval)