From: Faiyaz Ahmed Date: Wed, 5 Mar 2008 01:51:05 +0000 (+0000) Subject: parsed by db-config. puts entry into default initscripts X-Git-Tag: MyPLC-4.2-4~20 X-Git-Url: http://git.onelab.eu/?p=myplc.git;a=commitdiff_plain;h=4071dbf895f2796aba775c718f1a9f3a2497957e parsed by db-config. puts entry into default initscripts --- diff --git a/plc_sliceinitscripts/sirius.py b/plc_sliceinitscripts/sirius.py new file mode 100644 index 0000000..fe68581 --- /dev/null +++ b/plc_sliceinitscripts/sirius.py @@ -0,0 +1,257 @@ +#!/usr/bin/python + +"""The Sirius Calendar Service. + +This Python program runs on each node. It periodically downloads the schedule file and uses NodeManager's XML-RPC interface to adjust the priority increase. + +Author: David Eisenstat (deisenst@cs.princeton.edu) + +Original Sirius implementation by David Lowenthal. +""" + +import fcntl +import os +import random +import signal +import socket +import sys +import threading +import time +import traceback +import urllib +from xmlrpclib import ServerProxy + + +# 0 means normal operation +# 1 means turn on the short time scales and read the schedule from a file +# 2 means additionally don't contact NodeManager + +DEBUGLEVEL = 0 + +######################################## + +if DEBUGLEVEL < 2: + LOGFILE = '/var/log/sirius' +else: + LOGFILE = 'log.txt' + +loglock = threading.Lock() + + +def log(msg): + """Append and a timestamp to .""" + try: + if not msg.endswith('\n'): + msg += '\n' + loglock.acquire() + try: + logfile = open(LOGFILE, 'a') + t = time.time() + print >>logfile, t + print >>logfile, time.asctime(time.gmtime(t)) + print >>logfile, msg + finally: + loglock.release() + except: + if DEBUGLEVEL > 0: + traceback.print_exc() + + +def logexception(): + """Log an exception.""" + log(traceback.format_exc()) + +######################################## + +if DEBUGLEVEL > 0: + # smaller time units so we can test faster + ONEMINUTE = 1 + ONEHOUR = 10 * ONEMINUTE +else: + ONEMINUTE = 60 + ONEHOUR = 60 * ONEMINUTE + + +class Periodic: + """Periodically make a function call.""" + + def __init__(self, target, interval, mindelta, maxdelta): + self._target = target + self._interval = interval + self._deltarange = mindelta, maxdelta+1 + thr = threading.Thread(target=self.run, args=[target]) + thr.setDaemon(True) + thr.start() + + def run(self, target): + nextintervalstart = int(time.time() / self._interval) * self._interval + while True: + try: + self._target() + except: + logexception() + nextintervalstart += self._interval + nextfiring = nextintervalstart + random.randrange(*self._deltarange) + while True: + t = time.time() + if t < nextfiring: + try: + time.sleep(nextfiring - t) + except: + logexception() + else: + break + +######################################## + +SLOTDURATION = ONEHOUR + +SCHEDULEURL = 'XXXSITEXXX/planetlab/sirius/schedule.txt' + +schedulelock = threading.Lock() + +schedule = {} + + +def currentslot(): + return int(time.time() / SLOTDURATION) * SLOTDURATION + + +def updateschedule(): + """Make one attempt at downloading and updating the schedule.""" + log('Contacting PLC...') + newschedule = {} + # Format is: + # timestamp + # slicename - starttime - - + # ... + if DEBUGLEVEL > 0: + f = open('/tmp/schedule.txt') + else: + f = urllib.urlopen(SCHEDULEURL) + for line in f: + fields = line.split() + if len(fields) >= 3: + newschedule[fields[2]] = fields[0] + log('Current schedule is %s' % newschedule) + + schedulelock.acquire() + try: + schedule.clear() + schedule.update(newschedule) + finally: + schedulelock.release() + log('Updated schedule successfully') + +######################################## + +nodemanager = ServerProxy('http://127.0.0.1:812/') + +recipientcond = threading.Condition() + +recipient = '' +versionnumber = 0 + +def updateloans(): + log('Contacting NodeManager...') + schedulelock.acquire() + try: + newrecipient = schedule.get(str(currentslot()), '') + finally: + schedulelock.release() + if newrecipient: + loans = [(newrecipient, 'cpu_pct', 25), (newrecipient, 'net_min_rate', 2000)] + else: + loans = [] + log('Current loans are %s' % loans) + + if DEBUGLEVEL < 2: + nodemanager.SetLoans('princeton_sirius', loans) + log('Updated loans successfully') + + recipientcond.acquire() + try: + global recipient, versionnumber + if recipient != newrecipient: + recipient = newrecipient + versionnumber += 1 + recipientcond.notifyAll() + finally: + recipientcond.release() + +######################################## + +backoff = 1 + +def success(): + global backoff + backoff = 1 + +def failure(): + global backoff + try: + time.sleep(backoff) + except: + logexception() + backoff = min(backoff*2, 5*ONEMINUTE) + + +def handleclient(clientsock, clientaddress): + try: + log('Connection from %s:%d' % clientaddress) + clientsock.shutdown(socket.SHUT_RD) + recipientcond.acquire() + while True: + recip, vn = recipient, versionnumber + recipientcond.release() + clientsock.send(recip + '\n') + + recipientcond.acquire() + while versionnumber == vn: + recipientcond.wait() + except: + logexception() + + +def server(): + while True: + try: + sock = socket.socket() + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind(('', 8124)) + sock.listen(5) + success() + break + except: + logexception() + failure() + log('Bound server socket') + + while True: + try: + client = sock.accept() + threading.Thread(target=handleclient, args=client).start() + success() + except: + logexception() + failure() + +######################################## + +if DEBUGLEVEL < 2: + PIDFILE = '/tmp/sirius.pid' +else: + PIDFILE = 'sirius.pid' + +try: + if os.fork(): + sys.exit(0) + f = open(PIDFILE, 'w') + fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) +except: + logexception() + sys.exit(1) + +Periodic(updateschedule, SLOTDURATION, -5*ONEMINUTE, -1*ONEMINUTE) +Periodic(updateloans, 5*ONEMINUTE, 0, 0) +server()