Fixing bug for ip attr PL node
[nepi.git] / src / nepi / resources / planetlab / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
20
21 from nepi.execution.attribute import Attribute, Flags, Types
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23         ResourceState, reschedule_delay 
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.resources.planetlab.plcapi import PLCAPIFactory 
26 from nepi.util.execfuncs import lexec
27 from nepi.util import sshfuncs
28
29 from random import randint
30 import time
31 import socket
32 import threading
33 import datetime
34
35 @clsinit_copy
36 class PlanetlabNode(LinuxNode):
37     _rtype = "PlanetlabNode"
38     _help = "Controls a PlanetLab host accessible using a SSH key " \
39             "associated to a PlanetLab user account"
40     _backend = "planetlab"
41
42     lock = threading.Lock()
43
44     @classmethod
45     def _register_attributes(cls):
46         ip = Attribute("ip", "PlanetLab host public IP address",
47                 flags = Flags.ReadOnly)
48
49         pl_url = Attribute("plcApiUrl", "URL of PlanetLab PLCAPI host \
50                     (e.g. www.planet-lab.eu or www.planet-lab.org) ",
51                     default = "www.planet-lab.eu",
52                     flags = Flags.Credential)
53
54         pl_ptn = Attribute("plcApiPattern", "PLC API service regexp pattern \
55                     (e.g. https://%(hostname)s:443/PLCAPI/ ) ",
56                     default = "https://%(hostname)s:443/PLCAPI/",
57                     flags = Flags.ExecReadOnly)
58     
59         pl_user = Attribute("pluser", "PlanetLab account user, as the one to \
60                     authenticate in the website) ",
61                     flags = Flags.Credential)
62
63         pl_password = Attribute("plpassword", 
64                         "PlanetLab account password, as \
65                         the one to authenticate in the website) ",
66                         flags = Flags.Credential)
67
68         city = Attribute("city", "Constrain location (city) during resource \
69                 discovery. May use wildcards.",
70                 flags = Flags.Filter)
71
72         country = Attribute("country", "Constrain location (country) during \
73                     resource discovery. May use wildcards.",
74                     flags = Flags.Filter)
75
76         region = Attribute("region", "Constrain location (region) during \
77                     resource discovery. May use wildcards.",
78                     flags = Flags.Filter)
79
80         architecture = Attribute("architecture", "Constrain architecture \
81                         during resource discovery.",
82                         type = Types.Enumerate,
83                         allowed = ["x86_64", 
84                                     "i386"],
85                         flags = Flags.Filter)
86
87         operating_system = Attribute("operatingSystem", "Constrain operating \
88                             system during resource discovery.",
89                             type = Types.Enumerate,
90                             allowed =  ["f8",
91                                         "f12",
92                                         "f14",
93                                         "centos",
94                                         "other"],
95                             flags = Flags.Filter)
96
97         #site = Attribute("site", "Constrain the PlanetLab site this node \
98         #        should reside on.",
99         #        type = Types.Enumerate,
100         #        allowed = ["PLE",
101         #                    "PLC",
102         #                    "PLJ"],
103         #        flags = Flags.Filter)
104
105         min_reliability = Attribute("minReliability", "Constrain reliability \
106                             while picking PlanetLab nodes. Specifies a lower \
107                             acceptable bound.",
108                             type = Types.Double,
109                             range = (1, 100),
110                             flags = Flags.Filter)
111
112         max_reliability = Attribute("maxReliability", "Constrain reliability \
113                             while picking PlanetLab nodes. Specifies an upper \
114                             acceptable bound.",
115                             type = Types.Double,
116                             range = (1, 100),
117                             flags = Flags.Filter)
118
119         min_bandwidth = Attribute("minBandwidth", "Constrain available \
120                             bandwidth while picking PlanetLab nodes. \
121                             Specifies a lower acceptable bound.",
122                             type = Types.Double,
123                             range = (0, 2**31),
124                             flags = Flags.Filter)
125
126         max_bandwidth = Attribute("maxBandwidth", "Constrain available \
127                             bandwidth while picking PlanetLab nodes. \
128                             Specifies an upper acceptable bound.",
129                             type = Types.Double,
130                             range = (0, 2**31),
131                             flags = Flags.Filter)
132
133         min_load = Attribute("minLoad", "Constrain node load average while \
134                     picking PlanetLab nodes. Specifies a lower acceptable \
135                     bound.",
136                     type = Types.Double,
137                     range = (0, 2**31),
138                     flags = Flags.Filter)
139
140         max_load = Attribute("maxLoad", "Constrain node load average while \
141                     picking PlanetLab nodes. Specifies an upper acceptable \
142                     bound.",
143                     type = Types.Double,
144                     range = (0, 2**31),
145                     flags = Flags.Filter)
146
147         min_cpu = Attribute("minCpu", "Constrain available cpu time while \
148                     picking PlanetLab nodes. Specifies a lower acceptable \
149                     bound.",
150                     type = Types.Double,
151                     range = (0, 100),
152                     flags = Flags.Filter)
153
154         max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
155                     picking PlanetLab nodes. Specifies an upper acceptable \
156                     bound.",
157                     type = Types.Double,
158                     range = (0, 100),
159                     flags = Flags.Filter)
160
161         timeframe = Attribute("timeframe", "Past time period in which to check\
162                         information about the node. Values are year,month, \
163                         week, latest",
164                         default = "week",
165                         type = Types.Enumerate,
166                         allowed = ["latest",
167                                     "week",
168                                     "month",
169                                     "year"],
170                         flags = Flags.Filter)
171
172         cls._register_attribute(ip)
173         cls._register_attribute(pl_url)
174         cls._register_attribute(pl_ptn)
175         cls._register_attribute(pl_user)
176         cls._register_attribute(pl_password)
177         #cls._register_attribute(site)
178         cls._register_attribute(city)
179         cls._register_attribute(country)
180         cls._register_attribute(region)
181         cls._register_attribute(architecture)
182         cls._register_attribute(operating_system)
183         cls._register_attribute(min_reliability)
184         cls._register_attribute(max_reliability)
185         cls._register_attribute(min_bandwidth)
186         cls._register_attribute(max_bandwidth)
187         cls._register_attribute(min_load)
188         cls._register_attribute(max_load)
189         cls._register_attribute(min_cpu)
190         cls._register_attribute(max_cpu)
191         cls._register_attribute(timeframe)
192
193     def __init__(self, ec, guid):
194         super(PlanetlabNode, self).__init__(ec, guid)
195
196         self._plapi = None
197         self._node_to_provision = None
198         self._slicenode = False
199
200     def _skip_provision(self):
201         pl_user = self.get("pluser")
202         pl_pass = self.get("plpassword")
203         if not pl_user and not pl_pass:
204             return True
205         else: return False
206     
207     @property
208     def plapi(self):
209         if not self._plapi:
210             pl_user = self.get("pluser")
211             pl_pass = self.get("plpassword")
212             pl_url = self.get("plcApiUrl")
213             pl_ptn = self.get("plcApiPattern")
214
215             self._plapi =  PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
216                 pl_ptn)
217             
218             if not self._plapi:
219                 self.fail_plapi()
220
221         return self._plapi
222
223     def do_discover(self):
224         """
225         Based on the attributes defined by the user, discover the suitable 
226         nodes for provision.
227         """
228         if self._skip_provision():
229             super(PlanetlabNode, self).do_discover()
230             return
231
232         hostname = self._get_hostname()
233         if hostname:
234             # the user specified one particular node to be provisioned
235             # check with PLCAPI if it is alvive
236             node_id = self._query_if_alive(hostname=hostname)
237             node_id = node_id.pop()
238
239             # check that the node is not blacklisted or being provisioned
240             # by other RM
241             with PlanetlabNode.lock:
242                 plist = self.plapi.reserved()
243                 blist = self.plapi.blacklisted()
244                 if node_id not in blist and node_id not in plist:
245                 
246                     # check that is really alive, by performing ping
247                     ping_ok = self._do_ping(node_id)
248                     if not ping_ok:
249                         self._blacklist_node(node_id)
250                         self.fail_node_not_alive(hostname)
251                     else:
252                         if self._check_if_in_slice([node_id]):
253                             self._slicenode = True
254                         self._put_node_in_provision(node_id)
255                         self._node_to_provision = node_id
256                 else:
257                     self.fail_node_not_available(hostname)
258             super(PlanetlabNode, self).do_discover()
259         
260         else:
261             # the user specifies constraints based on attributes, zero, one or 
262             # more nodes can match these constraints 
263             nodes = self._filter_based_on_attributes()
264             nodes_alive = self._query_if_alive(nodes)
265
266             # nodes that are already part of user's slice have the priority to
267             # provisioned
268             nodes_inslice = self._check_if_in_slice(nodes_alive)
269             nodes_not_inslice = list(set(nodes_alive) - set(nodes_inslice))
270             
271             node_id = None
272             if nodes_inslice:
273                 node_id = self._choose_random_node(nodes_inslice)
274                 self._slicenode = True                
275                 
276             if not node_id:
277                 # Either there were no matching nodes in the user's slice, or
278                 # the nodes in the slice  were blacklisted or being provisioned
279                 # by other RM. Note nodes_not_inslice is never empty
280                 node_id = self._choose_random_node(nodes_not_inslice)
281                 self._slicenode = False
282
283             if node_id:
284                 self._node_to_provision = node_id
285                 try:
286                     self._set_hostname_attr(node_id)
287                     self.info(" Selected node to provision ")
288                     super(PlanetlabNode, self).do_discover()
289                 except:
290                     with PlanetlabNode.lock:
291                         self._blacklist_node(node_id)
292                     self.do_discover()
293             else:
294                self.fail_not_enough_nodes() 
295             
296     def do_provision(self):
297         """
298         Add node to user's slice after verifing that the node is functioning
299         correctly
300         """
301         if self._skip_provision():
302             super(PlanetlabNode, self).do_provision()
303             return
304
305         provision_ok = False
306         ssh_ok = False
307         proc_ok = False
308         timeout = 1800
309
310         while not provision_ok:
311             node = self._node_to_provision
312             if not self._slicenode:
313                 self._add_node_to_slice(node)
314             
315                 # check ssh connection
316                 t = 0 
317                 while t < timeout and not ssh_ok:
318
319                     cmd = 'echo \'GOOD NODE\''
320                     ((out, err), proc) = self.execute(cmd)
321                     if out.find("GOOD NODE") < 0:
322                         t = t + 60
323                         time.sleep(60)
324                         continue
325                     else:
326                         ssh_ok = True
327                         continue
328             else:
329                 cmd = 'echo \'GOOD NODE\''
330                 ((out, err), proc) = self.execute(cmd)
331                 if not out.find("GOOD NODE") < 0:
332                     ssh_ok = True
333
334             if not ssh_ok:
335                 # the timeout was reach without establishing ssh connection
336                 # the node is blacklisted, deleted from the slice, and a new
337                 # node to provision is discovered
338                 with PlanetlabNode.lock:
339                     self.warn(" Could not SSH login ")
340                     self._blacklist_node(node)
341                     #self._delete_node_from_slice(node)
342                 self.set('hostname', None)
343                 self.do_discover()
344                 continue
345             
346             # check /proc directory is mounted (ssh_ok = True)
347             else:
348                 cmd = 'mount |grep proc'
349                 ((out, err), proc) = self.execute(cmd)
350                 if out.find("/proc type proc") < 0:
351                     with PlanetlabNode.lock:
352                         self.warn(" Could not find directory /proc ")
353                         self._blacklist_node(node)
354                         #self._delete_node_from_slice(node)
355                     self.set('hostname', None)
356                     self.do_discover()
357                     continue
358             
359                 else:
360                     provision_ok = True
361                     # set IP attribute
362                     ip = self._get_ip(node)
363                     self.set("ip", ip)
364             
365         super(PlanetlabNode, self).do_provision()
366
367     def _filter_based_on_attributes(self):
368         """
369         Retrive the list of nodes ids that match user's constraints 
370         """
371         # Map user's defined attributes with tagnames of PlanetLab
372         timeframe = self.get("timeframe")[0]
373         attr_to_tags = {
374             'city' : 'city',
375             'country' : 'country',
376             'region' : 'region',
377             'architecture' : 'arch',
378             'operatingSystem' : 'fcdistro',
379             #'site' : 'pldistro',
380             'minReliability' : 'reliability%s' % timeframe,
381             'maxReliability' : 'reliability%s' % timeframe,
382             'minBandwidth' : 'bw%s' % timeframe,
383             'maxBandwidth' : 'bw%s' % timeframe,
384             'minLoad' : 'load%s' % timeframe,
385             'maxLoad' : 'load%s' % timeframe,
386             'minCpu' : 'cpu%s' % timeframe,
387             'maxCpu' : 'cpu%s' % timeframe,
388         }
389         
390         nodes_id = []
391         filters = {}
392
393         for attr_name, attr_obj in self._attrs.iteritems():
394             attr_value = self.get(attr_name)
395             
396             if attr_value is not None and attr_obj.flags == 8 and \
397                 attr_name != 'timeframe':
398         
399                 attr_tag = attr_to_tags[attr_name]
400                 filters['tagname'] = attr_tag
401
402                 # filter nodes by fixed constraints e.g. operating system
403                 if not 'min' in attr_name and not 'max' in attr_name:
404                     filters['value'] = attr_value
405                     nodes_id = self._filter_by_fixed_attr(filters, nodes_id)
406
407                 # filter nodes by range constraints e.g. max bandwidth
408                 elif ('min' or 'max') in attr_name:
409                     nodes_id = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_id)
410
411         if not filters:
412             nodes = self.plapi.get_nodes()
413             for node in nodes:
414                 nodes_id.append(node['node_id'])
415         
416         return nodes_id
417                     
418
419     def _filter_by_fixed_attr(self, filters, nodes_id):
420         """
421         Query PLCAPI for nodes ids matching fixed attributes defined by the
422         user
423         """
424         node_tags = self.plapi.get_node_tags(filters)
425         if node_tags is not None:
426
427             if len(nodes_id) == 0:
428                 # first attribute being matched
429                 for node_tag in node_tags:
430                     nodes_id.append(node_tag['node_id'])
431             else:
432                 # remove the nodes ids that don't match the new attribute
433                 # that is being match
434
435                 nodes_id_tmp = []
436                 for node_tag in node_tags:
437                     if node_tag['node_id'] in nodes_id:
438                         nodes_id_tmp.append(node_tag['node_id'])
439
440                 if len(nodes_id_tmp):
441                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
442                 else:
443                     # no node from before match the new constraint
444                     self.fail_discovery()
445         else:
446             # no nodes match the filter applied
447             self.fail_discovery()
448
449         return nodes_id
450
451     def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
452         """
453         Query PLCAPI for nodes ids matching attributes defined in a certain
454         range, by the user
455         """
456         node_tags = self.plapi.get_node_tags(filters)
457         if node_tags:
458             
459             if len(nodes_id) == 0:
460                 # first attribute being matched
461                 for node_tag in node_tags:
462  
463                    # check that matches the min or max restriction
464                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
465                         float(node_tag['value']) > attr_value:
466                         nodes_id.append(node_tag['node_id'])
467
468                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
469                         float(node_tag['value']) < attr_value:
470                         nodes_id.append(node_tag['node_id'])
471             else:
472
473                 # remove the nodes ids that don't match the new attribute
474                 # that is being match
475                 nodes_id_tmp = []
476                 for node_tag in node_tags:
477
478                     # check that matches the min or max restriction and was a
479                     # matching previous filters
480                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
481                         float(node_tag['value']) > attr_value and \
482                         node_tag['node_id'] in nodes_id:
483                         nodes_id_tmp.append(node_tag['node_id'])
484
485                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
486                         float(node_tag['value']) < attr_value and \
487                         node_tag['node_id'] in nodes_id:
488                         nodes_id_tmp.append(node_tag['node_id'])
489
490                 if len(nodes_id_tmp):
491                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
492                 else:
493                     # no node from before match the new constraint
494                     self.fail_discovery()
495
496         else: #TODO CHECK
497             # no nodes match the filter applied
498             self.fail_discovery()
499
500         return nodes_id
501         
502     def _query_if_alive(self, nodes_id=None, hostname=None):
503         """
504         Query PLCAPI for nodes that register activity recently, using filters 
505         related to the state of the node, e.g. last time it was contacted
506         """
507         if nodes_id is None and hostname is None:
508             msg = "Specify nodes_id or hostname"
509             raise RuntimeError, msg
510
511         if nodes_id is not None and hostname is not None:
512             msg = "Specify either nodes_id or hostname"
513             raise RuntimeError, msg
514
515         # define PL filters to check the node is alive
516         filters = dict()
517         filters['run_level'] = 'boot'
518         filters['boot_state'] = 'boot'
519         filters['node_type'] = 'regular' 
520         #filters['>last_contact'] =  int(time.time()) - 2*3600
521
522         # adding node_id or hostname to the filters to check for the particular
523         # node
524         if nodes_id:
525             filters['node_id'] = list(nodes_id)
526             alive_nodes_id = self._get_nodes_id(filters)
527         elif hostname:
528             filters['hostname'] = hostname
529             alive_nodes_id = self._get_nodes_id(filters)
530
531         if len(alive_nodes_id) == 0:
532             self.fail_node_not_alive(hostname)
533         else:
534             nodes_id = list()
535             for node_id in alive_nodes_id:
536                 nid = node_id['node_id']
537                 nodes_id.append(nid)
538
539             return nodes_id
540
541     def _choose_random_node(self, nodes):
542         """
543         From the possible nodes for provision, choose randomly to decrese the
544         probability of different RMs choosing the same node for provision
545         """
546         size = len(nodes)
547         while size:
548             size = size - 1
549             index = randint(0, size)
550             node_id = nodes[index]
551             nodes[index] = nodes[size]
552
553             # check the node is not blacklisted or being provision by other RM
554             # and perform ping to check that is really alive
555             with PlanetlabNode.lock:
556
557                 blist = self.plapi.blacklisted()
558                 plist = self.plapi.reserved()
559                 if node_id not in blist and node_id not in plist:
560                     ping_ok = self._do_ping(node_id)
561                     if not ping_ok:
562                         self._set_hostname_attr(node_id)
563                         self.warn(" Node not responding PING ")
564                         self._blacklist_node(node_id)
565                         self.set('hostname', None)
566                     else:
567                         # discovered node for provision, added to provision list
568                         self._put_node_in_provision(node_id)
569                         return node_id
570
571     def _get_nodes_id(self, filters):
572         return self.plapi.get_nodes(filters, fields=['node_id'])
573
574     def _add_node_to_slice(self, node_id):
575         self.info(" Adding node to slice ")
576         slicename = self.get("username")
577         with PlanetlabNode.lock:
578             slice_nodes = self.plapi.get_slice_nodes(slicename)
579             slice_nodes.append(node_id)
580             self.plapi.add_slice_nodes(slicename, slice_nodes)
581
582     def _delete_node_from_slice(self, node):
583         self.warn(" Deleting node from slice ")
584         slicename = self.get("username")
585         self.plapi.delete_slice_node(slicename, [node])
586
587     def _get_hostname(self):
588         hostname = self.get("hostname")
589         ip = self.get("ip")
590         if hostname:
591             return hostname
592         elif ip:
593             hostname = socket.gethostbyaddr(ip)[0]
594             self.set('hostname', hostname)
595             return hostname
596         else:
597             return None
598
599     def _set_hostname_attr(self, node):
600         """
601         Query PLCAPI for the hostname of a certain node id and sets the
602         attribute hostname, it will over write the previous value
603         """
604         hostname = self.plapi.get_nodes(node, ['hostname'])
605         self.set("hostname", hostname[0]['hostname'])
606
607     def _check_if_in_slice(self, nodes_id):
608         """
609         Query PLCAPI to find out if any node id from nodes_id is in the user's
610         slice
611         """
612         slicename = self.get("username")
613         slice_nodes = self.plapi.get_slice_nodes(slicename)
614         nodes_inslice = list(set(nodes_id) & set(slice_nodes))
615         return nodes_inslice
616
617     def _do_ping(self, node_id):
618         """
619         Perform ping command on node's IP matching node id
620         """
621         ping_ok = False
622         ip = self._get_ip(node_id)
623         if not ip: return ping_ok
624
625         command = "ping -c4 %s" % ip
626
627         (out, err) = lexec(command)
628         if not out.find("2 received") or not out.find("3 received") or not \
629             out.find("4 received") < 0:
630             ping_ok = True
631         
632         return ping_ok 
633
634     def _blacklist_node(self, node):
635         """
636         Add node mal functioning node to blacklist
637         """
638         self.warn(" Blacklisting malfunctioning node ")
639         self.plapi.blacklist_host(node)
640
641     def _put_node_in_provision(self, node):
642         """
643         Add node to the list of nodes being provisioned, in order for other RMs
644         to not try to provision the same one again
645         """
646         self.plapi.reserve_host(node)
647
648     def _get_ip(self, node_id):
649         """
650         Query PLCAPI for the IP of a node with certain node id
651         """
652         hostname = self.plapi.get_nodes(node_id, ['hostname'])[0]
653         try:
654             ip = sshfuncs.gethostbyname(hostname['hostname'])
655         except:
656             # Fail while trying to find the IP
657             return None
658         return ip
659
660     def fail_discovery(self):
661         msg = "Discovery failed. No candidates found for node"
662         self.error(msg)
663         raise RuntimeError, msg
664
665     def fail_node_not_alive(self, hostname=None):
666         msg = "Node %s not alive" % hostname
667         raise RuntimeError, msg
668     
669     def fail_node_not_available(self, hostname):
670         msg = "Node %s not available for provisioning" % hostname
671         raise RuntimeError, msg
672
673     def fail_not_enough_nodes(self):
674         msg = "Not enough nodes available for provisioning"
675         raise RuntimeError, msg
676
677     def fail_plapi(self):
678         msg = "Failing while trying to instanciate the PLC API.\nSet the" + \
679             " attributes pluser and plpassword."
680         raise RuntimeError, msg
681
682     def valid_connection(self, guid):
683         # TODO: Validate!
684         return True
685
686