2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Lucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
25 from nepi.util.logger import Logger
28 from sfa.client.sfi import Sfi
29 from sfa.util.xrn import hrn_to_urn
31 log = Logger("SFA API")
32 log.debug("Packages sfa-common or sfa-client not installed.\
33 Could not import sfa.client.sfi or sfa.util.xrn")
35 from nepi.util.sfarspec_proc import SfaRSpecProcessing
39 API for querying the SFA service. It uses the Sfi class from the sfi client tool.
def __init__(self, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
        batch, rtype, timeout):
    """
    Set up the sfi client and the bookkeeping shared by all callers.

    :param sfi_user: value stored in the sfi ``user`` option
    :param sfi_auth: value stored in the sfi ``auth`` option
    :param sfi_registry: value stored in the sfi ``registry`` option
    :param sfi_sm: value stored in the sfi ``sm`` option
    :param private_key: value stored in the sfi ``user_private_key`` option
    :param ec: experiment controller; queried for registered resources
        and for the ``persist_blacklist`` global
    :param batch: when true, prepare the state used by
        ``add_resource_to_slice_batch``
    :param rtype: resource type substring used to count batch resources
    :param timeout: value stored in the sfi ``timeout`` option
    """
    self._blacklist = set()
    self._reserved = set()
    self._resources_cache = None
    self._already_cached = False

    # NOTE(review): the listing this block was recovered from lacks several
    # short lines. Only the assignments that other methods of this class
    # depend on (self._ec, self._count, self.api) and the batch guard were
    # restored -- confirm against the upstream file.
    self._ec = ec

    if batch:
        # Batch mode: every resource of type ``rtype`` is added to the
        # slice in one Allocate/Provision call, so the expected total must
        # be known up front.
        self._testbed_res = rtype
        self._count = 0
        self._total = self._get_total_res()
        self._slice_resources_batch = list()

    self._log = Logger("SFA API")
    self.api = Sfi()
    self.rspec_proc = SfaRSpecProcessing()
    self.lock_slice = threading.Lock()
    self.lock_blist = threading.Lock()
    self.lock_resv = threading.Lock()

    self.api.options.timeout = timeout
    self.api.options.raw = None
    self.api.options.user = sfi_user
    self.api.options.auth = sfi_auth
    self.api.options.registry = sfi_registry
    self.api.options.sm = sfi_sm
    self.api.options.user_private_key = private_key

    # Load blacklist from file
    if ec.get_global('PlanetlabNode', 'persist_blacklist'):
        self._set_blacklist()
76 def _set_blacklist(self):
78 Initialize the blacklist with previous nodes blacklisted, in
81 nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
82 plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
83 with open(plblacklist_file, 'r') as f:
84 hosts_tobl = f.read().splitlines()
86 for host in hosts_tobl:
87 self._blacklist.add(host)
89 def _get_total_res(self):
91 Get the total amount of resources instanciated using this API,
92 to be able to add them using the same Allocate and Provision
93 call at once. Specially for Wilabt testbed that doesn't allow
94 to add slivers after the slice already has some.
97 res_gids = self._ec.resources
99 rm = self._ec.get_resource(gid)
100 if self._testbed_res.lower() in rm._rtype.lower():
104 def _sfi_exec_method(self, command, slicename=None, rspec=None, urn=None, action=None):
106 Execute sfi method, which correspond to SFA call. It can be the following
107 calls: Describe, Delete, Allocate, Provision, ListResources.
109 if command in ['describe', 'delete', 'allocate', 'provision', 'action']:
111 raise TypeError("The slice hrn is expected for this method %s" % command)
112 if command == 'allocate' and not rspec:
113 raise TypeError("RSpec is expected for this method %s" % command)
115 if command == 'allocate':
116 args_list = [slicename, rspec]
118 args_list = [slicename]
119 if command != 'delete':
120 args_list = args_list + ['-o', '/tmp/rspec_output']
121 if command == 'action':
122 args_list = [slicename, action]
124 elif command == 'resources':
125 args_list = ['-o', '/tmp/rspec_output']
127 else: raise TypeError("Sfi method not supported")
129 self.api.command = command
130 self.api.command_parser = self.api.create_parser_command(self.api.command)
131 (command_options, command_args) = self.api.command_parser.parse_args(args_list)
132 self.api.command_options = command_options
133 self.api.read_config()
137 os.remove("/tmp/rspec_output.rspec")
139 self._log.debug("Couldn't remove temporary output file for RSpec or it doesn't exist")
142 self.api.dispatch(command, command_options, command_args)
143 with open("/tmp/rspec_output.rspec", "r") as result_file:
144 result = result_file.read()
147 self._log.debug(" Couldn't retrive rspec output information from method %s " % command)
def get_resources_info(self):
    """
    Get all resources and their attributes from the aggregate.

    The parsed result is stored in ``self._resources_cache`` and
    ``self._already_cached`` is set, so later lookups can reuse it.

    :return: the parsed RSpec as returned by ``parse_sfa_rspec``
    :raises RuntimeError: if listing the resources fails
    """
    try:
        rspec_slice = self._sfi_exec_method('resources')
    except (Exception, SystemExit) as exc:
        # SystemExit is kept because the argument parsing done by the sfi
        # client may presumably exit; KeyboardInterrupt is not masked.
        raise RuntimeError("Fail to list resources") from exc

    self._resources_cache = self.rspec_proc.parse_sfa_rspec(rspec_slice)
    self._already_cached = True
    return self._resources_cache
def get_resources_hrn(self, resources=None):
    """
    Get the hrn of each resource, without the rest of the resource info.

    :param resources: list of resource dicts with at least the keys
        'hrn' and 'component_name'. When omitted, the resources of the
        aggregate are used (from the cache if it is already filled).
    :return: dict mapping component name to hrn, with the backslashes
        removed from the hrn
    """
    # NOTE(review): the guard line is absent from the damaged listing. It
    # is restored as an explicit None test so that an empty list means
    # "no resources" rather than "all aggregate resources" -- confirm
    # against the upstream file.
    if resources is None:
        if not self._already_cached:
            resources = self.get_resources_info()['resource']
        else:
            resources = self._resources_cache['resource']

    return {
        resource['component_name']: resource['hrn'].replace('\\', '')
        for resource in resources
    }
def get_slice_resources(self, slicename):
    """
    Get the resources and their info from a slice.

    :param slicename: hrn of the slice to describe
    :return: the parsed RSpec of the slice, or
        ``{'resource':[],'lease':[]}`` when the slice could not be
        described (it may be empty)
    """
    # Bound up front so a failing describe call falls through to the
    # empty result instead of raising UnboundLocalError below.
    rspec_slice = None
    try:
        with self.lock_slice:
            rspec_slice = self._sfi_exec_method('describe', slicename)
    except (Exception, SystemExit):
        # Best effort; SystemExit is kept because the sfi argument parsing
        # may presumably exit. KeyboardInterrupt is not masked.
        self._log.debug("Fail to describe resources for slice %s, slice may be empty" % slicename)

    if rspec_slice is None:
        return {'resource':[],'lease':[]}
    return self.rspec_proc.parse_sfa_rspec(rspec_slice)
def add_resource_to_slice(self, slicename, resource_hrn, leases=None):
    """
    Build the RSpec for the resource plus the resources already in the
    slice, then call the Allocate and Provision methods.

    :param slicename: hrn of the slice
    :param resource_hrn: hrn of the resource to add
    :param leases: leases passed through to the RSpec builder
    :return: True once the resources are provisioned; None when the
        allocate call produced no output
    :raises RuntimeError: if the RSpec file is empty, or if the allocate
        or provision call fails
    """
    def escape_hrn(hrn):
        # Keep the first two labels as the authority and escape the dots
        # of the remaining part, which is the form the urn builder gets.
        parts = hrn.split('.')
        return '.'.join(parts[:2]) + '.' + '\\.'.join(parts[2:])

    resources_hrn_new = [escape_hrn(resource_hrn)]

    with self.lock_slice:
        rspec_slice = self._sfi_exec_method('describe', slicename)
        if rspec_slice is not None:
            slice_resources = self.rspec_proc.parse_sfa_rspec(rspec_slice)['resource']
        else:
            slice_resources = []

        # The RSpec must also list what the slice already holds, so those
        # resources are re-added next to the new one.
        if slice_resources:
            slice_resources_hrn = self.get_resources_hrn(slice_resources)
            resources_hrn_new.extend(
                escape_hrn(s_hrn) for s_hrn in slice_resources_hrn.values())

        resources_urn = self._get_resources_urn(resources_hrn_new)
        rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, None, leases)
        with open("/tmp/rspec_input.rspec", "w") as f:
            f.write(rspec)

        if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
            raise RuntimeError("Fail to create rspec file to allocate resource in slice %s" % slicename)

        # ALLOCATE
        try:
            self._log.debug("Allocating resources in slice %s" % slicename)
            out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
        except (Exception, SystemExit) as exc:
            raise RuntimeError("Fail to allocate resource for slice %s" % slicename) from exc

        # NOTE(review): the guard on ``out`` is absent from the damaged
        # listing; it is restored to mirror the batch variant of this
        # method -- confirm against the upstream file.
        if out is None:
            return None

        # PROVISION
        try:
            self._log.debug("Provisioning resources in slice %s" % slicename)
            self._sfi_exec_method('provision', slicename)
        except (Exception, SystemExit) as exc:
            raise RuntimeError("Fail to provision resource for slice %s" % slicename) from exc
        return True
def add_resource_to_slice_batch(self, slicename, resource_hrn, properties=None, leases=None):
    """
    Add all the resources to the slice together, deleting the slivers
    first. Specially used for wilabt, which doesn't allow adding more
    resources to the slice after some were added: every sliver has to be
    deleted and the whole batch added at once.

    Each call queues ``resource_hrn``; the slice is only modified by the
    call that completes the batch (``len(self._total)`` resources).

    :param slicename: hrn of the slice
    :param resource_hrn: hrn of the resource to queue
    :param properties: properties passed through to the RSpec builder
    :param leases: leases passed through to the RSpec builder
    :return: True when the batch is (or already was) in the slice; None
        while the batch is still incomplete
    :raises RuntimeError: if the RSpec file is empty, or if the allocate
        or provision step fails
    """
    self._count += 1
    self._slice_resources_batch.append(resource_hrn)

    if self._count != len(self._total):
        self._log.debug(" Waiting for more nodes to add the batch to the slice ")
        return None

    check_all_inslice = self._check_all_inslice(self._slice_resources_batch, slicename)
    # _check_all_inslice returns True or a count. Identity is required
    # here: with ``==`` a count of 1 compares equal to True, and a slice
    # holding one unrelated resource would be taken as complete.
    if check_all_inslice is True:
        return True

    resources_hrn_new = list()
    for batch_hrn in self._slice_resources_batch:
        resource_parts = batch_hrn.split('.')
        resources_hrn_new.append(
            '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:]))

    with self.lock_slice:
        if check_all_inslice != 0:
            # The slice holds other slivers: they must go before the batch.
            self._sfi_exec_method('delete', slicename)
            # NOTE(review): the damaged listing lacks two short lines right
            # after this call; if the upstream file waits for the deletion
            # to settle here, that wait has to be restored.

        # Re implementing urn from hrn because the library sfa-common doesn't work for wilabt
        resources_urn = self._get_urn(resources_hrn_new)
        rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, properties, leases)
        with open("/tmp/rspec_input.rspec", "w") as f:
            f.write(rspec)

        if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
            raise RuntimeError("Fail to create rspec file to allocate resources in slice %s" % slicename)

        # ALLOCATE
        try:
            self._log.debug("Allocating resources in slice %s" % slicename)
            out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
        except (Exception, SystemExit) as exc:
            raise RuntimeError("Fail to allocate resource for slice %s" % slicename) from exc

        if out is None:
            raise RuntimeError("Fail to allocate resources for slice %s" % slicename)

        # PROVISION
        try:
            self._log.debug("Provisioning resources in slice %s" % slicename)
            self._sfi_exec_method('provision', slicename)
            self._sfi_exec_method('action', slicename=slicename, action='geni_start')
        except (Exception, SystemExit) as exc:
            raise RuntimeError("Fail to provision resource for slice %s" % slicename) from exc
        return True
300 def _check_all_inslice(self, resources_hrn, slicename):
301 slice_res = self.get_slice_resources(slicename)['resource']
303 if len(slice_res[0]['services']) != 0:
304 # 2to3 added list() and it is useful
305 slice_res_hrn = list(self.get_resources_hrn(slice_res).values())
306 if self._compare_lists(slice_res_hrn, resources_hrn):
308 else: return len(slice_res_hrn)
311 def _compare_lists(self, list1, list2):
312 if len(list1) != len(list2):
315 if item not in list2:
319 def _get_urn(self, resources_hrn):
323 resources_urn = list()
324 for hrn in resources_hrn:
325 hrn = hrn.replace("\\", "").split('.')
328 urn = ['urn:publicid:IDN+', auth, '+node+', node]
330 resources_urn.append(urn)
def remove_resource_from_slice(self, slicename, resource_hrn, leases=None):
    """
    Remove slivers from the slice. Currently sfi doesn't support removing
    particular slivers: the urn computed here is handed to
    ``_sfi_exec_method``, which does not forward it, so the delete call
    applies to the whole slice.

    :param slicename: hrn of the slice
    :param resource_hrn: hrn of the resource to remove
    :param leases: unused
    :return: True when the delete call went through
    :raises RuntimeError: if the delete call fails
    """
    resource_urn = self._get_resources_urn([resource_hrn]).pop()
    with self.lock_slice:
        try:
            self._sfi_exec_method('delete', slicename, urn=resource_urn)
        except (Exception, SystemExit) as exc:
            # SystemExit is kept because the sfi argument parsing may
            # presumably exit; KeyboardInterrupt is not masked.
            raise RuntimeError("Fail to delete resource for slice %s" % slicename) from exc
        return True
def remove_all_from_slice(self, slicename):
    """
    De-allocate and de-provision all the slivers of the named slice.
    Currently sfi doesn't support removing particular slivers, so this
    method removes every sliver of the slice.

    :param slicename: hrn of the slice
    :return: True when the delete call went through
    :raises RuntimeError: if the delete call fails
    """
    with self.lock_slice:
        try:
            self._sfi_exec_method('delete', slicename)
        except (Exception, SystemExit) as exc:
            # SystemExit is kept because the sfi argument parsing may
            # presumably exit; KeyboardInterrupt is not masked.
            raise RuntimeError("Fail to delete slivers for slice %s" % slicename) from exc
        return True
def _get_resources_urn(self, resources_hrn):
    """
    Build the list of the resources' urn based on their hrn.

    :param resources_hrn: iterable of hrn strings
    :return: list with ``hrn_to_urn(hrn, 'node')`` for each hrn, in order
    """
    return [hrn_to_urn(resource, 'node') for resource in resources_hrn]
def blacklist_resource(self, resource_hrn):
    """
    Add resource_hrn to the blacklist and take the resource out of the
    reserved list.

    :param resource_hrn: hrn of the resource to blacklist
    """
    with self.lock_blist:
        self._blacklist.add(resource_hrn)
    # NOTE(review): the line taking this lock is absent from the damaged
    # listing; it is restored because self._reserved is guarded by
    # lock_resv elsewhere -- confirm against the upstream file.
    with self.lock_resv:
        self._reserved.discard(resource_hrn)
def blacklisted(self, resource_hrn):
    """
    Check if the resource is in the blacklist.

    :param resource_hrn: hrn of the resource to look up
    :return: True if the resource is blacklisted, False otherwise
    """
    with self.lock_blist:
        return resource_hrn in self._blacklist
def reserve_resource(self, resource_hrn):
    """
    Add the resource to the reserved list.

    No lock is taken here: ``reserved`` calls this method while it
    already holds ``lock_resv``, which is not reentrant.

    :param resource_hrn: hrn of the resource to reserve
    """
    self._reserved.add(resource_hrn)
def reserved(self, resource_hrn):
    """
    Check whether the resource is already reserved and, if it is not,
    reserve it in the same locked step.

    :param resource_hrn: hrn of the resource
    :return: True if the resource was already reserved; False if it was
        free and has just been reserved by this call
    """
    with self.lock_resv:
        if resource_hrn in self._reserved:
            return True
        self.reserve_resource(resource_hrn)
        return False
410 Remove hosts from the reserved and blacklist lists, and in case
411 the persist attribute is set, it saves the blacklisted hosts
412 in the blacklist file.
416 blacklist = self._blacklist
417 self._blacklist = set()
418 self._reserved = set()
419 # if self._ecobj.get_global('PlanetlabSfaNode', 'persist_blacklist'):
421 # to_blacklist = list()
422 # hostnames = self.get_nodes(list(blacklist), ['hostname'])
423 # for hostname in hostnames:
424 # to_blacklist.append(hostname['hostname'])
426 # nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
427 # plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
429 # with open(plblacklist_file, 'w') as f:
430 # for host in to_blacklist:
431 # f.write("%s\n" % host)
class SFAAPIFactory(object):
    """
    API Factory to manage a map of SFAAPI instances as key-value pairs. It
    instantiates a single instance per key; the key is derived from the
    sfi user and the slice manager.
    """

    # Guards the creation and lookup of the shared instances.
    _lock = threading.Lock()
    # Key (see make_key) -> SFAAPI instance.
    _apis = dict()

    @classmethod
    def get_api(cls, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
            batch = False, rtype = None, timeout = None):
        """
        Return the SFAAPI instance for (sfi_user, sfi_sm), creating it on
        first use.

        The remaining arguments are only used when the instance is
        created; later calls with the same user and slice manager get the
        existing instance back.

        :return: the SFAAPI instance, or None when sfi_user or sfi_sm is
            empty
        """
        if not (sfi_user and sfi_sm):
            return None

        key = cls.make_key(sfi_user, sfi_sm)
        with cls._lock:
            api = cls._apis.get(key)

            if not api:
                api = SFAAPI(sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key,
                    ec, batch, rtype, timeout)
                cls._apis[key] = api

            return api

    @classmethod
    def make_key(cls, *args):
        """
        Build the map key: the md5 hex digest of the arguments converted
        to str and concatenated.
        """
        skey = "".join(map(str, args))
        # hashlib only accepts bytes on Python 3, so the text is encoded.
        return hashlib.md5(skey.encode("utf-8")).hexdigest()