Adding start_with_condition, stop_with_condition and set_with_condition to the EC...
[nepi.git] / src / neco / execution / resource.py
1
2 from neco.util.timefuncs import strfnow, strfdiff, strfvalid 
3
4 import copy
5 import functools
6 import logging
7 import weakref
8
# Default delay handed to the EC scheduler when a conditional action
# (set/start/stop with conditions) has to be retried later.
_reschedule_delay = "1s"
10
class ResourceAction:
    """ Integer identifiers of the actions a condition can be 
    registered for """
    # START is 0 and STOP is 1
    START, STOP = range(2)
14
class ResourceState:
    """ Integer identifiers of the states a resource can be in.

    The numeric order is significant: states are compared with '<'
    to decide whether a resource has already reached a given state """
    # Consecutive values, starting at 0 for NEW
    NEW, DEPLOYED, STARTED, STOPPED, FAILED, RELEASED = range(6)
22
def clsinit(cls):
    """ Class decorator that runs the class level initializer.

    Invokes cls._clsinit() and hands back the very same class object,
    so it can be applied as '@clsinit' on a class definition """
    initialize = cls._clsinit
    initialize()
    return cls
26
# Decorator to invoke class initialization method
@clsinit
class ResourceManager(object):
    """ Base class of the resource managers (RMs) handled by an EC.

    A RM keeps a guid, a weak reference to its EC, the guids of the
    resources it is connected to, a private copy of the class level
    attributes, its current ResourceState and the conditions that must
    hold before it can be started or stopped.

    NOTE: the class level '_filters' and '_attributes' dictionaries are
    created by '_clsinit', which is run by the 'clsinit' decorator. A
    subclass that is not decorated would register into the dictionaries
    it inherits. """
    _rtype = "Resource"
    _filters = None
    _attributes = None

    @classmethod
    def _register_filter(cls, attr):
        """ Resource subclasses will invoke this method to add a
        filter attribute"""
        cls._filters[attr.name] = attr

    @classmethod
    def _register_attribute(cls, attr):
        """ Resource subclasses will invoke this method to add a
        resource attribute"""
        cls._attributes[attr.name] = attr

    @classmethod
    def _register_filters(cls):
        """ Hook called by '_clsinit' once the filters dictionary exists.
        Does nothing here; meant to be overridden to register the
        filter attributes of the class """
        pass

    @classmethod
    def _register_attributes(cls):
        """ Hook called by '_clsinit' once the attributes dictionary
        exists. Does nothing here; meant to be overridden to register
        the resource attributes of the class """
        pass

    @classmethod
    def _clsinit(cls):
        """ Creates the class level filter and attribute templates and
        runs the registration hooks """
        # static template for resource filters
        cls._filters = dict()
        cls._register_filters()

        # static template for resource attributes
        cls._attributes = dict()
        cls._register_attributes()

    @classmethod
    def rtype(cls):
        """ Returns the resource type name of the class """
        return cls._rtype

    @classmethod
    def get_filters(cls):
        """ Returns a list with deep copies of the registered filters """
        # list() is required because dictionary views can not be deep
        # copied in Python 3
        return copy.deepcopy(list(cls._filters.values()))

    @classmethod
    def get_attributes(cls):
        """ Returns a list with deep copies of the registered attributes """
        return copy.deepcopy(list(cls._attributes.values()))

    def __init__(self, ec, guid):
        """
        :param ec: the EC that owns this RM; only a weak reference to it
            is stored
        :param guid: identifier of this RM
        """
        self._guid = guid
        self._ec = weakref.ref(ec)
        self._connections = set()
        # maps a ResourceAction to a set of (group, state, time) tuples
        self._conditions = dict()

        # the resource instance gets a copy of all attributes
        # that can modify
        self._attrs = copy.deepcopy(self._attributes)

        self._state = ResourceState.NEW

        self._start_time = None
        self._stop_time = None

        # Logging
        self._logger = logging.getLogger("neco.execution.resource.Resource.%s" %
            self.guid)

    @property
    def logger(self):
        """ Logger of this RM """
        return self._logger

    @property
    def guid(self):
        """ Identifier of this RM """
        return self._guid

    @property
    def ec(self):
        """ The EC this RM belongs to, or None if the weakly referenced
        EC no longer exists """
        return self._ec()

    @property
    def connections(self):
        """ Set with the guids of the connected resources """
        return self._connections

    @property
    def conditions(self):
        """ Dictionary that maps an action to the set of
        (group, state, time) conditions registered for it """
        return self._conditions

    @property
    def conditons(self):
        """ Misspelled alias of 'conditions', kept for backward
        compatibility """
        return self._conditions

    @property
    def start_time(self):
        """ Value of strfnow() taken when the RM was last started, or
        None if it was never started """
        return self._start_time

    @property
    def stop_time(self):
        """ Value of strfnow() taken when the RM was last stopped, or
        None if it was never stopped """
        return self._stop_time

    @property
    def state(self):
        """ Current ResourceState of the RM """
        return self._state

    def connect(self, guid):
        """ Records a connection to the resource 'guid' if the
        connection is valid """
        if self._validate_connection(guid):
            self._connections.add(guid)

    def discover(self, filters = None):
        """ Does nothing in the base class """
        pass

    def provision(self, filters = None):
        """ Does nothing in the base class """
        pass

    def start(self):
        """ Moves the RM to the STARTED state and records the start time.

        The RM can only be started from the DEPLOYED or STOPPED states;
        from any other state an error is logged and nothing changes """
        if self._state not in (ResourceState.DEPLOYED, ResourceState.STOPPED):
            self.logger.error("Wrong state %s for start" % self.state)
            # refuse the invalid transition instead of reporting an
            # error and performing it anyway
            return

        self._start_time = strfnow()
        self._state = ResourceState.STARTED

    def stop(self):
        """ Moves the RM to the STOPPED state and records the stop time.

        The RM can only be stopped from the STARTED state; from any
        other state an error is logged and nothing changes """
        if self._state != ResourceState.STARTED:
            self.logger.error("Wrong state %s for stop" % self.state)
            return

        self._stop_time = strfnow()
        self._state = ResourceState.STOPPED

    def set(self, name, value):
        """ Sets the value of the attribute 'name'.
        Raises KeyError if the RM has no such attribute """
        attr = self._attrs[name]
        attr.value = value

    def get(self, name):
        """ Returns the value of the attribute 'name'.
        Raises KeyError if the RM has no such attribute """
        attr = self._attrs[name]
        return attr.value

    def register_condition(self, action, group, state,
            time = None):
        """ Registers a condition that must hold before 'action' is
        executed by start_with_conditions or stop_with_conditions.

        :param action: a ResourceAction value
        :param group: guid, or list/tuple/set of guids, of the RMs that
            must have reached 'state'
        :param state: the ResourceState the RMs in 'group' must reach
        :param time: optional time that must have elapsed since the RMs
            reached 'state'
        """
        # Conditions are kept in a set, so the group is stored as a
        # tuple: lists and sets are not hashable, and a single guid
        # could not be iterated when the condition is evaluated
        if isinstance(group, (list, tuple, set, frozenset)):
            group = tuple(group)
        else:
            group = (group,)

        self._conditions.setdefault(action, set()).add((group, state, time))

    def _needs_reschedule(self, group, state, time):
        """ Verifies whether all the RMs in 'group' have reached 'state'
        and, if 'time' is given, whether that time restriction holds.

        :param group: iterable with the guids of the RMs to check
        :param state: the ResourceState the RMs must have reached
        :param time: optional time restriction
        :returns: a (reschedule, delay) tuple. 'reschedule' is True when
            some RM does not satisfy the condition, and 'delay' is the
            delay to use to schedule the next evaluation
        """
        reschedule = False
        delay = _reschedule_delay

        # check state and time elapsed on all RMs
        for guid in group:
            rm = self.ec.get_resource(guid)
            # If the RM state is lower than the requested state we must
            # reschedule (e.g. if RM is DEPLOYED but we required STARTED)
            if rm.state < state:
                reschedule = True
                break

            if not time:
                continue

            if state == ResourceState.STARTED:
                t = rm.start_time
            elif state == ResourceState.STOPPED:
                t = rm.stop_time
            else:
                # Time information is only kept for STARTED and STOPPED.
                # The time restriction can not be evaluated, but the
                # state of the remaining RMs must still be verified
                continue

            # NOTE(review): the argument order of strfdiff, the
            # comparison against 'time' and the use of the result as
            # the new delay are taken unchanged from the original code;
            # confirm them against neco.util.timefuncs
            elapsed = strfdiff(t, strfnow())
            if elapsed < time:
                reschedule = True
                delay = elapsed
                break

        return reschedule, delay

    def _action_needs_reschedule(self, action):
        """ Evaluates the conditions registered for 'action' and returns
        a (reschedule, delay) tuple, stopping at the first condition
        that is not satisfied """
        for (group, state, time) in self._conditions.get(action, ()):
            reschedule, delay = self._needs_reschedule(group, state, time)
            if reschedule:
                return True, delay

        return False, _reschedule_delay

    def set_with_conditions(self, name, value, group, state, time):
        """ Sets the attribute 'name' to 'value' once this RM is STARTED
        and the RMs in 'group' satisfy the ('state', 'time') condition.

        While that is not the case, the call is scheduled again in
        the EC """
        reschedule = False
        delay = _reschedule_delay

        ## evaluate if set conditions are met

        # only can set with conditions after the RM is started
        if self.state != ResourceState.STARTED:
            reschedule = True
        else:
            reschedule, delay = self._needs_reschedule(group, state, time)

        if reschedule:
            callback = functools.partial(self.set_with_conditions,
                    name, value, group, state, time)
            self.ec.schedule(delay, callback)
        else:
            self.set(name, value)

    def start_with_conditions(self):
        """ Starts the RM once it is STOPPED or DEPLOYED and all the
        conditions registered for ResourceAction.START are satisfied.

        While that is not the case, the call is scheduled again in
        the EC """
        reschedule = False
        delay = _reschedule_delay

        ## evaluate if start conditions are met

        # only can start when RM is either STOPPED or DEPLOYED
        if self.state not in (ResourceState.STOPPED, ResourceState.DEPLOYED):
            reschedule = True
        else:
            reschedule, delay = self._action_needs_reschedule(
                    ResourceAction.START)

        if reschedule:
            # the method takes no arguments, so it is scheduled as is
            self.ec.schedule(delay, self.start_with_conditions)
        else:
            self.start()

    def stop_with_conditions(self):
        """ Stops the RM once it is STARTED and all the conditions
        registered for ResourceAction.STOP are satisfied.

        While that is not the case, the call is scheduled again in
        the EC """
        reschedule = False
        delay = _reschedule_delay

        ## evaluate if stop conditions are met

        # only can stop when RM is STARTED
        if self.state != ResourceState.STARTED:
            reschedule = True
        else:
            reschedule, delay = self._action_needs_reschedule(
                    ResourceAction.STOP)

        if reschedule:
            self.ec.schedule(delay, self.stop_with_conditions)
        else:
            self.stop()

    def deploy(self):
        """ Runs discover() and provision() and moves the RM to the
        DEPLOYED state """
        self.discover()
        self.provision()
        self._state = ResourceState.DEPLOYED

    def release(self):
        """ Moves the RM to the RELEASED state """
        self._state = ResourceState.RELEASED

    def _validate_connection(self, guid):
        """ Tells whether the RM can be connected to the resource
        'guid'. Every connection is currently accepted """
        # TODO: Validate!
        return True
276
class ResourceFactory(object):
    """ Registry of resource classes, indexed by their resource type
    name, used to instantiate resources by type """
    # Shared by the whole process: maps rtype name -> resource class
    _resource_types = {}

    @classmethod
    def resource_types(cls):
        """ Returns the dictionary of registered resource classes
        (the registry itself, not a copy) """
        registry = cls._resource_types
        return registry

    @classmethod
    def register_type(cls, rclass):
        """ Registers 'rclass' under the name returned by its rtype()
        method, replacing any class registered under that name """
        type_name = rclass.rtype()
        cls._resource_types[type_name] = rclass

    @classmethod
    def create(cls, rtype, ec, guid):
        """ Instantiates the class registered as 'rtype', passing it
        'ec' and 'guid'. Raises KeyError if 'rtype' is not registered """
        return cls._resource_types[rtype](ec, guid)
292