Adding PlanetLab resources
[nepi.git] / src / nepi / execution / ec.py
1 """
2     NEPI, a framework to manage network experiments
3     Copyright (C) 2013 INRIA
4
5     This program is free software: you can redistribute it and/or modify
6     it under the terms of the GNU General Public License as published by
7     the Free Software Foundation, either version 3 of the License, or
8     (at your option) any later version.
9
10     This program is distributed in the hope that it will be useful,
11     but WITHOUT ANY WARRANTY; without even the implied warranty of
12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License
16     along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 """
19
20 import functools
21 import logging
22 import os
23 import random
24 import sys
25 import time
26 import threading
27
28 from nepi.util import guid
29 from nepi.util.parallel import ParallelRun
30 from nepi.util.timefuncs import strfnow, strfdiff, strfvalid 
31 from nepi.execution.resource import ResourceFactory, ResourceAction, \
32         ResourceState, ResourceState2str
33 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
34 from nepi.execution.trace import TraceAttr
35
36 # TODO: use multiprocessing instead of threading
37 # TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!!
38
39 class ECState(object):
40     RUNNING = 1
41     FAILED = 2
42     TERMINATED = 3
43
44 class ExperimentController(object):
45     def __init__(self, exp_id = None, root_dir = "/tmp"): 
46         super(ExperimentController, self).__init__()
47         # root directory to store files
48         self._root_dir = root_dir
49
50         # experiment identifier given by the user
51         self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
52
53         # generator of globally unique ids
54         self._guid_generator = guid.GuidGenerator()
55         
56         # Resource managers
57         self._resources = dict()
58
59         # Scheduler
60         self._scheduler = HeapScheduler()
61
62         # Tasks
63         self._tasks = dict()
64
65         # Event processing thread
66         self._cond = threading.Condition()
67         self._thread = threading.Thread(target = self._process)
68         self._thread.setDaemon(True)
69         self._thread.start()
70
71         # EC state
72         self._state = ECState.RUNNING
73
74         # Logging
75         self._logger = logging.getLogger("ExperimentController")
76
77     @property
78     def logger(self):
79         return self._logger
80
81     @property
82     def ecstate(self):
83         return self._state
84
85     @property
86     def exp_id(self):
87         exp_id = self._exp_id
88         if not exp_id.startswith("nepi-"):
89             exp_id = "nepi-" + exp_id
90         return exp_id
91
92     @property
93     def finished(self):
94         return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
95
96     def wait_finished(self, guids):
97         while not all([self.state(guid) == ResourceState.FINISHED \
98                 for guid in guids]) and not self.finished:
99             # We keep the sleep as large as possible to 
100             # decrese the number of RM state requests
101             time.sleep(2)
102     
103     def get_task(self, tid):
104         return self._tasks.get(tid)
105
106     def get_resource(self, guid):
107         return self._resources.get(guid)
108
109     @property
110     def resources(self):
111         return self._resources.keys()
112
113     def register_resource(self, rtype, guid = None):
114         # Get next available guid
115         guid = self._guid_generator.next(guid)
116         
117         # Instantiate RM
118         rm = ResourceFactory.create(rtype, self, guid)
119
120         # Store RM
121         self._resources[guid] = rm
122
123         return guid
124
125     def get_attributes(self, guid):
126         rm = self.get_resource(guid)
127         return rm.get_attributes()
128
129     def register_connection(self, guid1, guid2):
130         rm1 = self.get_resource(guid1)
131         rm2 = self.get_resource(guid2)
132
133         rm1.connect(guid2)
134         rm2.connect(guid1)
135
136     def register_condition(self, group1, action, group2, state,
137             time = None):
138         """ Registers an action START or STOP for all RM on group1 to occur 
139             time 'time' after all elements in group2 reached state 'state'.
140
141             :param group1: List of guids of RMs subjected to action
142             :type group1: list
143
144             :param action: Action to register (either START or STOP)
145             :type action: ResourceAction
146
147             :param group2: List of guids of RMs to we waited for
148             :type group2: list
149
150             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
151             :type state: ResourceState
152
153             :param time: Time to wait after group2 has reached status 
154             :type time: string
155
156         """
157         if isinstance(group1, int):
158             group1 = [group1]
159         if isinstance(group2, int):
160             group2 = [group2]
161
162         for guid1 in group1:
163             rm = self.get_resource(guid1)
164             rm.register_condition(action, group2, state, time)
165
166     def register_trace(self, guid, name):
167         """ Enable trace
168
169         :param name: Name of the trace
170         :type name: str
171         """
172         rm = self.get_resource(guid)
173         rm.register_trace(name)
174
175     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
176         """ Get information on collected trace
177
178         :param name: Name of the trace
179         :type name: str
180
181         :param attr: Can be one of:
182                          - TraceAttr.ALL (complete trace content), 
183                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
184                          - TraceAttr.PATH (full path to the trace file),
185                          - TraceAttr.SIZE (size of trace file). 
186         :type attr: str
187
188         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
189         :type name: int
190
191         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
192         :type name: int
193
194         :rtype: str
195         """
196         rm = self.get_resource(guid)
197         return rm.trace(name, attr, block, offset)
198
199     def discover(self, guid):
200         rm = self.get_resource(guid)
201         return rm.discover()
202
203     def provision(self, guid):
204         rm = self.get_resource(guid)
205         return rm.provision()
206
207     def get(self, guid, name):
208         rm = self.get_resource(guid)
209         return rm.get(name)
210
211     def set(self, guid, name, value):
212         rm = self.get_resource(guid)
213         return rm.set(name, value)
214
215     def state(self, guid, hr = False):
216         """ Returns the state of a resource
217
218             :param guid: Resource guid
219             :type guid: integer
220
221             :param hr: Human readable. Forces return of a 
222                 status string instead of a number 
223             :type hr: boolean
224
225         """
226         rm = self.get_resource(guid)
227         if hr:
228             return ResourceState2str.get(rm.state)
229
230         return rm.state
231
232     def stop(self, guid):
233         rm = self.get_resource(guid)
234         return rm.stop()
235
236     def start(self, guid):
237         rm = self.get_resource(guid)
238         return rm.start()
239
240     def set_with_conditions(self, name, value, group1, group2, state,
241             time = None):
242         """ Set value 'value' on attribute with name 'name' on all RMs of
243             group1 when 'time' has elapsed since all elements in group2 
244             have reached state 'state'.
245
246             :param name: Name of attribute to set in RM
247             :type name: string
248
249             :param value: Value of attribute to set in RM
250             :type name: string
251
252             :param group1: List of guids of RMs subjected to action
253             :type group1: list
254
255             :param action: Action to register (either START or STOP)
256             :type action: ResourceAction
257
258             :param group2: List of guids of RMs to we waited for
259             :type group2: list
260
261             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
262             :type state: ResourceState
263
264             :param time: Time to wait after group2 has reached status 
265             :type time: string
266
267         """
268         if isinstance(group1, int):
269             group1 = [group1]
270         if isinstance(group2, int):
271             group2 = [group2]
272
273         for guid1 in group1:
274             rm = self.get_resource(guid)
275             rm.set_with_conditions(name, value, group2, state, time)
276
277     def stop_with_conditions(self, guid):
278         rm = self.get_resource(guid)
279         return rm.stop_with_conditions()
280
281     def start_with_conditions(self, guid):
282         rm = self.get_resource(guid)
283         return rm.start_with_condition()
284
285     def deploy(self, group = None, wait_all_ready = True):
286         """ Deploy all resource manager in group
287
288         :param group: List of guids of RMs to deploy
289         :type group: list
290
291         :param wait_all_ready: Wait until all RMs are ready in
292             order to start the RMs
293         :type guid: int
294
295         """
296         self.logger.debug(" ------- DEPLOY START ------ ")
297
298         if not group:
299             group = self.resources
300
301         # Before starting deployment we disorder the group list with the
302         # purpose of speeding up the whole deployment process.
303         # It is likely that the user inserted in the 'group' list closely
304         # resources one after another (e.g. all applications
305         # connected to the same node can likely appear one after another).
306         # This can originate a slow down in the deployment since the N 
307         # threads the parallel runner uses to processes tasks may all
308         # be taken up by the same family of resources waiting for the 
309         # same conditions (e.g. LinuxApplications running on a same 
310         # node share a single lock, so they will tend to be serialized).
311         # If we disorder the group list, this problem can be mitigated.
312         random.shuffle(group)
313
314         def wait_all_and_start(group):
315             reschedule = False
316             for guid in group:
317                 rm = self.get_resource(guid)
318                 if rm.state < ResourceState.READY:
319                     reschedule = True
320                     break
321
322             if reschedule:
323                 callback = functools.partial(wait_all_and_start, group)
324                 self.schedule("1s", callback)
325             else:
326                 # If all resources are read, we schedule the start
327                 for guid in group:
328                     rm = self.get_resource(guid)
329                     self.schedule("0.01s", rm.start_with_conditions)
330
331         if wait_all_ready:
332             # Schedule the function that will check all resources are
333             # READY, and only then it will schedule the start.
334             # This is aimed to reduce the number of tasks looping in the scheduler.
335             # Intead of having N start tasks, we will have only one
336             callback = functools.partial(wait_all_and_start, group)
337             self.schedule("1s", callback)
338
339         for guid in group:
340             rm = self.get_resource(guid)
341             self.schedule("0.001s", rm.deploy)
342
343             if not wait_all_ready:
344                 self.schedule("1s", rm.start_with_conditions)
345
346             if rm.conditions.get(ResourceAction.STOP):
347                 # Only if the RM has STOP conditions we
348                 # schedule a stop. Otherwise the RM will stop immediately
349                 self.schedule("2s", rm.stop_with_conditions)
350
351
352     def release(self, group = None):
353         if not group:
354             group = self.resources
355
356         threads = []
357         for guid in group:
358             rm = self.get_resource(guid)
359             thread = threading.Thread(target=rm.release)
360             threads.append(thread)
361             thread.setDaemon(True)
362             thread.start()
363
364         while list(threads) and not self.finished:
365             thread = threads[0]
366             # Time out after 5 seconds to check EC not terminated
367             thread.join(5)
368             if not thread.is_alive():
369                 threads.remove(thread)
370         
371     def shutdown(self):
372         self.release()
373
374         self._stop_scheduler()
375         
376         if self._thread.is_alive():
377            self._thread.join()
378
379     def schedule(self, date, callback, track = False):
380         """ Schedule a callback to be executed at time date.
381
382             date    string containing execution time for the task.
383                     It can be expressed as an absolute time, using
384                     timestamp format, or as a relative time matching
385                     ^\d+.\d+(h|m|s|ms|us)$
386
387             callback    code to be executed for the task. Must be a
388                         Python function, and receives args and kwargs
389                         as arguments.
390
391             track   if set to True, the task will be retrivable with
392                     the get_task() method
393         """
394         timestamp = strfvalid(date)
395         
396         task = Task(timestamp, callback)
397         task = self._scheduler.schedule(task)
398
399         if track:
400             self._tasks[task.id] = task
401   
402         # Notify condition to wake up the processing thread
403         self._cond.acquire()
404         self._cond.notify()
405         self._cond.release()
406
407         return task.id
408      
409     def _process(self):
410         runner = ParallelRun(maxthreads = 50)
411         runner.start()
412
413         try:
414             while not self.finished:
415                 self._cond.acquire()
416                 task = self._scheduler.next()
417                 self._cond.release()
418                 
419                 if not task:
420                     # It there are not tasks in the tasks queue we need to 
421                     # wait until a call to schedule wakes us up
422                     self._cond.acquire()
423                     self._cond.wait()
424                     self._cond.release()
425                 else: 
426                     # If the task timestamp is in the future the thread needs to wait
427                     # until time elapse or until another task is scheduled
428                     now = strfnow()
429                     if now < task.timestamp:
430                         # Calculate time difference in seconds
431                         timeout = strfdiff(task.timestamp, now)
432                         # Re-schedule task with the same timestamp
433                         self._scheduler.schedule(task)
434                         # Sleep until timeout or until a new task awakes the condition
435                         self._cond.acquire()
436                         self._cond.wait(timeout)
437                         self._cond.release()
438                     else:
439                         # Process tasks in parallel
440                         runner.put(self._execute, task)
441         except: 
442             import traceback
443             err = traceback.format_exc()
444             self._logger.error("Error while processing tasks in the EC: %s" % err)
445
446             self._state = ECState.FAILED
447    
448         # Mark EC state as terminated
449         if self.ecstate == ECState.RUNNING:
450             # Synchronize to get errors if occurred
451             runner.sync()
452             self._state = ECState.TERMINATED
453
454     def _execute(self, task):
455         # Invoke callback
456         task.status = TaskStatus.DONE
457
458         try:
459             task.result = task.callback()
460         except:
461             import traceback
462             err = traceback.format_exc()
463             task.result = err
464             task.status = TaskStatus.ERROR
465             
466             self._logger.error("Error occurred while executing task: %s" % err)
467
468             self._stop_scheduler()
469
470             # Propage error to the ParallelRunner
471             raise
472
473     def _stop_scheduler(self):
474         # Mark the EC as failed
475         self._state = ECState.FAILED
476
477         # Wake up the EC in case it was sleeping
478         self._cond.acquire()
479         self._cond.notify()
480         self._cond.release()
481
482