Fixing UdpTunnel not cleaning up processes
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 import functools
21 import logging
22 import os
23 import random
24 import sys
25 import time
26 import threading
27
28 from nepi.util import guid
29 from nepi.util.parallel import ParallelRun
30 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
31 from nepi.execution.resource import ResourceFactory, ResourceAction, \
32         ResourceState, ResourceState2str
33 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
34 from nepi.execution.trace import TraceAttr
35
36 # TODO: use multiprocessing instead of threading
37 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
38 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
39
40 class ECState(object):
41     """ State of the Experiment Controller
42    
43     """
44     RUNNING = 1
45     FAILED = 2
46     TERMINATED = 3
47
48 class ExperimentController(object):
49     """
50     .. class:: Class Args :
51       
52         :param exp_id: Human readable identifier for the experiment scenario. 
53                        It will be used in the name of the directory 
54                         where experiment related information is stored
55         :type exp_id: str
56
57     .. note::
58
59         An experiment, or scenario, is defined by a concrete use, behavior,
60         configuration and interconnection of resources that describe a single
61         experiment case (We call this the experiment description). 
62         A same experiment (scenario) can be run many times.
63
64         The ExperimentController (EC), is the entity responsible for 
65         managing an experiment instance (run). The same scenario can be 
66         recreated (and re-run) by instantiating an EC and recreating 
67         the same experiment description. 
68
69         In NEPI, an experiment is represented as a graph of interconnected
70         resources. A resource is a generic concept in the sense that any
71         component taking part of an experiment, whether physical of
72         virtual, is considered a resource. A resources could be a host, 
73         a virtual machine, an application, a simulator, a IP address.
74
75         A ResourceManager (RM), is the entity responsible for managing a 
76         single resource. ResourceManagers are specific to a resource
77         type (i.e. An RM to control a Linux application will not be
78         the same as the RM used to control a ns-3 simulation).
79         In order for a new type of resource to be supported in NEPI
80         a new RM must be implemented. NEPI already provides different
81         RMs to control basic resources, and new can be extended from
82         the existing ones.
83
84         Through the EC interface the user can create ResourceManagers (RMs),
85         configure them and interconnect them, in order to describe an experiment.
86         Describing an experiment through the EC does not run the experiment.
87         Only when the 'deploy()' method is invoked on the EC, will the EC take 
88         actions to transform the 'described' experiment into a 'running' experiment.
89
90         While the experiment is running, it is possible to continue to
91         create/configure/connect RMs, and to deploy them to involve new
92         resources in the experiment (this is known as 'interactive' deployment).
93         
94         An experiments in NEPI is identified by a string id, 
95         which is either given by the user, or automatically generated by NEPI.  
96         The purpose of this identifier is to separate files and results that 
97         belong to different experiment scenarios. 
98         However, since a same 'experiment' can be run many times, the experiment
99         id is not enough to identify an experiment instance (run).
100         For this reason, the ExperimentController has two identifier, the 
101         exp_id, which can be re-used by different ExperimentController instances,
102         and the run_id, which unique to a ExperimentController instance, and
103         is automatically generated by NEPI.
104         
105     """
106
107     def __init__(self, exp_id = None): 
108         super(ExperimentController, self).__init__()
109         # root directory to store files
110
111         # Run identifier. It identifies a concrete instance (run) of an experiment.
112         # Since a same experiment (same configuration) can be run many times,
113         # this id permits to identify concrete exoeriment run
114         self._run_id = tsformat()
115
116         # Experiment identifier. Usually assigned by the user
117         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
118
119         # generator of globally unique ids
120         self._guid_generator = guid.GuidGenerator()
121         
122         # Resource managers
123         self._resources = dict()
124
125         # Scheduler
126         self._scheduler = HeapScheduler()
127
128         # Tasks
129         self._tasks = dict()
130
131         # Event processing thread
132         self._cond = threading.Condition()
133         self._thread = threading.Thread(target = self._process)
134         self._thread.setDaemon(True)
135         self._thread.start()
136
137         # EC state
138         self._state = ECState.RUNNING
139
140         # Logging
141         self._logger = logging.getLogger("ExperimentController")
142
143     @property
144     def logger(self):
145         """ Return the logger of the Experiment Controller
146
147         """
148         return self._logger
149
150     @property
151     def ecstate(self):
152         """ Return the state of the Experiment Controller
153
154         """
155         return self._state
156
157     @property
158     def exp_id(self):
159         """ Return the experiment id assigned by the user
160
161         """
162         return self._exp_id
163
164     @property
165     def run_id(self):
166         """ Return the experiment instance (run) identifier  
167
168         """
169         return self._run_id
170
171     @property
172     def finished(self):
173         """ Put the state of the Experiment Controller into a final state :
174             Either TERMINATED or FAILED
175
176         """
177         return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
178
179     def wait_finished(self, guids):
180         """ Blocking method that wait until all the RM from the 'guid' list 
181             reached the state FINISHED
182
183         :param guids: List of guids
184         :type guids: list
185         """
186         return self.wait(guids)
187
188     def wait_started(self, guids):
189         """ Blocking method that wait until all the RM from the 'guid' list 
190             reached the state STARTED
191
192         :param guids: List of guids
193         :type guids: list
194         """
195         return self.wait(guids, states = [ResourceState.STARTED,
196             ResourceState.STOPPED,
197             ResourceState.FAILED,
198             ResourceState.FINISHED])
199
200     def wait_released(self, guids):
201         """ Blocking method that wait until all the RM from the 'guid' list 
202             reached the state RELEASED
203
204         :param guids: List of guids
205         :type guids: list
206         """
207         return self.wait(guids, states = [ResourceState.RELEASED,
208             ResourceState.STOPPED,
209             ResourceState.FAILED,
210             ResourceState.FINISHED])
211
212     def wait(self, guids, states = [ResourceState.FINISHED, 
213             ResourceState.FAILED,
214             ResourceState.STOPPED]):
215         """ Blocking method that waits until all the RM from the 'guid' list 
216             reached state 'state' or until a failure occurs
217             
218         :param guids: List of guids
219         :type guids: list
220         """
221         if isinstance(guids, int):
222             guids = [guids]
223
224         # we randomly alter the order of the guids to avoid ordering
225         # dependencies (e.g. LinuxApplication RMs runing on the same
226         # linux host will be synchronized by the LinuxNode SSH lock)
227         random.shuffle(guids)
228
229         while True:
230             # If no more guids to wait for or an error occured, then exit
231             if len(guids) == 0 or self.finished:
232                 break
233
234             # If a guid reached one of the target states, remove it from list
235             guid = guids[0]
236             state = self.state(guid)
237
238             if state in states:
239                 guids.remove(guid)
240             else:
241                 # Debug...
242                 self.logger.debug(" WAITING FOR %g - state %s " % (guid,
243                     self.state(guid, hr = True)))
244
245                 # Take the opportunity to 'refresh' the states of the RMs.
246                 # Query only the first up to N guids (not to overwhelm 
247                 # the local machine)
248                 n = 100
249                 lim = n if len(guids) > n else ( len(guids) -1 )
250                 nguids = guids[0: lim]
251
252                 # schedule state request for all guids (take advantage of
253                 # scheduler multi threading).
254                 for guid in nguids:
255                     callback = functools.partial(self.state, guid)
256                     self.schedule("0s", callback)
257
258                 # If the guid is not in one of the target states, wait and
259                 # continue quering. We keep the sleep big to decrease the
260                 # number of RM state queries
261                 time.sleep(2)
262   
263     def get_task(self, tid):
264         """ Get a specific task
265
266         :param tid: Id of the task
267         :type tid: int
268         :rtype: Task
269         """
270         return self._tasks.get(tid)
271
272     def get_resource(self, guid):
273         """ Get a specific Resource Manager
274
275         :param guid: Id of the task
276         :type guid: int
277         :rtype: ResourceManager
278         """
279         return self._resources.get(guid)
280
281     @property
282     def resources(self):
283         """ Returns the list of all the Resource Manager Id
284
285         :rtype: set
286
287         """
288         return self._resources.keys()
289
290     def register_resource(self, rtype, guid = None):
291         """ Register a Resource Manager. It creates a new 'guid', if it is not specified, 
292         for the RM of type 'rtype' and add it to the list of Resources.
293
294         :param rtype: Type of the RM
295         :type rtype: str
296         :return: Id of the RM
297         :rtype: int
298         """
299         # Get next available guid
300         guid = self._guid_generator.next(guid)
301         
302         # Instantiate RM
303         rm = ResourceFactory.create(rtype, self, guid)
304
305         # Store RM
306         self._resources[guid] = rm
307
308         return guid
309
310     def get_attributes(self, guid):
311         """ Return all the attibutes of a specific RM
312
313         :param guid: Guid of the RM
314         :type guid: int
315         :return: List of attributes
316         :rtype: list
317         """
318         rm = self.get_resource(guid)
319         return rm.get_attributes()
320
321     def register_connection(self, guid1, guid2):
322         """ Registers a guid1 with a guid2. 
323             The declaration order is not important
324
325             :param guid1: First guid to connect
326             :type guid1: ResourceManager
327
328             :param guid2: Second guid to connect
329             :type guid: ResourceManager
330         """
331         rm1 = self.get_resource(guid1)
332         rm2 = self.get_resource(guid2)
333
334         rm1.register_connection(guid2)
335         rm2.register_connection(guid1)
336
337     def register_condition(self, group1, action, group2, state,
338             time = None):
339         """ Registers an action START or STOP for all RM on group1 to occur 
340             time 'time' after all elements in group2 reached state 'state'.
341
342             :param group1: List of guids of RMs subjected to action
343             :type group1: list
344
345             :param action: Action to register (either START or STOP)
346             :type action: ResourceAction
347
348             :param group2: List of guids of RMs to we waited for
349             :type group2: list
350
351             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
352             :type state: ResourceState
353
354             :param time: Time to wait after group2 has reached status 
355             :type time: string
356
357         """
358         if isinstance(group1, int):
359             group1 = [group1]
360         if isinstance(group2, int):
361             group2 = [group2]
362
363         for guid1 in group1:
364             rm = self.get_resource(guid1)
365             rm.register_condition(action, group2, state, time)
366
367     def enable_trace(self, guid, name):
368         """ Enable trace
369
370         :param name: Name of the trace
371         :type name: str
372         """
373         rm = self.get_resource(guid)
374         rm.enable_trace(name)
375
376     def trace_enabled(self, guid, name):
377         """ Returns True if trace is enabled
378
379         :param name: Name of the trace
380         :type name: str
381         """
382         rm = self.get_resource(guid)
383         return rm.trace_enabled(name)
384
385     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
386         """ Get information on collected trace
387
388         :param name: Name of the trace
389         :type name: str
390
391         :param attr: Can be one of:
392                          - TraceAttr.ALL (complete trace content), 
393                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
394                          - TraceAttr.PATH (full path to the trace file),
395                          - TraceAttr.SIZE (size of trace file). 
396         :type attr: str
397
398         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
399         :type name: int
400
401         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
402         :type name: int
403
404         :rtype: str
405         """
406         rm = self.get_resource(guid)
407         return rm.trace(name, attr, block, offset)
408
409     def discover(self, guid):
410         """ Discover a specific RM defined by its 'guid'
411
412             :param guid: Guid of the RM
413             :type guid: int
414
415         """
416         rm = self.get_resource(guid)
417         return rm.discover()
418
419     def provision(self, guid):
420         """ Provision a specific RM defined by its 'guid'
421
422             :param guid: Guid of the RM
423             :type guid: int
424
425         """
426         rm = self.get_resource(guid)
427         return rm.provision()
428
429     def get(self, guid, name):
430         """ Get a specific attribute 'name' from the RM 'guid'
431
432             :param guid: Guid of the RM
433             :type guid: int
434
435             :param name: attribute's name
436             :type name: str
437
438         """
439         rm = self.get_resource(guid)
440         return rm.get(name)
441
442     def set(self, guid, name, value):
443         """ Set a specific attribute 'name' from the RM 'guid' 
444             with the value 'value' 
445
446             :param guid: Guid of the RM
447             :type guid: int
448
449             :param name: attribute's name
450             :type name: str
451
452             :param value: attribute's value
453
454         """
455         rm = self.get_resource(guid)
456         return rm.set(name, value)
457
458     def state(self, guid, hr = False):
459         """ Returns the state of a resource
460
461             :param guid: Resource guid
462             :type guid: integer
463
464             :param hr: Human readable. Forces return of a 
465                 status string instead of a number 
466             :type hr: boolean
467
468         """
469         rm = self.get_resource(guid)
470         state = rm.state
471
472         if hr:
473             return ResourceState2str.get(state)
474
475         return state
476
477     def stop(self, guid):
478         """ Stop a specific RM defined by its 'guid'
479
480             :param guid: Guid of the RM
481             :type guid: int
482
483         """
484         rm = self.get_resource(guid)
485         return rm.stop()
486
487     def start(self, guid):
488         """ Start a specific RM defined by its 'guid'
489
490             :param guid: Guid of the RM
491             :type guid: int
492
493         """
494         rm = self.get_resource(guid)
495         return rm.start()
496
497     def set_with_conditions(self, name, value, group1, group2, state,
498             time = None):
499         """ Set value 'value' on attribute with name 'name' on all RMs of
500             group1 when 'time' has elapsed since all elements in group2 
501             have reached state 'state'.
502
503             :param name: Name of attribute to set in RM
504             :type name: string
505
506             :param value: Value of attribute to set in RM
507             :type name: string
508
509             :param group1: List of guids of RMs subjected to action
510             :type group1: list
511
512             :param action: Action to register (either START or STOP)
513             :type action: ResourceAction
514
515             :param group2: List of guids of RMs to we waited for
516             :type group2: list
517
518             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
519             :type state: ResourceState
520
521             :param time: Time to wait after group2 has reached status 
522             :type time: string
523
524         """
525         if isinstance(group1, int):
526             group1 = [group1]
527         if isinstance(group2, int):
528             group2 = [group2]
529
530         for guid1 in group1:
531             rm = self.get_resource(guid)
532             rm.set_with_conditions(name, value, group2, state, time)
533
534     def stop_with_conditions(self, guid):
535         """ Stop a specific RM defined by its 'guid' only if all the conditions are true
536
537             :param guid: Guid of the RM
538             :type guid: int
539
540         """
541         rm = self.get_resource(guid)
542         return rm.stop_with_conditions()
543
544     def start_with_conditions(self, guid):
545         """ Start a specific RM defined by its 'guid' only if all the conditions are true
546
547             :param guid: Guid of the RM
548             :type guid: int
549
550         """
551         rm = self.get_resource(guid)
552         return rm.start_with_conditions()
553
554     def deploy(self, group = None, wait_all_ready = True):
555         """ Deploy all resource manager in group
556
557         :param group: List of guids of RMs to deploy
558         :type group: list
559
560         :param wait_all_ready: Wait until all RMs are ready in
561             order to start the RMs
562         :type guid: int
563
564         """
565         self.logger.debug(" ------- DEPLOY START ------ ")
566
567         if not group:
568             # By default, if not deployment group is indicated, 
569             # all RMs that are undeployed will be deployed
570             group = []
571             for guid in self.resources:
572                 if self.state(guid) == ResourceState.NEW:
573                     group.append(guid)
574                 
575         if isinstance(group, int):
576             group = [group]
577
578         # Before starting deployment we disorder the group list with the
579         # purpose of speeding up the whole deployment process.
580         # It is likely that the user inserted in the 'group' list closely
581         # resources one after another (e.g. all applications
582         # connected to the same node can likely appear one after another).
583         # This can originate a slow down in the deployment since the N 
584         # threads the parallel runner uses to processes tasks may all
585         # be taken up by the same family of resources waiting for the 
586         # same conditions (e.g. LinuxApplications running on a same 
587         # node share a single lock, so they will tend to be serialized).
588         # If we disorder the group list, this problem can be mitigated.
589         random.shuffle(group)
590
591         def wait_all_and_start(group):
592             reschedule = False
593             for guid in group:
594                 if self.state(guid) < ResourceState.READY:
595                     reschedule = True
596                     break
597
598             if reschedule:
599                 callback = functools.partial(wait_all_and_start, group)
600                 self.schedule("1s", callback)
601             else:
602                 # If all resources are read, we schedule the start
603                 for guid in group:
604                     rm = self.get_resource(guid)
605                     self.schedule("0s", rm.start_with_conditions)
606
607         if wait_all_ready:
608             # Schedule the function that will check all resources are
609             # READY, and only then it will schedule the start.
610             # This is aimed to reduce the number of tasks looping in the scheduler.
611             # Intead of having N start tasks, we will have only one
612             callback = functools.partial(wait_all_and_start, group)
613             self.schedule("1s", callback)
614
615         for guid in group:
616             rm = self.get_resource(guid)
617             self.schedule("0s", rm.deploy)
618
619             if not wait_all_ready:
620                 self.schedule("1s", rm.start_with_conditions)
621
622             if rm.conditions.get(ResourceAction.STOP):
623                 # Only if the RM has STOP conditions we
624                 # schedule a stop. Otherwise the RM will stop immediately
625                 self.schedule("2s", rm.stop_with_conditions)
626
627     def release(self, group = None):
628         """ Release the elements of the list 'group' or 
629         all the resources if any group is specified
630
631             :param group: List of RM
632             :type group: list
633
634         """
635         if not group:
636             group = self.resources
637
638         for guid in group:
639             rm = self.get_resource(guid)
640             self.schedule("0s", rm.release)
641
642         self.wait_released(group)
643         
644     def shutdown(self):
645         """ Shutdown the Experiment Controller. 
646         Releases all the resources and stops task processing thread
647
648         """
649         self.release()
650
651         # Mark the EC state as TERMINATED
652         self._state = ECState.TERMINATED
653
654         # Notify condition to wake up the processing thread
655         self._notify()
656         
657         if self._thread.is_alive():
658            self._thread.join()
659
660     def schedule(self, date, callback, track = False):
661         """ Schedule a callback to be executed at time date.
662
663             :param date: string containing execution time for the task.
664                     It can be expressed as an absolute time, using
665                     timestamp format, or as a relative time matching
666                     ^\d+.\d+(h|m|s|ms|us)$
667
668             :param callback: code to be executed for the task. Must be a
669                         Python function, and receives args and kwargs
670                         as arguments.
671
672             :param track: if set to True, the task will be retrivable with
673                     the get_task() method
674
675             :return : The Id of the task
676         """
677         timestamp = stabsformat(date)
678         task = Task(timestamp, callback)
679         task = self._scheduler.schedule(task)
680
681         if track:
682             self._tasks[task.id] = task
683
684         # Notify condition to wake up the processing thread
685         self._notify()
686
687         return task.id
688      
689     def _process(self):
690         """ Process scheduled tasks.
691
692         .. note::
693
694         The _process method is executed in an independent thread held by the 
695         ExperimentController for as long as the experiment is running.
696         
697         Tasks are scheduled by invoking the schedule method with a target callback. 
698         The schedule method is given a execution time which controls the
699         order in which tasks are processed. 
700
701         Tasks are processed in parallel using multithreading. 
702         The environmental variable NEPI_NTHREADS can be used to control
703         the number of threads used to process tasks. The default value is 50.
704
705         Exception handling:
706
707         To execute tasks in parallel, an ParallelRunner (PR) object, holding
708         a pool of threads (workers), is used.
709         For each available thread in the PR, the next task popped from 
710         the scheduler queue is 'put' in the PR.
711         Upon receiving a task to execute, each PR worker (thread) invokes the 
712         _execute method of the EC, passing the task as argument. 
713         This method, calls task.callback inside a try/except block. If an 
714         exception is raised by the tasks.callback, it will be trapped by the 
715         try block, logged to standard error (usually the console), and the EC 
716         state will be set to ECState.FAILED.
717         The invocation of _notify immediately after, forces the processing
718         loop in the _process method, to wake up if it was blocked waiting for new 
719         tasks to arrived, and to check the EC state.
720         As the EC is in FAILED state, the processing loop exits and the 
721         'finally' block is invoked. In the 'finally' block, the 'sync' method
722         of the PR is invoked, which forces the PR to raise any unchecked errors
723         that might have been raised by the workers.
724
725         """
726         nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
727
728         runner = ParallelRun(maxthreads = nthreads)
729         runner.start()
730
731         try:
732             while not self.finished:
733                 self._cond.acquire()
734
735                 task = self._scheduler.next()
736                 
737                 if not task:
738                     # No task to execute. Wait for a new task to be scheduled.
739                     self._cond.wait()
740                 else:
741                     # The task timestamp is in the future. Wait for timeout 
742                     # or until another task is scheduled.
743                     now = tnow()
744                     if now < task.timestamp:
745                         # Calculate timeout in seconds
746                         timeout = tdiffsec(task.timestamp, now)
747
748                         # Re-schedule task with the same timestamp
749                         self._scheduler.schedule(task)
750                         
751                         task = None
752
753                         # Wait timeout or until a new task awakes the condition
754                         self._cond.wait(timeout)
755                
756                 self._cond.release()
757
758                 if task:
759                     # Process tasks in parallel
760                     runner.put(self._execute, task)
761         except: 
762             import traceback
763             err = traceback.format_exc()
764             self.logger.error("Error while processing tasks in the EC: %s" % err)
765
766             self._state = ECState.FAILED
767         finally:   
768             self.logger.debug("Exiting the task processing loop ... ")
769             runner.sync()
770
771     def _execute(self, task):
772         """ Executes a single task. 
773
774             :param task: Object containing the callback to execute
775             :type task: Task
776
777         .. note::
778
779         If the invokation of the task callback raises an
780         exception, the processing thread of the ExperimentController
781         will be stopped and the experiment will be aborted.
782
783         """
784         # Invoke callback
785         task.status = TaskStatus.DONE
786
787         try:
788             task.result = task.callback()
789         except:
790             import traceback
791             err = traceback.format_exc()
792             task.result = err
793             task.status = TaskStatus.ERROR
794             
795             self.logger.error("Error occurred while executing task: %s" % err)
796
797             # Set the EC to FAILED state (this will force to exit the task
798             # processing thread)
799             self._state = ECState.FAILED
800
801             # Notify condition to wake up the processing thread
802             self._notify()
803
804             # Propage error to the ParallelRunner
805             raise
806
807     def _notify(self):
808         """ Awakes the processing thread in case it is blocked waiting
809         for a new task to be scheduled.
810         """
811         self._cond.acquire()
812         self._cond.notify()
813         self._cond.release()
814