2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 # Lucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
25 from sfa_sfav1 import SFAResourcesParser
28 # TODO: Use nepi utils Logger instead of warnings!
def __init__(self, aggregate = 'ple', slice_id = None, sfi_auth = None, sfi_user = None,
        sfi_registry = None, sfi_sm = None, timeout = None, private_key = None):
    """Initialise the cached state and fetch the first resource/slice data.

    :param aggregate: aggregate name, later handed to the RSpec parser
        when reservations are created, released or verified.
    :param slice_id: HRN of the slice; appended to the ``sfi.py`` command
        line when the slice information is fetched.
    :param sfi_auth: value for the ``sfi.py -a`` option.
    :param sfi_user: value for the ``sfi.py -u`` option.
    :param sfi_registry: value for the ``sfi.py -r`` option.
    :param sfi_sm: value for the ``sfi.py -s`` option.
    :param timeout: value for the ``sfi.py -t`` option.
    :param private_key: value for the ``sfi.py -k`` option.

    Constructing the object runs ``sfi.py`` twice (all resources, then the
    slice), so it needs a working SFA client configuration.
    """
    self._resources = dict()
    self._reservable_resources = list()
    self._leases = dict()
    self._slice_tags = dict()
    self._slice_resources = set()
    self._slice_leases = set()
    self._aggregate = aggregate
    self._slice_hrn = slice_id
    # TODO: take into account Rspec version, SFA V1, GENI V2, GENI V3
    # For now is SFA V1 from PlanetLab and Nitos (wrong namespace)
    # The module imports the class by name (from sfa_sfav1 import
    # SFAResourcesParser), so it is referenced directly here.
    self._parser = SFAResourcesParser(['ple', 'omf'])
    self._lock = threading.Lock()

    # Parameters to contact the XMLRPC SFA service.
    # NOTE(review): '-k' is assumed to be the sfi.py private-key option;
    # confirm against the sfi.py version in use.
    self._sfi_parameters = {'-a': sfi_auth, '-u': sfi_user,
            '-r': sfi_registry, '-s': sfi_sm, '-t': timeout,
            '-k': private_key}

    self._fetch_resources_info()
    self._fetch_slice_info()
60 def _sfi_command_options(self):
61 command_options = " ".join("%s %s" % (k,v) for (k,v) in \
62 self._sfi_parameters.iteritems() if v is not None)
63 return command_options
65 def _sfi_command_exec(self, command):
66 args = command.split(" ")
67 s = subprocess.Popen(args, stdout = subprocess.PIPE,
68 stdin = subprocess.PIPE)
69 xml, err = s.communicate()
71 raise RuntimeError("Command excecution problem, error: %s", err)
def _fetch_resources_info(self, resources = True):
    """Query SFA for all advertised resources and refresh the cached data.

    :param resources: when true, both ``self._resources`` and
        ``self._leases`` are replaced from the parsed answer; otherwise
        only ``self._leases`` is.
    :returns: the raw answer of ``sfi.py``.
        NOTE(review): the return value mirrors _fetch_slice_info; confirm.

    Errors raised while running ``sfi.py`` propagate to the caller.
    """
    command = "sfi.py %s resources -l all" % self._sfi_command_options()
    xml = self._sfi_command_exec(command)

    if not resources:
        self._leases = self._parser.resources_from_xml(xml)
    else:
        self._resources, self._leases = self._parser.resources_from_xml(
                xml, resources = True)
    return xml
def _fetch_slice_info(self):
    """Query SFA for the resources of the slice and refresh the slice cache.

    Replaces ``self._slice_resources``, ``self._slice_leases`` and
    ``self._slice_tags`` with the values parsed from the answer.

    :returns: the raw answer of ``sfi.py`` for the slice.

    Errors raised while running ``sfi.py`` propagate to the caller.
    """
    command = " ".join(["sfi.py", self._sfi_command_options(),
            "resources -l all", self._slice_hrn])
    xml = self._sfi_command_exec(command)

    parsed = self._parser.resources_from_xml(xml, sliver = True, resources = True)
    self._slice_resources, self._slice_leases, self._slice_tags = parsed
    return xml
102 def _update_reservable(self):
103 for rid, r in self._resources.iteritems():
104 if (r['resource_type'] == 'node' and r['exclusive'].upper() == 'TRUE') \
105 or (r['resource_type'] == 'channel'):
106 self._reservable_resources.append(rid)
def discover_resources(self, resourceId=None, fields=None, **kwargs):
    """Return the cached resources, optionally filtered and trimmed.

    :param resourceId: a single resource id or a list of ids to keep.
    :param fields: names of the attributes to keep for each resource; when
        empty or ``None`` the full resource descriptions are returned.
    :param kwargs: ``attribute=value`` filters; a resource is kept only if
        it has the attribute and the value matches.
    :returns: dict mapping resource id to its (possibly trimmed) data.

    NOTE(review): the keyword filters are applied only when no resourceId
    is given; confirm that this is the intended behaviour.
    """
    resources = self._resources

    if resourceId is not None:
        resource_ids = resourceId if isinstance(resourceId, list) else [resourceId]
        resources = self._filter_by_resourceId(resources, resource_ids)
    else:
        for filter_name, value in kwargs.items():
            resources = self._filter_by_filter(resources, filter_name, value)

    if not fields:
        return resources
    return dict((rid, self._extract_fields(info, fields))
            for rid, info in resources.items())
129 def _filter_by_resourceId(self, resources, resource_ids):
130 return dict((k, resources[k]) for k in resource_ids if k in resources)
132 def _filter_by_filter(self, resources, filter, value):
134 for k in resources.keys():
135 if filter in resources[k]:
136 if resources[k][filter] == value:
140 def _extract_fields(self, info, fields):
141 return dict((k, info[k]) for k in fields if k in info)
def discover_fields(self):
    """Return the distinct attribute names used by the cached resources.

    :returns: list of field names, each listed once, in the order in which
        they are first met while walking ``self._resources``.
    """
    fields = []
    seen = set()  # constant-time membership test; `fields` keeps the order
    for data in self._resources.values():
        for field in data:
            if field not in seen:
                seen.add(field)
                fields.append(field)
    return fields
def discover_leases(self, resourceId=None):
    """Return the cached leases, optionally restricted to some resources.

    :param resourceId: a single resource id or a list of ids; when given,
        only the leases stored under those ids are returned.
    :returns: dict of leases keyed by resource id.
    """
    leases = self._leases

    if resourceId is not None:
        resource_ids = resourceId if isinstance(resourceId, list) else [resourceId]
        # The helper is named _filter_by_resourceId (with underscores).
        leases = self._filter_by_resourceId(leases, resource_ids)
    return leases
def find_resources(self, leases, resources, rtype, quantity, start_time, duration, slot):
    """Pick *quantity* resources of type *rtype* that are free in a time window.

    The window runs from ``start_time`` to ``start_time + duration * slot``.
    Reservable resources without any lease are taken first; if there are
    not enough of them, leased resources whose leases all lie outside the
    window are added.

    :param leases: dict mapping a resource id to its list of lease dicts
        (each with ``'type'``, ``'start_time'`` and ``'duration'``).
    :param resources: dict mapping a resource id to its description dict.
    :param rtype: ``'node'`` or ``'channel'``.
    :param quantity: number of resources wanted.
    :param start_time: start of the requested window.
    :param duration: length of the window, counted in slots.
    :param slot: length of one slot, in the unit of ``start_time``.
    :returns: dict of the selected resources keyed by id, or ``None``
        (after issuing a warning) when not enough resources are available.
    :raises RuntimeError: if *rtype* is not a known type.
    """
    if rtype not in ['node', 'channel']:
        raise RuntimeError("Unknown type")

    finish_time = start_time + duration * slot

    # Leases of the requested type. An empty lease list carries no type and
    # reserves nothing, so it is ignored instead of raising IndexError.
    leases_resources = dict((lid, lease) for lid, lease in leases.items()
            if lease and lease[0]['type'] == rtype)

    reservable_resources = dict()
    for rid, resource in resources.items():
        if rtype == 'node':
            if resource['type'] == 'node' and resource['exclusive'].upper() == 'TRUE':
                reservable_resources[rid] = resource
        else:
            # NOTE(review): for channels every resource passed in is
            # accepted without a type check; confirm callers pre-filter.
            reservable_resources[rid] = resource

    free_resources = list(set(reservable_resources) - set(leases_resources))

    if len(free_resources) >= quantity:
        chosen = set(free_resources[:quantity])
        return dict((rid, resource) for rid, resource in resources.items()
                if rid in chosen)

    # Not enough never-leased resources: look for leased ones that are
    # free during the requested window.
    missing = quantity - len(free_resources)
    maybe_free = []
    for lid, lease in leases_resources.items():
        if lid not in reservable_resources:
            # A lease on something that cannot be reserved is irrelevant.
            continue
        conflict = False
        for l in lease:
            st = int(l['start_time'])
            ft = st + int(l['duration']) * slot
            # Closed-interval overlap test; it also catches a lease lying
            # entirely inside the requested window.
            if st <= finish_time and start_time <= ft:
                conflict = True
                break
        if not conflict:
            maybe_free.append(lid)
        if len(maybe_free) >= missing:
            chosen = set(free_resources + maybe_free)
            return dict((rid, resource) for rid, resource in resources.items()
                    if rid in chosen)

    warnings.warn("There aren't enough nodes")
def provision_resource(self, new_resource, start_time = None, duration = None):
    """Add *new_resource* to the slice and report whether it took effect.

    The current slice RSpec is fetched, the parser adds the reservation,
    the result is written to a temporary file and submitted with
    ``sfi.py create``. The slice is then fetched again for verification.

    :param new_resource: the resource to reserve, as the parser expects it.
    :param start_time: start of the reservation, passed to the parser.
    :param duration: length of the reservation, passed to the parser.
    :returns: the result of the parser's ``verify_reservation_xml``.
    """
    import os
    import tempfile

    # NOTE(review): self._lock serialises the fetch/modify/submit sequence
    # for this API object; confirm the intended extent of the lock.
    with self._lock:
        xml = self._fetch_slice_info()
        new_xml = self._parser.create_reservation_xml(xml, self._slice_hrn,
                new_resource, start_time, duration, self._aggregate)
        fh, fname = tempfile.mkstemp()
        try:
            # fdopen takes over the descriptor, writes everything and
            # closes it; the mode follows the type the parser returned.
            mode = 'wb' if isinstance(new_xml, bytes) else 'w'
            with os.fdopen(fh, mode) as rspec_file:
                rspec_file.write(new_xml)

            command_options = self._sfi_command_options()
            command = "sfi.py " + command_options + " create %s %s" % (self._slice_hrn, fname)
            self._sfi_command_exec(command)
        finally:
            # sfi.py has finished by now, so the file can be removed.
            os.remove(fname)

        xml = self._fetch_slice_info()
    return self._parser.verify_reservation_xml(xml, self._slice_hrn, new_resource, start_time,
            duration, self._aggregate)
def release_resource(self, resource, start_time = None, duration = None):
    """Remove *resource* from the slice and report whether it is gone.

    The current slice RSpec is fetched, the parser removes the reservation,
    the result is written to a temporary file and submitted with
    ``sfi.py create``. The slice is then fetched again for verification.

    :param resource: the resource to release, as the parser expects it.
    :param start_time: start of the reservation, passed to the parser.
    :param duration: length of the reservation, passed to the parser.
    :returns: ``True`` when the parser's ``verify_reservation_xml`` no
        longer finds the reservation, ``False`` otherwise.
    """
    import os
    import tempfile

    # NOTE(review): self._lock serialises the fetch/modify/submit sequence
    # for this API object; confirm the intended extent of the lock.
    with self._lock:
        xml = self._fetch_slice_info()
        new_xml = self._parser.release_reservation_xml(xml, self._slice_hrn, resource,
                start_time, duration, self._aggregate)
        fh, fname = tempfile.mkstemp()
        try:
            # fdopen takes over the descriptor, writes everything and
            # closes it; the mode follows the type the parser returned.
            mode = 'wb' if isinstance(new_xml, bytes) else 'w'
            with os.fdopen(fh, mode) as rspec_file:
                rspec_file.write(new_xml)

            command_options = self._sfi_command_options()
            command = "sfi.py " + command_options + " create %s %s" % (self._slice_hrn, fname)
            self._sfi_command_exec(command)
        finally:
            # sfi.py has finished by now, so the file can be removed.
            os.remove(fname)

        xml = self._fetch_slice_info()
    return not self._parser.verify_reservation_xml(xml, self._slice_hrn, resource, start_time,
            duration, self._aggregate)
class SFAApiFactory(object):
    """Cache of ``SFAApi`` objects, one per set of connection parameters."""

    # Guards _apis and the refresh of a cached API object.
    lock = threading.Lock()
    # Maps the key computed by make_key() to the SFAApi built for it.
    _apis = dict()

    @classmethod
    def get_api(cls, slice_id = None, sfi_auth = None, sfi_user = None,
            sfi_registry = None, sfi_sm = None, timeout = None, private_key = None):
        """Return the ``SFAApi`` for the given parameters, creating it once.

        A new API object fetches its data while it is constructed; an API
        object found in the cache has its leases and slice information
        refreshed before it is returned.

        :returns: the shared ``SFAApi`` instance for these parameters.
        """
        aggregate = 'ple'
        # make_key() only takes positional arguments.
        key = cls.make_key(slice_id, sfi_auth, sfi_user, sfi_registry,
                sfi_sm, timeout, private_key, aggregate)

        with cls.lock:
            api = cls._apis.get(key)
            if api is None:
                api = SFAApi(aggregate = aggregate, slice_id = slice_id,
                        sfi_auth = sfi_auth, sfi_user = sfi_user,
                        sfi_registry = sfi_registry, sfi_sm = sfi_sm,
                        timeout = timeout, private_key = private_key)
                cls._apis[key] = api
            else:
                api._fetch_resources_info(resources = False)
                api._fetch_slice_info()

        return api

    @classmethod
    def make_key(cls, *args):
        """Return the hex MD5 digest of the concatenated ``str()`` of *args*.

        The digest is used only as a cache key, not for security.
        """
        skey = "".join(map(str, args))
        # hashlib needs bytes; on Python 2 a str already is bytes.
        if not isinstance(skey, bytes):
            skey = skey.encode("utf-8")
        return hashlib.md5(skey).hexdigest()