Adding new content to Planetlab node discover method
[nepi.git] / src / nepi / resources / planetlab / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
22         reschedule_delay
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.resources.planetlab.plcapi import PLCAPIFactory 
25 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
26 import threading
27
28 @clsinit_copy
29 class PlanetlabNode(LinuxNode):
30     _rtype = "PlanetlabNode"
31
32     _blacklist = list()
33     _in_provision = list()
34
35     _lock_bl = threading.Lock()
36     _lock_inpro = threading.Lock()
37
38     @classmethod
39     def blacklist(cls):
40         """ Returns the blacklisted nodes
41
42         """
43         return cls._blacklist
44
45     @classmethod
46     def in_provision(cls):
47         """ Returns the nodes that anohter RM is trying to provision
48
49         """
50         return cls._in_provision
51
52     @classmethod
53     def lock_bl(cls):
54         """ Returns the lock for the blacklist
55
56         """
57         return cls._lock_bl
58
59     @classmethod
60     def lock_inpro(cls):
61         """ Returns the lock for the provision list
62
63         """
64         return cls._lock_inpro
65
66
67     @classmethod
68     def _register_attributes(cls):
69         ip = Attribute("ip", "PlanetLab host public IP address",
70                 flags = Flags.ReadOnly)
71
72         pl_url = Attribute("plcApiUrl", "URL of PlanetLab PLCAPI host (e.g. www.planet-lab.eu or www.planet-lab.org) ",
73                 default = "www.planet-lab.eu",
74                 flags = Flags.Credential)
75
76         pl_ptn = Attribute("plcApiPattern", "PLC API service regexp pattern (e.g. https://%(hostname)s:443/PLCAPI/ ) ",
77                 default = "https://%(hostname)s:443/PLCAPI/",
78                 flags = Flags.ExecReadOnly)
79     
80         pl_user = Attribute("pluser", "PlanetLab account user, as the one to authenticate in the website) ",
81                 flags = Flags.Credential)
82
83         pl_password = Attribute("password", "PlanetLab account password, as the one to authenticate in the website) ",
84                 flags = Flags.Credential)
85
86         city = Attribute("city",
87                 "Constrain location (city) during resource discovery. May use wildcards.",
88                 flags = Flags.Filter)
89
90         country = Attribute("country",
91                 "Constrain location (country) during resource discovery. May use wildcards.",
92                 flags = Flags.Filter)
93
94         region = Attribute("region",
95                 "Constrain location (region) during resource discovery. May use wildcards.",
96                 flags = Flags.Filter)
97
98         architecture = Attribute("architecture",
99                 "Constrain architecture during resource discovery.",
100                 type = Types.Enumerate,
101                 allowed = ["x86_64",
102                             "i386"],
103                 flags = Flags.Filter)
104
105         operating_system = Attribute("operatingSystem",
106                 "Constrain operating system during resource discovery.",
107                 type = Types.Enumerate,
108                 allowed =  ["f8",
109                             "f12",
110                             "f14",
111                             "centos",
112                             "other"],
113                 flags = Flags.Filter)
114
115         site = Attribute("site",
116                 "Constrain the PlanetLab site this node should reside on.",
117                 type = Types.Enumerate,
118                 allowed = ["PLE",
119                             "PLC",
120                             "PLJ"],
121                 flags = Flags.Filter)
122
123         min_reliability = Attribute("minReliability",
124                 "Constrain reliability while picking PlanetLab nodes. Specifies a lower acceptable bound.",
125                 type = Types.Double,
126                 range = (1, 100),
127                 flags = Flags.Filter)
128
129         max_reliability = Attribute("maxReliability",
130                 "Constrain reliability while picking PlanetLab nodes. Specifies an upper acceptable bound.",
131                 type = Types.Double,
132                 range = (1, 100),
133                 flags = Flags.Filter)
134
135         min_bandwidth = Attribute("minBandwidth",
136                 "Constrain available bandwidth while picking PlanetLab nodes. Specifies a lower acceptable bound.",
137                 type = Types.Double,
138                 range = (0, 2**31),
139                 flags = Flags.Filter)
140
141         max_bandwidth = Attribute("maxBandwidth",
142                 "Constrain available bandwidth while picking PlanetLab nodes. Specifies an upper acceptable bound.",
143                 type = Types.Double,
144                 range = (0, 2**31),
145                 flags = Flags.Filter)
146
147         min_load = Attribute("minLoad",
148                 "Constrain node load average while picking PlanetLab nodes. Specifies a lower acceptable bound.",
149                 type = Types.Double,
150                 range = (0, 2**31),
151                 flags = Flags.Filter)
152
153         max_load = Attribute("maxLoad",
154                 "Constrain node load average while picking PlanetLab nodes. Specifies an upper acceptable bound.",
155                 type = Types.Double,
156                 range = (0, 2**31),
157                 flags = Flags.Filter)
158
159         min_cpu = Attribute("minCpu",
160                 "Constrain available cpu time while picking PlanetLab nodes. Specifies a lower acceptable bound.",
161                 type = Types.Double,
162                 range = (0, 100),
163                 flags = Flags.Filter)
164
165         max_cpu = Attribute("maxCpu",
166                 "Constrain available cpu time while picking PlanetLab nodes. Specifies an upper acceptable bound.",
167                 type = Types.Double,
168                 range = (0, 100),
169                 flags = Flags.Filter)
170
171         timeframe = Attribute("timeframe",
172                 "Past time period in which to check information about the node. Values are year,month, week, latest",
173                 default = "week",
174                 type = Types.Enumerate,
175                 allowed = ["latest",
176                             "week",
177                             "month",
178                             "year"],
179                  flags = Flags.Filter)
180
181         cls._register_attribute(ip)
182         cls._register_attribute(pl_url)
183         cls._register_attribute(pl_ptn)
184         cls._register_attribute(pl_user)
185         cls._register_attribute(pl_password)
186         cls._register_attribute(site)
187         cls._register_attribute(city)
188         cls._register_attribute(country)
189         cls._register_attribute(region)
190         cls._register_attribute(architecture)
191         cls._register_attribute(operating_system)
192         cls._register_attribute(min_reliability)
193         cls._register_attribute(max_reliability)
194         cls._register_attribute(min_bandwidth)
195         cls._register_attribute(max_bandwidth)
196         cls._register_attribute(min_load)
197         cls._register_attribute(max_load)
198         cls._register_attribute(min_cpu)
199         cls._register_attribute(max_cpu)
200         cls._register_attribute(timeframe)
201         
202
203     def __init__(self, ec, guid):
204         super(PlanetlabNode, self).__init__(ec, guid)
205
206         self._plapi = None
207     
208     @property
209     def plapi(self):
210         if not self._plapi:
211             pl_user = self.get("pluser")
212             pl_pass = self.get("password")
213             pl_url = self.get("plcApiUrl")
214             pl_ptn = self.get("plcApiPattern")
215
216             self._plapi =  PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
217                     pl_ptn)
218             
219         return self._plapi
220
221     def discoveri(self):
222         bl = PlanetlabNode.blacklist()
223         inpro = PlanetlabNode.in_provision()
224         lockbl = PlanetlabNode.lock_bl()
225         lockinpro = PlanetlabNode.lock_inpro()
226         hostname = self.get("hostname")
227         if hostname: 
228             node_id = self.check_alive_and_active(hostname=hostname)
229             if node_id not in bl and node_id not in inpro:
230                 try_other = self.do_ping(node_id)
231                 if try_other:
232                     lockbl.acquire()
233                     bl.append(node_id)
234                     lockbl.release()
235                     msg = "Node %s not alive, pick another node" % hostname
236                     raise RuntimeError, msg
237                 else:
238                     self._discover_time = tnow()
239                     self._state = ResourceState.DISCOVERED
240                     return node_id
241             else:
242                 msg = "Node %s not available for provisioning, pick another node" % hostname
243                 raise RuntimeError, msg
244                     
245         else:
246             from random import randint
247             nodes = self.filter_based_on_attributes()
248             nodes_alive = self.check_alive_and_active(nodes)
249             nodes_inslice = self.check_if_in_slice(nodes_alive)
250             nodes_not_inslice = list(set(nodes_alive) - set(nodes_inslice))
251             if nodes_inslice:
252                 size = len(nodes_inslice)
253                 while size:
254                     size = size - 1
255                     index = randint(0, size)
256                     node_id = nodes_inslice[index]
257                     nodes_inslice[index] = nodes_inslice[size]
258                     if node_id not in bl and node_id not in inpro:
259                         try_other = self.do_ping(node_id)
260                         if not try_other:
261                             lockinpro.acquire()
262                             inpro.append(node_id)
263                             lockinpro.release()
264                             self._discover_time = tnow()
265                             self._state = ResourceState.DISCOVERED
266                             return node_id
267                         else:
268                             lockbl.acquire()
269                             bl.append(node_id)
270                             lockbl.release()
271
272             if nodes_not_inslice:
273                 size = len(nodes_not_inslice)
274                 while size:
275                     size = size - 1
276                     index = randint(0, size)
277                     node_id = nodes_not_inslice[index]
278                     nodes_not_inslice[index] = nodes_not_inslice[size]
279                     if node_id not in bl and node_id not in inpro:
280                         try_other = self.do_ping(node_id)
281                         if not try_other:
282                             lockinpro.acquire()
283                             inpro.append(node_id)
284                             lockinpro.release()
285                             self._discover_time = tnow()
286                             self._state = ResourceState.DISCOVERED
287                             return node_id
288                         else:
289                             lockbl.acquire()
290                             bl.append(node_id)
291                             lockbl.release()
292                 msg = "Not enough nodes available for provisioning"
293                 raise RuntimeError, msg
294
295                     
296
297     #def provision(self):
298         #de hostname para que provision haga add_node_slice, check que ip coincide con hostname
299         #command = "ssh %s@%s -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no 'echo \'GOOD NODE\''" % (pl_slice, hostname p = subprocess.Popen(command, shell=True, stdout = subprocess.PIPE, stderr = subprocess.PIPE) stdout, stderr = p.communicate() if stdout.find("GOOD NODE") < 0: continue
300
301
302
303     def filter_based_on_attributes(self):
304         # Map attributes with tagnames of PL
305         timeframe = self.get("timeframe")[0]
306         attr_to_tags = {
307             'city' : 'city',
308             'country' : 'country',
309             'region' : 'region',
310             'architecture' : 'arch',
311             'operatingSystem' : 'fcdistro',
312             #'site' : 'pldistro',
313             'minReliability' : 'reliability%s' % timeframe,
314             'maxReliability' : 'reliability%s' % timeframe,
315             'minBandwidth' : 'bw%s' % timeframe,
316             'maxBandwidth' : 'bw%s' % timeframe,
317             'minLoad' : 'load%s' % timeframe,
318             'maxLoad' : 'load%s' % timeframe,
319             'minCpu' : 'cpu%s' % timeframe,
320             'maxCpu' : 'cpu%s' % timeframe,
321         }
322         
323         nodes_id = []
324         filters = {}
325         for attr_name, attr_obj in self._attrs.iteritems():
326             attr_value = self.get(attr_name)
327             if attr_value is not None and attr_obj.flags == 8 and not 'min' in attr_name \
328                 and not 'max' in attr_name and attr_name != 'timeframe':
329                 attr_tag = attr_to_tags[attr_name]
330                 filters['tagname'] = attr_tag
331                 filters['value'] = attr_value
332                 node_tags = self.plapi.get_node_tags(filters)
333                 if node_tags is not None:
334                     if len(nodes_id) == 0:
335                         for node_tag in node_tags:
336                             nodes_id.append(node_tag['node_id'])
337                     else:
338                         nodes_id_tmp = []
339                         for node_tag in node_tags:
340                             if node_tag['node_id'] in nodes_id:
341                                 nodes_id_tmp.append(node_tag['node_id'])
342                         if len(nodes_id_tmp):
343                             nodes_id = set(nodes_id) & set(nodes_id_tmp)
344                         else:
345                             self.fail()
346                 else:
347                     self.fail()
348             elif attr_value is not None and attr_obj.flags == 8 and ('min' or 'max') in attr_name:
349                 attr_tag = attr_to_tags[attr_name]
350                 filters['tagname'] = attr_tag
351                 node_tags = self.plapi.get_node_tags(filters)
352                 if node_tags is not None:
353                     if len(nodes_id) == 0:
354                         for node_tag in node_tags:
355                             if 'min' in attr_name and node_tag['value'] != 'n/a' and \
356                                 float(node_tag['value']) > attr_value:
357                                 nodes_id.append(node_tag['node_id'])
358                             elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
359                                 float(node_tag['value']) < attr_value:
360                                 nodes_id.append(node_tag['node_id'])
361                     else:
362                         nodes_id_tmp = []
363                         for node_tag in node_tags:
364                             if 'min' in attr_name and node_tag['value'] != 'n/a' and \
365                                 float(node_tag['value']) > attr_value and \
366                                 node_tag['node_id'] in nodes_id:
367                                 nodes_id_tmp.append(node_tag['node_id'])
368                             elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
369                                 float(node_tag['value']) < attr_value and \
370                                 node_tag['node_id'] in nodes_id:
371                                 nodes_id_tmp.append(node_tag['node_id'])
372                         if len(nodes_id_tmp):
373                             nodes_id = set(nodes_id) & set(nodes_id_tmp)
374                         else:
375                             self.fail()
376
377         return nodes_id
378                     
379     def check_alive_and_active(self, nodes_id=None, hostname=None):
380         if nodes_id is None and hostname is None:
381             msg = "Specify nodes_id or hostname"
382             raise RuntimeError, msg
383         if nodes_id is not None and hostname is not None:
384             msg = "Specify either nodes_id or hostname"
385             raise RuntimeError, msg
386
387         # check node alive
388         import time
389         filters = dict()
390         filters['run_level'] = 'boot'
391         filters['boot_state'] = 'boot'
392         filters['node_type'] = 'regular' 
393         filters['>last_contact'] =  int(time.time()) - 2*3600
394         if nodes_id:
395             filters['node_id'] = list(nodes_id)
396             alive_nodes_id = self.plapi.get_nodes(filters, fields=['node_id'])
397         elif hostname:
398             filters['hostname'] = hostname
399             alive_nodes_id = self.plapi.get_nodes(filters, fields=['node_id'])
400         if len(alive_nodes_id) == 0:
401             self.fail()
402         else:
403             nodes_id = list()
404             for node_id in alive_nodes_id:
405                 nid = node_id['node_id']
406                 nodes_id.append(nid)
407             return nodes_id
408
409
410     def check_if_in_slice(self, nodes_id):
411         slicename = self.get("username")
412         slice_nodes = self.plapi.get_slice_nodes(slicename)
413         nodes_inslice = list(set(nodes_id) & set(slice_nodes))
414         return nodes_inslice
415
416     def do_ping(self, node_id):
417         ip = self.plapi.get_interfaces({'node_id':node_id}, fields=['ip'])
418         ip = ip[0]['ip']
419         import subprocess
420         result = subprocess.call(["ping","-c","2",ip],stdout=subprocess.PIPE,stderr=subprocess.PIPE)
421         if result == 0:
422             return False
423         elif result == 1 or result == 2:
424             return True
425
426
427     def fail(self):
428         msg = "Discovery failed. No candidates found for node"
429         self.error(msg)
430         raise RuntimeError, msg
431            
432                         
433     def valid_connection(self, guid):
434         # TODO: Validate!
435         return True
436
437 #    def blacklist(self):
438 #        # TODO!!!!
439 #        self.warn(" Blacklisting malfunctioning node ")
440 #        #import util
441 #        #util.appendBlacklist(self.hostname)
442