Using alchemy.py classes to create a session to the DB.
from sfa.util.xrn import Xrn, get_authority, hrn_to_urn
from sfa.iotlab.iotlabshell import IotlabShell
# from sfa.iotlab.iotlabdriver import IotlabDriver
-from sfa.iotlab.iotlabpostgres import TestbedAdditionalSfaDB
+# from sfa.iotlab.iotlabpostgres import TestbedAdditionalSfaDB
from sfa.trust.certificate import Keypair, convert_public_key
from sfa.trust.gid import create_uuid
# using global alchemy.session() here is fine
# as importer is on standalone one-shot process
-from sfa.storage.alchemy import global_dbsession
+
+from sfa.storage.alchemy import global_dbsession, engine
from sfa.storage.model import RegRecord, RegAuthority, RegSlice, RegNode, \
- RegUser, RegKey
+ RegUser, RegKey, init_tables
+from sqlalchemy import Table, MetaData
+from sqlalchemy.exc import SQLAlchemyError, NoSuchTableError
-from sqlalchemy.exc import SQLAlchemyError
class IotlabImporter:
self.auth_hierarchy = auth_hierarchy
self.logger = loc_logger
self.logger.setLevelDebug()
+
#retrieve all existing SFA objects
self.all_records = global_dbsession.query(RegRecord).all()
for record in self.all_records if record.pointer != -1])
+
+ def exists(self, tablename, engine):
+ """
+ Checks if the table specified as tablename exists.
+ :param tablename: name of the table in the db that has to be checked.
+ :type tablename: string
+ :returns: True if the table exists, False otherwise.
+ :rtype: bool
+
+ """
+ metadata = MetaData(bind=engine)
+ try:
+ table = Table(tablename, metadata, autoload=True)
+ return True
+
+ except NoSuchTableError:
+ self.logger.log_exc("Iotlabimporter tablename %s does not exist"
+ % (tablename))
+ return False
+
+
@staticmethod
def hostname_to_hrn_escaped(root_auth, hostname):
"""
root_auth = config.SFA_REGISTRY_ROOT_AUTH
testbed_shell = IotlabShell(config)
- leases_db = TestbedAdditionalSfaDB(config)
+ # leases_db = TestbedAdditionalSfaDB(config)
#Create special slice table for iotlab
- if not leases_db.exists('lease_table'):
- leases_db.createtable()
+ if not self.exists('lease_table', engine):
+ init_tables(engine)
self.logger.info("IotlabImporter.run: lease_table table created ")
# import site and node records in site into the SFA db.
logger.debug("IOTLABAGGREGATE get_all_leases ldap_username %s "
% (ldap_username))
- leases = self.driver.testbed_shell.GetLeases(login=ldap_username)
+ leases = self.driver.GetLeases(login=ldap_username)
grain = self.driver.testbed_shell.GetLeaseGranularity()
rspec_leases = []
if slice_hrn:
slices = self.driver.GetSlices(slice_hrn,
slice_filter_type)
- leases = self.driver.testbed_shell.GetLeases(
- {'slice_hrn':slice_hrn})
+ leases = self.driver.GetLeases({'slice_hrn':slice_hrn})
logger.debug("IotlabAggregate \t get_slivers \
slices %s leases %s\r\n" % (slices, leases ))
if not slices:
"""
Implements what a driver should provide for SFA to work.
"""
+from datetime import datetime
from sfa.util.faults import SliverDoesNotExist, Forbidden
from sfa.util.sfalogging import logger
from sfa.iotlab.iotlabshell import IotlabShell
from sqlalchemy.orm import joinedload
+from sfa.iotlab.iotlabpostgres import LeaseTableXP
class IotlabDriver(Driver):
""" Iotlab Driver class inherited from Driver generic class.
#jobs associated to this slice
leases_list = []
- leases_list = self.testbed_shell.GetLeases(login=login)
+ leases_list = self.GetLeases(login=login)
#If no job is running or no job scheduled
#return only the slice record
if leases_list == [] and fixed_slicerec_dict:
return return_slicerec_dictlist
+ def AddLeases(self, hostname_list, slice_record,
+ lease_start_time, lease_duration):
+
+ """Creates a job in OAR corresponding to the information provided
+ as parameters. Adds the job id and the slice hrn in the iotlab
+ database so that we are able to know which slice has which nodes.
+
+ :param hostname_list: list of nodes' OAR hostnames.
+ :param slice_record: sfa slice record, must contain login and hrn.
+ :param lease_start_time: starting time , unix timestamp format
+ :param lease_duration: duration in minutes
+
+ :type hostname_list: list
+ :type slice_record: dict
+ :type lease_start_time: integer
+ :type lease_duration: integer
+ :returns: job_id, can be None if the job request failed.
+
+ """
+ logger.debug("IOTLAB_API \r\n \r\n \t AddLeases hostname_list %s \
+ slice_record %s lease_start_time %s lease_duration %s "\
+ %( hostname_list, slice_record , lease_start_time, \
+ lease_duration))
+
+ #tmp = slice_record['reg-researchers'][0].split(".")
+ username = slice_record['login']
+ #username = tmp[(len(tmp)-1)]
+ job_id = self.testbed_shell.LaunchExperimentOnOAR(hostname_list, \
+ slice_record['hrn'], \
+ lease_start_time, lease_duration, \
+ username)
+ if job_id is not None:
+ start_time = \
+ datetime.fromtimestamp(int(lease_start_time)).\
+ strftime(self.testbed_shell.time_format)
+ end_time = lease_start_time + lease_duration
+
+
+ logger.debug("IOTLAB_API \r\n \r\n \t AddLeases TURN ON LOGGING SQL \
+ %s %s %s "%(slice_record['hrn'], job_id, end_time))
+
+
+ logger.debug("IOTLAB_API \r\n \r\n \t AddLeases %s %s %s " \
+ %(type(slice_record['hrn']), type(job_id), type(end_time)))
+
+ iotlab_ex_row = LeaseTableXP(slice_hrn = slice_record['hrn'],
+ experiment_id=job_id,
+ end_time= end_time)
+
+ logger.debug("IOTLAB_API \r\n \r\n \t AddLeases iotlab_ex_row %s" \
+ %(iotlab_ex_row))
+ self.api.dbsession().add(iotlab_ex_row)
+ self.api.dbsession().commit()
+
+ logger.debug("IOTLAB_API \t AddLeases hostname_list start_time %s "
+ %(start_time))
+
+ return job_id
+
+ def GetLeases(self, lease_filter_dict=None, login=None):
+ """
+
+ Get the list of leases from OAR with complete information
+ about which slice owns which jobs and nodes.
+ Two purposes:
+ -Fetch all the jobs from OAR (running, waiting..)
+ complete the reservation information with slice hrn
+ found in testbed_xp table. If not available in the table,
+ assume it is a iotlab slice.
+ -Updates the iotlab table, deleting jobs when necessary.
+
+ :returns: reservation_list, list of dictionaries with 'lease_id',
+ 'reserved_nodes','slice_id', 'state', 'user', 'component_id_list',
+ 'slice_hrn', 'resource_ids', 't_from', 't_until'
+ :rtype: list
+
+ """
+
+ unfiltered_reservation_list = self.testbed_shell.GetReservedNodes(login)
+
+ reservation_list = []
+ #Find the slice associated with this user iotlab ldap uid
+ logger.debug(" IOTLAB_API.PY \tGetLeases login %s\
+ unfiltered_reservation_list %s "
+ % (login, unfiltered_reservation_list))
+ #Create user dict first to avoid looking several times for
+ #the same user in LDAP SA 27/07/12
+ job_oar_list = []
+ jobs_psql_query = self.api.dbsession().query(LeaseTableXP).all()
+ jobs_psql_dict = dict([(row.experiment_id, row.__dict__)
+ for row in jobs_psql_query])
+ #jobs_psql_dict = jobs_psql_dict)
+ logger.debug("IOTLAB_API \tGetLeases jobs_psql_dict %s"
+ % (jobs_psql_dict))
+ jobs_psql_id_list = [row.experiment_id for row in jobs_psql_query]
+
+ for resa in unfiltered_reservation_list:
+ logger.debug("IOTLAB_API \tGetLeases USER %s"
+ % (resa['user']))
+ #Construct list of jobs (runing, waiting..) in oar
+ job_oar_list.append(resa['lease_id'])
+ #If there is information on the job in IOTLAB DB ]
+ #(slice used and job id)
+ if resa['lease_id'] in jobs_psql_dict:
+ job_info = jobs_psql_dict[resa['lease_id']]
+ logger.debug("IOTLAB_API \tGetLeases job_info %s"
+ % (job_info))
+ resa['slice_hrn'] = job_info['slice_hrn']
+ resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
+
+ #otherwise, assume it is a iotlab slice:
+ else:
+ resa['slice_id'] = hrn_to_urn(self.testbed_shell.root_auth \
+ + '.' + resa['user'] + "_slice",
+ 'slice')
+ resa['slice_hrn'] = Xrn(resa['slice_id']).get_hrn()
+
+ resa['component_id_list'] = []
+ #Transform the hostnames into urns (component ids)
+ for node in resa['reserved_nodes']:
+
+ iotlab_xrn = xrn_object(self.testbed_shell.root_auth, node)
+ resa['component_id_list'].append(iotlab_xrn.urn)
+
+ if lease_filter_dict:
+ logger.debug("IOTLAB_API \tGetLeases \
+ \r\n leasefilter %s" % ( lease_filter_dict))
+
+ filter_dict_functions = {
+ 'slice_hrn' : IotlabShell.filter_lease_name,
+ 't_from' : IotlabShell.filter_lease_start_time
+ }
+ reservation_list = list(unfiltered_reservation_list)
+ for filter_type in lease_filter_dict:
+ logger.debug("IOTLAB_API \tGetLeases reservation_list %s" \
+ % (reservation_list))
+ reservation_list = filter_dict_functions[filter_type](\
+ reservation_list,lease_filter_dict[filter_type] )
+
+ # Filter the reservation list with a maximum timespan so that the
+ # leases and jobs running after this timestamp do not appear
+ # in the result leases.
+ # if 'start_time' in :
+ # if resa['start_time'] < lease_filter_dict['start_time']:
+ # reservation_list.append(resa)
+
+
+ # if 'name' in lease_filter_dict and \
+ # lease_filter_dict['name'] == resa['slice_hrn']:
+ # reservation_list.append(resa)
+
+
+ if lease_filter_dict is None:
+ reservation_list = unfiltered_reservation_list
+
+ self.update_experiments_in_additional_sfa_db(job_oar_list, jobs_psql_id_list)
+
+ logger.debug(" IOTLAB_API.PY \tGetLeases reservation_list %s"
+ % (reservation_list))
+ return reservation_list
+
+
+
+ def update_experiments_in_additional_sfa_db(self,
+ experiment_list_from_testbed, experiment_list_in_db):
+ """ Cleans the iotlab db by deleting expired and cancelled jobs.
+
+ Compares the list of experiment ids given by the testbed with the
+ experiment ids that are already in the database, deletes the
+ experiments that are no longer in the testbed experiment id list.
+
+ :param experiment_list_from_testbed: list of experiment ids coming
+ from testbed
+ :type experiment_list_from_testbed: list
+ :param experiment_list_in_db: list of experiment ids from the sfa
+ additionnal database.
+ :type experiment_list_in_db: list
+
+ :returns: None
+ """
+ #Turn the list into a set
+ set_experiment_list_in_db = set(experiment_list_in_db)
+
+ kept_experiments = set(experiment_list_from_testbed).intersection(set_experiment_list_in_db)
+ logger.debug("\r\n \t update_experiments_in_additional_sfa_db \
+ experiment_list_in_db %s \r\n \
+ experiment_list_from_testbed %s \
+ kept_experiments %s "
+ % (set_experiment_list_in_db,
+ experiment_list_from_testbed, kept_experiments))
+ deleted_experiments = set_experiment_list_in_db.difference(
+ kept_experiments)
+ deleted_experiments = list(deleted_experiments)
+ if len(deleted_experiments) > 0:
+ self.api.dbsession().query(LeaseTableXP).filter(LeaseTableXP.experiment_id.in_(deleted_experiments)).delete(synchronize_session='fetch')
+ self.api.dbsession().commit()
+ return
def AddSlice(self, slice_record, user_record):
"""
File defining classes to handle the table in the iotlab dedicated database.
"""
-from sqlalchemy import create_engine
-from sqlalchemy.orm import sessionmaker
+# from sqlalchemy import create_engine
+# from sqlalchemy.orm import sessionmaker
# from sfa.util.config import Config
-from sfa.util.sfalogging import logger
-
+# from sfa.util.sfalogging import logger
+from sfa.storage.model import Base, AlchemyObj
from sqlalchemy import Column, Integer, String
-from sqlalchemy import Table, MetaData
-from sqlalchemy.ext.declarative import declarative_base
+# from sqlalchemy import Table, MetaData
+# # from sqlalchemy.ext.declarative import declarative_base
-# from sqlalchemy.dialects import postgresql
+# # from sqlalchemy.dialects import postgresql
-from sqlalchemy.exc import NoSuchTableError
+# from sqlalchemy.exc import NoSuchTableError
#Dict holding the columns names of the table as keys
# tablenames_dict = {'lease_table': slice_table}
-TestbedBase = declarative_base()
+# TestbedBase = declarative_base()
-class LeaseTableXP (TestbedBase):
+# class LeaseTableXP (TestbedBase):
+# class LeaseTableXP (Base,AlchemyObj):
+class LeaseTableXP (Base):
""" SQL alchemy class to manipulate the rows of the slice_iotlab table in
lease_table database. Handles the records representation and creates the
table if it does not exist yet.
return result
-class TestbedAdditionalSfaDB(object):
- """ SQL Alchemy connection class.
- From alchemy.py
- """
- # Stores the unique Singleton instance-
- _connection_singleton = None
- # defines the database name
- dbname = "testbed_xp"
-
- class Singleton:
- """
- Class used with this Python singleton design pattern to allow the
- definition of one single instance of iotlab db session in the whole
- code. Wherever a conenction to the database is needed, this class
- returns the same instance every time. Removes the need for global
- variable throughout the code.
- """
-
- def __init__(self, config, debug=False):
- self.testbed_engine = None
- self.testbed_session = None
- self.url = None
- self.create_testbed_engine(config, debug)
- self.session()
-
- def create_testbed_engine(self, config, debug=False):
- """Creates the SQLAlchemy engine, which is the starting point for
- any SQLAlchemy application.
- :param config: configuration object created by SFA based on the
- configuration file in /etc
- :param debug: if set to true, echo and echo pool will be set to true
- as well. If echo is True, all statements as well as a repr() of
- their parameter lists to the engines logger, which defaults to
- sys.stdout. If echo_pool is True, the connection pool will log all
- checkouts/checkins to the logging stream. A python logger can be
- used to configure this logging directly but so far it has not been
- configured. Refer to sql alchemy engine documentation.
-
- :type config: Config instance (sfa.util.config)
- :type debug: bool
-
- """
-
- if debug is True:
- l_echo_pool = True
- l_echo = True
- else:
- l_echo_pool = False
- l_echo = False
- # the former PostgreSQL.py used the psycopg2 directly and was doing
- #self.connection.set_client_encoding("UNICODE")
- # it's unclear how to achieve this in sqlalchemy, nor if it's needed
- # at all
- # http://www.sqlalchemy.org/docs/dialects/postgresql.html#unicode
- # we indeed have /var/lib/pgsql/data/postgresql.conf where
- # this setting is unset, it might be an angle to tweak that if need
- # be try a unix socket first
- # - omitting the hostname does the trick
- unix_url = "postgresql+psycopg2://%s:%s@:%s/%s" \
- % (config.SFA_DB_USER, config.SFA_DB_PASSWORD,
- config.SFA_DB_PORT, TestbedAdditionalSfaDB.dbname)
-
- # the TCP fallback method
- tcp_url = "postgresql+psycopg2://%s:%s@%s:%s/%s" \
- % (config.SFA_DB_USER, config.SFA_DB_PASSWORD,
- config.SFA_DB_HOST, config.SFA_DB_PORT, TestbedAdditionalSfaDB.dbname)
-
- for url in [unix_url, tcp_url]:
- try:
- self.testbed_engine = create_engine(
- url, echo_pool=l_echo_pool, echo=l_echo)
- self.check()
- self.url = url
- return
- except:
- pass
- self.testbed_engine = None
-
- raise Exception("Could not connect to database")
-
- def check(self):
- """ Check if a table exists by trying a selection
- on the table.
-
- """
- self.testbed_engine.execute("select 1").scalar()
-
-
- def session(self):
- """
- Creates a SQLalchemy session. Once the session object is created
- it should be used throughout the code for all the operations on
- tables for this given database.
-
- """
- if self.testbed_session is None:
- Session = sessionmaker()
- self.testbed_session = Session(bind=self.testbed_engine)
- return self.testbed_session
-
- def close_session(self):
- """
- Closes connection to database.
-
- """
- if self.testbed_session is None:
- return
- self.testbed_session.close()
- self.testbed_session = None
-
-
- def update_experiments_in_additional_sfa_db(self,
- experiment_list_from_testbed, experiment_list_in_db):
- """ Cleans the iotlab db by deleting expired and cancelled jobs.
-
- Compares the list of experiment ids given by the testbed with the
- experiment ids that are already in the database, deletes the
- experiments that are no longer in the testbed experiment id list.
-
- :param experiment_list_from_testbed: list of experiment ids coming
- from testbed
- :type experiment_list_from_testbed: list
- :param experiment_list_in_db: list of experiment ids from the sfa
- additionnal database.
- :type experiment_list_in_db: list
-
- :returns: None
- """
- #Turn the list into a set
- set_experiment_list_in_db = set(experiment_list_in_db)
-
- kept_experiments = set(experiment_list_from_testbed).intersection(set_experiment_list_in_db)
- logger.debug("\r\n \t update_experiments_in_additional_sfa_db \
- experiment_list_in_db %s \r\n \
- experiment_list_from_testbed %s \
- kept_experiments %s "
- % (set_experiment_list_in_db,
- experiment_list_from_testbed, kept_experiments))
- deleted_experiments = set_experiment_list_in_db.difference(
- kept_experiments)
- deleted_experiments = list(deleted_experiments)
- if len(deleted_experiments) > 0:
- self.testbed_session.query(LeaseTableXP).filter(LeaseTableXP.experiment_id.in_(deleted_experiments)).delete(synchronize_session='fetch')
- self.testbed_session.commit()
- return
-
- def __init__(self, config, debug=False):
- self.sl_base = TestbedBase
-
- # Check whether we already have an instance
- if TestbedAdditionalSfaDB._connection_singleton is None:
- TestbedAdditionalSfaDB._connection_singleton = \
- TestbedAdditionalSfaDB.Singleton(config, debug)
-
- # Store instance reference as the only member in the handle
- self._EventHandler_singleton = \
- TestbedAdditionalSfaDB._connection_singleton
-
- def __getattr__(self, aAttr):
- """
- Delegate access to implementation.
-
- :param aAttr: Attribute wanted.
- :returns: Attribute
- """
- return getattr(self._connection_singleton, aAttr)
-
-
-
- # def __setattr__(self, aAttr, aValue):
- # """Delegate access to implementation.
-
- # :param attr: Attribute wanted.
- # :param value: Vaule to be set.
- # :return: Result of operation.
- # """
- # return setattr(self._connection_singleton, aAttr, aValue)
-
- def exists(self, tablename):
- """
- Checks if the table specified as tablename exists.
- :param tablename: name of the table in the db that has to be checked.
- :type tablename: string
- :returns: True if the table exists, False otherwise.
- :rtype: bool
-
- """
- metadata = MetaData(bind=self.testbed_engine)
- try:
- table = Table(tablename, metadata, autoload=True)
- return True
-
- except NoSuchTableError:
- logger.log_exc("SLABPOSTGRES tablename %s does not exist"
- % (tablename))
- return False
-
- def createtable(self):
- """
- Creates all the table sof the engine.
- Uses the global dictionnary holding the tablenames and the table schema.
-
- """
-
- logger.debug("IOTLABPOSTGRES createtable \
- TestbedBase.metadata.sorted_tables %s \r\n engine %s"
- % (TestbedBase.metadata.sorted_tables, self.testbed_engine))
- TestbedBase.metadata.create_all(self.testbed_engine)
- return
+# class TestbedAdditionalSfaDB(object):
+# """ SQL Alchemy connection class.
+# From alchemy.py
+# """
+# # Stores the unique Singleton instance-
+# _connection_singleton = None
+# # defines the database name
+# dbname = "testbed_xp"
+
+# class Singleton:
+# """
+# Class used with this Python singleton design pattern to allow the
+# definition of one single instance of iotlab db session in the whole
+# code. Wherever a conenction to the database is needed, this class
+# returns the same instance every time. Removes the need for global
+# variable throughout the code.
+# """
+
+# def __init__(self, config, debug=False):
+# self.testbed_engine = None
+# self.testbed_session = None
+# self.url = None
+# self.create_testbed_engine(config, debug)
+# self.session()
+
+# def create_testbed_engine(self, config, debug=False):
+# """Creates the SQLAlchemy engine, which is the starting point for
+# any SQLAlchemy application.
+# :param config: configuration object created by SFA based on the
+# configuration file in /etc
+# :param debug: if set to true, echo and echo pool will be set to true
+# as well. If echo is True, all statements as well as a repr() of
+# their parameter lists to the engines logger, which defaults to
+# sys.stdout. If echo_pool is True, the connection pool will log all
+# checkouts/checkins to the logging stream. A python logger can be
+# used to configure this logging directly but so far it has not been
+# configured. Refer to sql alchemy engine documentation.
+
+# :type config: Config instance (sfa.util.config)
+# :type debug: bool
+
+# """
+
+# if debug is True:
+# l_echo_pool = True
+# l_echo = True
+# else:
+# l_echo_pool = False
+# l_echo = False
+# # the former PostgreSQL.py used the psycopg2 directly and was doing
+# #self.connection.set_client_encoding("UNICODE")
+# # it's unclear how to achieve this in sqlalchemy, nor if it's needed
+# # at all
+# # http://www.sqlalchemy.org/docs/dialects/postgresql.html#unicode
+# # we indeed have /var/lib/pgsql/data/postgresql.conf where
+# # this setting is unset, it might be an angle to tweak that if need
+# # be try a unix socket first
+# # - omitting the hostname does the trick
+# unix_url = "postgresql+psycopg2://%s:%s@:%s/%s" \
+# % (config.SFA_DB_USER, config.SFA_DB_PASSWORD,
+# config.SFA_DB_PORT, TestbedAdditionalSfaDB.dbname)
+
+# # the TCP fallback method
+# tcp_url = "postgresql+psycopg2://%s:%s@%s:%s/%s" \
+# % (config.SFA_DB_USER, config.SFA_DB_PASSWORD,
+# config.SFA_DB_HOST, config.SFA_DB_PORT, TestbedAdditionalSfaDB.dbname)
+
+# for url in [unix_url, tcp_url]:
+# try:
+# self.testbed_engine = create_engine(
+# url, echo_pool=l_echo_pool, echo=l_echo)
+# self.check()
+# self.url = url
+# return
+# except:
+# pass
+# self.testbed_engine = None
+
+# raise Exception("Could not connect to database")
+
+# def check(self):
+# """ Check if a table exists by trying a selection
+# on the table.
+
+# """
+# self.testbed_engine.execute("select 1").scalar()
+
+
+# def session(self):
+# """
+# Creates a SQLalchemy session. Once the session object is created
+# it should be used throughout the code for all the operations on
+# tables for this given database.
+
+# """
+# if self.testbed_session is None:
+# Session = sessionmaker()
+# self.testbed_session = Session(bind=self.testbed_engine)
+# return self.testbed_session
+
+# def close_session(self):
+# """
+# Closes connection to database.
+
+# """
+# if self.testbed_session is None:
+# return
+# self.testbed_session.close()
+# self.testbed_session = None
+
+
+ # def update_experiments_in_additional_sfa_db(self,
+ # experiment_list_from_testbed, experiment_list_in_db):
+ # """ Cleans the iotlab db by deleting expired and cancelled jobs.
+
+ # Compares the list of experiment ids given by the testbed with the
+ # experiment ids that are already in the database, deletes the
+ # experiments that are no longer in the testbed experiment id list.
+
+ # :param experiment_list_from_testbed: list of experiment ids coming
+ # from testbed
+ # :type experiment_list_from_testbed: list
+ # :param experiment_list_in_db: list of experiment ids from the sfa
+ # additionnal database.
+ # :type experiment_list_in_db: list
+
+ # :returns: None
+ # """
+ # #Turn the list into a set
+ # set_experiment_list_in_db = set(experiment_list_in_db)
+
+ # kept_experiments = set(experiment_list_from_testbed).intersection(set_experiment_list_in_db)
+ # logger.debug("\r\n \t update_experiments_in_additional_sfa_db \
+ # experiment_list_in_db %s \r\n \
+ # experiment_list_from_testbed %s \
+ # kept_experiments %s "
+ # % (set_experiment_list_in_db,
+ # experiment_list_from_testbed, kept_experiments))
+ # deleted_experiments = set_experiment_list_in_db.difference(
+ # kept_experiments)
+ # deleted_experiments = list(deleted_experiments)
+ # if len(deleted_experiments) > 0:
+ # self.testbed_session.query(LeaseTableXP).filter(LeaseTableXP.experiment_id.in_(deleted_experiments)).delete(synchronize_session='fetch')
+ # self.testbed_session.commit()
+ # return
+
+ # def __init__(self, config, debug=False):
+ # self.sl_base = TestbedBase
+
+ # # Check whether we already have an instance
+ # if TestbedAdditionalSfaDB._connection_singleton is None:
+ # TestbedAdditionalSfaDB._connection_singleton = \
+ # TestbedAdditionalSfaDB.Singleton(config, debug)
+
+ # # Store instance reference as the only member in the handle
+ # self._EventHandler_singleton = \
+ # TestbedAdditionalSfaDB._connection_singleton
+
+ # def __getattr__(self, aAttr):
+ # """
+ # Delegate access to implementation.
+
+ # :param aAttr: Attribute wanted.
+ # :returns: Attribute
+ # """
+ # return getattr(self._connection_singleton, aAttr)
+
+
+
+ # # def __setattr__(self, aAttr, aValue):
+ # # """Delegate access to implementation.
+
+ # # :param attr: Attribute wanted.
+ # # :param value: Vaule to be set.
+ # # :return: Result of operation.
+ # # """
+ # # return setattr(self._connection_singleton, aAttr, aValue)
+
+ # def exists(self, tablename):
+ # """
+ # Checks if the table specified as tablename exists.
+ # :param tablename: name of the table in the db that has to be checked.
+ # :type tablename: string
+ # :returns: True if the table exists, False otherwise.
+ # :rtype: bool
+
+ # """
+ # metadata = MetaData(bind=self.testbed_engine)
+ # try:
+ # table = Table(tablename, metadata, autoload=True)
+ # return True
+
+ # except NoSuchTableError:
+ # logger.log_exc("SLABPOSTGRES tablename %s does not exist"
+ # % (tablename))
+ # return False
+
+ # def createtable(self):
+ # """
+ # Creates all the table sof the engine.
+ # Uses the global dictionnary holding the tablenames and the table schema.
+
+ # """
+
+ # logger.debug("IOTLABPOSTGRES createtable \
+ # TestbedBase.metadata.sorted_tables %s \r\n engine %s"
+ # % (TestbedBase.metadata.sorted_tables, self.testbed_engine))
+ # TestbedBase.metadata.create_all(self.testbed_engine)
+ # return
from sfa.util.sfalogging import logger
-from sfa.iotlab.iotlabpostgres import TestbedAdditionalSfaDB, LeaseTableXP
+from sfa.iotlab.iotlabpostgres import LeaseTableXP
from sfa.iotlab.OARrestapi import OARrestapi
from sfa.iotlab.LDAPapi import LDAPapi
:type config: Config object
"""
- self.leases_db = TestbedAdditionalSfaDB(config)
+ # self.leases_db = TestbedAdditionalSfaDB(config)
self.oar = OARrestapi()
self.ldap = LDAPapi()
self.time_format = "%Y-%m-%d %H:%M:%S"
return jobid
- def AddLeases(self, hostname_list, slice_record,
- lease_start_time, lease_duration):
- """Creates a job in OAR corresponding to the information provided
- as parameters. Adds the job id and the slice hrn in the iotlab
- database so that we are able to know which slice has which nodes.
-
- :param hostname_list: list of nodes' OAR hostnames.
- :param slice_record: sfa slice record, must contain login and hrn.
- :param lease_start_time: starting time , unix timestamp format
- :param lease_duration: duration in minutes
-
- :type hostname_list: list
- :type slice_record: dict
- :type lease_start_time: integer
- :type lease_duration: integer
- :returns: job_id, can be None if the job request failed.
-
- """
- logger.debug("IOTLAB_API \r\n \r\n \t AddLeases hostname_list %s \
- slice_record %s lease_start_time %s lease_duration %s "\
- %( hostname_list, slice_record , lease_start_time, \
- lease_duration))
-
- #tmp = slice_record['reg-researchers'][0].split(".")
- username = slice_record['login']
- #username = tmp[(len(tmp)-1)]
- job_id = self.LaunchExperimentOnOAR(hostname_list, \
- slice_record['hrn'], \
- lease_start_time, lease_duration, \
- username)
- if job_id is not None:
- start_time = \
- datetime.fromtimestamp(int(lease_start_time)).\
- strftime(self.time_format)
- end_time = lease_start_time + lease_duration
-
-
- logger.debug("IOTLAB_API \r\n \r\n \t AddLeases TURN ON LOGGING SQL \
- %s %s %s "%(slice_record['hrn'], job_id, end_time))
-
-
- logger.debug("IOTLAB_API \r\n \r\n \t AddLeases %s %s %s " \
- %(type(slice_record['hrn']), type(job_id), type(end_time)))
-
- iotlab_ex_row = LeaseTableXP(slice_hrn = slice_record['hrn'],
- experiment_id=job_id,
- end_time= end_time)
-
- logger.debug("IOTLAB_API \r\n \r\n \t AddLeases iotlab_ex_row %s" \
- %(iotlab_ex_row))
- self.leases_db.testbed_session.add(iotlab_ex_row)
- self.leases_db.testbed_session.commit()
-
- logger.debug("IOTLAB_API \t AddLeases hostname_list start_time %s "
- %(start_time))
-
- return job_id
#Delete the jobs from job_iotlab table
return filtered_reservation_list
- def GetLeases(self, lease_filter_dict=None, login=None):
- """
-
- Get the list of leases from OAR with complete information
- about which slice owns which jobs and nodes.
- Two purposes:
- -Fetch all the jobs from OAR (running, waiting..)
- complete the reservation information with slice hrn
- found in testbed_xp table. If not available in the table,
- assume it is a iotlab slice.
- -Updates the iotlab table, deleting jobs when necessary.
-
- :returns: reservation_list, list of dictionaries with 'lease_id',
- 'reserved_nodes','slice_id', 'state', 'user', 'component_id_list',
- 'slice_hrn', 'resource_ids', 't_from', 't_until'
- :rtype: list
-
- """
-
- unfiltered_reservation_list = self.GetReservedNodes(login)
-
- reservation_list = []
- #Find the slice associated with this user iotlab ldap uid
- logger.debug(" IOTLAB_API.PY \tGetLeases login %s\
- unfiltered_reservation_list %s "
- % (login, unfiltered_reservation_list))
- #Create user dict first to avoid looking several times for
- #the same user in LDAP SA 27/07/12
- job_oar_list = []
- jobs_psql_query = self.leases_db.testbed_session.query(LeaseTableXP).all()
- jobs_psql_dict = dict([(row.experiment_id, row.__dict__)
- for row in jobs_psql_query])
- #jobs_psql_dict = jobs_psql_dict)
- logger.debug("IOTLAB_API \tGetLeases jobs_psql_dict %s"
- % (jobs_psql_dict))
- jobs_psql_id_list = [row.experiment_id for row in jobs_psql_query]
-
- for resa in unfiltered_reservation_list:
- logger.debug("IOTLAB_API \tGetLeases USER %s"
- % (resa['user']))
- #Construct list of jobs (runing, waiting..) in oar
- job_oar_list.append(resa['lease_id'])
- #If there is information on the job in IOTLAB DB ]
- #(slice used and job id)
- if resa['lease_id'] in jobs_psql_dict:
- job_info = jobs_psql_dict[resa['lease_id']]
- logger.debug("IOTLAB_API \tGetLeases job_info %s"
- % (job_info))
- resa['slice_hrn'] = job_info['slice_hrn']
- resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
-
- #otherwise, assume it is a iotlab slice:
- else:
- resa['slice_id'] = hrn_to_urn(self.root_auth + '.' +
- resa['user'] + "_slice", 'slice')
- resa['slice_hrn'] = Xrn(resa['slice_id']).get_hrn()
-
- resa['component_id_list'] = []
- #Transform the hostnames into urns (component ids)
- for node in resa['reserved_nodes']:
-
- iotlab_xrn = xrn_object(self.root_auth, node)
- resa['component_id_list'].append(iotlab_xrn.urn)
-
- if lease_filter_dict:
- logger.debug("IOTLAB_API \tGetLeases \
- \r\n leasefilter %s" % ( lease_filter_dict))
-
- filter_dict_functions = {
- 'slice_hrn' : IotlabShell.filter_lease_name,
- 't_from' : IotlabShell.filter_lease_start_time
- }
- reservation_list = list(unfiltered_reservation_list)
- for filter_type in lease_filter_dict:
- logger.debug("IOTLAB_API \tGetLeases reservation_list %s" \
- % (reservation_list))
- reservation_list = filter_dict_functions[filter_type](\
- reservation_list,lease_filter_dict[filter_type] )
-
- # Filter the reservation list with a maximum timespan so that the
- # leases and jobs running after this timestamp do not appear
- # in the result leases.
- # if 'start_time' in :
- # if resa['start_time'] < lease_filter_dict['start_time']:
- # reservation_list.append(resa)
-
-
- # if 'name' in lease_filter_dict and \
- # lease_filter_dict['name'] == resa['slice_hrn']:
- # reservation_list.append(resa)
-
-
- if lease_filter_dict is None:
- reservation_list = unfiltered_reservation_list
-
- self.leases_db.update_experiments_in_additional_sfa_db(job_oar_list, jobs_psql_id_list)
-
- logger.debug(" IOTLAB_API.PY \tGetLeases reservation_list %s"
- % (reservation_list))
- return reservation_list
logger.debug("IOTLABSLICES verify_slice_leases sfa_slice %s "
% (sfa_slice))
#First get the list of current leases from OAR
- leases = self.driver.testbed_shell.GetLeases({'slice_hrn': sfa_slice['hrn']})
+ leases = self.driver.GetLeases({'slice_hrn': sfa_slice['hrn']})
logger.debug("IOTLABSLICES verify_slice_leases requested_jobs_dict %s \
leases %s " % (requested_jobs_dict, leases))
logger.debug("IOTLABSLICES \
NEWLEASE slice %s job %s"
% (sfa_slice, job))
- job_id = self.driver.testbed_shell.AddLeases(
+ job_id = self.driver.AddLeases(
job['hostname'],
sfa_slice, int(job['start_time']),
int(job['duration']))
if job_id is not None:
- new_leases = self.driver.testbed_shell.GetLeases(login=
+ new_leases = self.driver.GetLeases(login=
sfa_slice['login'])
for new_lease in new_leases:
leases.append(new_lease)
if reschedule_jobs_dict:
for start_time in reschedule_jobs_dict:
job = reschedule_jobs_dict[start_time]
- self.driver.testbed_shell.AddLeases(
+ self.driver.AddLeases(
job['hostname'],
sfa_slice, int(job['start_time']),
int(job['duration']))