Code cleanup. Setting resource state through specific functions
[nepi.git] / src / nepi / resources / planetlab / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
22         reschedule_delay
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.resources.planetlab.plcapi import PLCAPIFactory 
25 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
26
27 import subprocess
28 import threading
29
30 # A.Q. GENERAL COMMENTS: This module needs major cleaning up
31 #     - Lines should be 80 characters
32 #     - Most methods have too many lines and there are no comments or spaces
33 #     - There should be only two line breaks between two methods
34 #     - Code is too compressed. Hard to read. Add spaces when needed
35 #     - In general the code needs to be more subdivided. Use more methods 
36 #       with clear names to divide operations (even if you don't reuse the 
37 #       methods else where, this will make the code more readable)
38
39 @clsinit_copy
40 class PlanetlabNode(LinuxNode):
41     _rtype = "PlanetlabNode"
42
43     _blacklist = list()
44     _in_provision = list()
45
46     _lock_bl = threading.Lock()
47     _lock_inpro = threading.Lock()
48
49     @classmethod
50     def blacklist(cls):
51         """ Returns the blacklisted nodes
52
53         """
54         return cls._blacklist
55
56     ### A.Q. COMMENT: Why did you wrapped the locks inside methods ?
57     @classmethod
58     def in_provision(cls):
59         """ Returns the nodes that anohter RM is trying to provision
60
61         """
62         return cls._in_provision
63
64     @classmethod
65     def lock_bl(cls):
66         """ Returns the lock for the blacklist
67
68         """
69         return cls._lock_bl
70
71     @classmethod
72     def lock_inpro(cls):
73         """ Returns the lock for the provision list
74
75         """
76         return cls._lock_inpro
77
78
79     @classmethod
80     def _register_attributes(cls):
81         ip = Attribute("ip", "PlanetLab host public IP address",
82                 flags = Flags.ReadOnly)
83
84         pl_url = Attribute("plcApiUrl", "URL of PlanetLab PLCAPI host (e.g. www.planet-lab.eu or www.planet-lab.org) ",
85                 default = "www.planet-lab.eu",
86                 flags = Flags.Credential)
87
88         pl_ptn = Attribute("plcApiPattern", "PLC API service regexp pattern (e.g. https://%(hostname)s:443/PLCAPI/ ) ",
89                 default = "https://%(hostname)s:443/PLCAPI/",
90                 flags = Flags.ExecReadOnly)
91     
92         pl_user = Attribute("pluser", "PlanetLab account user, as the one to authenticate in the website) ",
93                 flags = Flags.Credential)
94
95         pl_password = Attribute("password", "PlanetLab account password, as the one to authenticate in the website) ",
96                 flags = Flags.Credential)
97
98         city = Attribute("city",
99                 "Constrain location (city) during resource discovery. May use wildcards.",
100                 flags = Flags.Filter)
101
102         country = Attribute("country",
103                 "Constrain location (country) during resource discovery. May use wildcards.",
104                 flags = Flags.Filter)
105
106         region = Attribute("region",
107                 "Constrain location (region) during resource discovery. May use wildcards.",
108                 flags = Flags.Filter)
109
110         architecture = Attribute("architecture",
111                 "Constrain architecture during resource discovery.",
112                 type = Types.Enumerate,
113                 allowed = ["x86_64",
114                             "i386"],
115                 flags = Flags.Filter)
116
117         operating_system = Attribute("operatingSystem",
118                 "Constrain operating system during resource discovery.",
119                 type = Types.Enumerate,
120                 allowed =  ["f8",
121                             "f12",
122                             "f14",
123                             "centos",
124                             "other"],
125                 flags = Flags.Filter)
126
127         site = Attribute("site",
128                 "Constrain the PlanetLab site this node should reside on.",
129                 type = Types.Enumerate,
130                 allowed = ["PLE",
131                             "PLC",
132                             "PLJ"],
133                 flags = Flags.Filter)
134
135         min_reliability = Attribute("minReliability",
136                 "Constrain reliability while picking PlanetLab nodes. Specifies a lower acceptable bound.",
137                 type = Types.Double,
138                 range = (1, 100),
139                 flags = Flags.Filter)
140
141         max_reliability = Attribute("maxReliability",
142                 "Constrain reliability while picking PlanetLab nodes. Specifies an upper acceptable bound.",
143                 type = Types.Double,
144                 range = (1, 100),
145                 flags = Flags.Filter)
146
147         min_bandwidth = Attribute("minBandwidth",
148                 "Constrain available bandwidth while picking PlanetLab nodes. Specifies a lower acceptable bound.",
149                 type = Types.Double,
150                 range = (0, 2**31),
151                 flags = Flags.Filter)
152
153         max_bandwidth = Attribute("maxBandwidth",
154                 "Constrain available bandwidth while picking PlanetLab nodes. Specifies an upper acceptable bound.",
155                 type = Types.Double,
156                 range = (0, 2**31),
157                 flags = Flags.Filter)
158
159         min_load = Attribute("minLoad",
160                 "Constrain node load average while picking PlanetLab nodes. Specifies a lower acceptable bound.",
161                 type = Types.Double,
162                 range = (0, 2**31),
163                 flags = Flags.Filter)
164
165         max_load = Attribute("maxLoad",
166                 "Constrain node load average while picking PlanetLab nodes. Specifies an upper acceptable bound.",
167                 type = Types.Double,
168                 range = (0, 2**31),
169                 flags = Flags.Filter)
170
171         min_cpu = Attribute("minCpu",
172                 "Constrain available cpu time while picking PlanetLab nodes. Specifies a lower acceptable bound.",
173                 type = Types.Double,
174                 range = (0, 100),
175                 flags = Flags.Filter)
176
177         max_cpu = Attribute("maxCpu",
178                 "Constrain available cpu time while picking PlanetLab nodes. Specifies an upper acceptable bound.",
179                 type = Types.Double,
180                 range = (0, 100),
181                 flags = Flags.Filter)
182
183         timeframe = Attribute("timeframe",
184                 "Past time period in which to check information about the node. Values are year,month, week, latest",
185                 default = "week",
186                 type = Types.Enumerate,
187                 allowed = ["latest",
188                             "week",
189                             "month",
190                             "year"],
191                  flags = Flags.Filter)
192
193         cls._register_attribute(ip)
194         cls._register_attribute(pl_url)
195         cls._register_attribute(pl_ptn)
196         cls._register_attribute(pl_user)
197         cls._register_attribute(pl_password)
198         cls._register_attribute(site)
199         cls._register_attribute(city)
200         cls._register_attribute(country)
201         cls._register_attribute(region)
202         cls._register_attribute(architecture)
203         cls._register_attribute(operating_system)
204         cls._register_attribute(min_reliability)
205         cls._register_attribute(max_reliability)
206         cls._register_attribute(min_bandwidth)
207         cls._register_attribute(max_bandwidth)
208         cls._register_attribute(min_load)
209         cls._register_attribute(max_load)
210         cls._register_attribute(min_cpu)
211         cls._register_attribute(max_cpu)
212         cls._register_attribute(timeframe)
213         
214
215     def __init__(self, ec, guid):
216         super(PlanetlabNode, self).__init__(ec, guid)
217
218         self._plapi = None
219         self._node_to_provision = None
220     
221     @property
222     def plapi(self):
223         if not self._plapi:
224             pl_user = self.get("pluser")
225             pl_pass = self.get("password")
226             pl_url = self.get("plcApiUrl")
227             pl_ptn = self.get("plcApiPattern")
228
229             self._plapi =  PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
230                     pl_ptn)
231             
232         return self._plapi
233
234     def discoverl(self):
235         #### A.Q. COMMENT: no need to have methods for the locks and 
236         ##                 other attributes. Please remove.
237         bl = PlanetlabNode.blacklist()
238         inpro = PlanetlabNode.in_provision()
239         lockbl = PlanetlabNode.lock_bl()
240         lockinpro = PlanetlabNode.lock_inpro()
241         hostname = self.get("hostname")
242         if hostname: 
243             node_id = self.check_alive_and_active(hostname=hostname)
244             if node_id not in bl and node_id not in inpro:
245                 try_other = self.do_ping(node_id)
246                 if try_other:
247                     # A.Q. COMMENT: Here you could do 
248                     #
249                     #   with self._lockbl:
250                     #       ...
251                     #
252                     #  Class attributes can still be accesed with 'self'
253                     lockbl.acquire()
254                     bl.append(node_id)
255                     lockbl.release()
256                     msg = "Node %s not alive, pick another node" % hostname
257                     raise RuntimeError, msg
258                 else:
259                     self._node_to_provision = node_id
260                     super(PlanetlabNode, self).discover()
261                     #self._discover_time = tnow()
262                     #self._state = ResourceState.DISCOVERED
263                     return node_id
264             else:
265                 msg = "Node %s not available for provisioning, pick another node" % hostname
266                 raise RuntimeError, msg
267                     
268         else:
269             from random import randint
270             nodes = self.filter_based_on_attributes()
271             nodes_alive = self.check_alive_and_active(nodes)
272             print nodes, nodes_alive
273             nodes_inslice = self.check_if_in_slice(nodes_alive)
274             nodes_not_inslice = list(set(nodes_alive) - set(nodes_inslice))
275             if nodes_inslice:
276                 size = len(nodes_inslice)
277                 while size:
278                     size = size - 1
279                     index = randint(0, size)
280                     node_id = nodes_inslice[index]
281                     nodes_inslice[index] = nodes_inslice[size]
282                     if node_id not in bl and node_id not in inpro:
283                         try_other = self.do_ping(node_id)
284                         if not try_other:
285                             lockinpro.acquire()
286                             inpro.append(node_id)
287                             lockinpro.release()
288                             self._node_to_provision = node_id
289
290                             super(PlanetlabNode, self).discover()
291                             #self._discover_time = tnow()
292                             #self._state = ResourceState.DISCOVERED
293                             return node_id
294                         else:
295                             lockbl.acquire()
296                             bl.append(node_id)
297                             lockbl.release()
298
299             if nodes_not_inslice:
300                 size = len(nodes_not_inslice)
301                 while size:
302                     size = size - 1
303                     index = randint(0, size)
304                     node_id = nodes_not_inslice[index]
305                     nodes_not_inslice[index] = nodes_not_inslice[size]
306                     if node_id not in bl and node_id not in inpro:
307                         try_other = self.do_ping(node_id)
308                         if not try_other:
309                             lockinpro.acquire()
310                             inpro.append(node_id)
311                             lockinpro.release()
312                             self._node_to_provision = node_id
313                             
314                             super(PlanetlabNode, self).discover()
315                             #self._discover_time = tnow()
316                             #self._state = ResourceState.DISCOVERED
317                             return node_id
318                         else:
319                             lockbl.acquire()
320                             bl.append(node_id)
321                             lockbl.release()
322                 msg = "Not enough nodes available for provisioning"
323                 raise RuntimeError, msg
324
325                     
326
327     def provisionl(self):
328         # A.Q. COMMENT: you can import time on the top
329         import time
330         bl = PlanetlabNode.blacklist()
331         lockbl = PlanetlabNode.lock_bl()
332         provision_ok = False
333         ssh_ok = False
334         proc_ok = False
335         timeout = 1200
336         while not provision_ok:
337             slicename = self.get("username")
338             node = self._node_to_provision
339             ip = self.plapi.get_interfaces({'node_id':node}, fields=['ip'])
340             ip = ip[0]['ip']
341             print ip
342
343             self.plapi.add_slice_nodes(slicename, [node])
344             
345             t = 0 
346             while t < timeout and not ssh_ok:
347                 # check ssh connection
348
349                 # A.Q. COMMENT IMPORTANT! Instead of issuing SSH commands directly use the
350                 #    "execute" method inherithed from LinuxNode with blocking = True
351                 command = "ssh %s@%s -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no 'echo \'GOOD NODE\''" % (slicename, ip)
352                 p = subprocess.Popen(command, shell=True, stdout = subprocess.PIPE, stderr = subprocess.PIPE) 
353                 stdout, stderr = p.communicate()
354                 if stdout.find("GOOD NODE") < 0:
355                     print t
356                     t = t + 60
357                     time.sleep(60)
358                     continue
359                 else:
360                     ssh_ok = True
361                     continue
362
363             if not ssh_ok:
364                 with lockbl:
365                     bl.append(node)
366                     print bl
367                     # A.Q. COMMENT: Make method "delete_slice_node" and there 
368                     #               put this code. Repeat this for all calls to plapi.
369                     #               This will make the code cleaner.
370                     self.plapi.delete_slice_node(slicename, [node])
371                     self.discover()
372                 continue
373             
374             # check /proc directory
375             else: # ssh_ok:
376                 command = "ssh %s@%s -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no 'mount |grep proc'" % (slicename, ip)
377                 p = subprocess.Popen(command, shell=True, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
378                 stdout, stderr = p.communicate()
379                 if stdout.find("/proc type proc") < 0:
380                     # A.Q. COMMENT: lines 382-384 should go to a method
381                     #       "blacklist_node()"
382                     lockbl.acquire()
383                     bl.append(node)
384                     lockbl.release()
385                     self.plapi.delete_slice_node(slicename, [node])
386                     self.discover()
387                     continue
388             
389                 else:
390                     provision_ok = True
391                     # set attributes ip, hostname
392                     self.set("ip", ip)
393  
394                     hostname = self.plapi.get_nodes(node, ['hostname'])
395                     self.set("hostname", hostname[0]['hostname'])
396                     print self.get("hostname")
397             
398         # call provision de linux node?
399         super(PlanetlabNode, self).provision()
400
401     def filter_based_on_attributes(self):
402         # Map attributes with tagnames of PL
403         timeframe = self.get("timeframe")[0]
404         attr_to_tags = {
405             'city' : 'city',
406             'country' : 'country',
407             'region' : 'region',
408             'architecture' : 'arch',
409             'operatingSystem' : 'fcdistro',
410             #'site' : 'pldistro',
411             'minReliability' : 'reliability%s' % timeframe,
412             'maxReliability' : 'reliability%s' % timeframe,
413             'minBandwidth' : 'bw%s' % timeframe,
414             'maxBandwidth' : 'bw%s' % timeframe,
415             'minLoad' : 'load%s' % timeframe,
416             'maxLoad' : 'load%s' % timeframe,
417             'minCpu' : 'cpu%s' % timeframe,
418             'maxCpu' : 'cpu%s' % timeframe,
419         }
420         
421         nodes_id = []
422         filters = {}
423         for attr_name, attr_obj in self._attrs.iteritems():
424             attr_value = self.get(attr_name)
425             if attr_value is not None and attr_obj.flags == 8 and not 'min' in attr_name \
426                 and not 'max' in attr_name and attr_name != 'timeframe':
427                 attr_tag = attr_to_tags[attr_name]
428                 filters['tagname'] = attr_tag
429                 filters['value'] = attr_value
430                 node_tags = self.plapi.get_node_tags(filters)
431                 if node_tags is not None:
432                     if len(nodes_id) == 0:
433                         for node_tag in node_tags:
434                             nodes_id.append(node_tag['node_id'])
435                     else:
436                         nodes_id_tmp = []
437                         for node_tag in node_tags:
438                             if node_tag['node_id'] in nodes_id:
439                                 nodes_id_tmp.append(node_tag['node_id'])
440                         if len(nodes_id_tmp):
441                             nodes_id = set(nodes_id) & set(nodes_id_tmp)
442                         else:
443                             self.fail2()
444                 else:
445                     self.fail2()
446             elif attr_value is not None and attr_obj.flags == 8 and ('min' or 'max') in attr_name:
447                 attr_tag = attr_to_tags[attr_name]
448                 filters['tagname'] = attr_tag
449                 node_tags = self.plapi.get_node_tags(filters)
450                 if node_tags is not None:
451                     if len(nodes_id) == 0:
452                         for node_tag in node_tags:
453                             if 'min' in attr_name and node_tag['value'] != 'n/a' and \
454                                 float(node_tag['value']) > attr_value:
455                                 nodes_id.append(node_tag['node_id'])
456                             elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
457                                 float(node_tag['value']) < attr_value:
458                                 nodes_id.append(node_tag['node_id'])
459                     else:
460                         nodes_id_tmp = []
461                         for node_tag in node_tags:
462                             if 'min' in attr_name and node_tag['value'] != 'n/a' and \
463                                 float(node_tag['value']) > attr_value and \
464                                 node_tag['node_id'] in nodes_id:
465                                 nodes_id_tmp.append(node_tag['node_id'])
466                             elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
467                                 float(node_tag['value']) < attr_value and \
468                                 node_tag['node_id'] in nodes_id:
469                                 nodes_id_tmp.append(node_tag['node_id'])
470                         if len(nodes_id_tmp):
471                             nodes_id = set(nodes_id) & set(nodes_id_tmp)
472                         else:
473                             self.fail2()
474
475         return nodes_id
476                     
477     def check_alive_and_active(self, nodes_id=None, hostname=None):
478         if nodes_id is None and hostname is None:
479             msg = "Specify nodes_id or hostname"
480             raise RuntimeError, msg
481         if nodes_id is not None and hostname is not None:
482             msg = "Specify either nodes_id or hostname"
483             raise RuntimeError, msg
484
485         # check node alive
486         import time
487         filters = dict()
488         filters['run_level'] = 'boot'
489         filters['boot_state'] = 'boot'
490         filters['node_type'] = 'regular' 
491         filters['>last_contact'] =  int(time.time()) - 2*3600
492         if nodes_id:
493             filters['node_id'] = list(nodes_id)
494             alive_nodes_id = self.plapi.get_nodes(filters, fields=['node_id'])
495         elif hostname:
496             filters['hostname'] = hostname
497             alive_nodes_id = self.plapi.get_nodes(filters, fields=['node_id'])
498         if len(alive_nodes_id) == 0:
499             self.fail2()
500         else:
501             nodes_id = list()
502             for node_id in alive_nodes_id:
503                 nid = node_id['node_id']
504                 nodes_id.append(nid)
505             return nodes_id
506
507
508     def check_if_in_slice(self, nodes_id):
509         slicename = self.get("username")
510         slice_nodes = self.plapi.get_slice_nodes(slicename)
511         nodes_inslice = list(set(nodes_id) & set(slice_nodes))
512         return nodes_inslice
513
514     def do_ping(self, node_id):
515         # A.Q. COMMENT: the execfuncs module in utils will do the local ping for you
516         #               code reuse is good...
517         ip = self.plapi.get_interfaces({'node_id':node_id}, fields=['ip'])
518         ip = ip[0]['ip']
519         result = subprocess.call(["ping","-c","2",ip],stdout=subprocess.PIPE,stderr=subprocess.PIPE)
520         if result == 0:
521             return False
522         elif result == 1 or result == 2:
523             return True
524
525     # A.Q. Unclear name for method "fail2"
526     def fail2(self):
527         self.fail()
528         msg = "Discovery failed. No candidates found for node"
529         self.error(msg)
530         raise RuntimeError, msg
531            
532                         
533     def valid_connection(self, guid):
534         # TODO: Validate!
535         return True
536
537 #    def blacklist(self):
538 #        # TODO!!!!
539 #        self.warn(" Blacklisting malfunctioning node ")
540 #        #import util
541 #        #util.appendBlacklist(self.hostname)
542