From a7568cd24df5472dc6437e8f7aa16c051a6c367f Mon Sep 17 00:00:00 2001 From: Sandrine Avakian Date: Mon, 16 Dec 2013 13:56:23 +0100 Subject: [PATCH] Iotlab now using sfa database for the special table lease_table. Using alchemy.py classes to create a session to the DB. --- sfa/importer/iotlabimporter.py | 38 ++- sfa/iotlab/iotlabaggregate.py | 5 +- sfa/iotlab/iotlabdriver.py | 201 ++++++++++++++- sfa/iotlab/iotlabpostgres.py | 440 +++++++++++++++++---------------- sfa/iotlab/iotlabshell.py | 161 +----------- sfa/iotlab/iotlabslices.py | 8 +- 6 files changed, 460 insertions(+), 393 deletions(-) diff --git a/sfa/importer/iotlabimporter.py b/sfa/importer/iotlabimporter.py index b533faf4..c29457c9 100644 --- a/sfa/importer/iotlabimporter.py +++ b/sfa/importer/iotlabimporter.py @@ -7,18 +7,20 @@ from sfa.util.config import Config from sfa.util.xrn import Xrn, get_authority, hrn_to_urn from sfa.iotlab.iotlabshell import IotlabShell # from sfa.iotlab.iotlabdriver import IotlabDriver -from sfa.iotlab.iotlabpostgres import TestbedAdditionalSfaDB +# from sfa.iotlab.iotlabpostgres import TestbedAdditionalSfaDB from sfa.trust.certificate import Keypair, convert_public_key from sfa.trust.gid import create_uuid # using global alchemy.session() here is fine # as importer is on standalone one-shot process -from sfa.storage.alchemy import global_dbsession + +from sfa.storage.alchemy import global_dbsession, engine from sfa.storage.model import RegRecord, RegAuthority, RegSlice, RegNode, \ - RegUser, RegKey + RegUser, RegKey, init_tables +from sqlalchemy import Table, MetaData +from sqlalchemy.exc import SQLAlchemyError, NoSuchTableError -from sqlalchemy.exc import SQLAlchemyError class IotlabImporter: @@ -44,6 +46,7 @@ class IotlabImporter: self.auth_hierarchy = auth_hierarchy self.logger = loc_logger self.logger.setLevelDebug() + #retrieve all existing SFA objects self.all_records = global_dbsession.query(RegRecord).all() @@ -67,6 +70,27 @@ class IotlabImporter: for record in self.all_records if record.pointer != -1]) + + def exists(self, tablename, engine): + """ + Checks if the table specified as tablename exists. + :param tablename: name of the table in the db that has to be checked. + :type tablename: string + :returns: True if the table exists, False otherwise. + :rtype: bool + + """ + metadata = MetaData(bind=engine) + try: + table = Table(tablename, metadata, autoload=True) + return True + + except NoSuchTableError: + self.logger.log_exc("Iotlabimporter tablename %s does not exist" + % (tablename)) + return False + + @staticmethod def hostname_to_hrn_escaped(root_auth, hostname): """ @@ -522,11 +546,11 @@ class IotlabImporter: root_auth = config.SFA_REGISTRY_ROOT_AUTH testbed_shell = IotlabShell(config) - leases_db = TestbedAdditionalSfaDB(config) + # leases_db = TestbedAdditionalSfaDB(config) #Create special slice table for iotlab - if not leases_db.exists('lease_table'): - leases_db.createtable() + if not self.exists('lease_table', engine): + init_tables(engine) self.logger.info("IotlabImporter.run: lease_table table created ") # import site and node records in site into the SFA db. diff --git a/sfa/iotlab/iotlabaggregate.py b/sfa/iotlab/iotlabaggregate.py index ac1073e7..7ada1089 100644 --- a/sfa/iotlab/iotlabaggregate.py +++ b/sfa/iotlab/iotlabaggregate.py @@ -425,7 +425,7 @@ class IotlabAggregate: logger.debug("IOTLABAGGREGATE get_all_leases ldap_username %s " % (ldap_username)) - leases = self.driver.testbed_shell.GetLeases(login=ldap_username) + leases = self.driver.GetLeases(login=ldap_username) grain = self.driver.testbed_shell.GetLeaseGranularity() rspec_leases = [] @@ -618,8 +618,7 @@ class IotlabAggregate: if slice_hrn: slices = self.driver.GetSlices(slice_hrn, slice_filter_type) - leases = self.driver.testbed_shell.GetLeases( - {'slice_hrn':slice_hrn}) + leases = self.driver.GetLeases({'slice_hrn':slice_hrn}) logger.debug("IotlabAggregate \t get_slivers \ slices %s leases %s\r\n" % (slices, leases )) if not slices: diff --git a/sfa/iotlab/iotlabdriver.py b/sfa/iotlab/iotlabdriver.py index 865d155b..13344aa3 100644 --- a/sfa/iotlab/iotlabdriver.py +++ b/sfa/iotlab/iotlabdriver.py @@ -1,6 +1,7 @@ """ Implements what a driver should provide for SFA to work. """ +from datetime import datetime from sfa.util.faults import SliverDoesNotExist, Forbidden from sfa.util.sfalogging import logger @@ -26,6 +27,7 @@ from sfa.storage.model import SliverAllocation from sfa.iotlab.iotlabshell import IotlabShell from sqlalchemy.orm import joinedload +from sfa.iotlab.iotlabpostgres import LeaseTableXP class IotlabDriver(Driver): """ Iotlab Driver class inherited from Driver generic class. @@ -377,7 +379,7 @@ class IotlabDriver(Driver): #jobs associated to this slice leases_list = [] - leases_list = self.testbed_shell.GetLeases(login=login) + leases_list = self.GetLeases(login=login) #If no job is running or no job scheduled #return only the slice record if leases_list == [] and fixed_slicerec_dict: @@ -482,6 +484,203 @@ class IotlabDriver(Driver): return return_slicerec_dictlist + def AddLeases(self, hostname_list, slice_record, + lease_start_time, lease_duration): + + """Creates a job in OAR corresponding to the information provided + as parameters. Adds the job id and the slice hrn in the iotlab + database so that we are able to know which slice has which nodes. + + :param hostname_list: list of nodes' OAR hostnames. + :param slice_record: sfa slice record, must contain login and hrn. + :param lease_start_time: starting time , unix timestamp format + :param lease_duration: duration in minutes + + :type hostname_list: list + :type slice_record: dict + :type lease_start_time: integer + :type lease_duration: integer + :returns: job_id, can be None if the job request failed. + + """ + logger.debug("IOTLAB_API \r\n \r\n \t AddLeases hostname_list %s \ + slice_record %s lease_start_time %s lease_duration %s "\ + %( hostname_list, slice_record , lease_start_time, \ + lease_duration)) + + #tmp = slice_record['reg-researchers'][0].split(".") + username = slice_record['login'] + #username = tmp[(len(tmp)-1)] + job_id = self.testbed_shell.LaunchExperimentOnOAR(hostname_list, \ + slice_record['hrn'], \ + lease_start_time, lease_duration, \ + username) + if job_id is not None: + start_time = \ + datetime.fromtimestamp(int(lease_start_time)).\ + strftime(self.testbed_shell.time_format) + end_time = lease_start_time + lease_duration + + + logger.debug("IOTLAB_API \r\n \r\n \t AddLeases TURN ON LOGGING SQL \ + %s %s %s "%(slice_record['hrn'], job_id, end_time)) + + + logger.debug("IOTLAB_API \r\n \r\n \t AddLeases %s %s %s " \ + %(type(slice_record['hrn']), type(job_id), type(end_time))) + + iotlab_ex_row = LeaseTableXP(slice_hrn = slice_record['hrn'], + experiment_id=job_id, + end_time= end_time) + + logger.debug("IOTLAB_API \r\n \r\n \t AddLeases iotlab_ex_row %s" \ + %(iotlab_ex_row)) + self.api.dbsession().add(iotlab_ex_row) + self.api.dbsession().commit() + + logger.debug("IOTLAB_API \t AddLeases hostname_list start_time %s " + %(start_time)) + + return job_id + + def GetLeases(self, lease_filter_dict=None, login=None): + """ + + Get the list of leases from OAR with complete information + about which slice owns which jobs and nodes. + Two purposes: + -Fetch all the jobs from OAR (running, waiting..) + complete the reservation information with slice hrn + found in testbed_xp table. If not available in the table, + assume it is a iotlab slice. + -Updates the iotlab table, deleting jobs when necessary. + + :returns: reservation_list, list of dictionaries with 'lease_id', + 'reserved_nodes','slice_id', 'state', 'user', 'component_id_list', + 'slice_hrn', 'resource_ids', 't_from', 't_until' + :rtype: list + + """ + + unfiltered_reservation_list = self.testbed_shell.GetReservedNodes(login) + + reservation_list = [] + #Find the slice associated with this user iotlab ldap uid + logger.debug(" IOTLAB_API.PY \tGetLeases login %s\ + unfiltered_reservation_list %s " + % (login, unfiltered_reservation_list)) + #Create user dict first to avoid looking several times for + #the same user in LDAP SA 27/07/12 + job_oar_list = [] + jobs_psql_query = self.api.dbsession().query(LeaseTableXP).all() + jobs_psql_dict = dict([(row.experiment_id, row.__dict__) + for row in jobs_psql_query]) + #jobs_psql_dict = jobs_psql_dict) + logger.debug("IOTLAB_API \tGetLeases jobs_psql_dict %s" + % (jobs_psql_dict)) + jobs_psql_id_list = [row.experiment_id for row in jobs_psql_query] + + for resa in unfiltered_reservation_list: + logger.debug("IOTLAB_API \tGetLeases USER %s" + % (resa['user'])) + #Construct list of jobs (runing, waiting..) in oar + job_oar_list.append(resa['lease_id']) + #If there is information on the job in IOTLAB DB ] + #(slice used and job id) + if resa['lease_id'] in jobs_psql_dict: + job_info = jobs_psql_dict[resa['lease_id']] + logger.debug("IOTLAB_API \tGetLeases job_info %s" + % (job_info)) + resa['slice_hrn'] = job_info['slice_hrn'] + resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice') + + #otherwise, assume it is a iotlab slice: + else: + resa['slice_id'] = hrn_to_urn(self.testbed_shell.root_auth \ + + '.' + resa['user'] + "_slice", + 'slice') + resa['slice_hrn'] = Xrn(resa['slice_id']).get_hrn() + + resa['component_id_list'] = [] + #Transform the hostnames into urns (component ids) + for node in resa['reserved_nodes']: + + iotlab_xrn = xrn_object(self.testbed_shell.root_auth, node) + resa['component_id_list'].append(iotlab_xrn.urn) + + if lease_filter_dict: + logger.debug("IOTLAB_API \tGetLeases \ + \r\n leasefilter %s" % ( lease_filter_dict)) + + filter_dict_functions = { + 'slice_hrn' : IotlabShell.filter_lease_name, + 't_from' : IotlabShell.filter_lease_start_time + } + reservation_list = list(unfiltered_reservation_list) + for filter_type in lease_filter_dict: + logger.debug("IOTLAB_API \tGetLeases reservation_list %s" \ + % (reservation_list)) + reservation_list = filter_dict_functions[filter_type](\ + reservation_list,lease_filter_dict[filter_type] ) + + # Filter the reservation list with a maximum timespan so that the + # leases and jobs running after this timestamp do not appear + # in the result leases. + # if 'start_time' in : + # if resa['start_time'] < lease_filter_dict['start_time']: + # reservation_list.append(resa) + + + # if 'name' in lease_filter_dict and \ + # lease_filter_dict['name'] == resa['slice_hrn']: + # reservation_list.append(resa) + + + if lease_filter_dict is None: + reservation_list = unfiltered_reservation_list + + self.update_experiments_in_additional_sfa_db(job_oar_list, jobs_psql_id_list) + + logger.debug(" IOTLAB_API.PY \tGetLeases reservation_list %s" + % (reservation_list)) + return reservation_list + + + + def update_experiments_in_additional_sfa_db(self, + experiment_list_from_testbed, experiment_list_in_db): + """ Cleans the iotlab db by deleting expired and cancelled jobs. + + Compares the list of experiment ids given by the testbed with the + experiment ids that are already in the database, deletes the + experiments that are no longer in the testbed experiment id list. + + :param experiment_list_from_testbed: list of experiment ids coming + from testbed + :type experiment_list_from_testbed: list + :param experiment_list_in_db: list of experiment ids from the sfa + additionnal database. + :type experiment_list_in_db: list + + :returns: None + """ + #Turn the list into a set + set_experiment_list_in_db = set(experiment_list_in_db) + + kept_experiments = set(experiment_list_from_testbed).intersection(set_experiment_list_in_db) + logger.debug("\r\n \t update_experiments_in_additional_sfa_db \ + experiment_list_in_db %s \r\n \ + experiment_list_from_testbed %s \ + kept_experiments %s " + % (set_experiment_list_in_db, + experiment_list_from_testbed, kept_experiments)) + deleted_experiments = set_experiment_list_in_db.difference( + kept_experiments) + deleted_experiments = list(deleted_experiments) + if len(deleted_experiments) > 0: + self.api.dbsession().query(LeaseTableXP).filter(LeaseTableXP.experiment_id.in_(deleted_experiments)).delete(synchronize_session='fetch') + self.api.dbsession().commit() + return def AddSlice(self, slice_record, user_record): """ diff --git a/sfa/iotlab/iotlabpostgres.py b/sfa/iotlab/iotlabpostgres.py index eed8769a..de74dc93 100644 --- a/sfa/iotlab/iotlabpostgres.py +++ b/sfa/iotlab/iotlabpostgres.py @@ -2,18 +2,18 @@ File defining classes to handle the table in the iotlab dedicated database. """ -from sqlalchemy import create_engine -from sqlalchemy.orm import sessionmaker +# from sqlalchemy import create_engine +# from sqlalchemy.orm import sessionmaker # from sfa.util.config import Config -from sfa.util.sfalogging import logger - +# from sfa.util.sfalogging import logger +from sfa.storage.model import Base, AlchemyObj from sqlalchemy import Column, Integer, String -from sqlalchemy import Table, MetaData -from sqlalchemy.ext.declarative import declarative_base +# from sqlalchemy import Table, MetaData +# # from sqlalchemy.ext.declarative import declarative_base -# from sqlalchemy.dialects import postgresql +# # from sqlalchemy.dialects import postgresql -from sqlalchemy.exc import NoSuchTableError +# from sqlalchemy.exc import NoSuchTableError #Dict holding the columns names of the table as keys @@ -26,10 +26,12 @@ slice_table = {'record_id_user': 'integer PRIMARY KEY references X ON DELETE \ # tablenames_dict = {'lease_table': slice_table} -TestbedBase = declarative_base() +# TestbedBase = declarative_base() -class LeaseTableXP (TestbedBase): +# class LeaseTableXP (TestbedBase): +# class LeaseTableXP (Base,AlchemyObj): +class LeaseTableXP (Base): """ SQL alchemy class to manipulate the rows of the slice_iotlab table in lease_table database. Handles the records representation and creates the table if it does not exist yet. @@ -62,212 +64,212 @@ class LeaseTableXP (TestbedBase): return result -class TestbedAdditionalSfaDB(object): - """ SQL Alchemy connection class. - From alchemy.py - """ - # Stores the unique Singleton instance- - _connection_singleton = None - # defines the database name - dbname = "testbed_xp" - - class Singleton: - """ - Class used with this Python singleton design pattern to allow the - definition of one single instance of iotlab db session in the whole - code. Wherever a conenction to the database is needed, this class - returns the same instance every time. Removes the need for global - variable throughout the code. - """ - - def __init__(self, config, debug=False): - self.testbed_engine = None - self.testbed_session = None - self.url = None - self.create_testbed_engine(config, debug) - self.session() - - def create_testbed_engine(self, config, debug=False): - """Creates the SQLAlchemy engine, which is the starting point for - any SQLAlchemy application. - :param config: configuration object created by SFA based on the - configuration file in /etc - :param debug: if set to true, echo and echo pool will be set to true - as well. If echo is True, all statements as well as a repr() of - their parameter lists to the engines logger, which defaults to - sys.stdout. If echo_pool is True, the connection pool will log all - checkouts/checkins to the logging stream. A python logger can be - used to configure this logging directly but so far it has not been - configured. Refer to sql alchemy engine documentation. - - :type config: Config instance (sfa.util.config) - :type debug: bool - - """ - - if debug is True: - l_echo_pool = True - l_echo = True - else: - l_echo_pool = False - l_echo = False - # the former PostgreSQL.py used the psycopg2 directly and was doing - #self.connection.set_client_encoding("UNICODE") - # it's unclear how to achieve this in sqlalchemy, nor if it's needed - # at all - # http://www.sqlalchemy.org/docs/dialects/postgresql.html#unicode - # we indeed have /var/lib/pgsql/data/postgresql.conf where - # this setting is unset, it might be an angle to tweak that if need - # be try a unix socket first - # - omitting the hostname does the trick - unix_url = "postgresql+psycopg2://%s:%s@:%s/%s" \ - % (config.SFA_DB_USER, config.SFA_DB_PASSWORD, - config.SFA_DB_PORT, TestbedAdditionalSfaDB.dbname) - - # the TCP fallback method - tcp_url = "postgresql+psycopg2://%s:%s@%s:%s/%s" \ - % (config.SFA_DB_USER, config.SFA_DB_PASSWORD, - config.SFA_DB_HOST, config.SFA_DB_PORT, TestbedAdditionalSfaDB.dbname) - - for url in [unix_url, tcp_url]: - try: - self.testbed_engine = create_engine( - url, echo_pool=l_echo_pool, echo=l_echo) - self.check() - self.url = url - return - except: - pass - self.testbed_engine = None - - raise Exception("Could not connect to database") - - def check(self): - """ Check if a table exists by trying a selection - on the table. - - """ - self.testbed_engine.execute("select 1").scalar() - - - def session(self): - """ - Creates a SQLalchemy session. Once the session object is created - it should be used throughout the code for all the operations on - tables for this given database. - - """ - if self.testbed_session is None: - Session = sessionmaker() - self.testbed_session = Session(bind=self.testbed_engine) - return self.testbed_session - - def close_session(self): - """ - Closes connection to database. - - """ - if self.testbed_session is None: - return - self.testbed_session.close() - self.testbed_session = None - - - def update_experiments_in_additional_sfa_db(self, - experiment_list_from_testbed, experiment_list_in_db): - """ Cleans the iotlab db by deleting expired and cancelled jobs. - - Compares the list of experiment ids given by the testbed with the - experiment ids that are already in the database, deletes the - experiments that are no longer in the testbed experiment id list. - - :param experiment_list_from_testbed: list of experiment ids coming - from testbed - :type experiment_list_from_testbed: list - :param experiment_list_in_db: list of experiment ids from the sfa - additionnal database. - :type experiment_list_in_db: list - - :returns: None - """ - #Turn the list into a set - set_experiment_list_in_db = set(experiment_list_in_db) - - kept_experiments = set(experiment_list_from_testbed).intersection(set_experiment_list_in_db) - logger.debug("\r\n \t update_experiments_in_additional_sfa_db \ - experiment_list_in_db %s \r\n \ - experiment_list_from_testbed %s \ - kept_experiments %s " - % (set_experiment_list_in_db, - experiment_list_from_testbed, kept_experiments)) - deleted_experiments = set_experiment_list_in_db.difference( - kept_experiments) - deleted_experiments = list(deleted_experiments) - if len(deleted_experiments) > 0: - self.testbed_session.query(LeaseTableXP).filter(LeaseTableXP.experiment_id.in_(deleted_experiments)).delete(synchronize_session='fetch') - self.testbed_session.commit() - return - - def __init__(self, config, debug=False): - self.sl_base = TestbedBase - - # Check whether we already have an instance - if TestbedAdditionalSfaDB._connection_singleton is None: - TestbedAdditionalSfaDB._connection_singleton = \ - TestbedAdditionalSfaDB.Singleton(config, debug) - - # Store instance reference as the only member in the handle - self._EventHandler_singleton = \ - TestbedAdditionalSfaDB._connection_singleton - - def __getattr__(self, aAttr): - """ - Delegate access to implementation. - - :param aAttr: Attribute wanted. - :returns: Attribute - """ - return getattr(self._connection_singleton, aAttr) - - - - # def __setattr__(self, aAttr, aValue): - # """Delegate access to implementation. - - # :param attr: Attribute wanted. - # :param value: Vaule to be set. - # :return: Result of operation. - # """ - # return setattr(self._connection_singleton, aAttr, aValue) - - def exists(self, tablename): - """ - Checks if the table specified as tablename exists. - :param tablename: name of the table in the db that has to be checked. - :type tablename: string - :returns: True if the table exists, False otherwise. - :rtype: bool - - """ - metadata = MetaData(bind=self.testbed_engine) - try: - table = Table(tablename, metadata, autoload=True) - return True - - except NoSuchTableError: - logger.log_exc("SLABPOSTGRES tablename %s does not exist" - % (tablename)) - return False - - def createtable(self): - """ - Creates all the table sof the engine. - Uses the global dictionnary holding the tablenames and the table schema. - - """ - - logger.debug("IOTLABPOSTGRES createtable \ - TestbedBase.metadata.sorted_tables %s \r\n engine %s" - % (TestbedBase.metadata.sorted_tables, self.testbed_engine)) - TestbedBase.metadata.create_all(self.testbed_engine) - return +# class TestbedAdditionalSfaDB(object): +# """ SQL Alchemy connection class. +# From alchemy.py +# """ +# # Stores the unique Singleton instance- +# _connection_singleton = None +# # defines the database name +# dbname = "testbed_xp" + +# class Singleton: +# """ +# Class used with this Python singleton design pattern to allow the +# definition of one single instance of iotlab db session in the whole +# code. Wherever a conenction to the database is needed, this class +# returns the same instance every time. Removes the need for global +# variable throughout the code. +# """ + +# def __init__(self, config, debug=False): +# self.testbed_engine = None +# self.testbed_session = None +# self.url = None +# self.create_testbed_engine(config, debug) +# self.session() + +# def create_testbed_engine(self, config, debug=False): +# """Creates the SQLAlchemy engine, which is the starting point for +# any SQLAlchemy application. +# :param config: configuration object created by SFA based on the +# configuration file in /etc +# :param debug: if set to true, echo and echo pool will be set to true +# as well. If echo is True, all statements as well as a repr() of +# their parameter lists to the engines logger, which defaults to +# sys.stdout. If echo_pool is True, the connection pool will log all +# checkouts/checkins to the logging stream. A python logger can be +# used to configure this logging directly but so far it has not been +# configured. Refer to sql alchemy engine documentation. + +# :type config: Config instance (sfa.util.config) +# :type debug: bool + +# """ + +# if debug is True: +# l_echo_pool = True +# l_echo = True +# else: +# l_echo_pool = False +# l_echo = False +# # the former PostgreSQL.py used the psycopg2 directly and was doing +# #self.connection.set_client_encoding("UNICODE") +# # it's unclear how to achieve this in sqlalchemy, nor if it's needed +# # at all +# # http://www.sqlalchemy.org/docs/dialects/postgresql.html#unicode +# # we indeed have /var/lib/pgsql/data/postgresql.conf where +# # this setting is unset, it might be an angle to tweak that if need +# # be try a unix socket first +# # - omitting the hostname does the trick +# unix_url = "postgresql+psycopg2://%s:%s@:%s/%s" \ +# % (config.SFA_DB_USER, config.SFA_DB_PASSWORD, +# config.SFA_DB_PORT, TestbedAdditionalSfaDB.dbname) + +# # the TCP fallback method +# tcp_url = "postgresql+psycopg2://%s:%s@%s:%s/%s" \ +# % (config.SFA_DB_USER, config.SFA_DB_PASSWORD, +# config.SFA_DB_HOST, config.SFA_DB_PORT, TestbedAdditionalSfaDB.dbname) + +# for url in [unix_url, tcp_url]: +# try: +# self.testbed_engine = create_engine( +# url, echo_pool=l_echo_pool, echo=l_echo) +# self.check() +# self.url = url +# return +# except: +# pass +# self.testbed_engine = None + +# raise Exception("Could not connect to database") + +# def check(self): +# """ Check if a table exists by trying a selection +# on the table. + +# """ +# self.testbed_engine.execute("select 1").scalar() + + +# def session(self): +# """ +# Creates a SQLalchemy session. Once the session object is created +# it should be used throughout the code for all the operations on +# tables for this given database. + +# """ +# if self.testbed_session is None: +# Session = sessionmaker() +# self.testbed_session = Session(bind=self.testbed_engine) +# return self.testbed_session + +# def close_session(self): +# """ +# Closes connection to database. + +# """ +# if self.testbed_session is None: +# return +# self.testbed_session.close() +# self.testbed_session = None + + + # def update_experiments_in_additional_sfa_db(self, + # experiment_list_from_testbed, experiment_list_in_db): + # """ Cleans the iotlab db by deleting expired and cancelled jobs. + + # Compares the list of experiment ids given by the testbed with the + # experiment ids that are already in the database, deletes the + # experiments that are no longer in the testbed experiment id list. + + # :param experiment_list_from_testbed: list of experiment ids coming + # from testbed + # :type experiment_list_from_testbed: list + # :param experiment_list_in_db: list of experiment ids from the sfa + # additionnal database. + # :type experiment_list_in_db: list + + # :returns: None + # """ + # #Turn the list into a set + # set_experiment_list_in_db = set(experiment_list_in_db) + + # kept_experiments = set(experiment_list_from_testbed).intersection(set_experiment_list_in_db) + # logger.debug("\r\n \t update_experiments_in_additional_sfa_db \ + # experiment_list_in_db %s \r\n \ + # experiment_list_from_testbed %s \ + # kept_experiments %s " + # % (set_experiment_list_in_db, + # experiment_list_from_testbed, kept_experiments)) + # deleted_experiments = set_experiment_list_in_db.difference( + # kept_experiments) + # deleted_experiments = list(deleted_experiments) + # if len(deleted_experiments) > 0: + # self.testbed_session.query(LeaseTableXP).filter(LeaseTableXP.experiment_id.in_(deleted_experiments)).delete(synchronize_session='fetch') + # self.testbed_session.commit() + # return + + # def __init__(self, config, debug=False): + # self.sl_base = TestbedBase + + # # Check whether we already have an instance + # if TestbedAdditionalSfaDB._connection_singleton is None: + # TestbedAdditionalSfaDB._connection_singleton = \ + # TestbedAdditionalSfaDB.Singleton(config, debug) + + # # Store instance reference as the only member in the handle + # self._EventHandler_singleton = \ + # TestbedAdditionalSfaDB._connection_singleton + + # def __getattr__(self, aAttr): + # """ + # Delegate access to implementation. + + # :param aAttr: Attribute wanted. + # :returns: Attribute + # """ + # return getattr(self._connection_singleton, aAttr) + + + + # # def __setattr__(self, aAttr, aValue): + # # """Delegate access to implementation. + + # # :param attr: Attribute wanted. + # # :param value: Vaule to be set. + # # :return: Result of operation. + # # """ + # # return setattr(self._connection_singleton, aAttr, aValue) + + # def exists(self, tablename): + # """ + # Checks if the table specified as tablename exists. + # :param tablename: name of the table in the db that has to be checked. + # :type tablename: string + # :returns: True if the table exists, False otherwise. + # :rtype: bool + + # """ + # metadata = MetaData(bind=self.testbed_engine) + # try: + # table = Table(tablename, metadata, autoload=True) + # return True + + # except NoSuchTableError: + # logger.log_exc("SLABPOSTGRES tablename %s does not exist" + # % (tablename)) + # return False + + # def createtable(self): + # """ + # Creates all the table sof the engine. + # Uses the global dictionnary holding the tablenames and the table schema. + + # """ + + # logger.debug("IOTLABPOSTGRES createtable \ + # TestbedBase.metadata.sorted_tables %s \r\n engine %s" + # % (TestbedBase.metadata.sorted_tables, self.testbed_engine)) + # TestbedBase.metadata.create_all(self.testbed_engine) + # return diff --git a/sfa/iotlab/iotlabshell.py b/sfa/iotlab/iotlabshell.py index afc170c8..406bed90 100644 --- a/sfa/iotlab/iotlabshell.py +++ b/sfa/iotlab/iotlabshell.py @@ -11,7 +11,7 @@ from datetime import datetime from sfa.util.sfalogging import logger -from sfa.iotlab.iotlabpostgres import TestbedAdditionalSfaDB, LeaseTableXP +from sfa.iotlab.iotlabpostgres import LeaseTableXP from sfa.iotlab.OARrestapi import OARrestapi from sfa.iotlab.LDAPapi import LDAPapi @@ -34,7 +34,7 @@ class IotlabShell(): :type config: Config object """ - self.leases_db = TestbedAdditionalSfaDB(config) + # self.leases_db = TestbedAdditionalSfaDB(config) self.oar = OARrestapi() self.ldap = LDAPapi() self.time_format = "%Y-%m-%d %H:%M:%S" @@ -628,64 +628,7 @@ class IotlabShell(): return jobid - def AddLeases(self, hostname_list, slice_record, - lease_start_time, lease_duration): - """Creates a job in OAR corresponding to the information provided - as parameters. Adds the job id and the slice hrn in the iotlab - database so that we are able to know which slice has which nodes. - - :param hostname_list: list of nodes' OAR hostnames. - :param slice_record: sfa slice record, must contain login and hrn. - :param lease_start_time: starting time , unix timestamp format - :param lease_duration: duration in minutes - - :type hostname_list: list - :type slice_record: dict - :type lease_start_time: integer - :type lease_duration: integer - :returns: job_id, can be None if the job request failed. - - """ - logger.debug("IOTLAB_API \r\n \r\n \t AddLeases hostname_list %s \ - slice_record %s lease_start_time %s lease_duration %s "\ - %( hostname_list, slice_record , lease_start_time, \ - lease_duration)) - - #tmp = slice_record['reg-researchers'][0].split(".") - username = slice_record['login'] - #username = tmp[(len(tmp)-1)] - job_id = self.LaunchExperimentOnOAR(hostname_list, \ - slice_record['hrn'], \ - lease_start_time, lease_duration, \ - username) - if job_id is not None: - start_time = \ - datetime.fromtimestamp(int(lease_start_time)).\ - strftime(self.time_format) - end_time = lease_start_time + lease_duration - - - logger.debug("IOTLAB_API \r\n \r\n \t AddLeases TURN ON LOGGING SQL \ - %s %s %s "%(slice_record['hrn'], job_id, end_time)) - - - logger.debug("IOTLAB_API \r\n \r\n \t AddLeases %s %s %s " \ - %(type(slice_record['hrn']), type(job_id), type(end_time))) - - iotlab_ex_row = LeaseTableXP(slice_hrn = slice_record['hrn'], - experiment_id=job_id, - end_time= end_time) - - logger.debug("IOTLAB_API \r\n \r\n \t AddLeases iotlab_ex_row %s" \ - %(iotlab_ex_row)) - self.leases_db.testbed_session.add(iotlab_ex_row) - self.leases_db.testbed_session.commit() - - logger.debug("IOTLAB_API \t AddLeases hostname_list start_time %s " - %(start_time)) - - return job_id #Delete the jobs from job_iotlab table @@ -756,106 +699,6 @@ class IotlabShell(): return filtered_reservation_list - def GetLeases(self, lease_filter_dict=None, login=None): - """ - - Get the list of leases from OAR with complete information - about which slice owns which jobs and nodes. - Two purposes: - -Fetch all the jobs from OAR (running, waiting..) - complete the reservation information with slice hrn - found in testbed_xp table. If not available in the table, - assume it is a iotlab slice. - -Updates the iotlab table, deleting jobs when necessary. - - :returns: reservation_list, list of dictionaries with 'lease_id', - 'reserved_nodes','slice_id', 'state', 'user', 'component_id_list', - 'slice_hrn', 'resource_ids', 't_from', 't_until' - :rtype: list - - """ - - unfiltered_reservation_list = self.GetReservedNodes(login) - - reservation_list = [] - #Find the slice associated with this user iotlab ldap uid - logger.debug(" IOTLAB_API.PY \tGetLeases login %s\ - unfiltered_reservation_list %s " - % (login, unfiltered_reservation_list)) - #Create user dict first to avoid looking several times for - #the same user in LDAP SA 27/07/12 - job_oar_list = [] - jobs_psql_query = self.leases_db.testbed_session.query(LeaseTableXP).all() - jobs_psql_dict = dict([(row.experiment_id, row.__dict__) - for row in jobs_psql_query]) - #jobs_psql_dict = jobs_psql_dict) - logger.debug("IOTLAB_API \tGetLeases jobs_psql_dict %s" - % (jobs_psql_dict)) - jobs_psql_id_list = [row.experiment_id for row in jobs_psql_query] - - for resa in unfiltered_reservation_list: - logger.debug("IOTLAB_API \tGetLeases USER %s" - % (resa['user'])) - #Construct list of jobs (runing, waiting..) in oar - job_oar_list.append(resa['lease_id']) - #If there is information on the job in IOTLAB DB ] - #(slice used and job id) - if resa['lease_id'] in jobs_psql_dict: - job_info = jobs_psql_dict[resa['lease_id']] - logger.debug("IOTLAB_API \tGetLeases job_info %s" - % (job_info)) - resa['slice_hrn'] = job_info['slice_hrn'] - resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice') - - #otherwise, assume it is a iotlab slice: - else: - resa['slice_id'] = hrn_to_urn(self.root_auth + '.' + - resa['user'] + "_slice", 'slice') - resa['slice_hrn'] = Xrn(resa['slice_id']).get_hrn() - - resa['component_id_list'] = [] - #Transform the hostnames into urns (component ids) - for node in resa['reserved_nodes']: - - iotlab_xrn = xrn_object(self.root_auth, node) - resa['component_id_list'].append(iotlab_xrn.urn) - - if lease_filter_dict: - logger.debug("IOTLAB_API \tGetLeases \ - \r\n leasefilter %s" % ( lease_filter_dict)) - - filter_dict_functions = { - 'slice_hrn' : IotlabShell.filter_lease_name, - 't_from' : IotlabShell.filter_lease_start_time - } - reservation_list = list(unfiltered_reservation_list) - for filter_type in lease_filter_dict: - logger.debug("IOTLAB_API \tGetLeases reservation_list %s" \ - % (reservation_list)) - reservation_list = filter_dict_functions[filter_type](\ - reservation_list,lease_filter_dict[filter_type] ) - - # Filter the reservation list with a maximum timespan so that the - # leases and jobs running after this timestamp do not appear - # in the result leases. - # if 'start_time' in : - # if resa['start_time'] < lease_filter_dict['start_time']: - # reservation_list.append(resa) - - - # if 'name' in lease_filter_dict and \ - # lease_filter_dict['name'] == resa['slice_hrn']: - # reservation_list.append(resa) - - - if lease_filter_dict is None: - reservation_list = unfiltered_reservation_list - - self.leases_db.update_experiments_in_additional_sfa_db(job_oar_list, jobs_psql_id_list) - - logger.debug(" IOTLAB_API.PY \tGetLeases reservation_list %s" - % (reservation_list)) - return reservation_list diff --git a/sfa/iotlab/iotlabslices.py b/sfa/iotlab/iotlabslices.py index a74d79a9..f5ebfa7c 100644 --- a/sfa/iotlab/iotlabslices.py +++ b/sfa/iotlab/iotlabslices.py @@ -108,7 +108,7 @@ class IotlabSlices: logger.debug("IOTLABSLICES verify_slice_leases sfa_slice %s " % (sfa_slice)) #First get the list of current leases from OAR - leases = self.driver.testbed_shell.GetLeases({'slice_hrn': sfa_slice['hrn']}) + leases = self.driver.GetLeases({'slice_hrn': sfa_slice['hrn']}) logger.debug("IOTLABSLICES verify_slice_leases requested_jobs_dict %s \ leases %s " % (requested_jobs_dict, leases)) @@ -207,12 +207,12 @@ class IotlabSlices: logger.debug("IOTLABSLICES \ NEWLEASE slice %s job %s" % (sfa_slice, job)) - job_id = self.driver.testbed_shell.AddLeases( + job_id = self.driver.AddLeases( job['hostname'], sfa_slice, int(job['start_time']), int(job['duration'])) if job_id is not None: - new_leases = self.driver.testbed_shell.GetLeases(login= + new_leases = self.driver.GetLeases(login= sfa_slice['login']) for new_lease in new_leases: leases.append(new_lease) @@ -228,7 +228,7 @@ class IotlabSlices: if reschedule_jobs_dict: for start_time in reschedule_jobs_dict: job = reschedule_jobs_dict[start_time] - self.driver.testbed_shell.AddLeases( + self.driver.AddLeases( job['hostname'], sfa_slice, int(job['start_time']), int(job['duration'])) -- 2.43.0