popolate_factory no longer requires to be invoked explicitly by the user
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 import functools
21 import logging
22 import os
23 import random
24 import sys
25 import time
26 import threading
27
28 from nepi.util import guid
29 from nepi.util.parallel import ParallelRun
30 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
31 from nepi.execution.resource import ResourceFactory, ResourceAction, \
32         ResourceState, ResourceState2str
33 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
34 from nepi.execution.trace import TraceAttr
35
36 # TODO: use multiprocessing instead of threading
37 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
38 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
39
40 class ECState(object):
41     """ State of the Experiment Controller
42    
43     """
44     RUNNING = 1
45     FAILED = 2
46     TERMINATED = 3
47
48 class ExperimentController(object):
49     """
50     .. class:: Class Args :
51       
52         :param exp_id: Human readable identifier for the experiment scenario. 
53                        It will be used in the name of the directory 
54                         where experiment related information is stored
55         :type exp_id: str
56
57     .. note::
58
59         An experiment, or scenario, is defined by a concrete use, behavior,
60         configuration and interconnection of resources that describe a single
61         experiment case (We call this the experiment description). 
62         A same experiment (scenario) can be run many times.
63
64         The ExperimentController (EC), is the entity responsible for 
65         managing an experiment instance (run). The same scenario can be 
66         recreated (and re-run) by instantiating an EC and recreating 
67         the same experiment description. 
68
69         In NEPI, an experiment is represented as a graph of interconnected
70         resources. A resource is a generic concept in the sense that any
71         component taking part of an experiment, whether physical of
72         virtual, is considered a resource. A resources could be a host, 
73         a virtual machine, an application, a simulator, a IP address.
74
75         A ResourceManager (RM), is the entity responsible for managing a 
76         single resource. ResourceManagers are specific to a resource
77         type (i.e. An RM to control a Linux application will not be
78         the same as the RM used to control a ns-3 simulation).
79         In order for a new type of resource to be supported in NEPI
80         a new RM must be implemented. NEPI already provides different
81         RMs to control basic resources, and new can be extended from
82         the existing ones.
83
84         Through the EC interface the user can create ResourceManagers (RMs),
85         configure them and interconnect them, in order to describe an experiment.
86         Describing an experiment through the EC does not run the experiment.
87         Only when the 'deploy()' method is invoked on the EC, will the EC take 
88         actions to transform the 'described' experiment into a 'running' experiment.
89
90         While the experiment is running, it is possible to continue to
91         create/configure/connect RMs, and to deploy them to involve new
92         resources in the experiment (this is known as 'interactive' deployment).
93         
94         An experiments in NEPI is identified by a string id, 
95         which is either given by the user, or automatically generated by NEPI.  
96         The purpose of this identifier is to separate files and results that 
97         belong to different experiment scenarios. 
98         However, since a same 'experiment' can be run many times, the experiment
99         id is not enough to identify an experiment instance (run).
100         For this reason, the ExperimentController has two identifier, the 
101         exp_id, which can be re-used by different ExperimentController instances,
102         and the run_id, which unique to a ExperimentController instance, and
103         is automatically generated by NEPI.
104         
105     """
106
107     def __init__(self, exp_id = None): 
108         super(ExperimentController, self).__init__()
109         # root directory to store files
110
111         # Run identifier. It identifies a concrete instance (run) of an experiment.
112         # Since a same experiment (same configuration) can be run many times,
113         # this id permits to identify concrete exoeriment run
114         self._run_id = tsformat()
115
116         # Experiment identifier. Usually assigned by the user
117         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
118
119         # generator of globally unique ids
120         self._guid_generator = guid.GuidGenerator()
121         
122         # Resource managers
123         self._resources = dict()
124
125         # Scheduler
126         self._scheduler = HeapScheduler()
127
128         # Tasks
129         self._tasks = dict()
130
131         # Event processing thread
132         self._cond = threading.Condition()
133         self._thread = threading.Thread(target = self._process)
134         self._thread.setDaemon(True)
135         self._thread.start()
136
137         # EC state
138         self._state = ECState.RUNNING
139
140         # Logging
141         self._logger = logging.getLogger("ExperimentController")
142
143     @property
144     def logger(self):
145         """ Return the logger of the Experiment Controller
146
147         """
148         return self._logger
149
150     @property
151     def ecstate(self):
152         """ Return the state of the Experiment Controller
153
154         """
155         return self._state
156
157     @property
158     def exp_id(self):
159         """ Return the experiment id assigned by the user
160
161         """
162         return self._exp_id
163
164     @property
165     def run_id(self):
166         """ Return the experiment instance (run) identifier  
167
168         """
169         return self._run_id
170
171     @property
172     def finished(self):
173         """ Put the state of the Experiment Controller into a final state :
174             Either TERMINATED or FAILED
175
176         """
177         return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
178
179     def wait_finished(self, guids):
180         """ Blocking method that wait until all the RM from the 'guid' list 
181             reached the state FINISHED
182
183         :param guids: List of guids
184         :type guids: list
185         """
186         return self.wait(guids)
187
188     def wait_started(self, guids):
189         """ Blocking method that wait until all the RM from the 'guid' list 
190             reached the state STARTED
191
192         :param guids: List of guids
193         :type guids: list
194         """
195         return self.wait(guids, states = [ResourceState.STARTED,
196             ResourceState.STOPPED,
197             ResourceState.FAILED,
198             ResourceState.FINISHED])
199
200     def wait(self, guids, states = [ResourceState.FINISHED, 
201             ResourceState.FAILED,
202             ResourceState.STOPPED]):
203         """ Blocking method that waits until all the RM from the 'guid' list 
204             reached state 'state' or until a failure occurs
205             
206         :param guids: List of guids
207         :type guids: list
208         """
209         if isinstance(guids, int):
210             guids = [guids]
211
212         # we randomly alter the order of the guids to avoid ordering
213         # dependencies (e.g. LinuxApplication RMs runing on the same
214         # linux host will be synchronized by the LinuxNode SSH lock)
215         random.shuffle(guids)
216
217         while True:
218             # If no more guids to wait for or an error occured, then exit
219             if len(guids) == 0 or self.finished:
220                 break
221
222             # If a guid reached one of the target states, remove it from list
223             guid = guids[0]
224             state = self.state(guid)
225
226             if state in states:
227                 guids.remove(guid)
228             else:
229                 # Debug...
230                 self.logger.debug(" WAITING FOR %g - state %s " % (guid,
231                     self.state(guid, hr = True)))
232
233                 # Take the opportunity to 'refresh' the states of the RMs.
234                 # Query only the first up to N guids (not to overwhelm 
235                 # the local machine)
236                 n = 100
237                 lim = n if len(guids) > n else ( len(guids) -1 )
238                 nguids = guids[0: lim]
239
240                 # schedule state request for all guids (take advantage of
241                 # scheduler multi threading).
242                 for guid in nguids:
243                     callback = functools.partial(self.state, guid)
244                     self.schedule("0s", callback)
245
246                 # If the guid is not in one of the target states, wait and
247                 # continue quering. We keep the sleep big to decrease the
248                 # number of RM state queries
249                 time.sleep(2)
250   
251     def get_task(self, tid):
252         """ Get a specific task
253
254         :param tid: Id of the task
255         :type tid: int
256         :rtype: Task
257         """
258         return self._tasks.get(tid)
259
260     def get_resource(self, guid):
261         """ Get a specific Resource Manager
262
263         :param guid: Id of the task
264         :type guid: int
265         :rtype: ResourceManager
266         """
267         return self._resources.get(guid)
268
269     @property
270     def resources(self):
271         """ Returns the list of all the Resource Manager Id
272
273         :rtype: set
274
275         """
276         return self._resources.keys()
277
278     def register_resource(self, rtype, guid = None):
279         """ Register a Resource Manager. It creates a new 'guid', if it is not specified, 
280         for the RM of type 'rtype' and add it to the list of Resources.
281
282         :param rtype: Type of the RM
283         :type rtype: str
284         :return: Id of the RM
285         :rtype: int
286         """
287         # Get next available guid
288         guid = self._guid_generator.next(guid)
289         
290         # Instantiate RM
291         rm = ResourceFactory.create(rtype, self, guid)
292
293         # Store RM
294         self._resources[guid] = rm
295
296         return guid
297
298     def get_attributes(self, guid):
299         """ Return all the attibutes of a specific RM
300
301         :param guid: Guid of the RM
302         :type guid: int
303         :return: List of attributes
304         :rtype: list
305         """
306         rm = self.get_resource(guid)
307         return rm.get_attributes()
308
309     def register_connection(self, guid1, guid2):
310         """ Registers a guid1 with a guid2. 
311             The declaration order is not important
312
313             :param guid1: First guid to connect
314             :type guid1: ResourceManager
315
316             :param guid2: Second guid to connect
317             :type guid: ResourceManager
318         """
319         rm1 = self.get_resource(guid1)
320         rm2 = self.get_resource(guid2)
321
322         rm1.register_connection(guid2)
323         rm2.register_connection(guid1)
324
325     def register_condition(self, group1, action, group2, state,
326             time = None):
327         """ Registers an action START or STOP for all RM on group1 to occur 
328             time 'time' after all elements in group2 reached state 'state'.
329
330             :param group1: List of guids of RMs subjected to action
331             :type group1: list
332
333             :param action: Action to register (either START or STOP)
334             :type action: ResourceAction
335
336             :param group2: List of guids of RMs to we waited for
337             :type group2: list
338
339             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
340             :type state: ResourceState
341
342             :param time: Time to wait after group2 has reached status 
343             :type time: string
344
345         """
346         if isinstance(group1, int):
347             group1 = [group1]
348         if isinstance(group2, int):
349             group2 = [group2]
350
351         for guid1 in group1:
352             rm = self.get_resource(guid1)
353             rm.register_condition(action, group2, state, time)
354
355     def enable_trace(self, guid, name):
356         """ Enable trace
357
358         :param name: Name of the trace
359         :type name: str
360         """
361         rm = self.get_resource(guid)
362         rm.enable_trace(name)
363
364     def trace_enabled(self, guid, name):
365         """ Returns True if trace is enabled
366
367         :param name: Name of the trace
368         :type name: str
369         """
370         rm = self.get_resource(guid)
371         return rm.trace_enabled(name)
372
373     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
374         """ Get information on collected trace
375
376         :param name: Name of the trace
377         :type name: str
378
379         :param attr: Can be one of:
380                          - TraceAttr.ALL (complete trace content), 
381                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
382                          - TraceAttr.PATH (full path to the trace file),
383                          - TraceAttr.SIZE (size of trace file). 
384         :type attr: str
385
386         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
387         :type name: int
388
389         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
390         :type name: int
391
392         :rtype: str
393         """
394         rm = self.get_resource(guid)
395         return rm.trace(name, attr, block, offset)
396
397     def discover(self, guid):
398         """ Discover a specific RM defined by its 'guid'
399
400             :param guid: Guid of the RM
401             :type guid: int
402
403         """
404         rm = self.get_resource(guid)
405         return rm.discover()
406
407     def provision(self, guid):
408         """ Provision a specific RM defined by its 'guid'
409
410             :param guid: Guid of the RM
411             :type guid: int
412
413         """
414         rm = self.get_resource(guid)
415         return rm.provision()
416
417     def get(self, guid, name):
418         """ Get a specific attribute 'name' from the RM 'guid'
419
420             :param guid: Guid of the RM
421             :type guid: int
422
423             :param name: attribute's name
424             :type name: str
425
426         """
427         rm = self.get_resource(guid)
428         return rm.get(name)
429
430     def set(self, guid, name, value):
431         """ Set a specific attribute 'name' from the RM 'guid' 
432             with the value 'value' 
433
434             :param guid: Guid of the RM
435             :type guid: int
436
437             :param name: attribute's name
438             :type name: str
439
440             :param value: attribute's value
441
442         """
443         rm = self.get_resource(guid)
444         return rm.set(name, value)
445
446     def state(self, guid, hr = False):
447         """ Returns the state of a resource
448
449             :param guid: Resource guid
450             :type guid: integer
451
452             :param hr: Human readable. Forces return of a 
453                 status string instead of a number 
454             :type hr: boolean
455
456         """
457         rm = self.get_resource(guid)
458         state = rm.state
459
460         if hr:
461             return ResourceState2str.get(state)
462
463         return state
464
465     def stop(self, guid):
466         """ Stop a specific RM defined by its 'guid'
467
468             :param guid: Guid of the RM
469             :type guid: int
470
471         """
472         rm = self.get_resource(guid)
473         return rm.stop()
474
475     def start(self, guid):
476         """ Start a specific RM defined by its 'guid'
477
478             :param guid: Guid of the RM
479             :type guid: int
480
481         """
482         rm = self.get_resource(guid)
483         return rm.start()
484
485     def set_with_conditions(self, name, value, group1, group2, state,
486             time = None):
487         """ Set value 'value' on attribute with name 'name' on all RMs of
488             group1 when 'time' has elapsed since all elements in group2 
489             have reached state 'state'.
490
491             :param name: Name of attribute to set in RM
492             :type name: string
493
494             :param value: Value of attribute to set in RM
495             :type name: string
496
497             :param group1: List of guids of RMs subjected to action
498             :type group1: list
499
500             :param action: Action to register (either START or STOP)
501             :type action: ResourceAction
502
503             :param group2: List of guids of RMs to we waited for
504             :type group2: list
505
506             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
507             :type state: ResourceState
508
509             :param time: Time to wait after group2 has reached status 
510             :type time: string
511
512         """
513         if isinstance(group1, int):
514             group1 = [group1]
515         if isinstance(group2, int):
516             group2 = [group2]
517
518         for guid1 in group1:
519             rm = self.get_resource(guid)
520             rm.set_with_conditions(name, value, group2, state, time)
521
522     def stop_with_conditions(self, guid):
523         """ Stop a specific RM defined by its 'guid' only if all the conditions are true
524
525             :param guid: Guid of the RM
526             :type guid: int
527
528         """
529         rm = self.get_resource(guid)
530         return rm.stop_with_conditions()
531
532     def start_with_conditions(self, guid):
533         """ Start a specific RM defined by its 'guid' only if all the conditions are true
534
535             :param guid: Guid of the RM
536             :type guid: int
537
538         """
539         rm = self.get_resource(guid)
540         return rm.start_with_condition()
541
542     def deploy(self, group = None, wait_all_ready = True):
543         """ Deploy all resource manager in group
544
545         :param group: List of guids of RMs to deploy
546         :type group: list
547
548         :param wait_all_ready: Wait until all RMs are ready in
549             order to start the RMs
550         :type guid: int
551
552         """
553         self.logger.debug(" ------- DEPLOY START ------ ")
554
555         if not group:
556             # By default, if not deployment group is indicated, 
557             # all RMs that are undeployed will be deployed
558             group = []
559             for guid in self.resources:
560                 if self.state(guid) == ResourceState.NEW:
561                     group.append(guid)
562                 
563         if isinstance(group, int):
564             group = [group]
565
566         # Before starting deployment we disorder the group list with the
567         # purpose of speeding up the whole deployment process.
568         # It is likely that the user inserted in the 'group' list closely
569         # resources one after another (e.g. all applications
570         # connected to the same node can likely appear one after another).
571         # This can originate a slow down in the deployment since the N 
572         # threads the parallel runner uses to processes tasks may all
573         # be taken up by the same family of resources waiting for the 
574         # same conditions (e.g. LinuxApplications running on a same 
575         # node share a single lock, so they will tend to be serialized).
576         # If we disorder the group list, this problem can be mitigated.
577         random.shuffle(group)
578
579         def wait_all_and_start(group):
580             reschedule = False
581             for guid in group:
582                 if self.state(guid) < ResourceState.READY:
583                     reschedule = True
584                     break
585
586             if reschedule:
587                 callback = functools.partial(wait_all_and_start, group)
588                 self.schedule("1s", callback)
589             else:
590                 # If all resources are read, we schedule the start
591                 for guid in group:
592                     rm = self.get_resource(guid)
593                     self.schedule("0s", rm.start_with_conditions)
594
595         if wait_all_ready:
596             # Schedule the function that will check all resources are
597             # READY, and only then it will schedule the start.
598             # This is aimed to reduce the number of tasks looping in the scheduler.
599             # Intead of having N start tasks, we will have only one
600             callback = functools.partial(wait_all_and_start, group)
601             self.schedule("1s", callback)
602
603         for guid in group:
604             rm = self.get_resource(guid)
605             self.schedule("0s", rm.deploy)
606
607             if not wait_all_ready:
608                 self.schedule("1s", rm.start_with_conditions)
609
610             if rm.conditions.get(ResourceAction.STOP):
611                 # Only if the RM has STOP conditions we
612                 # schedule a stop. Otherwise the RM will stop immediately
613                 self.schedule("2s", rm.stop_with_conditions)
614
615     def release(self, group = None):
616         """ Release the elements of the list 'group' or 
617         all the resources if any group is specified
618
619             :param group: List of RM
620             :type group: list
621
622         """
623         if not group:
624             group = self.resources
625
626         threads = []
627         for guid in group:
628             rm = self.get_resource(guid)
629             thread = threading.Thread(target=rm.release)
630             threads.append(thread)
631             thread.setDaemon(True)
632             thread.start()
633
634         while list(threads) and not self.finished:
635             thread = threads[0]
636             # Time out after 5 seconds to check EC not terminated
637             thread.join(5)
638             if not thread.is_alive():
639                 threads.remove(thread)
640         
641     def shutdown(self):
642         """ Shutdown the Experiment Controller. 
643         Releases all the resources and stops task processing thread
644
645         """
646         self.release()
647
648         # Mark the EC state as TERMINATED
649         self._state = ECState.TERMINATED
650
651         # Notify condition to wake up the processing thread
652         self._notify()
653         
654         if self._thread.is_alive():
655            self._thread.join()
656
657     def schedule(self, date, callback, track = False):
658         """ Schedule a callback to be executed at time date.
659
660             :param date: string containing execution time for the task.
661                     It can be expressed as an absolute time, using
662                     timestamp format, or as a relative time matching
663                     ^\d+.\d+(h|m|s|ms|us)$
664
665             :param callback: code to be executed for the task. Must be a
666                         Python function, and receives args and kwargs
667                         as arguments.
668
669             :param track: if set to True, the task will be retrivable with
670                     the get_task() method
671
672             :return : The Id of the task
673         """
674         timestamp = stabsformat(date)
675         task = Task(timestamp, callback)
676         task = self._scheduler.schedule(task)
677
678         if track:
679             self._tasks[task.id] = task
680
681         # Notify condition to wake up the processing thread
682         self._notify()
683
684         return task.id
685      
686     def _process(self):
687         """ Process scheduled tasks.
688
689         .. note::
690
691         The _process method is executed in an independent thread held by the 
692         ExperimentController for as long as the experiment is running.
693         
694         Tasks are scheduled by invoking the schedule method with a target callback. 
695         The schedule method is given a execution time which controls the
696         order in which tasks are processed. 
697
698         Tasks are processed in parallel using multithreading. 
699         The environmental variable NEPI_NTHREADS can be used to control
700         the number of threads used to process tasks. The default value is 50.
701
702         Exception handling:
703
704         To execute tasks in parallel, an ParallelRunner (PR) object, holding
705         a pool of threads (workers), is used.
706         For each available thread in the PR, the next task popped from 
707         the scheduler queue is 'put' in the PR.
708         Upon receiving a task to execute, each PR worker (thread) invokes the 
709         _execute method of the EC, passing the task as argument. 
710         This method, calls task.callback inside a try/except block. If an 
711         exception is raised by the tasks.callback, it will be trapped by the 
712         try block, logged to standard error (usually the console), and the EC 
713         state will be set to ECState.FAILED.
714         The invocation of _notify immediately after, forces the processing
715         loop in the _process method, to wake up if it was blocked waiting for new 
716         tasks to arrived, and to check the EC state.
717         As the EC is in FAILED state, the processing loop exits and the 
718         'finally' block is invoked. In the 'finally' block, the 'sync' method
719         of the PR is invoked, which forces the PR to raise any unchecked errors
720         that might have been raised by the workers.
721
722         """
723         nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
724
725         runner = ParallelRun(maxthreads = nthreads)
726         runner.start()
727
728         try:
729             while not self.finished:
730                 self._cond.acquire()
731
732                 task = self._scheduler.next()
733                 
734                 if not task:
735                     # No task to execute. Wait for a new task to be scheduled.
736                     self._cond.wait()
737                 else:
738                     # The task timestamp is in the future. Wait for timeout 
739                     # or until another task is scheduled.
740                     now = tnow()
741                     if now < task.timestamp:
742                         # Calculate timeout in seconds
743                         timeout = tdiffsec(task.timestamp, now)
744
745                         # Re-schedule task with the same timestamp
746                         self._scheduler.schedule(task)
747                         
748                         task = None
749
750                         # Wait timeout or until a new task awakes the condition
751                         self._cond.wait(timeout)
752                
753                 self._cond.release()
754
755                 if task:
756                     # Process tasks in parallel
757                     runner.put(self._execute, task)
758         except: 
759             import traceback
760             err = traceback.format_exc()
761             self.logger.error("Error while processing tasks in the EC: %s" % err)
762
763             self._state = ECState.FAILED
764         finally:   
765             self.logger.debug("Exiting the task processing loop ... ")
766             runner.sync()
767
768     def _execute(self, task):
769         """ Executes a single task. 
770
771             :param task: Object containing the callback to execute
772             :type task: Task
773
774         .. note::
775
776         If the invokation of the task callback raises an
777         exception, the processing thread of the ExperimentController
778         will be stopped and the experiment will be aborted.
779
780         """
781         # Invoke callback
782         task.status = TaskStatus.DONE
783
784         try:
785             task.result = task.callback()
786         except:
787             import traceback
788             err = traceback.format_exc()
789             task.result = err
790             task.status = TaskStatus.ERROR
791             
792             self.logger.error("Error occurred while executing task: %s" % err)
793
794             # Set the EC to FAILED state (this will force to exit the task
795             # processing thread)
796             self._state = ECState.FAILED
797
798             # Notify condition to wake up the processing thread
799             self._notify()
800
801             # Propage error to the ParallelRunner
802             raise
803
804     def _notify(self):
805         """ Awakes the processing thread in case it is blocked waiting
806         for a new task to be scheduled.
807         """
808         self._cond.acquire()
809         self._cond.notify()
810         self._cond.release()
811