2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Lucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
26 from nepi.util.logger import Logger
29 from sfa.client.sfi import Sfi
30 from sfa.util.xrn import hrn_to_urn
32 log = Logger("SFA API")
33 log.debug("Packages sfa-common or sfa-client not installed.\
34 Could not import sfa.client.sfi or sfa.util.xrn")
36 from nepi.util.sfarspec_proc import SfaRSpecProcessing
40 API for querying the SFA service. It uses the Sfi class from the sfi client tool.
def __init__(self, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
             batch, rtype, timeout):
    """
    Prepare the bookkeeping state and configure the underlying Sfi client.

    sfi_user, sfi_auth, sfi_registry, sfi_sm and private_key are copied
    into the Sfi client options (user, auth, registry, sm and
    user_private_key); timeout becomes the client timeout option.
    ec is queried for the 'persist_blacklist' global of 'PlanetlabNode'
    and, in batch mode, for the resources of the experiment.
    batch enables the counters used by add_resource_to_slice_batch, and
    rtype is the resource type substring that selects the resources to
    count.
    """
    self._blacklist = set()
    self._reserved = set()
    self._resources_cache = None
    self._already_cached = False

    # _get_total_res reads the experiment controller through self._ec.
    self._ec = ec
    # NOTE(review): reference counter reconstructed from a dropped line of
    # the listing -- confirm against the upstream file.
    self.apis = 1

    # NOTE(review): the `if batch:` guard is reconstructed; without it the
    # default rtype=None would break _get_total_res -- confirm.
    if batch:
        self._testbed_res = rtype
        self._count = 0
        self._total = self._get_total_res()
        self._slice_resources_batch = list()

    self._log = Logger("SFA API")
    self.api = Sfi()
    self.rspec_proc = SfaRSpecProcessing()
    self.lock_slice = threading.Lock()
    self.lock_blist = threading.Lock()
    self.lock_resv = threading.Lock()

    self.api.options.timeout = timeout
    self.api.options.raw = None
    self.api.options.user = sfi_user
    self.api.options.auth = sfi_auth
    self.api.options.registry = sfi_registry
    self.api.options.sm = sfi_sm
    self.api.options.user_private_key = private_key

    # Load blacklist from file
    if ec.get_global('PlanetlabNode', 'persist_blacklist'):
        self._set_blacklist()
def _set_blacklist(self):
    """
    Initialize the blacklist with the hosts stored in
    ~/.nepi/plblacklist.txt, one host per line.

    A missing or unreadable file leaves the blacklist untouched, and
    empty lines are ignored.
    """
    nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
    plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
    try:
        with open(plblacklist_file, 'r') as f:
            hosts_tobl = f.read().splitlines()
    except (IOError, OSError):
        # No blacklist persisted yet (e.g. first run): nothing to load.
        return

    self._blacklist.update(host for host in hosts_tobl if host)
def _get_total_res(self):
    """
    Get the total amount of resources instanciated using this API,
    to be able to add them using the same Allocate and Provision
    call at once. Specially for Wilabt testbed that doesn't allow
    to add slivers after the slice already has some.

    Returns a list with one entry per resource of the experiment whose
    _rtype contains self._testbed_res (case-insensitive).
    """
    wanted = self._testbed_res.lower()
    candidates = (self._ec.get_resource(gid) for gid in self._ec.resources)
    # NOTE(review): the listing dropped the append/return lines; the
    # visible code only relies on len() of this result -- confirm that the
    # resource managers (and not the gids) were collected upstream.
    return [rm for rm in candidates if wanted in rm._rtype.lower()]
def _sfi_exec_method(self, command, slicename=None, rspec=None, urn=None, action=None):
    """
    Execute sfi method, which correspond to SFA call. It can be the following
    calls: Describe, Delete, Allocate, Provision, ListResources.

    command is one of 'describe', 'delete', 'allocate', 'provision',
    'action' or 'resources'. slicename is mandatory for every command
    except 'resources'; rspec (a file path) is mandatory for 'allocate';
    action is forwarded for 'action'. urn is accepted but not used here.

    Returns the content of /tmp/rspec_output.rspec after the call, or
    None when the call fails or produces no output file.
    Raises TypeError on a missing argument or an unsupported command.
    """
    if command in ('describe', 'delete', 'allocate', 'provision', 'action'):
        if not slicename:
            raise TypeError("The slice hrn is expected for this method %s" % command)
        if command == 'allocate' and not rspec:
            raise TypeError("RSpec is expected for this method %s" % command)

        if command == 'action':
            args_list = [slicename, action]
        else:
            args_list = [slicename, rspec] if command == 'allocate' else [slicename]
            if command != 'delete':
                args_list = args_list + ['-o', '/tmp/rspec_output']

    elif command == 'resources':
        args_list = ['-o', '/tmp/rspec_output']

    else:
        raise TypeError("Sfi method not supported")

    self.api.command = command
    self.api.command_parser = self.api.create_parser_command(self.api.command)
    (command_options, command_args) = self.api.command_parser.parse_args(args_list)
    self.api.command_options = command_options
    self.api.read_config()
    # NOTE(review): bootstrap() call reconstructed from a dropped line of
    # the listing -- confirm against the upstream file.
    self.api.bootstrap()

    # Drop the output of a previous call so stale data is never returned.
    try:
        os.remove("/tmp/rspec_output.rspec")
    except OSError:
        self._log.debug("Couldn't remove temporary output file for RSpec or it doesn't exist")

    try:
        self.api.dispatch(command, command_options, command_args)
        with open("/tmp/rspec_output.rspec", "r") as result_file:
            return result_file.read()
    except (Exception, SystemExit):
        # Best effort: commands that write no output file end up here too.
        # SystemExit stays in the net because the previous catch-all also
        # absorbed it (presumably the sfi client may exit on errors --
        # verify); KeyboardInterrupt is now allowed to propagate.
        self._log.debug(" Couldn't retrive rspec output information from method %s " % command)
        return None
def get_resources_info(self):
    """
    Get all resources and its attributes from aggregate.

    The parsed result is stored in self._resources_cache and
    self._already_cached is set, then the parsed result is returned.
    Raises RuntimeError when the listing fails or yields no RSpec.
    """
    try:
        rspec_slice = self._sfi_exec_method('resources')
    except (Exception, SystemExit):
        raise RuntimeError("Fail to list resources")

    # The sfi helper reports a failed call by returning None instead of
    # raising, so that case has to be rejected before parsing.
    if rspec_slice is None:
        raise RuntimeError("Fail to list resources")

    self._resources_cache = self.rspec_proc.parse_sfa_rspec(rspec_slice)
    self._already_cached = True
    return self._resources_cache
def get_resources_hrn(self, resources=None):
    """
    Get list of resources hrn, without the resource info.

    resources is an iterable of dicts with 'hrn' and 'component_name'
    keys. When it is None, the aggregate resources are used, taken from
    the cache if get_resources_info already filled it. An explicitly
    empty list yields an empty mapping.

    Returns a dict mapping component_name to hrn with the backslashes
    removed from the hrn.
    """
    # NOTE(review): the guard line was dropped from the listing; `is None`
    # is used so an empty slice never triggers a full aggregate listing.
    if resources is None:
        if not self._already_cached:
            resources = self.get_resources_info()['resource']
        else:
            resources = self._resources_cache['resource']

    return dict(
        (resource['component_name'], resource['hrn'].replace('\\', ''))
        for resource in resources
    )
def get_slice_resources(self, slicename):
    """
    Get resources and info from slice.

    Returns the parsed RSpec of the slice, or
    {'resource':[],'lease':[]} when the slice cannot be described.
    """
    # Bound up front so a failed describe falls through to the empty result.
    rspec_slice = None
    try:
        with self.lock_slice:
            rspec_slice = self._sfi_exec_method('describe', slicename)
    except (Exception, SystemExit):
        self._log.debug("Fail to describe resources for slice %s, slice may be empty" % slicename)

    if rspec_slice is not None:
        return self.rspec_proc.parse_sfa_rspec(rspec_slice)
    return {'resource':[],'lease':[]}
def add_resource_to_slice(self, slicename, resource_hrn, leases=None):
    """
    Get the list of resources' urn, build the rspec string and call the allocate
    and provision method.

    The resources already described in the slice are added again next to
    resource_hrn, the resulting RSpec is written to
    /tmp/rspec_input.rspec and then allocated and provisioned.

    Returns True after a successful provision, None when allocate
    produced no output.
    Raises RuntimeError when the RSpec file is empty or when allocate or
    provision fail.
    """
    def escape_hrn(hrn):
        # Keep the first two labels dot-separated, escape the remaining dots.
        parts = hrn.split('.')
        return '.'.join(parts[:2]) + '.' + '\\.'.join(parts[2:])

    resources_hrn_new = [escape_hrn(resource_hrn)]

    with self.lock_slice:
        rspec_slice = self._sfi_exec_method('describe', slicename)
        if rspec_slice is not None:
            slice_resources = self.rspec_proc.parse_sfa_rspec(rspec_slice)['resource']
        else:
            slice_resources = []
        if slice_resources:
            slice_resources_hrn = self.get_resources_hrn(slice_resources)
            for s_hrn_value in slice_resources_hrn.values():
                resources_hrn_new.append(escape_hrn(s_hrn_value))

        resources_urn = self._get_resources_urn(resources_hrn_new)
        rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, None, leases)
        # Opening in "w" mode truncates; `with` closes the file on errors too.
        with open("/tmp/rspec_input.rspec", "w") as f:
            f.write(rspec)

        if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
            raise RuntimeError("Fail to create rspec file to allocate resource in slice %s" % slicename)

        # ALLOCATE
        try:
            self._log.debug("Allocating resources in slice %s" % slicename)
            out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
        except (Exception, SystemExit):
            raise RuntimeError("Fail to allocate resource for slice %s" % slicename)

        if out is not None:
            # PROVISION
            try:
                self._log.debug("Provisioning resources in slice %s" % slicename)
                self._sfi_exec_method('provision', slicename)
            except (Exception, SystemExit):
                raise RuntimeError("Fail to provision resource for slice %s" % slicename)
            return True
def add_resource_to_slice_batch(self, slicename, resource_hrn, properties=None, leases=None):
    """
    Method to add all resources together to the slice. Previous deletion of slivers.
    Specially used for wilabt that doesn't allow to add more resources to the slice
    after some resources are added. Every sliver have to be deleted and the batch
    has to be added at once.

    Each call records resource_hrn; only the call that completes the
    batch (as many calls as entries in self._total) talks to the testbed.

    Returns True when the batch is already in the slice or was allocated
    and provisioned, None while the batch is still incomplete.
    Raises RuntimeError when the RSpec file is empty or when allocate or
    provision fail.
    """
    self._count += 1
    self._slice_resources_batch.append(resource_hrn)

    if self._count != len(self._total):
        self._log.debug(" Waiting for more nodes to add the batch to the slice ")
        return None

    check_all_inslice = self._check_all_inslice(self._slice_resources_batch, slicename)
    # Identity test on purpose: _check_all_inslice may also return a
    # resource count, and a count of 1 compares equal to True.
    if check_all_inslice is True:
        return True

    resources_hrn_new = list()
    for batch_hrn in self._slice_resources_batch:
        # Keep the first two labels dot-separated, escape the remaining dots.
        resource_parts = batch_hrn.split('.')
        resources_hrn_new.append(
            '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:]))

    with self.lock_slice:
        if check_all_inslice != 0:
            self._sfi_exec_method('delete', slicename)
            # NOTE(review): this wait was reconstructed from a dropped line
            # of the listing (presumably it lets the testbed release the
            # slivers) -- confirm the call and its duration upstream.
            import time
            time.sleep(480)

        # Re implementing urn from hrn because the library sfa-common doesn't work for wilabt
        resources_urn = self._get_urn(resources_hrn_new)
        rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, properties, leases)
        # Opening in "w" mode truncates; `with` closes the file on errors too.
        with open("/tmp/rspec_input.rspec", "w") as f:
            f.write(rspec)

        if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
            raise RuntimeError("Fail to create rspec file to allocate resources in slice %s" % slicename)

        # ALLOCATE
        try:
            self._log.debug("Allocating resources in slice %s" % slicename)
            out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
        except (Exception, SystemExit):
            raise RuntimeError("Fail to allocate resource for slice %s" % slicename)

        if out is None:
            raise RuntimeError("Fail to allocate resources for slice %s" % slicename)

        # PROVISION
        try:
            self._log.debug("Provisioning resources in slice %s" % slicename)
            self._sfi_exec_method('provision', slicename)
            self._sfi_exec_method('action', slicename=slicename, action='geni_start')
        except (Exception, SystemExit):
            raise RuntimeError("Fail to provision resource for slice %s" % slicename)
        return True
def _check_all_inslice(self, resources_hrn, slicename):
    """
    Compare the resources currently in the slice with resources_hrn.

    Returns True when the slice holds exactly the hrns in resources_hrn,
    the number of resources in the slice when it holds a different set,
    and 0 when the slice has no resources or its first resource has an
    empty 'services' entry.
    """
    slice_res = self.get_slice_resources(slicename)['resource']
    if slice_res and len(slice_res[0]['services']) != 0:
        # Materialised so len() and membership tests behave like a list.
        slice_res_hrn = list(self.get_resources_hrn(slice_res).values())
        if self._compare_lists(slice_res_hrn, resources_hrn):
            return True
        return len(slice_res_hrn)
    return 0
def _compare_lists(self, list1, list2):
    """
    Tell whether list1 and list2 have the same length and every item of
    list1 is also present in list2. The order of the items is irrelevant.
    """
    if len(list1) != len(list2):
        return False
    return all(item in list2 for item in list1)
def _get_urn(self, resources_hrn):
    """
    Build the list of node urns for the given hrns without relying on
    the sfa-common helper.

    Backslashes are removed from each hrn, the last dot-separated label
    is taken as the node name and the rest as the authority.
    """
    resources_urn = list()
    for hrn in resources_hrn:
        # NOTE(review): the split of the hrn into authority and node was
        # reconstructed from dropped lines; the '.' separator kept inside
        # the authority is an assumption -- confirm upstream.
        auth, _, node = hrn.replace("\\", "").rpartition('.')
        resources_urn.append(''.join(['urn:publicid:IDN+', auth, '+node+', node]))
    return resources_urn
def remove_resource_from_slice(self, slicename, resource_hrn, leases=None):
    """
    Remove slivers from slice. Currently sfi doesn't support removing particular
    slivers.

    The urn computed for resource_hrn is handed to the sfi helper, which
    does not use it when building the delete call. leases is not used.

    Returns True. Raises RuntimeError when the delete call raises.
    """
    resource_urn = self._get_resources_urn([resource_hrn]).pop()
    with self.lock_slice:
        try:
            self._sfi_exec_method('delete', slicename, urn=resource_urn)
        except (Exception, SystemExit):
            raise RuntimeError("Fail to delete resource for slice %s" % slicename)
        return True
def remove_all_from_slice(self, slicename):
    """
    De-allocate and de-provision all slivers of the named slice.
    Currently sfi doesn't support removing particular
    slivers, so this method works only for removing every sliver. Setting the
    resource_hrn parameter is not necessary.

    Returns True. Raises RuntimeError when the delete call raises.
    """
    with self.lock_slice:
        try:
            self._sfi_exec_method('delete', slicename)
        except (Exception, SystemExit):
            raise RuntimeError("Fail to delete slivers for slice %s" % slicename)
        return True
def _get_resources_urn(self, resources_hrn):
    """
    Builds list of resources' urn based on hrn.

    Every hrn is converted with hrn_to_urn using the 'node' type; the
    order of the input is kept.
    """
    return [hrn_to_urn(resource, 'node') for resource in resources_hrn]
def blacklist_resource(self, resource_hrn):
    """
    Adding resource_hrn to blacklist, and taking
    the resource from the reserved list.
    """
    with self.lock_blist:
        self._blacklist.add(resource_hrn)
    # NOTE(review): the listing dropped the line before the membership
    # test; guarding _reserved with lock_resv is an assumption -- confirm.
    with self.lock_resv:
        self._reserved.discard(resource_hrn)
def blacklisted(self, resource_hrn):
    """
    Check if the resource is in the blacklist.

    Returns True when resource_hrn is blacklisted, False otherwise.
    """
    with self.lock_blist:
        return resource_hrn in self._blacklist
def reserve_resource(self, resource_hrn):
    """
    Add resource to the reserved list.

    No lock is taken here; reserved() calls this method from inside its
    own critical section.
    """
    self._reserved.add(resource_hrn)
def reserved(self, resource_hrn):
    """
    Check whether the resource is already reserved and reserve it when
    it is not.

    Returns True when resource_hrn was already reserved. Otherwise the
    resource is added to the reserved list and False is returned.
    """
    # NOTE(review): the lock line was dropped from the listing; taking
    # lock_resv around the test-and-reserve is reconstructed -- confirm.
    with self.lock_resv:
        if resource_hrn in self._reserved:
            return True
        self.reserve_resource(resource_hrn)
        return False
412 Remove hosts from the reserved and blacklist lists, and in case
413 the persist attribute is set, it saves the blacklisted hosts
414 in the blacklist file.
418 blacklist = self._blacklist
419 self._blacklist = set()
420 self._reserved = set()
421 # if self._ecobj.get_global('PlanetlabSfaNode', 'persist_blacklist'):
423 # to_blacklist = list()
424 # hostnames = self.get_nodes(list(blacklist), ['hostname'])
425 # for hostname in hostnames:
426 # to_blacklist.append(hostname['hostname'])
428 # nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
429 # plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
431 # with open(plblacklist_file, 'w') as f:
432 # for host in to_blacklist:
433 # f.write("%s\n" % host)
class SFAAPIFactory(object):
    """
    API Factory to manage a map of SFAAPI instances as key-value pairs.

    A single SFAAPI instance is created per key; the key is derived from
    the sfi user and the slice manager passed to get_api.
    """

    _lock = threading.Lock()
    _apis = dict()

    @classmethod
    def get_api(cls, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
                batch = False, rtype = None, timeout = None):
        """
        Return the SFAAPI instance registered for (sfi_user, sfi_sm),
        creating and registering it on first use.

        Returns None when sfi_user or sfi_sm is empty.
        """
        if sfi_user and sfi_sm:
            key = cls.make_key(sfi_user, sfi_sm)
            with cls._lock:
                api = cls._apis.get(key)

                if not api:
                    api = SFAAPI(sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key,
                                 ec, batch, rtype, timeout)
                    cls._apis[key] = api
                else:
                    # NOTE(review): reference-count bump reconstructed from
                    # dropped lines of the listing -- confirm upstream.
                    api.apis += 1

                return api

        return None

    @classmethod
    def make_key(cls, *args):
        """
        Return the hexadecimal MD5 digest of the concatenated string forms
        of args. The digest is only used as a dictionary key.
        """
        skey = "".join(map(str, args))
        # hashlib needs bytes; text has to be encoded first on Python 3.
        if not isinstance(skey, bytes):
            skey = skey.encode("utf-8")
        return hashlib.md5(skey).hexdigest()