parsed by db-config. puts entry into default initscripts
author Faiyaz Ahmed <faiyaza@cs.princeton.edu>
Wed, 5 Mar 2008 01:51:05 +0000 (01:51 +0000)
committer Faiyaz Ahmed <faiyaza@cs.princeton.edu>
Wed, 5 Mar 2008 01:51:05 +0000 (01:51 +0000)
plc_sliceinitscripts/sirius.py [new file with mode: 0644]

diff --git a/plc_sliceinitscripts/sirius.py b/plc_sliceinitscripts/sirius.py
new file mode 100644 (file)
index 0000000..fe68581
--- /dev/null
@@ -0,0 +1,257 @@
+#!/usr/bin/python
+
+"""The Sirius Calendar Service.
+
+This Python program runs on each node.  It periodically downloads the schedule file and uses NodeManager's XML-RPC interface to adjust the priority increase.
+
+Author: David Eisenstat (deisenst@cs.princeton.edu)
+
+Original Sirius implementation by David Lowenthal.
+"""
+
+import fcntl
+import os
+import random
+import signal
+import socket
+import sys
+import threading
+import time
+import traceback
+import urllib
+from xmlrpclib import ServerProxy
+
+
+# 0 means normal operation
+# 1 means turn on the short time scales and read the schedule from a file
+# 2 means additionally don't contact NodeManager
+
+DEBUGLEVEL = 0
+
+########################################
+
+if DEBUGLEVEL < 2:
+    LOGFILE = '/var/log/sirius'
+else:
+    LOGFILE = 'log.txt'
+
+loglock = threading.Lock()
+
+
+def log(msg):
+    """Append <msg> and a timestamp to <LOGFILE>."""
+    try:
+        if not msg.endswith('\n'):
+            msg += '\n'
+        loglock.acquire()
+        try:
+            logfile = open(LOGFILE, 'a')
+            t = time.time()
+            print >>logfile, t
+            print >>logfile, time.asctime(time.gmtime(t))
+            print >>logfile, msg
+        finally:
+            loglock.release()
+    except:
+        if DEBUGLEVEL > 0:
+            traceback.print_exc()
+
+
+def logexception():
+    """Log an exception."""
+    log(traceback.format_exc())
+
+########################################
+
+if DEBUGLEVEL > 0:
+    # smaller time units so we can test faster
+    ONEMINUTE = 1
+    ONEHOUR = 10 * ONEMINUTE
+else:
+    ONEMINUTE = 60
+    ONEHOUR = 60 * ONEMINUTE
+
+
+class Periodic:
+    """Periodically make a function call."""
+
+    def __init__(self, target, interval, mindelta, maxdelta):
+        self._target = target
+        self._interval = interval
+        self._deltarange = mindelta, maxdelta+1
+        thr = threading.Thread(target=self.run, args=[target])
+        thr.setDaemon(True)
+        thr.start()
+
+    def run(self, target):
+        nextintervalstart = int(time.time() / self._interval) * self._interval
+        while True:
+            try:
+                self._target()
+            except:
+                logexception()
+            nextintervalstart += self._interval
+            nextfiring = nextintervalstart + random.randrange(*self._deltarange)
+            while True:
+                t = time.time()
+                if t < nextfiring:
+                    try:
+                        time.sleep(nextfiring - t)
+                    except:
+                        logexception()
+                else:
+                    break
+
+########################################
+
+# Length of one schedule slot, in seconds.
+SLOTDURATION = ONEHOUR
+
+# Where the schedule is downloaded from when DEBUGLEVEL is 0.
+# NOTE(review): 'XXXSITEXXX' looks like a placeholder that is substituted
+# before deployment -- confirm against the install tooling.
+SCHEDULEURL = 'XXXSITEXXX/planetlab/sirius/schedule.txt'
+
+# Guards every access to <schedule>.
+schedulelock = threading.Lock()
+
+# Maps a slot start time (kept as the string read from the schedule file)
+# to the name of the slice scheduled for that slot.
+schedule = {}
+
+
+def currentslot():
+    return int(time.time() / SLOTDURATION) * SLOTDURATION
+
+
+def updateschedule():
+    """Make one attempt at downloading and updating the schedule."""
+    log('Contacting PLC...')
+    newschedule = {}
+    # Format is:
+    # timestamp
+    # slicename - starttime - -
+    # ...
+    if DEBUGLEVEL > 0:
+        f = open('/tmp/schedule.txt')
+    else:
+        f = urllib.urlopen(SCHEDULEURL)
+    for line in f:
+        fields = line.split()
+        if len(fields) >= 3:
+            newschedule[fields[2]] = fields[0]
+    log('Current schedule is %s' % newschedule)
+
+    schedulelock.acquire()
+    try:
+        schedule.clear()
+        schedule.update(newschedule)
+    finally:
+        schedulelock.release()
+    log('Updated schedule successfully')
+
+########################################
+
+# XML-RPC proxy for the NodeManager listening on the local host.
+nodemanager = ServerProxy('http://127.0.0.1:812/')
+
+# Guards <recipient> and <versionnumber>; waiters are notified on change.
+recipientcond = threading.Condition()
+
+# Name of the slice currently receiving the loans ('' when there is none).
+recipient = ''
+# Incremented each time <recipient> changes, so waiters can detect updates.
+versionnumber = 0
+
+def updateloans():
+    log('Contacting NodeManager...')
+    schedulelock.acquire()
+    try:
+        newrecipient = schedule.get(str(currentslot()), '')
+    finally:
+        schedulelock.release()
+    if newrecipient:
+        loans = [(newrecipient, 'cpu_pct', 25), (newrecipient, 'net_min_rate', 2000)]
+    else:
+        loans = []
+    log('Current loans are %s' % loans)
+
+    if DEBUGLEVEL < 2:
+        nodemanager.SetLoans('princeton_sirius', loans)
+    log('Updated loans successfully')
+
+    recipientcond.acquire()
+    try:
+        global recipient, versionnumber
+        if recipient != newrecipient:
+            recipient = newrecipient
+            versionnumber += 1
+            recipientcond.notifyAll()
+    finally:
+        recipientcond.release()
+
+########################################
+
+# Seconds to pause after the next failure; starts at 1.
+backoff = 1
+
+def success():
+    """Reset the retry delay to its initial value of one second."""
+    global backoff
+    backoff = 1
+
+def failure():
+    global backoff
+    try:
+        time.sleep(backoff)
+    except:
+        logexception()
+    backoff = min(backoff*2, 5*ONEMINUTE)
+
+
+def handleclient(clientsock, clientaddress):
+    try:
+        log('Connection from %s:%d' % clientaddress)
+        clientsock.shutdown(socket.SHUT_RD)
+        recipientcond.acquire()
+        while True:
+            recip, vn = recipient, versionnumber
+            recipientcond.release()
+            clientsock.send(recip + '\n')
+
+            recipientcond.acquire()
+            while versionnumber == vn:
+                recipientcond.wait()
+    except:
+        logexception()
+
+
+def server():
+    while True:
+        try:
+            sock = socket.socket()
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+            sock.bind(('', 8124))
+            sock.listen(5)
+            success()
+            break
+        except:
+            logexception()
+            failure()
+    log('Bound server socket')
+
+    while True:
+        try:
+            client = sock.accept()
+            threading.Thread(target=handleclient, args=client).start()
+            success()
+        except:
+            logexception()
+            failure()
+
+########################################
+
+if DEBUGLEVEL < 2:
+    PIDFILE = '/tmp/sirius.pid'
+else:
+    PIDFILE = 'sirius.pid'
+
+try:
+    if os.fork():
+        sys.exit(0)
+    f = open(PIDFILE, 'w')
+    fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
+except:
+    logexception()
+    sys.exit(1)
+
+# Refresh the schedule once per slot, at a random moment between
+# 5*ONEMINUTE and 1*ONEMINUTE before each slot boundary.
+Periodic(updateschedule, SLOTDURATION, -5*ONEMINUTE, -1*ONEMINUTE)
+# Re-evaluate the loans every 5*ONEMINUTE, exactly on the interval boundary.
+Periodic(updateloans, 5*ONEMINUTE, 0, 0)
+# Serve clients on the main thread; server() loops forever.
+server()