4 from parser import sfa_sfav1
12 def __init__(self, aggregate = 'ple', slice_id = None, sfi_auth = None, sfi_user = None,
13 sfi_registry = None, sfi_sm = None, timeout = None, private_key = None):
15 self._resources = dict()
16 self._reservable_resources = list()
18 self._slice_tags = dict()
19 self._slice_resources = set()
20 self._slice_leases = set()
21 self._aggregate = aggregate
22 self._slice_hrn = slice_id
23 # TODO: take into account Rspec version, SFA V1, GENI V2, GENI V3
24 # For now is SFA V1 from PlanetLab and Nitos (wrong namespace)
25 self._parser = sfa_sfav1.SFAResourcesParser(['ple', 'omf'])
26 self._lock = threading.Lock()
28 # Paremeters to contact the XMLRPC SFA service
29 self._sfi_parameters = {'-a': sfi_auth, '-u': sfi_user,
30 '-r': sfi_registry, '-s': sfi_sm, '-t': timeout,
33 #self._logger = logging.getLogger('nepi.utils.sfiapi')
34 self._fetch_resources_info()
35 self._fetch_slice_info()
37 def _sfi_command_options(self):
38 command_options = " ".join("%s %s" % (k,v) for (k,v) in \
39 self._sfi_parameters.iteritems() if v is not None)
40 return command_options
42 def _sfi_command_exec(self, command):
43 args = command.split(" ")
44 s = subprocess.Popen(args, stdout = subprocess.PIPE,
45 stdin = subprocess.PIPE)
46 xml, err = s.communicate()
48 raise RuntimeError("Command excecution problem, error: %s", err)
51 def _fetch_resources_info(self, resources = True):
52 command_options = self._sfi_command_options()
53 command = "sfi.py " + command_options + " resources -l all"
55 xml = self._sfi_command_exec(command)
57 #self._logger.error("Error in SFA responds: %s", xml)
60 self._resources, self._leases = self._parser.resources_from_xml(xml, resources = True)
62 self._leases = self._parser.resources_from_xml(xml)
63 #self._update_reservable()
66 def _fetch_slice_info(self):
67 command_options = self._sfi_command_options()
68 command = "sfi.py " + command_options + " resources -l all"
69 command = command + " " + self._slice_hrn
71 xml = self._sfi_command_exec(command)
73 #self._logger.error("Error in SFA responds: %s", xml)
75 self._slice_resources, self._slice_leases, self._slice_tags = \
76 self._parser.resources_from_xml(xml, sliver = True, resources = True)
79 def _update_reservable(self):
80 for rid, r in self._resources.iteritems():
81 if (r['resource_type'] == 'node' and r['exclusive'].upper() == 'TRUE') \
82 or (r['resource_type'] == 'channel'):
83 self._reservable_resources.append(rid)
86 def discover_resources(self, resourceId=None, fields=[], **kwargs):
88 resources = self._resources
90 if resourceId is not None:
91 resource_ids = resourceId
92 if not isinstance(resource_ids, list):
93 resource_ids = [resource_ids]
94 resources = self._filter_by_resourceId(resources, resource_ids)
96 for filter, value in kwargs.items():
97 resources = self._filter_by_filter(resources, filter, value)
101 for k, info in resources.iteritems():
102 info = self._extract_fields(info, fields)
106 def _filter_by_resourceId(self, resources, resource_ids):
107 return dict((k, resources[k]) for k in resource_ids if k in resources)
109 def _filter_by_filter(self, resources, filter, value):
111 for k in resources.keys():
112 if filter in resources[k]:
113 if resources[k][filter] == value:
117 def _extract_fields(self, info, fields):
118 return dict((k, info[k]) for k in fields if k in info)
120 def discover_fields(self):
121 resources = self._resources
123 for k, data in resources.iteritems():
125 if field not in fields:
129 def discover_leases(self, resourceId=None):
130 leases = self._leases
132 if resourceId is not None:
133 resource_ids = resourceId
134 if not isinstance(resourceId, list):
135 resource_ids = [resource_ids]
136 leases = self._filterbyresourceId(leases, resource_ids)
139 def find_resources(self, leases, resources, rtype, quantity, start_time, duration, slot):
141 if rtype not in ['node', 'channel']:
142 raise RuntimeError("Unknown type")
144 finish_time = start_time + duration * slot
146 leases_resources = dict()
147 reservable_resources = dict()
148 for lid, lease in leases.iteritems():
149 if lease[0]['type'] == rtype:
150 leases_resources.update({lid: lease})
151 #print leases_resources
152 for rid, resource in resources.iteritems():
153 if rtype == 'node' and (resource['type'] == 'node' and resource['exclusive'].upper() == 'TRUE'):
154 reservable_resources.update({rid: resource})
155 elif rtype == 'channel':
156 reservable_resources.update({rid: resource})
157 #if resource['type'] == 'rtype' and resources['exclusive'].upper() == 'TRUE':\
158 # (in case adding exclusive tag to channels)
160 free_resources = list(set(reservable_resources.keys()) - set(leases_resources.keys()))
162 if len(free_resources) >= quantity:
163 free_resources = free_resources[:quantity]
164 for rid, resource in resources.iteritems():
165 if rid in free_resources:
166 result[rid] = resource
170 new_quan = quantity - len(free_resources)
173 for lid, lease in leases_resources.iteritems():
175 st = int(l['start_time'])
176 ft = st + int(l['duration']) * slot
177 if (st <= finish_time <= ft) or (st <= start_time <= ft):
178 if lid in maybe_free:
179 maybe_free.remove(lid)
182 if lid not in maybe_free:
183 maybe_free.append(lid)
184 if len(maybe_free) >= new_quan:
185 free_resources = [free_resources, maybe_free]
186 free_resources = sum(free_resources, [])
187 for rid, resource in resources.iteritems():
188 if rid in free_resources:
189 result[rid] = resource
191 #return free_resources
192 warnings.warn("There aren't enough nodes")
195 def provision_resource(self, new_resource, start_time = None, duration = None):
198 xml = self._fetch_slice_info()
199 new_xml = self._parser.create_reservation_xml(xml, self._slice_hrn,\
200 new_resource, start_time, duration, self._aggregate)
201 fh, fname = tempfile.mkstemp()
203 os.write(fh, new_xml)
206 command_options = self._sfi_command_options()
207 command = "sfi.py " + command_options + " create %s %s" % (self._slice_hrn, fname)
208 out = self._sfi_command_exec(command)
211 xml = self._fetch_slice_info()
212 return self._parser.verify_reservation_xml(xml, self._slice_hrn, new_resource, start_time,\
213 duration, self._aggregate)
215 def release_resource(self, resource, start_time = None, duration = None):
218 xml = self._fetch_slice_info()
219 new_xml = self._parser.release_reservation_xml(xml, self._slice_hrn, resource,\
220 start_time, duration, self._aggregate)
221 fh, fname = tempfile.mkstemp()
223 os.write(fh, new_xml)
226 command_options = self._sfi_command_options()
227 command = "sfi.py " + command_options + " create %s %s" % (self._slice_hrn, fname)
228 out = self._sfi_command_exec(command)
231 xml = self._fetch_slice_info()
232 return not self._parser.verify_reservation_xml(xml, self._slice_hrn, resource, start_time,\
233 duration, self._aggregate)
236 class SFAApiFactory(object):
237 lock = threading.Lock()
241 def get_api(slice_id = None, sfi_auth = None, sfi_user = None,
242 sfi_registry = None, sfi_sm = None, timeout = None, private_key = None):
244 key = cls.make_key(aggregate = 'ple', slice_id, sfi_auth, sfi_user, sfi_registry, sfi_sm,
245 timeout, private_key)
246 api = cls._apis.get(key)
248 api._fetch_resources_info(resources = False)
249 api._fetch_slice_info()
253 api = SFAApi(slice_id = None, sfi_auth = None, sfi_user = None,
254 sfi_registry = None, sfi_sm = None, timeout = None, private_key = None)
260 def make_key(cls, *args):
261 skey = "".join(map(str, args))
262 return hashlib.md5(skey).hexdigest()