2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Lucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
25 from nepi.util.logger import Logger
28 from sfa.client.sfi import Sfi
29 from sfa.util.xrn import hrn_to_urn
31 log = Logger("SFA API")
32 log.debug("Packages sfa-common or sfa-client not installed.\
33 Could not import sfa.client.sfi or sfa.util.xrn")
35 from nepi.util.sfarspec_proc import SfaRSpecProcessing
API for querying the SFA service. It uses the Sfi class from the sfi client tool.
def __init__(self, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
        batch, rtype, timeout):
    """
    Set up the bookkeeping sets, the locks and the sfi client options.

    :param sfi_user: stored in the sfi option 'user'
    :param sfi_auth: stored in the sfi option 'auth'
    :param sfi_registry: stored in the sfi option 'registry'
    :param sfi_sm: stored in the sfi option 'sm'
    :param private_key: stored in the sfi option 'user_private_key'
    :param ec: object providing get_global(), and (for batch mode)
        'resources' and get_resource()
    :param batch: when true, the batch counters are initialised
    :param rtype: resource type substring used to count batch resources
    :param timeout: stored in the sfi option 'timeout'
    """
    # NOTE(review): restored from a damaged listing. The '_ec', '_count' and
    # 'api' assignments and the 'if batch:' guard were not visible; they are
    # inferred from how the other methods use these attributes. Confirm
    # against the pristine file that no further initialisation was dropped.
    self._blacklist = set()
    self._reserved = set()
    self._resources_cache = None
    self._already_cached = False
    self._ec = ec

    if batch:
        # Only batch mode needs these; rtype may be None otherwise.
        self._testbed_res = rtype
        self._count = 0
        self._total = self._get_total_res()
        self._slice_resources_batch = list()

    self._log = Logger("SFA API")
    self.api = Sfi()
    self.rspec_proc = SfaRSpecProcessing()
    self.lock_slice = threading.Lock()
    self.lock_blist = threading.Lock()
    self.lock_resv = threading.Lock()

    self.api.options.timeout = timeout
    self.api.options.raw = None
    self.api.options.user = sfi_user
    self.api.options.auth = sfi_auth
    self.api.options.registry = sfi_registry
    self.api.options.sm = sfi_sm
    self.api.options.user_private_key = private_key

    # Load blacklist from file
    if ec.get_global('PlanetlabNode', 'persist_blacklist'):
        self._set_blacklist()
def _set_blacklist(self):
    """
    Initialize the blacklist with the hosts listed, one per line, in
    ~/.nepi/plblacklist.txt.

    A missing file is treated as an empty blacklist; any other I/O error
    is propagated. Blank lines are ignored.
    """
    import errno

    nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
    plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
    try:
        with open(plblacklist_file, 'r') as f:
            hosts_tobl = f.read().splitlines()
    except (IOError, OSError) as err:
        # Nothing persisted yet: start with what is already in memory.
        if err.errno != errno.ENOENT:
            raise
        return

    self._blacklist.update(host for host in hosts_tobl if host)
def _get_total_res(self):
    """
    Get the resources instantiated using this API, to be able to add
    them using the same Allocate and Provision call at once. Specially
    for Wilabt testbed that doesn't allow to add slivers after the slice
    already has some.

    :return: list of the resource managers whose '_rtype' contains
        self._testbed_res (case-insensitive)
    """
    # NOTE(review): the accumulator and return statement were not visible
    # in the damaged listing; a list is returned because the batch method
    # takes len() of the result.
    wanted = self._testbed_res.lower()
    managers = (self._ec.get_resource(gid) for gid in self._ec.resources)
    return [rm for rm in managers if wanted in rm._rtype.lower()]
def _sfi_exec_method(self, command, slicename=None, rspec=None, urn=None, action=None):
    """
    Execute sfi method, which correspond to SFA call. It can be the following
    calls: Describe, Delete, Allocate, Provision, ListResources ('resources')
    and 'action'.

    :param command: one of 'describe', 'delete', 'allocate', 'provision',
        'action' or 'resources'
    :param slicename: slice hrn, required for every command but 'resources'
    :param rspec: path of the RSpec file, required for 'allocate'
    :param urn: accepted for compatibility; not used by this method
    :param action: action name passed to the 'action' command
    :return: content of /tmp/rspec_output.rspec when the command was asked
        to write output and succeeded, None otherwise
    :raises TypeError: on an unsupported command or a missing argument
    """
    output_prefix = '/tmp/rspec_output'
    # The result is read back from this path; presumably sfi appends the
    # extension to the name given with '-o'.
    output_file = "/tmp/rspec_output.rspec"

    if command in ('describe', 'delete', 'allocate', 'provision', 'action'):
        if not slicename:
            raise TypeError("The slice hrn is expected for this method %s" % command)
        if command == 'allocate' and not rspec:
            raise TypeError("RSpec is expected for this method %s" % command)

        if command == 'action':
            args_list = [slicename, action]
        elif command == 'delete':
            args_list = [slicename]
        elif command == 'allocate':
            args_list = [slicename, rspec, '-o', output_prefix]
        else:
            args_list = [slicename, '-o', output_prefix]
    elif command == 'resources':
        args_list = ['-o', output_prefix]
    else:
        raise TypeError("Sfi method not supported")

    expects_output = '-o' in args_list

    self.api.command = command
    self.api.command_parser = self.api.create_parser_command(self.api.command)
    (command_options, command_args) = self.api.command_parser.parse_args(args_list)
    self.api.command_options = command_options
    self.api.read_config()
    # NOTE(review): a line between read_config() and the cleanup below was
    # not visible in the damaged listing. The sfi client normally needs
    # bootstrap() before dispatch(); confirm against the pristine file.
    self.api.bootstrap()

    # Drop the previous result so a stale file is never mistaken for the
    # output of this call.
    try:
        os.remove(output_file)
    except OSError:
        self._log.debug("Couldn't remove temporary output file for RSpec or it doesn't exist")

    try:
        self.api.dispatch(command, command_options, command_args)
        if not expects_output:
            # 'delete' and 'action' never write the output file.
            return None
        with open(output_file, "r") as result_file:
            return result_file.read()
    except (Exception, SystemExit):
        # SystemExit is included because the handler type was not visible
        # and the sfi client presumably may exit on failure; this keeps
        # such an exit from ending the caller.
        self._log.debug(" Couldn't retrive rspec output information from method %s " % command)
        return None
def get_resources_info(self):
    """
    Get all resources and its attributes from aggregate.

    The parsed result is stored in self._resources_cache and
    self._already_cached is set.

    :return: the value returned by rspec_proc.parse_sfa_rspec()
    :raises RuntimeError: when the resources listing cannot be obtained
    """
    try:
        rspec_slice = self._sfi_exec_method('resources')
    except Exception:
        raise RuntimeError("Fail to list resources")

    # _sfi_exec_method reports failure by returning None; do not hand
    # that to the parser.
    if rspec_slice is None:
        raise RuntimeError("Fail to list resources")

    self._resources_cache = self.rspec_proc.parse_sfa_rspec(rspec_slice)
    self._already_cached = True
    return self._resources_cache
def get_resources_hrn(self, resources=None):
    """
    Get list of resources hrn, without the resource info.

    :param resources: list of dicts with 'hrn' and 'component_name' keys;
        when None, the aggregate's resources are used (fetched once and
        then taken from the cache)
    :return: dict mapping component name to hrn with backslashes removed
    """
    # NOTE(review): the guard on 'resources' was not visible in the damaged
    # listing. 'is None' is used so that an explicitly empty list yields an
    # empty result instead of the whole testbed.
    if resources is None:
        if not self._already_cached:
            resources = self.get_resources_info()['resource']
        else:
            resources = self._resources_cache['resource']

    return dict(
        (resource['component_name'], resource['hrn'].replace('\\', ''))
        for resource in resources
    )
def get_slice_resources(self, slicename):
    """
    Get resources and info from slice.

    :param slicename: slice hrn passed to the 'describe' call
    :return: the parsed RSpec, or {'resource':[],'lease':[]} when the
        describe call produced no output
    """
    with self.lock_slice:
        rspec_slice = self._sfi_exec_method('describe', slicename)

    if not rspec_slice:
        self._log.debug("Fail to describe resources for slice %s, slice may be empty" % slicename)

    if rspec_slice is None:
        return {'resource':[],'lease':[]}

    return self.rspec_proc.parse_sfa_rspec(rspec_slice)
def add_resource_to_slice(self, slicename, resource_hrn, leases=None):
    """
    Get the list of resources' urn, build the rspec string and call the allocate
    and provision method.

    The slice's current resources are re-submitted together with the new
    one. The RSpec is written to /tmp/rspec_input.rspec.

    :param slicename: slice hrn
    :param resource_hrn: hrn of the resource to add
    :param leases: passed through to rspec_proc.build_sfa_rspec()
    :return: True once provisioning was requested; None when the allocate
        call produced no output
    :raises RuntimeError: when the RSpec file is empty or a call fails
    """
    # NOTE(review): restored from a damaged listing. The lock extent and the
    # final 'return True' were not visible and are inferred; confirm.
    rspec_path = "/tmp/rspec_input.rspec"

    # Dots after the second hrn component are escaped with a backslash.
    resource_parts = resource_hrn.split('.')
    resources_hrn_new = [
        '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:])
    ]

    # The shared temporary files and the describe/allocate sequence are
    # kept under one lock.
    with self.lock_slice:
        rspec_slice = self._sfi_exec_method('describe', slicename)
        if rspec_slice is not None:
            slice_resources = self.rspec_proc.parse_sfa_rspec(rspec_slice)['resource']
        else:
            slice_resources = []

        if slice_resources:
            slice_resources_hrn = self.get_resources_hrn(slice_resources)
            # values() works on Python 2 and 3; iteritems() is Python 2 only.
            for s_hrn_value in slice_resources_hrn.values():
                s_parts = s_hrn_value.split('.')
                s_hrn = '.'.join(s_parts[:2]) + '.' + '\\.'.join(s_parts[2:])
                resources_hrn_new.append(s_hrn)

        resources_urn = self._get_resources_urn(resources_hrn_new)
        rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, None, leases)
        with open(rspec_path, "w") as f:
            f.write(rspec)

        if not os.path.getsize(rspec_path) > 0:
            raise RuntimeError("Fail to create rspec file to allocate resource in slice %s" % slicename)

        # ALLOCATE
        try:
            self._log.debug("Allocating resources in slice %s" % slicename)
            out = self._sfi_exec_method('allocate', slicename, rspec_path)
        except Exception:
            raise RuntimeError("Fail to allocate resource for slice %s" % slicename)

        if out is not None:
            # PROVISION
            try:
                self._log.debug("Provisioning resources in slice %s" % slicename)
                self._sfi_exec_method('provision', slicename)
            except Exception:
                raise RuntimeError("Fail to provision resource for slice %s" % slicename)
            return True
def add_resource_to_slice_batch(self, slicename, resource_hrn, properties=None, leases=None):
    """
    Method to add all resources together to the slice. Previous deletion of slivers.
    Specially used for wilabt that doesn't allow to add more resources to the slice
    after some resources are added. Every sliver have to be deleted and the batch
    has to be added at once.

    Each call queues resource_hrn. Only the call that brings the count up
    to len(self._total) talks to the testbed.

    :param slicename: slice hrn
    :param resource_hrn: hrn of the resource to queue
    :param properties: passed through to rspec_proc.build_sfa_rspec()
    :param leases: passed through to rspec_proc.build_sfa_rspec()
    :return: True when the batch is (already) in the slice, None while the
        batch is still incomplete
    :raises RuntimeError: when the RSpec file is empty or a call fails
    """
    # NOTE(review): restored from a damaged listing. The counter increment,
    # the early 'return True' and the lock extent are inferred; confirm.
    rspec_path = "/tmp/rspec_input.rspec"

    self._count += 1
    self._slice_resources_batch.append(resource_hrn)

    if self._count != len(self._total):
        self._log.debug(" Waiting for more nodes to add the batch to the slice ")
        return None

    check_all_inslice = self._check_all_inslice(self._slice_resources_batch, slicename)
    # 'is True', not '== True': the helper returns a resource count when the
    # slice content differs, and the count 1 compares equal to True.
    if check_all_inslice is True:
        return True

    resources_hrn_new = list()
    for batch_hrn in self._slice_resources_batch:
        resource_parts = batch_hrn.split('.')
        resources_hrn_new.append(
            '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:]))

    with self.lock_slice:
        if check_all_inslice != 0:
            # The slice holds other slivers: remove them all first.
            self._sfi_exec_method('delete', slicename)
            # NOTE(review): the line(s) following the delete call were not
            # visible; if the pristine file waits for the deletion to take
            # effect, that wait must be restored here.

        # Re implementing urn from hrn because the library sfa-common doesn't work for wilabt
        resources_urn = self._get_urn(resources_hrn_new)
        rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, properties, leases)
        with open(rspec_path, "w") as f:
            f.write(rspec)

        if not os.path.getsize(rspec_path) > 0:
            raise RuntimeError("Fail to create rspec file to allocate resources in slice %s" % slicename)

        # ALLOCATE
        try:
            self._log.debug("Allocating resources in slice %s" % slicename)
            out = self._sfi_exec_method('allocate', slicename, rspec_path)
        except Exception:
            raise RuntimeError("Fail to allocate resource for slice %s" % slicename)

        if out is None:
            raise RuntimeError("Fail to allocate resources for slice %s" % slicename)

        # PROVISION
        try:
            self._log.debug("Provisioning resources in slice %s" % slicename)
            self._sfi_exec_method('provision', slicename)
            self._sfi_exec_method('action', slicename=slicename, action='geni_start')
        except Exception:
            raise RuntimeError("Fail to provision resource for slice %s" % slicename)
        return True
def _check_all_inslice(self, resources_hrn, slicename):
    """
    Compare the resources currently in the slice with resources_hrn.

    :param resources_hrn: list of resource hrn expected in the slice
    :param slicename: slice hrn
    :return: True when the slice holds exactly the given resources; the
        number of resources in the slice when it holds a different set;
        0 when the slice has no resources or the first one has no
        'services'
    """
    # NOTE(review): the 'if slice_res' guard and the True / 0 returns were
    # not visible in the damaged listing; they are inferred from how the
    # batch method uses the result.
    slice_res = self.get_slice_resources(slicename)['resource']
    if slice_res and len(slice_res[0]['services']) != 0:
        slice_res_hrn = list(self.get_resources_hrn(slice_res).values())
        if self._compare_lists(slice_res_hrn, resources_hrn):
            return True
        return len(slice_res_hrn)
    return 0
def _compare_lists(self, list1, list2):
    """
    Tell whether both sequences have the same length and every item of
    list1 is contained in list2.

    :return: True or False
    """
    # NOTE(review): the return statements were not visible in the damaged
    # listing; they follow from the two visible conditions.
    if len(list1) != len(list2):
        return False
    return all(item in list2 for item in list1)
def _get_urn(self, resources_hrn):
    """
    Build the node urn of each hrn without using the sfa library.

    Backslashes are removed from the hrn; the text after the last dot is
    taken as the node name and the rest as the authority.

    :param resources_hrn: iterable of resource hrn
    :return: list of 'urn:publicid:IDN+<auth>+node+<node>' strings
    """
    # NOTE(review): the lines deriving 'auth' and 'node' and the final
    # join/return were not visible in the damaged listing; the split on
    # the last dot is an assumption to confirm against the pristine file.
    resources_urn = list()
    for hrn in resources_hrn:
        auth, _, node = hrn.replace("\\", "").rpartition('.')
        urn = ['urn:publicid:IDN+', auth, '+node+', node]
        resources_urn.append(''.join(urn))
    return resources_urn
def remove_resource_from_slice(self, slicename, resource_hrn, leases=None):
    """
    Remove slivers from slice. Currently sfi doesn't support removing particular
    slivers: the urn is handed to _sfi_exec_method(), which does not use it,
    so the 'delete' call applies to the whole slice.

    :param slicename: slice hrn
    :param resource_hrn: hrn of the resource, converted to a urn
    :param leases: not used
    :return: True when the delete call was issued
    :raises RuntimeError: when the delete call raises
    """
    # NOTE(review): the try/except scaffolding and the final 'return True'
    # were not visible in the damaged listing and are inferred.
    resource_urn = self._get_resources_urn([resource_hrn]).pop()
    with self.lock_slice:
        try:
            self._sfi_exec_method('delete', slicename, urn=resource_urn)
        except Exception:
            raise RuntimeError("Fail to delete resource for slice %s" % slicename)
    return True
def remove_all_from_slice(self, slicename):
    """
    De-allocate and de-provision all slivers of the named slice.
    Currently sfi doesn't support removing particular
    slivers, so this method works only for removing every sliver.

    :param slicename: slice hrn
    :return: True when the delete call was issued
    :raises RuntimeError: when the delete call raises
    """
    # NOTE(review): the try/except scaffolding and the final 'return True'
    # were not visible in the damaged listing and are inferred.
    with self.lock_slice:
        try:
            self._sfi_exec_method('delete', slicename)
        except Exception:
            raise RuntimeError("Fail to delete slivers for slice %s" % slicename)
    return True
def _get_resources_urn(self, resources_hrn):
    """
    Builds list of resources' urn based on hrn.

    :param resources_hrn: iterable of resource hrn
    :return: list with hrn_to_urn(hrn, 'node') for every given hrn
    """
    # NOTE(review): the return statement was not visible in the damaged
    # listing; callers use the result as a list.
    return [hrn_to_urn(resource, 'node') for resource in resources_hrn]
def blacklist_resource(self, resource_hrn):
    """
    Adding resource_hrn to blacklist, and taking
    the resource from the reserved list.

    :param resource_hrn: hrn of the resource to blacklist
    """
    with self.lock_blist:
        self._blacklist.add(resource_hrn)
    # NOTE(review): the statement guarding the reserved set was not visible
    # in the damaged listing; lock_resv is assumed, as in reserved().
    with self.lock_resv:
        self._reserved.discard(resource_hrn)
def blacklisted(self, resource_hrn):
    """
    Check if the resource is in the blacklist.

    :param resource_hrn: hrn of the resource
    :return: True when the hrn is blacklisted, False otherwise
    """
    # NOTE(review): the return statements were not visible in the damaged
    # listing; a boolean result follows from the membership test.
    with self.lock_blist:
        return resource_hrn in self._blacklist
def reserve_resource(self, resource_hrn):
    """
    Add resource to the reserved list.

    No lock is taken here; reserved() calls this method while it already
    holds lock_resv.

    :param resource_hrn: hrn of the resource to reserve
    """
    self._reserved.add(resource_hrn)
def reserved(self, resource_hrn):
    """
    Check that the resource in not reserved, reserving it when it is free.

    :param resource_hrn: hrn of the resource
    :return: True when the resource was already reserved; False when it
        was free, in which case it is reserved by this call
    """
    # NOTE(review): the lock statement and the return values were not
    # visible in the damaged listing; lock_resv and True/False are inferred.
    with self.lock_resv:
        if resource_hrn in self._reserved:
            return True
        self.reserve_resource(resource_hrn)
        return False
411 Remove hosts from the reserved and blacklist lists, and in case
412 the persist attribute is set, it saves the blacklisted hosts
413 in the blacklist file.
417 blacklist = self._blacklist
418 self._blacklist = set()
419 self._reserved = set()
420 # if self._ecobj.get_global('PlanetlabSfaNode', 'persist_blacklist'):
422 # to_blacklist = list()
423 # hostnames = self.get_nodes(list(blacklist), ['hostname'])
424 # for hostname in hostnames:
425 # to_blacklist.append(hostname['hostname'])
427 # nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
428 # plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
430 # with open(plblacklist_file, 'w') as f:
431 # for host in to_blacklist:
432 # f.write("%s\n" % host)
class SFAAPIFactory(object):
    """
    API Factory to manage a map of SFAAPI instances as key-value pairs, it
    instantiates a single instance per key. The key is derived from the
    sfi user and the slice manager.
    """
    # NOTE(review): restored from a damaged listing. The '_apis' map, the
    # decorators, the use of '_lock' and the return statements were not
    # visible and are inferred from the visible statements.

    _lock = threading.Lock()
    _apis = dict()

    @classmethod
    def get_api(cls, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
            batch = False, rtype = None, timeout = None):
        """
        Return the SFAAPI registered for (sfi_user, sfi_sm), creating it
        on first use.

        :return: the SFAAPI instance, or None when sfi_user or sfi_sm is
            empty
        """
        if not (sfi_user and sfi_sm):
            return None

        key = cls.make_key(sfi_user, sfi_sm)
        with cls._lock:
            api = cls._apis.get(key)
            if api is None:
                api = SFAAPI(sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key,
                    ec, batch, rtype, timeout)
                cls._apis[key] = api
            return api

    @classmethod
    def make_key(cls, *args):
        """
        Return the hexadecimal md5 digest of the concatenated str() of the
        arguments. Used as a map key, not for security.
        """
        skey = "".join(map(str, args))
        # hashlib needs bytes; on Python 3 str must be encoded first, on
        # Python 2 str already is bytes.
        if not isinstance(skey, bytes):
            skey = skey.encode("utf-8")
        return hashlib.md5(skey).hexdigest()