Changes for release
[nepi.git] / src / nepi / util / sfaapi.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Lucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
19
20 import threading
21 import hashlib
22 import re
23 import os
24
25 from nepi.util.logger import Logger
26
27 try:
28     from sfa.client.sfi import Sfi
29     from sfa.util.xrn import hrn_to_urn
30 except ImportError:
31     log = Logger("SFA API")
32     log.debug("Packages sfa-common or sfa-client not installed.\
33          Could not import sfa.client.sfi or sfa.util.xrn")
34
35 from nepi.util.sfarspec_proc import SfaRSpecProcessing
36
37 class SFAAPI(object):
38     """
39     API for quering the SFA service. It uses Sfi class from the tool sfi client.
40     """
41     def __init__(self, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
42         batch, rtype, timeout):
43
44         self._blacklist = set()
45         self._reserved = set()
46         self._resources_cache = None
47         self._already_cached = False
48         self._ec = ec 
49         self.apis = 1
50
51         if batch:
52             self._testbed_res = rtype
53             self._count = 0
54             self._total = self._get_total_res()
55             self._slice_resources_batch = list()
56
57         self._log = Logger("SFA API")
58         self.api = Sfi()
59         self.rspec_proc = SfaRSpecProcessing()
60         self.lock_slice = threading.Lock()
61         self.lock_blist = threading.Lock()
62         self.lock_resv = threading.Lock()
63
64         self.api.options.timeout = timeout
65         self.api.options.raw = None
66         self.api.options.user = sfi_user
67         self.api.options.auth = sfi_auth
68         self.api.options.registry = sfi_registry
69         self.api.options.sm = sfi_sm
70         self.api.options.user_private_key = private_key
71
72         # Load blacklist from file
73         if ec.get_global('PlanetlabNode', 'persist_blacklist'):
74             self._set_blacklist()
75
76     def _set_blacklist(self):
77         """
78         Initialize the blacklist with previous nodes blacklisted, in 
79         previous runs.
80         """
81         nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
82         plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
83         with open(plblacklist_file, 'r') as f:
84             hosts_tobl = f.read().splitlines()
85             if hosts_tobl:
86                 for host in hosts_tobl:
87                     self._blacklist.add(host)
88
89     def _get_total_res(self):
90         """
91         Get the total amount of resources instanciated using this API,
92         to be able to add them using the same Allocate and Provision
93         call at once. Specially for Wilabt testbed that doesn't allow 
94         to add slivers after the slice already has some.
95         """
96         rms = list()
97         res_gids = self._ec.resources
98         for gid in res_gids:
99             rm = self._ec.get_resource(gid)
100             if self._testbed_res.lower() in rm._rtype.lower():
101                 rms.append(rm)
102         return rms
103
104     def _sfi_exec_method(self, command, slicename=None, rspec=None, urn=None, action=None):
105         """
106         Execute sfi method, which correspond to SFA call. It can be the following
107         calls: Describe, Delete, Allocate, Provision, ListResources.
108         """
109         if command in ['describe', 'delete', 'allocate', 'provision', 'action']:
110             if not slicename:
111                 raise TypeError("The slice hrn is expected for this method %s" % command)
112             if command == 'allocate' and not rspec:
113                 raise TypeError("RSpec is expected for this method %s" % command)
114             
115             if command == 'allocate':
116                 args_list = [slicename, rspec]
117             else:
118                 args_list = [slicename]
119             if command != 'delete':
120                 args_list = args_list + ['-o', '/tmp/rspec_output']
121             if command == 'action':
122                 args_list = [slicename, action]
123
124         elif command == 'resources':
125             args_list = ['-o', '/tmp/rspec_output']
126
127         else: raise TypeError("Sfi method not supported")
128
129         self.api.command = command
130         self.api.command_parser = self.api.create_parser_command(self.api.command)
131         (command_options, command_args) = self.api.command_parser.parse_args(args_list)
132         self.api.command_options = command_options
133         self.api.read_config()
134         self.api.bootstrap()
135
136         try:
137             os.remove("/tmp/rspec_output.rspec")
138         except OSError:
139             self._log.debug("Couldn't remove temporary output file for RSpec or it doesn't exist")
140
141         try:
142             self.api.dispatch(command, command_options, command_args)
143             with open("/tmp/rspec_output.rspec", "r") as result_file:
144                 result = result_file.read()
145                 return result
146         except:
147             self._log.debug(" Couldn't retrive rspec output information from method %s " % command)
148             return None
149
150     def get_resources_info(self):
151         """
152         Get all resources and its attributes from aggregate.
153         """
154         try:
155             rspec_slice = self._sfi_exec_method('resources')
156         except:
157             raise RuntimeError("Fail to list resources")
158    
159         self._resources_cache = self.rspec_proc.parse_sfa_rspec(rspec_slice)
160         self._already_cached = True
161         return self._resources_cache
162
163     def get_resources_hrn(self, resources=None):
164         """
165         Get list of resources hrn, without the resource info.
166         """
167         if not resources:
168             if not self._already_cached:
169                 resources = self.get_resources_info()['resource']
170             else:
171                 resources = self._resources_cache['resource']
172
173         component_tohrn = dict()
174         for resource in resources:
175             hrn = resource['hrn'].replace('\\', '')
176             component_tohrn[resource['component_name']] = hrn
177
178         return component_tohrn
179             
180     def get_slice_resources(self, slicename):
181         """
182         Get resources and info from slice.
183         """
184         try:
185             with self.lock_slice:
186                 rspec_slice = self._sfi_exec_method('describe', slicename)
187         except:
188             self._log.debug("Fail to describe resources for slice %s, slice may be empty" % slicename)
189
190         if rspec_slice is not None:
191             result = self.rspec_proc.parse_sfa_rspec(rspec_slice)
192             return result
193         else:
194             return {'resource':[],'lease':[]}
195
196
197     def add_resource_to_slice(self, slicename, resource_hrn, leases=None):
198         """
199         Get the list of resources' urn, build the rspec string and call the allocate 
200         and provision method.
201         """
202         resources_hrn_new = list()
203         resource_parts = resource_hrn.split('.')
204         resource_hrn = '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:])
205         resources_hrn_new.append(resource_hrn)
206
207         with self.lock_slice:
208             rspec_slice = self._sfi_exec_method('describe', slicename)
209             if rspec_slice is not None:
210                 slice_resources = self.rspec_proc.parse_sfa_rspec(rspec_slice)['resource']
211             else: slice_resources = []
212             if slice_resources:
213                 slice_resources_hrn = self.get_resources_hrn(slice_resources)
214                 for s_hrn_key, s_hrn_value in slice_resources_hrn.iteritems():
215                     s_parts = s_hrn_value.split('.')
216                     s_hrn = '.'.join(s_parts[:2]) + '.' + '\\.'.join(s_parts[2:])
217                     resources_hrn_new.append(s_hrn)
218
219
220             resources_urn = self._get_resources_urn(resources_hrn_new)
221             rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, None, leases)
222             f = open("/tmp/rspec_input.rspec", "w")
223             f.truncate(0)
224             f.write(rspec)
225             f.close()
226             
227             if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
228                 raise RuntimeError("Fail to create rspec file to allocate resource in slice %s" % slicename)
229
230             # ALLOCATE
231             try:
232                 self._log.debug("Allocating resources in slice %s" % slicename)
233                 out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
234             except:
235                 raise RuntimeError("Fail to allocate resource for slice %s" % slicename)
236
237             if out is not None:
238                 # PROVISION
239                 try:
240                     self._log.debug("Provisioning resources in slice %s" % slicename)
241                     self._sfi_exec_method('provision', slicename) 
242                 except:
243                     raise RuntimeError("Fail to provision resource for slice %s" % slicename)
244                 return True
245
246     def add_resource_to_slice_batch(self, slicename, resource_hrn, properties=None, leases=None):
247         """
248         Method to add all resources together to the slice. Previous deletion of slivers.
249         Specially used for wilabt that doesn't allow to add more resources to the slice
250         after some resources are added. Every sliver have to be deleted and the batch 
251         has to be added at once.
252         """
253         self._count += 1
254         self._slice_resources_batch.append(resource_hrn)
255         resources_hrn_new = list()
256         if self._count == len(self._total):
257             for resource_hrn in self._slice_resources_batch:
258                 resource_parts = resource_hrn.split('.')
259                 resource_hrn = '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:])
260                 resources_hrn_new.append(resource_hrn)
261             with self.lock_slice:
262                 self._sfi_exec_method('delete', slicename)
263                 # Re implementing urn from hrn because the library sfa-common doesn't work for wilabt
264                 resources_urn = self._get_urn(resources_hrn_new)
265                 rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, properties, leases)
266                 f = open("/tmp/rspec_input.rspec", "w")
267                 f.truncate(0)
268                 f.write(rspec)
269                 f.close()
270
271                 if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
272                     raise RuntimeError("Fail to create rspec file to allocate resources in slice %s" % slicename)
273
274                 # ALLOCATE    
275                 try:
276                     self._log.debug("Allocating resources in slice %s" % slicename)
277                     out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
278                 except:
279                     raise RuntimeError("Fail to allocate resource for slice %s" % slicename)
280
281                 if out is not None:
282                     # PROVISION
283                     try:
284                         self._log.debug("Provisioning resources in slice %s" % slicename)
285                         self._sfi_exec_method('provision', slicename)
286                         self._sfi_exec_method('action', slicename=slicename, action='geni_start')
287                     except:
288                         raise RuntimeError("Fail to provision resource for slice %s" % slicename)
289                     return True
290                 else:
291                     raise RuntimeError("Fail to allocate resources for slice %s" % slicename)
292     
293         else:
294             self._log.debug(" Waiting for more nodes to add the batch to the slice ")
295
296     def _get_urn(self, resources_hrn):
297         """
298         Get urn from hrn.
299         """
300         resources_urn = list()
301         for hrn in resources_hrn:
302             hrn = hrn.replace("\\", "").split('.')
303             node = hrn.pop()
304             auth = '.'.join(hrn)
305             urn = ['urn:publicid:IDN+', auth, '+node+', node]
306             urn = ''.join(urn)
307             resources_urn.append(urn)
308         return resources_urn
309
310     def remove_resource_from_slice(self, slicename, resource_hrn, leases=None):
311         """
312         Remove slivers from slice. Currently sfi doesn't support removing particular
313         slivers.
314         """
315         resource_urn = self._get_resources_urn([resource_hrn]).pop()
316         with self.lock_slice:
317             try:
318                 self._sfi_exec_method('delete', slicename, urn=resource_urn)
319             except:
320                 raise RuntimeError("Fail to delete resource for slice %s" % slicename)
321             return True
322
323     def remove_all_from_slice(self, slicename):
324         """
325         De-allocate and de-provision all slivers of the named slice.
326         Currently sfi doesn't support removing particular
327         slivers, so this method works only for removing every sliver. Setting the
328         resource_hrn parameter is not necessary.
329         """
330         with self.lock_slice:
331             try:
332                 self._sfi_exec_method('delete', slicename)
333             except:
334                 raise RuntimeError("Fail to delete slivers for slice %s" % slicename)
335             return True
336
337     def _get_resources_urn(self, resources_hrn):
338         """
339         Builds list of resources' urn based on hrn.
340         """
341         resources_urn = list()
342
343         for resource in resources_hrn:
344             resources_urn.append(hrn_to_urn(resource, 'node'))
345             
346         return resources_urn
347
348     def blacklist_resource(self, resource_hrn):
349         """
350         Adding resource_hrn to blacklist, and taking 
351         the resource from the reserved list.
352         """
353         with self.lock_blist:
354             self._blacklist.add(resource_hrn)
355         with self.lock_resv:
356             if resource_hrn in self._reserved:
357                 self._reserved.remove(resource_hrn)
358
359     def blacklisted(self, resource_hrn):
360         """
361         Check if the resource is in the blacklist. 
362         """
363         with self.lock_blist:
364             if resource_hrn in self._blacklist:
365                 return True
366         return False
367
368     def reserve_resource(self, resource_hrn):
369         """
370         Add resource to the reserved list.
371         """
372         self._reserved.add(resource_hrn)
373
374     def reserved(self, resource_hrn):
375         """
376         Check that the resource in not reserved.
377         """
378         with self.lock_resv:
379             if resource_hrn in self._reserved:
380                 return True
381             else:
382                 self.reserve_resource(resource_hrn)
383                 return False
384
385     def release(self):
386         """
387         Remove hosts from the reserved and blacklist lists, and in case
388         the persist attribute is set, it saves the blacklisted hosts
389         in the blacklist file.
390         """
391         self.apis -= 1
392         if self.apis == 0:
393             blacklist = self._blacklist
394             self._blacklist = set()
395             self._reserved = set()
396 #            if self._ecobj.get_global('PlanetlabSfaNode', 'persist_blacklist'):
397 #                if blacklist:
398 #                    to_blacklist = list()
399 #                    hostnames = self.get_nodes(list(blacklist), ['hostname'])
400 #                    for hostname in hostnames:
401 #                        to_blacklist.append(hostname['hostname'])
402 #
403 #                    nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
404 #                    plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
405 #
406 #                    with open(plblacklist_file, 'w') as f:
407 #                        for host in to_blacklist:
408 #                            f.write("%s\n" % host)
409 #
410
411
412 class SFAAPIFactory(object):
413     """
414     API Factory to manage a map of SFAAPI instances as key-value pairs, it
415     instanciate a single instance per key. The key represents the same SFA, 
416     credentials.
417     """
418
419     _lock = threading.Lock()
420     _apis = dict()
421
422    
423     @classmethod
424     def get_api(cls, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
425             batch = False, rtype = None, timeout = None):
426
427         if sfi_user and sfi_sm:
428             key = cls.make_key(sfi_user, sfi_sm)
429             with cls._lock:
430                 api = cls._apis.get(key)
431
432                 if not api:
433                     api = SFAAPI(sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key,
434                         ec, batch, rtype, timeout)
435                     cls._apis[key] = api
436                 else:
437                     api.apis += 1
438
439                 return api
440
441         return None
442
443     @classmethod
444     def make_key(cls, *args):
445         skey = "".join(map(str, args))
446         return hashlib.md5(skey).hexdigest()
447