+#!/usr/bin/python
+
+"""The Sirius Calendar Service.
+
+This Python program runs on each node. It periodically downloads the schedule file and uses NodeManager's XML-RPC interface to adjust the priority increase.
+
+Author: David Eisenstat (deisenst@cs.princeton.edu)
+
+Original Sirius implementation by David Lowenthal.
+"""
+
+import fcntl
+import os
+import random
+import signal
+import socket
+import sys
+import threading
+import time
+import traceback
+import urllib
+from xmlrpclib import ServerProxy
+
+
+# 0 means normal operation
+# 1 means turn on the short time scales and read the schedule from a file
+# 2 means additionally don't contact NodeManager
+
+DEBUGLEVEL = 0
+
+########################################
+
+if DEBUGLEVEL < 2:
+ LOGFILE = '/var/log/sirius'
+else:
+ LOGFILE = 'log.txt'
+
+loglock = threading.Lock()
+
+
+def log(msg):
+ """Append <msg> and a timestamp to <LOGFILE>."""
+ try:
+ if not msg.endswith('\n'):
+ msg += '\n'
+ loglock.acquire()
+ try:
+ logfile = open(LOGFILE, 'a')
+ t = time.time()
+ print >>logfile, t
+ print >>logfile, time.asctime(time.gmtime(t))
+ print >>logfile, msg
+ finally:
+ loglock.release()
+ except:
+ if DEBUGLEVEL > 0:
+ traceback.print_exc()
+
+
+def logexception():
+ """Log an exception."""
+ log(traceback.format_exc())
+
+########################################
+
+if DEBUGLEVEL > 0:
+ # smaller time units so we can test faster
+ ONEMINUTE = 1
+ ONEHOUR = 10 * ONEMINUTE
+else:
+ ONEMINUTE = 60
+ ONEHOUR = 60 * ONEMINUTE
+
+
+class Periodic:
+ """Periodically make a function call."""
+
+ def __init__(self, target, interval, mindelta, maxdelta):
+ self._target = target
+ self._interval = interval
+ self._deltarange = mindelta, maxdelta+1
+ thr = threading.Thread(target=self.run, args=[target])
+ thr.setDaemon(True)
+ thr.start()
+
+ def run(self, target):
+ nextintervalstart = int(time.time() / self._interval) * self._interval
+ while True:
+ try:
+ self._target()
+ except:
+ logexception()
+ nextintervalstart += self._interval
+ nextfiring = nextintervalstart + random.randrange(*self._deltarange)
+ while True:
+ t = time.time()
+ if t < nextfiring:
+ try:
+ time.sleep(nextfiring - t)
+ except:
+ logexception()
+ else:
+ break
+
+########################################
+
+SLOTDURATION = ONEHOUR
+
+SCHEDULEURL = 'XXXSITEXXX/planetlab/sirius/schedule.txt'
+
+schedulelock = threading.Lock()
+
+schedule = {}
+
+
+def currentslot():
+ return int(time.time() / SLOTDURATION) * SLOTDURATION
+
+
+def updateschedule():
+ """Make one attempt at downloading and updating the schedule."""
+ log('Contacting PLC...')
+ newschedule = {}
+ # Format is:
+ # timestamp
+ # slicename - starttime - -
+ # ...
+ if DEBUGLEVEL > 0:
+ f = open('/tmp/schedule.txt')
+ else:
+ f = urllib.urlopen(SCHEDULEURL)
+ for line in f:
+ fields = line.split()
+ if len(fields) >= 3:
+ newschedule[fields[2]] = fields[0]
+ log('Current schedule is %s' % newschedule)
+
+ schedulelock.acquire()
+ try:
+ schedule.clear()
+ schedule.update(newschedule)
+ finally:
+ schedulelock.release()
+ log('Updated schedule successfully')
+
+########################################
+
+nodemanager = ServerProxy('http://127.0.0.1:812/')
+
+recipientcond = threading.Condition()
+
+recipient = ''
+versionnumber = 0
+
+def updateloans():
+ log('Contacting NodeManager...')
+ schedulelock.acquire()
+ try:
+ newrecipient = schedule.get(str(currentslot()), '')
+ finally:
+ schedulelock.release()
+ if newrecipient:
+ loans = [(newrecipient, 'cpu_pct', 25), (newrecipient, 'net_min_rate', 2000)]
+ else:
+ loans = []
+ log('Current loans are %s' % loans)
+
+ if DEBUGLEVEL < 2:
+ nodemanager.SetLoans('princeton_sirius', loans)
+ log('Updated loans successfully')
+
+ recipientcond.acquire()
+ try:
+ global recipient, versionnumber
+ if recipient != newrecipient:
+ recipient = newrecipient
+ versionnumber += 1
+ recipientcond.notifyAll()
+ finally:
+ recipientcond.release()
+
+########################################
+
+backoff = 1
+
+def success():
+ global backoff
+ backoff = 1
+
+def failure():
+ global backoff
+ try:
+ time.sleep(backoff)
+ except:
+ logexception()
+ backoff = min(backoff*2, 5*ONEMINUTE)
+
+
+def handleclient(clientsock, clientaddress):
+ try:
+ log('Connection from %s:%d' % clientaddress)
+ clientsock.shutdown(socket.SHUT_RD)
+ recipientcond.acquire()
+ while True:
+ recip, vn = recipient, versionnumber
+ recipientcond.release()
+ clientsock.send(recip + '\n')
+
+ recipientcond.acquire()
+ while versionnumber == vn:
+ recipientcond.wait()
+ except:
+ logexception()
+
+
+def server():
+ while True:
+ try:
+ sock = socket.socket()
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.bind(('', 8124))
+ sock.listen(5)
+ success()
+ break
+ except:
+ logexception()
+ failure()
+ log('Bound server socket')
+
+ while True:
+ try:
+ client = sock.accept()
+ threading.Thread(target=handleclient, args=client).start()
+ success()
+ except:
+ logexception()
+ failure()
+
+########################################
+
+if DEBUGLEVEL < 2:
+ PIDFILE = '/tmp/sirius.pid'
+else:
+ PIDFILE = 'sirius.pid'
+
+try:
+ if os.fork():
+ sys.exit(0)
+ f = open(PIDFILE, 'w')
+ fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
+except:
+ logexception()
+ sys.exit(1)
+
+Periodic(updateschedule, SLOTDURATION, -5*ONEMINUTE, -1*ONEMINUTE)
+Periodic(updateloans, 5*ONEMINUTE, 0, 0)
+server()