3 """The Sirius Calendar Service.
5 This Python program runs on each node. It periodically downloads the schedule file and uses NodeManager's XML-RPC interface to adjust the priority increase.
7 Author: David Eisenstat (deisenst@cs.princeton.edu)
9 Original Sirius implementation by David Lowenthal.
22 from xmlrpclib import ServerProxy
25 # 0 means normal operation
26 # 1 means turn on the short time scales and read the schedule from a file
27 # 2 means additionally don't contact NodeManager
31 ########################################
# Path of the file that log entries are appended to.
34 LOGFILE = '/var/log/sirius'
# Lock created for the logging code.
# NOTE(review): its acquire/release calls are not among the visible lines --
# presumably it serializes writes to LOGFILE across threads; confirm.
38 loglock = threading.Lock()
# NOTE(review): the `def` line for the statements below is not visible here;
# the call `log(traceback.format_exc())` further down suggests it is
# `log(msg)` -- confirm against the full file.
42 """Append <msg> and a timestamp to <LOGFILE>."""
# Tests whether msg lacks a trailing newline; the branch body is not visible.
44 if not msg.endswith('\n'):
# 'a' opens the log in append mode, so earlier entries are kept.
48 logfile = open(LOGFILE, 'a')
# Python 2 print-to-file: writes time `t` rendered as a UTC asctime string.
# `t` is assigned on a line that is not visible here.
51 print >>logfile, time.asctime(time.gmtime(t))
# NOTE(review): the `def` line for this second helper is not visible either.
61 """Log an exception."""
# format_exc() returns the traceback of the exception being handled as a
# string, which is passed on to log().
62 log(traceback.format_exc())
64 ########################################
67 # smaller time units so we can test faster
# NOTE(review): two alternative definitions of ONEHOUR are visible (10 and 60
# ONEMINUTE units). The condition selecting between them and the definition
# of ONEMINUTE are not visible here; given the comment above, the 10x value
# is presumably the test setting -- confirm.
69 ONEHOUR = 10 * ONEMINUTE
72 ONEHOUR = 60 * ONEMINUTE
76 """Periodically make a function call."""
# Set up a periodic call of `target`.
#   interval           -- period length; run() combines it with time.time().
#   mindelta, maxdelta -- inclusive bounds of the random integer offset that
#                         run() adds to each interval start.
78 def __init__(self, target, interval, mindelta, maxdelta):
80 self._interval = interval
# Stored as (low, high) arguments for random.randrange, whose upper bound is
# exclusive; the +1 makes maxdelta itself a possible offset.
81 self._deltarange = mindelta, maxdelta+1
# The thread will execute self.run(target).
# NOTE(review): the lines that configure and start the thread are not visible.
82 thr = threading.Thread(target=self.run, args=[target])
# Thread body: computes firing times for `target` and sleeps until them.
# NOTE(review): the loop header, the assignment of `t` and the actual call of
# `target` are on lines not visible here.
86 def run(self, target):
# Current time floored to a multiple of the interval, i.e. the start of the
# interval that contains "now".
87 nextintervalstart = int(time.time() / self._interval) * self._interval
# Advance to the following interval boundary.
93 nextintervalstart += self._interval
# Firing time = boundary plus a random integer offset taken from
# self._deltarange (set in __init__ as mindelta .. maxdelta inclusive).
94 nextfiring = nextintervalstart + random.randrange(*self._deltarange)
# Sleep for the time remaining until the firing time (`t` is presumably the
# current time -- confirm).
99 time.sleep(nextfiring - t)
105 ########################################
# Length of one schedule slot; the same value as ONEHOUR.
107 SLOTDURATION = ONEHOUR
109 # instances of @SITE@ and @PREFIX@ below are replaced
110 # as appropriate by db-config
# Location the schedule file is fetched from (see urllib.urlopen(SCHEDULEURL)).
111 SCHEDULEURL = '@SITE@/planetlab/sirius/schedule.txt'
# Lock held around accesses to the shared `schedule` mapping (it is acquired
# and released around schedule.update() and schedule.get() in this file).
113 schedulelock = threading.Lock()
# NOTE(review): the `def` line of this function is not visible; it is
# presumably currentslot(), which is called elsewhere in this file -- confirm.
# Returns the current time floored to a multiple of SLOTDURATION.
119 return int(time.time() / SLOTDURATION) * SLOTDURATION
# NOTE(review): several interior lines of this function (creation of
# `newschedule`, the branch choosing the input source, the loop over lines,
# any try/finally around the lock) are not visible in this listing.
122 def updateschedule():
123 """Make one attempt at downloading and updating the schedule."""
124 log('Contacting PLC...')
# Layout of one schedule line: whitespace-separated fields, with the slice
# name first and the start time third.
128 # slicename - starttime - -
# Two alternative sources are visible: a local file and SCHEDULEURL. The
# condition choosing between them is not visible -- presumably a debug
# setting; confirm.
131 f = open('/tmp/schedule.txt')
133 f = urllib.urlopen(SCHEDULEURL)
# Split the line on whitespace.
135 fields = line.split()
# Key: third field (start time, per the layout comment above);
# value: first field (slice name). Keys stay strings.
137 newschedule[fields[2]] = fields[0]
138 log('Current schedule is %s' % newschedule)
# Merge the new entries into the shared schedule while holding schedulelock.
140 schedulelock.acquire()
143 schedule.update(newschedule)
145 schedulelock.release()
146 log('Updated schedule successfully')
148 ########################################
# XML-RPC proxy for the service listening on this host, port 812.
150 nodemanager = ServerProxy('http://127.0.0.1:812/')
# Condition variable held while reading or writing `recipient` and
# `versionnumber`; notifyAll() below wakes threads waiting on it.
152 recipientcond = threading.Condition()
# NOTE(review): the `def` line for the statements below is not visible;
# presumably this is updateloans(), which start() passes to Periodic -- confirm.
158 log('Contacting NodeManager...')
159 schedulelock.acquire()
# Look up the entry for the current slot. Schedule keys are strings, hence
# str(); '' is used when the slot has no entry.
161 newrecipient = schedule.get(str(currentslot()), '')
163 schedulelock.release()
# One (recipient, resource name, value) tuple per loan. The meaning and units
# of the values are defined by the SetLoans interface, not by this file.
165 loans = [(newrecipient, 'cpu_pct', 25), (newrecipient, 'net_min_rate', 2000)]
168 log('Current loans are %s' % loans)
# Remote call through the XML-RPC proxy.
171 nodemanager.SetLoans('@PREFIX@_sirius', loans)
172 log('Updated loans successfully')
174 recipientcond.acquire()
176 global recipient, versionnumber
# Only a change of recipient updates the shared state.
# NOTE(review): the line that changes versionnumber is not visible here.
177 if recipient != newrecipient:
178 recipient = newrecipient
# Wake every thread waiting on recipientcond.
180 recipientcond.notifyAll()
182 recipientcond.release()
184 ########################################
# Double `backoff`, capping it at five ONEMINUTE units.
# NOTE(review): the enclosing function and loop are not visible; presumably
# this is a retry delay -- confirm.
198 backoff = min(backoff*2, 5*ONEMINUTE)
# Handler for one accepted connection; the accept code in this file runs it
# in a new thread with the (socket, address) pair from sock.accept().
# NOTE(review): the function continues past the last visible line.
201 def handleclient(clientsock, clientaddress):
# clientaddress is formatted with '%s:%d', i.e. used as a (host, port) pair.
203 log('Connection from %s:%d' % clientaddress)
# Shut down the receiving half of the connection; no reads from the client
# appear in the visible lines.
204 clientsock.shutdown(socket.SHUT_RD)
205 recipientcond.acquire()
# Snapshot both shared values while holding recipientcond.
207 recip, vn = recipient, versionnumber
208 recipientcond.release()
# Send the snapshotted recipient, newline-terminated.
209 clientsock.send(recip + '\n')
211 recipientcond.acquire()
# Loop for as long as versionnumber still equals the snapshot. The loop body
# is not visible -- presumably it waits on recipientcond; confirm.
212 while versionnumber == vn:
# NOTE(review): the enclosing `def`, the listen() call and the accept loop
# header are not among the visible lines.
# socket.socket() with no arguments creates an IPv4 stream (TCP) socket.
221 sock = socket.socket()
# Allow the local address to be reused when binding.
222 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# '' binds to all local interfaces; the port is 8124.
223 sock.bind(('', 8124))
230 log('Bound server socket')
# accept() returns a (connection socket, address) pair ...
234 client = sock.accept()
# ... which becomes the two positional arguments of handleclient, run in a
# new thread per connection.
235 threading.Thread(target=handleclient, args=client).start()
241 ########################################
# Entry point for the 'start' mode: opens and locks a pid file for
# `slicename`, then creates the two Periodic tasks.
# NOTE(review): interior lines (the branch selecting PIDFILE, error handling
# around the lock, anything after the Periodic calls) are not visible.
243 def start (slicename):
# Two alternative pid-file paths are visible; the selecting condition is not.
245 PIDFILE = '/tmp/%s.pid'%slicename
247 PIDFILE = '%s.pid'%slicename
251 f = open(PIDFILE, 'w')
# Exclusive, non-blocking lock: LOCK_NB makes flock raise instead of waiting
# when another process already holds the lock.
252 fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
# updateschedule once per slot, at a random offset between 5 and 1 ONEMINUTE
# units before the slot boundary (negative mindelta/maxdelta).
257 Periodic(updateschedule, SLOTDURATION, -5*ONEMINUTE, -1*ONEMINUTE)
# updateloans every 5 ONEMINUTE units, exactly on the interval boundary
# (offset range 0..0).
258 Periodic(updateloans, 5*ONEMINUTE, 0, 0)
# NOTE(review): the `def` line is not visible; the dispatch code in this file
# calls stop(slicename), which is presumably this function -- confirm.
262 # at the very least, do nothing
# Python 2 print statement. This visible line only prints a message.
264 print '(dummy) stopping',slicename
# Entry point for the 'restart' mode (called as restart(slicename) by the
# dispatch code in this file).
# NOTE(review): the body of this function is not visible in this listing.
267 def restart(slicename):
# Command-line dispatch for the init script.
# NOTE(review): the enclosing `def`, the computation of `args`/`argc`, the
# first `if` of this chain and any default for `slicename` are not visible.
277 print 'too many arguments to initscript',args
# Two arguments: mode and slice name.
279 elif argc==2: (mode,slicename)=args
# One argument: mode only.
280 elif argc==1: mode=args[0]
282 # print "sirius initscript called with mode=%s and slicename=%s"%(mode,slicename)
# Route the mode string to the matching entry point.
284 if mode=='start': start(slicename)
285 elif mode=='stop': stop(slicename)
286 elif mode=='restart': restart(slicename)
# Reached for any other mode (the `else:` line itself is not visible).
288 print "unknown mode %s"%mode
# Script entry guard; the statement it runs is not visible here.
292 if __name__ == '__main__':