Debug the deploy part and start with condition
[nepi.git] / src / neco / execution / resource.py
1
2 from neco.util.timefuncs import strfnow, strfdiff, strfvalid 
3
4 import copy
5 import functools
6 import logging
7 import weakref
8 import time as TIME
9
10 _reschedule_delay = "1s"
11
12 class ResourceAction:
13     START = 0
14     STOP = 1
15
16 class ResourceState:
17     NEW = 0
18     DEPLOYED = 1
19     STARTED = 2
20     STOPPED = 3
21     FAILED = 4
22     RELEASED = 5
23
24 def clsinit(cls):
25     cls._clsinit()
26     return cls
27
28 # Decorator to invoke class initialization method
29 @clsinit
30 class ResourceManager(object):
31     _rtype = "Resource"
32     _filters = None
33     _attributes = None
34
35     @classmethod
36     def _register_filter(cls, attr):
37         """ Resource subclasses will invoke this method to add a 
38         filter attribute
39
40         """
41         cls._filters[attr.name] = attr
42
43     @classmethod
44     def _register_attribute(cls, attr):
45         """ Resource subclasses will invoke this method to add a 
46         resource attribute
47
48         """
49         cls._attributes[attr.name] = attr
50
51     @classmethod
52     def _register_filters(cls):
53         """ Resource subclasses will invoke this method to add a 
54         filter attribute
55
56         """
57         pass
58
59     @classmethod
60     def _register_attributes(cls):
61         """ Resource subclasses will invoke this method to add a 
62         resource attribute
63
64         """
65         pass
66
67     @classmethod
68     def _clsinit(cls):
69         """ Create a new dictionnary instance of the dictionnary 
70         with the same template.
71  
72         Each ressource should have the same registration dictionary
73         template with different instances.
74         """
75         # static template for resource filters
76         cls._filters = dict()
77         cls._register_filters()
78
79         # static template for resource attributes
80         cls._attributes = dict()
81         cls._register_attributes()
82
83     @classmethod
84     def rtype(cls):
85         return cls._rtype
86
87     @classmethod
88     def get_filters(cls):
89         """ Returns a copy of the filters
90
91         """
92         return copy.deepcopy(cls._filters.values())
93
94     @classmethod
95     def get_attributes(cls):
96         """ Returns a copy of the attributes
97
98         """
99         return copy.deepcopy(cls._attributes.values())
100
101     def __init__(self, ec, guid):
102         self._guid = guid
103         self._ec = weakref.ref(ec)
104         self._connections = set()
105         self._conditions = dict() 
106
107         # the resource instance gets a copy of all attributes
108         # that can modify
109         self._attrs = copy.deepcopy(self._attributes)
110
111         self._state = ResourceState.NEW
112
113         self._start_time = None
114         self._stop_time = None
115
116         # Logging
117         self._logger = logging.getLogger("neco.execution.resource.Resource.%s" % 
118             self.guid)
119
120     @property
121     def logger(self):
122         return self._logger
123
124     @property
125     def guid(self):
126         return self._guid
127
128     @property
129     def ec(self):
130         return self._ec()
131
132     @property
133     def connections(self):
134         return self._connections
135
136     @property
137     def conditions(self):
138         return self._conditions
139
140     @property
141     def start_time(self):
142         """ timestamp with  """
143         return self._start_time
144
145     @property
146     def stop_time(self):
147         return self._stop_time
148
149     @property
150     def state(self):
151         return self._state
152
153     def connect(self, guid):
154         if (self._validate_connection(guid)):
155             self._connections.add(guid)
156
157     def discover(self, filters = None):
158         pass
159
160     def provision(self, filters = None):
161         pass
162
163     def start(self):
164         """ Start the Resource Manager
165
166         """
167         if not self._state in [ResourceState.DEPLOYED, ResourceState.STOPPED]:
168             self.logger.error("Wrong state %s for start" % self.state)
169
170         self._start_time = strfnow()
171         self._state = ResourceState.STARTED
172
173     def stop(self):
174         """ Start the Resource Manager
175
176         """
177         if not self._state in [ResourceState.STARTED]:
178             self.logger.error("Wrong state %s for stop" % self.state)
179
180         self._stop_time = strfnow()
181         self._state = ResourceState.STOPPED
182
183     def set(self, name, value):
184         """ Set the value of the attribute
185
186         :param name: Name of the attribute
187         :type name: str
188         :param name: Value of the attribute
189         :type name: str
190         :rtype:  Boolean
191         """
192         attr = self._attrs[name]
193         attr.value = value
194
195     def get(self, name):
196         """ Start the Resource Manager
197
198         :param name: Name of the attribute
199         :type name: str
200         :rtype: str
201         """
202         attr = self._attrs[name]
203         return attr.value
204
205     def register_condition(self, action, group, state, 
206             time = None):
207         """ Do the 'action' after 'time' on the current RM when 'group' 
208          reach the state 'state'
209
210         :param action: Action to do. Either 'START' or 'STOP'
211         :type action: str
212         :param group: group of RM
213         :type group: str
214         :param state: RM that are part of the condition
215         :type state: list
216         :param time: Time to wait after the state is reached (ex : '2s' )
217         :type time: str
218
219         """
220         if action not in self.conditions:
221             self._conditions[action] = set()
222
223         # We need to use only sequence inside a set and not a list. 
224         # As group is a list, we need to change it.
225         #print (tuple(group), state, time)
226         self.conditions.get(action).add((tuple(group), state, time))
227
228     def _needs_reschedule(self, group, state, time):
229         """ Internal method that verify if 'time' has elapsed since 
230         all elements in 'group' have reached state 'state'.
231
232         :param group: RM that are part of the condition
233         :type group: list
234         :param state: State that group need to reach for the condtion
235         :type state: str
236         :param time: time to wait after the state
237         :type time: str
238
239
240         """
241         reschedule = False
242         delay = _reschedule_delay 
243
244         # check state and time elapsed on all RMs
245         for guid in group:
246             rm = self.ec.get_resource(guid)
247             # If the RMs is lower than the requested state we must
248             # reschedule (e.g. if RM is DEPLOYED but we required STARTED)
249             if rm.state < state:
250                 reschedule = True
251                 break
252
253             if time:
254                 if state == ResourceState.STARTED:
255                     t = rm.start_time
256                 elif state == ResourceState.STOPPED:
257                     t = rm.stop_time
258                 else:
259                     # Only keep time information for START and STOP
260                     break
261
262                 d = strfdiff(strfnow(), t) 
263                 if d < time:
264                     reschedule = True
265                     delay = "%ds" % (int(time - d) +1)
266                     break
267         return reschedule, delay
268
269     def set_with_conditions(self, name, value, group, state, time):
270         """ Set value 'value' on attribute with name 'name' when 'time' 
271             has elapsed since all elements in 'group' have reached state
272            'state'.
273
274         :param name: Name of the attribute
275         :type name: str
276         :param name: Value of the attribute
277         :type name: str
278         :param group: RM that are part of the condition
279         :type group: list
280         :param state: State that group need to reach before set
281         :type state: str
282         :param time: Time to wait after the state is reached (ex : '2s' )
283         :type time: str
284
285         """
286
287         reschedule = False
288         delay = _reschedule_delay 
289
290         ## evaluate if set conditions are met
291
292         # only can set with conditions after the RM is started
293         if self.state != ResourceState.STARTED:
294             reschedule = True
295         else:
296             reschedule, delay = self._needs_reschedule(group, state, time)
297
298         if reschedule:
299             callback = functools.partial(self.set_with_conditions, 
300                     name, value, group, state, time)
301             self.ec.schedule(delay, callback)
302         else:
303             self.set(name, value)
304
305     def start_with_conditions(self):
306         """ Starts when all the conditions are reached
307
308         """
309         reschedule = False
310         delay = _reschedule_delay 
311
312         ## evaluate if set conditions are met
313
314         # only can start when RM is either STOPPED or DEPLOYED
315         if self.state not in [ResourceState.STOPPED, ResourceState.DEPLOYED]:
316             reschedule = True
317         else:
318             print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") -----  start condition : " + str(self.conditions.items())
319             # Need to separate because it could have more that tuple of condition 
320             # for the same action.
321             if self.conditions.get(ResourceAction.START): 
322                 for (group, state, time) in self.conditions.get(ResourceAction.START):
323                     reschedule, delay = self._needs_reschedule(group, state, time)
324                     if reschedule:
325                         break
326
327         if reschedule:
328             callback = functools.partial(self.start_with_conditions)
329             self.ec.schedule(delay, callback)
330         else:
331             print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----\
332 ------------------------------------------------------------------------------\
333 ----------------------------------------------------------------  STARTING -- "
334             self.start()
335
336     def stop_with_conditions(self):
337         """ Starts when all the conditions are reached
338
339         """
340         reschedule = False
341         delay = _reschedule_delay 
342
343         ## evaluate if set conditions are met
344
345         # only can stop when RM is STARTED
346         if self.state != ResourceState.STARTED:
347             reschedule = True
348         else:
349             print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +")  ----  stop condition : " + str(self.conditions.items())
350             # Need to separate because it could have more that tuple of condition 
351             # for the same action.
352             conditions =  self.conditions.get(ResourceAction.STOP, []) 
353             for (group, state, time) in conditions:
354                 reschedule, delay = self._needs_reschedule(group, state, time)
355                 if reschedule:
356                     break
357
358         #else:
359         #    for action, (group, state, time) in self.conditions.iteritems():
360         #        if action == ResourceAction.STOP:
361         #            reschedule, delay = self._needs_reschedule(group, state, time)   
362         #            if reschedule:
363         #                break
364
365         if reschedule:
366             callback = functools.partial(self.stop_with_conditions)
367             self.ec.schedule(delay, callback)
368         else:
369             self.stop()
370
371     def deploy(self):
372         """Execute all the differents steps required to reach the state DEPLOYED
373
374         """
375         self.discover()
376         self.provision()
377         self._state = ResourceState.DEPLOYED
378
379     def release(self):
380         """Clean the resource at the end of the Experiment and change the status
381
382         """
383         self._state = ResourceState.RELEASED
384
385     def _validate_connection(self, guid):
386         """Check if the connection is available.
387
388         :param guid: Guid of the current Resource Manager
389         :type guid: int
390         :rtype:  Boolean
391
392         """
393         # TODO: Validate!
394         return True
395
396 class ResourceFactory(object):
397     _resource_types = dict()
398
399     @classmethod
400     def resource_types(cls):
401         return cls._resource_types
402
403     @classmethod
404     def register_type(cls, rclass):
405         cls._resource_types[rclass.rtype()] = rclass
406
407     @classmethod
408     def create(cls, rtype, ec, guid):
409         rclass = cls._resource_types[rtype]
410         return rclass(ec, guid)
411