Setting tag myplc-5.3-5
[myplc.git] / plc_sliceinitscripts / sirius
1 #!/usr/bin/python
2
3 """The Sirius Calendar Service.
4
5 This Python program runs on each node.  It periodically downloads the schedule file and uses NodeManager's XML-RPC interface to adjust the priority increase.
6
7 Author: David Eisenstat (deisenst@cs.princeton.edu)
8
9 Original Sirius implementation by David Lowenthal.
10 """
11
12 import fcntl
13 import os
14 import random
15 import signal
16 import socket
17 import sys
18 import threading
19 import time
20 import traceback
21 import urllib
22 from xmlrpclib import ServerProxy
23
24
25 # 0 means normal operation
26 # 1 means turn on the short time scales and read the schedule from a file
27 # 2 means additionally don't contact NodeManager
28
29 DEBUGLEVEL = 0
30
31 ########################################
32
33 if DEBUGLEVEL < 2:
34     LOGFILE = '/var/log/sirius'
35 else:
36     LOGFILE = 'log.txt'
37
38 loglock = threading.Lock()
39
40
41 def log(msg):
42     """Append <msg> and a timestamp to <LOGFILE>."""
43     try:
44         if not msg.endswith('\n'):
45             msg += '\n'
46         loglock.acquire()
47         try:
48             logfile = open(LOGFILE, 'a')
49             t = time.time()
50             print >>logfile, t
51             print >>logfile, time.asctime(time.gmtime(t))
52             print >>logfile, msg
53         finally:
54             loglock.release()
55     except:
56         if DEBUGLEVEL > 0:
57             traceback.print_exc()
58
59
60 def logexception():
61     """Log an exception."""
62     log(traceback.format_exc())
63
64 ########################################
65
66 if DEBUGLEVEL > 0:
67     # smaller time units so we can test faster
68     ONEMINUTE = 1
69     ONEHOUR = 10 * ONEMINUTE
70 else:
71     ONEMINUTE = 60
72     ONEHOUR = 60 * ONEMINUTE
73
74
75 class Periodic:
76     """Periodically make a function call."""
77
78     def __init__(self, target, interval, mindelta, maxdelta):
79         self._target = target
80         self._interval = interval
81         self._deltarange = mindelta, maxdelta+1
82         thr = threading.Thread(target=self.run, args=[target])
83         thr.setDaemon(True)
84         thr.start()
85
86     def run(self, target):
87         nextintervalstart = int(time.time() / self._interval) * self._interval
88         while True:
89             try:
90                 self._target()
91             except:
92                 logexception()
93             nextintervalstart += self._interval
94             nextfiring = nextintervalstart + random.randrange(*self._deltarange)
95             while True:
96                 t = time.time()
97                 if t < nextfiring:
98                     try:
99                         time.sleep(nextfiring - t)
100                     except:
101                         logexception()
102                 else:
103                     break
104
105 ########################################
106
107 SLOTDURATION = ONEHOUR
108
109 # instances of @SITE@ and @PREFIX@ below are replaced 
110 # as appropriate by db-config
111 SCHEDULEURL = '@SITE@/planetlab/sirius/schedule.txt'
112
113 schedulelock = threading.Lock()
114
115 schedule = {}
116
117
118 def currentslot():
119     return int(time.time() / SLOTDURATION) * SLOTDURATION
120
121
122 def updateschedule():
123     """Make one attempt at downloading and updating the schedule."""
124     log('Contacting PLC...')
125     newschedule = {}
126     # Format is:
127     # timestamp
128     # slicename - starttime - -
129     # ...
130     if DEBUGLEVEL > 0:
131         f = open('/tmp/schedule.txt')
132     else:
133         f = urllib.urlopen(SCHEDULEURL)
134     for line in f:
135         fields = line.split()
136         if len(fields) >= 3:
137             newschedule[fields[2]] = fields[0]
138     log('Current schedule is %s' % newschedule)
139
140     schedulelock.acquire()
141     try:
142         schedule.clear()
143         schedule.update(newschedule)
144     finally:
145         schedulelock.release()
146     log('Updated schedule successfully')
147
148 ########################################
149
150 nodemanager = ServerProxy('http://127.0.0.1:812/')
151
152 recipientcond = threading.Condition()
153
154 recipient = ''
155 versionnumber = 0
156
157 def updateloans():
158     log('Contacting NodeManager...')
159     schedulelock.acquire()
160     try:
161         newrecipient = schedule.get(str(currentslot()), '')
162     finally:
163         schedulelock.release()
164     if newrecipient:
165         loans = [(newrecipient, 'cpu_pct', 25), (newrecipient, 'net_min_rate', 2000)]
166     else:
167         loans = []
168     log('Current loans are %s' % loans)
169
170     if DEBUGLEVEL < 2:
171         nodemanager.SetLoans('@PREFIX@_sirius', loans)
172     log('Updated loans successfully')
173
174     recipientcond.acquire()
175     try:
176         global recipient, versionnumber
177         if recipient != newrecipient:
178             recipient = newrecipient
179             versionnumber += 1
180             recipientcond.notifyAll()
181     finally:
182         recipientcond.release()
183
184 ########################################
185
186 backoff = 1
187
188 def success():
189     global backoff
190     backoff = 1
191
192 def failure():
193     global backoff
194     try:
195         time.sleep(backoff)
196     except:
197         logexception()
198     backoff = min(backoff*2, 5*ONEMINUTE)
199
200
201 def handleclient(clientsock, clientaddress):
202     try:
203         log('Connection from %s:%d' % clientaddress)
204         clientsock.shutdown(socket.SHUT_RD)
205         recipientcond.acquire()
206         while True:
207             recip, vn = recipient, versionnumber
208             recipientcond.release()
209             clientsock.send(recip + '\n')
210
211             recipientcond.acquire()
212             while versionnumber == vn:
213                 recipientcond.wait()
214     except:
215         logexception()
216
217
218 def server():
219     while True:
220         try:
221             sock = socket.socket()
222             sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
223             sock.bind(('', 8124))
224             sock.listen(5)
225             success()
226             break
227         except:
228             logexception()
229             failure()
230     log('Bound server socket')
231
232     while True:
233         try:
234             client = sock.accept()
235             threading.Thread(target=handleclient, args=client).start()
236             success()
237         except:
238             logexception()
239             failure()
240
241 ########################################
242
243 def start (slicename):
244     if DEBUGLEVEL < 2:
245         PIDFILE = '/tmp/%s.pid'%slicename
246     else:
247         PIDFILE = '%s.pid'%slicename
248     try:
249         if os.fork():
250             sys.exit(0)
251         f = open(PIDFILE, 'w')
252         fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
253     except:
254         logexception()
255         sys.exit(1)
256
257     Periodic(updateschedule, SLOTDURATION, -5*ONEMINUTE, -1*ONEMINUTE)
258     Periodic(updateloans, 5*ONEMINUTE, 0, 0)
259     server()
260
261 # xxx fixme
262 # at the very least, do nothing 
263 def stop(slicename):
264     print '(dummy) stopping',slicename
265     pass
266
267 def restart(slicename):
268     stop(slicename)
269     start(slicename)
270
271 def main():
272     args=sys.argv[1:]
273     mode='start'
274     slicename='sirius'
275     argc=len(args)
276     if argc>=3: 
277         print 'too many arguments to initscript',args
278         sys.exit(1)
279     elif argc==2: (mode,slicename)=args
280     elif argc==1: mode=args[0]
281     
282 #    print "sirius initscript called with mode=%s and slicename=%s"%(mode,slicename)
283
284     if mode=='start': start(slicename)
285     elif mode=='stop': stop(slicename)
286     elif mode=='restart': restart(slicename)
287     else: 
288         print "unknown mode %s"%mode
289         sys.exit(1)
290     sys.exit(0)
291
292 if __name__ == '__main__':
293     main()