Merging heads
[nepi.git] / src / nepi / resources / planetlab / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
20
21 from nepi.execution.attribute import Attribute, Flags, Types
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23         ResourceState, reschedule_delay 
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.resources.planetlab.plcapi import PLCAPIFactory 
26 from nepi.util.execfuncs import lexec
27 from nepi.util import sshfuncs
28
29 from random import randint
30 import time
31 import threading
32 import datetime
33
34 @clsinit_copy
35 class PlanetlabNode(LinuxNode):
36     _rtype = "PlanetlabNode"
37     _help = "Controls a PlanetLab host accessible using a SSH key " \
38             "associated to a PlanetLab user account"
39     _backend = "planetlab"
40
41     lock = threading.Lock()
42
43     @classmethod
44     def _register_attributes(cls):
45         ip = Attribute("ip", "PlanetLab host public IP address",
46                 flags = Flags.ReadOnly)
47
48         pl_url = Attribute("plcApiUrl", "URL of PlanetLab PLCAPI host \
49                     (e.g. www.planet-lab.eu or www.planet-lab.org) ",
50                     default = "www.planet-lab.eu",
51                     flags = Flags.Credential)
52
53         pl_ptn = Attribute("plcApiPattern", "PLC API service regexp pattern \
54                     (e.g. https://%(hostname)s:443/PLCAPI/ ) ",
55                     default = "https://%(hostname)s:443/PLCAPI/",
56                     flags = Flags.ExecReadOnly)
57     
58         pl_user = Attribute("pluser", "PlanetLab account user, as the one to \
59                     authenticate in the website) ",
60                     flags = Flags.Credential)
61
62         pl_password = Attribute("plpassword", 
63                         "PlanetLab account password, as \
64                         the one to authenticate in the website) ",
65                         flags = Flags.Credential)
66
67         city = Attribute("city", "Constrain location (city) during resource \
68                 discovery. May use wildcards.",
69                 flags = Flags.Filter)
70
71         country = Attribute("country", "Constrain location (country) during \
72                     resource discovery. May use wildcards.",
73                     flags = Flags.Filter)
74
75         region = Attribute("region", "Constrain location (region) during \
76                     resource discovery. May use wildcards.",
77                     flags = Flags.Filter)
78
79         architecture = Attribute("architecture", "Constrain architecture \
80                         during resource discovery.",
81                         type = Types.Enumerate,
82                         allowed = ["x86_64", 
83                                     "i386"],
84                         flags = Flags.Filter)
85
86         operating_system = Attribute("operatingSystem", "Constrain operating \
87                             system during resource discovery.",
88                             type = Types.Enumerate,
89                             allowed =  ["f8",
90                                         "f12",
91                                         "f14",
92                                         "centos",
93                                         "other"],
94                             flags = Flags.Filter)
95
96         #site = Attribute("site", "Constrain the PlanetLab site this node \
97         #        should reside on.",
98         #        type = Types.Enumerate,
99         #        allowed = ["PLE",
100         #                    "PLC",
101         #                    "PLJ"],
102         #        flags = Flags.Filter)
103
104         min_reliability = Attribute("minReliability", "Constrain reliability \
105                             while picking PlanetLab nodes. Specifies a lower \
106                             acceptable bound.",
107                             type = Types.Double,
108                             range = (1, 100),
109                             flags = Flags.Filter)
110
111         max_reliability = Attribute("maxReliability", "Constrain reliability \
112                             while picking PlanetLab nodes. Specifies an upper \
113                             acceptable bound.",
114                             type = Types.Double,
115                             range = (1, 100),
116                             flags = Flags.Filter)
117
118         min_bandwidth = Attribute("minBandwidth", "Constrain available \
119                             bandwidth while picking PlanetLab nodes. \
120                             Specifies a lower acceptable bound.",
121                             type = Types.Double,
122                             range = (0, 2**31),
123                             flags = Flags.Filter)
124
125         max_bandwidth = Attribute("maxBandwidth", "Constrain available \
126                             bandwidth while picking PlanetLab nodes. \
127                             Specifies an upper acceptable bound.",
128                             type = Types.Double,
129                             range = (0, 2**31),
130                             flags = Flags.Filter)
131
132         min_load = Attribute("minLoad", "Constrain node load average while \
133                     picking PlanetLab nodes. Specifies a lower acceptable \
134                     bound.",
135                     type = Types.Double,
136                     range = (0, 2**31),
137                     flags = Flags.Filter)
138
139         max_load = Attribute("maxLoad", "Constrain node load average while \
140                     picking PlanetLab nodes. Specifies an upper acceptable \
141                     bound.",
142                     type = Types.Double,
143                     range = (0, 2**31),
144                     flags = Flags.Filter)
145
146         min_cpu = Attribute("minCpu", "Constrain available cpu time while \
147                     picking PlanetLab nodes. Specifies a lower acceptable \
148                     bound.",
149                     type = Types.Double,
150                     range = (0, 100),
151                     flags = Flags.Filter)
152
153         max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
154                     picking PlanetLab nodes. Specifies an upper acceptable \
155                     bound.",
156                     type = Types.Double,
157                     range = (0, 100),
158                     flags = Flags.Filter)
159
160         timeframe = Attribute("timeframe", "Past time period in which to check\
161                         information about the node. Values are year,month, \
162                         week, latest",
163                         default = "week",
164                         type = Types.Enumerate,
165                         allowed = ["latest",
166                                     "week",
167                                     "month",
168                                     "year"],
169                         flags = Flags.Filter)
170
171         cls._register_attribute(ip)
172         cls._register_attribute(pl_url)
173         cls._register_attribute(pl_ptn)
174         cls._register_attribute(pl_user)
175         cls._register_attribute(pl_password)
176         #cls._register_attribute(site)
177         cls._register_attribute(city)
178         cls._register_attribute(country)
179         cls._register_attribute(region)
180         cls._register_attribute(architecture)
181         cls._register_attribute(operating_system)
182         cls._register_attribute(min_reliability)
183         cls._register_attribute(max_reliability)
184         cls._register_attribute(min_bandwidth)
185         cls._register_attribute(max_bandwidth)
186         cls._register_attribute(min_load)
187         cls._register_attribute(max_load)
188         cls._register_attribute(min_cpu)
189         cls._register_attribute(max_cpu)
190         cls._register_attribute(timeframe)
191
192     def __init__(self, ec, guid):
193         super(PlanetlabNode, self).__init__(ec, guid)
194
195         self._plapi = None
196         self._node_to_provision = None
197         self._slicenode = False
198     
199     @property
200     def plapi(self):
201         if not self._plapi:
202             pl_user = self.get("pluser")
203             pl_pass = self.get("plpassword")
204             pl_url = self.get("plcApiUrl")
205             pl_ptn = self.get("plcApiPattern")
206
207             self._plapi =  PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
208                     pl_ptn)
209             
210             if not self._plapi:
211                 self.fail_plapi()
212
213         return self._plapi
214
215     def do_discover(self):
216         """
217         Based on the attributes defined by the user, discover the suitable 
218         nodes for provision.
219         """
220         hostname = self._get_hostname()
221         if hostname:
222             # the user specified one particular node to be provisioned
223             # check with PLCAPI if it is alvive
224             node_id = self._query_if_alive(hostname=hostname)
225             node_id = node_id.pop()
226
227             # check that the node is not blacklisted or being provisioned
228             # by other RM
229             with PlanetlabNode.lock:
230                 plist = self.plapi.reserved()
231                 blist = self.plapi.blacklisted()
232                 if node_id not in blist and node_id not in plist:
233                 
234                     # check that is really alive, by performing ping
235                     ping_ok = self._do_ping(node_id)
236                     if not ping_ok:
237                         self._blacklist_node(node_id)
238                         self.fail_node_not_alive(hostname)
239                     else:
240                         if self._check_if_in_slice([node_id]):
241                             self._slicenode = True
242                         self._put_node_in_provision(node_id)
243                         self._node_to_provision = node_id
244                         super(PlanetlabNode, self).do_discover()
245                 
246                 else:
247                     self.fail_node_not_available(hostname)
248         
249         else:
250             # the user specifies constraints based on attributes, zero, one or 
251             # more nodes can match these constraints 
252             nodes = self._filter_based_on_attributes()
253             nodes_alive = self._query_if_alive(nodes)
254     
255             # nodes that are already part of user's slice have the priority to
256             # provisioned
257             nodes_inslice = self._check_if_in_slice(nodes_alive)
258             nodes_not_inslice = list(set(nodes_alive) - set(nodes_inslice))
259             
260             node_id = None
261             if nodes_inslice:
262                 node_id = self._choose_random_node(nodes_inslice)
263                 self._slicenode = True                
264                 
265             if not node_id:
266                 # Either there were no matching nodes in the user's slice, or
267                 # the nodes in the slice  were blacklisted or being provisioned
268                 # by other RM. Note nodes_not_inslice is never empty
269                 node_id = self._choose_random_node(nodes_not_inslice)
270                 self._slicenode = False
271
272             if node_id:
273                 self._node_to_provision = node_id
274                 try:
275                     self._set_hostname_attr(node_id)
276                     self.info(" Selected node to provision ")
277                 except:
278                     with PlanetlabNode.lock:
279                         self._blacklist_node(node_id)
280                         self.do_discover()
281  
282                 super(PlanetlabNode, self).do_discover()
283             else:
284                self.fail_not_enough_nodes() 
285             
286     def do_provision(self):
287         """
288         Add node to user's slice after verifing that the node is functioning
289         correctly
290         """
291         provision_ok = False
292         ssh_ok = False
293         proc_ok = False
294         timeout = 1800
295
296         while not provision_ok:
297             node = self._node_to_provision
298             if not self._slicenode:
299                 self._add_node_to_slice(node)
300             
301                 # check ssh connection
302                 t = 0 
303                 while t < timeout and not ssh_ok:
304
305                     cmd = 'echo \'GOOD NODE\''
306                     ((out, err), proc) = self.execute(cmd)
307                     if out.find("GOOD NODE") < 0:
308                         t = t + 60
309                         time.sleep(60)
310                         continue
311                     else:
312                         ssh_ok = True
313                         continue
314             else:
315                 cmd = 'echo \'GOOD NODE\''
316                 ((out, err), proc) = self.execute(cmd)
317                 if not out.find("GOOD NODE") < 0:
318                     ssh_ok = True
319
320             if not ssh_ok:
321                 # the timeout was reach without establishing ssh connection
322                 # the node is blacklisted, deleted from the slice, and a new
323                 # node to provision is discovered
324                 with PlanetlabNode.lock:
325                     self.warn(" Could not SSH login ")
326                     self._blacklist_node(node)
327                     #self._delete_node_from_slice(node)
328                 self.set('hostname', None)
329                 self.do_discover()
330                 continue
331             
332             # check /proc directory is mounted (ssh_ok = True)
333             else:
334                 cmd = 'mount |grep proc'
335                 ((out, err), proc) = self.execute(cmd)
336                 if out.find("/proc type proc") < 0:
337                     with PlanetlabNode.lock:
338                         self.warn(" Could not find directory /proc ")
339                         self._blacklist_node(node)
340                         #self._delete_node_from_slice(node)
341                     self.set('hostname', None)
342                     self.do_discover()
343                     continue
344             
345                 else:
346                     provision_ok = True
347                     # set IP attribute
348                     ip = self._get_ip(node)
349                     self.set("ip", ip)
350             
351         super(PlanetlabNode, self).do_provision()
352
353     def _filter_based_on_attributes(self):
354         """
355         Retrive the list of nodes ids that match user's constraints 
356         """
357         # Map user's defined attributes with tagnames of PlanetLab
358         timeframe = self.get("timeframe")[0]
359         attr_to_tags = {
360             'city' : 'city',
361             'country' : 'country',
362             'region' : 'region',
363             'architecture' : 'arch',
364             'operatingSystem' : 'fcdistro',
365             #'site' : 'pldistro',
366             'minReliability' : 'reliability%s' % timeframe,
367             'maxReliability' : 'reliability%s' % timeframe,
368             'minBandwidth' : 'bw%s' % timeframe,
369             'maxBandwidth' : 'bw%s' % timeframe,
370             'minLoad' : 'load%s' % timeframe,
371             'maxLoad' : 'load%s' % timeframe,
372             'minCpu' : 'cpu%s' % timeframe,
373             'maxCpu' : 'cpu%s' % timeframe,
374         }
375         
376         nodes_id = []
377         filters = {}
378
379         for attr_name, attr_obj in self._attrs.iteritems():
380             attr_value = self.get(attr_name)
381             
382             if attr_value is not None and attr_obj.flags == 8 and \
383                 attr_name != 'timeframe':
384         
385                 attr_tag = attr_to_tags[attr_name]
386                 filters['tagname'] = attr_tag
387
388                 # filter nodes by fixed constraints e.g. operating system
389                 if not 'min' in attr_name and not 'max' in attr_name:
390                     filters['value'] = attr_value
391                     nodes_id = self._filter_by_fixed_attr(filters, nodes_id)
392
393                 # filter nodes by range constraints e.g. max bandwidth
394                 elif ('min' or 'max') in attr_name:
395                     nodes_id = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_id)
396
397         if not filters:
398             nodes = self.plapi.get_nodes()
399             for node in nodes:
400                 nodes_id.append(node['node_id'])
401         
402         return nodes_id
403                     
404
405     def _filter_by_fixed_attr(self, filters, nodes_id):
406         """
407         Query PLCAPI for nodes ids matching fixed attributes defined by the
408         user
409         """
410         node_tags = self.plapi.get_node_tags(filters)
411         if node_tags is not None:
412
413             if len(nodes_id) == 0:
414                 # first attribute being matched
415                 for node_tag in node_tags:
416                     nodes_id.append(node_tag['node_id'])
417             else:
418                 # remove the nodes ids that don't match the new attribute
419                 # that is being match
420
421                 nodes_id_tmp = []
422                 for node_tag in node_tags:
423                     if node_tag['node_id'] in nodes_id:
424                         nodes_id_tmp.append(node_tag['node_id'])
425
426                 if len(nodes_id_tmp):
427                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
428                 else:
429                     # no node from before match the new constraint
430                     self.fail_discovery()
431         else:
432             # no nodes match the filter applied
433             self.fail_discovery()
434
435         return nodes_id
436
437     def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
438         """
439         Query PLCAPI for nodes ids matching attributes defined in a certain
440         range, by the user
441         """
442         node_tags = self.plapi.get_node_tags(filters)
443         if node_tags is not None:
444             
445             if len(nodes_id) == 0:
446                 # first attribute being matched
447                 for node_tag in node_tags:
448  
449                    # check that matches the min or max restriction
450                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
451                         float(node_tag['value']) > attr_value:
452                         nodes_id.append(node_tag['node_id'])
453
454                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
455                         float(node_tag['value']) < attr_value:
456                         nodes_id.append(node_tag['node_id'])
457             else:
458
459                 # remove the nodes ids that don't match the new attribute
460                 # that is being match
461                 nodes_id_tmp = []
462                 for node_tag in node_tags:
463
464                     # check that matches the min or max restriction and was a
465                     # matching previous filters
466                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
467                         float(node_tag['value']) > attr_value and \
468                         node_tag['node_id'] in nodes_id:
469                         nodes_id_tmp.append(node_tag['node_id'])
470
471                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
472                         float(node_tag['value']) < attr_value and \
473                         node_tag['node_id'] in nodes_id:
474                         nodes_id_tmp.append(node_tag['node_id'])
475
476                 if len(nodes_id_tmp):
477                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
478                 else:
479                     # no node from before match the new constraint
480                     self.fail_discovery()
481
482         else: #TODO CHECK
483             # no nodes match the filter applied
484             self.fail_discovery()
485
486         return nodes_id
487         
488     def _query_if_alive(self, nodes_id=None, hostname=None):
489         """
490         Query PLCAPI for nodes that register activity recently, using filters 
491         related to the state of the node, e.g. last time it was contacted
492         """
493         if nodes_id is None and hostname is None:
494             msg = "Specify nodes_id or hostname"
495             raise RuntimeError, msg
496
497         if nodes_id is not None and hostname is not None:
498             msg = "Specify either nodes_id or hostname"
499             raise RuntimeError, msg
500
501         # define PL filters to check the node is alive
502         filters = dict()
503         filters['run_level'] = 'boot'
504         filters['boot_state'] = 'boot'
505         filters['node_type'] = 'regular' 
506         #filters['>last_contact'] =  int(time.time()) - 2*3600
507
508         # adding node_id or hostname to the filters to check for the particular
509         # node
510         if nodes_id:
511             filters['node_id'] = list(nodes_id)
512             alive_nodes_id = self._get_nodes_id(filters)
513         elif hostname:
514             filters['hostname'] = hostname
515             alive_nodes_id = self._get_nodes_id(filters)
516
517         if len(alive_nodes_id) == 0:
518             self.fail_node_not_alive(self, hostname)
519         else:
520             nodes_id = list()
521             for node_id in alive_nodes_id:
522                 nid = node_id['node_id']
523                 nodes_id.append(nid)
524
525             return nodes_id
526
527     def _choose_random_node(self, nodes):
528         """
529         From the possible nodes for provision, choose randomly to decrese the
530         probability of different RMs choosing the same node for provision
531         """
532         size = len(nodes)
533         while size:
534             size = size - 1
535             index = randint(0, size)
536             node_id = nodes[index]
537             nodes[index] = nodes[size]
538
539             # check the node is not blacklisted or being provision by other RM
540             # and perform ping to check that is really alive
541             with PlanetlabNode.lock:
542
543                 blist = self.plapi.blacklisted()
544                 plist = self.plapi.reserved()
545                 if node_id not in blist and node_id not in plist:
546                     ping_ok = self._do_ping(node_id)
547                     if not ping_ok:
548                         self._set_hostname_attr(node_id)
549                         self.warn(" Node not responding PING ")
550                         self._blacklist_node(node_id)
551                         self.set('hostname', None)
552                     else:
553                         # discovered node for provision, added to provision list
554                         self._put_node_in_provision(node_id)
555                         return node_id
556
557     def _get_nodes_id(self, filters):
558         return self.plapi.get_nodes(filters, fields=['node_id'])
559
560     def _add_node_to_slice(self, node_id):
561         self.info(" Adding node to slice ")
562         slicename = self.get("username")
563         with PlanetlabNode.lock:
564             slice_nodes = self.plapi.get_slice_nodes(slicename)
565             slice_nodes.append(node_id)
566             self.plapi.add_slice_nodes(slicename, slice_nodes)
567
568     def _delete_node_from_slice(self, node):
569         self.warn(" Deleting node from slice ")
570         slicename = self.get("username")
571         self.plapi.delete_slice_node(slicename, [node])
572
573     def _get_hostname(self):
574         hostname = self.get("hostname")
575         ip = self.get("ip")
576         if hostname:
577             return hostname
578         elif ip:
579             hostname = sshfuncs.gethostbyname(ip)
580             return hostname
581         else:
582             return None
583
584     def _set_hostname_attr(self, node):
585         """
586         Query PLCAPI for the hostname of a certain node id and sets the
587         attribute hostname, it will over write the previous value
588         """
589         hostname = self.plapi.get_nodes(node, ['hostname'])
590         self.set("hostname", hostname[0]['hostname'])
591
592     def _check_if_in_slice(self, nodes_id):
593         """
594         Query PLCAPI to find out if any node id from nodes_id is in the user's
595         slice
596         """
597         slicename = self.get("username")
598         slice_nodes = self.plapi.get_slice_nodes(slicename)
599         nodes_inslice = list(set(nodes_id) & set(slice_nodes))
600         return nodes_inslice
601
602     def _do_ping(self, node_id):
603         """
604         Perform ping command on node's IP matching node id
605         """
606         ping_ok = False
607         ip = self._get_ip(node_id)
608         if not ip: return ping_ok
609
610         command = "ping -c2 %s" % ip
611
612         (out, err) = lexec(command)
613         if not out.find("2 received") < 0:
614             ping_ok = True
615         
616         return ping_ok 
617
618     def _blacklist_node(self, node):
619         """
620         Add node mal functioning node to blacklist
621         """
622         self.warn(" Blacklisting malfunctioning node ")
623         self._plapi.blacklist_host(node)
624
625     def _put_node_in_provision(self, node):
626         """
627         Add node to the list of nodes being provisioned, in order for other RMs
628         to not try to provision the same one again
629         """
630         self._plapi.reserve_host(node)
631
632     def _get_ip(self, node_id):
633         """
634         Query PLCAPI for the IP of a node with certain node id
635         """
636         hostname = self.plapi.get_nodes(node_id, ['hostname'])[0]
637         try:
638             ip = sshfuncs.gethostbyname(hostname['hostname'])
639         except:
640             # Fail while trying to find the IP
641             return None
642         return ip
643
644     def fail_discovery(self):
645         self.fail()
646         msg = "Discovery failed. No candidates found for node"
647         self.error(msg)
648         raise RuntimeError, msg
649
650     def fail_node_not_alive(self, hostname=None):
651         self.fail()
652         msg = "Node %s not alive" % hostname
653         raise RuntimeError, msg
654     
655     def fail_node_not_available(self, hostname):
656         self.fail()
657         msg = "Node %s not available for provisioning" % hostname
658         raise RuntimeError, msg
659
660     def fail_not_enough_nodes(self):
661         self.fail()
662         msg = "Not enough nodes available for provisioning"
663         raise RuntimeError, msg
664
665     def fail_plapi(self):
666         self.fail()
667         msg = "Failing while trying to instanciate the PLC API"
668         raise RuntimeError, msg
669
670     def valid_connection(self, guid):
671         # TODO: Validate!
672         return True
673
674