fix slice prefix.
[myplc.git] / plc_sliceinitscripts / sirius
1 #!/usr/bin/python
2
3 """The Sirius Calendar Service.
4
5 This Python program runs on each node.  It periodically downloads the schedule file and uses NodeManager's XML-RPC interface to adjust the priority increase.
6
7 Author: David Eisenstat (deisenst@cs.princeton.edu)
8
9 Original Sirius implementation by David Lowenthal.
10 """
11
12 import fcntl
13 import os
14 import random
15 import signal
16 import socket
17 import sys
18 import threading
19 import time
20 import traceback
21 import urllib
22 from xmlrpclib import ServerProxy
23
24
# DEBUGLEVEL selects the operating mode:
#   0 - normal operation
#   1 - short time scales; the schedule is read from a local file
#   2 - as 1, and additionally NodeManager is not contacted

DEBUGLEVEL = 0

########################################

# Log destination: a file in the working directory at DEBUGLEVEL 2,
# the system log location otherwise.
LOGFILE = 'log.txt'
if DEBUGLEVEL < 2:
    LOGFILE = '/var/log/sirius'

# Serializes appends to LOGFILE.
loglock = threading.Lock()
39
40
def log(msg):
    """Append a timestamp and <msg> to <LOGFILE>.

    Each entry is three lines followed by a blank line: the raw
    time.time() value, the same instant formatted with
    time.asctime(time.gmtime(...)), and <msg>.  A trailing newline is
    added to <msg> when it lacks one.

    Logging is best-effort: errors are never propagated to the caller.
    When DEBUGLEVEL > 0 the traceback of a logging failure is printed.
    """
    try:
        if not msg.endswith('\n'):
            msg += '\n'
        loglock.acquire()
        try:
            logfile = open(LOGFILE, 'a')
            try:
                t = time.time()
                logfile.write('%s\n' % t)
                logfile.write(time.asctime(time.gmtime(t)) + '\n')
                # <msg> already ends with a newline; the extra one leaves
                # a blank separator line between entries.
                logfile.write(msg + '\n')
            finally:
                # Close explicitly so a long-running daemon never
                # accumulates open descriptors for the log file.
                logfile.close()
        finally:
            loglock.release()
    except Exception:
        if DEBUGLEVEL > 0:
            traceback.print_exc()
58
59
def logexception():
    """Write the traceback of the exception being handled to the log."""
    trace = traceback.format_exc()
    log(trace)
63
64 ########################################
65
# Base time units, in seconds.
ONEMINUTE = 60
ONEHOUR = 60 * ONEMINUTE
if DEBUGLEVEL > 0:
    # smaller time units so we can test faster
    ONEMINUTE = 1
    ONEHOUR = 10 * ONEMINUTE
73
74
class Periodic:
    """Call a function repeatedly from a background daemon thread.

    The thread is started by the constructor and calls <target> right
    away.  Every later call is scheduled at the next multiple of
    <interval> plus an offset drawn with
    random.randrange(mindelta, maxdelta+1).  Exceptions raised by
    <target> are logged and do not stop the loop.
    """

    def __init__(self, target, interval, mindelta, maxdelta):
        self._target = target
        self._interval = interval
        # randrange() excludes its upper bound, hence the +1.
        self._deltarange = mindelta, maxdelta+1
        worker = threading.Thread(target=self.run, args=[target])
        worker.setDaemon(True)
        worker.start()

    def run(self, target):
        """Thread body: call the target, wait for the next firing, repeat.

        The <target> argument is not used here; self._target is called.
        """
        boundary = int(time.time() / self._interval) * self._interval
        while True:
            try:
                self._target()
            except:
                logexception()
            boundary += self._interval
            deadline = boundary + random.randrange(*self._deltarange)
            # Keep sleeping until the deadline has really passed; a
            # failed sleep is logged and then retried.
            now = time.time()
            while now < deadline:
                try:
                    time.sleep(deadline - now)
                except:
                    logexception()
                now = time.time()
104
105 ########################################
106
# Length of one schedule slot, in seconds.
SLOTDURATION = ONEHOUR

# Where the schedule file is downloaded from.
# NOTE(review): XXXSITEXXX looks like a placeholder substituted when the
# script is installed -- confirm.
SCHEDULEURL = 'XXXSITEXXX/planetlab/sirius/schedule.txt'

# Shared schedule mapping, always accessed with schedulelock held.
schedule = {}

schedulelock = threading.Lock()
114
115
def currentslot():
    """Return the current time rounded down to a multiple of SLOTDURATION."""
    slotindex = int(time.time() / SLOTDURATION)
    return slotindex * SLOTDURATION
118
119
def updateschedule():
    """Make one attempt at downloading and updating the schedule.

    The schedule is read from /tmp/schedule.txt when DEBUGLEVEL > 0 and
    from SCHEDULEURL otherwise.  Lines with fewer than three
    whitespace-separated fields are ignored; for every other line the
    third field is mapped to the first.  On success the contents of the
    shared <schedule> dictionary are replaced under <schedulelock>.

    Any exception from opening or reading the source propagates to the
    caller, leaving <schedule> untouched.
    """
    log('Contacting PLC...')
    newschedule = {}
    # Format is:
    # timestamp
    # slicename - starttime - -
    # ...
    if DEBUGLEVEL > 0:
        f = open('/tmp/schedule.txt')
    else:
        f = urllib.urlopen(SCHEDULEURL)
    try:
        for line in f:
            fields = line.split()
            if len(fields) >= 3:
                newschedule[fields[2]] = fields[0]
    finally:
        # This runs periodically for the life of the daemon, so release
        # the file/connection explicitly instead of leaving it open.
        f.close()
    log('Current schedule is %s' % newschedule)

    schedulelock.acquire()
    try:
        schedule.clear()
        schedule.update(newschedule)
    finally:
        schedulelock.release()
    log('Updated schedule successfully')
145
146 ########################################
147
# Name currently published to connected clients, plus a counter that is
# bumped every time that name changes.  Both are read and written with
# recipientcond held.
recipient = ''
versionnumber = 0

recipientcond = threading.Condition()

# XML-RPC proxy for the NodeManager interface on the local host.
nodemanager = ServerProxy('http://127.0.0.1:812/')
154
def updateloans():
    """Apply the loans for the current slot and publish the recipient.

    Looks up the entry for the current slot in <schedule>, builds the
    corresponding loan list (empty when the slot has no entry), passes
    it to NodeManager's SetLoans unless DEBUGLEVEL >= 2, and finally
    wakes up waiting client handlers if the recipient changed.
    """
    global recipient, versionnumber

    log('Contacting NodeManager...')
    schedulelock.acquire()
    try:
        slotkey = str(currentslot())
        newrecipient = schedule.get(slotkey, '')
    finally:
        schedulelock.release()

    loans = []
    if newrecipient:
        loans.append((newrecipient, 'cpu_pct', 25))
        loans.append((newrecipient, 'net_min_rate', 2000))
    log('Current loans are %s' % loans)

    if DEBUGLEVEL < 2:
        nodemanager.SetLoans('XXXPREFIXXXX_sirius', loans)
    log('Updated loans successfully')

    recipientcond.acquire()
    try:
        if newrecipient != recipient:
            recipient = newrecipient
            versionnumber += 1
            recipientcond.notifyAll()
    finally:
        recipientcond.release()
181
182 ########################################
183
# Current retry delay in seconds.
backoff = 1


def success():
    """Reset the retry delay to its initial value of one second."""
    global backoff
    backoff = 1
189
def failure():
    """Sleep for the current retry delay, then double it.

    The delay never grows beyond 5*ONEMINUTE.  An exception raised by
    the sleep itself is logged and otherwise ignored.
    """
    global backoff
    try:
        time.sleep(backoff)
    except:
        logexception()
    doubled = backoff * 2
    ceiling = 5 * ONEMINUTE
    if doubled > ceiling:
        backoff = ceiling
    else:
        backoff = doubled
197
198
def handleclient(clientsock, clientaddress):
    """Serve one client: send the recipient now and after every change.

    <clientsock> and <clientaddress> are the pair returned by
    socket.accept().  The receiving half of the connection is shut down
    because nothing is ever read from the client.  The current
    recipient is sent as one line, and again each time <versionnumber>
    changes.

    The function returns when any exception occurs (typically because
    the client went away); the exception is logged and the socket is
    closed.
    """
    try:
        try:
            log('Connection from %s:%d' % clientaddress)
            clientsock.shutdown(socket.SHUT_RD)
            # versionnumber is always an integer, so None never matches
            # and the first pass sends the current recipient at once.
            vn = None
            while True:
                recipientcond.acquire()
                try:
                    while versionnumber == vn:
                        recipientcond.wait()
                    recip, vn = recipient, versionnumber
                finally:
                    # Never leave the condition locked, whatever happens
                    # while it is held.
                    recipientcond.release()
                # sendall() keeps going until the whole line is written;
                # send() may transmit only part of it.
                clientsock.sendall(recip + '\n')
        except Exception:
            logexception()
    finally:
        clientsock.close()
214
215
def server():
    """Listen on TCP port 8124 and start a handler thread per client.

    Binding is retried, with the delay managed by success()/failure(),
    until it works.  After that the function loops forever accepting
    connections; errors in the accept loop are logged and followed by
    the same backoff.  This function does not return.
    """
    while True:
        sock = None
        try:
            sock = socket.socket()
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind(('', 8124))
            sock.listen(5)
            success()
            break
        except Exception:
            logexception()
            # Drop the half-configured socket before the next attempt
            # creates a fresh one.
            if sock is not None:
                try:
                    sock.close()
                except Exception:
                    logexception()
            failure()
    log('Bound server socket')

    while True:
        try:
            # accept() returns (socket, address), which is exactly the
            # argument pair handleclient() takes.
            client = sock.accept()
            threading.Thread(target=handleclient, args=client).start()
            success()
        except Exception:
            logexception()
            failure()
238
239 ########################################
240
if DEBUGLEVEL < 2:
    PIDFILE = '/tmp/sirius.pid'
else:
    PIDFILE = 'sirius.pid'

# Continue in a forked child; the original process exits.
try:
    childpid = os.fork()
except Exception:
    logexception()
    sys.exit(1)
if childpid:
    # Parent.  sys.exit() works by raising SystemExit, so it must be
    # called outside any try block with a catch-all handler; otherwise
    # the normal exit is logged as an error and turned into status 1.
    sys.exit(0)

try:
    # Take an exclusive, non-blocking lock on the pid file; this fails
    # immediately if another process already holds it.  f stays
    # referenced at module level for the rest of the run.
    f = open(PIDFILE, 'w')
    fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
except Exception:
    logexception()
    sys.exit(1)

# Refresh the schedule once per slot, between five and one ONEMINUTE
# before the slot boundary; re-apply the loans every 5*ONEMINUTE.
Periodic(updateschedule, SLOTDURATION, -5*ONEMINUTE, -1*ONEMINUTE)
Periodic(updateloans, 5*ONEMINUTE, 0, 0)
server()
257 server()