Adding Linux Application scalability tests
[nepi.git] / src / neco / execution / ec.py
1 import logging
2 import os
3 import random
4 import sys
5 import time
6 import threading
7
8 from neco.util import guid
9 from neco.util.parallel import ParallelRun
10 from neco.util.timefuncs import strfnow, strfdiff, strfvalid 
11 from neco.execution.resource import ResourceFactory, ResourceAction, \
12         ResourceState
13 from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
14 from neco.execution.trace import TraceAttr
15
16 # TODO: use multiprocessing instead of threading
17 # TODO: Improve speed. Too slow... !!
18 # TODO: When something fails during deployment NECO leaves scp and ssh processes running behind!!
19
20 class ECState(object):
21     RUNNING = 1
22     FAILED = 2
23     TERMINATED = 3
24
25 class ExperimentController(object):
26     def __init__(self, exp_id = None, root_dir = "/tmp"): 
27         super(ExperimentController, self).__init__()
28         # root directory to store files
29         self._root_dir = root_dir
30
31         # experiment identifier given by the user
32         self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
33
34         # generator of globally unique ids
35         self._guid_generator = guid.GuidGenerator()
36         
37         # Resource managers
38         self._resources = dict()
39
40         # Scheduler
41         self._scheduler = HeapScheduler()
42
43         # Tasks
44         self._tasks = dict()
45
46         # Event processing thread
47         self._cond = threading.Condition()
48         self._thread = threading.Thread(target = self._process)
49         self._thread.setDaemon(True)
50         self._thread.start()
51
52         # EC state
53         self._state = ECState.RUNNING
54
55         # Logging
56         self._logger = logging.getLogger("ExperimentController")
57
58     @property
59     def logger(self):
60         return self._logger
61
62     @property
63     def ecstate(self):
64         return self._state
65
66     @property
67     def exp_id(self):
68         exp_id = self._exp_id
69         if not exp_id.startswith("nepi-"):
70             exp_id = "nepi-" + exp_id
71         return exp_id
72
73     @property
74     def finished(self):
75         return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
76
77     def wait_finished(self, guids):
78         while not all([self.state(guid) == ResourceState.FINISHED \
79                 for guid in guids]) and not self.finished:
80             # We keep the sleep as large as possible to 
81             # decrese the number of RM state requests
82             time.sleep(2)
83     
84     def get_task(self, tid):
85         return self._tasks.get(tid)
86
87     def get_resource(self, guid):
88         return self._resources.get(guid)
89
90     @property
91     def resources(self):
92         return self._resources.keys()
93
94     def register_resource(self, rtype, guid = None):
95         # Get next available guid
96         guid = self._guid_generator.next(guid)
97         
98         # Instantiate RM
99         rm = ResourceFactory.create(rtype, self, guid)
100
101         # Store RM
102         self._resources[guid] = rm
103
104         return guid
105
106     def get_attributes(self, guid):
107         rm = self.get_resource(guid)
108         return rm.get_attributes()
109
110     def get_filters(self, guid):
111         rm = self.get_resource(guid)
112         return rm.get_filters()
113
114     def register_connection(self, guid1, guid2):
115         rm1 = self.get_resource(guid1)
116         rm2 = self.get_resource(guid2)
117
118         rm1.connect(guid2)
119         rm2.connect(guid1)
120
121     def register_condition(self, group1, action, group2, state,
122             time = None):
123         """ Registers an action START or STOP for all RM on group1 to occur 
124             time 'time' after all elements in group2 reached state 'state'.
125
126             :param group1: List of guids of RMs subjected to action
127             :type group1: list
128
129             :param action: Action to register (either START or STOP)
130             :type action: ResourceAction
131
132             :param group2: List of guids of RMs to we waited for
133             :type group2: list
134
135             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
136             :type state: ResourceState
137
138             :param time: Time to wait after group2 has reached status 
139             :type time: string
140
141         """
142         if isinstance(group1, int):
143             group1 = [group1]
144         if isinstance(group2, int):
145             group2 = [group2]
146
147         for guid1 in group1:
148             rm = self.get_resource(guid1)
149             rm.register_condition(action, group2, state, time)
150
151     def register_trace(self, guid, name):
152         """ Enable trace
153
154         :param name: Name of the trace
155         :type name: str
156         """
157         rm = self.get_resource(guid)
158         rm.register_trace(name)
159
160     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
161         """ Get information on collected trace
162
163         :param name: Name of the trace
164         :type name: str
165
166         :param attr: Can be one of:
167                          - TraceAttr.ALL (complete trace content), 
168                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
169                          - TraceAttr.PATH (full path to the trace file),
170                          - TraceAttr.SIZE (size of trace file). 
171         :type attr: str
172
173         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
174         :type name: int
175
176         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
177         :type name: int
178
179         :rtype: str
180         """
181         rm = self.get_resource(guid)
182         return rm.trace(name, attr, block, offset)
183
184     def discover(self, guid, filters):
185         rm = self.get_resource(guid)
186         return rm.discover(filters)
187
188     def provision(self, guid, filters):
189         rm = self.get_resource(guid)
190         return rm.provision(filters)
191
192     def get(self, guid, name):
193         rm = self.get_resource(guid)
194         return rm.get(name)
195
196     def set(self, guid, name, value):
197         rm = self.get_resource(guid)
198         return rm.set(name, value)
199
200     def state(self, guid):
201         rm = self.get_resource(guid)
202         return rm.state
203
204     def stop(self, guid):
205         rm = self.get_resource(guid)
206         return rm.stop()
207
208     def start(self, guid):
209         rm = self.get_resource(guid)
210         return rm.start()
211
212     def set_with_conditions(self, name, value, group1, group2, state,
213             time = None):
214         """ Set value 'value' on attribute with name 'name' on all RMs of
215             group1 when 'time' has elapsed since all elements in group2 
216             have reached state 'state'.
217
218             :param name: Name of attribute to set in RM
219             :type name: string
220
221             :param value: Value of attribute to set in RM
222             :type name: string
223
224             :param group1: List of guids of RMs subjected to action
225             :type group1: list
226
227             :param action: Action to register (either START or STOP)
228             :type action: ResourceAction
229
230             :param group2: List of guids of RMs to we waited for
231             :type group2: list
232
233             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
234             :type state: ResourceState
235
236             :param time: Time to wait after group2 has reached status 
237             :type time: string
238
239         """
240         if isinstance(group1, int):
241             group1 = [group1]
242         if isinstance(group2, int):
243             group2 = [group2]
244
245         for guid1 in group1:
246             rm = self.get_resource(guid)
247             rm.set_with_conditions(name, value, group2, state, time)
248
249     def stop_with_conditions(self, guid):
250         rm = self.get_resource(guid)
251         return rm.stop_with_conditions()
252
253     def start_with_conditions(self, guid):
254         rm = self.get_resource(guid)
255         return rm.start_with_condition()
256
257     def deploy(self, group = None, wait_all_ready = True):
258         """ Deploy all resource manager in group
259
260         :param group: List of guids of RMs to deploy
261         :type group: list
262
263         :param wait_all_ready: Wait until all RMs are ready in
264             order to start the RMs
265         :type guid: int
266
267         """
268         self.logger.debug(" ------- DEPLOY START ------ ")
269
270         stop = []
271
272         def steps(rm):
273             try:
274                 rm.deploy()
275                 rm.start_with_conditions()
276
277                 # Only if the RM has STOP consitions we
278                 # schedule a stop. Otherwise the RM will stop immediately
279                 if rm.conditions.get(ResourceAction.STOP):
280                     rm.stop_with_conditions()
281             except:
282                 import traceback
283                 err = traceback.format_exc()
284                 
285                 self._logger.error("Error occurred while deploying resources: %s" % err)
286
287                 # stop deployment
288                 stop.append(None)
289
290         if not group:
291             group = self.resources
292
293         # Before starting deployment we disorder the group list with the
294         # purpose of speeding up the whole deployment process.
295         # It is likely that the user inserted in the 'group' list closely
296         # resources resources one after another (e.g. all applications
297         # connected to the same node can likely appear one after another).
298         # This can originate a slow down in the deployment since the N 
299         # threads the parallel runner uses to processes tasks may all
300         # be taken up by the same family of resources waiting for the 
301         # same conditions. 
302         # If we disorder the group list, this problem can be mitigated
303         random.shuffle(group)
304
305         threads = []
306         for guid in group:
307             rm = self.get_resource(guid)
308
309             if wait_all_ready:
310                 towait = list(group)
311                 towait.remove(guid)
312                 self.register_condition(guid, ResourceAction.START, 
313                         towait, ResourceState.READY)
314
315             thread = threading.Thread(target = steps, args = (rm,))
316             threads.append(thread)
317             thread.setDaemon(True)
318             thread.start()
319
320         while list(threads) and not self.finished and not stop:
321             thread = threads[0]
322             # Time out after 5 seconds to check EC not terminated
323             thread.join(1)
324             if not thread.is_alive():
325                 threads.remove(thread)
326
327         if stop:
328             # stop the scheduler
329             self._stop_scheduler()
330
331             if self._thread.is_alive():
332                self._thread.join()
333
334             raise RuntimeError, "Error occurred, interrupting deployment " 
335
336     def release(self, group = None):
337         if not group:
338             group = self.resources
339
340         threads = []
341         for guid in group:
342             rm = self.get_resource(guid)
343             thread = threading.Thread(target=rm.release)
344             threads.append(thread)
345             thread.setDaemon(True)
346             thread.start()
347
348         while list(threads) and not self.finished:
349             thread = threads[0]
350             # Time out after 5 seconds to check EC not terminated
351             thread.join(5)
352             if not thread.is_alive():
353                 threads.remove(thread)
354
355     def shutdown(self):
356         self.release()
357
358         self._stop_scheduler()
359         
360         if self._thread.is_alive():
361            self._thread.join()
362
363     def schedule(self, date, callback, track = False):
364         """ Schedule a callback to be executed at time date.
365
366             date    string containing execution time for the task.
367                     It can be expressed as an absolute time, using
368                     timestamp format, or as a relative time matching
369                     ^\d+.\d+(h|m|s|ms|us)$
370
371             callback    code to be executed for the task. Must be a
372                         Python function, and receives args and kwargs
373                         as arguments.
374
375             track   if set to True, the task will be retrivable with
376                     the get_task() method
377         """
378         timestamp = strfvalid(date)
379         
380         task = Task(timestamp, callback)
381         task = self._scheduler.schedule(task)
382
383         if track:
384             self._tasks[task.id] = task
385   
386         # Notify condition to wake up the processing thread
387         self._cond.acquire()
388         self._cond.notify()
389         self._cond.release()
390
391         return task.id
392      
393     def _process(self):
394         runner = ParallelRun(maxthreads = 50)
395         runner.start()
396
397         try:
398             while not self.finished:
399                 self._cond.acquire()
400                 task = self._scheduler.next()
401                 self._cond.release()
402
403                 if not task:
404                     # It there are not tasks in the tasks queue we need to 
405                     # wait until a call to schedule wakes us up
406                     self._cond.acquire()
407                     self._cond.wait()
408                     self._cond.release()
409                 else: 
410                     # If the task timestamp is in the future the thread needs to wait
411                     # until time elapse or until another task is scheduled
412                     now = strfnow()
413                     if now < task.timestamp:
414                         # Calculate time difference in seconds
415                         timeout = strfdiff(task.timestamp, now)
416                         # Re-schedule task with the same timestamp
417                         self._scheduler.schedule(task)
418                         # Sleep until timeout or until a new task awakes the condition
419                         self._cond.acquire()
420                         self._cond.wait(timeout)
421                         self._cond.release()
422                     else:
423                         # Process tasks in parallel
424                         runner.put(self._execute, task)
425                 
426         except: 
427             import traceback
428             err = traceback.format_exc()
429             self._logger.error("Error while processing tasks in the EC: %s" % err)
430
431             self._state = ECState.FAILED
432         finally:
433             runner.sync()
434    
435         # Mark EC state as terminated
436         if self.ecstate == ECState.RUNNING:
437             self._state = ECState.TERMINATED
438
439     def _execute(self, task):
440         # Invoke callback
441         task.status = TaskStatus.DONE
442
443         try:
444             task.result = task.callback()
445         except:
446             import traceback
447             err = traceback.format_exc()
448             task.result = err
449             task.status = TaskStatus.ERROR
450             
451             self._logger.error("Error occurred while executing task: %s" % err)
452
453             self._stop_scheduler()
454
455             # Propage error to the ParallelRunner
456             raise
457
458     def _stop_scheduler(self):
459         # Mark the EC as failed
460         self._state = ECState.FAILED
461
462         # Wake up the EC in case it was sleeping
463         self._cond.acquire()
464         self._cond.notify()
465         self._cond.release()
466
467