2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Lucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
25 from nepi.util.logger import Logger
28 from sfa.client.sfi import Sfi
29 from sfa.util.xrn import hrn_to_urn
31 log = Logger("SFA API")
32 log.debug("Packages sfa-common or sfa-client not installed.\
33 Could not import sfa.client.sfi or sfa.util.xrn")
35 from nepi.util.sfarspec_proc import SfaRSpecProcessing
39 API for querying the SFA service. It uses the Sfi class from the sfi client tool.
# Constructor: initialises the blacklist/reserved sets, the resource cache,
# the batch bookkeeping, the logger, the RSpec processor and three locks, then
# copies the SFI credentials and timeout onto self.api.options.
# NOTE(review): the listing numbers embedded in these lines have gaps
# (42->44, 47->52, 52->54, 55->57, 57->59, 62->64, 70->72, 73->76), so some
# statements are not visible here. The assignments of self._ec, self._count
# and self.api, and any use of `batch`, presumably sit on those lines --
# confirm against the complete file.
41 def __init__(self, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
42 batch, rtype, timeout):
# Resource hrns blacklisted / reserved through this API instance.
44 self._blacklist = set()
45 self._reserved = set()
# Cache written by get_resources_info(); _already_cached is set to True there.
46 self._resources_cache = None
47 self._already_cached = False
# Resource type string; _get_total_res() matches it (case-insensitively)
# against each resource manager's _rtype.
52 self._testbed_res = rtype
# add_resource_to_slice_batch() compares self._count with len(self._total)
# and accumulates requested hrns in self._slice_resources_batch.
54 self._total = self._get_total_res()
55 self._slice_resources_batch = list()
57 self._log = Logger("SFA API")
59 self.rspec_proc = SfaRSpecProcessing()
# lock_slice wraps the sfi calls made on a slice; lock_blist wraps access to
# self._blacklist. lock_resv is not used on any line visible in this
# listing -- presumably it guards self._reserved; confirm.
60 self.lock_slice = threading.Lock()
61 self.lock_blist = threading.Lock()
62 self.lock_resv = threading.Lock()
# Options read by the sfi client object stored in self.api.
64 self.api.options.timeout = timeout
65 self.api.options.raw = None
66 self.api.options.user = sfi_user
67 self.api.options.auth = sfi_auth
68 self.api.options.registry = sfi_registry
69 self.api.options.sm = sfi_sm
70 self.api.options.user_private_key = private_key
72 # Load blacklist from file
73 if ec.get_global('PlanetlabNode', 'persist_blacklist'):
# NOTE(review): the body of this `if` is not visible in the listing;
# presumably it calls self._set_blacklist() -- confirm.
76 def _set_blacklist(self):
78 Initialize the blacklist with previous nodes blacklisted, in
81 nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
82 plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
83 with open(plblacklist_file, 'r') as f:
84 hosts_tobl = f.read().splitlines()
86 for host in hosts_tobl:
87 self._blacklist.add(host)
# Walks the resources registered in the experiment controller (self._ec) and
# selects those whose resource-manager type contains self._testbed_res.
# NOTE(review): listing numbers 96, 98 and 101-103 are missing, so the
# accumulator initialisation, the loop header that binds `gid`, the statement
# under the `if`, and the return are not visible. The caller only uses
# len() of the result (see add_resource_to_slice_batch) -- confirm what is
# actually collected.
89 def _get_total_res(self):
91 Get the total amount of resources instanciated using this API,
92 to be able to add them using the same Allocate and Provision
93 call at once. Specially for Wilabt testbed that doesn't allow
94 to add slivers after the slice already has some.
97 res_gids = self._ec.resources
99 rm = self._ec.get_resource(gid)
# Case-insensitive substring match, not equality: the configured type only
# has to occur somewhere inside rm._rtype.
100 if self._testbed_res.lower() in rm._rtype.lower():
# Runs one sfi client command through self.api: validates the arguments,
# builds the sfi argument list, configures the parser/options on self.api,
# dispatches the command and reads the result back from
# /tmp/rspec_output.rspec.
# NOTE(review): the listing numbering has gaps (110, 117, 136, 138, 140-141,
# 145-146, 148 ...), so the `if not slicename`-style guard, `else:`, `try:` /
# `except` lines and the return statements are not visible. Callers test the
# result with `is not None`, so a None return on failure is presumably on one
# of the missing lines -- confirm.
# NOTE(review): the `urn` parameter is not referenced on any visible line.
# NOTE(review): the output path is a fixed file under /tmp shared by every
# call; visible slice-level callers hold self.lock_slice, but
# get_resources_info() shows no lock around its call -- verify.
104 def _sfi_exec_method(self, command, slicename=None, rspec=None, urn=None, action=None):
106 Execute sfi method, which correspond to SFA call. It can be the following
107 calls: Describe, Delete, Allocate, Provision, ListResources.
# Slice-scoped commands: a slice hrn is mandatory, and 'allocate' also needs
# an rspec argument (callers pass a file path).
109 if command in ['describe', 'delete', 'allocate', 'provision', 'action']:
111 raise TypeError("The slice hrn is expected for this method %s" % command)
112 if command == 'allocate' and not rspec:
113 raise TypeError("RSpec is expected for this method %s" % command)
115 if command == 'allocate':
116 args_list = [slicename, rspec]
118 args_list = [slicename]
# Commands other than 'delete' get '-o /tmp/rspec_output' appended. Whether
# that includes 'allocate' depends on nesting this listing does not preserve.
119 if command != 'delete':
120 args_list = args_list + ['-o', '/tmp/rspec_output']
# 'action' replaces args_list outright, so no '-o' option is passed for it.
121 if command == 'action':
122 args_list = [slicename, action]
# 'resources' takes no slice; only the output file option is passed.
124 elif command == 'resources':
125 args_list = ['-o', '/tmp/rspec_output']
127 else: raise TypeError("Sfi method not supported")
# Configure the sfi client object the same way its command line would.
129 self.api.command = command
130 self.api.command_parser = self.api.create_parser_command(self.api.command)
131 (command_options, command_args) = self.api.command_parser.parse_args(args_list)
132 self.api.command_options = command_options
133 self.api.read_config()
# Presumably removes a stale output file so an old result is not read back
# if the dispatch below produces nothing -- confirm.
137 os.remove("/tmp/rspec_output.rspec")
139 self._log.debug("Couldn't remove temporary output file for RSpec or it doesn't exist")
# sfi is given '/tmp/rspec_output' while the result is read from
# '/tmp/rspec_output.rspec'; the '.rspec' suffix is apparently added by the
# sfi client -- confirm.
142 self.api.dispatch(command, command_options, command_args)
143 with open("/tmp/rspec_output.rspec", "r") as result_file:
144 result = result_file.read()
147 self._log.debug(" Couldn't retrive rspec output information from method %s " % command)
def get_resources_info(self):
    """
    Get all resources and their attributes from the aggregate.

    Runs the sfi 'resources' command, parses the returned RSpec with
    ``self.rspec_proc.parse_sfa_rspec`` and stores the parsed result in
    ``self._resources_cache`` (setting ``self._already_cached``).

    Returns the parsed result.
    Raises RuntimeError if the listing fails or yields no RSpec.
    """
    try:
        rspec_slice = self._sfi_exec_method('resources')
    except Exception:
        raise RuntimeError("Fail to list resources")

    # Other callers of _sfi_exec_method guard against a None result; without
    # this check None would be handed to the parser and the cache would be
    # marked valid with whatever came back.
    if rspec_slice is None:
        raise RuntimeError("Fail to list resources")

    self._resources_cache = self.rspec_proc.parse_sfa_rspec(rspec_slice)
    self._already_cached = True
    return self._resources_cache
def get_resources_hrn(self, resources=None):
    """
    Get the hrn of each resource, without the rest of the resource info.

    resources: optional list of parsed resource dicts, each with the keys
        'hrn' and 'component_name'. When omitted (None), the resources of
        the whole aggregate are used: taken from the cache if
        get_resources_info() already filled it, fetched otherwise.

    Returns a dict mapping component name to hrn, with the backslashes
    that escape dots removed from the hrn.

    An explicitly passed empty list yields an empty dict; it is not
    replaced by the aggregate listing, so describing an empty slice does
    not report every resource of the aggregate as part of that slice.
    """
    if resources is None:
        if not self._already_cached:
            resources = self.get_resources_info()['resource']
        else:
            resources = self._resources_cache['resource']

    component_tohrn = dict()
    for resource in resources:
        # hrns come back with escaped dots ('\.'); drop the escape character
        hrn = resource['hrn'].replace('\\', '')
        component_tohrn[resource['component_name']] = hrn

    return component_tohrn
def get_slice_resources(self, slicename):
    """
    Get resources and info from slice.

    Describes the slice through sfi (holding ``self.lock_slice``) and
    parses the returned RSpec.

    Returns the parsed result, or ``{'resource':[],'lease':[]}`` when the
    describe call fails or returns nothing (the slice may simply be empty).
    """
    # Bound up front so a failing describe falls through to the empty result
    # instead of leaving the name undefined.
    rspec_slice = None
    try:
        with self.lock_slice:
            rspec_slice = self._sfi_exec_method('describe', slicename)
    except Exception:
        self._log.debug("Fail to describe resources for slice %s, slice may be empty" % slicename)

    if rspec_slice is not None:
        result = self.rspec_proc.parse_sfa_rspec(rspec_slice)
        return result
    else:
        return {'resource':[],'lease':[]}
# Adds one resource to a slice: escapes the hrn, describes the slice to learn
# which resources it already holds, builds an RSpec containing the old and the
# new resources, writes it to /tmp/rspec_input.rspec and runs the sfi
# 'allocate' and 'provision' commands.
# NOTE(review): the listing numbering has gaps (223-225, 228-230, 233,
# 235-238, 241, 243-244), so the write of the rspec into the file, the
# `try:` / `except` lines around allocate and provision, any check of `out`,
# and the return value are not visible -- confirm against the complete file.
197 def add_resource_to_slice(self, slicename, resource_hrn, leases=None):
199 Get the list of resources' urn, build the rspec string and call the allocate
200 and provision method.
202 resources_hrn_new = list()
# Keep the first two dot-separated labels joined by '.', and join the
# remaining labels with an escaped dot ('\.'). An hrn with two labels or
# fewer ends up with a trailing '.'.
203 resource_parts = resource_hrn.split('.')
204 resource_hrn = '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:])
205 resources_hrn_new.append(resource_hrn)
# Describe the slice under the slice lock; a None result is treated as
# "no resources in the slice yet".
207 with self.lock_slice:
208 rspec_slice = self._sfi_exec_method('describe', slicename)
209 if rspec_slice is not None:
210 slice_resources = self.rspec_proc.parse_sfa_rspec(rspec_slice)['resource']
211 else: slice_resources = []
# get_resources_hrn() strips the escape backslashes, so each existing hrn is
# re-escaped the same way as the new one before being added to the list.
213 slice_resources_hrn = self.get_resources_hrn(slice_resources)
214 for s_hrn_key, s_hrn_value in slice_resources_hrn.items():
215 s_parts = s_hrn_value.split('.')
216 s_hrn = '.'.join(s_parts[:2]) + '.' + '\\.'.join(s_parts[2:])
217 resources_hrn_new.append(s_hrn)
# No properties are passed for this code path (third argument is None).
220 resources_urn = self._get_resources_urn(resources_hrn_new)
221 rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, None, leases)
222 with open("/tmp/rspec_input.rspec", "w") as f:
# An empty file means the rspec was not written; abort before calling sfi.
226 if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
227 raise RuntimeError("Fail to create rspec file to allocate resource in slice %s" % slicename)
# 'allocate' receives the path of the rspec file, not its content.
231 self._log.debug("Allocating resources in slice %s" % slicename)
232 out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
234 raise RuntimeError("Fail to allocate resource for slice %s" % slicename)
239 self._log.debug("Provisioning resources in slice %s" % slicename)
240 self._sfi_exec_method('provision', slicename)
242 raise RuntimeError("Fail to provision resource for slice %s" % slicename)
# Batch variant of add_resource_to_slice: each call records the requested hrn,
# and once self._count equals the number of expected resources the whole batch
# is allocated and provisioned at once (after deleting the slice's current
# slivers when the slice content differs from the batch).
# NOTE(review): the listing numbering has gaps (251-252, 258, 266-267,
# 272-274, 277-279, 282, 284-287, 291, 293-294, 296-297, 299), so the update
# of self._count, the statement under `check_all_inslice == True`, the `else:`
# branches, the `try:` / `except` lines, any check of `out`, and the return
# values are not visible -- confirm against the complete file.
245 def add_resource_to_slice_batch(self, slicename, resource_hrn, properties=None, leases=None):
247 Method to add all resources together to the slice. Previous deletion of slivers.
248 Specially used for wilabt that doesn't allow to add more resources to the slice
249 after some resources are added. Every sliver have to be deleted and the batch
250 has to be added at once.
253 self._slice_resources_batch.append(resource_hrn)
254 resources_hrn_new = list()
# self._total is the list built by _get_total_res() in the constructor.
255 if self._count == len(self._total):
256 check_all_inslice = self._check_all_inslice(self._slice_resources_batch, slicename)
# NOTE(review): _check_all_inslice() returns len(...) when the lists differ,
# and in Python `1 == True` holds, so a slice holding exactly one
# non-matching resource also satisfies this comparison -- verify intent.
257 if check_all_inslice == True:
# Same escaping as add_resource_to_slice: first two labels joined by '.',
# the remaining labels joined by an escaped dot.
259 for resource_hrn in self._slice_resources_batch:
260 resource_parts = resource_hrn.split('.')
261 resource_hrn = '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:])
262 resources_hrn_new.append(resource_hrn)
263 with self.lock_slice:
# A non-zero value means the slice already holds slivers: delete them first.
264 if check_all_inslice != 0:
265 self._sfi_exec_method('delete', slicename)
268 # Re implementing urn from hrn because the library sfa-common doesn't work for wilabt
269 resources_urn = self._get_urn(resources_hrn_new)
270 rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, properties, leases)
271 with open("/tmp/rspec_input.rspec", "w") as f:
# An empty file means the rspec was not written; abort before calling sfi.
275 if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
276 raise RuntimeError("Fail to create rspec file to allocate resources in slice %s" % slicename)
# 'allocate' receives the path of the rspec file, not its content.
280 self._log.debug("Allocating resources in slice %s" % slicename)
281 out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
283 raise RuntimeError("Fail to allocate resource for slice %s" % slicename)
# Unlike the non-batch method, provisioning is followed by a 'geni_start'
# action on the slice.
288 self._log.debug("Provisioning resources in slice %s" % slicename)
289 self._sfi_exec_method('provision', slicename)
290 self._sfi_exec_method('action', slicename=slicename, action='geni_start')
292 raise RuntimeError("Fail to provision resource for slice %s" % slicename)
295 raise RuntimeError("Fail to allocate resources for slice %s" % slicename)
# Reached while the batch is still incomplete (presumably the `else:` of the
# count check above -- the branch line itself is not visible).
298 self._log.debug(" Waiting for more nodes to add the batch to the slice ")
# Compares the resources currently in the slice with the hrns in
# resources_hrn, for add_resource_to_slice_batch().
# NOTE(review): listing numbers 302, 306 and 308 are missing, so the guard
# before indexing slice_res[0], the value returned when the lists match, and
# the fall-through return are not visible. The caller compares the result
# with `== True` and `!= 0`, so presumably True / 0 are returned there --
# confirm.
300 def _check_all_inslice(self, resources_hrn, slicename):
301 slice_res = self.get_slice_resources(slicename)['resource']
# Only the first resource's 'services' entry is inspected.
303 if len(slice_res[0]['services']) != 0:
304 slice_res_hrn = self.get_resources_hrn(slice_res).values()
305 if self._compare_lists(slice_res_hrn, resources_hrn):
# When the slice content differs from the requested list, the number of
# resources found in the slice is returned instead of a boolean.
307 else: return len(slice_res_hrn)
310 def _compare_lists(self, list1, list2):
311 if len(list1) != len(list2):
314 if item not in list2:
# Builds node urns of the form 'urn:publicid:IDN+<auth>+node+<node>' from
# hrns without going through sfa's hrn_to_urn (the caller's comment says the
# sfa-common implementation does not work for wilabt).
# NOTE(review): listing numbers 319-321, 325-326, 328 and 330-331 are missing,
# so the assignments of `auth` and `node`, any joining of the `urn` parts
# into a string, and the return are not visible -- confirm how the hrn labels
# are split between authority and node name.
318 def _get_urn(self, resources_hrn):
322 resources_urn = list()
323 for hrn in resources_hrn:
# Drop the escape backslashes, then split the hrn into its labels.
324 hrn = hrn.replace("\\", "").split('.')
327 urn = ['urn:publicid:IDN+', auth, '+node+', node]
329 resources_urn.append(urn)
# Issues the sfi 'delete' command for a slice, under self.lock_slice, passing
# the urn of resource_hrn.
# NOTE(review): the visible lines of _sfi_exec_method() never reference its
# `urn` argument and build the 'delete' argument list from the slice name
# only, so this presumably removes every sliver of the slice (which matches
# the docstring below) -- confirm.
# NOTE(review): `leases` is not used on any visible line. Listing numbers
# 333, 335-336, 339, 341 and 343-344 are missing, so the `try:` / `except`
# lines and the return value are not visible.
332 def remove_resource_from_slice(self, slicename, resource_hrn, leases=None):
334 Remove slivers from slice. Currently sfi doesn't support removing particular
# _get_resources_urn() is given a one-element list; pop() takes that element.
337 resource_urn = self._get_resources_urn([resource_hrn]).pop()
338 with self.lock_slice:
340 self._sfi_exec_method('delete', slicename, urn=resource_urn)
342 raise RuntimeError("Fail to delete resource for slice %s" % slicename)
# Issues the sfi 'delete' command for the whole slice, under self.lock_slice.
# NOTE(review): listing numbers 346, 351, 353, 355 and 357-358 are missing, so
# the `try:` / `except` lines and the return value are not visible. The
# docstring mentions a resource_hrn parameter that this signature does not
# have.
345 def remove_all_from_slice(self, slicename):
347 De-allocate and de-provision all slivers of the named slice.
348 Currently sfi doesn't support removing particular
349 slivers, so this method works only for removing every sliver. Setting the
350 resource_hrn parameter is not necessary.
352 with self.lock_slice:
354 self._sfi_exec_method('delete', slicename)
356 raise RuntimeError("Fail to delete slivers for slice %s" % slicename)
359 def _get_resources_urn(self, resources_hrn):
361 Builds list of resources' urn based on hrn.
363 resources_urn = list()
365 for resource in resources_hrn:
366 resources_urn.append(hrn_to_urn(resource, 'node'))
def blacklist_resource(self, resource_hrn):
    """
    Adding resource_hrn to blacklist, and taking
    the resource from the reserved list.

    The blacklist is updated under ``self.lock_blist`` and the reserved
    set under ``self.lock_resv``. A resource that was not reserved is
    simply left out of the reserved set; that is not an error.
    """
    with self.lock_blist:
        self._blacklist.add(resource_hrn)

    with self.lock_resv:
        # discard() is a no-op when the hrn was never reserved
        self._reserved.discard(resource_hrn)
def blacklisted(self, resource_hrn):
    """
    Check if the resource is in the blacklist.

    Returns True when resource_hrn is blacklisted, False otherwise. The
    lookup is done while holding ``self.lock_blist``.
    """
    with self.lock_blist:
        return resource_hrn in self._blacklist
# Records resource_hrn in the self._reserved set (created in the
# constructor); adding an hrn that is already present changes nothing.
# No lock is taken on the lines visible here.
390 def reserve_resource(self, resource_hrn):
392 Add resource to the reserved list.
394 self._reserved.add(resource_hrn)
def reserved(self, resource_hrn):
    """
    Check whether the resource is already reserved, reserving it if not.

    This is a test-and-set done while holding ``self.lock_resv``:

    - returns True when resource_hrn was already reserved (nothing
      changes);
    - otherwise reserves it through ``reserve_resource`` and returns
      False, meaning the caller is the one that obtained the reservation.
    """
    with self.lock_resv:
        if resource_hrn in self._reserved:
            return True

        self.reserve_resource(resource_hrn)
        return False
409 Remove hosts from the reserved and blacklist lists, and in case
410 the persist attribute is set, it saves the blacklisted hosts
411 in the blacklist file.
415 blacklist = self._blacklist
416 self._blacklist = set()
417 self._reserved = set()
418 # if self._ecobj.get_global('PlanetlabSfaNode', 'persist_blacklist'):
420 # to_blacklist = list()
421 # hostnames = self.get_nodes(list(blacklist), ['hostname'])
422 # for hostname in hostnames:
423 # to_blacklist.append(hostname['hostname'])
425 # nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
426 # plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
428 # with open(plblacklist_file, 'w') as f:
429 # for host in to_blacklist:
430 # f.write("%s\n" % host)
class SFAAPIFactory(object):
    """
    API Factory to manage a map of SFAAPI instances as key-value pairs, it
    instantiates a single instance per key. The key is derived from the
    sfi user and the slice manager (sfi_sm), so every caller using the same
    pair shares one SFAAPI object.
    """

    # Guards every access to _apis.
    _lock = threading.Lock()
    # key (see make_key) -> SFAAPI instance
    _apis = dict()

    @classmethod
    def get_api(cls, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
            batch = False, rtype = None, timeout = None):
        """
        Return the SFAAPI instance for (sfi_user, sfi_sm), creating and
        caching it on first use.

        The remaining arguments are only used when a new instance has to
        be created; they are passed unchanged to the SFAAPI constructor.

        Returns None when sfi_user or sfi_sm is empty.
        """
        if not (sfi_user and sfi_sm):
            return None

        key = cls.make_key(sfi_user, sfi_sm)
        # Look-up and creation happen under the same lock so two threads
        # asking for the same key cannot both build an instance.
        with cls._lock:
            api = cls._apis.get(key)

            if not api:
                api = SFAAPI(sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key,
                    ec, batch, rtype, timeout)
                cls._apis[key] = api

            return api

    @classmethod
    def make_key(cls, *args):
        """
        Build the cache key: the md5 hex digest of the concatenated string
        form of all arguments.
        """
        skey = "".join(map(str, args))
        # hashlib only accepts bytes; on Python 3 the joined key is text and
        # must be encoded first (on Python 2 str is already bytes).
        if not isinstance(skey, bytes):
            skey = skey.encode("utf-8")
        return hashlib.md5(skey).hexdigest()