42bb138700fdf2c71edf364fbc5c331a53b957ec
[nepi.git] / src / nepi / execution / ec.py
1 """
2     NEPI, a framework to manage network experiments
3     Copyright (C) 2013 INRIA
4
5     This program is free software: you can redistribute it and/or modify
6     it under the terms of the GNU General Public License as published by
7     the Free Software Foundation, either version 3 of the License, or
8     (at your option) any later version.
9
10     This program is distributed in the hope that it will be useful,
11     but WITHOUT ANY WARRANTY; without even the implied warranty of
12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License
16     along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 """
19
20 import logging
21 import os
22 import random
23 import sys
24 import time
25 import threading
26
27 from nepi.util import guid
28 from nepi.util.parallel import ParallelRun
29 from nepi.util.timefuncs import strfnow, strfdiff, strfvalid 
30 from nepi.execution.resource import ResourceFactory, ResourceAction, \
31         ResourceState
32 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
33 from nepi.execution.trace import TraceAttr
34
35 # TODO: use multiprocessing instead of threading
36 # TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!!
37
38 class ECState(object):
39     RUNNING = 1
40     FAILED = 2
41     TERMINATED = 3
42
43 class ExperimentController(object):
44     def __init__(self, exp_id = None, root_dir = "/tmp"): 
45         super(ExperimentController, self).__init__()
46         # root directory to store files
47         self._root_dir = root_dir
48
49         # experiment identifier given by the user
50         self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
51
52         # generator of globally unique ids
53         self._guid_generator = guid.GuidGenerator()
54         
55         # Resource managers
56         self._resources = dict()
57
58         # Scheduler
59         self._scheduler = HeapScheduler()
60
61         # Tasks
62         self._tasks = dict()
63
64         # Event processing thread
65         self._cond = threading.Condition()
66         self._thread = threading.Thread(target = self._process)
67         self._thread.setDaemon(True)
68         self._thread.start()
69
70         # EC state
71         self._state = ECState.RUNNING
72
73         # Logging
74         self._logger = logging.getLogger("ExperimentController")
75
76     @property
77     def logger(self):
78         return self._logger
79
80     @property
81     def ecstate(self):
82         return self._state
83
84     @property
85     def exp_id(self):
86         exp_id = self._exp_id
87         if not exp_id.startswith("nepi-"):
88             exp_id = "nepi-" + exp_id
89         return exp_id
90
91     @property
92     def finished(self):
93         return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
94
95     def wait_finished(self, guids):
96         while not all([self.state(guid) == ResourceState.FINISHED \
97                 for guid in guids]) and not self.finished:
98             # We keep the sleep as large as possible to 
99             # decrese the number of RM state requests
100             time.sleep(2)
101     
102     def get_task(self, tid):
103         return self._tasks.get(tid)
104
105     def get_resource(self, guid):
106         return self._resources.get(guid)
107
108     @property
109     def resources(self):
110         return self._resources.keys()
111
112     def register_resource(self, rtype, guid = None):
113         # Get next available guid
114         guid = self._guid_generator.next(guid)
115         
116         # Instantiate RM
117         rm = ResourceFactory.create(rtype, self, guid)
118
119         # Store RM
120         self._resources[guid] = rm
121
122         return guid
123
124     def get_attributes(self, guid):
125         rm = self.get_resource(guid)
126         return rm.get_attributes()
127
128     def get_filters(self, guid):
129         rm = self.get_resource(guid)
130         return rm.get_filters()
131
132     def register_connection(self, guid1, guid2):
133         rm1 = self.get_resource(guid1)
134         rm2 = self.get_resource(guid2)
135
136         rm1.connect(guid2)
137         rm2.connect(guid1)
138
139     def register_condition(self, group1, action, group2, state,
140             time = None):
141         """ Registers an action START or STOP for all RM on group1 to occur 
142             time 'time' after all elements in group2 reached state 'state'.
143
144             :param group1: List of guids of RMs subjected to action
145             :type group1: list
146
147             :param action: Action to register (either START or STOP)
148             :type action: ResourceAction
149
150             :param group2: List of guids of RMs to we waited for
151             :type group2: list
152
153             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
154             :type state: ResourceState
155
156             :param time: Time to wait after group2 has reached status 
157             :type time: string
158
159         """
160         if isinstance(group1, int):
161             group1 = [group1]
162         if isinstance(group2, int):
163             group2 = [group2]
164
165         for guid1 in group1:
166             rm = self.get_resource(guid1)
167             rm.register_condition(action, group2, state, time)
168
169     def register_trace(self, guid, name):
170         """ Enable trace
171
172         :param name: Name of the trace
173         :type name: str
174         """
175         rm = self.get_resource(guid)
176         rm.register_trace(name)
177
178     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
179         """ Get information on collected trace
180
181         :param name: Name of the trace
182         :type name: str
183
184         :param attr: Can be one of:
185                          - TraceAttr.ALL (complete trace content), 
186                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
187                          - TraceAttr.PATH (full path to the trace file),
188                          - TraceAttr.SIZE (size of trace file). 
189         :type attr: str
190
191         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
192         :type name: int
193
194         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
195         :type name: int
196
197         :rtype: str
198         """
199         rm = self.get_resource(guid)
200         return rm.trace(name, attr, block, offset)
201
202     def discover(self, guid, filters):
203         rm = self.get_resource(guid)
204         return rm.discover(filters)
205
206     def provision(self, guid, filters):
207         rm = self.get_resource(guid)
208         return rm.provision(filters)
209
210     def get(self, guid, name):
211         rm = self.get_resource(guid)
212         return rm.get(name)
213
214     def set(self, guid, name, value):
215         rm = self.get_resource(guid)
216         return rm.set(name, value)
217
218     def state(self, guid):
219         rm = self.get_resource(guid)
220         return rm.state
221
222     def stop(self, guid):
223         rm = self.get_resource(guid)
224         return rm.stop()
225
226     def start(self, guid):
227         rm = self.get_resource(guid)
228         return rm.start()
229
230     def set_with_conditions(self, name, value, group1, group2, state,
231             time = None):
232         """ Set value 'value' on attribute with name 'name' on all RMs of
233             group1 when 'time' has elapsed since all elements in group2 
234             have reached state 'state'.
235
236             :param name: Name of attribute to set in RM
237             :type name: string
238
239             :param value: Value of attribute to set in RM
240             :type name: string
241
242             :param group1: List of guids of RMs subjected to action
243             :type group1: list
244
245             :param action: Action to register (either START or STOP)
246             :type action: ResourceAction
247
248             :param group2: List of guids of RMs to we waited for
249             :type group2: list
250
251             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
252             :type state: ResourceState
253
254             :param time: Time to wait after group2 has reached status 
255             :type time: string
256
257         """
258         if isinstance(group1, int):
259             group1 = [group1]
260         if isinstance(group2, int):
261             group2 = [group2]
262
263         for guid1 in group1:
264             rm = self.get_resource(guid)
265             rm.set_with_conditions(name, value, group2, state, time)
266
267     def stop_with_conditions(self, guid):
268         rm = self.get_resource(guid)
269         return rm.stop_with_conditions()
270
271     def start_with_conditions(self, guid):
272         rm = self.get_resource(guid)
273         return rm.start_with_condition()
274
275     def deploy(self, group = None, wait_all_ready = True):
276         """ Deploy all resource manager in group
277
278         :param group: List of guids of RMs to deploy
279         :type group: list
280
281         :param wait_all_ready: Wait until all RMs are ready in
282             order to start the RMs
283         :type guid: int
284
285         """
286         self.logger.debug(" ------- DEPLOY START ------ ")
287
288         stop = []
289
290         def steps(rm):
291             try:
292                 rm.deploy()
293                 rm.start_with_conditions()
294
295                 # Only if the RM has STOP conditions we
296                 # schedule a stop. Otherwise the RM will stop immediately
297                 if rm.conditions.get(ResourceAction.STOP):
298                     rm.stop_with_conditions()
299             except:
300                 import traceback
301                 err = traceback.format_exc()
302                 
303                 self._logger.error("Error occurred while deploying resources: %s" % err)
304
305                 # stop deployment
306                 stop.append(None)
307
308         if not group:
309             group = self.resources
310
311         # Before starting deployment we disorder the group list with the
312         # purpose of speeding up the whole deployment process.
313         # It is likely that the user inserted in the 'group' list closely
314         # resources resources one after another (e.g. all applications
315         # connected to the same node can likely appear one after another).
316         # This can originate a slow down in the deployment since the N 
317         # threads the parallel runner uses to processes tasks may all
318         # be taken up by the same family of resources waiting for the 
319         # same conditions. 
320         # If we disorder the group list, this problem can be mitigated
321         random.shuffle(group)
322
323         threads = []
324         for guid in group:
325             rm = self.get_resource(guid)
326
327             if wait_all_ready:
328                 towait = list(group)
329                 towait.remove(guid)
330                 self.register_condition(guid, ResourceAction.START, 
331                         towait, ResourceState.READY)
332
333             thread = threading.Thread(target = steps, args = (rm,))
334             threads.append(thread)
335             thread.setDaemon(True)
336             thread.start()
337
338         while list(threads) and not self.finished and not stop:
339             thread = threads[0]
340             # Time out after 5 seconds to check EC not terminated
341             thread.join(1)
342             if not thread.is_alive():
343                 threads.remove(thread)
344
345         if stop:
346             # stop the scheduler
347             self._stop_scheduler()
348
349             if self._thread.is_alive():
350                self._thread.join()
351
352             raise RuntimeError, "Error occurred, interrupting deployment " 
353
354     def release(self, group = None):
355         if not group:
356             group = self.resources
357
358         threads = []
359         for guid in group:
360             rm = self.get_resource(guid)
361             thread = threading.Thread(target=rm.release)
362             threads.append(thread)
363             thread.setDaemon(True)
364             thread.start()
365
366         while list(threads) and not self.finished:
367             thread = threads[0]
368             # Time out after 5 seconds to check EC not terminated
369             thread.join(5)
370             if not thread.is_alive():
371                 threads.remove(thread)
372
373     def shutdown(self):
374         self.release()
375
376         self._stop_scheduler()
377         
378         if self._thread.is_alive():
379            self._thread.join()
380
381     def schedule(self, date, callback, track = False):
382         """ Schedule a callback to be executed at time date.
383
384             date    string containing execution time for the task.
385                     It can be expressed as an absolute time, using
386                     timestamp format, or as a relative time matching
387                     ^\d+.\d+(h|m|s|ms|us)$
388
389             callback    code to be executed for the task. Must be a
390                         Python function, and receives args and kwargs
391                         as arguments.
392
393             track   if set to True, the task will be retrivable with
394                     the get_task() method
395         """
396         timestamp = strfvalid(date)
397         
398         task = Task(timestamp, callback)
399         task = self._scheduler.schedule(task)
400
401         if track:
402             self._tasks[task.id] = task
403   
404         # Notify condition to wake up the processing thread
405         self._cond.acquire()
406         self._cond.notify()
407         self._cond.release()
408
409         return task.id
410      
411     def _process(self):
412         runner = ParallelRun(maxthreads = 50)
413         runner.start()
414
415         try:
416             while not self.finished:
417                 self._cond.acquire()
418                 task = self._scheduler.next()
419                 self._cond.release()
420
421                 if not task:
422                     # It there are not tasks in the tasks queue we need to 
423                     # wait until a call to schedule wakes us up
424                     self._cond.acquire()
425                     self._cond.wait()
426                     self._cond.release()
427                 else: 
428                     # If the task timestamp is in the future the thread needs to wait
429                     # until time elapse or until another task is scheduled
430                     now = strfnow()
431                     if now < task.timestamp:
432                         # Calculate time difference in seconds
433                         timeout = strfdiff(task.timestamp, now)
434                         # Re-schedule task with the same timestamp
435                         self._scheduler.schedule(task)
436                         # Sleep until timeout or until a new task awakes the condition
437                         self._cond.acquire()
438                         self._cond.wait(timeout)
439                         self._cond.release()
440                     else:
441                         # Process tasks in parallel
442                         runner.put(self._execute, task)
443                 
444         except: 
445             import traceback
446             err = traceback.format_exc()
447             self._logger.error("Error while processing tasks in the EC: %s" % err)
448
449             self._state = ECState.FAILED
450         finally:
451             runner.sync()
452    
453         # Mark EC state as terminated
454         if self.ecstate == ECState.RUNNING:
455             self._state = ECState.TERMINATED
456
457     def _execute(self, task):
458         # Invoke callback
459         task.status = TaskStatus.DONE
460
461         try:
462             task.result = task.callback()
463         except:
464             import traceback
465             err = traceback.format_exc()
466             task.result = err
467             task.status = TaskStatus.ERROR
468             
469             self._logger.error("Error occurred while executing task: %s" % err)
470
471             self._stop_scheduler()
472
473             # Propage error to the ParallelRunner
474             raise
475
476     def _stop_scheduler(self):
477         # Mark the EC as failed
478         self._state = ECState.FAILED
479
480         # Wake up the EC in case it was sleeping
481         self._cond.acquire()
482         self._cond.notify()
483         self._cond.release()
484
485