3dcc7e68dc63096ec68c5b69592ac6471783665f
[nepi.git] / src / nepi / util / sfaapi.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Lucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
18
19 import threading
20 import hashlib
21 import re
22 import os
23 import time
24
25 from nepi.util.logger import Logger
26
27 try:
28     from sfa.client.sfi import Sfi
29     from sfa.util.xrn import hrn_to_urn
30 except ImportError:
31     log = Logger("SFA API")
32     log.debug("Packages sfa-common or sfa-client not installed.\
33          Could not import sfa.client.sfi or sfa.util.xrn")
34
35 from nepi.util.sfarspec_proc import SfaRSpecProcessing
36
37 class SFAAPI(object):
38     """
39     API for quering the SFA service. It uses Sfi class from the tool sfi client.
40     """
41     def __init__(self, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
42         batch, rtype, timeout):
43
44         self._blacklist = set()
45         self._reserved = set()
46         self._resources_cache = None
47         self._already_cached = False
48         self._ec = ec 
49         self.apis = 1
50
51         if batch:
52             self._testbed_res = rtype
53             self._count = 0
54             self._total = self._get_total_res()
55             self._slice_resources_batch = list()
56
57         self._log = Logger("SFA API")
58         self.api = Sfi()
59         self.rspec_proc = SfaRSpecProcessing()
60         self.lock_slice = threading.Lock()
61         self.lock_blist = threading.Lock()
62         self.lock_resv = threading.Lock()
63
64         self.api.options.timeout = timeout
65         self.api.options.raw = None
66         self.api.options.user = sfi_user
67         self.api.options.auth = sfi_auth
68         self.api.options.registry = sfi_registry
69         self.api.options.sm = sfi_sm
70         self.api.options.user_private_key = private_key
71
72         # Load blacklist from file
73         if ec.get_global('PlanetlabNode', 'persist_blacklist'):
74             self._set_blacklist()
75
76     def _set_blacklist(self):
77         """
78         Initialize the blacklist with previous nodes blacklisted, in 
79         previous runs.
80         """
81         nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
82         plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
83         with open(plblacklist_file, 'r') as f:
84             hosts_tobl = f.read().splitlines()
85             if hosts_tobl:
86                 for host in hosts_tobl:
87                     self._blacklist.add(host)
88
89     def _get_total_res(self):
90         """
91         Get the total amount of resources instanciated using this API,
92         to be able to add them using the same Allocate and Provision
93         call at once. Specially for Wilabt testbed that doesn't allow 
94         to add slivers after the slice already has some.
95         """
96         rms = list()
97         res_gids = self._ec.resources
98         for gid in res_gids:
99             rm = self._ec.get_resource(gid)
100             if self._testbed_res.lower() in rm._rtype.lower():
101                 rms.append(rm)
102         return rms
103
104     def _sfi_exec_method(self, command, slicename=None, rspec=None, urn=None, action=None):
105         """
106         Execute sfi method, which correspond to SFA call. It can be the following
107         calls: Describe, Delete, Allocate, Provision, ListResources.
108         """
109         if command in ['describe', 'delete', 'allocate', 'provision', 'action']:
110             if not slicename:
111                 raise TypeError("The slice hrn is expected for this method %s" % command)
112             if command == 'allocate' and not rspec:
113                 raise TypeError("RSpec is expected for this method %s" % command)
114             
115             if command == 'allocate':
116                 args_list = [slicename, rspec]
117             else:
118                 args_list = [slicename]
119             if command != 'delete':
120                 args_list = args_list + ['-o', '/tmp/rspec_output']
121             if command == 'action':
122                 args_list = [slicename, action]
123
124         elif command == 'resources':
125             args_list = ['-o', '/tmp/rspec_output']
126
127         else: raise TypeError("Sfi method not supported")
128
129         self.api.command = command
130         self.api.command_parser = self.api.create_parser_command(self.api.command)
131         (command_options, command_args) = self.api.command_parser.parse_args(args_list)
132         self.api.command_options = command_options
133         self.api.read_config()
134         self.api.bootstrap()
135
136         try:
137             os.remove("/tmp/rspec_output.rspec")
138         except OSError:
139             self._log.debug("Couldn't remove temporary output file for RSpec or it doesn't exist")
140
141         try:
142             self.api.dispatch(command, command_options, command_args)
143             with open("/tmp/rspec_output.rspec", "r") as result_file:
144                 result = result_file.read()
145                 return result
146         except:
147             self._log.debug(" Couldn't retrive rspec output information from method %s " % command)
148             return None
149
150     def get_resources_info(self):
151         """
152         Get all resources and its attributes from aggregate.
153         """
154         try:
155             rspec_slice = self._sfi_exec_method('resources')
156         except:
157             raise RuntimeError("Fail to list resources")
158    
159         self._resources_cache = self.rspec_proc.parse_sfa_rspec(rspec_slice)
160         self._already_cached = True
161         return self._resources_cache
162
163     def get_resources_hrn(self, resources=None):
164         """
165         Get list of resources hrn, without the resource info.
166         """
167         if not resources:
168             if not self._already_cached:
169                 resources = self.get_resources_info()['resource']
170             else:
171                 resources = self._resources_cache['resource']
172
173         component_tohrn = dict()
174         for resource in resources:
175             hrn = resource['hrn'].replace('\\', '')
176             component_tohrn[resource['component_name']] = hrn
177
178         return component_tohrn
179             
180     def get_slice_resources(self, slicename):
181         """
182         Get resources and info from slice.
183         """
184         try:
185             with self.lock_slice:
186                 rspec_slice = self._sfi_exec_method('describe', slicename)
187         except:
188             self._log.debug("Fail to describe resources for slice %s, slice may be empty" % slicename)
189
190         if rspec_slice is not None:
191             result = self.rspec_proc.parse_sfa_rspec(rspec_slice)
192             return result
193         else:
194             return {'resource':[],'lease':[]}
195
196
197     def add_resource_to_slice(self, slicename, resource_hrn, leases=None):
198         """
199         Get the list of resources' urn, build the rspec string and call the allocate 
200         and provision method.
201         """
202         resources_hrn_new = list()
203         resource_parts = resource_hrn.split('.')
204         resource_hrn = '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:])
205         resources_hrn_new.append(resource_hrn)
206
207         with self.lock_slice:
208             rspec_slice = self._sfi_exec_method('describe', slicename)
209             if rspec_slice is not None:
210                 slice_resources = self.rspec_proc.parse_sfa_rspec(rspec_slice)['resource']
211             else: slice_resources = []
212             if slice_resources:
213                 slice_resources_hrn = self.get_resources_hrn(slice_resources)
214                 for s_hrn_key, s_hrn_value in slice_resources_hrn.iteritems():
215                     s_parts = s_hrn_value.split('.')
216                     s_hrn = '.'.join(s_parts[:2]) + '.' + '\\.'.join(s_parts[2:])
217                     resources_hrn_new.append(s_hrn)
218
219
220             resources_urn = self._get_resources_urn(resources_hrn_new)
221             rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, None, leases)
222             f = open("/tmp/rspec_input.rspec", "w")
223             f.truncate(0)
224             f.write(rspec)
225             f.close()
226             
227             if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
228                 raise RuntimeError("Fail to create rspec file to allocate resource in slice %s" % slicename)
229
230             # ALLOCATE
231             try:
232                 self._log.debug("Allocating resources in slice %s" % slicename)
233                 out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
234             except:
235                 raise RuntimeError("Fail to allocate resource for slice %s" % slicename)
236
237             if out is not None:
238                 # PROVISION
239                 try:
240                     self._log.debug("Provisioning resources in slice %s" % slicename)
241                     self._sfi_exec_method('provision', slicename) 
242                 except:
243                     raise RuntimeError("Fail to provision resource for slice %s" % slicename)
244                 return True
245
246     def add_resource_to_slice_batch(self, slicename, resource_hrn, properties=None, leases=None):
247         """
248         Method to add all resources together to the slice. Previous deletion of slivers.
249         Specially used for wilabt that doesn't allow to add more resources to the slice
250         after some resources are added. Every sliver have to be deleted and the batch 
251         has to be added at once.
252         """
253         self._count += 1
254         self._slice_resources_batch.append(resource_hrn)
255         resources_hrn_new = list()
256         if self._count == len(self._total):
257             check_all_inslice = self._check_all_inslice(self._slice_resources_batch, slicename)
258             if check_all_inslice == True:
259                 return True
260             for resource_hrn in self._slice_resources_batch:
261                 resource_parts = resource_hrn.split('.')
262                 resource_hrn = '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:])
263                 resources_hrn_new.append(resource_hrn)
264             with self.lock_slice:
265                 if check_all_inslice != 0:
266                     self._sfi_exec_method('delete', slicename)
267                     time.sleep(480)
268                 
269                 # Re implementing urn from hrn because the library sfa-common doesn't work for wilabt
270                 resources_urn = self._get_urn(resources_hrn_new)
271                 rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, properties, leases)
272                 f = open("/tmp/rspec_input.rspec", "w")
273                 f.truncate(0)
274                 f.write(rspec)
275                 f.close()
276
277                 if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
278                     raise RuntimeError("Fail to create rspec file to allocate resources in slice %s" % slicename)
279
280                 # ALLOCATE    
281                 try:
282                     self._log.debug("Allocating resources in slice %s" % slicename)
283                     out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
284                 except:
285                     raise RuntimeError("Fail to allocate resource for slice %s" % slicename)
286
287                 if out is not None:
288                     # PROVISION
289                     try:
290                         self._log.debug("Provisioning resources in slice %s" % slicename)
291                         self._sfi_exec_method('provision', slicename)
292                         self._sfi_exec_method('action', slicename=slicename, action='geni_start')
293                     except:
294                         raise RuntimeError("Fail to provision resource for slice %s" % slicename)
295                     return True
296                 else:
297                     raise RuntimeError("Fail to allocate resources for slice %s" % slicename)
298     
299         else:
300             self._log.debug(" Waiting for more nodes to add the batch to the slice ")
301
302     def _check_all_inslice(self, resources_hrn, slicename):
303         slice_res = self.get_slice_resources(slicename)['resource']
304         if slice_res:
305             if len(slice_res[0]['services']) != 0:
306                 slice_res_hrn = self.get_resources_hrn(slice_res).values()
307                 if self._compare_lists(slice_res_hrn, resources_hrn):
308                     return True
309                 else: return len(slice_res_hrn)
310         return 0
311
312     def _compare_lists(self, list1, list2):
313         if len(list1) != len(list2):
314             return False
315         for item in list1:
316             if item not in list2:
317                 return False
318         return True
319
320     def _get_urn(self, resources_hrn):
321         """
322         Get urn from hrn.
323         """
324         resources_urn = list()
325         for hrn in resources_hrn:
326             hrn = hrn.replace("\\", "").split('.')
327             node = hrn.pop()
328             auth = '.'.join(hrn)
329             urn = ['urn:publicid:IDN+', auth, '+node+', node]
330             urn = ''.join(urn)
331             resources_urn.append(urn)
332         return resources_urn
333
334     def remove_resource_from_slice(self, slicename, resource_hrn, leases=None):
335         """
336         Remove slivers from slice. Currently sfi doesn't support removing particular
337         slivers.
338         """
339         resource_urn = self._get_resources_urn([resource_hrn]).pop()
340         with self.lock_slice:
341             try:
342                 self._sfi_exec_method('delete', slicename, urn=resource_urn)
343             except:
344                 raise RuntimeError("Fail to delete resource for slice %s" % slicename)
345             return True
346
347     def remove_all_from_slice(self, slicename):
348         """
349         De-allocate and de-provision all slivers of the named slice.
350         Currently sfi doesn't support removing particular
351         slivers, so this method works only for removing every sliver. Setting the
352         resource_hrn parameter is not necessary.
353         """
354         with self.lock_slice:
355             try:
356                 self._sfi_exec_method('delete', slicename)
357             except:
358                 raise RuntimeError("Fail to delete slivers for slice %s" % slicename)
359             return True
360
361     def _get_resources_urn(self, resources_hrn):
362         """
363         Builds list of resources' urn based on hrn.
364         """
365         resources_urn = list()
366
367         for resource in resources_hrn:
368             resources_urn.append(hrn_to_urn(resource, 'node'))
369             
370         return resources_urn
371
372     def blacklist_resource(self, resource_hrn):
373         """
374         Adding resource_hrn to blacklist, and taking 
375         the resource from the reserved list.
376         """
377         with self.lock_blist:
378             self._blacklist.add(resource_hrn)
379         with self.lock_resv:
380             if resource_hrn in self._reserved:
381                 self._reserved.remove(resource_hrn)
382
383     def blacklisted(self, resource_hrn):
384         """
385         Check if the resource is in the blacklist. 
386         """
387         with self.lock_blist:
388             if resource_hrn in self._blacklist:
389                 return True
390         return False
391
392     def reserve_resource(self, resource_hrn):
393         """
394         Add resource to the reserved list.
395         """
396         self._reserved.add(resource_hrn)
397
398     def reserved(self, resource_hrn):
399         """
400         Check that the resource in not reserved.
401         """
402         with self.lock_resv:
403             if resource_hrn in self._reserved:
404                 return True
405             else:
406                 self.reserve_resource(resource_hrn)
407                 return False
408
409     def release(self):
410         """
411         Remove hosts from the reserved and blacklist lists, and in case
412         the persist attribute is set, it saves the blacklisted hosts
413         in the blacklist file.
414         """
415         self.apis -= 1
416         if self.apis == 0:
417             blacklist = self._blacklist
418             self._blacklist = set()
419             self._reserved = set()
420 #            if self._ecobj.get_global('PlanetlabSfaNode', 'persist_blacklist'):
421 #                if blacklist:
422 #                    to_blacklist = list()
423 #                    hostnames = self.get_nodes(list(blacklist), ['hostname'])
424 #                    for hostname in hostnames:
425 #                        to_blacklist.append(hostname['hostname'])
426 #
427 #                    nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
428 #                    plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
429 #
430 #                    with open(plblacklist_file, 'w') as f:
431 #                        for host in to_blacklist:
432 #                            f.write("%s\n" % host)
433 #
434
435
436 class SFAAPIFactory(object):
437     """
438     API Factory to manage a map of SFAAPI instances as key-value pairs, it
439     instanciate a single instance per key. The key represents the same SFA, 
440     credentials.
441     """
442
443     _lock = threading.Lock()
444     _apis = dict()
445
446    
447     @classmethod
448     def get_api(cls, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
449             batch = False, rtype = None, timeout = None):
450
451         if sfi_user and sfi_sm:
452             key = cls.make_key(sfi_user, sfi_sm)
453             with cls._lock:
454                 api = cls._apis.get(key)
455
456                 if not api:
457                     api = SFAAPI(sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key,
458                         ec, batch, rtype, timeout)
459                     cls._apis[key] = api
460                 else:
461                     api.apis += 1
462
463                 return api
464
465         return None
466
467     @classmethod
468     def make_key(cls, *args):
469         skey = "".join(map(str, args))
470         return hashlib.md5(skey).hexdigest()
471