#!/usr/bin/python3
#
# Depends on:
#   - pyserial
#   - python-systemd
#   - pyyaml

import logging
import shlex
import subprocess
import sys
from threading import Timer
from time import sleep

import yaml
from serial import Serial
from systemd.journal import JournalHandler


class ConvertUPS(Serial):
    """
    Class that implements monitoring for Wöhrle Convert-[123]000 UPS devices
    connected via RS232. Has to be initialized like serial.Serial().
    """

    # Class variables
    _isOnBattery = False    # Is UPS running on battery right now?
    _powerToggled = False   # Did power state (battery / AC) toggle recently?
    timer = None            # Pending shutdown Timer, or None if not armed.

    def __init__(self, configfile='/etc/pyupsmon.yml', port=None,
                 baudrate=None, interval=None, shutdownTimeout=None,
                 shutdownCommand=None, debug=None, *args, **kwargs):
        """
        Read the config file and/or set default config values, then open
        the serial port.

        Explicit keyword arguments take precedence over values found in
        `configfile`, which in turn take precedence over built-in defaults.
        A missing or unreadable config file is not an error. Remaining
        positional and keyword arguments are passed on to serial.Serial().
        """
        self.debug = debug

        try:
            with open(configfile, 'r') as ymlfile:
                # safe_load: the config is plain data, and yaml.load()
                # without a Loader is rejected by current PyYAML releases.
                # An empty file parses to None, hence the `or {}`.
                cfg = yaml.safe_load(ymlfile) or {}
        except OSError:
            cfg = {}
            if self.debug:
                print('Could not read config file %s.' % configfile)

        serialCfg = cfg.get('serial') or {}
        daemonCfg = cfg.get('daemon') or {}
        shutdownCfg = cfg.get('shutdown') or {}

        self.serialPort = self._setting(
            port, serialCfg, 'port', '/dev/ttyS0')
        self.serialBaud = self._setting(
            baudrate, serialCfg, 'baudrate', 2400, int)
        self.interval = self._setting(
            interval, daemonCfg, 'interval', 1.0, float)
        self.debug = self._setting(
            debug, daemonCfg, 'debug', False, bool)
        self.shutdownTimeout = self._setting(
            shutdownTimeout, shutdownCfg, 'timeout', 3600, float)
        self.shutdownCommand = self._setting(
            shutdownCommand, shutdownCfg, 'command',
            '/sbin/shutdown -h +2', shlex.split)

        if self.debug:
            print('Configuration:')
            print('\tSerial Port:\t\t%s' % self.serialPort)
            print('\tBaud Rate:\t\t%d' % self.serialBaud)
            print('\tQuery Interval:\t\t%.2f s' % self.interval)
            print('\tDebugging:\t\t%s' % self.debug)
            print('\tShutdown Timeout:\t%.2f' % self.shutdownTimeout)
            print('\tShutdown Command:\t%s' % self.shutdownCommand)

        super(ConvertUPS, self).__init__(*args, port=self.serialPort,
                                         baudrate=self.serialBaud, **kwargs)

    @staticmethod
    def _setting(explicit, section, key, default, convert=None):
        """
        Resolve one setting: explicit argument, else config value, else
        default. Config and default values are passed through `convert`;
        an explicit argument is used as given.
        """
        if explicit is not None:
            return explicit
        value = section.get(key)
        if value is None:
            value = default
        return convert(value) if convert is not None else value

    def _readReply(self):
        """
        Read bytes from the UPS up to (not including) the next carriage
        return and return them stripped of surrounding whitespace.

        Stops early if read() returns no data (serial timeout configured
        and expired) instead of spinning forever; the partial reply is
        returned in that case.
        """
        received = []
        while True:
            data = self.read(1)
            if not data or b'\r' in data:
                break
            received.append(data)
        return b''.join(received).strip()

    def upsInit(self):
        """
        Start communicating with the UPS.

        Returns True if the UPS answered the first status query with a
        reply starting with '(', False otherwise.
        """
        # NOTE: as in the original code, a missing port name is only
        # reported when debugging is enabled.
        if not self.name and self.debug:
            raise NameError(
                'Please, set the serial communication parameters first.')

        if self.debug:
            print('Successfully connected to serial port %s' % (self.name))
            print('Sending initialization sequence '
                  '(you should hear a short beep).')

        self.write(b'CT\rDQ1\r')
        sleep(5.0)
        self.write(b'Rt\r')
        sleep(3.0)
        self.write(b'Q1\r')
        sleep(1.0)

        reply = self._readReply()
        if not reply.startswith(b'('):
            return False

        if self.debug:
            print('Initialization successful.')
            print('Initial values returned:', reply)
        return True

    def upsRead(self):
        """
        Query the current status of the UPS and return the raw reply
        (bytes, without the trailing carriage return).
        """
        self.write(b'Q1\r')
        reply = self._readReply()
        if self.debug:
            print(reply)
        return reply

    def isOnBattery(self):
        """
        Check whether the UPS is on battery.

        A status reply starting with '(000.0 ' is taken to mean "on
        battery". Whenever the state differs from the previous call the
        internal toggle flag is set.
        """
        status = self.upsRead()
        if not status:
            # No reply at all (read timed out): keep the last known state
            # rather than misreading silence as "AC restored".
            return self._isOnBattery

        onBattery = status.startswith(b'(000.0 ')
        if onBattery != self._isOnBattery:
            self._isOnBattery = onBattery
            self._powerToggled = True
            if self.debug:
                print('UPS is on battery.' if onBattery else 'UPS is on AC.')
        return self._isOnBattery

    def powerToggled(self):
        """
        Check if a power toggle (AC -> battery or vice versa) occurred and
        reset the flag.

        Returns a tuple (toggled, onBattery).
        """
        onBattery = self.isOnBattery()
        toggled = self._powerToggled
        self._powerToggled = False
        if self.debug:
            if toggled:
                print('Power toggle has occured recently.')
            else:
                print('No power toggle has occured recently.')
        return (toggled, onBattery)

    def doShutdown(self):
        """
        Execute self.shutdownCommand.
        """
        subprocess.call(self.shutdownCommand)

    def startShutdownTimer(self):
        """
        Start a timer that executes self.shutdownCommand as soon as
        self.shutdownTimeout has expired. If self.shutdownTimeout == 0,
        the timer is disabled.
        """
        if self.shutdownTimeout > 0:
            # Never leave an earlier timer running unreferenced.
            self.cancelShutdownTimer()
            self.timer = Timer(self.shutdownTimeout, self.doShutdown)
            self.timer.start()

    def cancelShutdownTimer(self):
        """
        Cancel a previously started shutdown timer. Does nothing if no
        timer is pending.
        """
        if self.timer is not None:
            self.timer.cancel()
            self.timer = None


def main():
    """
    Run the monitoring daemon: open and initialize the UPS, then poll it
    forever, arming the shutdown timer on AC loss and cancelling it when
    AC power returns.
    """
    log = logging.getLogger('pyupsmon')
    log.addHandler(JournalHandler())
    log.setLevel(logging.INFO)

    try:
        ups = ConvertUPS()
    except Exception:
        log.error('Error: Could not open UPS serial connection.')
        log.error('\tCheck the config file!')
        print('Error: Could not open UPS serial connection.')
        print('\tCheck the config file!')
        sys.exit(1)

    if not ups.upsInit():
        log.error('Error: Could not initialize UPS.')
        print('Error: Could not initialize UPS.')
        sys.exit(1)

    log.info('PyUPSMon successfully started.')

    while True:
        (toggled, onBatt) = ups.powerToggled()
        if toggled:
            if onBatt:
                log.critical('UPS AC power loss. Starting shutdown timer.')
                print('UPS AC power loss. Starting shutdown timer.')
                ups.startShutdownTimer()
            else:
                log.critical(
                    'UPS AC power restored. Cancelling shutdown timer.')
                print('UPS AC power restored. Cancelling shutdown timer.')
                ups.cancelShutdownTimer()
        sleep(ups.interval)


if __name__ == "__main__":
    main()