#!/usr/bin/python

"""The Sirius Calendar Service.

This Python program runs on each node.  It periodically downloads the
schedule file and uses NodeManager's XML-RPC interface to adjust the
priority increase.

Author: David Eisenstat (deisenst@cs.princeton.edu)

Original Sirius implementation by David Lowenthal.
"""

import fcntl
import os
import random
import signal
import socket
import sys
import threading
import time
import traceback
import urllib
from xmlrpclib import ServerProxy


# 0 means normal operation
# 1 means turn on the short time scales and read the schedule from a file
# 2 means additionally don't contact NodeManager

DEBUGLEVEL = 0

########################################

if DEBUGLEVEL < 2:
    LOGFILE = '/var/log/sirius'
else:
    LOGFILE = 'log.txt'

# Serializes writers so entries from different threads do not interleave.
loglock = threading.Lock()


def log(msg):
    """Append msg and a timestamp to LOGFILE.

    Each entry is three lines: the raw time.time() value, the same
    instant formatted as UTC, and the message (newline-terminated, then
    followed by one more newline).

    Logging is best effort: any failure is swallowed so that a logging
    problem never takes the daemon down.  When DEBUGLEVEL > 0 the
    failure is printed to stderr instead of being dropped silently.
    """
    try:
        if not msg.endswith('\n'):
            msg += '\n'
        loglock.acquire()
        try:
            logfile = open(LOGFILE, 'a')
            try:
                t = time.time()
                logfile.write('%s\n' % t)
                logfile.write('%s\n' % time.asctime(time.gmtime(t)))
                logfile.write('%s\n' % msg)
            finally:
                # Close explicitly; log() is called from long-lived threads
                # and must not rely on garbage collection to free the handle.
                logfile.close()
        finally:
            loglock.release()
    except:
        if DEBUGLEVEL > 0:
            traceback.print_exc()


def logexception():
    """Log the exception currently being handled, with its traceback."""
    log(traceback.format_exc())


########################################

if DEBUGLEVEL > 0:
    # smaller time units so we can test faster
    ONEMINUTE = 1
    ONEHOUR = 10 * ONEMINUTE
else:
    ONEMINUTE = 60
    ONEHOUR = 60 * ONEMINUTE


class Periodic:
    """Periodically make a function call.

    Constructing an instance starts a daemon thread that calls target()
    immediately and then once per interval.  Each later call is made at
    the start of the next interval (intervals are aligned to multiples
    of interval seconds since the epoch) plus a random integer offset
    drawn from [mindelta, maxdelta].  Exceptions raised by target() are
    logged and do not stop the schedule.
    """

    def __init__(self, target, interval, mindelta, maxdelta):
        self._target = target
        self._interval = interval
        # random.randrange() excludes its upper bound, hence the +1.
        self._deltarange = mindelta, maxdelta + 1
        thr = threading.Thread(target=self.run, args=[target])
        thr.setDaemon(True)
        thr.start()

    def run(self, target):
        """Thread body: call self._target() forever on the schedule.

        The target argument is accepted for compatibility with the
        thread's args list; the callable actually invoked is
        self._target.
        """
        nextintervalstart = int(time.time() / self._interval) * self._interval
        while True:
            try:
                self._target()
            except:
                logexception()
            nextintervalstart += self._interval
            nextfiring = (nextintervalstart +
                          random.randrange(*self._deltarange))
            # Sleep until the firing time, re-checking the clock after every
            # wake-up in case the sleep ended early or raised.
            while True:
                t = time.time()
                if t < nextfiring:
                    try:
                        time.sleep(nextfiring - t)
                    except:
                        logexception()
                else:
                    break


########################################

SLOTDURATION = ONEHOUR

# instances of @SITE@ and @PREFIX@ below are replaced
# as appropriate by db-config
SCHEDULEURL = '@SITE@/planetlab/sirius/schedule.txt'

# Guards the schedule dict, which maps a slot start time (as the string
# found in the schedule file) to the name of the slice holding that slot.
schedulelock = threading.Lock()
schedule = {}


def currentslot():
    """Return the start time of the current slot, in seconds since the epoch."""
    return int(time.time() / SLOTDURATION) * SLOTDURATION


def updateschedule():
    """Make one attempt at downloading and updating the schedule.

    Reads /tmp/schedule.txt when DEBUGLEVEL > 0, otherwise fetches
    SCHEDULEURL.  The shared schedule is replaced only after the whole
    file has been read; any exception propagates to the caller and
    leaves the previous schedule in place.
    """
    log('Contacting PLC...')

    newschedule = {}
    # Format is:
    # timestamp
    # slicename - starttime - -
    # ...
    if DEBUGLEVEL > 0:
        f = open('/tmp/schedule.txt')
    else:
        f = urllib.urlopen(SCHEDULEURL)
    try:
        for line in f:
            fields = line.split()
            # Lines with fewer than three fields (such as the leading
            # timestamp line) carry no reservation and are skipped.
            if len(fields) >= 3:
                newschedule[fields[2]] = fields[0]
    finally:
        f.close()
    log('Current schedule is %s' % newschedule)

    schedulelock.acquire()
    try:
        schedule.clear()
        schedule.update(newschedule)
    finally:
        schedulelock.release()
    log('Updated schedule successfully')


########################################

nodemanager = ServerProxy('http://127.0.0.1:812/')

# recipientcond guards recipient and versionnumber.  versionnumber is bumped
# every time recipient changes so waiting client handlers can detect it.
recipientcond = threading.Condition()
recipient = ''
versionnumber = 0


def updateloans():
    """Tell NodeManager which slice, if any, holds the current slot.

    Looks up the current slot in the schedule, pushes the matching loans
    to NodeManager (skipped when DEBUGLEVEL >= 2), and, if the recipient
    changed, publishes the new one and wakes every waiting client
    handler.
    """
    global recipient, versionnumber

    log('Contacting NodeManager...')
    schedulelock.acquire()
    try:
        newrecipient = schedule.get(str(currentslot()), '')
    finally:
        schedulelock.release()
    if newrecipient:
        loans = [(newrecipient, 'cpu_pct', 25),
                 (newrecipient, 'net_min_rate', 2000)]
    else:
        loans = []
    log('Current loans are %s' % loans)

    if DEBUGLEVEL < 2:
        nodemanager.SetLoans('@PREFIX@_sirius', loans)
    log('Updated loans successfully')

    recipientcond.acquire()
    try:
        if recipient != newrecipient:
            recipient = newrecipient
            versionnumber += 1
            recipientcond.notifyAll()
    finally:
        recipientcond.release()


########################################

# Current retry delay in seconds, shared by the server's bind and accept loops.
backoff = 1


def success():
    """Reset the retry delay after a successful operation."""
    global backoff
    backoff = 1


def failure():
    """Sleep for the current retry delay, then double it (capped at 5 minutes)."""
    global backoff
    try:
        time.sleep(backoff)
    except:
        logexception()
    backoff = min(backoff * 2, 5 * ONEMINUTE)


def handleclient(clientsock, clientaddress):
    """Stream the current recipient to one connected client.

    Sends the recipient name followed by a newline as soon as the client
    connects, and again every time the recipient changes.  The handler
    ends when sending fails (for example because the client went away);
    the error is logged and the socket is closed.
    """
    try:
        try:
            log('Connection from %s:%d' % clientaddress)
            # Nothing is ever read from the client.
            clientsock.shutdown(socket.SHUT_RD)
            # versionnumber is always an int, so None never matches and the
            # first pass sends the current recipient without waiting.
            vn = None
            while True:
                recipientcond.acquire()
                try:
                    while versionnumber == vn:
                        recipientcond.wait()
                    recip, vn = recipient, versionnumber
                finally:
                    recipientcond.release()
                # Send outside the lock so a slow client cannot stall
                # updateloans() or the other handlers.
                clientsock.sendall(recip + '\n')
        except:
            logexception()
    finally:
        clientsock.close()


def server():
    """Listen on TCP port 8124 and serve each client in its own thread.

    Both binding and accepting are retried forever, with the shared
    exponential backoff applied after each failure.  Never returns.
    """
    while True:
        sock = None
        try:
            sock = socket.socket()
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind(('', 8124))
            sock.listen(5)
            success()
            break
        except:
            logexception()
            # Do not leak the half-configured socket across retries.
            if sock is not None:
                sock.close()
            failure()
    log('Bound server socket')

    while True:
        try:
            client = sock.accept()
            threading.Thread(target=handleclient, args=client).start()
            success()
        except:
            logexception()
            failure()


########################################

def start(slicename):
    """Fork into the background and run the service.

    The parent process exits with status 0 right after a successful
    fork.  The child takes an exclusive, non-blocking lock on the pid
    file, exiting with status 1 if the file cannot be opened or locked,
    then starts the periodic jobs and runs the server loop, which never
    returns.
    """
    if DEBUGLEVEL < 2:
        PIDFILE = '/tmp/%s.pid' % slicename
    else:
        PIDFILE = '%s.pid' % slicename

    try:
        pid = os.fork()
    except:
        logexception()
        sys.exit(1)
    if pid:
        # Parent.  This exit must stay outside any catch-all handler: a bare
        # "except:" also catches SystemExit, which would log a spurious
        # traceback and turn the exit status into 1.
        sys.exit(0)

    try:
        # f must stay referenced for the life of the process: closing the
        # file would release the lock.  NOTE(review): nothing is written to
        # the file, so it serves as a lock only -- confirm that is intended.
        f = open(PIDFILE, 'w')
        fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
    except:
        logexception()
        sys.exit(1)

    Periodic(updateschedule, SLOTDURATION, -5 * ONEMINUTE, -1 * ONEMINUTE)
    Periodic(updateloans, 5 * ONEMINUTE, 0, 0)
    server()


# xxx fixme
# at the very least, do nothing
def stop(slicename):
    """Placeholder: announce the stop request; no process is actually stopped."""
    sys.stdout.write('(dummy) stopping %s\n' % (slicename,))


def restart(slicename):
    """Run stop() followed by start() for slicename."""
    stop(slicename)
    start(slicename)


def main():
    """Init-script entry point.

    Usage: [mode [slicename]], where mode is start, stop or restart
    (default start) and slicename defaults to sirius.  Exits with
    status 1 on too many arguments or an unknown mode.
    """
    args = sys.argv[1:]
    mode = 'start'
    slicename = 'sirius'
    argc = len(args)
    if argc >= 3:
        sys.stdout.write('too many arguments to initscript %s\n' % (args,))
        sys.exit(1)
    elif argc == 2:
        (mode, slicename) = args
    elif argc == 1:
        mode = args[0]

    if mode == 'start':
        start(slicename)
    elif mode == 'stop':
        stop(slicename)
    elif mode == 'restart':
        restart(slicename)
    else:
        sys.stdout.write('unknown mode %s\n' % (mode,))
        sys.exit(1)
    sys.exit(0)


if __name__ == '__main__':
    main()