systematic use of context managers for dealing with files instead of open()/close...
[nepi.git] / src / nepi / util / sfaapi.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Lucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
18
19 import threading
20 import hashlib
21 import re
22 import os
23 import time
24
25 from nepi.util.logger import Logger
26
27 try:
28     from sfa.client.sfi import Sfi
29     from sfa.util.xrn import hrn_to_urn
30 except ImportError:
31     log = Logger("SFA API")
32     log.debug("Packages sfa-common or sfa-client not installed.\
33          Could not import sfa.client.sfi or sfa.util.xrn")
34
35 from nepi.util.sfarspec_proc import SfaRSpecProcessing
36
37 class SFAAPI(object):
38     """
39     API for quering the SFA service. It uses Sfi class from the tool sfi client.
40     """
41     def __init__(self, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
42         batch, rtype, timeout):
43
44         self._blacklist = set()
45         self._reserved = set()
46         self._resources_cache = None
47         self._already_cached = False
48         self._ec = ec 
49         self.apis = 1
50
51         if batch:
52             self._testbed_res = rtype
53             self._count = 0
54             self._total = self._get_total_res()
55             self._slice_resources_batch = list()
56
57         self._log = Logger("SFA API")
58         self.api = Sfi()
59         self.rspec_proc = SfaRSpecProcessing()
60         self.lock_slice = threading.Lock()
61         self.lock_blist = threading.Lock()
62         self.lock_resv = threading.Lock()
63
64         self.api.options.timeout = timeout
65         self.api.options.raw = None
66         self.api.options.user = sfi_user
67         self.api.options.auth = sfi_auth
68         self.api.options.registry = sfi_registry
69         self.api.options.sm = sfi_sm
70         self.api.options.user_private_key = private_key
71
72         # Load blacklist from file
73         if ec.get_global('PlanetlabNode', 'persist_blacklist'):
74             self._set_blacklist()
75
76     def _set_blacklist(self):
77         """
78         Initialize the blacklist with previous nodes blacklisted, in 
79         previous runs.
80         """
81         nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
82         plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
83         with open(plblacklist_file, 'r') as f:
84             hosts_tobl = f.read().splitlines()
85             if hosts_tobl:
86                 for host in hosts_tobl:
87                     self._blacklist.add(host)
88
89     def _get_total_res(self):
90         """
91         Get the total amount of resources instanciated using this API,
92         to be able to add them using the same Allocate and Provision
93         call at once. Specially for Wilabt testbed that doesn't allow 
94         to add slivers after the slice already has some.
95         """
96         rms = list()
97         res_gids = self._ec.resources
98         for gid in res_gids:
99             rm = self._ec.get_resource(gid)
100             if self._testbed_res.lower() in rm._rtype.lower():
101                 rms.append(rm)
102         return rms
103
104     def _sfi_exec_method(self, command, slicename=None, rspec=None, urn=None, action=None):
105         """
106         Execute sfi method, which correspond to SFA call. It can be the following
107         calls: Describe, Delete, Allocate, Provision, ListResources.
108         """
109         if command in ['describe', 'delete', 'allocate', 'provision', 'action']:
110             if not slicename:
111                 raise TypeError("The slice hrn is expected for this method %s" % command)
112             if command == 'allocate' and not rspec:
113                 raise TypeError("RSpec is expected for this method %s" % command)
114             
115             if command == 'allocate':
116                 args_list = [slicename, rspec]
117             else:
118                 args_list = [slicename]
119             if command != 'delete':
120                 args_list = args_list + ['-o', '/tmp/rspec_output']
121             if command == 'action':
122                 args_list = [slicename, action]
123
124         elif command == 'resources':
125             args_list = ['-o', '/tmp/rspec_output']
126
127         else: raise TypeError("Sfi method not supported")
128
129         self.api.command = command
130         self.api.command_parser = self.api.create_parser_command(self.api.command)
131         (command_options, command_args) = self.api.command_parser.parse_args(args_list)
132         self.api.command_options = command_options
133         self.api.read_config()
134         self.api.bootstrap()
135
136         try:
137             os.remove("/tmp/rspec_output.rspec")
138         except OSError:
139             self._log.debug("Couldn't remove temporary output file for RSpec or it doesn't exist")
140
141         try:
142             self.api.dispatch(command, command_options, command_args)
143             with open("/tmp/rspec_output.rspec", "r") as result_file:
144                 result = result_file.read()
145                 return result
146         except:
147             self._log.debug(" Couldn't retrive rspec output information from method %s " % command)
148             return None
149
150     def get_resources_info(self):
151         """
152         Get all resources and its attributes from aggregate.
153         """
154         try:
155             rspec_slice = self._sfi_exec_method('resources')
156         except:
157             raise RuntimeError("Fail to list resources")
158    
159         self._resources_cache = self.rspec_proc.parse_sfa_rspec(rspec_slice)
160         self._already_cached = True
161         return self._resources_cache
162
163     def get_resources_hrn(self, resources=None):
164         """
165         Get list of resources hrn, without the resource info.
166         """
167         if not resources:
168             if not self._already_cached:
169                 resources = self.get_resources_info()['resource']
170             else:
171                 resources = self._resources_cache['resource']
172
173         component_tohrn = dict()
174         for resource in resources:
175             hrn = resource['hrn'].replace('\\', '')
176             component_tohrn[resource['component_name']] = hrn
177
178         return component_tohrn
179             
180     def get_slice_resources(self, slicename):
181         """
182         Get resources and info from slice.
183         """
184         try:
185             with self.lock_slice:
186                 rspec_slice = self._sfi_exec_method('describe', slicename)
187         except:
188             self._log.debug("Fail to describe resources for slice %s, slice may be empty" % slicename)
189
190         if rspec_slice is not None:
191             result = self.rspec_proc.parse_sfa_rspec(rspec_slice)
192             return result
193         else:
194             return {'resource':[],'lease':[]}
195
196
197     def add_resource_to_slice(self, slicename, resource_hrn, leases=None):
198         """
199         Get the list of resources' urn, build the rspec string and call the allocate 
200         and provision method.
201         """
202         resources_hrn_new = list()
203         resource_parts = resource_hrn.split('.')
204         resource_hrn = '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:])
205         resources_hrn_new.append(resource_hrn)
206
207         with self.lock_slice:
208             rspec_slice = self._sfi_exec_method('describe', slicename)
209             if rspec_slice is not None:
210                 slice_resources = self.rspec_proc.parse_sfa_rspec(rspec_slice)['resource']
211             else: slice_resources = []
212             if slice_resources:
213                 slice_resources_hrn = self.get_resources_hrn(slice_resources)
214                 for s_hrn_key, s_hrn_value in slice_resources_hrn.iteritems():
215                     s_parts = s_hrn_value.split('.')
216                     s_hrn = '.'.join(s_parts[:2]) + '.' + '\\.'.join(s_parts[2:])
217                     resources_hrn_new.append(s_hrn)
218
219
220             resources_urn = self._get_resources_urn(resources_hrn_new)
221             rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, None, leases)
222             with open("/tmp/rspec_input.rspec", "w") as f:
223                 f.truncate(0)
224                 f.write(rspec)
225             
226             if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
227                 raise RuntimeError("Fail to create rspec file to allocate resource in slice %s" % slicename)
228
229             # ALLOCATE
230             try:
231                 self._log.debug("Allocating resources in slice %s" % slicename)
232                 out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
233             except:
234                 raise RuntimeError("Fail to allocate resource for slice %s" % slicename)
235
236             if out is not None:
237                 # PROVISION
238                 try:
239                     self._log.debug("Provisioning resources in slice %s" % slicename)
240                     self._sfi_exec_method('provision', slicename) 
241                 except:
242                     raise RuntimeError("Fail to provision resource for slice %s" % slicename)
243                 return True
244
245     def add_resource_to_slice_batch(self, slicename, resource_hrn, properties=None, leases=None):
246         """
247         Method to add all resources together to the slice. Previous deletion of slivers.
248         Specially used for wilabt that doesn't allow to add more resources to the slice
249         after some resources are added. Every sliver have to be deleted and the batch 
250         has to be added at once.
251         """
252         self._count += 1
253         self._slice_resources_batch.append(resource_hrn)
254         resources_hrn_new = list()
255         if self._count == len(self._total):
256             check_all_inslice = self._check_all_inslice(self._slice_resources_batch, slicename)
257             if check_all_inslice == True:
258                 return True
259             for resource_hrn in self._slice_resources_batch:
260                 resource_parts = resource_hrn.split('.')
261                 resource_hrn = '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:])
262                 resources_hrn_new.append(resource_hrn)
263             with self.lock_slice:
264                 if check_all_inslice != 0:
265                     self._sfi_exec_method('delete', slicename)
266                     time.sleep(480)
267                 
268                 # Re implementing urn from hrn because the library sfa-common doesn't work for wilabt
269                 resources_urn = self._get_urn(resources_hrn_new)
270                 rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, properties, leases)
271                 with open("/tmp/rspec_input.rspec", "w") as f:
272                     f.truncate(0)
273                     f.write(rspec)
274
275                 if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
276                     raise RuntimeError("Fail to create rspec file to allocate resources in slice %s" % slicename)
277
278                 # ALLOCATE    
279                 try:
280                     self._log.debug("Allocating resources in slice %s" % slicename)
281                     out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
282                 except:
283                     raise RuntimeError("Fail to allocate resource for slice %s" % slicename)
284
285                 if out is not None:
286                     # PROVISION
287                     try:
288                         self._log.debug("Provisioning resources in slice %s" % slicename)
289                         self._sfi_exec_method('provision', slicename)
290                         self._sfi_exec_method('action', slicename=slicename, action='geni_start')
291                     except:
292                         raise RuntimeError("Fail to provision resource for slice %s" % slicename)
293                     return True
294                 else:
295                     raise RuntimeError("Fail to allocate resources for slice %s" % slicename)
296     
297         else:
298             self._log.debug(" Waiting for more nodes to add the batch to the slice ")
299
300     def _check_all_inslice(self, resources_hrn, slicename):
301         slice_res = self.get_slice_resources(slicename)['resource']
302         if slice_res:
303             if len(slice_res[0]['services']) != 0:
304                 slice_res_hrn = self.get_resources_hrn(slice_res).values()
305                 if self._compare_lists(slice_res_hrn, resources_hrn):
306                     return True
307                 else: return len(slice_res_hrn)
308         return 0
309
310     def _compare_lists(self, list1, list2):
311         if len(list1) != len(list2):
312             return False
313         for item in list1:
314             if item not in list2:
315                 return False
316         return True
317
318     def _get_urn(self, resources_hrn):
319         """
320         Get urn from hrn.
321         """
322         resources_urn = list()
323         for hrn in resources_hrn:
324             hrn = hrn.replace("\\", "").split('.')
325             node = hrn.pop()
326             auth = '.'.join(hrn)
327             urn = ['urn:publicid:IDN+', auth, '+node+', node]
328             urn = ''.join(urn)
329             resources_urn.append(urn)
330         return resources_urn
331
332     def remove_resource_from_slice(self, slicename, resource_hrn, leases=None):
333         """
334         Remove slivers from slice. Currently sfi doesn't support removing particular
335         slivers.
336         """
337         resource_urn = self._get_resources_urn([resource_hrn]).pop()
338         with self.lock_slice:
339             try:
340                 self._sfi_exec_method('delete', slicename, urn=resource_urn)
341             except:
342                 raise RuntimeError("Fail to delete resource for slice %s" % slicename)
343             return True
344
345     def remove_all_from_slice(self, slicename):
346         """
347         De-allocate and de-provision all slivers of the named slice.
348         Currently sfi doesn't support removing particular
349         slivers, so this method works only for removing every sliver. Setting the
350         resource_hrn parameter is not necessary.
351         """
352         with self.lock_slice:
353             try:
354                 self._sfi_exec_method('delete', slicename)
355             except:
356                 raise RuntimeError("Fail to delete slivers for slice %s" % slicename)
357             return True
358
359     def _get_resources_urn(self, resources_hrn):
360         """
361         Builds list of resources' urn based on hrn.
362         """
363         resources_urn = list()
364
365         for resource in resources_hrn:
366             resources_urn.append(hrn_to_urn(resource, 'node'))
367             
368         return resources_urn
369
370     def blacklist_resource(self, resource_hrn):
371         """
372         Adding resource_hrn to blacklist, and taking 
373         the resource from the reserved list.
374         """
375         with self.lock_blist:
376             self._blacklist.add(resource_hrn)
377         with self.lock_resv:
378             if resource_hrn in self._reserved:
379                 self._reserved.remove(resource_hrn)
380
381     def blacklisted(self, resource_hrn):
382         """
383         Check if the resource is in the blacklist. 
384         """
385         with self.lock_blist:
386             if resource_hrn in self._blacklist:
387                 return True
388         return False
389
390     def reserve_resource(self, resource_hrn):
391         """
392         Add resource to the reserved list.
393         """
394         self._reserved.add(resource_hrn)
395
396     def reserved(self, resource_hrn):
397         """
398         Check that the resource in not reserved.
399         """
400         with self.lock_resv:
401             if resource_hrn in self._reserved:
402                 return True
403             else:
404                 self.reserve_resource(resource_hrn)
405                 return False
406
407     def release(self):
408         """
409         Remove hosts from the reserved and blacklist lists, and in case
410         the persist attribute is set, it saves the blacklisted hosts
411         in the blacklist file.
412         """
413         self.apis -= 1
414         if self.apis == 0:
415             blacklist = self._blacklist
416             self._blacklist = set()
417             self._reserved = set()
418 #            if self._ecobj.get_global('PlanetlabSfaNode', 'persist_blacklist'):
419 #                if blacklist:
420 #                    to_blacklist = list()
421 #                    hostnames = self.get_nodes(list(blacklist), ['hostname'])
422 #                    for hostname in hostnames:
423 #                        to_blacklist.append(hostname['hostname'])
424 #
425 #                    nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
426 #                    plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
427 #
428 #                    with open(plblacklist_file, 'w') as f:
429 #                        for host in to_blacklist:
430 #                            f.write("%s\n" % host)
431 #
432
433
434 class SFAAPIFactory(object):
435     """
436     API Factory to manage a map of SFAAPI instances as key-value pairs, it
437     instanciate a single instance per key. The key represents the same SFA, 
438     credentials.
439     """
440
441     _lock = threading.Lock()
442     _apis = dict()
443
444    
445     @classmethod
446     def get_api(cls, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
447             batch = False, rtype = None, timeout = None):
448
449         if sfi_user and sfi_sm:
450             key = cls.make_key(sfi_user, sfi_sm)
451             with cls._lock:
452                 api = cls._apis.get(key)
453
454                 if not api:
455                     api = SFAAPI(sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key,
456                         ec, batch, rtype, timeout)
457                     cls._apis[key] = api
458                 else:
459                     api.apis += 1
460
461                 return api
462
463         return None
464
465     @classmethod
466     def make_key(cls, *args):
467         skey = "".join(map(str, args))
468         return hashlib.md5(skey).hexdigest()
469