Adding authors and correcting licence information
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 import functools
21 import logging
22 import os
23 import random
24 import sys
25 import time
26 import threading
27
28 from nepi.util import guid
29 from nepi.util.parallel import ParallelRun
30 from nepi.util.timefuncs import strfnow, strfdiff, strfvalid 
31 from nepi.execution.resource import ResourceFactory, ResourceAction, \
32         ResourceState, ResourceState2str
33 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
34 from nepi.execution.trace import TraceAttr
35
36 # TODO: use multiprocessing instead of threading
37 # TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!!
38
39 class ECState(object):
40     RUNNING = 1
41     FAILED = 2
42     TERMINATED = 3
43
44 class ExperimentController(object):
45     def __init__(self, exp_id = None, root_dir = "/tmp"): 
46         super(ExperimentController, self).__init__()
47         # root directory to store files
48         self._root_dir = root_dir
49
50         # experiment identifier given by the user
51         self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
52
53         # generator of globally unique ids
54         self._guid_generator = guid.GuidGenerator()
55         
56         # Resource managers
57         self._resources = dict()
58
59         # Scheduler
60         self._scheduler = HeapScheduler()
61
62         # Tasks
63         self._tasks = dict()
64
65         # Event processing thread
66         self._cond = threading.Condition()
67         self._thread = threading.Thread(target = self._process)
68         self._thread.setDaemon(True)
69         self._thread.start()
70
71         # EC state
72         self._state = ECState.RUNNING
73
74         # Logging
75         self._logger = logging.getLogger("ExperimentController")
76
77     @property
78     def logger(self):
79         return self._logger
80
81     @property
82     def ecstate(self):
83         return self._state
84
85     @property
86     def exp_id(self):
87         exp_id = self._exp_id
88         if not exp_id.startswith("nepi-"):
89             exp_id = "nepi-" + exp_id
90         return exp_id
91
92     @property
93     def finished(self):
94         return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
95
96     def wait_finished(self, guids):
97         while not all([self.state(guid) in [ResourceState.FINISHED, 
98             ResourceState.STOPPED, 
99             ResourceState.FAILED] \
100                 for guid in guids]) and not self.finished:
101             # We keep the sleep as large as possible to 
102             # decrese the number of RM state requests
103             time.sleep(2)
104     
105     def get_task(self, tid):
106         return self._tasks.get(tid)
107
108     def get_resource(self, guid):
109         return self._resources.get(guid)
110
111     @property
112     def resources(self):
113         return self._resources.keys()
114
115     def register_resource(self, rtype, guid = None):
116         # Get next available guid
117         guid = self._guid_generator.next(guid)
118         
119         # Instantiate RM
120         rm = ResourceFactory.create(rtype, self, guid)
121
122         # Store RM
123         self._resources[guid] = rm
124
125         return guid
126
127     def get_attributes(self, guid):
128         rm = self.get_resource(guid)
129         return rm.get_attributes()
130
131     def register_connection(self, guid1, guid2):
132         rm1 = self.get_resource(guid1)
133         rm2 = self.get_resource(guid2)
134
135         rm1.connect(guid2)
136         rm2.connect(guid1)
137
138     def register_condition(self, group1, action, group2, state,
139             time = None):
140         """ Registers an action START or STOP for all RM on group1 to occur 
141             time 'time' after all elements in group2 reached state 'state'.
142
143             :param group1: List of guids of RMs subjected to action
144             :type group1: list
145
146             :param action: Action to register (either START or STOP)
147             :type action: ResourceAction
148
149             :param group2: List of guids of RMs to we waited for
150             :type group2: list
151
152             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
153             :type state: ResourceState
154
155             :param time: Time to wait after group2 has reached status 
156             :type time: string
157
158         """
159         if isinstance(group1, int):
160             group1 = [group1]
161         if isinstance(group2, int):
162             group2 = [group2]
163
164         for guid1 in group1:
165             rm = self.get_resource(guid1)
166             rm.register_condition(action, group2, state, time)
167
168     def register_trace(self, guid, name):
169         """ Enable trace
170
171         :param name: Name of the trace
172         :type name: str
173         """
174         rm = self.get_resource(guid)
175         rm.register_trace(name)
176
177     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
178         """ Get information on collected trace
179
180         :param name: Name of the trace
181         :type name: str
182
183         :param attr: Can be one of:
184                          - TraceAttr.ALL (complete trace content), 
185                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
186                          - TraceAttr.PATH (full path to the trace file),
187                          - TraceAttr.SIZE (size of trace file). 
188         :type attr: str
189
190         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
191         :type name: int
192
193         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
194         :type name: int
195
196         :rtype: str
197         """
198         rm = self.get_resource(guid)
199         return rm.trace(name, attr, block, offset)
200
201     def discover(self, guid):
202         rm = self.get_resource(guid)
203         return rm.discover()
204
205     def provision(self, guid):
206         rm = self.get_resource(guid)
207         return rm.provision()
208
209     def get(self, guid, name):
210         rm = self.get_resource(guid)
211         return rm.get(name)
212
213     def set(self, guid, name, value):
214         rm = self.get_resource(guid)
215         return rm.set(name, value)
216
217     def state(self, guid, hr = False):
218         """ Returns the state of a resource
219
220             :param guid: Resource guid
221             :type guid: integer
222
223             :param hr: Human readable. Forces return of a 
224                 status string instead of a number 
225             :type hr: boolean
226
227         """
228         rm = self.get_resource(guid)
229         if hr:
230             return ResourceState2str.get(rm.state)
231
232         return rm.state
233
234     def stop(self, guid):
235         rm = self.get_resource(guid)
236         return rm.stop()
237
238     def start(self, guid):
239         rm = self.get_resource(guid)
240         return rm.start()
241
242     def set_with_conditions(self, name, value, group1, group2, state,
243             time = None):
244         """ Set value 'value' on attribute with name 'name' on all RMs of
245             group1 when 'time' has elapsed since all elements in group2 
246             have reached state 'state'.
247
248             :param name: Name of attribute to set in RM
249             :type name: string
250
251             :param value: Value of attribute to set in RM
252             :type name: string
253
254             :param group1: List of guids of RMs subjected to action
255             :type group1: list
256
257             :param action: Action to register (either START or STOP)
258             :type action: ResourceAction
259
260             :param group2: List of guids of RMs to we waited for
261             :type group2: list
262
263             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
264             :type state: ResourceState
265
266             :param time: Time to wait after group2 has reached status 
267             :type time: string
268
269         """
270         if isinstance(group1, int):
271             group1 = [group1]
272         if isinstance(group2, int):
273             group2 = [group2]
274
275         for guid1 in group1:
276             rm = self.get_resource(guid)
277             rm.set_with_conditions(name, value, group2, state, time)
278
279     def stop_with_conditions(self, guid):
280         rm = self.get_resource(guid)
281         return rm.stop_with_conditions()
282
283     def start_with_conditions(self, guid):
284         rm = self.get_resource(guid)
285         return rm.start_with_condition()
286
287     def deploy(self, group = None, wait_all_ready = True):
288         """ Deploy all resource manager in group
289
290         :param group: List of guids of RMs to deploy
291         :type group: list
292
293         :param wait_all_ready: Wait until all RMs are ready in
294             order to start the RMs
295         :type guid: int
296
297         """
298         self.logger.debug(" ------- DEPLOY START ------ ")
299
300         if not group:
301             group = self.resources
302
303         # Before starting deployment we disorder the group list with the
304         # purpose of speeding up the whole deployment process.
305         # It is likely that the user inserted in the 'group' list closely
306         # resources one after another (e.g. all applications
307         # connected to the same node can likely appear one after another).
308         # This can originate a slow down in the deployment since the N 
309         # threads the parallel runner uses to processes tasks may all
310         # be taken up by the same family of resources waiting for the 
311         # same conditions (e.g. LinuxApplications running on a same 
312         # node share a single lock, so they will tend to be serialized).
313         # If we disorder the group list, this problem can be mitigated.
314         random.shuffle(group)
315
316         def wait_all_and_start(group):
317             reschedule = False
318             for guid in group:
319                 rm = self.get_resource(guid)
320                 if rm.state < ResourceState.READY:
321                     reschedule = True
322                     break
323
324             if reschedule:
325                 callback = functools.partial(wait_all_and_start, group)
326                 self.schedule("1s", callback)
327             else:
328                 # If all resources are read, we schedule the start
329                 for guid in group:
330                     rm = self.get_resource(guid)
331                     self.schedule("0.01s", rm.start_with_conditions)
332
333         if wait_all_ready:
334             # Schedule the function that will check all resources are
335             # READY, and only then it will schedule the start.
336             # This is aimed to reduce the number of tasks looping in the scheduler.
337             # Intead of having N start tasks, we will have only one
338             callback = functools.partial(wait_all_and_start, group)
339             self.schedule("1s", callback)
340
341         for guid in group:
342             rm = self.get_resource(guid)
343             self.schedule("0.001s", rm.deploy)
344
345             if not wait_all_ready:
346                 self.schedule("1s", rm.start_with_conditions)
347
348             if rm.conditions.get(ResourceAction.STOP):
349                 # Only if the RM has STOP conditions we
350                 # schedule a stop. Otherwise the RM will stop immediately
351                 self.schedule("2s", rm.stop_with_conditions)
352
353
354     def release(self, group = None):
355         if not group:
356             group = self.resources
357
358         threads = []
359         for guid in group:
360             rm = self.get_resource(guid)
361             thread = threading.Thread(target=rm.release)
362             threads.append(thread)
363             thread.setDaemon(True)
364             thread.start()
365
366         while list(threads) and not self.finished:
367             thread = threads[0]
368             # Time out after 5 seconds to check EC not terminated
369             thread.join(5)
370             if not thread.is_alive():
371                 threads.remove(thread)
372         
373     def shutdown(self):
374         self.release()
375
376         self._stop_scheduler()
377         
378         if self._thread.is_alive():
379            self._thread.join()
380
381     def schedule(self, date, callback, track = False):
382         """ Schedule a callback to be executed at time date.
383
384             date    string containing execution time for the task.
385                     It can be expressed as an absolute time, using
386                     timestamp format, or as a relative time matching
387                     ^\d+.\d+(h|m|s|ms|us)$
388
389             callback    code to be executed for the task. Must be a
390                         Python function, and receives args and kwargs
391                         as arguments.
392
393             track   if set to True, the task will be retrivable with
394                     the get_task() method
395         """
396         timestamp = strfvalid(date)
397         
398         task = Task(timestamp, callback)
399         task = self._scheduler.schedule(task)
400
401         if track:
402             self._tasks[task.id] = task
403   
404         # Notify condition to wake up the processing thread
405         self._cond.acquire()
406         self._cond.notify()
407         self._cond.release()
408
409         return task.id
410      
411     def _process(self):
412         runner = ParallelRun(maxthreads = 50)
413         runner.start()
414
415         try:
416             while not self.finished:
417                 self._cond.acquire()
418                 task = self._scheduler.next()
419                 self._cond.release()
420                 
421                 if not task:
422                     # It there are not tasks in the tasks queue we need to 
423                     # wait until a call to schedule wakes us up
424                     self._cond.acquire()
425                     self._cond.wait()
426                     self._cond.release()
427                 else: 
428                     # If the task timestamp is in the future the thread needs to wait
429                     # until time elapse or until another task is scheduled
430                     now = strfnow()
431                     if now < task.timestamp:
432                         # Calculate time difference in seconds
433                         timeout = strfdiff(task.timestamp, now)
434                         # Re-schedule task with the same timestamp
435                         self._scheduler.schedule(task)
436                         # Sleep until timeout or until a new task awakes the condition
437                         self._cond.acquire()
438                         self._cond.wait(timeout)
439                         self._cond.release()
440                     else:
441                         # Process tasks in parallel
442                         runner.put(self._execute, task)
443         except: 
444             import traceback
445             err = traceback.format_exc()
446             self._logger.error("Error while processing tasks in the EC: %s" % err)
447
448             self._state = ECState.FAILED
449    
450         # Mark EC state as terminated
451         if self.ecstate == ECState.RUNNING:
452             # Synchronize to get errors if occurred
453             runner.sync()
454             self._state = ECState.TERMINATED
455
456     def _execute(self, task):
457         # Invoke callback
458         task.status = TaskStatus.DONE
459
460         try:
461             task.result = task.callback()
462         except:
463             import traceback
464             err = traceback.format_exc()
465             task.result = err
466             task.status = TaskStatus.ERROR
467             
468             self._logger.error("Error occurred while executing task: %s" % err)
469
470             self._stop_scheduler()
471
472             # Propage error to the ParallelRunner
473             raise
474
475     def _stop_scheduler(self):
476         # Mark the EC as failed
477         self._state = ECState.FAILED
478
479         # Wake up the EC in case it was sleeping
480         self._cond.acquire()
481         self._cond.notify()
482         self._cond.release()
483
484