d6fcf8715f09ace3d5663fa9a78ae791663e15a9
[nepi.git] / src / nepi / execution / ec.py
1 """
2     NEPI, a framework to manage network experiments
3     Copyright (C) 2013 INRIA
4
5     This program is free software: you can redistribute it and/or modify
6     it under the terms of the GNU General Public License as published by
7     the Free Software Foundation, either version 3 of the License, or
8     (at your option) any later version.
9
10     This program is distributed in the hope that it will be useful,
11     but WITHOUT ANY WARRANTY; without even the implied warranty of
12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License
16     along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 """
19
20 import functools
21 import logging
22 import os
23 import random
24 import sys
25 import time
26 import threading
27
28 from nepi.util import guid
29 from nepi.util.parallel import ParallelRun
30 from nepi.util.timefuncs import strfnow, strfdiff, strfvalid 
31 from nepi.execution.resource import ResourceFactory, ResourceAction, \
32         ResourceState
33 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
34 from nepi.execution.trace import TraceAttr
35
36 # TODO: use multiprocessing instead of threading
37 # TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!!
38
39 class ECState(object):
40     RUNNING = 1
41     FAILED = 2
42     TERMINATED = 3
43
44 class ExperimentController(object):
45     def __init__(self, exp_id = None, root_dir = "/tmp"): 
46         super(ExperimentController, self).__init__()
47         # root directory to store files
48         self._root_dir = root_dir
49
50         # experiment identifier given by the user
51         self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
52
53         # generator of globally unique ids
54         self._guid_generator = guid.GuidGenerator()
55         
56         # Resource managers
57         self._resources = dict()
58
59         # Scheduler
60         self._scheduler = HeapScheduler()
61
62         # Tasks
63         self._tasks = dict()
64
65         # Event processing thread
66         self._cond = threading.Condition()
67         self._thread = threading.Thread(target = self._process)
68         self._thread.setDaemon(True)
69         self._thread.start()
70
71         # EC state
72         self._state = ECState.RUNNING
73
74         # Logging
75         self._logger = logging.getLogger("ExperimentController")
76
77     @property
78     def logger(self):
79         return self._logger
80
81     @property
82     def ecstate(self):
83         return self._state
84
85     @property
86     def exp_id(self):
87         exp_id = self._exp_id
88         if not exp_id.startswith("nepi-"):
89             exp_id = "nepi-" + exp_id
90         return exp_id
91
92     @property
93     def finished(self):
94         return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
95
96     def wait_finished(self, guids):
97         while not all([self.state(guid) == ResourceState.FINISHED \
98                 for guid in guids]) and not self.finished:
99             # We keep the sleep as large as possible to 
100             # decrese the number of RM state requests
101             time.sleep(2)
102     
103     def get_task(self, tid):
104         return self._tasks.get(tid)
105
106     def get_resource(self, guid):
107         return self._resources.get(guid)
108
109     @property
110     def resources(self):
111         return self._resources.keys()
112
113     def register_resource(self, rtype, guid = None):
114         # Get next available guid
115         guid = self._guid_generator.next(guid)
116         
117         # Instantiate RM
118         rm = ResourceFactory.create(rtype, self, guid)
119
120         # Store RM
121         self._resources[guid] = rm
122
123         return guid
124
125     def get_attributes(self, guid):
126         rm = self.get_resource(guid)
127         return rm.get_attributes()
128
129     def get_filters(self, guid):
130         rm = self.get_resource(guid)
131         return rm.get_filters()
132
133     def register_connection(self, guid1, guid2):
134         rm1 = self.get_resource(guid1)
135         rm2 = self.get_resource(guid2)
136
137         rm1.connect(guid2)
138         rm2.connect(guid1)
139
140     def register_condition(self, group1, action, group2, state,
141             time = None):
142         """ Registers an action START or STOP for all RM on group1 to occur 
143             time 'time' after all elements in group2 reached state 'state'.
144
145             :param group1: List of guids of RMs subjected to action
146             :type group1: list
147
148             :param action: Action to register (either START or STOP)
149             :type action: ResourceAction
150
151             :param group2: List of guids of RMs to we waited for
152             :type group2: list
153
154             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
155             :type state: ResourceState
156
157             :param time: Time to wait after group2 has reached status 
158             :type time: string
159
160         """
161         if isinstance(group1, int):
162             group1 = [group1]
163         if isinstance(group2, int):
164             group2 = [group2]
165
166         for guid1 in group1:
167             rm = self.get_resource(guid1)
168             rm.register_condition(action, group2, state, time)
169
170     def register_trace(self, guid, name):
171         """ Enable trace
172
173         :param name: Name of the trace
174         :type name: str
175         """
176         rm = self.get_resource(guid)
177         rm.register_trace(name)
178
179     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
180         """ Get information on collected trace
181
182         :param name: Name of the trace
183         :type name: str
184
185         :param attr: Can be one of:
186                          - TraceAttr.ALL (complete trace content), 
187                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
188                          - TraceAttr.PATH (full path to the trace file),
189                          - TraceAttr.SIZE (size of trace file). 
190         :type attr: str
191
192         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
193         :type name: int
194
195         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
196         :type name: int
197
198         :rtype: str
199         """
200         rm = self.get_resource(guid)
201         return rm.trace(name, attr, block, offset)
202
203     def discover(self, guid, filters):
204         rm = self.get_resource(guid)
205         return rm.discover(filters)
206
207     def provision(self, guid, filters):
208         rm = self.get_resource(guid)
209         return rm.provision(filters)
210
211     def get(self, guid, name):
212         rm = self.get_resource(guid)
213         return rm.get(name)
214
215     def set(self, guid, name, value):
216         rm = self.get_resource(guid)
217         return rm.set(name, value)
218
219     def state(self, guid):
220         rm = self.get_resource(guid)
221         return rm.state
222
223     def stop(self, guid):
224         rm = self.get_resource(guid)
225         return rm.stop()
226
227     def start(self, guid):
228         rm = self.get_resource(guid)
229         return rm.start()
230
231     def set_with_conditions(self, name, value, group1, group2, state,
232             time = None):
233         """ Set value 'value' on attribute with name 'name' on all RMs of
234             group1 when 'time' has elapsed since all elements in group2 
235             have reached state 'state'.
236
237             :param name: Name of attribute to set in RM
238             :type name: string
239
240             :param value: Value of attribute to set in RM
241             :type name: string
242
243             :param group1: List of guids of RMs subjected to action
244             :type group1: list
245
246             :param action: Action to register (either START or STOP)
247             :type action: ResourceAction
248
249             :param group2: List of guids of RMs to we waited for
250             :type group2: list
251
252             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
253             :type state: ResourceState
254
255             :param time: Time to wait after group2 has reached status 
256             :type time: string
257
258         """
259         if isinstance(group1, int):
260             group1 = [group1]
261         if isinstance(group2, int):
262             group2 = [group2]
263
264         for guid1 in group1:
265             rm = self.get_resource(guid)
266             rm.set_with_conditions(name, value, group2, state, time)
267
268     def stop_with_conditions(self, guid):
269         rm = self.get_resource(guid)
270         return rm.stop_with_conditions()
271
272     def start_with_conditions(self, guid):
273         rm = self.get_resource(guid)
274         return rm.start_with_condition()
275
276     def deploy(self, group = None, wait_all_ready = True):
277         """ Deploy all resource manager in group
278
279         :param group: List of guids of RMs to deploy
280         :type group: list
281
282         :param wait_all_ready: Wait until all RMs are ready in
283             order to start the RMs
284         :type guid: int
285
286         """
287         self.logger.debug(" ------- DEPLOY START ------ ")
288
289         if not group:
290             group = self.resources
291
292         # Before starting deployment we disorder the group list with the
293         # purpose of speeding up the whole deployment process.
294         # It is likely that the user inserted in the 'group' list closely
295         # resources one after another (e.g. all applications
296         # connected to the same node can likely appear one after another).
297         # This can originate a slow down in the deployment since the N 
298         # threads the parallel runner uses to processes tasks may all
299         # be taken up by the same family of resources waiting for the 
300         # same conditions (e.g. LinuxApplications running on a same 
301         # node share a single lock, so they will tend to be serialized).
302         # If we disorder the group list, this problem can be mitigated.
303         random.shuffle(group)
304
305         def wait_all_and_start(group):
306             reschedule = False
307             for guid in group:
308                 rm = self.get_resource(guid)
309                 if rm.state < ResourceState.READY:
310                     reschedule = True
311                     break
312
313             if reschedule:
314                 callback = functools.partial(wait_all_and_start, group)
315                 self.schedule("1s", callback)
316             else:
317                 # If all resources are read, we schedule the start
318                 for guid in group:
319                     rm = self.get_resource(guid)
320                     self.schedule("0.01s", rm.start_with_conditions)
321
322         if wait_all_ready:
323             # Schedule the function that will check all resources are
324             # READY, and only then it will schedule the start.
325             # This is aimed to reduce the number of tasks looping in the scheduler.
326             # Intead of having N start tasks, we will have only one
327             callback = functools.partial(wait_all_and_start, group)
328             self.schedule("1s", callback)
329
330         for guid in group:
331             rm = self.get_resource(guid)
332             self.schedule("0.001s", rm.deploy)
333
334             if not wait_all_ready:
335                 self.schedule("1s", rm.start_with_conditions)
336
337             if rm.conditions.get(ResourceAction.STOP):
338                 # Only if the RM has STOP conditions we
339                 # schedule a stop. Otherwise the RM will stop immediately
340                 self.schedule("2s", rm.stop_with_conditions)
341
342
343     def release(self, group = None):
344         if not group:
345             group = self.resources
346
347         threads = []
348         for guid in group:
349             rm = self.get_resource(guid)
350             thread = threading.Thread(target=rm.release)
351             threads.append(thread)
352             thread.setDaemon(True)
353             thread.start()
354
355         while list(threads) and not self.finished:
356             thread = threads[0]
357             # Time out after 5 seconds to check EC not terminated
358             thread.join(5)
359             if not thread.is_alive():
360                 threads.remove(thread)
361         
362     def shutdown(self):
363         self.release()
364
365         self._stop_scheduler()
366         
367         if self._thread.is_alive():
368            self._thread.join()
369
370     def schedule(self, date, callback, track = False):
371         """ Schedule a callback to be executed at time date.
372
373             date    string containing execution time for the task.
374                     It can be expressed as an absolute time, using
375                     timestamp format, or as a relative time matching
376                     ^\d+.\d+(h|m|s|ms|us)$
377
378             callback    code to be executed for the task. Must be a
379                         Python function, and receives args and kwargs
380                         as arguments.
381
382             track   if set to True, the task will be retrivable with
383                     the get_task() method
384         """
385         timestamp = strfvalid(date)
386         
387         task = Task(timestamp, callback)
388         task = self._scheduler.schedule(task)
389
390         if track:
391             self._tasks[task.id] = task
392   
393         # Notify condition to wake up the processing thread
394         self._cond.acquire()
395         self._cond.notify()
396         self._cond.release()
397
398         return task.id
399      
400     def _process(self):
401         runner = ParallelRun(maxthreads = 50)
402         runner.start()
403
404         try:
405             while not self.finished:
406                 self._cond.acquire()
407                 task = self._scheduler.next()
408                 self._cond.release()
409                 
410                 if not task:
411                     # It there are not tasks in the tasks queue we need to 
412                     # wait until a call to schedule wakes us up
413                     self._cond.acquire()
414                     self._cond.wait()
415                     self._cond.release()
416                 else: 
417                     # If the task timestamp is in the future the thread needs to wait
418                     # until time elapse or until another task is scheduled
419                     now = strfnow()
420                     if now < task.timestamp:
421                         # Calculate time difference in seconds
422                         timeout = strfdiff(task.timestamp, now)
423                         # Re-schedule task with the same timestamp
424                         self._scheduler.schedule(task)
425                         # Sleep until timeout or until a new task awakes the condition
426                         self._cond.acquire()
427                         self._cond.wait(timeout)
428                         self._cond.release()
429                     else:
430                         # Process tasks in parallel
431                         runner.put(self._execute, task)
432         except: 
433             import traceback
434             err = traceback.format_exc()
435             self._logger.error("Error while processing tasks in the EC: %s" % err)
436
437             self._state = ECState.FAILED
438    
439         # Mark EC state as terminated
440         if self.ecstate == ECState.RUNNING:
441             # Synchronize to get errors if occurred
442             runner.sync()
443             self._state = ECState.TERMINATED
444
445     def _execute(self, task):
446         # Invoke callback
447         task.status = TaskStatus.DONE
448
449         try:
450             task.result = task.callback()
451         except:
452             import traceback
453             err = traceback.format_exc()
454             task.result = err
455             task.status = TaskStatus.ERROR
456             
457             self._logger.error("Error occurred while executing task: %s" % err)
458
459             self._stop_scheduler()
460
461             # Propage error to the ParallelRunner
462             raise
463
464     def _stop_scheduler(self):
465         # Mark the EC as failed
466         self._state = ECState.FAILED
467
468         # Wake up the EC in case it was sleeping
469         self._cond.acquire()
470         self._cond.notify()
471         self._cond.release()
472
473