2 # Thierry Parmentelat <thierry.parmentelat@inria.fr>
3 # Copyright (C) 2010-2015 INRIA
5 # #################### history
7 # see also Substrate.readme
9 # This is a complete rewrite of TestResources/Tracker/Pool
10 # we don't use trackers anymore and just probe/sense the running
11 # boxes to figure out where we are
12 # in order to implement some fairness in the round-robin allocation scheme
13 # we need an indication of the 'age' of each running entity,
14 # hence the 'timestamp-*' steps in TestPlc
16 # this should be much more flexible:
17 # * supports several plc boxes
18 # * supports several qemu guests per host
19 # * no need to worry about tracker being in sync or not
21 # #################### howto use
23 # each site is to write its own LocalSubstrate.py,
24 # (see e.g. LocalSubstrate.inria.py)
25 # LocalSubstrate.py is expected to be in /root on the testmaster box
28 # . the vserver-capable boxes used for hosting myplcs
29 # . and their admissible load (max # of myplcs)
30 # . the pool of DNS-names and IP-addresses available for myplcs
32 # . the kvm-qemu capable boxes to host qemu instances
33 # . and their admissible load (max # of myplcs)
34 # . the pool of DNS-names and IP-addresses available for nodes
36 # #################### implem. note
38 # this model relies on 'sensing' the substrate,
39 # i.e. probing all the boxes for their running instances of vservers and qemu
40 # this is how we get rid of tracker inconsistencies
41 # however there is a 'black hole' between the time where a given address is
42 # allocated and when it actually gets used/pingable
43 # this is why we still need a shared knowledge among running tests
44 # in a file named /root/starting
45 # this is connected to the Pool class
47 # ####################
55 from optparse import OptionParser
58 from TestSsh import TestSsh
59 from TestMapper import TestMapper
60 from functools import reduce
62 # too painful to propagate this cleanly
65 def header (message, banner=True):
66 if not message: return
68 print("===============", end=' ')
def timestamp_key(instance):
    """Sort key for age-ordered listings: the object's timestamp attribute."""
    return instance.timestamp
def short_hostname (hostname):
    """Strip the domain: keep only the part of hostname before the first dot."""
    first_label, _, _ = hostname.partition('.')
    return first_label
78 # the place where other test instances tell about their not-yet-started
79 # instances, that go undetected through sensing
82 location = '/root/starting'
92 with open(Starting.location) as starting:
93 self.tuples = [line.strip().split('@') for line in starting.readlines()]
99 return [ x for (x, _) in self.tuples ]
101 def add (self, vname, bname):
102 if not vname in self.vnames():
103 with open(Starting.location, 'a') as out:
104 out.write("{}@{}\n".format(vname, bname))
106 def delete_vname (self, vname):
108 if vname in self.vnames():
109 with open(Starting.location, 'w') as f:
110 for (v, b) in self.tuples:
112 f.write("{}@{}\n".format(v, b))
116 # allows to pick an available IP among a pool
117 # input is expressed as a list of tuples (hostname,ip,user_data)
118 # that can be searched iteratively for a free slot
120 # pool = [ (hostname1,user_data1),
121 # (hostname2,user_data2),
122 # (hostname3,user_data3),
123 # (hostname4,user_data4) ]
124 # assuming that ip1 and ip3 are taken (pingable), then we'd get
126 # pool.next_free() -> entry2
127 # pool.next_free() -> entry4
128 # pool.next_free() -> None
129 # that is, even if ip2 is not busy/pingable when the second next_free() is issued
132 def __init__ (self, hostname, userdata):
133 self.hostname = hostname
134 self.userdata = userdata
135 # slot holds 'busy' or 'free' or 'mine' or 'starting' or None
136 # 'mine' is for our own stuff, 'starting' from the concurrent tests
141 return "<PoolItem {} {}>".format(self.hostname, self.userdata)
144 return "Pooled {} ({}) -> {}".format(self.hostname, self.userdata, self.status)
147 if self.status == None: return '?'
148 elif self.status == 'busy': return '+'
149 elif self.status == 'free': return '-'
150 elif self.status == 'mine': return 'M'
151 elif self.status == 'starting': return 'S'
154 if self.ip: return self.ip
155 ip=socket.gethostbyname(self.hostname)
161 def __init__ (self, tuples, message, substrate):
162 self.pool_items = [ PoolItem (hostname, userdata) for (hostname, userdata) in tuples ]
163 self.message = message
164 # where to send notifications upon load_starting
165 self.substrate = substrate
168 return "<Pool {} {}>".format(self.message, self.tuples)
170 def list (self, verbose=False):
171 for i in self.pool_items: print(i.line())
175 for i in self.pool_items: line += ' ' + i.char()
def _item (self, hostname):
    """Locate and return the PoolItem for hostname; raise if absent from this pool."""
    found = next((entry for entry in self.pool_items
                  if entry.hostname == hostname), None)
    if found is None:
        raise Exception ("Could not locate hostname {} in pool {}".format(hostname, self.message))
    return found
def retrieve_userdata (self, hostname):
    """Return the userdata recorded for hostname in this pool (raises if unknown)."""
    entry = self._item(hostname)
    return entry.userdata
186 def get_ip (self, hostname):
188 return self._item(hostname).get_ip()
190 return socket.gethostbyname(hostname)
192 def set_mine (self, hostname):
194 self._item(hostname).status='mine'
196 print('WARNING: host {} not found in IP pool {}'.format(hostname, self.message))
198 def next_free (self):
199 for i in self.pool_items:
200 if i.status == 'free':
202 return (i.hostname, i.userdata)
206 # we have a starting instance of our own
207 def add_starting (self, vname, bname):
208 Starting().add(vname, bname)
209 for i in self.pool_items:
210 if i.hostname == vname:
213 # load the starting instances from the common file
214 # remember that might be ours
215 # return the list of (vname,bname) that are not ours
216 def load_starting (self):
217 starting = Starting()
220 for (v, b) in starting.tuples:
221 for i in self.pool_items:
222 if i.hostname == v and i.status == 'free':
223 i.status = 'starting'
224 new_tuples.append( (v, b,) )
227 def release_my_starting (self):
228 for i in self.pool_items:
229 if i.status == 'mine':
230 Starting().delete_vname(i.hostname)
236 for item in self.pool_items:
237 if item.status is not None:
238 print(item.char(), end=' ')
240 if self.check_ping (item.hostname):
248 print('Sensing IP pool', self.message, end=' ')
252 for (vname,bname) in self.load_starting():
253 self.substrate.add_starting_dummy(bname, vname)
254 print("After having loaded 'starting': IP pool")
256 # OS-dependent ping option (support for macos, for convenience)
257 ping_timeout_option = None
258 # returns True when a given hostname/ip responds to ping
259 def check_ping (self, hostname):
260 if not Pool.ping_timeout_option:
261 (status, osname) = subprocess.getstatusoutput("uname -s")
263 raise Exception("TestPool: Cannot figure your OS name")
264 if osname == "Linux":
265 Pool.ping_timeout_option = "-w"
266 elif osname == "Darwin":
267 Pool.ping_timeout_option = "-t"
269 command="ping -c 1 {} 1 {}".format(Pool.ping_timeout_option, hostname)
270 (status, output) = subprocess.getstatusoutput(command)
275 def __init__ (self, hostname):
276 self.hostname = hostname
279 return "<Box {}>".format(self.hostname)
def shortname (self):
    """This box's hostname with the domain part stripped."""
    # inlined equivalent of short_hostname()
    return self.hostname.split('.')[0]
283 return TestSsh(self.hostname, username='root', unknown_host=False)
def reboot (self, options):
    """Ask the box to reboot itself over ssh (honors options.dry_run)."""
    ssh = self.test_ssh()
    ssh.run("shutdown -r now",
            message="Rebooting {}".format(self.hostname),
            dry_run=options.dry_run)
289 def hostname_fedora (self, virt=None):
290 # this truly is an opening bracket
291 result = "{}".format(self.hostname) + " {"
293 result += "{}-".format(virt)
294 result += "{} {}".format(self.fedora(), self.memory())
295 # too painful to propagate this cleanly
298 result += "-{}".format(self.uname())
299 # and the matching closing bracket
303 separator = "===composite==="
306 # take this chance to gather useful stuff
309 if self._probed is not None: return self._probed
310 composite_command = [ ]
311 composite_command += [ "hostname" ]
312 composite_command += [ ";" , "echo", Box.separator , ";" ]
313 composite_command += [ "uptime" ]
314 composite_command += [ ";" , "echo", Box.separator , ";" ]
315 composite_command += [ "uname", "-r"]
316 composite_command += [ ";" , "echo", Box.separator , ";" ]
317 composite_command += [ "cat" , "/etc/fedora-release" ]
318 composite_command += [ ";" , "echo", Box.separator , ";" ]
319 composite_command += [ "grep", "MemTotal", "/proc/meminfo" ]
321 # due to colons and all, this is going wrong on the local box (typically testmaster)
322 # I am reluctant to change TestSsh as it might break all over the place, so
323 if self.test_ssh().is_local():
324 probe_argv = [ "bash", "-c", " ".join (composite_command) ]
326 probe_argv = self.test_ssh().actual_argv(composite_command)
327 composite = self.backquote ( probe_argv, trash_err=True )
328 self._hostname = self._uptime = self._uname = self._fedora = self._memory = "** Unknown **"
330 print("root@{} unreachable".format(self.hostname))
334 pieces = composite.split(Box.separator)
335 pieces = [ x.strip() for x in pieces ]
337 [hostname, uptime, uname, fedora, memory] = pieces
339 self._hostname = hostname
340 self._uptime = ', '.join([ x.strip() for x in uptime.split(',')[2:]]).replace("load average", "load")
342 self._fedora = fedora.replace("Fedora release ","f").split(" ")[0]
344 self._memory = int(memory.split()[1])/(1024)
345 except Exception as e:
347 print('BEG issue with pieces')
348 traceback.print_exc()
349 self._probed = self._hostname
352 # use argv=['bash','-c',"the command line"]
355 if hasattr(self,'_uptime') and self._uptime: return self._uptime
356 return '*unprobed* uptime'
359 if hasattr(self,'_uname') and self._uname: return self._uname
360 return '*unprobed* uname'
363 if hasattr(self,'_fedora') and self._fedora: return self._fedora
364 return '*unprobed* fedora'
367 if hasattr(self,'_memory') and self._memory: return "{} Mb".format(self._memory)
368 return '*unprobed* memory'
370 def run(self, argv, message=None, trash_err=False, dry_run=False):
372 print('DRY_RUN:', end=' ')
373 print(" ".join(argv))
378 return subprocess.call(argv)
380 with open('/dev/null', 'w') as null:
381 return subprocess.call(argv, stderr=null)
383 def run_ssh (self, argv, message, trash_err=False, dry_run=False):
384 ssh_argv = self.test_ssh().actual_argv(argv)
385 result = self.run (ssh_argv, message, trash_err, dry_run=dry_run)
387 print("WARNING: failed to run {} on {}".format(" ".join(argv), self.hostname))
390 def backquote (self, argv, trash_err=False):
391 # in python3 we need to set universal_newlines=True
393 out_err = subprocess.Popen(argv, stdout=subprocess.PIPE,
394 universal_newlines=True).communicate()
396 with open('/dev/null', 'w') as null:
397 out_err = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=null,
398 universal_newlines=True).communicate()
399 # only interested in stdout here
402 # if you have any shell-expanded arguments like *
403 # and if there's any chance the command is addressed to the local host
def backquote_ssh (self, argv, trash_err=False):
    """Run argv on the box through ssh and return its stdout ('' when unreachable)."""
    if not self.probe():
        return ''
    remote_argv = self.test_ssh().actual_argv(argv)
    return self.backquote(remote_argv, trash_err)
408 ############################################################
410 def __init__ (self, buildname, pid, buildbox):
411 self.buildname = buildname
412 self.buildbox = buildbox
415 return "<BuildInstance {} in {}>".format(self.buildname, self.buildbox)
def add_pid(self, pid):
    """Attach one more process id to this build instance."""
    self.pids += [pid]
421 return "== {} == (pids={})".format(self.buildname, self.pids)
423 class BuildBox (Box):
def __init__ (self, hostname):
    """A build box starts out with no known build instances."""
    super().__init__(hostname)
    self.build_instances = []
428 return "<BuildBox {}>".format(self.hostname)
430 def add_build(self, buildname, pid):
431 for build in self.build_instances:
432 if build.buildname == buildname:
435 self.build_instances.append(BuildInstance(buildname, pid, self))
437 def list(self, verbose=False):
438 if not self.build_instances:
439 header ('No build process on {} ({})'.format(self.hostname_fedora(), self.uptime()))
441 header ("Builds on {} ({})".format(self.hostname_fedora(), self.uptime()))
442 for b in self.build_instances:
443 header (b.line(), banner=False)
445 def reboot (self, options):
447 Box.reboot(self, options)
449 self.soft_reboot (options)
451 build_matcher=re.compile("\s*(?P<pid>[0-9]+).*-[bo]\s+(?P<buildname>[^\s]+)(\s|\Z)")
452 build_matcher_initvm=re.compile("\s*(?P<pid>[0-9]+).*initvm.*\s+(?P<buildname>[^\s]+)\s*\Z")
454 class BuildLxcBox (BuildBox):
455 def soft_reboot (self, options):
456 command=['pkill','lbuild']
457 self.run_ssh(command, "Terminating vbuild processes", dry_run=options.dry_run)
459 # inspect box and find currently running builds
460 def sense(self, options):
463 pids = self.backquote_ssh(['pgrep','lbuild'], trash_err=True)
465 command = ['ps', '-o', 'pid,command'] + [ pid for pid in pids.split("\n") if pid]
466 ps_lines = self.backquote_ssh(command).split('\n')
467 for line in ps_lines:
468 if not line.strip() or line.find('PID') >= 0: continue
469 m = build_matcher.match(line)
471 date = time.strftime('%Y-%m-%d', time.localtime(time.time()))
472 buildname = m.group('buildname').replace('@DATE@', date)
473 self.add_build(buildname, m.group('pid'))
475 m = build_matcher_initvm.match(line)
477 # buildname is expanded here
478 self.add_build(buildname, m.group('pid'))
480 header('BuildLxcBox.sense: command {} returned line that failed to match'.format(command))
481 header(">>{}<<".format(line))
483 ############################################################
485 def __init__ (self, plcbox):
486 self.plc_box = plcbox
490 return "<PlcInstance {}>".format(self.plc_box)
def set_timestamp (self, timestamp):
    """Record this plc instance's age (epoch seconds)."""
    self.timestamp = timestamp
495 self.timestamp = int(time.time())
def pretty_timestamp (self):
    """Human-readable local time for this instance's timestamp."""
    local = time.localtime(self.timestamp)
    return time.strftime("%Y-%m-%d:%H-%M", local)
499 class PlcLxcInstance (PlcInstance):
500 # does lxc have a context id of any kind ?
501 def __init__ (self, plcbox, lxcname, pid):
502 PlcInstance.__init__(self, plcbox)
503 self.lxcname = lxcname
506 return "<PlcLxcInstance {}>".format(self.lxcname)
509 return self.lxcname.split('-')[-1]
def buildname (self):
    """The build name: lxcname with its two trailing '-'-separated fields removed."""
    pieces = self.lxcname.rsplit('-', 2)
    return pieces[0]
514 msg="== {} ==".format(self.vplcname())
515 msg += " [={}]".format(self.lxcname)
516 if self.pid==-1: msg+=" not (yet?) running"
517 else: msg+=" (pid={})".format(self.pid)
518 if self.timestamp: msg += " @ {}".format(self.pretty_timestamp())
519 else: msg += " *unknown timestamp*"
523 command="rsync lxc-driver.sh {}:/root".format(self.plc_box.hostname)
524 subprocess.getstatusoutput(command)
525 msg="lxc container stopping {} on {}".format(self.lxcname, self.plc_box.hostname)
526 self.plc_box.run_ssh(['/root/lxc-driver.sh', '-c', 'stop_lxc', '-n', self.lxcname], msg)
527 self.plc_box.forget(self)
def __init__ (self, hostname, max_plcs):
    """A plc box with capacity max_plcs and no instance known yet."""
    super().__init__(hostname)
    self.plc_instances = []
    self.max_plcs = max_plcs
536 return "<PlcBox {}>".format(self.hostname)
def free_slots (self):
    """How many more myplc instances this box can still host."""
    used = len(self.plc_instances)
    return self.max_plcs - used
541 # fill one slot even though this one is not started yet
542 def add_dummy (self, plcname):
543 dummy=PlcLxcInstance(self, 'dummy_'+plcname, 0)
545 self.plc_instances.append(dummy)
def forget (self, plc_instance):
    """Drop a (typically just-killed) plc instance from this box's bookkeeping."""
    self.plc_instances.remove(plc_instance)
550 def reboot (self, options):
552 Box.reboot(self, options)
554 self.soft_reboot (options)
556 def list(self, verbose=False):
557 if not self.plc_instances:
558 header ('No plc running on {}'.format(self.line()))
560 header ("Active plc VMs on {}".format(self.line()))
561 self.plc_instances.sort(key=timestamp_key)
562 for p in self.plc_instances:
563 header (p.line(), banner=False)
565 ## we do not do this at INRIA any more
566 class PlcLxcBox (PlcBox):
568 def add_lxc (self, lxcname, pid):
569 for plc in self.plc_instances:
570 if plc.lxcname == lxcname:
571 header("WARNING, duplicate myplc {} running on {}"\
572 .format(lxcname, self.hostname), banner=False)
574 self.plc_instances.append(PlcLxcInstance(self, lxcname, pid))
577 # a line describing the box
579 return "{} [max={},free={}] ({})".format(self.hostname_fedora(virt="lxc"),
580 self.max_plcs, self.free_slots(),
583 def plc_instance_by_lxcname(self, lxcname):
584 for p in self.plc_instances:
585 if p.lxcname == lxcname:
589 # essentially shutdown all running containers
590 def soft_reboot(self, options):
591 command="rsync lxc-driver.sh {}:/root".format(self.hostname)
592 subprocess.getstatusoutput(command)
593 self.run_ssh( ['/root/lxc-driver.sh','-c','stop_all'],
594 "Stopping all running lxc containers on {}".format(self.hostname),
595 dry_run=options.dry_run)
598 # sense is expected to fill self.plc_instances with PlcLxcInstance's
599 # to describe the currently running VM's
600 def sense(self, options):
603 command = "rsync lxc-driver.sh {}:/root".format(self.hostname)
604 subprocess.getstatusoutput(command)
605 command = ['/root/lxc-driver.sh', '-c', 'sense_all']
606 lxc_stat = self.backquote_ssh (command)
607 for lxc_line in lxc_stat.split("\n"):
610 # we mix build and plc VMs
611 if 'vplc' not in lxc_line:
613 lxcname = lxc_line.split(";")[0]
614 pid = lxc_line.split(";")[1]
615 timestamp = lxc_line.split(";")[2]
616 self.add_lxc(lxcname,pid)
617 try: timestamp = int(timestamp)
618 except: timestamp = 0
619 p = self.plc_instance_by_lxcname(lxcname)
621 print('WARNING zombie plc',self.hostname,lxcname)
622 print('... was expecting',lxcname,'in',[i.lxcname for i in self.plc_instances])
624 p.set_timestamp(timestamp)
626 ############################################################
628 def __init__(self, nodename, pid, qemubox):
629 self.nodename = nodename
631 self.qemu_box = qemubox
633 self.buildname = None
636 return "<QemuInstance {}>".format(self.nodename)
def set_buildname (self, buildname):
    """Attach the name of the build this qemu instance belongs to."""
    self.buildname = buildname
def set_timestamp (self, timestamp):
    """Record this qemu instance's age (epoch seconds)."""
    self.timestamp = timestamp
643 self.timestamp = int(time.time())
def pretty_timestamp (self):
    """Human-readable local time for this instance's timestamp."""
    local = time.localtime(self.timestamp)
    return time.strftime("%Y-%m-%d:%H-%M", local)
648 msg = "== {} ==".format(short_hostname(self.nodename))
649 msg += " [={}]".format(self.buildname)
650 if self.pid: msg += " (pid={})".format(self.pid)
651 else: msg += " not (yet?) running"
652 if self.timestamp: msg += " @ {}".format(self.pretty_timestamp())
653 else: msg += " *unknown timestamp*"
658 print("cannot kill qemu {} with pid==0".format(self.nodename))
660 msg = "Killing qemu {} with pid={} on box {}".format(self.nodename, self.pid, self.qemu_box.hostname)
661 self.qemu_box.run_ssh(['kill', "{}".format(self.pid)], msg)
662 self.qemu_box.forget(self)
def __init__ (self, hostname, max_qemus):
    """A qemu box with capacity max_qemus and no instance known yet."""
    super().__init__(hostname)
    self.qemu_instances = []
    self.max_qemus = max_qemus
671 return "<QemuBox {}>".format(self.hostname)
673 def add_node(self, nodename, pid):
674 for qemu in self.qemu_instances:
675 if qemu.nodename == nodename:
676 header("WARNING, duplicate qemu {} running on {}"\
677 .format(nodename,self.hostname), banner=False)
679 self.qemu_instances.append(QemuInstance(nodename, pid, self))
def node_names (self):
    """Node names of all qemu instances currently known on this box."""
    return [instance.nodename for instance in self.qemu_instances]
def forget (self, qemu_instance):
    """Drop a (typically just-killed) qemu instance from this box's bookkeeping."""
    self.qemu_instances.remove(qemu_instance)
687 # fill one slot even though this one is not started yet
688 def add_dummy(self, nodename):
689 dummy=QemuInstance('dummy_'+nodename, 0, self)
691 self.qemu_instances.append(dummy)
694 return "{} [max={},free={}] ({}) {}"\
695 .format(self.hostname_fedora(virt="qemu"),
696 self.max_qemus, self.free_slots(),
697 self.uptime(), self.driver())
699 def list(self, verbose=False):
700 if not self.qemu_instances:
701 header ('No qemu on {}'.format(self.line()))
703 header ("Qemus on {}".format(self.line()))
704 self.qemu_instances.sort(key=timestamp_key)
705 for q in self.qemu_instances:
706 header (q.line(), banner=False)
def free_slots (self):
    """How many more qemu instances this box can still host."""
    used = len(self.qemu_instances)
    return self.max_qemus - used
712 if hasattr(self,'_driver') and self._driver:
714 return '*undef* driver'
716 def qemu_instance_by_pid(self, pid):
717 for q in self.qemu_instances:
722 def qemu_instance_by_nodename_buildname (self, nodename, buildname):
723 for q in self.qemu_instances:
724 if q.nodename == nodename and q.buildname == buildname:
728 def reboot (self, options):
730 Box.reboot(self, options)
732 self.run_ssh(['pkill','qemu'], "Killing qemu instances",
733 dry_run=options.dry_run)
735 matcher=re.compile("\s*(?P<pid>[0-9]+).*-cdrom\s+(?P<nodename>[^\s]+)\.iso")
737 def sense(self, options):
740 modules = self.backquote_ssh(['lsmod']).split('\n')
741 self._driver = '*NO kqemu/kvm_intel MODULE LOADED*'
742 for module in modules:
743 if module.find('kqemu') == 0:
744 self._driver = 'kqemu module loaded'
745 # kvm might be loaded without kvm_intel (we dont have AMD)
746 elif module.find('kvm_intel') == 0:
747 self._driver = 'kvm_intel OK'
748 ########## find out running pids
749 pids = self.backquote_ssh(['pgrep','qemu'])
752 command = ['ps','-o','pid,command'] + [ pid for pid in pids.split("\n") if pid]
753 ps_lines = self.backquote_ssh(command).split("\n")
754 for line in ps_lines:
755 if not line.strip() or line.find('PID') >=0 :
757 m = QemuBox.matcher.match(line)
759 self.add_node(m.group('nodename'), m.group('pid'))
761 header('QemuBox.sense: command {} returned line that failed to match'.format(command))
762 header(">>{}<<".format(line))
763 ########## retrieve alive instances and map to build
765 command = ['grep', '.', '/vservers/*/*/qemu.pid', '/dev/null']
766 pid_lines = self.backquote_ssh(command, trash_err=True).split('\n')
767 for pid_line in pid_lines:
768 if not pid_line.strip():
770 # expect <build>/<nodename>/qemu.pid:<pid>pid
772 (_, __, buildname, nodename, tail) = pid_line.split('/')
773 (_,pid) = tail.split(':')
774 q = self.qemu_instance_by_pid(pid)
777 q.set_buildname(buildname)
778 live_builds.append(buildname)
780 print('WARNING, could not parse pid line', pid_line)
781 # retrieve timestamps
784 command = ['grep','.']
785 command += ['/vservers/{}/*/timestamp'.format(b) for b in live_builds]
786 command += ['/dev/null']
787 ts_lines = self.backquote_ssh(command, trash_err=True).split('\n')
788 for ts_line in ts_lines:
789 if not ts_line.strip():
791 # expect <build>/<nodename>/timestamp:<timestamp>
793 (_, __, buildname, nodename, tail) = ts_line.split('/')
794 nodename = nodename.replace('qemu-', '')
795 (_, timestamp) = tail.split(':')
796 timestamp = int(timestamp)
797 q = self.qemu_instance_by_nodename_buildname(nodename, buildname)
799 # this warning corresponds to qemu instances that were not killed properly
800 # and that have a dangling qemu.pid - and not even all of them as they need
801 # to be attached to a build that has a node running...
802 # it is more confusing than helpful, so let's just trash it
803 #print 'WARNING zombie qemu',self.hostname,ts_line
804 #print '... was expecting (',short_hostname(nodename),buildname,') in',\
805 # [ (short_hostname(i.nodename),i.buildname) for i in self.qemu_instances ]
807 q.set_timestamp(timestamp)
809 print('WARNING, could not parse ts line',ts_line)
813 def __init__(self, buildname, pid=0):
817 self.buildname = buildname
821 self.broken_steps = []
824 return "<TestInstance {}>".format(self.buildname)
def set_timestamp(self, timestamp):
    """Record this test instance's age (epoch seconds)."""
    self.timestamp = timestamp
829 self.timestamp = int(time.time())
def pretty_timestamp(self):
    """Human-readable local time for this instance's timestamp."""
    local = time.localtime(self.timestamp)
    return time.strftime("%Y-%m-%d:%H-%M", local)
def is_running (self):
    """True when at least one run_log pid is attached to this test."""
    return bool(self.pids)
def add_pid(self, pid):
    """Attach one more run_log pid to this test instance."""
    self.pids += [pid]
def set_broken(self, plcindex, step):
    """Record one failed step (with its plc index) for this test."""
    self.broken_steps.append((plcindex, step))
839 def second_letter(self):
840 if not self.broken_steps:
843 really_broken = [ step for (i,step) in self.broken_steps if '_ignore' not in step ]
844 # W is for warning like what's in the build mail
845 if len(really_broken) == 0:
851 # make up a 2-letter sign
852 # first letter : '=', unless build is running : '*'
853 double = '*' if self.pids else '='
854 # second letter : '=' if fine, 'W' for warnings (only ignored steps) 'B' for broken
855 letter2 = self.second_letter()
857 msg = " {} {} ==".format(double, self.buildname)
860 elif len(self.pids)==1:
861 msg += " (pid={})".format(self.pids[0])
863 msg += " !!!pids={}!!!".format(self.pids)
864 msg += " @{}".format(self.pretty_timestamp())
866 msg2 = ( ' BROKEN' if letter2 == 'B' else ' WARNING' )
867 # sometimes we have an empty plcindex
868 msg += " [{}=".format(msg2) \
869 + " ".join(["{}@{}".format(s, i) if i else s for (i, s) in self.broken_steps]) \
874 def __init__(self, hostname):
875 Box.__init__(self, hostname)
876 self.starting_ips = []
877 self.test_instances = []
879 return "<TestBox {}>".format(self.hostname)
881 def reboot(self, options):
882 # can't reboot a vserver VM
883 self.run_ssh(['pkill', 'run_log'], "Terminating current runs",
884 dry_run=options.dry_run)
885 self.run_ssh(['rm', '-f', Starting.location], "Cleaning {}".format(Starting.location),
886 dry_run=options.dry_run)
888 def get_test(self, buildname):
889 for i in self.test_instances:
890 if i.buildname == buildname:
893 # we scan ALL remaining test results, even the ones not running
894 def add_timestamp(self, buildname, timestamp):
895 i = self.get_test(buildname)
897 i.set_timestamp(timestamp)
899 i = TestInstance(buildname, 0)
900 i.set_timestamp(timestamp)
901 self.test_instances.append(i)
903 def add_running_test(self, pid, buildname):
904 i = self.get_test(buildname)
906 self.test_instances.append(TestInstance(buildname, pid))
909 print("WARNING: 2 concurrent tests run on same build {}".format(buildname))
912 def add_broken(self, buildname, plcindex, step):
913 i = self.get_test(buildname)
915 i = TestInstance(buildname)
916 self.test_instances.append(i)
917 i.set_broken(plcindex, step)
919 matcher_proc=re.compile (".*/proc/(?P<pid>[0-9]+)/cwd.*/root/(?P<buildname>[^/]+)$")
920 matcher_grep=re.compile ("/root/(?P<buildname>[^/]+)/logs/trace.*:TRACE:\s*(?P<plcindex>[0-9]+).*step=(?P<step>\S+).*")
921 matcher_grep_missing=re.compile ("grep: /root/(?P<buildname>[^/]+)/logs/trace: No such file or directory")
923 def sense(self, options):
925 self.starting_ips = [ x for x in self.backquote_ssh( ['cat', Starting.location], trash_err=True).strip().split('\n') if x ]
927 # scan timestamps on all tests
928 # this is likely to not invoke ssh so we need to be a bit smarter to get * expanded
929 # xxx would make sense above too
930 command = ['bash', '-c', "grep . /root/*/timestamp /dev/null"]
931 ts_lines = self.backquote_ssh(command, trash_err=True).split('\n')
932 for ts_line in ts_lines:
933 if not ts_line.strip():
935 # expect /root/<buildname>/timestamp:<timestamp>
937 (ts_file, timestamp) = ts_line.split(':')
938 ts_file = os.path.dirname(ts_file)
939 buildname = os.path.basename(ts_file)
940 timestamp = int(timestamp)
941 t = self.add_timestamp(buildname, timestamp)
943 print('WARNING, could not parse ts line', ts_line)
945 # let's try to be robust here -- tests that fail very early like e.g.
946 # "Cannot make space for a PLC instance: vplc IP pool exhausted", that occurs as part of provision
947 # will result in a 'trace' symlink to an inexisting 'trace-<>.txt' because no step has gone through
948 # simple 'trace' should exist though as it is created by run_log
949 command = ['bash', '-c', "grep KO /root/*/logs/trace /dev/null 2>&1" ]
950 trace_lines = self.backquote_ssh(command).split('\n')
951 for line in trace_lines:
954 m = TestBox.matcher_grep_missing.match(line)
956 buildname = m.group('buildname')
957 self.add_broken(buildname, '', 'NO STEP DONE')
959 m = TestBox.matcher_grep.match(line)
961 buildname = m.group('buildname')
962 plcindex = m.group('plcindex')
963 step = m.group('step')
964 self.add_broken(buildname, plcindex, step)
966 header("TestBox.sense: command {} returned line that failed to match\n{}".format(command, line))
967 header(">>{}<<".format(line))
969 pids = self.backquote_ssh (['pgrep', 'run_log'], trash_err=True)
972 command = ['ls','-ld'] + ["/proc/{}/cwd".format(pid) for pid in pids.split("\n") if pid]
973 ps_lines = self.backquote_ssh(command).split('\n')
974 for line in ps_lines:
977 m = TestBox.matcher_proc.match(line)
980 buildname = m.group('buildname')
981 self.add_running_test(pid, buildname)
983 header("TestBox.sense: command {} returned line that failed to match\n{}".format(command, line))
984 header(">>{}<<".format(line))
988 return self.hostname_fedora()
990 def list (self, verbose=False):
991 # verbose shows all tests
993 instances = self.test_instances
996 instances = [ i for i in self.test_instances if i.is_running() ]
1000 header ("No {} on {}".format(msg, self.line()))
1002 header ("{} on {}".format(msg, self.line()))
1003 instances.sort(key=timestamp_key)
1006 # show 'starting' regardless of verbose
1007 if self.starting_ips:
1008 header("Starting IP addresses on {}".format(self.line()))
1009 self.starting_ips.sort()
1010 for starting in self.starting_ips:
1013 header("Empty 'starting' on {}".format(self.line()))
1015 ############################################################
1021 self.options = Options()
1022 self.options.dry_run = False
1023 self.options.verbose = False
1024 self.options.reboot = False
1025 self.options.soft = False
1026 self.test_box = TestBox (self.test_box_spec())
1027 self.build_lxc_boxes = [ BuildLxcBox(h) for h in self.build_lxc_boxes_spec() ]
1028 self.plc_lxc_boxes = [ PlcLxcBox (h, m) for (h, m) in self.plc_lxc_boxes_spec ()]
1029 self.qemu_boxes = [ QemuBox (h, m) for (h, m) in self.qemu_boxes_spec ()]
1030 self._sensed = False
1032 self.vplc_pool = Pool(self.vplc_ips(), "for vplcs", self)
1033 self.vnode_pool = Pool(self.vnode_ips(), "for vnodes", self)
1035 self.build_boxes = self.build_lxc_boxes
1036 self.plc_boxes = self.plc_lxc_boxes
1037 self.default_boxes = self.plc_boxes + self.qemu_boxes
1038 self.all_boxes = self.build_boxes + [ self.test_box ] + self.plc_boxes + self.qemu_boxes
1040 return "<Substrate>".format()
1042 def summary_line (self):
1044 msg += " {} xp".format(len(self.plc_lxc_boxes))
1045 msg += " {} tried plc boxes".format(len(self.plc_boxes))
1049 def fqdn (self, hostname):
1050 if hostname.find('.') < 0:
1051 return "{}.{}".format(hostname, self.domain())
1054 # return True if actual sensing takes place
1055 def sense(self, force=False):
1056 if self._sensed and not force:
1058 print('Sensing local substrate...', end=' ')
1060 for b in self.default_boxes:
1061 b.sense(self.options)
1066 def list(self, verbose=False):
1067 for b in self.default_boxes:
1070 def add_dummy_plc(self, plc_boxname, plcname):
1071 for pb in self.plc_boxes:
1072 if pb.hostname == plc_boxname:
1073 pb.add_dummy(plcname)
1075 def add_dummy_qemu(self, qemu_boxname, qemuname):
1076 for qb in self.qemu_boxes:
1077 if qb.hostname == qemu_boxname:
1078 qb.add_dummy(qemuname)
def add_starting_dummy(self, bname, vname):
    """Reserve a slot for a 'starting' instance vname on box bname (plc first, then qemu)."""
    reserved = self.add_dummy_plc(bname, vname)
    if reserved:
        return reserved
    return self.add_dummy_qemu(bname, vname)
1085 def provision(self, plcs, options):
1087 # attach each plc to a plc box and an IP address
1088 plcs = [ self.provision_plc(plc, options) for plc in plcs ]
1089 # attach each node/qemu to a qemu box with an IP address
1090 plcs = [ self.provision_qemus(plc,options) for plc in plcs ]
1091 # update the SFA spec accordingly
1092 plcs = [ self.localize_sfa_rspec(plc, options) for plc in plcs ]
1095 except Exception as e:
1096 print('* Could not provision this test on current substrate','--',e,'--','exiting')
1097 traceback.print_exc()
1100 # it is expected that a couple of options like ips_bplc and ips_vplc
1101 # are set or unset together
1103 def check_options(x, y):
1106 return len(x) == len(y)
1108 # find an available plc box (or make space)
1109 # and a free IP address (using options if present)
1110 def provision_plc(self, plc, options):
1112 assert Substrate.check_options(options.ips_bplc, options.ips_vplc)
1114 #### let's find an IP address for that plc
1116 if options.ips_vplc:
1118 # we don't check anything here,
1119 # it is the caller's responsibility to cleanup and make sure this makes sense
1120 plc_boxname = options.ips_bplc.pop()
1121 vplc_hostname = options.ips_vplc.pop()
1126 vplc_hostname = None
1127 # try to find an available IP
1128 self.vplc_pool.sense()
1129 couple = self.vplc_pool.next_free()
1131 (vplc_hostname, unused) = couple
1132 #### we need to find one plc box that still has a slot
1134 # use the box that has max free spots for load balancing
1135 for pb in self.plc_boxes:
1136 free = pb.free_slots()
1138 plc_boxname = pb.hostname
1140 # if there's no available slot in the plc_boxes, or we need a free IP address
1141 # make space by killing the oldest running instance
1142 if not plc_boxname or not vplc_hostname:
1143 # find the oldest of all our instances
1144 all_plc_instances = reduce(lambda x, y: x+y,
1145 [ pb.plc_instances for pb in self.plc_boxes ],
1147 all_plc_instances.sort(key=timestamp_key)
1149 plc_instance_to_kill = all_plc_instances[0]
1153 msg += " PLC boxes are full"
1154 if not vplc_hostname:
1155 msg += " vplc IP pool exhausted"
1156 msg += " {}".format(self.summary_line())
1157 raise Exception("Cannot make space for a PLC instance:" + msg)
1158 freed_plc_boxname = plc_instance_to_kill.plc_box.hostname
1159 freed_vplc_hostname = plc_instance_to_kill.vplcname()
1160 message = 'killing oldest plc instance = {} on {}'\
1161 .format(plc_instance_to_kill.line(), freed_plc_boxname)
1162 plc_instance_to_kill.kill()
1163 # use this new plcbox if that was the problem
1165 plc_boxname = freed_plc_boxname
1166 # ditto for the IP address
1167 if not vplc_hostname:
1168 vplc_hostname = freed_vplc_hostname
1169 # record in pool as mine
1170 self.vplc_pool.set_mine(vplc_hostname)
1173 self.add_dummy_plc(plc_boxname, plc['name'])
1174 vplc_ip = self.vplc_pool.get_ip(vplc_hostname)
1175 self.vplc_pool.add_starting(vplc_hostname, plc_boxname)
1177 #### compute a helpful vserver name
1178 # remove domain in hostname
1179 vplc_short = short_hostname(vplc_hostname)
1180 vservername = "{}-{}-{}".format(options.buildname, plc['index'], vplc_short)
1181 plc_name = "{}_{}".format(plc['name'], vplc_short)
1183 utils.header('PROVISION plc {} in box {} at IP {} as {}'\
1184 .format(plc['name'], plc_boxname, vplc_hostname, vservername))
1186 #### apply in the plc_spec
1188 # label = options.personality.replace("linux","")
1189 mapper = {'plc' : [ ('*' , {'host_box' : plc_boxname,
1191 'vservername' : vservername,
1192 'vserverip' : vplc_ip,
1193 'settings:PLC_DB_HOST' : vplc_hostname,
1194 'settings:PLC_API_HOST' : vplc_hostname,
1195 'settings:PLC_BOOT_HOST' : vplc_hostname,
1196 'settings:PLC_WWW_HOST' : vplc_hostname,
1197 'settings:PLC_NET_DNS1' : self.network_settings() [ 'interface_fields:dns1' ],
1198 'settings:PLC_NET_DNS2' : self.network_settings() [ 'interface_fields:dns2' ],
1203 # mappers only work on a list of plcs
1204 return TestMapper([plc], options).map(mapper)[0]
def provision_qemus(self, plc, options):
    """Attach each node of the plc spec to a qemu box and a free vnode IP.

    Mirrors provision_plc: reuse pre-allocated options when present,
    otherwise sense the pools/boxes and make space by killing the oldest
    qemu instance; finally applies the node maps through TestMapper.

    NOTE(review): several lines of this method were lost in extraction
    (the fused original line numbers are non-contiguous); apparent gaps
    are flagged inline. Code left byte-identical otherwise.
    """
    # ips_bnode/ips_vnode must be set-or-unset together, with matching lengths
    assert Substrate.check_options(options.ips_bnode, options.ips_vnode)
    test_mapper = TestMapper([plc], options)
    nodenames = test_mapper.node_names()
    # NOTE(review): the initialization 'maps = []' appears to be missing
    for nodename in nodenames:
        # (a) the caller pre-allocated resources through options
        if options.ips_vnode:
            # as above, it's a rerun, take it for granted
            qemu_boxname = options.ips_bnode.pop()
            vnode_hostname = options.ips_vnode.pop()
            # NOTE(review): an 'else:' header appears to be missing here -
            # the lines below are the dynamic-allocation branch
            vnode_hostname = None
            # try to find an available IP
            self.vnode_pool.sense()
            couple = self.vnode_pool.next_free()
            # NOTE(review): a guard such as 'if couple:' appears to be missing
            (vnode_hostname, unused) = couple
        # find a physical box
        # NOTE(review): the initialization 'qemu_boxname = None' appears to be missing
        # use the box that has max free spots for load balancing
        for qb in self.qemu_boxes:
            free = qb.free_slots()
            # NOTE(review): the comparison against the best candidate so far
            # appears to be missing
            qemu_boxname = qb.hostname
        # if we miss the box or the IP, kill the oldest instance
        if not qemu_boxname or not vnode_hostname:
            # find the oldest of all our instances
            all_qemu_instances = reduce(lambda x, y: x+y,
                                        [ qb.qemu_instances for qb in self.qemu_boxes ],
            # NOTE(review): the reduce() initializer and closing parenthesis
            # appear to be missing
            all_qemu_instances.sort(key=timestamp_key)
            qemu_instance_to_kill = all_qemu_instances[0]
            # NOTE(review): the error-path header (likely try/except plus
            # "msg = ''") appears to be missing
            if not qemu_boxname:
                msg += " QEMU boxes are full"
            if not vnode_hostname:
                msg += " vnode IP pool exhausted"
            msg += " {}".format(self.summary_line())
            raise Exception("Cannot make space for a QEMU instance:"+msg)
            freed_qemu_boxname = qemu_instance_to_kill.qemu_box.hostname
            freed_vnode_hostname = short_hostname(qemu_instance_to_kill.nodename)
            message = 'killing oldest qemu node = {} on {}'.format(qemu_instance_to_kill.line(),
            # NOTE(review): the second format() argument (the freed box name)
            # and the closing parenthesis appear to be missing
            qemu_instance_to_kill.kill()
            # use these freed resources where needed
            if not qemu_boxname:
                qemu_boxname = freed_qemu_boxname
            if not vnode_hostname:
                vnode_hostname = freed_vnode_hostname
                self.vnode_pool.set_mine(vnode_hostname)
        # mark box/address as used and gather node-level settings
        self.add_dummy_qemu(qemu_boxname, vnode_hostname)
        mac = self.vnode_pool.retrieve_userdata(vnode_hostname)
        ip = self.vnode_pool.get_ip(vnode_hostname)
        self.vnode_pool.add_starting(vnode_hostname, qemu_boxname)
        vnode_fqdn = self.fqdn(vnode_hostname)
        nodemap = {'host_box' : qemu_boxname,
                   'node_fields:hostname' : vnode_fqdn,
                   'interface_fields:ip' : ip,
                   'ipaddress_fields:ip_addr' : ip,
                   'interface_fields:mac' : mac,
        # NOTE(review): the dict closing brace appears to be missing
        nodemap.update(self.network_settings())
        maps.append( (nodename, nodemap) )
        utils.header("PROVISION node {} in box {} at IP {} with MAC {}"\
                     .format(nodename, qemu_boxname, vnode_hostname, mac))
    return test_mapper.map({'node':maps})[0]
def localize_sfa_rspec(self, plc, options):
    """Point the SFA settings of a plc spec at the provisioned plc.

    All SFA hosts are co-located with the plc database host; the API URL
    is derived from the API host. Returns the (mutated) plc spec - the
    missing return previously made provision()'s list comprehension
    replace every plc with None. Specs without an 'sfa' section are
    returned untouched.
    """
    if 'sfa' not in plc:
        return plc
    sfa_settings = plc['sfa']['settings']
    db_host = plc['settings']['PLC_DB_HOST']
    # registry, aggregate, slice manager and db all live on the plc box
    for key in ('SFA_REGISTRY_HOST', 'SFA_AGGREGATE_HOST', 'SFA_SM_HOST', 'SFA_DB_HOST'):
        sfa_settings[key] = db_host
    sfa_settings['SFA_PLC_URL'] = 'https://{}:443/PLCAPI/'.format(plc['settings']['PLC_API_HOST'])
    return plc
1297 #################### release:
def release(self, options):
    """Give back this test's 'starting' marks in both address pools."""
    for pool in (self.vplc_pool, self.vnode_pool):
        pool.release_my_starting()
1303 #################### show results for interactive mode
def get_box(self, boxname):
    """Locate a Box instance by name among all known boxes.

    Accepts either a shortname or an fqdn (only the part before the
    first dot is compared in that case). Returns the matching box, or
    None after printing a message - the visible matching branches fell
    through without returning the box, which is restored here.
    """
    short = boxname.split('.')[0]
    for b in self.build_boxes + self.plc_boxes + self.qemu_boxes + [self.test_box]:
        if b.shortname() == boxname:
            return b
        if b.shortname() == short:
            return b
    print("Could not find box {}".format(boxname))
    return None
1316 # deal with the mix of boxes and names and stores the current focus
1317 # as a list of Box instances in self.focus_all
def normalize(self, box_or_names):
    """Normalize a mixed list of Box instances and box names.

    Resolves names through get_box(), stores the resulting Box instances
    in self.focus_all, and derives the per-type focus lists. Fixes two
    visible defects: self.focus_all was never (re)initialized, and the
    warning was printed unconditionally after every name lookup; an
    unresolved name is now warned about and skipped instead of appending
    None (which would crash later sense()/list() passes).
    """
    self.focus_all = []
    for box in box_or_names:
        if not isinstance(box, Box):
            box = self.get_box(box)
        if not box:
            print('Warning - could not handle box', box)
            continue
        self.focus_all.append(box)
    # break the focus down by box type
    self.focus_build = [ x for x in self.focus_all if isinstance(x, BuildBox) ]
    self.focus_plc = [ x for x in self.focus_all if isinstance(x, PlcBox) ]
    self.focus_qemu = [ x for x in self.focus_all if isinstance(x, QemuBox) ]
def list_boxes(self):
    """Probe every focused box, then print a listing for each."""
    print('Sensing', end=' ')
    focused = self.focus_all
    for box in focused:
        box.sense(self.options)
    verbose = self.options.verbose
    for box in focused:
        box.list(verbose)
def reboot_boxes(self):
    """Ask every focused box to reboot itself, honouring current options."""
    current_options = self.options
    for box in self.focus_all:
        box.reboot(current_options)
def sanity_check(self):
    """Run all substrate-level consistency checks in order."""
    print('Sanity check')
    for check in (self.sanity_check_plc, self.sanity_check_qemu):
        check()
1349 def sanity_check_plc(self):
def sanity_check_qemu(self):
    """Warn about node names hosted on more than one focused qemu box.

    NOTE(review): several lines were lost in extraction (the fused
    original line numbers are non-contiguous); apparent gaps are flagged
    inline. Code left byte-identical otherwise.
    """
    # gather all node names across the focused qemu boxes
    # NOTE(review): the initialization 'all_nodes = []' appears to be missing
    for box in self.focus_qemu:
        all_nodes += box.node_names()
    # count occurrences of each node name
    # NOTE(review): the initialization 'hash = {}' appears to be missing
    # (also note 'hash' shadows the builtin of the same name)
    for node in all_nodes:
        if node not in hash:
            # NOTE(review): the counting statements (e.g. 'hash[node] = 0'
            # then 'hash[node] += 1') appear to be missing here
    for (node,count) in list(hash.items()):
        # NOTE(review): a guard on count (e.g. 'if count != 1:') appears
        # to be missing - as written this would warn for every node
        print('WARNING - duplicate node', node)
1366 ####################
1367 # can be run as a utility to probe/display/manage the local infrastructure
1369 parser = OptionParser()
1370 parser.add_option('-r', "--reboot", action='store_true', dest='reboot', default=False,
1371 help='reboot mode (use shutdown -r)')
1372 parser.add_option('-s', "--soft", action='store_true', dest='soft', default=False,
1373 help='soft mode for reboot (terminates processes)')
1374 parser.add_option('-t', "--testbox", action='store_true', dest='testbox', default=False,
1375 help='add test box')
1376 parser.add_option('-b', "--build", action='store_true', dest='builds', default=False,
1377 help='add build boxes')
1378 parser.add_option('-p', "--plc", action='store_true', dest='plcs', default=False,
1379 help='add plc boxes')
1380 parser.add_option('-q', "--qemu", action='store_true', dest='qemus', default=False,
1381 help='add qemu boxes')
1382 parser.add_option('-a', "--all", action='store_true', dest='all', default=False,
1383 help='address all known boxes, like -b -t -p -q')
1384 parser.add_option('-v', "--verbose", action='store_true', dest='verbose', default=False,
1385 help='verbose mode')
1386 parser.add_option('-n', "--dry_run", action='store_true', dest='dry_run', default=False,
1387 help='dry run mode')
1388 (self.options, args) = parser.parse_args()
1391 if self.options.testbox: boxes += [self.test_box]
1392 if self.options.builds: boxes += self.build_boxes
1393 if self.options.plcs: boxes += self.plc_boxes
1394 if self.options.qemus: boxes += self.qemu_boxes
1395 if self.options.all: boxes += self.all_boxes
1398 verbose = self.options.verbose
1399 # default scope is -b -p -q -t
1401 boxes = self.build_boxes + self.plc_boxes + self.qemu_boxes + [self.test_box]
1403 self.normalize(boxes)
1405 if self.options.reboot: