2 from httplib import HTTPConnection, HTTPException, NotConnected
5 #from time import gmtime, strftime
10 from sfa.util.config import Config
11 #from sfa.util.xrn import hrn_to_urn, get_authority, Xrn, get_leaf
13 from sfa.util.sfalogging import logger
18 """Class used to manipulate json pages given by OAR.
22 """Defines attributes to manipulate and parse the json pages.
25 #All are boolean variables
26 self.concatenate = False
27 #Indicates end of data, no more pages to be loaded.
29 self.next_page = False
31 self.next_offset = None
35 def FindNextPage(self):
37 Gets next data page from OAR when the query's results
38 are too big to be transmitted in a single page.
39 Uses the "links' item in the json returned to check if
40 an additionnal page has to be loaded.
42 :returns: next page , next offset query
44 if "links" in self.raw_json:
45 for page in self.raw_json['links']:
46 if page['rel'] == 'next':
47 self.concatenate = True
49 self.next_offset = "?" + page['href'].split("?")[1]
50 print>>sys.stderr, "\r\n \t FindNextPage NEXT LINK"
55 self.next_page = False
56 self.next_offset = None
60 #Otherwise, no next page and no concatenate, must be a single page
61 #Concatenate the single page and get out of here.
63 self.next_page = False
64 self.concatenate = True
65 self.next_offset = None
69 def ConcatenateJsonPages(saved_json_list):
75 for page in saved_json_list:
76 tmp['items'].extend(page['items'])
79 def ResetNextPage(self):
81 self.next_offset = None
82 self.concatenate = False
90 OAR_REQUEST_POST_URI_DICT = {'POST_job': {'uri': '/oarapi/jobs.json'},
92 {'uri': '/oarapi/jobs/id.json'},
95 POST_FORMAT = {'json': {'content': "application/json", 'object': json}}
97 #OARpostdatareqfields = {'resource' :"/nodes=", 'command':"sleep", \
98 #'workdir':"/home/", 'walltime':""}
100 def __init__(self, config_file='/etc/sfa/oar_config.py'):
102 self.oarserver['uri'] = None
103 self.oarserver['postformat'] = 'json'
106 execfile(config_file, self.__dict__)
108 self.config_file = config_file
109 # path to configuration data
110 self.config_path = os.path.dirname(config_file)
113 raise IOError, "Could not find or load the configuration file: %s" \
115 #logger.setLevelDebug()
116 self.oarserver['ip'] = self.OAR_IP
117 self.oarserver['port'] = self.OAR_PORT
118 self.jobstates = ['Terminated', 'Hold', 'Waiting', 'toLaunch',
119 'toError', 'toAckReservation', 'Launching',
120 'Finishing', 'Running', 'Suspended', 'Resuming',
123 self.parser = OARGETParser(self)
126 def GETRequestToOARRestAPI(self, request, strval=None,
127 next_page=None, username=None):
128 self.oarserver['uri'] = \
129 OARGETParser.OARrequests_uri_dict[request]['uri']
130 #Get job details with username
131 if 'owner' in OARGETParser.OARrequests_uri_dict[request] and username:
132 self.oarserver['uri'] += \
133 OARGETParser.OARrequests_uri_dict[request]['owner'] + username
135 data = json.dumps({})
136 logger.debug("OARrestapi \tGETRequestToOARRestAPI %s" % (request))
138 self.oarserver['uri'] = self.oarserver['uri'].\
139 replace("id",str(strval))
142 self.oarserver['uri'] += next_page
145 headers['X-REMOTE_IDENT'] = username
147 logger.debug("OARrestapi: \t GETRequestToOARRestAPI \
148 self.oarserver['uri'] %s strval %s" \
149 %(self.oarserver['uri'], strval))
151 #seems that it does not work if we don't add this
152 headers['content-length'] = '0'
154 conn = HTTPConnection(self.oarserver['ip'], \
155 self.oarserver['port'])
156 conn.request("GET", self.oarserver['uri'], data, headers)
157 resp = ( conn.getresponse()).read()
160 except HTTPException, error :
161 logger.log_exc("GET_OAR_SRVR : Problem with OAR server : %s " \
163 #raise ServerError("GET_OAR_SRVR : Could not reach OARserver")
165 js_dict = json.loads(resp)
166 #print "\r\n \t\t\t js_dict keys" , js_dict.keys(), " \r\n", js_dict
169 except ValueError, error:
170 logger.log_exc("Failed to parse Server Response: %s ERROR %s"\
172 #raise ServerError("Failed to parse Server Response:" + js)
175 def POSTRequestToOARRestAPI(self, request, datadict, username=None):
176 """ Used to post a job on OAR , along with data associated
181 #first check that all params for are OK
183 self.oarserver['uri'] = self.OAR_REQUEST_POST_URI_DICT[request]['uri']
186 logger.log_exc("OARrestapi \tPOSTRequestToOARRestAPI request not \
189 if datadict and 'strval' in datadict:
190 self.oarserver['uri'] = self.oarserver['uri'].replace("id", \
191 str(datadict['strval']))
192 del datadict['strval']
194 data = json.dumps(datadict)
195 headers = {'X-REMOTE_IDENT':username, \
196 'content-type': self.POST_FORMAT['json']['content'], \
197 'content-length':str(len(data))}
200 conn = HTTPConnection(self.oarserver['ip'], \
201 self.oarserver['port'])
202 conn.request("POST", self.oarserver['uri'], data, headers)
203 resp = (conn.getresponse()).read()
206 logger.log_exc("POSTRequestToOARRestAPI NotConnected ERROR: \
207 data %s \r\n \t\n \t\t headers %s uri %s" \
208 %(data,headers,self.oarserver['uri']))
210 #raise ServerError("POST_OAR_SRVR : error")
213 answer = json.loads(resp)
214 logger.debug("POSTRequestToOARRestAPI : answer %s" %(answer))
217 except ValueError, error:
218 logger.log_exc("Failed to parse Server Response: error %s \
220 #raise ServerError("Failed to parse Server Response:" + answer)
224 def AddOarNodeId(tuplelist, value):
225 """ Adds Oar internal node id to the nodes attributes """
227 tuplelist.append(('oar_id', int(value)))
230 def AddNodeNetworkAddr(dictnode, value):
231 #Inserts new key. The value associated is a tuple list
234 dictnode[node_id] = [('node_id', node_id),('hostname', node_id) ]
def AddNodeSite(tuplelist, value):
    """Record a node's site in its property tuple list.

    :param tuplelist: list of (property name, value) tuples describing one
        node; it is extended in place.
    :param value: raw site value; stored in its str() form.
    """
    site_entry = ('site', str(value))
    tuplelist.append(site_entry)
def AddNodeRadio(tuplelist, value):
    """Record a node's radio type in its property tuple list.

    :param tuplelist: list of (property name, value) tuples describing one
        node; it is extended in place.
    :param value: raw radio value; stored in its str() form.
    """
    radio_entry = ('radio', str(value))
    tuplelist.append(radio_entry)
245 def AddMobility(tuplelist, value):
247 tuplelist.append(('mobile', 'False'))
249 tuplelist.append(('mobile', 'True'))
def AddPosX(tuplelist, value):
    """Record a node's x coordinate in its property tuple list.

    :param tuplelist: list of (property name, value) tuples describing one
        node; it is extended in place.
    :param value: x position, stored unchanged (no type conversion).
    """
    x_entry = ('posx', value)
    tuplelist.append(x_entry)
def AddPosY(tuplelist, value):
    """Record a node's y coordinate in its property tuple list.

    :param tuplelist: list of (property name, value) tuples describing one
        node; it is extended in place.
    :param value: y position, stored unchanged (no type conversion).
    """
    y_entry = ('posy', value)
    tuplelist.append(y_entry)
def AddPosZ(tuplelist, value):
    """Record a node's z coordinate in its property tuple list.

    :param tuplelist: list of (property name, value) tuples describing one
        node; it is extended in place.
    :param value: z position, stored unchanged (no type conversion).
    """
    z_entry = ('posz', value)
    tuplelist.append(z_entry)
def AddBootState(tuplelist, value):
    """Record a node's boot state in its property tuple list.

    :param tuplelist: list of (property name, value) tuples describing one
        node; it is extended in place.
    :param value: raw state value; stored in its str() form.
    """
    state_entry = ('boot_state', str(value))
    tuplelist.append(state_entry)
263 #Insert a new node into the dictnode dictionary
264 def AddNodeId(dictnode, value):
265 #Inserts new key. The value associated is a tuple list
268 dictnode[node_id] = [('node_id', node_id)]
def AddHardwareType(tuplelist, value):
    """Split a colon-separated hardware description into node properties.

    The first ':'-separated field is stored as 'archi' and the second one,
    when present, as 'radio'. Any further fields are ignored.

    A value without a ':' separator used to raise IndexError after 'archi'
    had already been appended; now only the 'archi' entry is added, so a
    single incomplete value no longer aborts the caller's processing.

    :param tuplelist: list of (property name, value) tuples describing one
        node; it is extended in place.
    :param value: string of the form "<archi>:<radio>".
    """
    fields = value.split(':')
    tuplelist.append(('archi', fields[0]))
    # Only emit a radio entry when the description actually carries one.
    if len(fields) > 1:
        tuplelist.append(('radio', fields[1]))
278 resources_fulljson_dict = {
279 'network_address' : AddNodeNetworkAddr,
281 'radio': AddNodeRadio,
282 'mobile': AddMobility,
286 'archi':AddHardwareType,
287 'state':AddBootState,
292 def __init__(self, srv) :
293 self.version_json_dict = {
294 'api_version' : None , 'apilib_version' :None,\
295 'api_timezone': None, 'api_timestamp': None, 'oar_version': None ,}
296 self.config = Config()
297 self.interface_hrn = self.config.SFA_INTERFACE_HRN
298 self.timezone_json_dict = {
299 'timezone': None, 'api_timestamp': None, }
300 #self.jobs_json_dict = {
301 #'total' : None, 'links' : [],\
302 #'offset':None , 'items' : [], }
303 #self.jobs_table_json_dict = self.jobs_json_dict
304 #self.jobs_details_json_dict = self.jobs_json_dict
306 self.node_dictlist = {}
308 self.json_page = JsonPage()
311 self.SendRequest("GET_version")
317 def ParseVersion(self) :
318 #print self.json_page.raw_json
319 #print >>sys.stderr, self.json_page.raw_json
320 if 'oar_version' in self.json_page.raw_json :
321 self.version_json_dict.update(api_version = \
322 self.json_page.raw_json['api_version'],
323 apilib_version = self.json_page.raw_json['apilib_version'],
324 api_timezone = self.json_page.raw_json['api_timezone'],
325 api_timestamp = self.json_page.raw_json['api_timestamp'],
326 oar_version = self.json_page.raw_json['oar_version'] )
328 self.version_json_dict.update(api_version = \
329 self.json_page.raw_json['api'] ,
330 apilib_version = self.json_page.raw_json['apilib'],
331 api_timezone = self.json_page.raw_json['api_timezone'],
332 api_timestamp = self.json_page.raw_json['api_timestamp'],
333 oar_version = self.json_page.raw_json['oar'] )
335 print self.version_json_dict['apilib_version']
def ParseTimezone(self) :
    """Extract the timestamp and timezone from the current json page.

    :returns: tuple (api_timestamp, timezone) read from
        self.json_page.raw_json.
    """
    page = self.json_page.raw_json
    return page['api_timestamp'], page['timezone']
343 def ParseJobs(self) :
346 return self.json_page.raw_json
def ParseJobsTable(self) :
    """Placeholder parser for the jobs table request.

    Only prints a trace line; nothing is parsed and nothing is returned.
    """
    # Single parenthesised argument: prints the same text whether this is
    # read as a print statement or a print() call.
    print("ParseJobsTable")
def ParseJobsDetails (self):
    """Return the jobs details json page exactly as stored, unparsed."""
    # No field selection is done here: this function is seldom used, so it
    # is unclear which fields would be useful and the full json is handed
    # back to the caller. NT
    raw_details = self.json_page.raw_json
    return raw_details
359 def ParseJobsIds(self):
361 job_resources = ['wanted_resources', 'name', 'id', 'start_time', \
362 'state','owner','walltime','message']
365 job_resources_full = ['launching_directory', 'links', \
366 'resubmit_job_id', 'owner', 'events', 'message', \
367 'scheduled_start', 'id', 'array_id', 'exit_code', \
368 'properties', 'state','array_index', 'walltime', \
369 'type', 'initial_request', 'stop_time', 'project',\
370 'start_time', 'dependencies','api_timestamp','submission_time', \
371 'reservation', 'stdout_file', 'types', 'cpuset_name', \
372 'name', 'wanted_resources','queue','stderr_file','command']
375 job_info = self.json_page.raw_json
376 #logger.debug("OARESTAPI ParseJobsIds %s" %(self.json_page.raw_json))
379 for k in job_resources:
380 values.append(job_info[k])
381 return dict(zip(job_resources, values))
384 logger.log_exc("ParseJobsIds KeyError ")
387 def ParseJobsIdResources(self):
388 """ Parses the json produced by the request
389 /oarapi/jobs/id/resources.json.
390 Returns a list of oar node ids that are scheduled for the
395 for resource in self.json_page.raw_json['items']:
396 job_resources.append(resource['id'])
398 #logger.debug("OARESTAPI \tParseJobsIdResources %s" %(self.json_page.raw_json))
401 def ParseResources(self) :
402 """ Parses the json produced by a get_resources request on oar."""
404 #logger.debug("OARESTAPI \tParseResources " )
405 #resources are listed inside the 'items' list from the json
406 self.json_page.raw_json = self.json_page.raw_json['items']
409 def ParseReservedNodes(self):
410 """ Returns an array containing the list of the reserved nodes """
412 #resources are listed inside the 'items' list from the json
413 reservation_list = []
415 #Parse resources info
416 for json_element in self.json_page.raw_json['items']:
417 #In case it is a real reservation (not asap case)
418 if json_element['scheduled_start']:
419 job['t_from'] = json_element['scheduled_start']
420 job['t_until'] = int(json_element['scheduled_start']) + \
421 int(json_element['walltime'])
422 #Get resources id list for the job
423 job['resource_ids'] = \
424 [ node_dict['id'] for node_dict in json_element['resources']]
426 job['t_from'] = "As soon as possible"
427 job['t_until'] = "As soon as possible"
428 job['resource_ids'] = ["Undefined"]
431 job['state'] = json_element['state']
432 job['lease_id'] = json_element['id']
435 job['user'] = json_element['owner']
436 #logger.debug("OARRestapi \tParseReservedNodes job %s" %(job))
437 reservation_list.append(job)
440 return reservation_list
442 def ParseRunningJobs(self):
443 """ Gets the list of nodes currently in use from the attributes of the
447 logger.debug("OARESTAPI \tParseRunningJobs__________________________ ")
448 #resources are listed inside the 'items' list from the json
450 for job in self.json_page.raw_json['items']:
451 for node in job['nodes']:
452 nodes.append(node['network_address'])
457 def ParseDeleteJobs(self):
458 """ No need to parse anything in this function.A POST
459 is done to delete the job.
464 def ParseResourcesFull(self) :
465 """ This method is responsible for parsing all the attributes
466 of all the nodes returned by OAR when issuing a get resources full.
467 The information from the nodes and the sites are separated.
468 Updates the node_dictlist so that the dictionnary of the platform's
469 nodes is available afterwards.
472 logger.debug("OARRESTAPI ParseResourcesFull________________________ ")
473 #print self.json_page.raw_json[1]
474 #resources are listed inside the 'items' list from the json
475 if self.version_json_dict['apilib_version'] != "0.2.10" :
476 self.json_page.raw_json = self.json_page.raw_json['items']
479 return self.node_dictlist
481 def ParseResourcesFullSites(self) :
482 """ UNUSED. Originally used to get information from the sites.
483 ParseResourcesFull is used instead.
486 if self.version_json_dict['apilib_version'] != "0.2.10" :
487 self.json_page.raw_json = self.json_page.raw_json['items']
490 return self.site_dict
494 def ParseNodes(self):
495 """ Parse nodes properties from OAR
496 Put them into a dictionary with key = node id and value is a dictionary
497 of the node properties and properties'values.
501 keys = self.resources_fulljson_dict.keys()
504 for dictline in self.json_page.raw_json:
506 # dictionary is empty and/or a new node has to be inserted
507 node_id = self.resources_fulljson_dict['network_address'](\
508 self.node_dictlist, dictline['network_address'])
511 if k == 'network_address':
514 self.resources_fulljson_dict[k](\
515 self.node_dictlist[node_id], dictline[k])
517 #The last property has been inserted in the property tuple list,
519 #Turn the property tuple list (=dict value) into a dictionary
520 self.node_dictlist[node_id] = dict(self.node_dictlist[node_id])
def iotlab_hostname_to_hrn( root_auth, hostname):
    """Build a node hrn from the root authority and the node hostname.

    :param root_auth: root authority name, used as the hrn prefix.
    :param hostname: node hostname, appended after a dot.
    :returns: "<root_auth>.<hostname>".
    """
    hrn_prefix = root_auth + '.'
    return hrn_prefix + hostname
529 def ParseSites(self):
530 """ Returns a list of dictionnaries containing the sites' attributes."""
534 #logger.debug(" OARrestapi.py \tParseSites self.node_dictlist %s"\
535 #%(self.node_dictlist))
536 # Create a list of nodes per site_id
537 for node_id in self.node_dictlist:
538 node = self.node_dictlist[node_id]
540 if node['site'] not in nodes_per_site:
541 nodes_per_site[node['site']] = []
542 nodes_per_site[node['site']].append(node['node_id'])
544 if node['node_id'] not in nodes_per_site[node['site']]:
545 nodes_per_site[node['site']].append(node['node_id'])
547 #Create a site dictionary whose key is site_login_base (name of the site)
548 # and value is a dictionary of properties, including the list
550 for node_id in self.node_dictlist:
551 node = self.node_dictlist[node_id]
552 #node.update({'hrn':self.iotlab_hostname_to_hrn(self.interface_hrn, \
553 #node['site'],node['hostname'])})
554 node.update({'hrn':self.iotlab_hostname_to_hrn(self.interface_hrn, node['hostname'])})
555 self.node_dictlist.update({node_id:node})
557 if node['site'] not in self.site_dict:
558 self.site_dict[node['site']] = {
560 'node_ids':nodes_per_site[node['site']],
561 'latitude':"48.83726",
562 'longitude':"- 2.10336",'name':config.SFA_REGISTRY_ROOT_AUTH,
563 'pcu_ids':[], 'max_slices':None, 'ext_consortium_id':None,
564 'max_slivers':None, 'is_public':True, 'peer_site_id': None,
565 'abbreviated_name':"iotlab", 'address_ids': [],
566 'url':"http,//www.senslab.info", 'person_ids':[],
567 'site_tag_ids':[], 'enabled': True, 'slice_ids':[],
568 'date_created': None, 'peer_id': None }
569 #if node['site_login_base'] not in self.site_dict.keys():
570 #self.site_dict[node['site_login_base']] = {'login_base':node['site_login_base'],
571 #'node_ids':nodes_per_site[node['site_login_base']],
572 #'latitude':"48.83726",
573 #'longitude':"- 2.10336",'name':"senslab",
574 #'pcu_ids':[], 'max_slices':None, 'ext_consortium_id':None,
575 #'max_slivers':None, 'is_public':True, 'peer_site_id': None,
576 #'abbreviated_name':"senslab", 'address_ids': [],
577 #'url':"http,//www.senslab.info", 'person_ids':[],
578 #'site_tag_ids':[], 'enabled': True, 'slice_ids':[],
579 #'date_created': None, 'peer_id': None }
584 OARrequests_uri_dict = {
586 {'uri':'/oarapi/version.json', 'parse_func': ParseVersion},
588 {'uri':'/oarapi/timezone.json' ,'parse_func': ParseTimezone },
590 {'uri':'/oarapi/jobs.json','parse_func': ParseJobs},
592 {'uri':'/oarapi/jobs/id.json','parse_func': ParseJobsIds},
593 'GET_jobs_id_resources':
594 {'uri':'/oarapi/jobs/id/resources.json',\
595 'parse_func': ParseJobsIdResources},
597 {'uri':'/oarapi/jobs/table.json','parse_func': ParseJobsTable},
599 {'uri':'/oarapi/jobs/details.json',\
600 'parse_func': ParseJobsDetails},
601 'GET_reserved_nodes':
603 '/oarapi/jobs/details.json?state=Running,Waiting,Launching',\
605 'parse_func':ParseReservedNodes},
609 {'uri':'/oarapi/jobs/details.json?state=Running',\
610 'parse_func':ParseRunningJobs},
611 'GET_resources_full':
612 {'uri':'/oarapi/resources/full.json',\
613 'parse_func': ParseResourcesFull},
615 {'uri':'/oarapi/resources/full.json',\
616 'parse_func': ParseResourcesFullSites},
618 {'uri':'/oarapi/resources.json' ,'parse_func': ParseResources},
620 {'uri':'/oarapi/jobs/id.json' ,'parse_func': ParseDeleteJobs}
626 def SendRequest(self, request, strval = None , username = None):
627 """ Connects to OAR , sends the valid GET requests and uses
628 the appropriate json parsing functions.
633 self.json_page.ResetNextPage()
636 if request in self.OARrequests_uri_dict :
637 while self.json_page.next_page:
638 self.json_page.raw_json = self.server.GETRequestToOARRestAPI(\
641 self.json_page.next_offset, \
643 self.json_page.FindNextPage()
644 if self.json_page.concatenate:
645 save_json.append(self.json_page.raw_json)
647 if self.json_page.concatenate and self.json_page.end :
648 self.json_page.raw_json = \
649 self.json_page.ConcatenateJsonPages(save_json)
651 return self.OARrequests_uri_dict[request]['parse_func'](self)
653 logger.error("OARRESTAPI OARGetParse __init__ : ERROR_REQUEST " \