3 """The Sirius Calendar Service.
5 This Python program runs on each node. It periodically downloads the schedule file and uses NodeManager's XML-RPC interface to adjust the priority increase.
7 Author: David Eisenstat (deisenst@cs.princeton.edu)
9 Original Sirius implementation by David Lowenthal.
22 from xmlrpclib import ServerProxy
25 # 0 means normal operation
26 # 1 means turn on the short time scales and read the schedule from a file
27 # 2 means additionally don't contact NodeManager
31 ########################################
# Path of the file that log output is appended to.
34 LOGFILE = '/var/log/sirius'
# Lock created for the logging code.  Presumably held while writing to
# LOGFILE so entries from different threads do not interleave; the
# acquire/release calls are not visible in this excerpt -- confirm.
38 loglock = threading.Lock()
42 """Append <msg> and a timestamp to <LOGFILE>."""
# Tests whether <msg> lacks a trailing newline; the statement executed in
# that case is not visible in this excerpt (presumably one is appended).
44 if not msg.endswith('\n'):
# Append mode, so earlier log content is preserved.
# NOTE(review): no close() of this handle is visible here -- confirm the
# file is released on every path.
48 logfile = open(LOGFILE, 'a')
# Python 2 print-chevron form: writes the timestamp line to the log file.
# <t> is rendered in UTC (time.gmtime) using the asctime layout; the
# assignment of <t> is not visible in this excerpt.
51 print >>logfile, time.asctime(time.gmtime(t))
61 """Log an exception."""
# traceback.format_exc() formats the exception currently being handled, so
# this is only meaningful when called from inside an except block.
62 log(traceback.format_exc())
64 ########################################
67 # smaller time units so we can test faster
# Two alternative definitions of ONEHOUR, both expressed in ONEMINUTE
# units: a shortened one (10 * ONEMINUTE) and the real one
# (60 * ONEMINUTE).  The condition choosing between them is not visible in
# this excerpt; presumably it is the debug level described near the top of
# the file -- confirm.
69 ONEHOUR = 10 * ONEMINUTE
72 ONEHOUR = 60 * ONEMINUTE
76 """Periodically make a function call."""
# target:   passed through as the single argument of run(), which executes
#           on a separate thread.
# interval: length of one period, in the same units as time.time()
#           (seconds).
# mindelta, maxdelta: inclusive bounds of the random integer offset that is
#           added to a period boundary to obtain the firing time; negative
#           values place the firing before the boundary.
78 def __init__(self, target, interval, mindelta, maxdelta):
80 self._interval = interval
# random.randrange() excludes its stop value, hence maxdelta+1 so that
# maxdelta itself can be drawn.
81 self._deltarange = mindelta, maxdelta+1
# How the thread is configured and started is not visible in this excerpt.
82 thr = threading.Thread(target=self.run, args=[target])
86 def run(self, target):
# Current time truncated down to a multiple of the interval.
87 nextintervalstart = int(time.time() / self._interval) * self._interval
# Move on to the following interval boundary.
93 nextintervalstart += self._interval
# Firing time = boundary + random integer offset in [mindelta, maxdelta].
94 nextfiring = nextintervalstart + random.randrange(*self._deltarange)
# Sleep for the time remaining until the firing; <t> is assigned on a line
# not visible in this excerpt (presumably the current time -- confirm).
99 time.sleep(nextfiring - t)
105 ########################################
# Length of one schedule slot.
107 SLOTDURATION = ONEHOUR
# Location the schedule is downloaded from.  'XXXSITEXXX' looks like a
# placeholder that is substituted before deployment -- confirm.
109 SCHEDULEURL = 'XXXSITEXXX/planetlab/sirius/schedule.txt'
# Held around every visible access to the shared <schedule> mapping (both
# the update and the lookup of the current slot).
111 schedulelock = threading.Lock()
# Start of the current slot: the current time truncated down to a multiple
# of SLOTDURATION, as an integer.
117 return int(time.time() / SLOTDURATION) * SLOTDURATION
120 def updateschedule():
121 """Make one attempt at downloading and updating the schedule."""
122 log('Contacting PLC...')
126 # slicename - starttime - -
# Two schedule sources appear below: a local file and SCHEDULEURL.  The
# branch selecting between them is not visible in this excerpt; the comment
# near the top of the file says debug level 1 reads the schedule from a
# file.
# NOTE(review): no close() of <f> is visible here -- confirm.
129 f = open('/tmp/schedule.txt')
131 f = urllib.urlopen(SCHEDULEURL)
# Split one schedule line on whitespace.
133 fields = line.split()
# With the line format shown above, fields[0] is the slice name and
# fields[2] the start time, so the mapping built here is
# start time (string) -> slice name.
# NOTE(review): any guard against lines with fewer than three fields is
# not visible in this excerpt -- confirm.
135 newschedule[fields[2]] = fields[0]
136 log('Current schedule is %s' % newschedule)
# Publish the new entries under schedulelock; the code that looks up the
# current slot's recipient takes the same lock.
138 schedulelock.acquire()
# dict.update() only adds/overwrites keys; whether stale entries are
# removed first is not visible in this excerpt.
141 schedule.update(newschedule)
143 schedulelock.release()
144 log('Updated schedule successfully')
146 ########################################
# XML-RPC proxy for the service on the local host, port 812 (NodeManager,
# per the module docstring).
148 nodemanager = ServerProxy('http://127.0.0.1:812/')
# Condition protecting <recipient> and <versionnumber>; waiters are
# notified when the recipient changes.
150 recipientcond = threading.Condition()
156 log('Contacting NodeManager...')
157 schedulelock.acquire()
# Look up the slice scheduled for the current slot.  Keys are slot start
# times as strings; '' is used when nothing is scheduled.
159 newrecipient = schedule.get(str(currentslot()), '')
161 schedulelock.release()
# One 3-tuple per loaned resource; presumably (recipient slice, resource
# name, amount).  The units of 25 and 2000 are not stated in this file --
# confirm against NodeManager.
163 loans = [(newrecipient, 'cpu_pct', 25), (newrecipient, 'net_min_rate', 2000)]
166 log('Current loans are %s' % loans)
# Remote call: hands the loan list to NodeManager on behalf of
# 'princeton_sirius'.
169 nodemanager.SetLoans('princeton_sirius', loans)
170 log('Updated loans successfully')
172 recipientcond.acquire()
174 global recipient, versionnumber
# Only a change of recipient is recorded; the statement that modifies
# versionnumber is not visible in this excerpt.
175 if recipient != newrecipient:
176 recipient = newrecipient
# Wake every thread currently waiting on recipientcond.
178 recipientcond.notifyAll()
180 recipientcond.release()
182 ########################################
# Exponential growth of the delay: double it, capped at 5*ONEMINUTE.  The
# surrounding retry loop is not visible in this excerpt.
196 backoff = min(backoff*2, 5*ONEMINUTE)
# Serve one client connection.
# clientsock:    the connected socket.
# clientaddress: formatted with '%s:%d' below, i.e. a (host, port) pair.
199 def handleclient(clientsock, clientaddress):
201 log('Connection from %s:%d' % clientaddress)
# Shut down the receive half: nothing is read from the client.
202 clientsock.shutdown(socket.SHUT_RD)
203 recipientcond.acquire()
# Snapshot both values together while holding the condition's lock.
205 recip, vn = recipient, versionnumber
206 recipientcond.release()
# Send the current recipient as one newline-terminated line.
# NOTE(review): socket.send() may transmit only part of the data and its
# return value is not checked here; sendall() would guarantee the whole
# line is written -- consider switching.
207 clientsock.send(recip + '\n')
209 recipientcond.acquire()
# Loop for as long as versionnumber still equals the snapshot taken above.
# The loop body is not visible in this excerpt (presumably a wait on
# recipientcond -- confirm).
210 while versionnumber == vn:
# socket.socket() with no arguments creates an IPv4 stream (TCP) socket.
219 sock = socket.socket()
# Allow the address to be reused so a restart can rebind without waiting.
220 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# '' binds on all local interfaces; the service port is 8124.
221 sock.bind(('', 8124))
228 log('Bound server socket')
# accept() returns a (socket, address) pair, which lines up with the two
# parameters of handleclient when passed as args below.
232 client = sock.accept()
# One new thread per accepted connection.
233 threading.Thread(target=handleclient, args=client).start()
239 ########################################
# Two alternative PID-file locations; the condition selecting between them
# is not visible in this excerpt.
242 PIDFILE = '/tmp/sirius.pid'
244 PIDFILE = 'sirius.pid'
# NOTE(review): mode 'w' truncates the file before the lock below is
# attempted, so a second instance would wipe the file's contents even
# though its flock() then fails -- confirm this is acceptable.
249 f = open(PIDFILE, 'w')
# Exclusive, non-blocking lock: raises immediately instead of waiting when
# another process already holds it (presumably the single-instance guard).
250 fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
# Refresh the schedule once per slot, at a random moment between 5 and 1
# minutes before each slot boundary.
255 Periodic(updateschedule, SLOTDURATION, -5*ONEMINUTE, -1*ONEMINUTE)
# Refresh the loans every 5 minutes, exactly on the boundary (offset range
# is 0..0).
256 Periodic(updateloans, 5*ONEMINUTE, 0, 0)