228 lines
7.5 KiB
Python
228 lines
7.5 KiB
Python
#!/usr/bin/python3
|
|
#
|
|
# Depends on:
|
|
# - python-systemd
|
|
# - pyyaml
|
|
|
|
import logging
|
|
import shlex
|
|
import subprocess
|
|
import sys
|
|
import yaml
|
|
|
|
from serial import Serial
|
|
from systemd.journal import JournalHandler
|
|
from threading import Timer
|
|
from time import sleep
|
|
|
|
|
|
class ConvertUPS(Serial):
|
|
""" Class that implements monitoring for Wöhrle Convert-[123]000 UPS
|
|
devices connected via RS232.
|
|
Has to be initialized like serial.Serial().
|
|
"""
|
|
|
|
# Class variables
|
|
_isOnBattery = False # Is UPS running on battery right now?
|
|
_powerToggled = False # Did power state (battery / AC) toggle recently?
|
|
|
|
|
|
def __init__(self, configfile = '/etc/pyupsmon.yml', port = None,
|
|
baudrate = None, interval = None, shutdownTimeout = None,
|
|
shutdownCommand = None, debug = None, *args, **kwargs):
|
|
""" Read config file and/or set default config values. """
|
|
self.serialPort = port
|
|
self.serialBaud = baudrate
|
|
self.interval = interval
|
|
self.shutdownTimeout = shutdownTimeout
|
|
self.shutdownCommand = shutdownCommand
|
|
self.debug = debug
|
|
|
|
try:
|
|
with open(configfile, 'r') as ymlfile:
|
|
cfg = yaml.load(ymlfile)
|
|
|
|
if cfg.get('serial') is not None:
|
|
if self.serialPort is None:
|
|
self.serialPort = cfg['serial'].get('port', '/dev/ttyS0')
|
|
if self.serialBaud is None:
|
|
self.serialBaud = int(cfg['serial'].get('baudrate', 2400))
|
|
if cfg.get('daemon') is not None:
|
|
if self.interval is None:
|
|
self.interval = float(cfg['daemon'].get('interval', 1.0))
|
|
if self.debug is None:
|
|
self.debug = bool(cfg['daemon'].get('debug', False))
|
|
if cfg.get('shutdown') is not None:
|
|
if self.shutdownTimeout is None:
|
|
self.shutdownTimeout = float(cfg['shutdown'].get('timeout', 3600))
|
|
if self.shutdownCommand is None:
|
|
self.shutdownCommand = shlex.split(cfg['shutdown'].get('command', '/sbin/shutdown -h +2'))
|
|
except OSError:
|
|
if debug:
|
|
print('Could not read config file %s.' % configfile)
|
|
|
|
if self.serialPort is None:
|
|
self.serialPort = '/dev/ttyS0'
|
|
if self.serialBaud is None:
|
|
self.serialBaud = 2400
|
|
if self.interval is None:
|
|
self.interval = 1.0
|
|
if self.debug is None:
|
|
self.debug = False
|
|
if self.shutdownTimeout is None:
|
|
self.shutdownTimeout = 3600.0
|
|
if self.shutdownCommand is None:
|
|
self.shutdownCommand = shlex.split('/sbin/shutdown -h +2')
|
|
|
|
if self.debug:
|
|
print('Configuration:')
|
|
print('\tSerial Port:\t\t%s' % self.serialPort)
|
|
print('\tBaud Rate:\t\t%d' % self.serialBaud)
|
|
print('\tQuery Interval:\t\t%.2f s' % self.interval)
|
|
print('\tDebugging:\t\t%s' % self.debug)
|
|
print('\tShutdown Timeout:\t%.2f' % self.shutdownTimeout)
|
|
print('\tShutdown Command:\t%s' % self.shutdownCommand)
|
|
|
|
super(ConvertUPS, self).__init__(port = self.serialPort,
|
|
baudrate = self.serialBaud, *args, **kwargs)
|
|
|
|
|
|
def upsInit(self):
|
|
""" Start communicating with the UPS. """
|
|
if not self.name:
|
|
if self.debug:
|
|
raise NameError('Please, set the serial communication parameters first.')
|
|
if self.debug:
|
|
print('Successfully connected to serial port %s' % (ups.name))
|
|
print('Sending initialization sequence (you should hear a short beep).')
|
|
|
|
self.write(b'CT\rDQ1\r')
|
|
sleep(5.0)
|
|
self.write(b'Rt\r')
|
|
sleep(3.0)
|
|
|
|
self.write(b'Q1\r')
|
|
sleep(1.0)
|
|
|
|
readbuffer = b''
|
|
data = self.read(1)
|
|
while b'\r' not in data:
|
|
readbuffer += data
|
|
data = self.read(1)
|
|
|
|
if readbuffer.strip().startswith(b'('):
|
|
if self.debug:
|
|
print('Initialization successful.')
|
|
print('Initial values returned:', readbuffer.strip())
|
|
return True
|
|
return
|
|
|
|
|
|
def upsRead(self):
|
|
""" Read the current status of the UPS. """
|
|
ups.write(b'Q1\r')
|
|
readbuffer = b''
|
|
data = ups.read(1)
|
|
|
|
while b'\r' not in data:
|
|
readbuffer += data
|
|
data = ups.read(1)
|
|
if self.debug:
|
|
print(readbuffer.strip())
|
|
return readbuffer.strip()
|
|
|
|
|
|
def isOnBattery(self):
|
|
""" Check whether UPS is on Battery. """
|
|
status = self.upsRead()
|
|
if status.startswith(b'(000.0 '):
|
|
if not self._isOnBattery:
|
|
self._isOnBattery = True
|
|
self._powerToggled = True
|
|
if self.debug:
|
|
print('UPS is on battery.')
|
|
else:
|
|
if self._isOnBattery:
|
|
self._isOnBattery = False
|
|
self._powerToggled = True
|
|
if self.debug:
|
|
print('UPS is on AC.')
|
|
return self._isOnBattery
|
|
|
|
|
|
def powerToggled(self):
|
|
""" Check if a power toggle (AC -> Battery or vice versa)
|
|
occured and reset the flag.
|
|
"""
|
|
onBattery = self.isOnBattery()
|
|
if self._powerToggled:
|
|
self._powerToggled = False
|
|
if self.debug:
|
|
print('Power toggle has occured recently.')
|
|
if onBattery:
|
|
return(True, True)
|
|
else:
|
|
return(True, False)
|
|
if self.debug:
|
|
print('No power toggle has occured recently.')
|
|
if onBattery:
|
|
return (False, True)
|
|
return (False, False)
|
|
|
|
|
|
def doShutdown(self):
|
|
""" Execute self.shutdownCommand. """
|
|
subprocess.call(self.shutdownCommand)
|
|
|
|
|
|
def startShutdownTimer(self):
|
|
""" Start a timer that executes self.shutdownCommand as soon as
|
|
self.shutdownTimeout has expired.
|
|
If self.shutdownTimeout == 0, the timer is disabled.
|
|
"""
|
|
if self.shutdownTimeout > 0:
|
|
self.timer = Timer(self.shutdownTimeout, self.doShutdown)
|
|
self.timer.start()
|
|
|
|
|
|
def cancelShutdownTimer(self):
|
|
""" Cancel a previously started shutdown timer. """
|
|
if self.timer is not None:
|
|
self.timer.cancel()
|
|
self.timer = None
|
|
|
|
|
|
if __name__ == "__main__":
|
|
log = logging.getLogger('pyupsmon')
|
|
log.addHandler(JournalHandler())
|
|
log.setLevel(logging.INFO)
|
|
|
|
try:
|
|
ups = ConvertUPS()
|
|
except:
|
|
log.error('Error: Could not open UPS serial connection.')
|
|
log.error('\tCheck the config file!')
|
|
print('Error: Could not open UPS serial connection.')
|
|
print('\tCheck the config file!')
|
|
sys.exit(1)
|
|
|
|
if not ups.upsInit():
|
|
log.error('PyUPSMon successfully started.')
|
|
print('Error: Could not initialize UPS.')
|
|
sys.exit(1)
|
|
|
|
log.info('PyUPSMon successfully started.')
|
|
|
|
while True:
|
|
(toggled, onBatt) = ups.powerToggled()
|
|
if toggled:
|
|
if onBatt:
|
|
log.critical('UPS AC power loss. Starting shutdown timer.')
|
|
print('UPS AC power loss. Starting shutdown timer.')
|
|
ups.startShutdownTimer()
|
|
if not onBatt:
|
|
log.critical('UPS AC power restored. Cancelling shutdown timer.')
|
|
print('UPS AC power restored. Cancelling shutdown timer.')
|
|
ups.cancelShutdownTimer()
|
|
sleep(ups.interval)
|