# Thierry Parmentelat <thierry.parmentelat@inria.fr>
# Copyright (C) 2010 INRIA
#
# #################### history
#
# see also Substrate.readme
#
# This is a complete rewrite of TestResources/Tracker/Pool
# we don't use trackers anymore and just probe/sense the running
# boxes to figure out where we are
# in order to implement some fairness in the round-robin allocation scheme
# we need an indication of the 'age' of each running entity,
# hence the 'timestamp-*' steps in TestPlc
#
# this should be much more flexible:
# * supports several plc boxes
# * supports several qemu guests per host
# * no need to worry about tracker being in sync or not
#
# #################### howto use
#
# each site is to write its own LocalSubstrate.py,
# (see e.g. LocalSubstrate.inria.py)
# LocalSubstrate.py is expected to be in /root on the testmaster box
# and needs to define
# MYPLCs
# . the vserver-capable boxes used for hosting myplcs
# .   and their admissible load (max # of myplcs)
# . the pool of DNS-names and IP-addresses available for myplcs
# QEMU nodes
# . the kvm-qemu capable boxes to host qemu instances
# .   and their admissible load (max # of qemu instances)
# . the pool of DNS-names and IP-addresses available for nodes
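#
# as a rough sketch only -- the hostnames and figures below are made up,
# and the exact set of methods to provide is whatever the Substrate class
# further down actually calls -- a LocalSubstrate.py could look like:
#
#   from Substrate import Substrate
#
#   class InriaSubstrate(Substrate):
#       def domain(self):
#           return 'example.org'
#       def test_box_spec(self):
#           return 'testmaster.example.org'
#       def build_lxc_boxes_spec(self):
#           return ['buildbox.example.org']
#       def plc_lxc_boxes_spec(self):
#           # (hostname, max number of simultaneous myplcs)
#           return [('plcbox1.example.org', 10)]
#       def qemu_boxes_spec(self):
#           # (hostname, max number of simultaneous qemus)
#           return [('qemubox1.example.org', 4)]
#       def vplc_ips(self):
#           # (dns-name, user_data) couples as expected by Pool
#           return [('vplc{:02d}.example.org'.format(i), None)
#                   for i in range(1, 31)]
#       def vnode_ips(self):
#           # for nodes, user_data carries the MAC address
#           return [('vnode{:02d}.example.org'.format(i), '02:34:56:00:00:{:02x}'.format(i))
#                   for i in range(1, 21)]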
#
# #################### implem. note
#
# this model relies on 'sensing' the substrate,
# i.e. probing all the boxes for their running instances of vservers and qemu
# this is how we get rid of tracker inconsistencies
# however there is a 'black hole' between the time where a given address is
# allocated and when it actually gets used/pingable
# this is why we still need a shared knowledge among running tests
# in a file named /root/starting
# this is connected to the Pool class
#
# ####################
import os.path
import sys
import socket
import subprocess
import time
import re
import traceback
from optparse import OptionParser
from functools import reduce

import utils
from TestSsh import TestSsh
from TestMapper import TestMapper
# too painful to propagate this cleanly
verbose = None

def header(message, banner=True):
    if not message: return
    if banner:
        print("===============", end=' ')
    print(message)

def timestamp_key(o): return o.timestamp

def short_hostname(hostname):
    return hostname.split('.')[0]
####################
# the place where other test instances tell about their not-yet-started
# instances, that go undetected through sensing
class Starting:

    location = '/root/starting'

    def __init__(self):
        self.tuples = []

    def load(self):
        try:
            with open(Starting.location) as starting:
                self.tuples = [line.strip().split('@') for line in starting.readlines()]
        except:
            self.tuples = []

    def vnames(self):
        self.load()
        return [ x for (x, _) in self.tuples ]

    def add(self, vname, bname):
        if vname not in self.vnames():
            with open(Starting.location, 'a') as out:
                out.write("{}@{}\n".format(vname, bname))

    def delete_vname(self, vname):
        self.load()
        if vname in self.vnames():
            with open(Starting.location, 'w') as f:
                for (v, b) in self.tuples:
                    if v != vname:
                        f.write("{}@{}\n".format(v, b))
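    # the resulting file reads like one '<vname>@<bname>' line per
    # booked-but-not-yet-pingable instance, e.g. (made-up names):
    #   2024.03.04--f39-1-vplc01@plcbox1
    #   vnode01@qemubox1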
####################
# allows picking an available IP among a pool
# input is expressed as a list of tuples (hostname, user_data)
# that can be searched iteratively for a free slot
# e.g.
# pool = [ (hostname1, user_data1),
#          (hostname2, user_data2),
#          (hostname3, user_data3),
#          (hostname4, user_data4) ]
# assuming that ip1 and ip3 are taken (pingable), then we'd get
# pool.next_free() -> entry2
# pool.next_free() -> entry4
# pool.next_free() -> None
# that is, even if ip2 is not busy/pingable when the second next_free() is issued
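#
# a minimal usage sketch (made-up data; 'substrate' stands for the
# enclosing Substrate instance, only used for notifications):
#   pool = Pool([('vplc01.example.org', None), ('vplc02.example.org', None)],
#               "for vplcs", substrate)
#   pool.sense()               # ping everything, mark each slot 'busy' or 'free'
#   couple = pool.next_free()  # e.g. ('vplc02.example.org', None), now marked 'mine'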
class PoolItem:
    def __init__(self, hostname, userdata):
        self.hostname = hostname
        self.userdata = userdata
        # slot holds 'busy' or 'free' or 'mine' or 'starting' or None
        # 'mine' is for our own stuff, 'starting' from the concurrent tests
        self.status = None
        self.ip = None

    def line(self):
        return "Pooled {} ({}) -> {}".format(self.hostname, self.userdata, self.status)

    def char(self):
        if self.status is None:         return '?'
        elif self.status == 'busy':     return '+'
        elif self.status == 'free':     return '-'
        elif self.status == 'mine':     return 'M'
        elif self.status == 'starting': return 'S'

    def get_ip(self):
        if self.ip:
            return self.ip
        self.ip = socket.gethostbyname(self.hostname)
        return self.ip
class Pool:

    def __init__(self, tuples, message, substrate):
        self.pool_items = [ PoolItem(hostname, userdata) for (hostname, userdata) in tuples ]
        self.message = message
        # where to send notifications upon load_starting
        self.substrate = substrate

    def list(self, verbose=False):
        for i in self.pool_items:
            print(i.line())

    def line(self):
        line = self.message
        for i in self.pool_items:
            line += ' ' + i.char()
        return line

    def _item(self, hostname):
        for i in self.pool_items:
            if i.hostname == hostname:
                return i
        raise Exception("Could not locate hostname {} in pool {}".format(hostname, self.message))

    def retrieve_userdata(self, hostname):
        return self._item(hostname).userdata

    def get_ip(self, hostname):
        try:
            return self._item(hostname).get_ip()
        except:
            return socket.gethostbyname(hostname)

    def set_mine(self, hostname):
        try:
            self._item(hostname).status = 'mine'
        except:
            print('WARNING: host {} not found in IP pool {}'.format(hostname, self.message))
    def next_free(self):
        for i in self.pool_items:
            if i.status == 'free':
                i.status = 'mine'
                return (i.hostname, i.userdata)
        return None

    # we have a starting instance of our own
    def add_starting(self, vname, bname):
        Starting().add(vname, bname)
        for i in self.pool_items:
            if i.hostname == vname:
                i.status = 'mine'

    # load the starting instances from the common file
    # remember that some of them might be ours
    # return the list of (vname, bname) tuples that are not ours
    def load_starting(self):
        starting = Starting()
        starting.load()
        new_tuples = []
        for (v, b) in starting.tuples:
            for i in self.pool_items:
                if i.hostname == v and i.status == 'free':
                    i.status = 'starting'
                    new_tuples.append((v, b,))
        return new_tuples

    def release_my_starting(self):
        for i in self.pool_items:
            if i.status == 'mine':
                Starting().delete_vname(i.hostname)
                i.status = None
    def _sense(self):
        for item in self.pool_items:
            if item.status is not None:
                print(item.char(), end=' ')
                continue
            if self.check_ping(item.hostname):
                item.status = 'busy'
            else:
                item.status = 'free'
            print(item.char(), end=' ')

    def sense(self):
        print('Sensing IP pool', self.message, end=' ')
        self._sense()
        print('Done')
        for (vname, bname) in self.load_starting():
            self.substrate.add_starting_dummy(bname, vname)
        print("After having loaded 'starting': IP pool")
        print(self.line())
    # OS-dependent ping option (support for macos, for convenience)
    ping_timeout_option = None
    # returns True when a given hostname/ip responds to ping
    def check_ping(self, hostname):
        if not Pool.ping_timeout_option:
            (status, osname) = subprocess.getstatusoutput("uname -s")
            if status != 0:
                raise Exception("TestPool: Cannot figure your OS name")
            if osname == "Linux":
                Pool.ping_timeout_option = "-w"
            elif osname == "Darwin":
                Pool.ping_timeout_option = "-t"

        command = "ping -c 1 {} 1 {}".format(Pool.ping_timeout_option, hostname)
        (status, output) = subprocess.getstatusoutput(command)
        return status == 0
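    # so on Linux the probe boils down to e.g. (made-up name):
    #   ping -c 1 -w 1 vplc01.example.org
    # and a zero exit status marks the address as taken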
####################
class Box:
    def __init__(self, hostname):
        self.hostname = hostname
        self._probed = None

    def shortname(self):
        return short_hostname(self.hostname)

    def test_ssh(self):
        return TestSsh(self.hostname, username='root', unknown_host=False)

    def reboot(self, options):
        self.test_ssh().run("shutdown -r now",
                            message="Rebooting {}".format(self.hostname),
                            dry_run=options.dry_run)

    def hostname_fedora(self, virt=None):
        # this truly is an opening bracket
        result = "{}".format(self.hostname) + " {"
        if virt:
            result += "{}-".format(virt)
        result += "{} {}".format(self.fedora(), self.memory())
        # too painful to propagate this cleanly
        global verbose
        if verbose:
            result += "-{}".format(self.uname())
        # and the matching closing bracket
        result += "}"
        return result

    separator = "===composite==="
    # take this chance to gather useful stuff
    def probe(self):
        # try it only once
        if self._probed is not None: return self._probed
        composite_command = [ ]
        composite_command += [ "hostname" ]
        composite_command += [ ";", "echo", Box.separator, ";" ]
        composite_command += [ "uptime" ]
        composite_command += [ ";", "echo", Box.separator, ";" ]
        composite_command += [ "uname", "-r" ]
        composite_command += [ ";", "echo", Box.separator, ";" ]
        composite_command += [ "cat", "/etc/fedora-release" ]
        composite_command += [ ";", "echo", Box.separator, ";" ]
        composite_command += [ "grep", "MemTotal", "/proc/meminfo" ]

        # due to colons and all, this is going wrong on the local box (typically testmaster)
        # I am reluctant to change TestSsh as it might break all over the place, so
        if self.test_ssh().is_local():
            probe_argv = [ "bash", "-c", " ".join(composite_command) ]
        else:
            probe_argv = self.test_ssh().actual_argv(composite_command)
        composite = self.backquote(probe_argv, trash_err=True)
        self._hostname = self._uptime = self._uname = self._fedora = self._memory = "** Unknown **"
        if not composite:
            print("root@{} unreachable".format(self.hostname))
            self._probed = ''
        else:
            try:
                pieces = composite.split(Box.separator)
                pieces = [ x.strip() for x in pieces ]
                [hostname, uptime, uname, fedora, memory] = pieces
                self._hostname = hostname
                # keep only the load-average part of uptime
                self._uptime = ', '.join([ x.strip() for x in uptime.split(',')[2:]]).replace("load average", "load")
                self._uname = uname
                self._fedora = fedora.replace("Fedora release ", "f").split(" ")[0]
                # MemTotal is in kB
                self._memory = int(memory.split()[1]) // 1024
            except Exception as e:
                print('BEG issue with pieces')
                traceback.print_exc()
            self._probed = self._hostname
        return self._probed
    # use argv=['bash','-c',"the command line"]
    def uptime(self):
        if hasattr(self, '_uptime') and self._uptime: return self._uptime
        return '*unprobed* uptime'

    def uname(self):
        if hasattr(self, '_uname') and self._uname: return self._uname
        return '*unprobed* uname'

    def fedora(self):
        if hasattr(self, '_fedora') and self._fedora: return self._fedora
        return '*unprobed* fedora'

    def memory(self):
        if hasattr(self, '_memory') and self._memory: return "{} Mb".format(self._memory)
        return '*unprobed* memory'
    def run(self, argv, message=None, trash_err=False, dry_run=False):
        if dry_run:
            print('DRY_RUN:', end=' ')
            print(" ".join(argv))
            return 0
        else:
            header(message)
            if not trash_err:
                return subprocess.call(argv)
            else:
                with open('/dev/null', 'w') as null:
                    return subprocess.call(argv, stderr=null)

    def run_ssh(self, argv, message, trash_err=False, dry_run=False):
        ssh_argv = self.test_ssh().actual_argv(argv)
        result = self.run(ssh_argv, message, trash_err, dry_run=dry_run)
        if result != 0:
            print("WARNING: failed to run {} on {}".format(" ".join(argv), self.hostname))
        return result

    def backquote(self, argv, trash_err=False):
        # in python3 we need to set universal_newlines=True
        if not trash_err:
            out_err = subprocess.Popen(argv, stdout=subprocess.PIPE,
                                       universal_newlines=True).communicate()
        else:
            with open('/dev/null', 'w') as null:
                out_err = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=null,
                                           universal_newlines=True).communicate()
        # only interested in stdout here
        return out_err[0]

    # to be used if you have any shell-expanded arguments like *
    # and if there's any chance the command is addressed to the local host
    def backquote_ssh(self, argv, trash_err=False):
        if not self.probe(): return ''
        return self.backquote(self.test_ssh().actual_argv(argv), trash_err)
############################################################
class BuildInstance:
    def __init__(self, buildname, pid, buildbox):
        self.buildname = buildname
        self.buildbox = buildbox
        self.pids = [pid]

    def add_pid(self, pid):
        self.pids.append(pid)

    def line(self):
        return "== {} == (pids={})".format(self.buildname, self.pids)
class BuildBox(Box):
    def __init__(self, hostname):
        Box.__init__(self, hostname)
        self.build_instances = []

    def add_build(self, buildname, pid):
        for build in self.build_instances:
            if build.buildname == buildname:
                build.add_pid(pid)
                return
        self.build_instances.append(BuildInstance(buildname, pid, self))

    def list(self, verbose=False):
        if not self.build_instances:
            header('No build process on {} ({})'.format(self.hostname_fedora(), self.uptime()))
        else:
            header("Builds on {} ({})".format(self.hostname_fedora(), self.uptime()))
            for b in self.build_instances:
                header(b.line(), banner=False)

    def reboot(self, options):
        if not options.soft:
            Box.reboot(self, options)
        else:
            self.soft_reboot(options)
build_matcher = re.compile(r"\s*(?P<pid>[0-9]+).*-[bo]\s+(?P<buildname>[^\s]+)(\s|\Z)")
build_matcher_initvm = re.compile(r"\s*(?P<pid>[0-9]+).*initvm.*\s+(?P<buildname>[^\s]+)\s*\Z")
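# these are meant to match 'ps -o pid,command' lines like (made-up):
#   12345 /bin/bash ./lbuild -b 2024.03.04--f39
#   12346 python initvm.py ... 2024.03.04--f39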
class BuildLxcBox(BuildBox):
    def soft_reboot(self, options):
        command = ['pkill', 'lbuild']
        self.run_ssh(command, "Terminating vbuild processes", dry_run=options.dry_run)

    # inspect box and find currently running builds
    def sense(self, options):
        pids = self.backquote_ssh(['pgrep', 'lbuild'], trash_err=True)
        if not pids:
            return
        command = ['ps', '-o', 'pid,command'] + [ pid for pid in pids.split("\n") if pid ]
        ps_lines = self.backquote_ssh(command).split('\n')
        for line in ps_lines:
            if not line.strip() or line.find('PID') >= 0: continue
            m = build_matcher.match(line)
            if m:
                date = time.strftime('%Y-%m-%d', time.localtime(time.time()))
                buildname = m.group('buildname').replace('@DATE@', date)
                self.add_build(buildname, m.group('pid'))
                continue
            m = build_matcher_initvm.match(line)
            if m:
                # buildname is already expanded here
                self.add_build(m.group('buildname'), m.group('pid'))
                continue
            header('BuildLxcBox.sense: command {} returned line that failed to match'.format(command))
            header(">>{}<<".format(line))
############################################################
class PlcInstance:
    def __init__(self, plcbox):
        self.plc_box = plcbox
        # unknown yet
        self.timestamp = 0

    def set_timestamp(self, timestamp):
        self.timestamp = timestamp

    def set_now(self):
        self.timestamp = int(time.time())

    def pretty_timestamp(self):
        return time.strftime("%Y-%m-%d:%H-%M", time.localtime(self.timestamp))
class PlcLxcInstance(PlcInstance):
    # does lxc have a context id of any kind ?
    def __init__(self, plcbox, lxcname, pid):
        PlcInstance.__init__(self, plcbox)
        self.lxcname = lxcname
        self.pid = pid

    def vplcname(self):
        return self.lxcname.split('-')[-1]

    def buildname(self):
        return self.lxcname.rsplit('-', 2)[0]

    def line(self):
        msg = "== {} ==".format(self.vplcname())
        msg += " [={}]".format(self.lxcname)
        if self.pid == -1:
            msg += " not (yet?) running"
        else:
            msg += " (pid={})".format(self.pid)
        if self.timestamp:
            msg += " @ {}".format(self.pretty_timestamp())
        else:
            msg += " *unknown timestamp*"
        return msg

    def kill(self):
        command = "rsync lxc-driver.sh {}:/root".format(self.plc_box.hostname)
        subprocess.getstatusoutput(command)
        msg = "lxc container stopping {} on {}".format(self.lxcname, self.plc_box.hostname)
        self.plc_box.run_ssh(['/root/lxc-driver.sh', '-c', 'stop_lxc', '-n', self.lxcname], msg)
        self.plc_box.forget(self)
############################################################
class PlcBox(Box):
    def __init__(self, hostname, max_plcs):
        Box.__init__(self, hostname)
        self.plc_instances = []
        self.max_plcs = max_plcs

    def free_slots(self):
        return self.max_plcs - len(self.plc_instances)

    # fill one slot even though this one is not started yet
    def add_dummy(self, plcname):
        dummy = PlcLxcInstance(self, 'dummy_' + plcname, 0)
        dummy.set_now()
        self.plc_instances.append(dummy)

    def forget(self, plc_instance):
        self.plc_instances.remove(plc_instance)

    def reboot(self, options):
        if not options.soft:
            Box.reboot(self, options)
        else:
            self.soft_reboot(options)

    def list(self, verbose=False):
        if not self.plc_instances:
            header('No plc running on {}'.format(self.line()))
        else:
            header("Active plc VMs on {}".format(self.line()))
            self.plc_instances.sort(key=timestamp_key)
            for p in self.plc_instances:
                header(p.line(), banner=False)
## we do not do this at INRIA any more
class PlcLxcBox(PlcBox):

    def add_lxc(self, lxcname, pid):
        for plc in self.plc_instances:
            if plc.lxcname == lxcname:
                header("WARNING, duplicate myplc {} running on {}"\
                       .format(lxcname, self.hostname), banner=False)
                return
        self.plc_instances.append(PlcLxcInstance(self, lxcname, pid))

    # a line describing the box
    def line(self):
        return "{} [max={},free={}] ({})".format(self.hostname_fedora(virt="lxc"),
                                                 self.max_plcs, self.free_slots(),
                                                 self.uptime())

    def plc_instance_by_lxcname(self, lxcname):
        for p in self.plc_instances:
            if p.lxcname == lxcname:
                return p
        return None

    # essentially shutdown all running containers
    def soft_reboot(self, options):
        command = "rsync lxc-driver.sh {}:/root".format(self.hostname)
        subprocess.getstatusoutput(command)
        self.run_ssh(['/root/lxc-driver.sh', '-c', 'stop_all'],
                     "Stopping all running lxc containers on {}".format(self.hostname),
                     dry_run=options.dry_run)
    # sense is expected to fill self.plc_instances with PlcLxcInstance's
    # to describe the currently running VMs
    def sense(self, options):
        command = "rsync lxc-driver.sh {}:/root".format(self.hostname)
        subprocess.getstatusoutput(command)
        command = ['/root/lxc-driver.sh', '-c', 'sense_all']
        lxc_stat = self.backquote_ssh(command)
        for lxc_line in lxc_stat.split("\n"):
            if not lxc_line: continue
            lxcname = lxc_line.split(";")[0]
            pid = lxc_line.split(";")[1]
            timestamp = lxc_line.split(";")[2]
            self.add_lxc(lxcname, pid)
            try:    timestamp = int(timestamp)
            except: timestamp = 0
            p = self.plc_instance_by_lxcname(lxcname)
            if not p:
                print('WARNING zombie plc', self.hostname, lxcname)
                print('... was expecting', lxcname, 'in', [i.lxcname for i in self.plc_instances])
                continue
            p.set_timestamp(timestamp)
############################################################
class QemuInstance:
    def __init__(self, nodename, pid, qemubox):
        self.nodename = nodename
        self.pid = pid
        self.qemu_box = qemubox
        # not known yet
        self.buildname = None
        self.timestamp = 0

    def set_buildname(self, buildname):
        self.buildname = buildname

    def set_timestamp(self, timestamp):
        self.timestamp = timestamp

    def set_now(self):
        self.timestamp = int(time.time())

    def pretty_timestamp(self):
        return time.strftime("%Y-%m-%d:%H-%M", time.localtime(self.timestamp))

    def line(self):
        msg = "== {} ==".format(short_hostname(self.nodename))
        msg += " [={}]".format(self.buildname)
        if self.pid:
            msg += " (pid={})".format(self.pid)
        else:
            msg += " not (yet?) running"
        if self.timestamp:
            msg += " @ {}".format(self.pretty_timestamp())
        else:
            msg += " *unknown timestamp*"
        return msg

    def kill(self):
        if self.pid == 0:
            print("cannot kill qemu {} with pid==0".format(self.nodename))
            return
        msg = "Killing qemu {} with pid={} on box {}".format(self.nodename, self.pid, self.qemu_box.hostname)
        self.qemu_box.run_ssh(['kill', "{}".format(self.pid)], msg)
        self.qemu_box.forget(self)
############################################################
class QemuBox(Box):
    def __init__(self, hostname, max_qemus):
        Box.__init__(self, hostname)
        self.qemu_instances = []
        self.max_qemus = max_qemus

    def add_node(self, nodename, pid):
        for qemu in self.qemu_instances:
            if qemu.nodename == nodename:
                header("WARNING, duplicate qemu {} running on {}"\
                       .format(nodename, self.hostname), banner=False)
                return
        self.qemu_instances.append(QemuInstance(nodename, pid, self))

    def node_names(self):
        return [ qi.nodename for qi in self.qemu_instances ]

    def forget(self, qemu_instance):
        self.qemu_instances.remove(qemu_instance)

    # fill one slot even though this one is not started yet
    def add_dummy(self, nodename):
        dummy = QemuInstance('dummy_' + nodename, 0, self)
        dummy.set_now()
        self.qemu_instances.append(dummy)

    def line(self):
        return "{} [max={},free={}] ({}) {}"\
            .format(self.hostname_fedora(virt="qemu"),
                    self.max_qemus, self.free_slots(),
                    self.uptime(), self.driver())

    def list(self, verbose=False):
        if not self.qemu_instances:
            header('No qemu on {}'.format(self.line()))
        else:
            header("Qemus on {}".format(self.line()))
            self.qemu_instances.sort(key=timestamp_key)
            for q in self.qemu_instances:
                header(q.line(), banner=False)

    def free_slots(self):
        return self.max_qemus - len(self.qemu_instances)
    def driver(self):
        if hasattr(self, '_driver') and self._driver:
            return self._driver
        return '*undef* driver'

    def qemu_instance_by_pid(self, pid):
        for q in self.qemu_instances:
            if q.pid == pid:
                return q
        return None

    def qemu_instance_by_nodename_buildname(self, nodename, buildname):
        for q in self.qemu_instances:
            if q.nodename == nodename and q.buildname == buildname:
                return q
        return None

    def reboot(self, options):
        if not options.soft:
            Box.reboot(self, options)
        else:
            self.run_ssh(['pkill', 'qemu'], "Killing qemu instances",
                         dry_run=options.dry_run)
    matcher = re.compile(r"\s*(?P<pid>[0-9]+).*-cdrom\s+(?P<nodename>[^\s]+)\.iso")
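    # meant to match 'ps -o pid,command' lines like (made-up):
    #   12345 qemu-system-x86_64 ... -cdrom vnode01.example.org.iso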
    def sense(self, options):
        modules = self.backquote_ssh(['lsmod']).split('\n')
        self._driver = '*NO kqemu/kvm_intel MODULE LOADED*'
        for module in modules:
            if module.find('kqemu') == 0:
                self._driver = 'kqemu module loaded'
            # kvm might be loaded without kvm_intel (we don't have AMD)
            elif module.find('kvm_intel') == 0:
                self._driver = 'kvm_intel OK'
        ########## find out running pids
        pids = self.backquote_ssh(['pgrep', 'qemu'])
        if not pids:
            return
        command = ['ps', '-o', 'pid,command'] + [ pid for pid in pids.split("\n") if pid ]
        ps_lines = self.backquote_ssh(command).split("\n")
        for line in ps_lines:
            if not line.strip() or line.find('PID') >= 0:
                continue
            m = QemuBox.matcher.match(line)
            if m:
                self.add_node(m.group('nodename'), m.group('pid'))
                continue
            header('QemuBox.sense: command {} returned line that failed to match'.format(command))
            header(">>{}<<".format(line))
        ########## retrieve alive instances and map to build
        live_builds = []
        command = ['grep', '.', '/vservers/*/*/qemu.pid', '/dev/null']
        pid_lines = self.backquote_ssh(command, trash_err=True).split('\n')
        for pid_line in pid_lines:
            if not pid_line.strip():
                continue
            # expect <build>/<nodename>/qemu.pid:<pid>
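            # e.g. (made-up) /vservers/2024.03.04--f39/vnode01/qemu.pid:12345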
            try:
                (_, __, buildname, nodename, tail) = pid_line.split('/')
                (_, pid) = tail.split(':')
                q = self.qemu_instance_by_pid(pid)
                if not q:
                    continue
                q.set_buildname(buildname)
                live_builds.append(buildname)
            except:
                print('WARNING, could not parse pid line', pid_line)
        # retrieve timestamps
        if not live_builds:
            return
        command = ['grep', '.']
        command += ['/vservers/{}/*/timestamp'.format(b) for b in live_builds]
        command += ['/dev/null']
        ts_lines = self.backquote_ssh(command, trash_err=True).split('\n')
        for ts_line in ts_lines:
            if not ts_line.strip():
                continue
            # expect <build>/<nodename>/timestamp:<timestamp>
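            # e.g. (made-up) /vservers/2024.03.04--f39/qemu-vnode01/timestamp:1709550000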
            try:
                (_, __, buildname, nodename, tail) = ts_line.split('/')
                nodename = nodename.replace('qemu-', '')
                (_, timestamp) = tail.split(':')
                timestamp = int(timestamp)
                q = self.qemu_instance_by_nodename_buildname(nodename, buildname)
                if not q:
                    # this warning corresponds to qemu instances that were not killed properly
                    # and that have a dangling qemu.pid - and not even all of them as they need
                    # to be attached to a build that has a node running...
                    # it is more confusing than helpful, so let's just trash it
                    #print('WARNING zombie qemu', self.hostname, ts_line)
                    #print('... was expecting (', short_hostname(nodename), buildname, ') in',
                    #      [ (short_hostname(i.nodename), i.buildname) for i in self.qemu_instances ])
                    continue
                q.set_timestamp(timestamp)
            except:
                print('WARNING, could not parse ts line', ts_line)
############################################################
class TestInstance:
    def __init__(self, buildname, pid=0):
        self.pids = []
        if pid != 0:
            self.pids.append(pid)
        self.buildname = buildname
        # (plcindex, step) tuples for the steps that have failed
        self.broken_steps = []
        self.timestamp = 0

    def set_timestamp(self, timestamp):
        self.timestamp = timestamp

    def set_now(self):
        self.timestamp = int(time.time())

    def pretty_timestamp(self):
        return time.strftime("%Y-%m-%d:%H-%M", time.localtime(self.timestamp))

    def is_running(self):
        return len(self.pids) != 0

    def add_pid(self, pid):
        self.pids.append(pid)

    def set_broken(self, plcindex, step):
        self.broken_steps.append((plcindex, step,))

    def second_letter(self):
        if not self.broken_steps:
            return '='
        else:
            really_broken = [ step for (i, step) in self.broken_steps if '_ignore' not in step ]
            # W is for warning like what's in the build mail
            if len(really_broken) == 0:
                return 'W'
            else:
                return 'B'

    def line(self):
        # make up a 2-letter sign
        # first letter : '=', unless build is running : '*'
        double = '*' if self.pids else '='
        # second letter : '=' if fine, 'W' for warnings (only ignored steps), 'B' for broken
        letter2 = self.second_letter()
        double += letter2
        msg = " {} {} ==".format(double, self.buildname)
        if not self.pids:
            pass
        elif len(self.pids) == 1:
            msg += " (pid={})".format(self.pids[0])
        else:
            msg += " !!!pids={}!!!".format(self.pids)
        msg += " @{}".format(self.pretty_timestamp())
        if letter2 != '=':
            msg2 = (' BROKEN' if letter2 == 'B' else ' WARNING')
            # sometimes we have an empty plcindex
            msg += " [{}=".format(msg2) \
                   + " ".join(["{}@{}".format(s, i) if i else s for (i, s) in self.broken_steps]) \
                   + "]"
        return msg
class TestBox(Box):
    def __init__(self, hostname):
        Box.__init__(self, hostname)
        self.starting_ips = []
        self.test_instances = []

    def reboot(self, options):
        # can't reboot a vserver VM
        self.run_ssh(['pkill', 'run_log'], "Terminating current runs",
                     dry_run=options.dry_run)
        self.run_ssh(['rm', '-f', Starting.location], "Cleaning {}".format(Starting.location),
                     dry_run=options.dry_run)

    def get_test(self, buildname):
        for i in self.test_instances:
            if i.buildname == buildname:
                return i
        return None

    # we scan ALL remaining test results, even the ones not running
    def add_timestamp(self, buildname, timestamp):
        i = self.get_test(buildname)
        if i:
            i.set_timestamp(timestamp)
        else:
            i = TestInstance(buildname, 0)
            i.set_timestamp(timestamp)
            self.test_instances.append(i)

    def add_running_test(self, pid, buildname):
        i = self.get_test(buildname)
        if not i:
            self.test_instances.append(TestInstance(buildname, pid))
            return
        if i.pids:
            print("WARNING: 2 concurrent tests run on same build {}".format(buildname))
        i.add_pid(pid)

    def add_broken(self, buildname, plcindex, step):
        i = self.get_test(buildname)
        if not i:
            i = TestInstance(buildname)
            self.test_instances.append(i)
        i.set_broken(plcindex, step)
    matcher_proc = re.compile(r".*/proc/(?P<pid>[0-9]+)/cwd.*/root/(?P<buildname>[^/]+)$")
    matcher_grep = re.compile(r"/root/(?P<buildname>[^/]+)/logs/trace.*:TRACE:\s*(?P<plcindex>[0-9]+).*step=(?P<step>\S+).*")
    matcher_grep_missing = re.compile(r"grep: /root/(?P<buildname>[^/]+)/logs/trace: No such file or directory")
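    # meant to match resp. 'ls -ld /proc/*/cwd' and 'grep KO' outputs like (made-up):
    #   lrwxrwxrwx 1 root root 0 ... /proc/12345/cwd -> /root/2024.03.04--f39
    #   /root/2024.03.04--f39/logs/trace:TRACE: 1 ... step=sfa-create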
    def sense(self, options):
        self.starting_ips = [x for x in self.backquote_ssh(['cat', Starting.location],
                                                           trash_err=True).strip().split('\n')
                             if x]

        # scan timestamps on all tests
        # this is likely to not invoke ssh so we need to be a bit smarter to get * expanded
        # xxx would make sense above too
        command = ['bash', '-c', "grep . /root/*/timestamp /dev/null"]
        ts_lines = self.backquote_ssh(command, trash_err=True).split('\n')
        for ts_line in ts_lines:
            if not ts_line.strip():
                continue
            # expect /root/<buildname>/timestamp:<timestamp>
            try:
                (ts_file, timestamp) = ts_line.split(':')
                ts_file = os.path.dirname(ts_file)
                buildname = os.path.basename(ts_file)
                timestamp = int(timestamp)
                self.add_timestamp(buildname, timestamp)
            except:
                print('WARNING, could not parse ts line', ts_line)
        # let's try to be robust here -- tests that fail very early, like e.g.
        # "Cannot make space for a PLC instance: vplc IP pool exhausted" which occurs as part of provision,
        # will result in a 'trace' symlink to a nonexistent 'trace-<>.txt' because no step has gone through
        # a simple 'trace' should exist though, as it is created by run_log
        command = ['bash', '-c', "grep KO /root/*/logs/trace /dev/null 2>&1"]
        trace_lines = self.backquote_ssh(command).split('\n')
        for line in trace_lines:
            if not line.strip():
                continue
            m = TestBox.matcher_grep_missing.match(line)
            if m:
                buildname = m.group('buildname')
                self.add_broken(buildname, '', 'NO STEP DONE')
                continue
            m = TestBox.matcher_grep.match(line)
            if m:
                buildname = m.group('buildname')
                plcindex = m.group('plcindex')
                step = m.group('step')
                self.add_broken(buildname, plcindex, step)
                continue
            header("TestBox.sense: command {} returned line that failed to match\n{}".format(command, line))
            header(">>{}<<".format(line))
        pids = self.backquote_ssh(['pgrep', 'run_log'], trash_err=True)
        if not pids:
            return
        command = ['ls', '-ld'] + ["/proc/{}/cwd".format(pid) for pid in pids.split("\n") if pid]
        ps_lines = self.backquote_ssh(command).split('\n')
        for line in ps_lines:
            if not line.strip():
                continue
            m = TestBox.matcher_proc.match(line)
            if m:
                pid = m.group('pid')
                buildname = m.group('buildname')
                self.add_running_test(pid, buildname)
                continue
            header("TestBox.sense: command {} returned line that failed to match\n{}".format(command, line))
            header(">>{}<<".format(line))
    def line(self):
        return self.hostname_fedora()

    def list(self, verbose=False):
        # verbose shows all tests
        if verbose:
            instances = self.test_instances
            msg = "tests"
        else:
            instances = [ i for i in self.test_instances if i.is_running() ]
            msg = "running tests"

        if not instances:
            header("No {} on {}".format(msg, self.line()))
        else:
            header("{} on {}".format(msg, self.line()))
            instances.sort(key=timestamp_key)
            for i in instances:
                print(i.line())
        # show 'starting' regardless of verbose
        if self.starting_ips:
            header("Starting IP addresses on {}".format(self.line()))
            self.starting_ips.sort()
            for starting in self.starting_ips:
                print(starting)
        else:
            header("Empty 'starting' on {}".format(self.line()))
############################################################
# a plain data holder for command-line options, filled in by main() below
class Options: pass

class Substrate:

    def __init__(self):
        self.options = Options()
        self.options.dry_run = False
        self.options.verbose = False
        self.options.reboot = False
        self.options.soft = False
        self.test_box = TestBox(self.test_box_spec())
        self.build_lxc_boxes = [ BuildLxcBox(h) for h in self.build_lxc_boxes_spec() ]
        self.plc_lxc_boxes = [ PlcLxcBox(h, m) for (h, m) in self.plc_lxc_boxes_spec() ]
        self.qemu_boxes = [ QemuBox(h, m) for (h, m) in self.qemu_boxes_spec() ]
        self._sensed = False

        self.vplc_pool = Pool(self.vplc_ips(), "for vplcs", self)
        self.vnode_pool = Pool(self.vnode_ips(), "for vnodes", self)

        self.build_boxes = self.build_lxc_boxes
        self.plc_boxes = self.plc_lxc_boxes
        self.default_boxes = self.plc_boxes + self.qemu_boxes
        self.all_boxes = self.build_boxes + [ self.test_box ] + self.plc_boxes + self.qemu_boxes
    def summary_line(self):
        msg = "["
        msg += " {} xp".format(len(self.plc_lxc_boxes))
        msg += " {} tried plc boxes".format(len(self.plc_boxes))
        msg += "]"
        return msg

    def fqdn(self, hostname):
        if hostname.find('.') < 0:
            return "{}.{}".format(hostname, self.domain())
        return hostname
    # return True if actual sensing takes place
    def sense(self, force=False):
        if self._sensed and not force:
            return False
        print('Sensing local substrate...', end=' ')
        for b in self.default_boxes:
            b.sense(self.options)
        print('Done')
        self._sensed = True
        return True

    def list(self, verbose=False):
        for b in self.default_boxes:
            b.list()
    def add_dummy_plc(self, plc_boxname, plcname):
        for pb in self.plc_boxes:
            if pb.hostname == plc_boxname:
                pb.add_dummy(plcname)
                return True

    def add_dummy_qemu(self, qemu_boxname, qemuname):
        for qb in self.qemu_boxes:
            if qb.hostname == qemu_boxname:
                qb.add_dummy(qemuname)
                return True

    def add_starting_dummy(self, bname, vname):
        return self.add_dummy_plc(bname, vname) or self.add_dummy_qemu(bname, vname)
    def provision(self, plcs, options):
        try:
            # attach each plc to a plc box and an IP address
            plcs = [ self.provision_plc(plc, options) for plc in plcs ]
            # attach each node/qemu to a qemu box with an IP address
            plcs = [ self.provision_qemus(plc, options) for plc in plcs ]
            # update the SFA spec accordingly
            plcs = [ self.localize_sfa_rspec(plc, options) for plc in plcs ]
            self.list()
            return plcs
        except Exception as e:
            print('* Could not provision this test on current substrate', '--', e, '--', 'exiting')
            traceback.print_exc()
            sys.exit(1)

    # it is expected that a couple of options like ips_bplc and ips_vplc
    # are set or unset together
    @staticmethod
    def check_options(x, y):
        if not x and not y:
            return True
        return len(x) == len(y)
    # find an available plc box (or make space)
    # and a free IP address (using options if present)
    def provision_plc(self, plc, options):

        assert Substrate.check_options(options.ips_bplc, options.ips_vplc)

        #### let's find an IP address for that plc
        # look in options first
        if options.ips_vplc:
            # this is a rerun
            # we don't check anything here,
            # it is the caller's responsibility to cleanup and make sure this makes sense
            plc_boxname = options.ips_bplc.pop()
            vplc_hostname = options.ips_vplc.pop()
        else:
            self.sense()
            plc_boxname = None
            vplc_hostname = None
            # try to find an available IP
            self.vplc_pool.sense()
            couple = self.vplc_pool.next_free()
            if couple:
                (vplc_hostname, unused) = couple
            #### we need to find one plc box that still has a slot
            max_free = 0
            # use the box that has max free spots for load balancing
            for pb in self.plc_boxes:
                free = pb.free_slots()
                if free > max_free:
                    plc_boxname = pb.hostname
                    max_free = free
            # if there's no available slot in the plc_boxes, or we need a free IP address
            # make space by killing the oldest running instance
            if not plc_boxname or not vplc_hostname:
                # find the oldest of all our instances
                all_plc_instances = reduce(lambda x, y: x + y,
                                           [ pb.plc_instances for pb in self.plc_boxes ],
                                           [])
                all_plc_instances.sort(key=timestamp_key)
                try:
                    plc_instance_to_kill = all_plc_instances[0]
                except:
                    msg = ""
                    if not plc_boxname:
                        msg += " PLC boxes are full"
                    if not vplc_hostname:
                        msg += " vplc IP pool exhausted"
                    msg += " {}".format(self.summary_line())
                    raise Exception("Cannot make space for a PLC instance:" + msg)
                freed_plc_boxname = plc_instance_to_kill.plc_box.hostname
                freed_vplc_hostname = plc_instance_to_kill.vplcname()
                message = 'killing oldest plc instance = {} on {}'\
                    .format(plc_instance_to_kill.line(), freed_plc_boxname)
                plc_instance_to_kill.kill()
                # use this new plcbox if that was the problem
                if not plc_boxname:
                    plc_boxname = freed_plc_boxname
                # ditto for the IP address
                if not vplc_hostname:
                    vplc_hostname = freed_vplc_hostname
                    # record in pool as mine
                    self.vplc_pool.set_mine(vplc_hostname)
        self.add_dummy_plc(plc_boxname, plc['name'])
        vplc_ip = self.vplc_pool.get_ip(vplc_hostname)
        self.vplc_pool.add_starting(vplc_hostname, plc_boxname)

        #### compute a helpful vserver name
        # remove domain in hostname
        vplc_short = short_hostname(vplc_hostname)
        vservername = "{}-{}-{}".format(options.buildname, plc['index'], vplc_short)
        plc_name = "{}_{}".format(plc['name'], vplc_short)

        utils.header('PROVISION plc {} in box {} at IP {} as {}'\
                     .format(plc['name'], plc_boxname, vplc_hostname, vservername))

        #### apply in the plc_spec
        # label = options.personality.replace("linux","")
        mapper = {'plc' : [ ('*', {'host_box' : plc_boxname,
                                   'name' : plc_name,
                                   'vservername' : vservername,
                                   'vserverip' : vplc_ip,
                                   'settings:PLC_DB_HOST' : vplc_hostname,
                                   'settings:PLC_API_HOST' : vplc_hostname,
                                   'settings:PLC_BOOT_HOST' : vplc_hostname,
                                   'settings:PLC_WWW_HOST' : vplc_hostname,
                                   'settings:PLC_NET_DNS1' : self.network_settings()['interface_fields:dns1'],
                                   'settings:PLC_NET_DNS2' : self.network_settings()['interface_fields:dns2'],
                                   }) ] }
        # mappers only work on a list of plcs
        return TestMapper([plc], options).map(mapper)[0]
    def provision_qemus(self, plc, options):

        assert Substrate.check_options(options.ips_bnode, options.ips_vnode)

        test_mapper = TestMapper([plc], options)
        nodenames = test_mapper.node_names()
        maps = []
        for nodename in nodenames:

            if options.ips_vnode:
                # as above, it's a rerun, take it for granted
                qemu_boxname = options.ips_bnode.pop()
                vnode_hostname = options.ips_vnode.pop()
            else:
                self.sense()
                qemu_boxname = None
                vnode_hostname = None
                # try to find an available IP
                self.vnode_pool.sense()
                couple = self.vnode_pool.next_free()
                if couple:
                    (vnode_hostname, unused) = couple
                # find a physical box
                max_free = 0
                # use the box that has max free spots for load balancing
                for qb in self.qemu_boxes:
                    free = qb.free_slots()
                    if free > max_free:
                        qemu_boxname = qb.hostname
                        max_free = free
                # if we miss the box or the IP, kill the oldest instance
                if not qemu_boxname or not vnode_hostname:
                    # find the oldest of all our instances
                    all_qemu_instances = reduce(lambda x, y: x + y,
                                                [ qb.qemu_instances for qb in self.qemu_boxes ],
                                                [])
                    all_qemu_instances.sort(key=timestamp_key)
                    try:
                        qemu_instance_to_kill = all_qemu_instances[0]
                    except:
                        msg = ""
                        if not qemu_boxname:
                            msg += " QEMU boxes are full"
                        if not vnode_hostname:
                            msg += " vnode IP pool exhausted"
                        msg += " {}".format(self.summary_line())
                        raise Exception("Cannot make space for a QEMU instance:" + msg)
                    freed_qemu_boxname = qemu_instance_to_kill.qemu_box.hostname
                    freed_vnode_hostname = short_hostname(qemu_instance_to_kill.nodename)
                    message = 'killing oldest qemu node = {} on {}'.format(qemu_instance_to_kill.line(),
                                                                           freed_qemu_boxname)
                    qemu_instance_to_kill.kill()
                    # use these freed resources where needed
                    if not qemu_boxname:
                        qemu_boxname = freed_qemu_boxname
                    if not vnode_hostname:
                        vnode_hostname = freed_vnode_hostname
                        self.vnode_pool.set_mine(vnode_hostname)
            self.add_dummy_qemu(qemu_boxname, vnode_hostname)
            mac = self.vnode_pool.retrieve_userdata(vnode_hostname)
            ip = self.vnode_pool.get_ip(vnode_hostname)
            self.vnode_pool.add_starting(vnode_hostname, qemu_boxname)

            vnode_fqdn = self.fqdn(vnode_hostname)
            nodemap = {'host_box' : qemu_boxname,
                       'node_fields:hostname' : vnode_fqdn,
                       'interface_fields:ip' : ip,
                       'ipaddress_fields:ip_addr' : ip,
                       'interface_fields:mac' : mac,
                       }
            nodemap.update(self.network_settings())
            maps.append((nodename, nodemap))

            utils.header("PROVISION node {} in box {} at IP {} with MAC {}"\
                         .format(nodename, qemu_boxname, vnode_hostname, mac))

        return test_mapper.map({'node' : maps})[0]
    def localize_sfa_rspec(self, plc, options):
        plc['sfa']['settings']['SFA_REGISTRY_HOST'] = plc['settings']['PLC_DB_HOST']
        plc['sfa']['settings']['SFA_AGGREGATE_HOST'] = plc['settings']['PLC_DB_HOST']
        plc['sfa']['settings']['SFA_SM_HOST'] = plc['settings']['PLC_DB_HOST']
        plc['sfa']['settings']['SFA_DB_HOST'] = plc['settings']['PLC_DB_HOST']
        plc['sfa']['settings']['SFA_PLC_URL'] = 'https://{}:443/PLCAPI/'.format(plc['settings']['PLC_API_HOST'])
        return plc
    #################### release:
    def release(self, options):
        self.vplc_pool.release_my_starting()
        self.vnode_pool.release_my_starting()
    #################### show results for interactive mode
    def get_box(self, boxname):
        for b in self.build_boxes + self.plc_boxes + self.qemu_boxes + [self.test_box]:
            if b.shortname() == boxname:
                return b
            try:
                if b.shortname() == boxname.split('.')[0]:
                    return b
            except:
                pass
        print("Could not find box {}".format(boxname))
        return None
    # deal with the mix of boxes and names and store the current focus
    # as a list of Box instances in self.focus_all
    def normalize(self, box_or_names):
        self.focus_all = []
        for box in box_or_names:
            if not isinstance(box, Box):
                box = self.get_box(box)
            if not box:
                print('Warning - could not handle box', box)
                continue
            self.focus_all.append(box)
        # elaborate by type
        self.focus_build = [ x for x in self.focus_all if isinstance(x, BuildBox) ]
        self.focus_plc = [ x for x in self.focus_all if isinstance(x, PlcBox) ]
        self.focus_qemu = [ x for x in self.focus_all if isinstance(x, QemuBox) ]
    def list_boxes(self):
        print('Sensing', end=' ')
        for box in self.focus_all:
            box.sense(self.options)
        print('Done')
        for box in self.focus_all:
            box.list(self.options.verbose)
    def reboot_boxes(self):
        for box in self.focus_all:
            box.reboot(self.options)

    def sanity_check(self):
        print('Sanity check')
        self.sanity_check_plc()
        self.sanity_check_qemu()
    def sanity_check_plc(self):
        pass

    def sanity_check_qemu(self):
        all_nodes = []
        for box in self.focus_qemu:
            all_nodes += box.node_names()
        hash = {}
        for node in all_nodes:
            if node not in hash:
                hash[node] = 0
            hash[node] += 1
        for (node, count) in list(hash.items()):
            if count != 1:
                print('WARNING - duplicate node', node)
    ####################
    # can be run as a utility to probe/display/manage the local infrastructure
    def main(self):
        parser = OptionParser()
        parser.add_option('-r', "--reboot", action='store_true', dest='reboot', default=False,
                          help='reboot mode (use shutdown -r)')
        parser.add_option('-s', "--soft", action='store_true', dest='soft', default=False,
                          help='soft mode for reboot (terminates processes)')
        parser.add_option('-t', "--testbox", action='store_true', dest='testbox', default=False,
                          help='add test box')
        parser.add_option('-b', "--build", action='store_true', dest='builds', default=False,
                          help='add build boxes')
        parser.add_option('-p', "--plc", action='store_true', dest='plcs', default=False,
                          help='add plc boxes')
        parser.add_option('-q', "--qemu", action='store_true', dest='qemus', default=False,
                          help='add qemu boxes')
        parser.add_option('-a', "--all", action='store_true', dest='all', default=False,
                          help='address all known boxes, like -b -t -p -q')
        parser.add_option('-v', "--verbose", action='store_true', dest='verbose', default=False,
                          help='verbose mode')
        parser.add_option('-n', "--dry_run", action='store_true', dest='dry_run', default=False,
                          help='dry run mode')
        (self.options, args) = parser.parse_args()
        boxes = args
        if self.options.testbox: boxes += [self.test_box]
        if self.options.builds:  boxes += self.build_boxes
        if self.options.plcs:    boxes += self.plc_boxes
        if self.options.qemus:   boxes += self.qemu_boxes
        if self.options.all:     boxes += self.all_boxes

        global verbose
        verbose = self.options.verbose
        # default scope is -b -p -q -t
        if not boxes:
            boxes = self.build_boxes + self.plc_boxes + self.qemu_boxes + [self.test_box]

        self.normalize(boxes)

        if self.options.reboot:
            self.reboot_boxes()
        else:
            self.list_boxes()
            self.sanity_check()