Updating discover and provision method
[nepi.git] / src / nepi / resources / planetlab / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
22         reschedule_delay
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.resources.planetlab.plcapi import PLCAPIFactory 
25 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
26 import threading
27 import subprocess
28
29 @clsinit_copy
30 class PlanetlabNode(LinuxNode):
31     _rtype = "PlanetlabNode"
32
33     _blacklist = list()
34     _in_provision = list()
35
36     _lock_bl = threading.Lock()
37     _lock_inpro = threading.Lock()
38
39     @classmethod
40     def blacklist(cls):
41         """ Returns the blacklisted nodes
42
43         """
44         return cls._blacklist
45
46     @classmethod
47     def in_provision(cls):
48         """ Returns the nodes that anohter RM is trying to provision
49
50         """
51         return cls._in_provision
52
53     @classmethod
54     def lock_bl(cls):
55         """ Returns the lock for the blacklist
56
57         """
58         return cls._lock_bl
59
60     @classmethod
61     def lock_inpro(cls):
62         """ Returns the lock for the provision list
63
64         """
65         return cls._lock_inpro
66
67
68     @classmethod
69     def _register_attributes(cls):
70         ip = Attribute("ip", "PlanetLab host public IP address",
71                 flags = Flags.ReadOnly)
72
73         pl_url = Attribute("plcApiUrl", "URL of PlanetLab PLCAPI host (e.g. www.planet-lab.eu or www.planet-lab.org) ",
74                 default = "www.planet-lab.eu",
75                 flags = Flags.Credential)
76
77         pl_ptn = Attribute("plcApiPattern", "PLC API service regexp pattern (e.g. https://%(hostname)s:443/PLCAPI/ ) ",
78                 default = "https://%(hostname)s:443/PLCAPI/",
79                 flags = Flags.ExecReadOnly)
80     
81         pl_user = Attribute("pluser", "PlanetLab account user, as the one to authenticate in the website) ",
82                 flags = Flags.Credential)
83
84         pl_password = Attribute("password", "PlanetLab account password, as the one to authenticate in the website) ",
85                 flags = Flags.Credential)
86
87         city = Attribute("city",
88                 "Constrain location (city) during resource discovery. May use wildcards.",
89                 flags = Flags.Filter)
90
91         country = Attribute("country",
92                 "Constrain location (country) during resource discovery. May use wildcards.",
93                 flags = Flags.Filter)
94
95         region = Attribute("region",
96                 "Constrain location (region) during resource discovery. May use wildcards.",
97                 flags = Flags.Filter)
98
99         architecture = Attribute("architecture",
100                 "Constrain architecture during resource discovery.",
101                 type = Types.Enumerate,
102                 allowed = ["x86_64",
103                             "i386"],
104                 flags = Flags.Filter)
105
106         operating_system = Attribute("operatingSystem",
107                 "Constrain operating system during resource discovery.",
108                 type = Types.Enumerate,
109                 allowed =  ["f8",
110                             "f12",
111                             "f14",
112                             "centos",
113                             "other"],
114                 flags = Flags.Filter)
115
116         site = Attribute("site",
117                 "Constrain the PlanetLab site this node should reside on.",
118                 type = Types.Enumerate,
119                 allowed = ["PLE",
120                             "PLC",
121                             "PLJ"],
122                 flags = Flags.Filter)
123
124         min_reliability = Attribute("minReliability",
125                 "Constrain reliability while picking PlanetLab nodes. Specifies a lower acceptable bound.",
126                 type = Types.Double,
127                 range = (1, 100),
128                 flags = Flags.Filter)
129
130         max_reliability = Attribute("maxReliability",
131                 "Constrain reliability while picking PlanetLab nodes. Specifies an upper acceptable bound.",
132                 type = Types.Double,
133                 range = (1, 100),
134                 flags = Flags.Filter)
135
136         min_bandwidth = Attribute("minBandwidth",
137                 "Constrain available bandwidth while picking PlanetLab nodes. Specifies a lower acceptable bound.",
138                 type = Types.Double,
139                 range = (0, 2**31),
140                 flags = Flags.Filter)
141
142         max_bandwidth = Attribute("maxBandwidth",
143                 "Constrain available bandwidth while picking PlanetLab nodes. Specifies an upper acceptable bound.",
144                 type = Types.Double,
145                 range = (0, 2**31),
146                 flags = Flags.Filter)
147
148         min_load = Attribute("minLoad",
149                 "Constrain node load average while picking PlanetLab nodes. Specifies a lower acceptable bound.",
150                 type = Types.Double,
151                 range = (0, 2**31),
152                 flags = Flags.Filter)
153
154         max_load = Attribute("maxLoad",
155                 "Constrain node load average while picking PlanetLab nodes. Specifies an upper acceptable bound.",
156                 type = Types.Double,
157                 range = (0, 2**31),
158                 flags = Flags.Filter)
159
160         min_cpu = Attribute("minCpu",
161                 "Constrain available cpu time while picking PlanetLab nodes. Specifies a lower acceptable bound.",
162                 type = Types.Double,
163                 range = (0, 100),
164                 flags = Flags.Filter)
165
166         max_cpu = Attribute("maxCpu",
167                 "Constrain available cpu time while picking PlanetLab nodes. Specifies an upper acceptable bound.",
168                 type = Types.Double,
169                 range = (0, 100),
170                 flags = Flags.Filter)
171
172         timeframe = Attribute("timeframe",
173                 "Past time period in which to check information about the node. Values are year,month, week, latest",
174                 default = "week",
175                 type = Types.Enumerate,
176                 allowed = ["latest",
177                             "week",
178                             "month",
179                             "year"],
180                  flags = Flags.Filter)
181
182         cls._register_attribute(ip)
183         cls._register_attribute(pl_url)
184         cls._register_attribute(pl_ptn)
185         cls._register_attribute(pl_user)
186         cls._register_attribute(pl_password)
187         cls._register_attribute(site)
188         cls._register_attribute(city)
189         cls._register_attribute(country)
190         cls._register_attribute(region)
191         cls._register_attribute(architecture)
192         cls._register_attribute(operating_system)
193         cls._register_attribute(min_reliability)
194         cls._register_attribute(max_reliability)
195         cls._register_attribute(min_bandwidth)
196         cls._register_attribute(max_bandwidth)
197         cls._register_attribute(min_load)
198         cls._register_attribute(max_load)
199         cls._register_attribute(min_cpu)
200         cls._register_attribute(max_cpu)
201         cls._register_attribute(timeframe)
202         
203
204     def __init__(self, ec, guid):
205         super(PlanetlabNode, self).__init__(ec, guid)
206
207         self._plapi = None
208         self._node_to_provision = None
209     
210     @property
211     def plapi(self):
212         if not self._plapi:
213             pl_user = self.get("pluser")
214             pl_pass = self.get("password")
215             pl_url = self.get("plcApiUrl")
216             pl_ptn = self.get("plcApiPattern")
217
218             self._plapi =  PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
219                     pl_ptn)
220             
221         return self._plapi
222
223     def discoverl(self):
224         bl = PlanetlabNode.blacklist()
225         inpro = PlanetlabNode.in_provision()
226         lockbl = PlanetlabNode.lock_bl()
227         lockinpro = PlanetlabNode.lock_inpro()
228         hostname = self.get("hostname")
229         if hostname: 
230             node_id = self.check_alive_and_active(hostname=hostname)
231             if node_id not in bl and node_id not in inpro:
232                 try_other = self.do_ping(node_id)
233                 if try_other:
234                     lockbl.acquire()
235                     bl.append(node_id)
236                     lockbl.release()
237                     msg = "Node %s not alive, pick another node" % hostname
238                     raise RuntimeError, msg
239                 else:
240                     self._node_to_provision = node_id
241                     super(PlanetlabNode, self).discover()
242                     #self._discover_time = tnow()
243                     #self._state = ResourceState.DISCOVERED
244                     return node_id
245             else:
246                 msg = "Node %s not available for provisioning, pick another node" % hostname
247                 raise RuntimeError, msg
248                     
249         else:
250             from random import randint
251             nodes = self.filter_based_on_attributes()
252             nodes_alive = self.check_alive_and_active(nodes)
253             print nodes, nodes_alive
254             nodes_inslice = self.check_if_in_slice(nodes_alive)
255             nodes_not_inslice = list(set(nodes_alive) - set(nodes_inslice))
256             if nodes_inslice:
257                 size = len(nodes_inslice)
258                 while size:
259                     size = size - 1
260                     index = randint(0, size)
261                     node_id = nodes_inslice[index]
262                     nodes_inslice[index] = nodes_inslice[size]
263                     if node_id not in bl and node_id not in inpro:
264                         try_other = self.do_ping(node_id)
265                         if not try_other:
266                             lockinpro.acquire()
267                             inpro.append(node_id)
268                             lockinpro.release()
269                             self._node_to_provision = node_id
270
271                             super(PlanetlabNode, self).discover()
272                             #self._discover_time = tnow()
273                             #self._state = ResourceState.DISCOVERED
274                             return node_id
275                         else:
276                             lockbl.acquire()
277                             bl.append(node_id)
278                             lockbl.release()
279
280             if nodes_not_inslice:
281                 size = len(nodes_not_inslice)
282                 while size:
283                     size = size - 1
284                     index = randint(0, size)
285                     node_id = nodes_not_inslice[index]
286                     nodes_not_inslice[index] = nodes_not_inslice[size]
287                     if node_id not in bl and node_id not in inpro:
288                         try_other = self.do_ping(node_id)
289                         if not try_other:
290                             lockinpro.acquire()
291                             inpro.append(node_id)
292                             lockinpro.release()
293                             self._node_to_provision = node_id
294                             
295                             super(PlanetlabNode, self).discover()
296                             #self._discover_time = tnow()
297                             #self._state = ResourceState.DISCOVERED
298                             return node_id
299                         else:
300                             lockbl.acquire()
301                             bl.append(node_id)
302                             lockbl.release()
303                 msg = "Not enough nodes available for provisioning"
304                 raise RuntimeError, msg
305
306                     
307
308     def provisionl(self):
309         import time
310         bl = PlanetlabNode.blacklist()
311         lockbl = PlanetlabNode.lock_bl()
312         provision_ok = False
313         ssh_ok = False
314         proc_ok = False
315         timeout = 1200
316         while not provision_ok:
317             slicename = self.get("username")
318             node = self._node_to_provision
319             ip = self.plapi.get_interfaces({'node_id':node}, fields=['ip'])
320             ip = ip[0]['ip']
321             print ip
322
323             self.plapi.add_slice_nodes(slicename, [node])
324             
325             t = 0 
326             while t < timeout and not ssh_ok:
327                 # check ssh connection
328                 command = "ssh %s@%s -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no 'echo \'GOOD NODE\''" % (slicename, ip)
329                 p = subprocess.Popen(command, shell=True, stdout = subprocess.PIPE, stderr = subprocess.PIPE) 
330                 stdout, stderr = p.communicate()
331                 if stdout.find("GOOD NODE") < 0:
332                     print t
333                     t = t + 60
334                     time.sleep(60)
335                     continue
336                 else:
337                     ssh_ok = True
338                     continue
339
340             if not ssh_ok:
341                 with lockbl:
342                     bl.append(node)
343                     print bl
344                     self.plapi.delete_slice_node(slicename, [node])
345                     self.discover()
346                 continue
347             
348             # check /proc directory
349             else: # ssh_ok:
350                 command = "ssh %s@%s -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no 'mount |grep proc'" % (slicename, ip)
351                 p = subprocess.Popen(command, shell=True, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
352                 stdout, stderr = p.communicate()
353                 if stdout.find("/proc type proc") < 0:
354                     lockbl.acquire()
355                     bl.append(node)
356                     lockbl.release()
357                     self.plapi.delete_slice_node(slicename, [node])
358                     self.discover()
359                     continue
360             
361                 else:
362                     provision_ok = True
363                     # set attributes ip, hostname
364                     self.set("ip", ip)
365  
366                     hostname = self.plapi.get_nodes(node, ['hostname'])
367                     self.set("hostname", hostname[0]['hostname'])
368                     print self.get("hostname")
369             
370         # call provision de linux node?
371         super(PlanetlabNode, self).provision()
372
373         
374     def filter_based_on_attributes(self):
375         # Map attributes with tagnames of PL
376         timeframe = self.get("timeframe")[0]
377         attr_to_tags = {
378             'city' : 'city',
379             'country' : 'country',
380             'region' : 'region',
381             'architecture' : 'arch',
382             'operatingSystem' : 'fcdistro',
383             #'site' : 'pldistro',
384             'minReliability' : 'reliability%s' % timeframe,
385             'maxReliability' : 'reliability%s' % timeframe,
386             'minBandwidth' : 'bw%s' % timeframe,
387             'maxBandwidth' : 'bw%s' % timeframe,
388             'minLoad' : 'load%s' % timeframe,
389             'maxLoad' : 'load%s' % timeframe,
390             'minCpu' : 'cpu%s' % timeframe,
391             'maxCpu' : 'cpu%s' % timeframe,
392         }
393         
394         nodes_id = []
395         filters = {}
396         for attr_name, attr_obj in self._attrs.iteritems():
397             attr_value = self.get(attr_name)
398             if attr_value is not None and attr_obj.flags == 8 and not 'min' in attr_name \
399                 and not 'max' in attr_name and attr_name != 'timeframe':
400                 attr_tag = attr_to_tags[attr_name]
401                 filters['tagname'] = attr_tag
402                 filters['value'] = attr_value
403                 node_tags = self.plapi.get_node_tags(filters)
404                 if node_tags is not None:
405                     if len(nodes_id) == 0:
406                         for node_tag in node_tags:
407                             nodes_id.append(node_tag['node_id'])
408                     else:
409                         nodes_id_tmp = []
410                         for node_tag in node_tags:
411                             if node_tag['node_id'] in nodes_id:
412                                 nodes_id_tmp.append(node_tag['node_id'])
413                         if len(nodes_id_tmp):
414                             nodes_id = set(nodes_id) & set(nodes_id_tmp)
415                         else:
416                             self.fail2()
417                 else:
418                     self.fail2()
419             elif attr_value is not None and attr_obj.flags == 8 and ('min' or 'max') in attr_name:
420                 attr_tag = attr_to_tags[attr_name]
421                 filters['tagname'] = attr_tag
422                 node_tags = self.plapi.get_node_tags(filters)
423                 if node_tags is not None:
424                     if len(nodes_id) == 0:
425                         for node_tag in node_tags:
426                             if 'min' in attr_name and node_tag['value'] != 'n/a' and \
427                                 float(node_tag['value']) > attr_value:
428                                 nodes_id.append(node_tag['node_id'])
429                             elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
430                                 float(node_tag['value']) < attr_value:
431                                 nodes_id.append(node_tag['node_id'])
432                     else:
433                         nodes_id_tmp = []
434                         for node_tag in node_tags:
435                             if 'min' in attr_name and node_tag['value'] != 'n/a' and \
436                                 float(node_tag['value']) > attr_value and \
437                                 node_tag['node_id'] in nodes_id:
438                                 nodes_id_tmp.append(node_tag['node_id'])
439                             elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
440                                 float(node_tag['value']) < attr_value and \
441                                 node_tag['node_id'] in nodes_id:
442                                 nodes_id_tmp.append(node_tag['node_id'])
443                         if len(nodes_id_tmp):
444                             nodes_id = set(nodes_id) & set(nodes_id_tmp)
445                         else:
446                             self.fail2()
447
448         return nodes_id
449                     
450     def check_alive_and_active(self, nodes_id=None, hostname=None):
451         if nodes_id is None and hostname is None:
452             msg = "Specify nodes_id or hostname"
453             raise RuntimeError, msg
454         if nodes_id is not None and hostname is not None:
455             msg = "Specify either nodes_id or hostname"
456             raise RuntimeError, msg
457
458         # check node alive
459         import time
460         filters = dict()
461         filters['run_level'] = 'boot'
462         filters['boot_state'] = 'boot'
463         filters['node_type'] = 'regular' 
464         filters['>last_contact'] =  int(time.time()) - 2*3600
465         if nodes_id:
466             filters['node_id'] = list(nodes_id)
467             alive_nodes_id = self.plapi.get_nodes(filters, fields=['node_id'])
468         elif hostname:
469             filters['hostname'] = hostname
470             alive_nodes_id = self.plapi.get_nodes(filters, fields=['node_id'])
471         if len(alive_nodes_id) == 0:
472             self.fail2()
473         else:
474             nodes_id = list()
475             for node_id in alive_nodes_id:
476                 nid = node_id['node_id']
477                 nodes_id.append(nid)
478             return nodes_id
479
480
481     def check_if_in_slice(self, nodes_id):
482         slicename = self.get("username")
483         slice_nodes = self.plapi.get_slice_nodes(slicename)
484         nodes_inslice = list(set(nodes_id) & set(slice_nodes))
485         return nodes_inslice
486
487     def do_ping(self, node_id):
488         ip = self.plapi.get_interfaces({'node_id':node_id}, fields=['ip'])
489         ip = ip[0]['ip']
490         result = subprocess.call(["ping","-c","2",ip],stdout=subprocess.PIPE,stderr=subprocess.PIPE)
491         if result == 0:
492             return False
493         elif result == 1 or result == 2:
494             return True
495
496
497     def fail2(self):
498         self.fail()
499         msg = "Discovery failed. No candidates found for node"
500         self.error(msg)
501         raise RuntimeError, msg
502            
503                         
504     def valid_connection(self, guid):
505         # TODO: Validate!
506         return True
507
508 #    def blacklist(self):
509 #        # TODO!!!!
510 #        self.warn(" Blacklisting malfunctioning node ")
511 #        #import util
512 #        #util.appendBlacklist(self.hostname)
513