2 # Thierry Parmentelat <thierry.parmentelat@inria.fr>
3 # Copyright (C) 2010-2015 INRIA
5 # #################### history
7 # see also Substrate.readme
9 # This is a complete rewrite of TestResources/Tracker/Pool
10 # we don't use trackers anymore and just probe/sense the running
11 # boxes to figure out where we are
12 # in order to implement some fairness in the round-robin allocation scheme
13 # we need an indication of the 'age' of each running entity,
14 # hence the 'timestamp-*' steps in TestPlc
16 # this should be much more flexible:
17 # * supports several plc boxes
18 # * supports several qemu guests per host
19 # * no need to worry about tracker being in sync or not
21 # #################### howto use
23 # each site is to write its own LocalSubstrate.py,
24 # (see e.g. LocalSubstrate.inria.py)
25 # LocalSubstrate.py is expected to be in /root on the testmaster box
28 # . the vserver-capable boxes used for hosting myplcs
29 # . and their admissible load (max # of myplcs)
30 # . the pool of DNS-names and IP-addresses available for myplcs
32 # . the kvm-qemu capable boxes to host qemu instances
33 # . and their admissible load (max # of qemu instances)
34 # . the pool of DNS-names and IP-addresses available for nodes
36 # #################### implem. note
38 # this model relies on 'sensing' the substrate,
39 # i.e. probing all the boxes for their running instances of vservers and qemu
40 # this is how we get rid of tracker inconsistencies
41 # however there is a 'black hole' between the time where a given address is
42 # allocated and when it actually gets used/pingable
43 # this is why we still need a shared knowledge among running tests
44 # in a file named /root/starting
45 # this is connected to the Pool class
47 # ####################
55 from optparse import OptionParser
58 from TestSsh import TestSsh
59 from TestMapper import TestMapper
60 from functools import reduce
62 # too painful to propagate this cleanly
# print a one-line banner for *message*; no-op on an empty message.
# NOTE(review): this extract is missing lines (see the jump 66 -> 68 in the
# inline numbering); the statement that prints *message* itself, and the use
# of the 'banner' flag, are not visible here.
65 def header (message, banner=True):
66 if not message: return
68 print("===============", end=' ')
def timestamp_key(o):
    """Sort key: the 'age' timestamp carried by running substrate instances."""
    return o.timestamp
def short_hostname (hostname):
    """Return the leading dotted component of *hostname* (its short form)."""
    head, _sep, _rest = hostname.partition('.')
    return head
78 # the place where other test instances tell about their not-yet-started
79 # instances, that go undetected through sensing
# NOTE(review): the 'class Starting' header and some method headers are not
# visible in this extract (inline numbering has gaps); 'location' is a
# class-level constant naming the file shared between concurrent test runs.
82 location = '/root/starting'
# parse the shared file: one 'vname@bname' record per line into self.tuples
92 with open(Starting.location) as starting:
93 self.tuples = [line.strip().split('@') for line in starting.readlines()]
# the registered vnames are the first element of each tuple
99 return [ x for (x, _) in self.tuples ]
# register (vname, bname); duplicates are silently skipped
101 def add (self, vname, bname):
102 if not vname in self.vnames():
103 with open(Starting.location, 'a') as out:
104 out.write("{}@{}\n".format(vname, bname))
# rewrite the whole file without the given vname
# NOTE(review): the filtering condition line (skipping v == vname) is
# missing from this extract — confirm against the full source
106 def delete_vname (self, vname):
108 if vname in self.vnames():
109 with open(Starting.location, 'w') as f:
110 for (v, b) in self.tuples:
112 f.write("{}@{}\n".format(v, b))
116 # allows to pick an available IP among a pool
117 # input is expressed as a list of tuples (hostname, user_data)
118 # that can be searched iteratively for a free slot
120 # pool = [ (hostname1,user_data1),
121 # (hostname2,user_data2),
122 # (hostname3,user_data2),
123 # (hostname4,user_data4) ]
124 # assuming that ip1 and ip3 are taken (pingable), then we'd get
126 # pool.next_free() -> entry2
127 # pool.next_free() -> entry4
128 # pool.next_free() -> None
129 # that is, even if ip2 is not busy/pingable when the second next_free() is issued
# one allocatable slot in a Pool: a hostname plus opaque user data
# NOTE(review): the initialization of self.status (and self.ip) and the
# 'def char'/'def get_ip' headers are missing from this extract
132 def __init__ (self, hostname, userdata):
133 self.hostname = hostname
134 self.userdata = userdata
135 # slot holds 'busy' or 'free' or 'mine' or 'starting' or None
136 # 'mine' is for our own stuff, 'starting' from the concurrent tests
141 return "<PoolItem {} {}>".format(self.hostname, self.userdata)
# human-readable one-line description of the slot
144 return "Pooled {} ({}) -> {}".format(self.hostname, self.userdata, self.status)
# single-character rendering of the slot status, used in pool summaries
147 if self.status == None: return '?'
148 elif self.status == 'busy': return '*'
149 elif self.status == 'free': return '.'
150 elif self.status == 'mine': return 'M'
151 elif self.status == 'starting': return 'S'
# resolve and cache the slot's IP address from its hostname
156 self.ip = socket.gethostbyname(self.hostname)
# a Pool manages a list of PoolItems and hands out free slots round-robin
161 def __init__ (self, tuples, message, substrate):
162 self.pool_items = [ PoolItem (hostname, userdata) for (hostname, userdata) in tuples ]
163 self.message = message
164 # where to send notifications upon load_starting
165 self.substrate = substrate
168 return "<Pool {} : {} .. {}>".format(self.message, self.pool_items[0], self.pool_items[-1])
170 def list (self, verbose=False):
172 for i in self.pool_items: print(i.line())
# compact one-char-per-slot summary line
176 for i in self.pool_items: line += ' ' + i.char()
# locate a PoolItem by hostname; raises when absent
179 def _item (self, hostname):
180 for i in self.pool_items:
181 if i.hostname == hostname: return i
182 raise Exception ("Could not locate hostname {} in pool {}".format(hostname, self.message))
184 def retrieve_userdata (self, hostname):
185 return self._item(hostname).userdata
# resolve through the pool item when known, fall back to plain DNS
187 def get_ip (self, hostname):
189 return self._item(hostname).get_ip()
191 return socket.gethostbyname(hostname)
# mark a slot as ours; only warns when the hostname is not pooled
193 def set_mine (self, hostname):
195 self._item(hostname).status='mine'
197 print('WARNING: host {} not found in IP pool {}'.format(hostname, self.message))
# hand out the first free slot
# NOTE(review): the line marking the returned slot (status = 'mine') is
# missing from this extract — confirm against the full source
199 def next_free (self):
200 for i in self.pool_items:
201 if i.status == 'free':
203 return (i.hostname, i.userdata)
207 # we have a starting instance of our own
208 def add_starting (self, vname, bname):
209 Starting().add(vname, bname)
210 for i in self.pool_items:
211 if i.hostname == vname:
214 # load the starting instances from the common file
215 # remember that might be ours
216 # return the list of (vname,bname) that are not ours
217 def load_starting (self):
218 starting = Starting()
221 for (v, b) in starting.tuples:
222 for i in self.pool_items:
223 if i.hostname == v and i.status == 'free':
224 i.status = 'starting'
225 new_tuples.append( (v, b,) )
# drop our own 'mine' entries from the shared starting file
228 def release_my_starting (self):
229 for i in self.pool_items:
230 if i.status == 'mine':
231 Starting().delete_vname(i.hostname)
# sense: probe each unclassified slot with ping to mark busy/free
237 for item in self.pool_items:
238 if item.status is not None:
239 print(item.char(), end=' ')
241 if self.check_ping (item.hostname):
250 print('Sensing IP pool', self.message, end=' ')
# account for instances declared in the shared 'starting' file
254 for vname, bname in self.load_starting():
255 self.substrate.add_starting_dummy(bname, vname)
256 print("After having loaded 'starting': IP pool")
258 # OS-dependent ping option (support for macos, for convenience)
259 ping_timeout_option = None
260 # returns True when a given hostname/ip responds to ping
261 def check_ping (self, hostname):
262 if '.' not in hostname:
263 hostname = self.substrate.fqdn(hostname)
# lazily determine the right ping timeout flag for this OS, once per run
264 if not Pool.ping_timeout_option:
265 (status, osname) = subprocess.getstatusoutput("uname -s")
267 raise Exception("TestPool: Cannot figure your OS name")
268 if osname == "Linux":
269 Pool.ping_timeout_option = "-w"
270 elif osname == "Darwin":
271 Pool.ping_timeout_option = "-t"
# 1-second timeout, single packet
273 command = "ping -c 1 {} 1 {}".format(Pool.ping_timeout_option, hostname)
274 (status, output) = subprocess.getstatusoutput(command)
277 #Ping command <{command}> has returned {status}
281 # """.format(**locals()))
# Box: base class for all physical boxes reachable over ssh as root
286 def __init__ (self, hostname):
287 self.hostname = hostname
290 return "<Box {}>".format(self.hostname)
291 def shortname (self):
292 return short_hostname(self.hostname)
# factory for the ssh wrapper used by all remote operations
294 return TestSsh(self.hostname, username='root', unknown_host=False)
295 def reboot (self, options):
296 self.test_ssh().run("shutdown -r now",
297 message="Rebooting {}".format(self.hostname),
298 dry_run=options.dry_run)
# hostname decorated with probed facts: virt flavour, fedora release, memory
300 def hostname_fedora (self, virt=None):
301 # this truly is an opening bracket
302 result = "{}".format(self.hostname) + " {"
304 result += "{}-".format(virt)
305 result += "{} {}".format(self.fedora(), self.memory())
306 # too painful to propagate this cleanly
309 result += "-{}".format(self.uname())
310 # and the matching closing bracket
# marker used to split the output of the composite probe command below
314 separator = "===composite==="
317 # take this chance to gather useful stuff
# probe(): run one composite ssh command and cache hostname/uptime/uname/
# fedora/memory; returns the cached result on subsequent calls
320 if self._probed is not None: return self._probed
321 composite_command = [ ]
322 composite_command += [ "hostname" ]
323 composite_command += [ ";" , "echo", Box.separator , ";" ]
324 composite_command += [ "uptime" ]
325 composite_command += [ ";" , "echo", Box.separator , ";" ]
326 composite_command += [ "uname", "-r"]
327 composite_command += [ ";" , "echo", Box.separator , ";" ]
328 composite_command += [ "cat" , "/etc/fedora-release" ]
329 composite_command += [ ";" , "echo", Box.separator , ";" ]
330 composite_command += [ "grep", "MemTotal", "/proc/meminfo" ]
332 # due to colons and all, this is going wrong on the local box (typically testmaster)
333 # I am reluctant to change TestSsh as it might break all over the place, so
334 if self.test_ssh().is_local():
335 probe_argv = [ "bash", "-c", " ".join (composite_command) ]
337 probe_argv = self.test_ssh().actual_argv(composite_command)
338 composite = self.backquote ( probe_argv, trash_err=True )
# pessimistic defaults in case the box is unreachable or parsing fails
339 self._hostname = self._uptime = self._uname = self._fedora = self._memory = "** Unknown **"
341 print("root@{} unreachable".format(self.hostname))
345 pieces = composite.split(Box.separator)
346 pieces = [ x.strip() for x in pieces ]
348 [hostname, uptime, uname, fedora, memory] = pieces
350 self._hostname = hostname
# strip the 'up ...' prefix from uptime, keep the load averages
351 self._uptime = ', '.join([ x.strip() for x in uptime.split(',')[2:]]).replace("load average", "load")
# e.g. 'Fedora release 35 (...)' -> 'f35'
353 self._fedora = fedora.replace("Fedora release ","f").split(" ")[0]
# MemTotal is in kB; convert to Mb
355 self._memory = int(memory.split()[1])/(1024)
356 except Exception as e:
358 print('BEG issue with pieces')
359 traceback.print_exc()
360 self._probed = self._hostname
363 # use argv=['bash','-c',"the command line"]
# accessors below fall back to a '*unprobed*' marker before probe() ran
366 if hasattr(self,'_uptime') and self._uptime: return self._uptime
367 return '*unprobed* uptime'
370 if hasattr(self,'_uname') and self._uname: return self._uname
371 return '*unprobed* uname'
374 if hasattr(self,'_fedora') and self._fedora: return self._fedora
375 return '*unprobed* fedora'
378 if hasattr(self,'_memory') and self._memory: return "{} Mb".format(self._memory)
379 return '*unprobed* memory'
# run argv locally; honors dry_run, optionally discards stderr
381 def run(self, argv, message=None, trash_err=False, dry_run=False):
383 print('DRY_RUN:', end=' ')
384 print(" ".join(argv))
389 return subprocess.call(argv)
391 with open('/dev/null', 'w') as null:
392 return subprocess.call(argv, stderr=null)
# same as run() but wraps argv to execute on this box over ssh
394 def run_ssh (self, argv, message, trash_err=False, dry_run=False):
395 ssh_argv = self.test_ssh().actual_argv(argv)
396 result = self.run (ssh_argv, message, trash_err, dry_run=dry_run)
398 print("WARNING: failed to run {} on {}".format(" ".join(argv), self.hostname))
# capture a command's stdout (like shell backquotes)
401 def backquote (self, argv, trash_err=False):
402 # in python3 we need to set universal_newlines=True
404 out_err = subprocess.Popen(argv, stdout=subprocess.PIPE,
405 universal_newlines=True).communicate()
407 with open('/dev/null', 'w') as null:
408 out_err = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=null,
409 universal_newlines=True).communicate()
410 # only interested in stdout here
413 # if you have any shell-expanded arguments like *
414 # and if there's any chance the command is addressed to the local host
415 def backquote_ssh (self, argv, trash_err=False):
416 if not self.probe(): return ''
417 return self.backquote(self.test_ssh().actual_argv(argv), trash_err)
419 ############################################################
# BuildInstance: one build process (possibly several pids) on a BuildBox
# NOTE(review): the line initializing self.pids from the 'pid' argument is
# missing from this extract
421 def __init__ (self, buildname, pid, buildbox):
422 self.buildname = buildname
423 self.buildbox = buildbox
426 return "<BuildInstance {} in {}>".format(self.buildname, self.buildbox)
428 def add_pid(self,pid):
429 self.pids.append(pid)
# one-line human-readable summary
432 return "== {} == (pids={})".format(self.buildname, self.pids)
# a box that hosts build processes
434 class BuildBox (Box):
435 def __init__ (self, hostname):
436 Box.__init__(self, hostname)
437 self.build_instances = []
439 return "<BuildBox {}>".format(self.hostname)
# record a build; when the buildname is already known, only the pid is
# added to the existing instance (that early-return path is missing from
# this extract — NOTE(review): confirm against the full source)
441 def add_build(self, buildname, pid):
442 for build in self.build_instances:
443 if build.buildname == buildname:
446 self.build_instances.append(BuildInstance(buildname, pid, self))
448 def list(self, verbose=False):
449 if not self.build_instances:
450 header ('No build process on {} ({})'.format(self.hostname_fedora(), self.uptime()))
452 header ("Builds on {} ({})".format(self.hostname_fedora(), self.uptime()))
453 for b in self.build_instances:
454 header (b.line(), banner=False)
# hard reboot unless options request a soft one
456 def reboot (self, options):
458 Box.reboot(self, options)
460 self.soft_reboot (options)
# precompiled patterns for spotting build processes in 'ps' output;
# raw strings so '\s'/'\Z' are not flagged as invalid escape sequences
# on modern Python (SyntaxWarning since 3.12)
build_matcher = re.compile(r"\s*(?P<pid>[0-9]+).*-[bo]\s+(?P<buildname>[^\s]+)(\s|\Z)")
build_matcher_initvm = re.compile(r"\s*(?P<pid>[0-9]+).*initvm.*\s+(?P<buildname>[^\s]+)\s*\Z")
# lxc-flavoured build box
465 class BuildLxcBox (BuildBox):
# soft reboot = just terminate the build processes
466 def soft_reboot (self, options):
467 command=['pkill','lbuild']
468 self.run_ssh(command, "Terminating vbuild processes", dry_run=options.dry_run)
470 # inspect box and find currently running builds
471 def sense(self, options):
# find lbuild pids, then inspect their full command lines with ps
474 pids = self.backquote_ssh(['pgrep','lbuild'], trash_err=True)
476 command = ['ps', '-o', 'pid,command'] + [ pid for pid in pids.split("\n") if pid]
477 ps_lines = self.backquote_ssh(command).split('\n')
478 for line in ps_lines:
479 if not line.strip() or line.find('PID') >= 0: continue
480 m = build_matcher.match(line)
# expand the @DATE@ placeholder with today's date
482 date = time.strftime('%Y-%m-%d', time.localtime(time.time()))
483 buildname = m.group('buildname').replace('@DATE@', date)
484 self.add_build(buildname, m.group('pid'))
486 m = build_matcher_initvm.match(line)
488 # buildname is expanded here
489 self.add_build(buildname, m.group('pid'))
491 header('BuildLxcBox.sense: command {} returned line that failed to match'.format(command))
492 header(">>{}<<".format(line))
494 ############################################################
# PlcInstance: one myplc instance hosted on a PlcBox
496 def __init__ (self, plcbox):
497 self.plc_box = plcbox
501 return "<PlcInstance {}>".format(self.plc_box)
# timestamps implement the 'age' used by the fair allocation scheme
503 def set_timestamp (self,timestamp):
504 self.timestamp = timestamp
# (set_now variant — its 'def' header is missing from this extract)
506 self.timestamp = int(time.time())
507 def pretty_timestamp (self):
508 return time.strftime("%Y-%m-%d:%H-%M", time.localtime(self.timestamp))
510 class PlcLxcInstance (PlcInstance):
511 # does lxc have a context id of any kind ?
# NOTE(review): the line storing 'pid' on self is missing from this extract
512 def __init__ (self, plcbox, lxcname, pid):
513 PlcInstance.__init__(self, plcbox)
514 self.lxcname = lxcname
517 return "<PlcLxcInstance {}>".format(self.lxcname)
# vplc name = last dash-separated component of the container name
520 return self.lxcname.split('-')[-1]
521 def buildname (self):
522 return self.lxcname.rsplit('-',2)[0]
# one-line human-readable summary
525 msg="== {} ==".format(self.vplcname())
526 msg += " [={}]".format(self.lxcname)
527 if self.pid==-1: msg+=" not (yet?) running"
528 else: msg+=" (pid={})".format(self.pid)
529 if self.timestamp: msg += " @ {}".format(self.pretty_timestamp())
530 else: msg += " *unknown timestamp*"
# kill: push the driver script to the box, stop the container, unregister
534 command="rsync lxc-driver.sh {}:/root".format(self.plc_box.hostname)
535 subprocess.getstatusoutput(command)
536 msg="lxc container stopping {} on {}".format(self.lxcname, self.plc_box.hostname)
537 self.plc_box.run_ssh(['/root/lxc-driver.sh', '-c', 'stop_lxc', '-n', self.lxcname], msg)
538 self.plc_box.forget(self)
# PlcBox: a box that can host up to max_plcs myplc instances
542 def __init__ (self, hostname, max_plcs):
543 Box.__init__(self, hostname)
544 self.plc_instances = []
545 self.max_plcs = max_plcs
547 return "<PlcBox {}>".format(self.hostname)
549 def free_slots (self):
550 return self.max_plcs - len(self.plc_instances)
552 # fill one slot even though this one is not started yet
553 def add_dummy (self, plcname):
554 dummy=PlcLxcInstance(self, 'dummy_'+plcname, 0)
556 self.plc_instances.append(dummy)
558 def forget (self, plc_instance):
559 self.plc_instances.remove(plc_instance)
# hard reboot unless options request a soft one
561 def reboot (self, options):
563 Box.reboot(self, options)
565 self.soft_reboot (options)
567 def list(self, verbose=False):
568 if not self.plc_instances:
569 header ('No plc running on {}'.format(self.line()))
571 header ("Active plc VMs on {}".format(self.line()))
# oldest first, consistent with the kill-the-oldest allocation policy
572 self.plc_instances.sort(key=timestamp_key)
573 for p in self.plc_instances:
574 header (p.line(), banner=False)
576 ## we do not do this at INRIA any more
577 class PlcLxcBox (PlcBox):
# record a running lxc myplc, warning on duplicates
579 def add_lxc (self, lxcname, pid):
580 for plc in self.plc_instances:
581 if plc.lxcname == lxcname:
582 header("WARNING, duplicate myplc {} running on {}"\
583 .format(lxcname, self.hostname), banner=False)
585 self.plc_instances.append(PlcLxcInstance(self, lxcname, pid))
588 # a line describing the box
590 return "{} [max={},free={}] ({})".format(self.hostname_fedora(virt="lxc"),
591 self.max_plcs, self.free_slots(),
# lookup helper; the 'return p' / fallthrough lines are missing here
594 def plc_instance_by_lxcname(self, lxcname):
595 for p in self.plc_instances:
596 if p.lxcname == lxcname:
600 # essentially shutdown all running containers
601 def soft_reboot(self, options):
602 command="rsync lxc-driver.sh {}:/root".format(self.hostname)
603 subprocess.getstatusoutput(command)
604 self.run_ssh( ['/root/lxc-driver.sh','-c','stop_all'],
605 "Stopping all running lxc containers on {}".format(self.hostname),
606 dry_run=options.dry_run)
609 # sense is expected to fill self.plc_instances with PlcLxcInstance's
610 # to describe the currently running VM's
611 def sense(self, options):
# ship the driver script, then ask it for 'name;pid;timestamp' records
614 command = "rsync lxc-driver.sh {}:/root".format(self.hostname)
615 subprocess.getstatusoutput(command)
616 command = ['/root/lxc-driver.sh', '-c', 'sense_all']
617 lxc_stat = self.backquote_ssh (command)
618 for lxc_line in lxc_stat.split("\n"):
621 # we mix build and plc VMs
622 if 'vplc' not in lxc_line:
624 lxcname = lxc_line.split(";")[0]
625 pid = lxc_line.split(";")[1]
626 timestamp = lxc_line.split(";")[2]
627 self.add_lxc(lxcname,pid)
# tolerate a missing/garbled timestamp field
628 try: timestamp = int(timestamp)
629 except: timestamp = 0
630 p = self.plc_instance_by_lxcname(lxcname)
632 print('WARNING zombie plc',self.hostname,lxcname)
633 print('... was expecting',lxcname,'in',[i.lxcname for i in self.plc_instances])
635 p.set_timestamp(timestamp)
637 ############################################################
# QemuInstance: one qemu-hosted node on a QemuBox
# NOTE(review): the lines storing 'pid' and initializing the timestamp are
# missing from this extract
639 def __init__(self, nodename, pid, qemubox):
640 self.nodename = nodename
642 self.qemu_box = qemubox
# buildname gets attached later, from /vservers/*/*/qemu.pid scanning
644 self.buildname = None
647 return "<QemuInstance {}>".format(self.nodename)
649 def set_buildname (self, buildname):
650 self.buildname = buildname
651 def set_timestamp (self, timestamp):
652 self.timestamp = timestamp
# (set_now variant — its 'def' header is missing from this extract)
654 self.timestamp = int(time.time())
655 def pretty_timestamp (self):
656 return time.strftime("%Y-%m-%d:%H-%M", time.localtime(self.timestamp))
# one-line human-readable summary
659 msg = "== {} ==".format(short_hostname(self.nodename))
660 msg += " [={}]".format(self.buildname)
661 if self.pid: msg += " (pid={})".format(self.pid)
662 else: msg += " not (yet?) running"
663 if self.timestamp: msg += " @ {}".format(self.pretty_timestamp())
664 else: msg += " *unknown timestamp*"
# kill: refuse on pid 0 (dummy/not-started), else kill remotely and forget
669 print("cannot kill qemu {} with pid==0".format(self.nodename))
671 msg = "Killing qemu {} with pid={} on box {}".format(self.nodename, self.pid, self.qemu_box.hostname)
672 self.qemu_box.run_ssh(['kill', "{}".format(self.pid)], msg)
673 self.qemu_box.forget(self)
# QemuBox: a box that can host up to max_qemus qemu node instances
677 def __init__ (self, hostname, max_qemus):
678 Box.__init__(self, hostname)
679 self.qemu_instances = []
680 self.max_qemus = max_qemus
682 return "<QemuBox {}>".format(self.hostname)
# record a running qemu, warning on duplicates
684 def add_node(self, nodename, pid):
685 for qemu in self.qemu_instances:
686 if qemu.nodename == nodename:
687 header("WARNING, duplicate qemu {} running on {}"\
688 .format(nodename,self.hostname), banner=False)
690 self.qemu_instances.append(QemuInstance(nodename, pid, self))
692 def node_names (self):
693 return [ qi.nodename for qi in self.qemu_instances ]
695 def forget (self, qemu_instance):
696 self.qemu_instances.remove(qemu_instance)
698 # fill one slot even though this one is not started yet
699 def add_dummy(self, nodename):
700 dummy=QemuInstance('dummy_'+nodename, 0, self)
702 self.qemu_instances.append(dummy)
# a line describing the box
705 return "{} [max={},free={}] ({}) {}"\
706 .format(self.hostname_fedora(virt="qemu"),
707 self.max_qemus, self.free_slots(),
708 self.uptime(), self.driver())
710 def list(self, verbose=False):
711 if not self.qemu_instances:
712 header ('No qemu on {}'.format(self.line()))
714 header ("Qemus on {}".format(self.line()))
# oldest first, consistent with the allocation policy
715 self.qemu_instances.sort(key=timestamp_key)
716 for q in self.qemu_instances:
717 header (q.line(), banner=False)
719 def free_slots (self):
720 return self.max_qemus - len(self.qemu_instances)
# driver(): cached by sense() below; placeholder before the box was probed
723 if hasattr(self,'_driver') and self._driver:
725 return '*undef* driver'
# lookup helpers; their 'return' lines are missing from this extract
727 def qemu_instance_by_pid(self, pid):
728 for q in self.qemu_instances:
733 def qemu_instance_by_nodename_buildname (self, nodename, buildname):
734 for q in self.qemu_instances:
735 if q.nodename == nodename and q.buildname == buildname:
# hard reboot unless options request a soft one (soft = pkill qemu)
739 def reboot (self, options):
741 Box.reboot(self, options)
743 self.run_ssh(['pkill','qemu'], "Killing qemu instances",
744 dry_run=options.dry_run)
# matches 'ps' output for qemu processes booted from a node's iso image;
# raw string so '\s'/'\.' are not flagged as invalid escapes on modern Python
matcher = re.compile(r"\s*(?P<pid>[0-9]+).*-cdrom\s+(?P<nodename>[^\s]+)\.iso")
# sense: probe kernel modules, running qemu pids, and /vservers metadata
748 def sense(self, options):
# figure which acceleration module is loaded (kqemu vs kvm_intel)
751 modules = self.backquote_ssh(['lsmod']).split('\n')
752 self._driver = '*NO kqemu/kvm_intel MODULE LOADED*'
753 for module in modules:
754 if module.find('kqemu') == 0:
755 self._driver = 'kqemu module loaded'
756 # kvm might be loaded without kvm_intel (we don't have AMD)
757 elif module.find('kvm_intel') == 0:
758 self._driver = 'kvm_intel OK'
759 ########## find out running pids
760 pids = self.backquote_ssh(['pgrep','qemu'])
763 command = ['ps','-o','pid,command'] + [ pid for pid in pids.split("\n") if pid]
764 ps_lines = self.backquote_ssh(command).split("\n")
765 for line in ps_lines:
766 if not line.strip() or line.find('PID') >=0 :
768 m = QemuBox.matcher.match(line)
770 self.add_node(m.group('nodename'), m.group('pid'))
772 header('QemuBox.sense: command {} returned line that failed to match'.format(command))
773 header(">>{}<<".format(line))
774 ########## retrieve alive instances and map to build
776 command = ['grep', '.', '/vservers/*/*/qemu.pid', '/dev/null']
777 pid_lines = self.backquote_ssh(command, trash_err=True).split('\n')
778 for pid_line in pid_lines:
779 if not pid_line.strip():
781 # expect <build>/<nodename>/qemu.pid:<pid>pid
783 (_, __, buildname, nodename, tail) = pid_line.split('/')
784 (_,pid) = tail.split(':')
785 q = self.qemu_instance_by_pid(pid)
788 q.set_buildname(buildname)
789 live_builds.append(buildname)
791 print('WARNING, could not parse pid line', pid_line)
792 # retrieve timestamps
795 command = ['grep','.']
796 command += ['/vservers/{}/*/timestamp'.format(b) for b in live_builds]
797 command += ['/dev/null']
798 ts_lines = self.backquote_ssh(command, trash_err=True).split('\n')
799 for ts_line in ts_lines:
800 if not ts_line.strip():
802 # expect <build>/<nodename>/timestamp:<timestamp>
804 (_, __, buildname, nodename, tail) = ts_line.split('/')
805 nodename = nodename.replace('qemu-', '')
806 (_, timestamp) = tail.split(':')
807 timestamp = int(timestamp)
808 q = self.qemu_instance_by_nodename_buildname(nodename, buildname)
810 # this warning corresponds to qemu instances that were not killed properly
811 # and that have a dangling qemu.pid - and not even all of them as they need
812 # to be attached to a build that has a node running...
813 # it is more confusing than helpful, so let's just trash it
814 #print 'WARNING zombie qemu',self.hostname,ts_line
815 #print '... was expecting (',short_hostname(nodename),buildname,') in',\
816 # [ (short_hostname(i.nodename),i.buildname) for i in self.qemu_instances ]
818 q.set_timestamp(timestamp)
820 print('WARNING, could not parse ts line',ts_line)
# TestInstance: one test run (a build dir under /root on the test box)
# NOTE(review): the lines initializing self.pids and self.timestamp are
# missing from this extract
824 def __init__(self, buildname, pid=0):
828 self.buildname = buildname
# (plcindex, step) pairs for steps that returned KO
832 self.broken_steps = []
835 return "<TestInstance {}>".format(self.buildname)
837 def set_timestamp(self, timestamp):
838 self.timestamp = timestamp
# (set_now variant — its 'def' header is missing from this extract)
840 self.timestamp = int(time.time())
841 def pretty_timestamp(self):
842 return time.strftime("%Y-%m-%d:%H-%M", time.localtime(self.timestamp))
843 def is_running (self):
844 return len(self.pids) != 0
845 def add_pid(self, pid):
846 self.pids.append(pid)
847 def set_broken(self, plcindex, step):
848 self.broken_steps.append( (plcindex, step,) )
# second status letter: '=' fine, 'W' only ignored steps broke, 'B' broken
850 def second_letter(self):
851 if not self.broken_steps:
854 really_broken = [ step for (i,step) in self.broken_steps if '_ignore' not in step ]
855 # W is for warning like what's in the build mail
856 if len(really_broken) == 0:
862 # make up a 2-letter sign
863 # first letter : '=', unless build is running : '*'
864 double = '*' if self.pids else '='
865 # second letter : '=' if fine, 'W' for warnings (only ignored steps) 'B' for broken
866 letter2 = self.second_letter()
868 msg = " {} {} ==".format(double, self.buildname)
871 elif len(self.pids)==1:
872 msg += " (pid={})".format(self.pids[0])
874 msg += " !!!pids={}!!!".format(self.pids)
875 msg += " @{}".format(self.pretty_timestamp())
877 msg2 = ( ' BROKEN' if letter2 == 'B' else ' WARNING' )
878 # sometimes we have an empty plcindex
879 msg += " [{}=".format(msg2) \
880 + " ".join(["{}@{}".format(s, i) if i else s for (i, s) in self.broken_steps]) \
# TestBox: the box (testmaster) where test runs themselves execute
885 def __init__(self, hostname):
886 Box.__init__(self, hostname)
887 self.starting_ips = []
888 self.test_instances = []
890 return "<TestBox {}>".format(self.hostname)
892 def reboot(self, options):
893 # can't reboot a vserver VM
894 self.run_ssh(['pkill', 'run_log'], "Terminating current runs",
895 dry_run=options.dry_run)
896 self.run_ssh(['rm', '-f', Starting.location], "Cleaning {}".format(Starting.location),
897 dry_run=options.dry_run)
# lookup by buildname; the 'return i' line is missing from this extract
899 def get_test(self, buildname):
900 for i in self.test_instances:
901 if i.buildname == buildname:
904 # we scan ALL remaining test results, even the ones not running
# update or create the instance carrying this build's timestamp
905 def add_timestamp(self, buildname, timestamp):
906 i = self.get_test(buildname)
908 i.set_timestamp(timestamp)
910 i = TestInstance(buildname, 0)
911 i.set_timestamp(timestamp)
912 self.test_instances.append(i)
# register a live run_log pid under its build
914 def add_running_test(self, pid, buildname):
915 i = self.get_test(buildname)
917 self.test_instances.append(TestInstance(buildname, pid))
920 print("WARNING: 2 concurrent tests run on same build {}".format(buildname))
# record a failed (plcindex, step) for this build
923 def add_broken(self, buildname, plcindex, step):
924 i = self.get_test(buildname)
926 i = TestInstance(buildname)
927 self.test_instances.append(i)
928 i.set_broken(plcindex, step)
# patterns used by TestBox.sense():
# - matcher_proc: map /proc/<pid>/cwd symlinks back to a build dir
# - matcher_grep: parse KO lines out of a run's trace file
# - matcher_grep_missing: spot builds whose trace file does not exist yet
# raw strings avoid invalid-escape-sequence warnings on modern Python
matcher_proc = re.compile(r".*/proc/(?P<pid>[0-9]+)/cwd.*/root/(?P<buildname>[^/]+)$")
matcher_grep = re.compile(r"/root/(?P<buildname>[^/]+)/logs/trace.*:TRACE:\s*(?P<plcindex>[0-9]+).*step=(?P<step>\S+).*")
matcher_grep_missing = re.compile(r"grep: /root/(?P<buildname>[^/]+)/logs/trace: No such file or directory")
# sense: gather starting IPs, per-build timestamps, broken steps and live runs
934 def sense(self, options):
936 self.starting_ips = [ x for x in self.backquote_ssh( ['cat', Starting.location], trash_err=True).strip().split('\n') if x ]
938 # scan timestamps on all tests
939 # this is likely to not invoke ssh so we need to be a bit smarter to get * expanded
940 # xxx would make sense above too
941 command = ['bash', '-c', "grep . /root/*/timestamp /dev/null"]
942 ts_lines = self.backquote_ssh(command, trash_err=True).split('\n')
943 for ts_line in ts_lines:
944 if not ts_line.strip():
946 # expect /root/<buildname>/timestamp:<timestamp>
948 (ts_file, timestamp) = ts_line.split(':')
949 ts_file = os.path.dirname(ts_file)
950 buildname = os.path.basename(ts_file)
951 timestamp = int(timestamp)
952 t = self.add_timestamp(buildname, timestamp)
954 print('WARNING, could not parse ts line', ts_line)
956 # let's try to be robust here -- tests that fail very early like e.g.
957 # "Cannot make space for a PLC instance: vplc IP pool exhausted", that occurs as part of provision
958 # will result in a 'trace' symlink to an inexisting 'trace-<>.txt' because no step has gone through
959 # simple 'trace' should exist though as it is created by run_log
960 command = ['bash', '-c', "grep KO /root/*/logs/trace /dev/null 2>&1" ]
961 trace_lines = self.backquote_ssh(command).split('\n')
962 for line in trace_lines:
# missing trace file => the run broke before any step completed
965 m = TestBox.matcher_grep_missing.match(line)
967 buildname = m.group('buildname')
968 self.add_broken(buildname, '', 'NO STEP DONE')
970 m = TestBox.matcher_grep.match(line)
972 buildname = m.group('buildname')
973 plcindex = m.group('plcindex')
974 step = m.group('step')
975 self.add_broken(buildname, plcindex, step)
977 header("TestBox.sense: command {} returned line that failed to match\n{}".format(command, line))
978 header(">>{}<<".format(line))
# live runs: resolve each run_log pid to its working directory
980 pids = self.backquote_ssh (['pgrep', 'run_log'], trash_err=True)
983 command = ['ls','-ld'] + ["/proc/{}/cwd".format(pid) for pid in pids.split("\n") if pid]
984 ps_lines = self.backquote_ssh(command).split('\n')
985 for line in ps_lines:
988 m = TestBox.matcher_proc.match(line)
991 buildname = m.group('buildname')
992 self.add_running_test(pid, buildname)
994 header("TestBox.sense: command {} returned line that failed to match\n{}".format(command, line))
995 header(">>{}<<".format(line))
# line(): one-line description of the test box
999 return self.hostname_fedora()
1001 def list (self, verbose=False):
1002 # verbose shows all tests
1004 instances = self.test_instances
1007 instances = [ i for i in self.test_instances if i.is_running() ]
1011 header ("No {} on {}".format(msg, self.line()))
1013 header ("{} on {}".format(msg, self.line()))
1014 instances.sort(key=timestamp_key)
1017 # show 'starting' regardless of verbose
1018 if self.starting_ips:
1019 header("Starting IP addresses on {}".format(self.line()))
1020 self.starting_ips.sort()
1021 for starting in self.starting_ips:
1024 header("Empty 'starting' on {}".format(self.line()))
1026 ############################################################
1032 self.options = Options()
1033 self.options.dry_run = False
1034 self.options.verbose = False
1035 self.options.reboot = False
1036 self.options.soft = False
1037 self.test_box = TestBox (self.test_box_spec())
1038 self.build_lxc_boxes = [ BuildLxcBox(h) for h in self.build_lxc_boxes_spec() ]
1039 self.plc_lxc_boxes = [ PlcLxcBox (h, m) for (h, m) in self.plc_lxc_boxes_spec ()]
1040 self.qemu_boxes = [ QemuBox (h, m) for (h, m) in self.qemu_boxes_spec ()]
1041 self._sensed = False
1043 self.vplc_pool = Pool(self.vplc_ips(), "for vplcs", self)
1044 self.vnode_pool = Pool(self.vnode_ips(), "for vnodes", self)
1046 self.build_boxes = self.build_lxc_boxes
1047 self.plc_boxes = self.plc_lxc_boxes
1048 self.default_boxes = self.plc_boxes + self.qemu_boxes
1049 self.all_boxes = self.build_boxes + [ self.test_box ] + self.plc_boxes + self.qemu_boxes
1051 return "<Substrate {}>".format(self.summary_line())
1053 def summary_line (self):
1055 msg += " {} xp".format(len(self.plc_lxc_boxes))
1056 msg += " {} xq".format(len(self.qemu_boxes))
1060 def fqdn (self, hostname):
1061 if hostname.find('.') < 0:
1062 return "{}.{}".format(hostname, self.domain())
1065 # return True if actual sensing takes place
1066 def sense(self, force=False):
1067 if self._sensed and not force:
1069 print('Sensing local substrate...', end=' ')
1071 for b in self.default_boxes:
1072 b.sense(self.options)
1077 def list(self, verbose=False):
1078 for b in self.default_boxes:
1081 def add_dummy_plc(self, plc_boxname, plcname):
1082 for pb in self.plc_boxes:
1083 if pb.hostname == plc_boxname:
1084 pb.add_dummy(plcname)
1086 def add_dummy_qemu(self, qemu_boxname, qemuname):
1087 for qb in self.qemu_boxes:
1088 if qb.hostname == qemu_boxname:
1089 qb.add_dummy(qemuname)
1092 def add_starting_dummy(self, bname, vname):
1093 return self.add_dummy_plc(bname, vname) or self.add_dummy_qemu(bname, vname)
1096 def provision(self, plcs, options):
1098 # attach each plc to a plc box and an IP address
1099 plcs = [ self.provision_plc(plc, options) for plc in plcs ]
1100 # attach each node/qemu to a qemu box with an IP address
1101 plcs = [ self.provision_qemus(plc,options) for plc in plcs ]
1102 # update the SFA spec accordingly
1103 plcs = [ self.localize_sfa_rspec(plc, options) for plc in plcs ]
1106 except Exception as e:
1107 print('* Could not provision this test on current substrate','--',e,'--','exiting')
1108 traceback.print_exc()
1111 # it is expected that a couple of options like ips_bplc and ips_vplc
1112 # are set or unset together
1114 def check_options(x, y):
1117 return len(x) == len(y)
    # find an available plc box (or make space)
    # and a free IP address (using options if present)
    def provision_plc(self, plc, options):
        """
        Attach one plc spec to a concrete plc box and a free vplc
        hostname/IP, evicting the oldest running plc instance when the
        boxes or the IP pool are exhausted, then rewrite the plc spec
        through TestMapper.

        NOTE(review): many structural lines are missing from this
        extract (the `else:` opener and its initializations, the
        free-slot comparison in the box scan, the try/except around the
        oldest-instance lookup, the closing of the reduce() call and of
        the mapper dict, ...).  Indentation below is a best-effort
        reconstruction -- diff against VCS before editing.
        """
        assert Substrate.check_options(options.ips_bplc, options.ips_vplc)
        #### let's find an IP address for that plc
        if options.ips_vplc:
            # we don't check anything here,
            # it is the caller's responsability to cleanup and make sure this makes sense
            plc_boxname = options.ips_bplc.pop()
            vplc_hostname = options.ips_vplc.pop()
            # [not in extract: the `else:` branch opener and the
            #  initialization of plc_boxname; the following lines belong
            #  to that else branch]
            vplc_hostname = None
            # try to find an available IP
            self.vplc_pool.sense()
            couple = self.vplc_pool.next_free()
            # [not in extract: guard, presumably `if couple:`]
            (vplc_hostname, unused) = couple
            #### we need to find one plc box that still has a slot
            # [not in extract: max-free accumulator initialization]
            # use the box that has max free spots for load balancing
            for pb in self.plc_boxes:
                free = pb.free_slots()
                # [not in extract: comparison against the running max]
                plc_boxname = pb.hostname
            # if there's no available slot in the plc_boxes, or we need a free IP address
            # make space by killing the oldest running instance
            if not plc_boxname or not vplc_hostname:
                # find the oldest of all our instances
                all_plc_instances = reduce(lambda x, y: x+y,
                                           [ pb.plc_instances for pb in self.plc_boxes ],
                # [not in extract: the reduce() initializer and closing
                #  paren, and the try/except wrapping the lookup below]
                all_plc_instances.sort(key=timestamp_key)
                plc_instance_to_kill = all_plc_instances[0]
                # [not in extract: msg initialization and the
                #  `if not plc_boxname:` guard]
                msg += " PLC boxes are full"
                if not vplc_hostname:
                    msg += " vplc IP pool exhausted"
                msg += " {}".format(self.summary_line())
                raise Exception("Cannot make space for a PLC instance:" + msg)
                freed_plc_boxname = plc_instance_to_kill.plc_box.hostname
                freed_vplc_hostname = plc_instance_to_kill.vplcname()
                message = 'killing oldest plc instance = {} on {}'\
                    .format(plc_instance_to_kill.line(), freed_plc_boxname)
                plc_instance_to_kill.kill()
                # use this new plcbox if that was the problem
                # [not in extract: `if not plc_boxname:` guard]
                plc_boxname = freed_plc_boxname
                # ditto for the IP address
                if not vplc_hostname:
                    vplc_hostname = freed_vplc_hostname
                    # record in pool as mine
                    self.vplc_pool.set_mine(vplc_hostname)
        # mark the allocation as visible to concurrent runs
        self.add_dummy_plc(plc_boxname, plc['name'])
        vplc_ip = self.vplc_pool.get_ip(vplc_hostname)
        self.vplc_pool.add_starting(vplc_hostname, plc_boxname)
        #### compute a helpful vserver name
        # remove domain in hostname
        vplc_short = short_hostname(vplc_hostname)
        vservername = "{}-{}-{}".format(options.buildname, plc['index'], vplc_short)
        plc_name = "{}_{}".format(plc['name'], vplc_short)
        utils.header('PROVISION plc {} in box {} at IP {} as {}'\
                     .format(plc['name'], plc_boxname, vplc_hostname, vservername))
        #### apply in the plc_spec
        # label = options.personality.replace("linux","")
        mapper = {'plc' : [ ('*' , {'host_box' : plc_boxname,
                                    'vservername' : vservername,
                                    'vserverip' : vplc_ip,
                                    'settings:PLC_DB_HOST' : vplc_hostname,
                                    'settings:PLC_API_HOST' : vplc_hostname,
                                    'settings:PLC_BOOT_HOST' : vplc_hostname,
                                    'settings:PLC_WWW_HOST' : vplc_hostname,
                                    'settings:PLC_NET_DNS1' : self.network_settings() [ 'interface_fields:dns1' ],
                                    'settings:PLC_NET_DNS2' : self.network_settings() [ 'interface_fields:dns2' ],
        # [not in extract: remaining mapper entries and the closing
        #  brackets of the mapper literal]
        # mappers only work on a list of plcs
        return TestMapper([plc], options).map(mapper)[0]
    def provision_qemus(self, plc, options):
        """
        Attach every node of one plc spec to a qemu box and a free vnode
        hostname/IP (evicting the oldest qemu instance when resources run
        out), then rewrite the node specs through TestMapper.

        NOTE(review): many structural lines are missing from this
        extract (the `maps = []` initialization, the `else:` opener and
        its initializations, the free-slot comparison, the try/except
        around the oldest-instance lookup, the reduce() initializer, the
        closing of nodemap, ...).  Indentation below is a best-effort
        reconstruction -- diff against VCS before editing.
        """
        assert Substrate.check_options(options.ips_bnode, options.ips_vnode)
        test_mapper = TestMapper([plc], options)
        nodenames = test_mapper.node_names()
        # [not in extract: accumulator init, presumably `maps = []`]
        for nodename in nodenames:
            if options.ips_vnode:
                # as above, it's a rerun, take it for granted
                qemu_boxname = options.ips_bnode.pop()
                vnode_hostname = options.ips_vnode.pop()
                # [not in extract: the `else:` branch opener and the
                #  initialization of qemu_boxname; the following lines
                #  belong to that else branch]
                vnode_hostname = None
                # try to find an available IP
                self.vnode_pool.sense()
                couple = self.vnode_pool.next_free()
                # [not in extract: guard, presumably `if couple:`]
                (vnode_hostname, unused) = couple
                # find a physical box
                # [not in extract: max-free accumulator initialization]
                # use the box that has max free spots for load balancing
                for qb in self.qemu_boxes:
                    free = qb.free_slots()
                    # [not in extract: comparison against the running max]
                    qemu_boxname = qb.hostname
                # if we miss the box or the IP, kill the oldest instance
                if not qemu_boxname or not vnode_hostname:
                    # find the oldest of all our instances
                    all_qemu_instances = reduce(lambda x, y: x+y,
                                                [ qb.qemu_instances for qb in self.qemu_boxes ],
                    # [not in extract: the reduce() initializer and closing
                    #  paren, and the try/except wrapping the lookup below]
                    all_qemu_instances.sort(key=timestamp_key)
                    qemu_instance_to_kill = all_qemu_instances[0]
                    # [not in extract: msg initialization before the
                    #  guards below]
                    if not qemu_boxname:
                        msg += " QEMU boxes are full"
                    if not vnode_hostname:
                        msg += " vnode IP pool exhausted"
                    msg += " {}".format(self.summary_line())
                    raise Exception("Cannot make space for a QEMU instance:"+msg)
                    freed_qemu_boxname = qemu_instance_to_kill.qemu_box.hostname
                    freed_vnode_hostname = short_hostname(qemu_instance_to_kill.nodename)
                    message = 'killing oldest qemu node = {} on {}'.format(qemu_instance_to_kill.line(),
                    # [not in extract: the continuation/closing of the
                    #  .format(...) call above]
                    qemu_instance_to_kill.kill()
                    # use these freed resources where needed
                    if not qemu_boxname:
                        qemu_boxname = freed_qemu_boxname
                    if not vnode_hostname:
                        vnode_hostname = freed_vnode_hostname
                        self.vnode_pool.set_mine(vnode_hostname)
            # mark the allocation as visible to concurrent runs
            self.add_dummy_qemu(qemu_boxname, vnode_hostname)
            mac = self.vnode_pool.retrieve_userdata(vnode_hostname)
            ip = self.vnode_pool.get_ip(vnode_hostname)
            self.vnode_pool.add_starting(vnode_hostname, qemu_boxname)
            vnode_fqdn = self.fqdn(vnode_hostname)
            nodemap = {'host_box' : qemu_boxname,
                       'node_fields:hostname' : vnode_fqdn,
                       'interface_fields:ip' : ip,
                       'ipaddress_fields:ip_addr' : ip,
                       'interface_fields:mac' : mac,
            # [not in extract: the closing brace of the nodemap literal]
            nodemap.update(self.network_settings())
            maps.append( (nodename, nodemap) )
            utils.header("PROVISION node {} in box {} at IP {} with MAC {}"\
                         .format(nodename, qemu_boxname, vnode_hostname, mac))
        return test_mapper.map({'node':maps})[0]
    def localize_sfa_rspec(self, plc, options):
        # Point the plc's SFA settings at the hosts just allocated for
        # the plc itself (registry/aggregate/db on the DB host, API URL
        # on the API host).
        # NOTE(review): one line right after the def and the trailing
        # return (presumably `return plc`, which provision() relies on)
        # are missing from this extract -- confirm in VCS.
        plc['sfa']['settings']['SFA_REGISTRY_HOST'] = plc['settings']['PLC_DB_HOST']
        plc['sfa']['settings']['SFA_AGGREGATE_HOST'] = plc['settings']['PLC_DB_HOST']
        plc['sfa']['settings']['SFA_DB_HOST'] = plc['settings']['PLC_DB_HOST']
        plc['sfa']['settings']['SFA_PLC_URL'] = 'https://{}:443/PLCAPI/'.format(plc['settings']['PLC_API_HOST'])
1307 #################### release:
1308 def release(self, options):
1309 self.vplc_pool.release_my_starting()
1310 self.vnode_pool.release_my_starting()
    #################### show results for interactive mode
    def get_box(self, boxname):
        # Resolve `boxname` (short or fully-qualified) to a known Box
        # object among build/plc/qemu/test boxes.
        # NOTE(review): the success returns (presumably `return b` after
        # each comparison) and the trailing return after the warning are
        # missing from this extract -- as shown, matches fall through.
        for b in self.build_boxes + self.plc_boxes + self.qemu_boxes + [self.test_box] :
            if b.shortname() == boxname:
                # [not in extract: presumably `return b`]
            if b.shortname() == boxname.split('.')[0]:
                # [not in extract: presumably `return b`]
        print("Could not find box {}".format(boxname))
    # deal with the mix of boxes and names and stores the current focus
    # as a list of Box instances in self.focus_all
    def normalize(self, box_or_names):
        # NOTE(review): missing from this extract: the initialization
        # (presumably `self.focus_all = []`) and a guard after get_box()
        # so the warning only fires on unresolved names -- restore from
        # VCS before editing.
        for box in box_or_names:
            if not isinstance(box, Box):
                box = self.get_box(box)
                # [not in extract: presumably `if not box:` guarding the
                #  warning below]
                print('Warning - could not handle box',box)
            self.focus_all.append(box)
        # categorized views of the focus list, by box type
        self.focus_build = [ x for x in self.focus_all if isinstance(x, BuildBox) ]
        self.focus_plc = [ x for x in self.focus_all if isinstance(x, PlcBox) ]
        self.focus_qemu = [ x for x in self.focus_all if isinstance(x, QemuBox) ]
    def list_boxes(self):
        # Sense all focused boxes, then print each one's contents.
        # NOTE(review): lines are missing between the two loops and after
        # the second one in this extract (likely completion prints) --
        # confirm in VCS.
        print('Sensing', end=' ')
        for box in self.focus_all:
            box.sense(self.options)
        for box in self.focus_all:
            box.list(self.options.verbose)
1350 def reboot_boxes(self):
1351 for box in self.focus_all:
1352 box.reboot(self.options)
1354 def sanity_check(self):
1355 print('Sanity check')
1356 self.sanity_check_plc()
1357 self.sanity_check_qemu()
    def sanity_check_plc(self):
        # NOTE(review): the body of this method is not present in this
        # extract -- presumably a placeholder/no-op (`pass`); confirm in
        # VCS before editing.
    def sanity_check_qemu(self):
        # Warn about node names declared on more than one focused qemu
        # box.
        # NOTE(review): this extract is missing the initializations of
        # `all_nodes` and `hash`, the counter-update lines inside the
        # middle loop, and the `count != 1` test guarding the warning --
        # restore from VCS.  Also note `hash` shadows the builtin of the
        # same name.
        for box in self.focus_qemu:
            all_nodes += box.node_names()
        for node in all_nodes:
            if node not in hash:
                # [not in extract: counter init/update for hash[node]]
        for (node,count) in list(hash.items()):
            # [not in extract: presumably `if count != 1:`]
                print('WARNING - duplicate node', node)
1376 ####################
1377 # can be run as a utility to probe/display/manage the local infrastructure
1379 parser = OptionParser()
1380 parser.add_option('-r', "--reboot", action='store_true', dest='reboot', default=False,
1381 help='reboot mode (use shutdown -r)')
1382 parser.add_option('-s', "--soft", action='store_true', dest='soft', default=False,
1383 help='soft mode for reboot (terminates processes)')
1384 parser.add_option('-t', "--testbox", action='store_true', dest='testbox', default=False,
1385 help='add test box')
1386 parser.add_option('-b', "--build", action='store_true', dest='builds', default=False,
1387 help='add build boxes')
1388 parser.add_option('-p', "--plc", action='store_true', dest='plcs', default=False,
1389 help='add plc boxes')
1390 parser.add_option('-q', "--qemu", action='store_true', dest='qemus', default=False,
1391 help='add qemu boxes')
1392 parser.add_option('-a', "--all", action='store_true', dest='all', default=False,
1393 help='address all known boxes, like -b -t -p -q')
1394 parser.add_option('-v', "--verbose", action='store_true', dest='verbose', default=False,
1395 help='verbose mode')
1396 parser.add_option('-n', "--dry_run", action='store_true', dest='dry_run', default=False,
1397 help='dry run mode')
1398 (self.options, args) = parser.parse_args()
1401 if self.options.testbox: boxes += [self.test_box]
1402 if self.options.builds: boxes += self.build_boxes
1403 if self.options.plcs: boxes += self.plc_boxes
1404 if self.options.qemus: boxes += self.qemu_boxes
1405 if self.options.all: boxes += self.all_boxes
1408 verbose = self.options.verbose
1409 # default scope is -b -p -q -t
1411 boxes = self.build_boxes + self.plc_boxes + self.qemu_boxes + [self.test_box]
1413 self.normalize(boxes)
1415 if self.options.reboot: