580b528b5a85a9fa7cbea68957361a5a070b1d37
[nepi.git] / src / nepi / util / sfa_api.py
1 import logging
2 import hashlib
3
4 from parser import sfa_sfav1
5 import subprocess
6 import warnings
7
8 import threading
9
10 class SFAApi(object):
11
12     def __init__(self, aggregate = 'ple', slice_id = None, sfi_auth = None, sfi_user = None,
13             sfi_registry = None, sfi_sm = None, timeout = None, private_key = None):
14     
15         self._resources = dict()
16         self._reservable_resources = list()
17         self._leases = dict()
18         self._slice_tags = dict()
19         self._slice_resources = set()
20         self._slice_leases = set()
21         self._aggregate = aggregate
22         self._slice_hrn = slice_id
23         # TODO: take into account Rspec version, SFA V1, GENI V2, GENI V3
24         # For now is SFA V1 from PlanetLab and Nitos (wrong namespace)
25         self._parser = sfa_sfav1.SFAResourcesParser(['ple', 'omf'])
26         self._lock = threading.Lock()
27
28         # Paremeters to contact the XMLRPC SFA service
29         self._sfi_parameters = {'-a': sfi_auth, '-u': sfi_user,
30                 '-r': sfi_registry, '-s': sfi_sm, '-t': timeout,
31                 '-k': private_key}
32
33         #self._logger = logging.getLogger('nepi.utils.sfiapi')
34         self._fetch_resources_info()
35         self._fetch_slice_info()
36
37     def _sfi_command_options(self):
38         command_options = " ".join("%s %s" % (k,v) for (k,v) in \
39                 self._sfi_parameters.iteritems() if v is not None)
40         return command_options
41
42     def _sfi_command_exec(self, command):
43         args = command.split(" ")
44         s = subprocess.Popen(args, stdout = subprocess.PIPE,
45                 stdin = subprocess.PIPE)
46         xml, err = s.communicate()
47         if err:
48            raise RuntimeError("Command excecution problem, error: %s", err)
49         return xml
50
51     def _fetch_resources_info(self, resources = True):
52         command_options = self._sfi_command_options()
53         command = "sfi.py " + command_options + " resources -l all"
54         try:
55             xml = self._sfi_command_exec(command)
56         except:
57             #self._logger.error("Error in SFA responds: %s", xml)
58             raise
59         if resources:
60             self._resources, self._leases = self._parser.resources_from_xml(xml, resources = True)
61         else:
62             self._leases = self._parser.resources_from_xml(xml)
63         #self._update_reservable()
64         return xml
65     
66     def _fetch_slice_info(self):
67         command_options = self._sfi_command_options()
68         command = "sfi.py " + command_options + " resources -l all"
69         command = command + " " + self._slice_hrn
70         try:
71             xml = self._sfi_command_exec(command)
72         except:
73             #self._logger.error("Error in SFA responds: %s", xml)
74             raise
75         self._slice_resources, self._slice_leases, self._slice_tags = \
76             self._parser.resources_from_xml(xml, sliver = True, resources = True)
77         return xml
78
79     def _update_reservable(self):
80         for rid, r in self._resources.iteritems():
81             if (r['resource_type'] == 'node' and r['exclusive'].upper() == 'TRUE') \
82                  or (r['resource_type'] == 'channel'):
83                 self._reservable_resources.append(rid)
84
85
86     def discover_resources(self, resourceId=None, fields=[], **kwargs):
87         result = dict()
88         resources = self._resources
89
90         if resourceId is not None:
91             resource_ids = resourceId
92             if not isinstance(resource_ids, list):
93                 resource_ids = [resource_ids]
94             resources = self._filter_by_resourceId(resources, resource_ids)
95         else:
96             for filter, value in kwargs.items():
97                 resources = self._filter_by_filter(resources, filter, value)
98         if not fields:
99             return resources
100         else:
101             for k, info in resources.iteritems():
102                 info = self._extract_fields(info, fields)
103                 result[k] = info
104             return result
105                 
106     def _filter_by_resourceId(self, resources, resource_ids):
107         return dict((k, resources[k]) for k in resource_ids if k in resources)
108
109     def _filter_by_filter(self, resources, filter, value):
110         d = dict()
111         for k in resources.keys():
112             if filter in resources[k]:
113                 if resources[k][filter] == value:
114                     d[k] = resources[k]
115         return d
116                
117     def _extract_fields(self, info, fields):
118         return dict((k, info[k]) for k in fields if k in info)
119
120     def discover_fields(self):
121         resources = self._resources
122         fields = []
123         for k, data in resources.iteritems():
124             for field in data:
125                 if field not in fields:
126                     fields.append(field)
127         return fields
128
129     def discover_leases(self, resourceId=None):
130         leases = self._leases
131
132         if resourceId is not None:
133             resource_ids = resourceId
134             if not isinstance(resourceId, list):
135                 resource_ids = [resource_ids]
136             leases = self._filterbyresourceId(leases, resource_ids)
137         return leases
138
139     def find_resources(self, leases, resources, rtype, quantity, start_time, duration, slot):
140         result = dict()
141         if rtype not in ['node', 'channel']:
142             raise RuntimeError("Unknown type")
143
144         finish_time = start_time + duration * slot
145
146         leases_resources = dict()
147         reservable_resources = dict()
148         for lid, lease in leases.iteritems():
149             if lease[0]['type'] == rtype:
150                 leases_resources.update({lid: lease})
151         #print leases_resources
152         for rid, resource in resources.iteritems():
153             if rtype == 'node' and (resource['type'] == 'node' and resource['exclusive'].upper() == 'TRUE'):
154                 reservable_resources.update({rid: resource})
155             elif rtype == 'channel':
156                 reservable_resources.update({rid: resource})
157             #if resource['type'] == 'rtype' and resources['exclusive'].upper() == 'TRUE':\
158             # (in case adding exclusive tag to channels)
159
160         free_resources = list(set(reservable_resources.keys()) - set(leases_resources.keys()))
161     
162         if len(free_resources) >= quantity:
163             free_resources = free_resources[:quantity]
164             for rid, resource in resources.iteritems():
165                 if rid in free_resources:
166                     result[rid] = resource
167             return result
168         else:
169             maybe_free = []
170             new_quan = quantity - len(free_resources)
171             print new_quan
172
173             for lid, lease in leases_resources.iteritems():
174                 for l in lease:
175                     st = int(l['start_time'])
176                     ft = st + int(l['duration']) * slot
177                     if (st <= finish_time <= ft) or (st <= start_time <= ft):
178                         if lid in maybe_free:
179                             maybe_free.remove(lid)
180                         break
181                     else:
182                         if lid not in maybe_free:
183                             maybe_free.append(lid)
184                 if len(maybe_free) >= new_quan:
185                     free_resources = [free_resources, maybe_free]
186                     free_resources = sum(free_resources, [])
187                     for rid, resource in resources.iteritems():
188                         if rid in free_resources:
189                             result[rid] = resource
190                         return result
191                     #return free_resources
192             warnings.warn("There aren't enough nodes")
193
194                                  
195     def provision_resource(self, new_resource, start_time = None, duration = None):
196         import os, tempfile
197         with self._lock:
198             xml = self._fetch_slice_info()
199             new_xml = self._parser.create_reservation_xml(xml, self._slice_hrn,\
200             new_resource, start_time, duration, self._aggregate)
201             fh, fname = tempfile.mkstemp()
202             print fname
203             os.write(fh, new_xml)
204             os.close(fh)
205             try:
206                 command_options = self._sfi_command_options()
207                 command = "sfi.py " + command_options + " create %s %s" % (self._slice_hrn, fname)
208                 out = self._sfi_command_exec(command)
209             except:
210                 raise
211         xml = self._fetch_slice_info()
212         return self._parser.verify_reservation_xml(xml, self._slice_hrn, new_resource, start_time,\
213                 duration, self._aggregate)
214
215     def release_resource(self, resource, start_time = None, duration = None):
216         import os, tempfile
217         with self._lock:
218             xml = self._fetch_slice_info()
219             new_xml = self._parser.release_reservation_xml(xml, self._slice_hrn, resource,\
220             start_time, duration, self._aggregate)
221             fh, fname = tempfile.mkstemp()
222             print fname
223             os.write(fh, new_xml)
224             os.close(fh)
225             try:
226                 command_options = self._sfi_command_options()
227                 command = "sfi.py " + command_options + " create %s %s" % (self._slice_hrn, fname)
228                 out = self._sfi_command_exec(command)
229             except:
230                 raise
231         xml = self._fetch_slice_info()
232         return not self._parser.verify_reservation_xml(xml, self._slice_hrn, resource, start_time,\
233             duration, self._aggregate)
234
235
236 class SFAApiFactory(object):
237     lock = threading.Lock()
238     _apis = dict()
239
240     @classmethod
241     def get_api(slice_id = None, sfi_auth = None, sfi_user = None,
242             sfi_registry = None, sfi_sm = None, timeout = None, private_key = None):
243
244         key = cls.make_key(aggregate = 'ple', slice_id, sfi_auth, sfi_user, sfi_registry, sfi_sm,
245             timeout, private_key)
246         api = cls._apis.get(key)
247         cls.lock.acquire()
248         api._fetch_resources_info(resources = False)
249         api._fetch_slice_info()
250         cls.lock.release()
251
252         if not api:
253             api = SFAApi(slice_id = None, sfi_auth = None, sfi_user = None,
254             sfi_registry = None, sfi_sm = None, timeout = None, private_key = None)
255             cls._apis[key] = api
256
257         return api
258
259     @classmethod
260     def make_key(cls, *args):
261         skey = "".join(map(str, args))
262         return hashlib.md5(skey).hexdigest()
263