ba785a5c86b9358bde3487733a0cfa7ed2d7651c
[sfa.git] / sfa / iotlab / OARrestapi.py
1 #import sys
2 from httplib import HTTPConnection, HTTPException, NotConnected
3 import json
4 #import datetime
5 #from time import gmtime, strftime
6 import os.path
7 import sys
8 #import urllib
9 #import urllib2
10 from sfa.util.config import Config
11 #from sfa.util.xrn import hrn_to_urn, get_authority, Xrn, get_leaf
12
13 from sfa.util.sfalogging import logger
14
15
16 class JsonPage:
17
18     """Class used to manipulate json pages given by OAR.
19     """
20
21     def __init__(self):
22         """Defines attributes to manipulate and parse the json pages.
23
24         """
25         #All are boolean variables
26         self.concatenate = False
27         #Indicates end of data, no more pages to be loaded.
28         self.end = False
29         self.next_page = False
30         #Next query address
31         self.next_offset = None
32         #Json page
33         self.raw_json = None
34
35     def FindNextPage(self):
36         """
37         Gets next data page from OAR when the query's results
38             are too big to  be transmitted in a single page.
39             Uses the "links' item in the json returned to check if
40             an additionnal page has to be loaded.
41
42         :returns: next page , next offset query
43         """
44         if "links" in self.raw_json:
45             for page in self.raw_json['links']:
46                 if page['rel'] == 'next':
47                     self.concatenate = True
48                     self.next_page = True
49                     self.next_offset = "?" + page['href'].split("?")[1]
50                     print>>sys.stderr, "\r\n \t FindNextPage NEXT LINK"
51                     return
52
53         if self.concatenate:
54             self.end = True
55             self.next_page = False
56             self.next_offset = None
57
58             return
59
60         #Otherwise, no next page and no concatenate, must be a single page
61         #Concatenate the single page and get out of here.
62         else:
63             self.next_page = False
64             self.concatenate = True
65             self.next_offset = None
66             return
67
68     @staticmethod
69     def ConcatenateJsonPages(saved_json_list):
70         #reset items list
71
72         tmp = {}
73         tmp['items'] = []
74
75         for page in saved_json_list:
76             tmp['items'].extend(page['items'])
77         return tmp
78
79     def ResetNextPage(self):
80         self.next_page = True
81         self.next_offset = None
82         self.concatenate = False
83         self.end = False
84
85
86 class OARrestapi:
87
88     # classes attributes
89
90     OAR_REQUEST_POST_URI_DICT = {'POST_job': {'uri': '/oarapi/jobs.json'},
91                                  'DELETE_jobs_id': \
92                                  {'uri': '/oarapi/jobs/id.json'},
93                                  }
94
95     POST_FORMAT = {'json': {'content': "application/json", 'object': json}}
96
97     #OARpostdatareqfields = {'resource' :"/nodes=", 'command':"sleep", \
98                             #'workdir':"/home/", 'walltime':""}
99
100     def __init__(self, config_file='/etc/sfa/oar_config.py'):
101         self.oarserver = {}
102         self.oarserver['uri'] = None
103         self.oarserver['postformat'] = 'json'
104
105         try:
106             execfile(config_file, self.__dict__)
107
108             self.config_file = config_file
109             # path to configuration data
110             self.config_path = os.path.dirname(config_file)
111
112         except IOError:
113             raise IOError, "Could not find or load the configuration file: %s" \
114                             % config_file
115         #logger.setLevelDebug()
116         self.oarserver['ip'] = self.OAR_IP
117         self.oarserver['port'] = self.OAR_PORT
118         self.jobstates = ['Terminated', 'Hold', 'Waiting', 'toLaunch',
119                           'toError', 'toAckReservation', 'Launching',
120                           'Finishing', 'Running', 'Suspended', 'Resuming',
121                           'Error']
122
123         self.parser = OARGETParser(self)
124
125
126     def GETRequestToOARRestAPI(self, request, strval=None,
127                             next_page=None, username=None):
128         self.oarserver['uri'] = \
129             OARGETParser.OARrequests_uri_dict[request]['uri']
130         #Get job details with username
131         if 'owner' in OARGETParser.OARrequests_uri_dict[request] and username:
132             self.oarserver['uri'] += \
133                 OARGETParser.OARrequests_uri_dict[request]['owner'] + username
134         headers = {}
135         data = json.dumps({})
136         logger.debug("OARrestapi \tGETRequestToOARRestAPI %s" % (request))
137         if strval:
138             self.oarserver['uri'] = self.oarserver['uri'].\
139                                             replace("id",str(strval))
140
141         if next_page:
142             self.oarserver['uri'] += next_page
143
144         if username:
145             headers['X-REMOTE_IDENT'] = username
146
147         logger.debug("OARrestapi: \t  GETRequestToOARRestAPI  \
148                         self.oarserver['uri'] %s strval %s" \
149                         %(self.oarserver['uri'], strval))
150         try :
151             #seems that it does not work if we don't add this
152             headers['content-length'] = '0'
153
154             conn = HTTPConnection(self.oarserver['ip'], \
155                                                 self.oarserver['port'])
156             conn.request("GET", self.oarserver['uri'], data, headers)
157             resp = ( conn.getresponse()).read()
158             conn.close()
159
160         except HTTPException, error :
161             logger.log_exc("GET_OAR_SRVR : Problem with OAR server : %s " \
162                                                                     %(error))
163             #raise ServerError("GET_OAR_SRVR : Could not reach OARserver")
164         try:
165             js_dict = json.loads(resp)
166             #print "\r\n \t\t\t js_dict keys" , js_dict.keys(), " \r\n", js_dict
167             return js_dict
168
169         except ValueError, error:
170             logger.log_exc("Failed to parse Server Response: %s ERROR %s"\
171                                                             %(js_dict, error))
172             #raise ServerError("Failed to parse Server Response:" + js)
173
174
175     def POSTRequestToOARRestAPI(self, request, datadict, username=None):
176         """ Used to post a job on OAR , along with data associated
177         with the job.
178
179         """
180
181         #first check that all params for are OK
182         try:
183             self.oarserver['uri'] = self.OAR_REQUEST_POST_URI_DICT[request]['uri']
184
185         except KeyError:
186             logger.log_exc("OARrestapi \tPOSTRequestToOARRestAPI request not \
187                              valid")
188             return
189         if datadict and 'strval' in datadict:
190             self.oarserver['uri'] = self.oarserver['uri'].replace("id", \
191                                                 str(datadict['strval']))
192             del datadict['strval']
193
194         data = json.dumps(datadict)
195         headers = {'X-REMOTE_IDENT':username, \
196                 'content-type': self.POST_FORMAT['json']['content'], \
197                 'content-length':str(len(data))}
198         try :
199
200             conn = HTTPConnection(self.oarserver['ip'], \
201                                         self.oarserver['port'])
202             conn.request("POST", self.oarserver['uri'], data, headers)
203             resp = (conn.getresponse()).read()
204             conn.close()
205         except NotConnected:
206             logger.log_exc("POSTRequestToOARRestAPI NotConnected ERROR: \
207                             data %s \r\n \t\n \t\t headers %s uri %s" \
208                             %(data,headers,self.oarserver['uri']))
209
210             #raise ServerError("POST_OAR_SRVR : error")
211
212         try:
213             answer = json.loads(resp)
214             logger.debug("POSTRequestToOARRestAPI : answer %s" %(answer))
215             return answer
216
217         except ValueError, error:
218             logger.log_exc("Failed to parse Server Response: error %s  \
219                             %s" %(error))
220             #raise ServerError("Failed to parse Server Response:" + answer)
221
222
223
224 def AddOarNodeId(tuplelist, value):
225     """ Adds Oar internal node id to the nodes attributes """
226
227     tuplelist.append(('oar_id', int(value)))
228
229
230 def AddNodeNetworkAddr(dictnode, value):
231     #Inserts new key. The value associated is a tuple list
232     node_id = value
233
234     dictnode[node_id] = [('node_id', node_id),('hostname', node_id) ]
235
236     return node_id
237
238 def AddNodeSite(tuplelist, value):
239     tuplelist.append(('site', str(value)))
240
241 def AddNodeRadio(tuplelist, value):
242     tuplelist.append(('radio', str(value)))
243
244
245 def AddMobility(tuplelist, value):
246     if value is 0:
247         tuplelist.append(('mobile', 'False'))
248     else:
249         tuplelist.append(('mobile', 'True'))
250
251 def AddPosX(tuplelist, value):
252     tuplelist.append(('posx', value))
253
254 def AddPosY(tuplelist, value):
255     tuplelist.append(('posy', value))
256
257 def AddPosZ(tuplelist, value):
258     tuplelist.append(('posz', value))
259
260 def AddBootState(tuplelist, value):
261     tuplelist.append(('boot_state', str(value)))
262
263 #Insert a new node into the dictnode dictionary
264 def AddNodeId(dictnode, value):
265     #Inserts new key. The value associated is a tuple list
266     node_id = int(value)
267
268     dictnode[node_id] = [('node_id', node_id)]
269     return node_id
270
271 def AddHardwareType(tuplelist, value):
272     value_list = value.split(':')
273     tuplelist.append(('archi', value_list[0]))
274     tuplelist.append(('radio', value_list[1]))
275
276
277 class OARGETParser:
278     resources_fulljson_dict = {
279         'network_address' : AddNodeNetworkAddr,
280         'site': AddNodeSite,
281         'radio': AddNodeRadio,
282         'mobile': AddMobility,
283         'x': AddPosX,
284         'y': AddPosY,
285         'z':AddPosZ,
286         'archi':AddHardwareType,
287         'state':AddBootState,
288         'id' : AddOarNodeId,
289         }
290
291
292     def __init__(self, srv) :
293         self.version_json_dict = {
294             'api_version' : None , 'apilib_version' :None,\
295             'api_timezone': None, 'api_timestamp': None, 'oar_version': None ,}
296         self.config = Config()
297         self.interface_hrn = self.config.SFA_INTERFACE_HRN
298         self.timezone_json_dict = {
299             'timezone': None, 'api_timestamp': None, }
300         #self.jobs_json_dict = {
301             #'total' : None, 'links' : [],\
302             #'offset':None , 'items' : [], }
303         #self.jobs_table_json_dict = self.jobs_json_dict
304         #self.jobs_details_json_dict = self.jobs_json_dict
305         self.server = srv
306         self.node_dictlist = {}
307
308         self.json_page = JsonPage()
309
310         self.site_dict = {}
311         self.SendRequest("GET_version")
312
313
314
315
316
317     def ParseVersion(self) :
318         #print self.json_page.raw_json
319         #print >>sys.stderr, self.json_page.raw_json
320         if 'oar_version' in self.json_page.raw_json :
321             self.version_json_dict.update(api_version = \
322                                         self.json_page.raw_json['api_version'],
323                     apilib_version = self.json_page.raw_json['apilib_version'],
324                     api_timezone = self.json_page.raw_json['api_timezone'],
325                     api_timestamp = self.json_page.raw_json['api_timestamp'],
326                     oar_version = self.json_page.raw_json['oar_version'] )
327         else:
328             self.version_json_dict.update(api_version = \
329                         self.json_page.raw_json['api'] ,
330                         apilib_version = self.json_page.raw_json['apilib'],
331                         api_timezone = self.json_page.raw_json['api_timezone'],
332                         api_timestamp = self.json_page.raw_json['api_timestamp'],
333                         oar_version = self.json_page.raw_json['oar'] )
334
335         print self.version_json_dict['apilib_version']
336
337
338     def ParseTimezone(self) :
339         api_timestamp = self.json_page.raw_json['api_timestamp']
340         api_tz = self.json_page.raw_json['timezone']
341         return api_timestamp, api_tz
342
343     def ParseJobs(self) :
344         self.jobs_list = []
345         print " ParseJobs "
346         return self.json_page.raw_json
347
348     def ParseJobsTable(self) :
349         print "ParseJobsTable"
350
351     def ParseJobsDetails (self):
352         # currently, this function is not used a lot,
353         #so i have no idea what be usefull to parse,
354         #returning the full json. NT
355         #logger.debug("ParseJobsDetails %s " %(self.json_page.raw_json))
356         return self.json_page.raw_json
357
358
359     def ParseJobsIds(self):
360
361         job_resources = ['wanted_resources', 'name', 'id', 'start_time', \
362                         'state','owner','walltime','message']
363
364
365         job_resources_full = ['launching_directory', 'links', \
366             'resubmit_job_id', 'owner', 'events', 'message', \
367             'scheduled_start', 'id', 'array_id',  'exit_code', \
368             'properties', 'state','array_index', 'walltime', \
369             'type', 'initial_request', 'stop_time', 'project',\
370             'start_time',  'dependencies','api_timestamp','submission_time', \
371             'reservation', 'stdout_file', 'types', 'cpuset_name', \
372             'name',  'wanted_resources','queue','stderr_file','command']
373
374
375         job_info = self.json_page.raw_json
376         #logger.debug("OARESTAPI ParseJobsIds %s" %(self.json_page.raw_json))
377         values = []
378         try:
379             for k in job_resources:
380                 values.append(job_info[k])
381             return dict(zip(job_resources, values))
382
383         except KeyError:
384             logger.log_exc("ParseJobsIds KeyError ")
385
386
387     def ParseJobsIdResources(self):
388         """ Parses the json produced by the request
389         /oarapi/jobs/id/resources.json.
390         Returns a list of oar node ids that are scheduled for the
391         given job id.
392
393         """
394         job_resources = []
395         for resource in self.json_page.raw_json['items']:
396             job_resources.append(resource['id'])
397
398         #logger.debug("OARESTAPI \tParseJobsIdResources %s" %(self.json_page.raw_json))
399         return job_resources
400
401     def ParseResources(self) :
402         """ Parses the json produced by a get_resources request on oar."""
403
404         #logger.debug("OARESTAPI \tParseResources " )
405         #resources are listed inside the 'items' list from the json
406         self.json_page.raw_json = self.json_page.raw_json['items']
407         self.ParseNodes()
408
409     def ParseReservedNodes(self):
410         """  Returns an array containing the list of the reserved nodes """
411
412         #resources are listed inside the 'items' list from the json
413         reservation_list = []
414         job = {}
415         #Parse resources info
416         for json_element in  self.json_page.raw_json['items']:
417             #In case it is a real reservation (not asap case)
418             if json_element['scheduled_start']:
419                 job['t_from'] = json_element['scheduled_start']
420                 job['t_until'] = int(json_element['scheduled_start']) + \
421                                                 int(json_element['walltime'])
422                 #Get resources id list for the job
423                 job['resource_ids'] = \
424                     [ node_dict['id'] for node_dict in json_element['resources']]
425             else:
426                 job['t_from'] = "As soon as possible"
427                 job['t_until'] = "As soon as possible"
428                 job['resource_ids'] = ["Undefined"]
429
430
431             job['state'] = json_element['state']
432             job['lease_id'] = json_element['id']
433
434
435             job['user'] = json_element['owner']
436             #logger.debug("OARRestapi \tParseReservedNodes job %s" %(job))
437             reservation_list.append(job)
438             #reset dict
439             job = {}
440         return reservation_list
441
442     def ParseRunningJobs(self):
443         """ Gets the list of nodes currently in use from the attributes of the
444         running jobs.
445
446         """
447         logger.debug("OARESTAPI \tParseRunningJobs__________________________ ")
448         #resources are listed inside the 'items' list from the json
449         nodes = []
450         for job in  self.json_page.raw_json['items']:
451             for node in job['nodes']:
452                 nodes.append(node['network_address'])
453         return nodes
454
455
456
457     def ParseDeleteJobs(self):
458         """ No need to parse anything in this function.A POST
459         is done to delete the job.
460
461         """
462         return
463
464     def ParseResourcesFull(self) :
465         """ This method is responsible for parsing all the attributes
466         of all the nodes returned by OAR when issuing a get resources full.
467         The information from the nodes and the sites are separated.
468         Updates the node_dictlist so that the dictionnary of the platform's
469         nodes is available afterwards.
470
471         """
472         logger.debug("OARRESTAPI ParseResourcesFull________________________ ")
473         #print self.json_page.raw_json[1]
474         #resources are listed inside the 'items' list from the json
475         if self.version_json_dict['apilib_version'] != "0.2.10" :
476             self.json_page.raw_json = self.json_page.raw_json['items']
477         self.ParseNodes()
478         self.ParseSites()
479         return self.node_dictlist
480
481     def ParseResourcesFullSites(self) :
482         """ UNUSED. Originally used to get information from the sites.
483         ParseResourcesFull is used instead.
484
485         """
486         if self.version_json_dict['apilib_version'] != "0.2.10" :
487             self.json_page.raw_json = self.json_page.raw_json['items']
488         self.ParseNodes()
489         self.ParseSites()
490         return self.site_dict
491
492
493
494     def ParseNodes(self):
495         """ Parse nodes properties from OAR
496         Put them into a dictionary with key = node id and value is a dictionary
497         of the node properties and properties'values.
498
499         """
500         node_id = None
501         keys = self.resources_fulljson_dict.keys()
502         keys.sort()
503
504         for dictline in self.json_page.raw_json:
505             node_id = None
506             # dictionary is empty and/or a new node has to be inserted
507             node_id = self.resources_fulljson_dict['network_address'](\
508                                 self.node_dictlist, dictline['network_address'])
509             for k in keys:
510                 if k in dictline:
511                     if k == 'network_address':
512                         continue
513
514                     self.resources_fulljson_dict[k](\
515                                     self.node_dictlist[node_id], dictline[k])
516
517             #The last property has been inserted in the property tuple list,
518             #reset node_id
519             #Turn the property tuple list (=dict value) into a dictionary
520             self.node_dictlist[node_id] = dict(self.node_dictlist[node_id])
521             node_id = None
522
523     @staticmethod
524     def iotlab_hostname_to_hrn( root_auth,  hostname):
525         return root_auth + '.'+ hostname
526
527
528
529     def ParseSites(self):
530         """ Returns a list of dictionnaries containing the sites' attributes."""
531
532         nodes_per_site = {}
533         config = Config()
534         #logger.debug(" OARrestapi.py \tParseSites  self.node_dictlist %s"\
535                                                         #%(self.node_dictlist))
536         # Create a list of nodes per site_id
537         for node_id in self.node_dictlist:
538             node  = self.node_dictlist[node_id]
539
540             if node['site'] not in nodes_per_site:
541                 nodes_per_site[node['site']] = []
542                 nodes_per_site[node['site']].append(node['node_id'])
543             else:
544                 if node['node_id'] not in nodes_per_site[node['site']]:
545                     nodes_per_site[node['site']].append(node['node_id'])
546
547         #Create a site dictionary whose key is site_login_base (name of the site)
548         # and value is a dictionary of properties, including the list
549         #of the node_ids
550         for node_id in self.node_dictlist:
551             node  = self.node_dictlist[node_id]
552             #node.update({'hrn':self.iotlab_hostname_to_hrn(self.interface_hrn, \
553                                             #node['site'],node['hostname'])})
554             node.update({'hrn':self.iotlab_hostname_to_hrn(self.interface_hrn, node['hostname'])})
555             self.node_dictlist.update({node_id:node})
556
557             if node['site'] not in self.site_dict:
558                 self.site_dict[node['site']] = {
559                     'site':node['site'],
560                     'node_ids':nodes_per_site[node['site']],
561                     'latitude':"48.83726",
562                     'longitude':"- 2.10336",'name':config.SFA_REGISTRY_ROOT_AUTH,
563                     'pcu_ids':[], 'max_slices':None, 'ext_consortium_id':None,
564                     'max_slivers':None, 'is_public':True, 'peer_site_id': None,
565                     'abbreviated_name':"iotlab", 'address_ids': [],
566                     'url':"http,//www.senslab.info", 'person_ids':[],
567                     'site_tag_ids':[], 'enabled': True,  'slice_ids':[],
568                     'date_created': None, 'peer_id': None }
569             #if node['site_login_base'] not in self.site_dict.keys():
570                 #self.site_dict[node['site_login_base']] = {'login_base':node['site_login_base'],
571                                                         #'node_ids':nodes_per_site[node['site_login_base']],
572                                                         #'latitude':"48.83726",
573                                                         #'longitude':"- 2.10336",'name':"senslab",
574                                                         #'pcu_ids':[], 'max_slices':None, 'ext_consortium_id':None,
575                                                         #'max_slivers':None, 'is_public':True, 'peer_site_id': None,
576                                                         #'abbreviated_name':"senslab", 'address_ids': [],
577                                                         #'url':"http,//www.senslab.info", 'person_ids':[],
578                                                         #'site_tag_ids':[], 'enabled': True,  'slice_ids':[],
579                                                         #'date_created': None, 'peer_id': None }
580
581
582
583
584     OARrequests_uri_dict = {
585         'GET_version':
586                 {'uri':'/oarapi/version.json', 'parse_func': ParseVersion},
587         'GET_timezone':
588                 {'uri':'/oarapi/timezone.json' ,'parse_func': ParseTimezone },
589         'GET_jobs':
590                 {'uri':'/oarapi/jobs.json','parse_func': ParseJobs},
591         'GET_jobs_id':
592                 {'uri':'/oarapi/jobs/id.json','parse_func': ParseJobsIds},
593         'GET_jobs_id_resources':
594                 {'uri':'/oarapi/jobs/id/resources.json',\
595                 'parse_func': ParseJobsIdResources},
596         'GET_jobs_table':
597                 {'uri':'/oarapi/jobs/table.json','parse_func': ParseJobsTable},
598         'GET_jobs_details':
599                 {'uri':'/oarapi/jobs/details.json',\
600                 'parse_func': ParseJobsDetails},
601         'GET_reserved_nodes':
602                 {'uri':
603                 '/oarapi/jobs/details.json?state=Running,Waiting,Launching',\
604                 'owner':'&user=',
605                 'parse_func':ParseReservedNodes},
606
607
608         'GET_running_jobs':
609                 {'uri':'/oarapi/jobs/details.json?state=Running',\
610                 'parse_func':ParseRunningJobs},
611         'GET_resources_full':
612                 {'uri':'/oarapi/resources/full.json',\
613                 'parse_func': ParseResourcesFull},
614         'GET_sites':
615                 {'uri':'/oarapi/resources/full.json',\
616                 'parse_func': ParseResourcesFullSites},
617         'GET_resources':
618                 {'uri':'/oarapi/resources.json' ,'parse_func': ParseResources},
619         'DELETE_jobs_id':
620                 {'uri':'/oarapi/jobs/id.json' ,'parse_func': ParseDeleteJobs}
621         }
622
623
624
625
626     def SendRequest(self, request, strval = None , username = None):
627         """ Connects to OAR , sends the valid GET requests and uses
628         the appropriate json parsing functions.
629
630         """
631         save_json = None
632
633         self.json_page.ResetNextPage()
634         save_json = []
635
636         if request in self.OARrequests_uri_dict :
637             while self.json_page.next_page:
638                 self.json_page.raw_json = self.server.GETRequestToOARRestAPI(\
639                                                 request, \
640                                                 strval, \
641                                                 self.json_page.next_offset, \
642                                                 username)
643                 self.json_page.FindNextPage()
644                 if self.json_page.concatenate:
645                     save_json.append(self.json_page.raw_json)
646
647             if self.json_page.concatenate and self.json_page.end :
648                 self.json_page.raw_json = \
649                     self.json_page.ConcatenateJsonPages(save_json)
650
651             return self.OARrequests_uri_dict[request]['parse_func'](self)
652         else:
653             logger.error("OARRESTAPI OARGetParse __init__ : ERROR_REQUEST " \
654                                                                  %(request))
655