1 # Thierry Parmentelat <thierry.parmentelat@inria.fr>
2 # Copyright (C) 2010 INRIA
8 from TestSsh import TestSsh
12 def __init__(self, test_plc, test_node, test_slice):
13 self.test_plc = test_plc
14 self.test_node = test_node
15 self.test_slice = test_slice
16 self.test_ssh = self.create_test_ssh()
18 def get_privateKey(self):
19 slice_spec = self.test_slice.slice_spec
21 (found, privatekey) = self.test_slice.locate_key()
22 return (found, privatekey)
23 except Exception as e:
26 def create_test_ssh(self):
27 private_key = self.test_slice.locate_private_key()
29 raise Exception("Cannot find the private key for slice {}".format(self.test_slice.name()))
30 return TestSsh (self.test_node.name(), key=private_key, username=self.test_slice.name(),
31 # so that copies end up in the home dir
35 return "{}@{}".format(self.test_slice.name(), self.test_node.name())
37 def check_initscript_stamp(self, stamp):
38 utils.header("Checking for initscript stamp {} on sliver {}".format(stamp, self.name()))
39 return self.test_ssh.run("ls -l /var/tmp/{}.stamp".format(stamp)) == 0
41 def check_tcp_ready (self, port):
42 ready_command = "./tcptest.py ready -p {}".format(port)
43 return self.test_ssh.copy("tcptest.py") == 0 and \
44 self.test_ssh.run(ready_command) == 0
46 def run_tcp_server (self, port, timeout=10):
47 server_command = "./tcptest.py server -p {} -t {}".format(port, timeout)
48 return self.test_ssh.copy("tcptest.py") == 0 and \
49 self.test_ssh.run(server_command, background=True) == 0
51 def run_tcp_client (self, servername, port, retry=5):
52 client_command = "./tcptest.py client -a {} -p {}".format(servername, port)
53 if self.test_ssh.copy("tcptest.py") != 0:
55 if self.test_ssh.run(client_command) == 0:
59 # use the node's main ssh root entrance, as the slice entrance might be down
60 #def tar_var_logs (self):
61 # return self.test_ssh.actual_command("sudo tar -C /var/log -cf - .")
62 def tar_var_logs (self):
63 test_ssh = self.test_node.create_test_ssh()
64 dir_to_tar = "/vservers/{}/var/log".format(self.test_slice.name())
65 return test_ssh.actual_command("tar -C {} -cf - .".format(dir_to_tar))
67 def check_hooks (self):
68 print('NOTE: slice hooks check scripts NOT (yet?) run in sudo')
69 extensions = [ 'py','pl','sh' ]
71 scripts = utils.locate_hooks_scripts ('sliver '+self.name(), path,extensions)
73 for script in scripts:
74 if not self.check_hooks_script (script):
78 def check_hooks_script (self,local_script):
79 script_name = os.path.basename(local_script)
80 utils.header ("SLIVER hook {} ({})".format(script_name, self.name()))
81 test_ssh = self.create_test_ssh()
82 test_ssh.copy_home(local_script)
83 if test_ssh.run("./"+script_name) != 0:
84 utils.header ("WARNING: hooks check script {} FAILED (ignored)".format(script_name))
88 utils.header ("SUCCESS: sliver hook {} OK".format(script_name))