X-Git-Url: http://git.onelab.eu/?p=tests.git;a=blobdiff_plain;f=system%2FTestSliceSfa.py;h=61ae978eafeb437950f99d46add6581de6a8fc0f;hp=c0c868c20b2c72697ba3be485797d43a09088e2e;hb=f1ddea71ade10047c25a22d312b7b7cb0ff2bccd;hpb=5148b9f217612f5044034a3f2efd7a69a9a25529 diff --git a/system/TestSliceSfa.py b/system/TestSliceSfa.py index c0c868c..61ae978 100644 --- a/system/TestSliceSfa.py +++ b/system/TestSliceSfa.py @@ -32,8 +32,8 @@ class TestSliceSfa: # send back up to the TestAuthSfa def sfi_path (self): return self.test_auth_sfa.sfi_path() def rspec_style (self): return self.test_auth_sfa.rspec_style() - def sfi_pi(self,*args,**kwds): return self.test_auth_sfa.sfi_pi(*args, **kwds) - def sfi_user(self,*args,**kwds): return self.test_auth_sfa.sfi_user(*args, **kwds) + def sfi_pi(self, *args, **kwds): return self.test_auth_sfa.sfi_pi(*args, **kwds) + def sfi_user(self, *args, **kwds): return self.test_auth_sfa.sfi_user(*args, **kwds) def discover_option(self): if self.rspec_style() == 'pg': @@ -44,7 +44,7 @@ class TestSliceSfa: # those are step names exposed as methods of TestPlc, hence the _sfa # needs to be run as pi - def sfa_register_slice(self,options): + def sfa_register_slice(self, options): "run sfi register (on Registry)" sfi_command = "register" sfi_command += " --type slice" @@ -60,7 +60,7 @@ class TestSliceSfa: too_late = "+12m" # one_month = "+4w" # we expect this to fail on too long term attemps, but to succeed otherwise - overall=True + overall = True for ( renew_until, expected) in [ (too_late, False), (one_month, True) ] : sfi_command = "renew" sfi_command += " {}".format(self.hrn()) @@ -79,7 +79,7 @@ class TestSliceSfa: def sfa_get_expires (self, options): filename = "{}.json".format(self.hrn()) # /root/sfi/pg/<> - inplc_filename = os.path.join(self.sfi_path(),filename) + inplc_filename = os.path.join(self.sfi_path(), filename) # /vservers/<>/root/sfi/... - cannot use os.path inbox_filename = "{}{}".format(self.test_plc.vm_root_in_host(), inplc_filename) sfi_command = "" @@ -88,7 +88,7 @@ class TestSliceSfa: sfi_command += " {}".format(self.hrn()) # cannot find it if sfi status returns an error if self.test_plc.run_in_guest (self.sfi_user(sfi_command)) !=0: return - if self.test_plc.test_ssh.fetch(inbox_filename,filename)!=0: return + if self.test_plc.test_ssh.fetch(inbox_filename, filename)!=0: return try: with open(filename) as f: status = json.loads(f.read()) @@ -101,20 +101,20 @@ class TestSliceSfa: traceback.print_exc() # helper - filename to store a given result - def _resname (self,name,ext): return "{}.{}".format(name, ext) - def adfile (self): return self._resname("ad","rspec") - def reqfile (self): return self._resname("req","rspec") + def _resname (self, name, ext): return "{}.{}".format(name, ext) + def adfile (self): return self._resname("ad", "rspec") + def reqfile (self): return self._resname("req", "rspec") def empty_reqfile (self): return "empty-rspec.xml" - def nodefile (self): return self._resname("nodes","txt") + def nodefile (self): return self._resname("nodes", "txt") # run as user - def sfa_discover(self,options): + def sfa_discover(self, options): "discover resources into resouces_in.rspec" return self.test_plc.run_in_guest(self.sfi_user(\ "resources {} -o {}/{}"\ .format(self.discover_option(),self.sfi_path(),self.adfile()))) == 0 - def sfa_rspec(self,options): + def sfa_rspec(self, options): "invoke sfiListNodes and sfiAddSlivers to prepare a rspec" commands = [ "sfiListNodes.py -i {}/{} -o {}/{}".format(self.sfi_path(), self.adfile(), @@ -127,18 +127,18 @@ class TestSliceSfa: if self.test_plc.run_in_guest(command) != 0: return False return True - def _sfa_allocate(self,file,options): + def _sfa_allocate(self, file, options): command = self.sfi_user("allocate {} {}".format(self.hrn(), file)) return self.test_plc.run_in_guest(command) == 0 - def sfa_allocate(self,options): + def sfa_allocate(self, options): "invoke run sfi allocate (on SM)" - return self._sfa_allocate(self.reqfile(),options) - def sfa_allocate_empty(self,options): + return self._sfa_allocate(self.reqfile(), options) + def sfa_allocate_empty(self, options): "invoke run sfi allocate (on SM) with an empty rspec" - return self._sfa_allocate(self.empty_reqfile(),options) + return self._sfa_allocate(self.empty_reqfile(), options) - def sfa_provision(self,options): + def sfa_provision(self, options): "invoke run sfi provision (on SM)" command = self.sfi_user("provision {}".format(self.hrn())) return self.test_plc.run_in_guest(command) == 0 @@ -149,7 +149,7 @@ class TestSliceSfa: return "{}_{}".format(self.test_auth_sfa.login_base, self.slice_spec['name']) # all local nodes in slice ? - def sfa_check_slice_plc (self,options): + def sfa_check_slice_plc (self, options): "check the slice has been created at the plc - all local nodes should be in slice" slice = self.test_plc.apiserver.GetSlices(self.test_plc.auth_root(), self.plc_name())[0] nodes = self.test_plc.apiserver.GetNodes(self.test_plc.auth_root(), {'peer_id':None}) @@ -164,22 +164,22 @@ class TestSliceSfa: return result # no node left in slice ? - def sfa_check_slice_plc_empty (self,options): + def sfa_check_slice_plc_empty (self, options): "check the slice have been emptied at the plcs - no node should be in slice" slices = self.test_plc.apiserver.GetSlices(self.test_plc.auth_root(), - self.plc_name(), - ['node_ids']) + self.plc_name(), + ['node_ids']) return not slices[0]['node_ids'] # xxx historically this used to do the same as sfa-create-slice # which was later on split into 3 distinct steps, # and we can ignore the first that is about setting up the rspec - def sfa_update_slice(self,options): + def sfa_update_slice(self, options): "re-run sfi allocate and provision (on SM) on existing object" return self.sfa_allocate(options) and self.sfa_provision(options) # run as pi - def sfa_delete_slice(self,options): + def sfa_delete_slice(self, options): "run sfi delete" self.test_plc.run_in_guest(self.sfi_pi("delete {}".format(self.hrn()))) return self.test_plc.run_in_guest(self.sfi_pi("remove -t slice {}".format(self.hrn()))) == 0