Updated slice manager.
authorTony Mack <tmack@cs.princeton.edu>
Wed, 14 Jan 2009 12:36:20 +0000 (12:36 +0000)
committerTony Mack <tmack@cs.princeton.edu>
Wed, 14 Jan 2009 12:36:20 +0000 (12:36 +0000)
plc/slicemgr.py

index a61909a..16041e8 100644 (file)
-##
-# SliceMgr is a GeniServer that implements the Slice interface at PLC
-
-import tempfile
 import os
-import time
 import sys
+import datetime
+import time
 
-from util.hierarchy import Hierarchy
-from util.trustedroot import TrustedRootList
-from util.cert import Keypair, Certificate
-from util.gid import GID
-from util.geniserver import GeniServer
-from util.record import GeniRecord
-from util.genitable import GeniTable
-from util.geniticket import Ticket
+from util.geniserver import *
+from util.geniclient import *
+from util.cert import *
+from util.trustedroot import *
 from util.excep import *
 from util.misc import *
-
-from util.config import *
-
-##
-# SliceMgr class extends GeniServer class
+from util.config import Config
 
 class SliceMgr(GeniServer):
+
+    hrn = None
+    key_file = None
+    cert_file = None
+    components_file = None
+    slices_file = None 
+    components_ttl = None
+    components = []
+    slices = []        
+    policies = {}
+    timestamp = None
+    threshold = None   
+    shell = None
+    aggregates = {}
+       
+  
     ##
     # Create a new slice manager object.
     #
     # @param ip the ip address to listen on
     # @param port the port to listen on
     # @param key_file private key filename of registry
-    # @param cert_file certificate filename containing public key (could be a GID file)
-
-    def __init__(self, ip, port, key_file, cert_file):
-        GeniServer.__init__(self, ip, port, key_file, cert_file)
-
-        # get PL account settings from config module
-        self.pl_auth = get_pl_auth()
-
-        # connect to planetlab
-        if "Url" in self.pl_auth:
-            self.connect_remote_shell()
-        else:
-            self.connect_local_shell()
-
-    ##
-    # Connect to a remote shell via XMLRPC
-
-    def connect_remote_shell(self):
-        import remoteshell
-        self.shell = remoteshell.RemoteShell()
-
-    ##
-    # Connect to a local shell via local API functions
-
-    def connect_local_shell(self):
-        import PLC.Shell
-        self.shell = PLC.Shell.Shell(globals = globals())
-
-    ##
-    # Register the server RPCs for the slice interface
+    # @param cert_file certificate filename containing public key (could be a GID file)     
+
+    def __init__(self, ip, port, key_file, cert_file, config = "/usr/share/geniwrapper/util/geni_config"):
+        GeniServer.__init__(ip, port, key_file, cert_file)
+       self.key_file = key_file
+       self.cert_file = cert_file
+       self.conf = Config(config)
+        basedir = self.conf.GENI_BASE_DIR + os.sep
+        server_basedir = basedir + os.sep + "plc" + os.sep
+       self.hrn = conf.GENI_INTERFACE_HRN
+       
+       # Get list of aggregates this sm talks to
+       aggregates_file = server_basedir + os.sep + 'aggregates'
+       self.load_aggregates(aggregates_file) 
+       self.components_file = os.sep.join([server_basedir, 'components', 'slicemgr.' + hrn + '.comp'])
+       self.slices_file = os.sep.join([server_basedir, 'components', 'slicemgr' + hrn + '.slices'])
+       self.timestamp_file = os.sep.join([server_basedir, 'components', 'slicemgr' + hrn + '.timestamp']) 
+       self.components_ttl = components_ttl
+       self.connect()
+
+    def load_aggregates(self, aggregates_file):
+       """
+       Get info about the aggregates available to us from file and create 
+         an xmlrpc connection to each. If any info is invalid, skip it. 
+       """
+       lines = []
+        try:
+           f = open(aggregates_file, 'r')
+           lines = f.readlines()
+           f.close()
+       except: raise 
+       
+       for line in lines:
+           # Skip comments
+           if line.strip.startswith("#"):
+               continue
+           agg_info = line.split("\t").split(" ")
+           
+           # skip invalid info
+           if len(agg_info) != 3:
+               continue
+
+           # create xmlrpc connection using GeniClient
+           hrn, address, port = agg_info[0], agg_info[1], agg_info[2]
+           url = 'https://%(address)s:%(port)s' % locals()
+           self.aggregates[hrn] = GeniClient(url, self.key_file, self.cert_file)
+
+    def item_hrns(self, items):
+       """
+       Take a list of items (components or slices) and return a dictionary where
+       the key is the authoritative hrn and the value is a list of items at that 
+       hrn.
+       """
+       item_hrns = {}
+       agg_hrns = self.aggregates.keys()
+       for agg_hrn in agg_hrns:
+           item_hrns[agg_hrn] = []
+       for item in items:
+           for agg_hrn in agg_hrns:
+               if item.startswith(agg_hrn):
+                  item_hrns[agg_hrn] = item
+
+       return item_hrns        
+                       
+
+    def hostname_to_hrn(self, login_base, hostname):
+       """
+       Convert hrn to plantelab name.
+       """
+        genihostname = "_".join(hostname.split("."))
+        return ".".join([self.hrn, login_base, genihostname])
+
+    def slicename_to_hrn(self, slicename):
+       """
+       Convert hrn to planetlab name.
+       """
+        slicename = slicename.replace("_", ".")
+        return ".".join([self.hrn, slicename])
+
+    def refresh_components(self):
+       """
+       Update the cached list of nodes and slices.
+       """
+       print "refreshing"
+       
+       aggregates = self.aggregates.keys()
+       all_nodes = []
+       all_slices = []
+       for aggregate in aggregates:
+           try:
+               # resolve components hostnames
+               nodes = self.aggregates[aggregate].get_components()
+               all_nodes.extend(nodes) 
+               # resolve slices
+               slices = self.aggregates[aggregate].get_slices()
+               all_slices.extend(slice)
+               # update timestamp and threshold
+               self.timestamp = datetime.datetime.now()
+               delta = datetime.timedelta(hours=self.components_ttl)
+               self.threshold = self.timestamp + delta 
+           except:
+               # XX print out to some error log
+               pass    
+   
+       self.components = all_nodes
+       self.slices = all_slices        
+       f = open(self.components_file, 'w')
+       f.write(str(self.components))
+       f.close()
+       f = open(self.slices_file, 'w')
+       f.write(str(self.slices))
+       f.close()
+       f = open(self.timestamp_file, 'w')
+       f.write(str(self.threshold))
+       f.close()
+    def load_components(self):
+       """
+       Read cached list of nodes and slices.
+       """
+       print "loading"
+       # Read component list from cached file 
+       if os.path.exists(self.components_file):
+           f = open(self.components_file, 'r')
+           self.components = eval(f.read())
+           f.close()
+       
+       if os.path.exists(self.slices_file):
+            f = open(self.components_file, 'r')
+            self.slices = eval(f.read())
+            f.close()
+
+       time_format = "%Y-%m-%d %H:%M:%S"
+       if os.path.exists(self.timestamp_file):
+           f = open(self.timestamp_file, 'r')
+           timestamp = str(f.read()).split(".")[0]
+           self.timestamp = datetime.datetime.fromtimestamp(time.mktime(time.strptime(timestamp, time_format)))
+           delta = datetime.timedelta(hours=self.components_ttl)
+            self.threshold = self.timestamp + delta
+           f.close()   
+
+    def get_components(self):
+       """
+       Return a list of components managed by this slice manager.
+       """
+       # Reload components list
+       now = datetime.datetime.now()
+       #self.load_components()
+       if not self.threshold or not self.timestamp or now > self.threshold:
+           self.refresh_components()
+       elif now < self.threshold and not self.components: 
+           self.load_components()
+       return self.components
+   
+     
+    def get_slices(self):
+       """
+       Return a list of instnatiated managed by this slice manager.
+       """
+       now = datetime.datetime.now()
+       #self.load_components()
+       if not self.threshold or not self.timestamp or now > self.threshold:
+           self.refresh_components()
+       elif now < self.threshold and not self.slices:
+           self.load_components()
+       return self.slices
+
+    def get_rspec(self, hrn, type):
+       #rspec = Rspec()
+       if type in ['node']:
+           nodes = self.shell.GetNodes(self.auth)
+       elif type in ['slice']:
+           slices = self.shell.GetSlices(self.auth)
+       elif type in ['aggregate']:
+           pass
+
+    def get_resources(self, slice_hrn):
+       """
+       Return the current rspec for the specified slice.
+       """
+       slicename = hrn_to_plcslicename(slice_hrn)
+       rspec = self.get_rspec(slicenamem, 'slice' )
+        
+       return rspec
+    def create_slice(self, slice_hrn, rspec):
+       """
+       Instantiate the specified slice according to whats defined in the rspec.
+       """
+       slicename = self.hrn_to_plcslicename(slice_hrn)
+       #spec = Rspec(rspec)
+       #components = spec.components()
+       #shell.AddSliceToNodes(self.auth, slicename, components)
+       return 1
+       
+    def delete_slice_(self, slice_hrn):
+       """
+       Remove this slice from all components it was previouly associated with and 
+       free up the resources it was using.
+       """
+       slicename = self.hrn_to_plcslicename(slice_hrn)
+       rspec = self.get_resources(slice_hrn)
+       components = rspec.components()
+       shell.DeleteSliceFromNodes(self.auth, slicename, components)
+       return 1
+
+    def start_slice(self, slice_hrn):
+       """
+       Stop the slice at plc.
+       """
+       slicename = hrn_to_plcslicename(slice_hrn)
+       slices = self.shell.GetSlices(self.auth, {'name': slicename}, ['slice_id'])
+       if not slices:
+           raise RecordNotFound(slice_hrn)
+       slice_id = slices[0]
+       atrribtes = self.shell.GetSliceAttributes({'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
+       attribute_id = attreibutes[0] 
+       self.shell.UpdateSliceAttribute(self.auth, attribute_id, "1" )
+       return 1
+
+    def stop_slice(self, slice_hrn):
+       """
+       Stop the slice at plc
+       """
+       slicename = hrn_to_plcslicename(slice_hrn)
+       slices = self.shell.GetSlices(self.auth, {'name': slicename}, ['slice_id'])
+        if not slices:
+            raise RecordNotFound(slice_hrn)
+        slice_id = slices[0]
+        atrribtes = self.shell.GetSliceAttributes({'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
+        attribute_id = attreibutes[0]
+       self.shell.UpdateSliceAttribute(self.auth, attribute_id, "0")
+       return 1
+
+    def reset_slice(self, slice_hrn):
+       """
+       Reset the slice
+       """
+       slicename = self.hrn_to_plcslicename(slice_hrn)
+       return 1
+
+    def get_policy(self):
+       """
+       Return this aggregates policy as an rspec
+       """
+       rspec = self.get_rspec(self.hrn, 'aggregate')
+       return rspec
+       
+       
+
+##############################
+## Server methods here for now
+##############################
+
+    def nodes(self):
+        return self..get_components()
+
+    def slices(self):
+        return self.get_slices()
+
+    def resources(self, cred, hrn):
+        self.decode_authentication(cred, 'info')
+        self.verify_object_belongs_to_me(hrn)
+
+        return self.get_resources(hrn)
+
+    def create(self, cred, hrn, rspec):
+        self.decode_authentication(cred, 'embed')
+        self.verify_object_belongs_to_me(hrn, rspec)
+        return self.create(hrn)
+
+    def delete(self, cred, hrn):
+        self.decode_authentication(cred, 'embed')
+        self.verify_object_belongs_to_me(hrn)
+        return self.delete_slice(hrn)
+
+    def start(self, cred, hrn):
+        self.decode_authentication(cred, 'control')
+        return self.start(hrn)
+
+    def stop(self, cred, hrn):
+        self.decode_authentication(cred, 'control')
+        return self.stop(hrn)
+
+    def reset(self, cred, hrn):
+        self.decode_authentication(cred, 'control')
+        return self.reset(hrn)
+
+    def policy(self, cred):
+        self.decode_authentication(cred, 'info')
+        return self.get_policy()
 
     def register_functions(self):
         GeniServer.register_functions(self)
-        # slice interface
-        self.server.register_function(self.create_slice)
-        self.server.register_function(self.get_ticket)
-        self.server.register_function(self.redeem_ticket)
-        self.server.register_function(self.start_slice)
-        self.server.register_function(self.stop_slice)
-        self.server.register_function(self.reset_slice)
-        self.server.register_function(self.delete_slice)
-        self.server.register_function(self.get_slice_resources)
-        self.server.register_function(self.list_slices)
-        self.server.register_function(self.list_nodes)
-
-    ##
-    # create_slice: Create (instantiate) a slice. 
-    #
-    # @param cred credential string
-    # @param name name of the slice to retrieve a ticket for
-    # @param rspec resource specification dictionary
-    #
-    # @return the string representation of a ticket object
-
-    def create_slice(self, cred, name, rspec):
-        self.decode_authentication(cred, "createslice")
-        slicename = hrn_to_pl_slicename(self.object_gid.get_hrn())
-        # extract per-aggregate netspec from rspec
-        # call create_slice on each aggregate
-
-    ##
-    # get_ticket: Retrieve a ticket. 
-    #
-    # This operation is not supported as part of a slice manager
-    #
-    # @param cred credential string
-    # @param name name of the slice to retrieve a ticket for
-    # @param rspec resource specification dictionary
-    #
-
-    def get_ticket(self, cred, name, rspec):
-        return anything
-
-    ##
-    # redeem_ticket: Redeem a ticket. 
-    #
-    # This operation is not supported as part of a slice manager
-    #
-    # @param cred credential string
-    # @param name name of the slice to retrieve a ticket for
-    # @param rspec resource specification dictionary
-    #
-
-    def redeem_ticket(self, cred, name, rspec):
-        return anything
-
-    ##
-    # stop_slice: Stop a slice.
-    #
-    # @param cred a credential identifying the caller (callerGID) and the slice
-    #     (objectGID)
-
-    def stop_slice(self, cred_str):
-        self.decode_authentication(cred_str, "stopslice")
-        slicename = hrn_to_pl_slicename(self.object_gid.get_hrn())
-        # call stop_slice on each aggregate that hosts the slice
-
-    ##
-    # start_slice: Start a slice.
-    #
-    # @param cred a credential identifying the caller (callerGID) and the slice
-    #     (objectGID)
-
-    def start_slice(self, cred_str):
-        self.decode_authentication(cred_str, "startslice")
-        slicename = hrn_to_pl_slicename(self.object_gid.get_hrn())
-        # call start_slice on each aggregate that hosts the slice
-
-    ##
-    # reset_slice: Reset a slice.
-    #
-    # @param cred a credential identifying the caller (callerGID) and the slice
-    #     (objectGID)
-
-    def reset_slice(self, cred_str):
-        self.decode_authentication(cred_str, "resetslice")
-        slicename = hrn_to_pl_slicename(self.object_gid.get_hrn())
-        # call reset_slice on each aggregate that hosts the slice
-
-    ##
-    # delete_slice: Delete a slice.
-    #
-    # @param cred a credential identifying the caller (callerGID) and the slice
-    #     (objectGID)
-
-    def delete_slice(self, cred_str):
-        self.decode_authentication(cred_str, "deleteslice")
-        slicename = hrn_to_pl_slicename(self.object_gid.get_hrn())
-        # call delete_slice on each aggregate that hosts the slice
-
-    ##
-    # get_slice_resources: Get resources allocated to slice
-    #
-    # @param cred a credential identifying the caller (callerGID) and the slice
-    #     (objectGID)
-
-    def get_slice_resources(self, cred_str):
-        self.decode_authentication(cred_str, "getsliceresources")
-        slicename = hrn_to_pl_slicename(self.object_gid.get_hrn())
-        # call get_resources on each aggregate that hosts the slice
-        # merge returned netspecs into one big rspec
-
-    ##
-    # list_slices: List hosted slices.
-    #
-    # @param cred a credential identifying the caller (callerGID)
-
-    def list_slices(self, cred_str):
-        self.decode_authentication(cred_str, "listslices")
-        # probably have this information cached, so return that
-        # otherwise, call list_slices on all peer aggregates
-
-    ##
-    # list_nodes: List available nodes.
-    #
-    # @param cred a credential identifying the caller (callerGID)
 
-    def list_nodes(self, cred_str):
-        self.decode_authentication(cred_str, "listslices")
-        # probably have this information cached, so return that
-        # otherwise, call list_nodes on all peer aggregates
+        # Aggregate interface methods
+        self.server.register_function(self.components)
+        self.server.register_function(self.slices)
+        self.server.register_function(self.resources)
+        self.server.register_function(self.create)
+        self.server.register_function(self.delete)
+        self.server.register_function(self.start)
+        self.server.register_function(self.stop)
+        self.server.register_function(self.reset)
+        self.server.register_function(self.policy)
+