Added LICENSE
[nepi.git] / src / nepi / util / sfa_api.py
1 """
2     NEPI, a framework to manage network experiments
3     Copyright (C) 2013 INRIA
4
5     This program is free software: you can redistribute it and/or modify
6     it under the terms of the GNU General Public License as published by
7     the Free Software Foundation, either version 3 of the License, or
8     (at your option) any later version.
9
10     This program is distributed in the hope that it will be useful,
11     but WITHOUT ANY WARRANTY; without even the implied warranty of
12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License
16     along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 """
19
20 import logging
21 import hashlib
22
23 from parser import sfa_sfav1
24 import subprocess
25 import warnings
26
27 import threading
28
class SFAApi(object):
    """Thin client for an SFA testbed, driven through the ``sfi.py`` tool.

    The instance shells out to ``sfi.py`` to download the advertised
    resources and the content of one slice, hands the returned XML to an
    ``sfa_sfav1.SFAResourcesParser`` and caches what the parser returns.
    Reservation changes are sent back with ``sfi.py ... create``.
    """

    def __init__(self, aggregate = 'ple', slice_id = None, sfi_auth = None, sfi_user = None,
            sfi_registry = None, sfi_sm = None, timeout = None, private_key = None):
        """Store the connection settings and fetch the initial state.

        :param aggregate: aggregate name forwarded to the parser when
            reservations are created, released or verified.
        :param slice_id: slice HRN appended to the ``sfi.py`` commands.
        :param sfi_auth: value for the ``sfi.py -a`` option.
        :param sfi_user: value for the ``sfi.py -u`` option.
        :param sfi_registry: value for the ``sfi.py -r`` option.
        :param sfi_sm: value for the ``sfi.py -s`` option.
        :param timeout: value for the ``sfi.py -t`` option.
        :param private_key: value for the ``sfi.py -k`` option.

        Options left to ``None`` are not passed to ``sfi.py``.  The
        constructor runs ``sfi.py`` twice (resources, then slice).
        """
        self._resources = dict()
        self._reservable_resources = list()
        self._leases = dict()
        self._slice_tags = dict()
        self._slice_resources = set()
        self._slice_leases = set()
        self._aggregate = aggregate
        self._slice_hrn = slice_id
        # TODO: take into account Rspec version, SFA V1, GENI V2, GENI V3
        # For now is SFA V1 from PlanetLab and Nitos (wrong namespace)
        self._parser = sfa_sfav1.SFAResourcesParser(['ple', 'omf'])
        # Serializes provision_resource / release_resource on this instance.
        self._lock = threading.Lock()

        # Parameters to contact the XMLRPC SFA service
        self._sfi_parameters = {'-a': sfi_auth, '-u': sfi_user,
                '-r': sfi_registry, '-s': sfi_sm, '-t': timeout,
                '-k': private_key}

        #self._logger = logging.getLogger('nepi.utils.sfiapi')
        self._fetch_resources_info()
        self._fetch_slice_info()

    def _sfi_command_options(self):
        """Return the configured ``sfi.py`` options as one string.

        Each option whose value is not ``None`` is rendered as
        ``"<flag> <value>"``; the pieces are joined with single spaces.
        """
        return " ".join("%s %s" % (flag, value)
                for flag, value in self._sfi_parameters.items()
                if value is not None)

    def _sfi_command_exec(self, command):
        """Run ``command`` and return what it wrote to stdout.

        :param command: whitespace separated command line.
        :raises RuntimeError: when the process exits with a non-zero
            status; the message carries the captured stderr.
        """
        # split() (not split(" ")) so that consecutive blanks, which show
        # up when no sfi option is configured, do not become empty
        # arguments.
        args = command.split()
        proc = subprocess.Popen(args, stdout = subprocess.PIPE,
                stdin = subprocess.PIPE, stderr = subprocess.PIPE)
        xml, err = proc.communicate()
        if proc.returncode != 0:
            raise RuntimeError("Command excecution problem, error: %s" % err)
        return xml

    def _fetch_resources_info(self, resources = True):
        """Download the resources advertisement and refresh the cache.

        :param resources: when true both ``self._resources`` and
            ``self._leases`` are replaced with the parser output; when
            false only ``self._leases`` is replaced.
        :returns: the raw output of ``sfi.py``.
        """
        command = "sfi.py " + self._sfi_command_options() + " resources -l all"
        xml = self._sfi_command_exec(command)
        if resources:
            self._resources, self._leases = self._parser.resources_from_xml(xml, resources = True)
        else:
            self._leases = self._parser.resources_from_xml(xml)
        #self._update_reservable()
        return xml

    def _fetch_slice_info(self):
        """Download the slice content and refresh the slice caches.

        Replaces ``self._slice_resources``, ``self._slice_leases`` and
        ``self._slice_tags`` with the parser output.

        :returns: the raw output of ``sfi.py``.
        """
        command = "sfi.py " + self._sfi_command_options() + " resources -l all"
        command = command + " " + self._slice_hrn
        xml = self._sfi_command_exec(command)
        self._slice_resources, self._slice_leases, self._slice_tags = \
            self._parser.resources_from_xml(xml, sliver = True, resources = True)
        return xml

    def _update_reservable(self):
        """Recompute ``self._reservable_resources`` from ``self._resources``.

        A resource is kept when it is a node flagged as exclusive or when
        it is a channel.  The list is rebuilt from scratch so calling this
        repeatedly does not accumulate duplicates.
        """
        # NOTE(review): this reads the 'resource_type' key while
        # find_resources reads 'type' -- confirm which one the parser emits.
        self._reservable_resources = [
            rid for rid, r in self._resources.items()
            if (r['resource_type'] == 'node' and r['exclusive'].upper() == 'TRUE')
                or r['resource_type'] == 'channel']

    def discover_resources(self, resourceId=None, fields=None, **kwargs):
        """Return cached resources, optionally filtered and projected.

        :param resourceId: a resource id or a list of ids.  When given,
            only those ids are returned and ``kwargs`` are ignored.
        :param fields: iterable of field names to keep for every resource;
            when empty or ``None`` the complete records are returned.
        :param kwargs: ``field=value`` filters applied one after the other
            (only used when ``resourceId`` is ``None``).
        :returns: dict mapping resource id to its (possibly reduced) record.
        """
        resources = self._resources

        if resourceId is not None:
            resource_ids = resourceId
            if not isinstance(resource_ids, list):
                resource_ids = [resource_ids]
            resources = self._filter_by_resourceId(resources, resource_ids)
        else:
            for name, value in kwargs.items():
                resources = self._filter_by_filter(resources, name, value)

        if not fields:
            return resources
        return dict((rid, self._extract_fields(info, fields))
                for rid, info in resources.items())

    def _filter_by_resourceId(self, resources, resource_ids):
        """Return the entries of ``resources`` whose key is in ``resource_ids``."""
        return dict((k, resources[k]) for k in resource_ids if k in resources)

    def _filter_by_filter(self, resources, filter, value):
        """Return the entries of ``resources`` whose ``filter`` field equals ``value``.

        Records that do not have the field are dropped.
        """
        return dict((rid, info) for rid, info in resources.items()
                if filter in info and info[filter] == value)

    def _extract_fields(self, info, fields):
        """Return a copy of ``info`` restricted to the keys listed in ``fields``."""
        return dict((k, info[k]) for k in fields if k in info)

    def discover_fields(self):
        """Return every field name used by the cached resources, without duplicates."""
        fields = []
        seen = set()
        for data in self._resources.values():
            for field in data:
                if field not in seen:
                    seen.add(field)
                    fields.append(field)
        return fields

    def discover_leases(self, resourceId=None):
        """Return cached leases, optionally restricted to some resources.

        :param resourceId: a resource id or a list of ids; ``None`` returns
            every cached lease.
        """
        leases = self._leases

        if resourceId is not None:
            resource_ids = resourceId
            if not isinstance(resource_ids, list):
                resource_ids = [resource_ids]
            leases = self._filter_by_resourceId(leases, resource_ids)
        return leases

    def find_resources(self, leases, resources, rtype, quantity, start_time, duration, slot):
        """Look for ``quantity`` resources usable during a time window.

        The window is ``[start_time, start_time + duration * slot]``.
        Resources without any lease of type ``rtype`` are preferred; when
        they are not enough, leased resources whose leases all fall
        outside the window are added.

        :param leases: dict mapping an id to a non-empty list of lease
            records with ``type``, ``start_time`` and ``duration`` keys.
        :param resources: dict mapping a resource id to its record
            (``type`` and, for nodes, ``exclusive`` keys are read).
        :param rtype: ``'node'`` or ``'channel'``.
        :param quantity: number of resources wanted.
        :param start_time: beginning of the window.
        :param duration: length of the window, in slots.
        :param slot: length of one slot, in the unit of ``start_time``.
        :returns: dict of the selected resources, or ``None`` (after
            emitting a warning) when not enough could be found.
        :raises RuntimeError: when ``rtype`` is not a known type.
        """
        if rtype not in ['node', 'channel']:
            raise RuntimeError("Unknown type")

        finish_time = start_time + duration * slot

        # Only the first record of each lease list is used to decide
        # whether the list concerns the requested type.
        leases_resources = dict((lid, lease) for lid, lease in leases.items()
                if lease[0]['type'] == rtype)

        reservable_resources = dict()
        for rid, resource in resources.items():
            if rtype == 'node':
                if resource['type'] == 'node' and resource['exclusive'].upper() == 'TRUE':
                    reservable_resources[rid] = resource
            else:
                # NOTE(review): for channels every resource is accepted,
                # whatever its 'type' -- confirm callers pre-filter.
                # (in case adding exclusive tag to channels)
                reservable_resources[rid] = resource

        free_resources = list(set(reservable_resources) - set(leases_resources))

        if len(free_resources) >= quantity:
            return self._select_resources(resources, free_resources[:quantity])

        maybe_free = []
        new_quan = quantity - len(free_resources)

        for lid, lease in leases_resources.items():
            if not self._lease_overlaps(lease, start_time, finish_time, slot):
                maybe_free.append(lid)
            if len(maybe_free) >= new_quan:
                return self._select_resources(resources, free_resources + maybe_free)

        warnings.warn("There aren't enough nodes")

    @staticmethod
    def _lease_overlaps(lease, start_time, finish_time, slot):
        """Tell whether any record of ``lease`` intersects the window.

        Bounds are inclusive on both sides, so a lease that merely touches
        the window counts as overlapping.
        """
        for l in lease:
            st = int(l['start_time'])
            ft = st + int(l['duration']) * slot
            # Two closed intervals intersect exactly when each one starts
            # before the other ends; this also catches a lease lying
            # entirely inside the requested window.
            if st <= finish_time and start_time <= ft:
                return True
        return False

    @staticmethod
    def _select_resources(resources, wanted_ids):
        """Return the entries of ``resources`` whose id is in ``wanted_ids``."""
        return dict((rid, resources[rid]) for rid in wanted_ids if rid in resources)

    def _submit_rspec(self, rspec_xml):
        """Write ``rspec_xml`` to a temporary file and send it with ``sfi.py create``.

        The temporary file is removed afterwards, whether or not the
        command succeeded.
        """
        import os
        import tempfile
        fh, fname = tempfile.mkstemp()
        try:
            try:
                os.write(fh, rspec_xml)
            finally:
                os.close(fh)
            command = "sfi.py " + self._sfi_command_options() + \
                    " create %s %s" % (self._slice_hrn, fname)
            self._sfi_command_exec(command)
        finally:
            os.remove(fname)

    def provision_resource(self, new_resource, start_time = None, duration = None):
        """Add ``new_resource`` to the slice.

        The current slice XML is fetched, the parser builds the new
        reservation document from it and the document is submitted; these
        steps run under the instance lock.  The slice is then fetched
        again to check the outcome.

        :returns: whatever ``verify_reservation_xml`` reports for the
            refreshed slice.
        """
        with self._lock:
            xml = self._fetch_slice_info()
            new_xml = self._parser.create_reservation_xml(xml, self._slice_hrn,
                    new_resource, start_time, duration, self._aggregate)
            self._submit_rspec(new_xml)
        xml = self._fetch_slice_info()
        return self._parser.verify_reservation_xml(xml, self._slice_hrn, new_resource,
                start_time, duration, self._aggregate)

    def release_resource(self, resource, start_time = None, duration = None):
        """Remove ``resource`` from the slice.

        Mirrors ``provision_resource`` but builds the document with
        ``release_reservation_xml``.

        :returns: the negation of what ``verify_reservation_xml`` reports
            for the refreshed slice.
        """
        with self._lock:
            xml = self._fetch_slice_info()
            new_xml = self._parser.release_reservation_xml(xml, self._slice_hrn,
                    resource, start_time, duration, self._aggregate)
            self._submit_rspec(new_xml)
        xml = self._fetch_slice_info()
        return not self._parser.verify_reservation_xml(xml, self._slice_hrn, resource,
                start_time, duration, self._aggregate)
253
254
class SFAApiFactory(object):
    """Process-wide cache handing out one ``SFAApi`` per set of credentials."""

    # Guards every access to the shared ``_apis`` cache.
    lock = threading.Lock()
    # Maps the key built by make_key() to the SFAApi created for it.
    _apis = dict()

    @classmethod
    def get_api(cls, slice_id = None, sfi_auth = None, sfi_user = None,
            sfi_registry = None, sfi_sm = None, timeout = None, private_key = None):
        """Return the ``SFAApi`` matching the given settings.

        The first request for a set of settings builds a new ``SFAApi``
        (whose constructor downloads resources and slice information).
        Later requests reuse the cached instance after refreshing its
        leases and slice information.

        The parameters are forwarded unchanged to ``SFAApi``; the
        aggregate is always ``'ple'``.
        """
        aggregate = 'ple'
        key = cls.make_key(aggregate, slice_id, sfi_auth, sfi_user, sfi_registry,
                sfi_sm, timeout, private_key)

        with cls.lock:
            api = cls._apis.get(key)
            if api is None:
                api = SFAApi(aggregate = aggregate, slice_id = slice_id,
                        sfi_auth = sfi_auth, sfi_user = sfi_user,
                        sfi_registry = sfi_registry, sfi_sm = sfi_sm,
                        timeout = timeout, private_key = private_key)
                cls._apis[key] = api
            else:
                api._fetch_resources_info(resources = False)
                api._fetch_slice_info()

        return api

    @classmethod
    def make_key(cls, *args):
        """Return the MD5 hex digest of the concatenated ``str()`` of ``args``.

        The digest is only used as a cache key.
        """
        skey = "".join(map(str, args))
        # hashlib wants bytes; on Python 2 ``str`` already is ``bytes`` and
        # is left untouched.
        if not isinstance(skey, bytes):
            skey = skey.encode("utf-8")
        return hashlib.md5(skey).hexdigest()
282