2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Lucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
25 from nepi.util.logger import Logger
28 from sfa.client.sfi import Sfi
29 from sfa.util.xrn import hrn_to_urn
31 log = Logger("SFA API")
32 log.debug("Packages sfa-common or sfa-client not installed.\
33 Could not import sfa.client.sfi or sfa.util.xrn")
35 from nepi.util.sfarspec_proc import SfaRSpecProcessing
39 API for querying the SFA service.
# Constructor: creates the bookkeeping sets, cache markers, logger, RSpec
# processor and locks, then copies the SFI settings onto self.api.options.
# NOTE(review): this extract is lossy. The end of the signature, the statement
# that creates self.api, and the body of the final `if` are not visible here.
# `timeout` is read below, so it is presumably declared on the missing
# signature line -- confirm against the full file.
41 def __init__(self, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
# Resources marked as unusable / already taken (see blacklist_resource and
# reserve_resource).
44 self._blacklist = set()
45 self._reserved = set()
# Parsed result of the last 'resources' query; _already_cached tells
# get_resources_hrn whether the cache can be used.
46 self._resources_cache = None
47 self._already_cached = False
48 self._log = Logger("SFA API")
50 self.rspec_proc = SfaRSpecProcessing()
# One lock per piece of shared state: slice operations, the blacklist set and
# the reserved set.
51 self.lock_slice = threading.Lock()
52 self.lock_blist = threading.Lock()
53 self.lock_resv = threading.Lock()
# Configure the sfi client options from the constructor arguments.
55 self.api.options.timeout = timeout
56 self.api.options.raw = None
57 self.api.options.user = sfi_user
58 self.api.options.auth = sfi_auth
59 self.api.options.registry = sfi_registry
60 self.api.options.sm = sfi_sm
61 self.api.options.user_private_key = private_key
63 # Load blacklist from file
# NOTE(review): the statement executed when this condition holds is missing
# from the extract; presumably it calls self._set_blacklist() -- confirm.
64 if ec.get_global('PlanetlabNode', 'persist_blacklist'):
def _set_blacklist(self):
    """
    Load the persisted blacklist from ~/.nepi/plblacklist.txt into
    self._blacklist.

    The file is read as one host per line. A missing file is treated as an
    empty blacklist, and blank lines are ignored.
    """
    nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
    plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
    if not os.path.isfile(plblacklist_file):
        # Nothing has been persisted yet: keep the blacklist as it is
        # instead of raising IOError from open().
        return

    with open(plblacklist_file, 'r') as f:
        hosts_tobl = f.read().splitlines()

    # Surrounding whitespace is dropped so that a stray space or an empty
    # line in the file never ends up as a host name.
    self._blacklist.update(
        host.strip() for host in hosts_tobl if host.strip())
# Run one sfi command through self.api: validate the arguments, build the sfi
# argument list, parse it with the command's own parser, dispatch the command
# and read the RSpec output file.
# Raises TypeError for an unsupported command or a missing required argument.
# NOTE(review): this extract is lossy. The docstring, the guard before the
# first raise, the statement(s) between read_config() and dispatch(), and the
# final return are not visible here.
76 def _sfi_exec_method(self, command, slicename=None, rspec=None, urn=None):
# Slice-scoped commands.
# NOTE(review): the condition guarding the next raise is missing; from the
# message it presumably checks that slicename was given -- confirm.
80 if command in ['describe', 'delete', 'allocate', 'provision']:
82 raise TypeError("The slice hrn is expected for this method %s" % command)
83 if command == 'allocate' and not rspec:
84 raise TypeError("RSpec is expected for this method %s" % command)
# 'allocate' and 'delete' take a second positional argument; the remaining
# slice commands ('describe', 'provision') are given an output file via -o.
86 if command == 'allocate':
87 args_list = [slicename, rspec]
88 elif command == 'delete':
89 args_list = [slicename, urn]
90 else: args_list = [slicename, '-o', '/tmp/rspec_output']
# Aggregate-wide listing: only the output file is passed.
92 elif command == 'resources':
93 args_list = ['-o', '/tmp/rspec_output']
95 else: raise TypeError("Sfi method not supported")
# Let the sfi client parse the argument list with the parser it creates for
# this command, then store the parsed options on the client.
97 self.api.command = command
98 self.api.command_parser = self.api.create_parser_command(self.api.command)
99 (command_options, command_args) = self.api.command_parser.parse_args(args_list)
100 #print "1 %s" % command_options.info
101 #command_options.info = ""
102 #print "2 %s" % command_options.info
103 self.api.command_options = command_options
104 self.api.read_config()
# NOTE(review): one or more lines are missing here in the extract; confirm
# what runs between read_config() and dispatch() in the full file.
107 self.api.dispatch(command, command_options, command_args)
# The output is read back from a fixed path; presumably sfi appends ".rspec"
# to the name given with -o -- confirm. Note that this file is opened for
# every command, including the ones built without -o above.
108 with open("/tmp/rspec_output.rspec", "r") as result_file:
109 result = result_file.read()
# NOTE(review): the statement returning `result` is not visible in this
# extract; callers in this file use the method's return value.
def get_resources_info(self):
    """
    Get all resources and its attributes from aggregate.

    Runs the sfi 'resources' command, parses the returned RSpec with
    self.rspec_proc and stores the parsed result in self._resources_cache
    (setting self._already_cached) before returning it.

    Raises RuntimeError if the sfi call fails.
    """
    # NOTE(review): the try/except lines were missing from the extract this
    # block was restored from; confirm against the full file.
    try:
        rspec_slice = self._sfi_exec_method('resources')
    except Exception as err:
        # Callers only see the generic RuntimeError, so keep the cause in
        # the log.
        self._log.debug("sfi 'resources' failed: %s" % err)
        raise RuntimeError("Fail to list resources")

    self._resources_cache = self.rspec_proc.parse_sfa_rspec(rspec_slice)
    self._already_cached = True
    return self._resources_cache
def get_resources_hrn(self, resources=None):
    """
    Get list of resources hrn, without the resource info.

    resources: optional list of resource dicts, each with the keys 'hrn'
        and 'component_name'. When omitted or empty, the 'resource' list of
        the aggregate is used instead: taken from self._resources_cache if
        it was already fetched, otherwise from get_resources_info().

    Returns a dict mapping each component_name to its hrn with every
    backslash removed.
    """
    # NOTE(review): the outer guard and the `else:` were missing from the
    # extract this block was restored from; confirm against the full file.
    if not resources:
        if self._already_cached:
            resources = self._resources_cache['resource']
        else:
            resources = self.get_resources_info()['resource']

    return {
        resource['component_name']: resource['hrn'].replace('\\', '')
        for resource in resources
    }
def get_slice_resources(self, slicename):
    """
    Get resources and info from slice.

    Runs the sfi 'describe' command for slicename while holding
    self.lock_slice and returns the RSpec parsed by self.rspec_proc.

    Raises RuntimeError if the sfi call fails.
    """
    # NOTE(review): the try/except and return lines were missing from the
    # extract this block was restored from; confirm against the full file.
    try:
        with self.lock_slice:
            rspec_slice = self._sfi_exec_method('describe', slicename)
    except Exception as err:
        # Callers only see the generic RuntimeError, so keep the cause in
        # the log.
        self._log.debug(
            "sfi 'describe' failed for slice %s: %s" % (slicename, err))
        raise RuntimeError("Fail to describe resource for slice %s" % slicename)

    return self.rspec_proc.parse_sfa_rspec(rspec_slice)
def add_resource_to_slice(self, slicename, resource_hrn, leases=None):
    """
    Get the list of resources' urn, build the rspec string and call the allocate
    and provision method.

    The RSpec is built for resource_hrn plus every resource the slice
    already holds, written to /tmp/rspec_input.rspec and passed to the sfi
    'allocate' command, followed by 'provision'.

    Raises RuntimeError if the RSpec file ends up empty or if either sfi
    call fails.
    """
    def escape_hrn(hrn):
        # Keep the first two dot-separated components as they are and
        # backslash-escape the dots of the remainder.
        parts = hrn.split('.')
        return '.'.join(parts[:2]) + '.' + '\\.'.join(parts[2:])

    resources_hrn_new = [escape_hrn(resource_hrn)]

    # Must happen before taking lock_slice below: get_slice_resources
    # acquires the same lock, which is a plain (non-reentrant) Lock.
    slice_resources = self.get_slice_resources(slicename)['resource']

    with self.lock_slice:
        # Only translate when the slice holds something: with an empty list
        # get_resources_hrn falls back to the aggregate's resources.
        if slice_resources:
            slice_resources_hrn = self.get_resources_hrn(slice_resources)
            resources_hrn_new.extend(
                escape_hrn(s_hrn) for s_hrn in slice_resources_hrn.values())

        resources_urn = self._get_resources_urn(resources_hrn_new)
        rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, leases)
        with open("/tmp/rspec_input.rspec", "w") as f:
            f.write(rspec)

        if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
            raise RuntimeError("Fail to create rspec file to allocate resource in slice %s" % slicename)

        # NOTE(review): the try/except lines and the final return were
        # missing from the extract this block was restored from; confirm
        # them (and the return value) against the full file.
        try:
            self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
        except Exception as err:
            self._log.debug(
                "sfi 'allocate' failed for slice %s: %s" % (slicename, err))
            raise RuntimeError("Fail to allocate resource for slice %s" % slicename)

        try:
            self._sfi_exec_method('provision', slicename)
        except Exception as err:
            self._log.debug(
                "sfi 'provision' failed for slice %s: %s" % (slicename, err))
            raise RuntimeError("Fail to provision resource for slice %s" % slicename)

        return True
def remove_resource_from_slice(self, slicename, resource_hrn, leases=None):
    """
    Get the resource's urn and call the sfi 'delete' method to remove it
    from the slice.

    leases is accepted for symmetry with add_resource_to_slice and is not
    used.

    Raises RuntimeError if the sfi call fails.
    """
    resource_urn = self._get_resources_urn([resource_hrn]).pop()
    # NOTE(review): the try/except lines and the final return were missing
    # from the extract this block was restored from; confirm them (and the
    # return value) against the full file.
    try:
        self._sfi_exec_method('delete', slicename, urn=resource_urn)
    except Exception as err:
        # Callers only see the generic RuntimeError, so keep the cause in
        # the log.
        self._log.debug(
            "sfi 'delete' failed for slice %s: %s" % (slicename, err))
        raise RuntimeError("Fail to delete resource for slice %s" % slicename)

    return True
def _get_resources_urn(self, resources_hrn):
    """
    Builds list of resources' urn based on hrn.

    Each hrn in resources_hrn is converted with hrn_to_urn(hrn, 'node');
    the result keeps the input order.
    """
    # NOTE(review): the return statement was missing from the extract this
    # block was restored from; callers in this file use the returned list.
    return [hrn_to_urn(resource, 'node') for resource in resources_hrn]
def blacklist_resource(self, resource_hrn):
    """
    Add resource_hrn to the blacklist and drop it from the reserved set if
    it is there.
    """
    with self.lock_blist:
        self._blacklist.add(resource_hrn)

    # NOTE(review): the line guarding the _reserved update was missing from
    # the extract this block was restored from; lock_resv is assumed --
    # confirm against the full file.
    with self.lock_resv:
        self._reserved.discard(resource_hrn)
def blacklisted(self, resource_hrn):
    """
    Return True if resource_hrn is in the blacklist, False otherwise.
    """
    # NOTE(review): the return statements were missing from the extract this
    # block was restored from; a boolean result is assumed -- confirm.
    with self.lock_blist:
        return resource_hrn in self._blacklist
def reserve_resource(self, resource_hrn):
    """
    Add resource_hrn to the reserved set.

    No lock is taken here. NOTE(review): reserved() calls this method and
    presumably already holds self.lock_resv, a non-reentrant Lock, when it
    does -- confirm before adding locking to this method.
    """
    self._reserved.add(resource_hrn)
def reserved(self, resource_hrn):
    """
    Check whether resource_hrn is reserved and reserve it if it is not.

    Returns True when the resource was already reserved. Otherwise the
    resource is reserved through reserve_resource() and False is returned.
    """
    # NOTE(review): the lock, `else:` and return lines were missing from the
    # extract this block was restored from; confirm against the full file.
    with self.lock_resv:
        if resource_hrn in self._reserved:
            return True
        self.reserve_resource(resource_hrn)
        return False
# Factory that hands out SFAAPI objects keyed by make_key(sfi_user, sfi_sm).
# NOTE(review): this extract is lossy. The docstring delimiters and last line,
# the definition of cls._apis, the decorators, the end of the get_api
# signature, the end of the SFAAPI(...) call and the return statements are not
# visible here.
244 class SFAAPIFactory(object):
246 API Factory to manage a map of SFAAPI instances as key-value pairs, it
247 instanciate a single instance per key. The key represents the same SFA,
# Class-level lock; presumably guards cls._apis in get_api -- the line that
# acquires it is not visible in this extract, confirm.
251 _lock = threading.Lock()
# Look up the API object for these credentials.
# get_api is invoked with `cls`, so it is presumably decorated with
# @classmethod on a line missing from this extract -- confirm.
256 def get_api(cls, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
# A key is only computed when both the user and the slice manager are given;
# the other arguments do not take part in the key.
259 if sfi_user and sfi_sm:
260 key = cls.make_key(sfi_user, sfi_sm)
262 api = cls._apis.get(key)
# NOTE(review): the condition guarding this construction and the statement
# storing the new object are missing from the extract; presumably a new SFAAPI
# is built only when the lookup above found nothing -- confirm.
265 api = SFAAPI(sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key,
def make_key(cls, *args):
    """
    Build a key from the given values.

    The str() of every argument is concatenated without separator and the
    hexadecimal MD5 digest of the result is returned.

    NOTE(review): get_api calls this as cls.make_key(...), so it is
    presumably decorated with @classmethod on a line that is not part of
    this extract -- confirm the decorator is kept above this def.
    """
    skey = "".join(map(str, args))
    if not isinstance(skey, bytes):
        # hashlib only accepts bytes on Python 3; on Python 2 str is
        # already bytes and is passed through untouched.
        skey = skey.encode("utf-8")
    return hashlib.md5(skey).hexdigest()