2 # Thierry Parmentelat <thierry.parmentelat@inria.fr>
3 # Copyright (C) 2010-2015 INRIA
5 # #################### history
7 # see also Substrate.readme
9 # This is a complete rewrite of TestResources/Tracker/Pool
10 # we don't use trackers anymore and just probe/sense the running
11 # boxes to figure out where we are
12 # in order to implement some fairness in the round-robin allocation scheme
13 # we need an indication of the 'age' of each running entity,
14 # hence the 'timestamp-*' steps in TestPlc
16 # this should be much more flexible:
17 # * supports several plc boxes
18 # * supports several qemu guests per host
19 # * no need to worry about tracker being in sync or not
21 # #################### howto use
23 # each site is to write its own LocalSubstrate.py,
24 # (see e.g. LocalSubstrate.inria.py)
25 # LocalSubstrate.py is expected to be in /root on the testmaster box
28 # . the vserver-capable boxes used for hosting myplcs
29 # . and their admissible load (max # of myplcs)
30 # . the pool of DNS-names and IP-addresses available for myplcs
32 # . the kvm-qemu capable boxes to host qemu instances
33 # . and their admissible load (max # of myplcs)
34 # . the pool of DNS-names and IP-addresses available for nodes
36 # #################### implem. note
38 # this model relies on 'sensing' the substrate,
39 # i.e. probing all the boxes for their running instances of vservers and qemu
40 # this is how we get rid of tracker inconsistencies
41 # however there is a 'black hole' between the time where a given address is
42 # allocated and when it actually gets used/pingable
43 # this is why we still need a shared knowledge among running tests
44 # in a file named /root/starting
45 # this is connected to the Pool class
47 # ####################
55 from optparse import OptionParser
58 from TestSsh import TestSsh
59 from TestMapper import TestMapper
60 from functools import reduce
62 # too painful to propagate this cleanly
65 def header (message, banner=True):
66 if not message: return
68 print("===============", end=' ')
def timestamp_key(o):
    """Sort key: the 'timestamp' attribute of a pool/box instance."""
    return getattr(o, 'timestamp')
def short_hostname(hostname):
    """Return the first dot-separated label of hostname (the unqualified name)."""
    short, _, _ = hostname.partition('.')
    return short
78 # the place where other test instances tell about their not-yet-started
79 # instances, that go undetected through sensing
82 location = '/root/starting'
92 with open(Starting.location) as starting:
93 self.tuples = [line.strip().split('@') for line in starting.readlines()]
99 return [ x for (x, _) in self.tuples ]
101 def add (self, vname, bname):
102 if not vname in self.vnames():
103 with open(Starting.location, 'a') as out:
104 out.write("{}@{}\n".format(vname, bname))
106 def delete_vname (self, vname):
108 if vname in self.vnames():
109 with open(Starting.location, 'w') as f:
110 for (v, b) in self.tuples:
112 f.write("{}@{}\n".format(v, b))
116 # allows to pick an available IP among a pool
117 # input is expressed as a list of tuples (hostname,ip,user_data)
118 # that can be searched iteratively for a free slot
120 # pool = [ (hostname1,user_data1),
121 # (hostname2,user_data2),
122 # (hostname3,user_data2),
123 # (hostname4,user_data4) ]
124 # assuming that ip1 and ip3 are taken (pingable), then we'd get
126 # pool.next_free() -> entry2
127 # pool.next_free() -> entry4
128 # pool.next_free() -> None
129 # that is, even if ip2 is not busy/pingable when the second next_free() is issued
132 def __init__ (self, hostname, userdata):
133 self.hostname = hostname
134 self.userdata = userdata
135 # slot holds 'busy' or 'free' or 'mine' or 'starting' or None
136 # 'mine' is for our own stuff, 'starting' from the concurrent tests
141 return "<PoolItem {} {}>".format(self.hostname, self.userdata)
144 return "Pooled {} ({}) -> {}".format(self.hostname, self.userdata, self.status)
147 if self.status == None: return '?'
148 elif self.status == 'busy': return '+'
149 elif self.status == 'free': return '-'
150 elif self.status == 'mine': return 'M'
151 elif self.status == 'starting': return 'S'
156 self.ip = socket.gethostbyname(self.hostname)
161 def __init__ (self, tuples, message, substrate):
162 self.pool_items = [ PoolItem (hostname, userdata) for (hostname, userdata) in tuples ]
163 self.message = message
164 # where to send notifications upon load_starting
165 self.substrate = substrate
168 return "<Pool {} : {} .. {}>".format(self.message, self.pool_items[0], self.pool_items[-1])
170 def list (self, verbose=False):
172 for i in self.pool_items: print(i.line())
176 for i in self.pool_items: line += ' ' + i.char()
179 def _item (self, hostname):
180 for i in self.pool_items:
181 if i.hostname == hostname: return i
182 raise Exception ("Could not locate hostname {} in pool {}".format(hostname, self.message))
184 def retrieve_userdata (self, hostname):
185 return self._item(hostname).userdata
187 def get_ip (self, hostname):
189 return self._item(hostname).get_ip()
191 return socket.gethostbyname(hostname)
193 def set_mine (self, hostname):
195 self._item(hostname).status='mine'
197 print('WARNING: host {} not found in IP pool {}'.format(hostname, self.message))
199 def next_free (self):
200 for i in self.pool_items:
201 if i.status == 'free':
203 return (i.hostname, i.userdata)
207 # we have a starting instance of our own
208 def add_starting (self, vname, bname):
209 Starting().add(vname, bname)
210 for i in self.pool_items:
211 if i.hostname == vname:
214 # load the starting instances from the common file
215 # remember that might be ours
216 # return the list of (vname,bname) that are not ours
217 def load_starting (self):
218 starting = Starting()
221 for (v, b) in starting.tuples:
222 for i in self.pool_items:
223 if i.hostname == v and i.status == 'free':
224 i.status = 'starting'
225 new_tuples.append( (v, b,) )
228 def release_my_starting (self):
229 for i in self.pool_items:
230 if i.status == 'mine':
231 Starting().delete_vname(i.hostname)
237 for item in self.pool_items:
238 if item.status is not None:
239 print(item.char(), end=' ')
241 if self.check_ping (item.hostname):
250 print('Sensing IP pool', self.message, end=' ')
254 for vname, bname in self.load_starting():
255 self.substrate.add_starting_dummy(bname, vname)
256 print("After having loaded 'starting': IP pool")
258 # OS-dependent ping option (support for macos, for convenience)
259 ping_timeout_option = None
260 # returns True when a given hostname/ip responds to ping
261 def check_ping (self, hostname):
262 if '.' not in hostname:
263 hostname = self.substrate.fqdn(hostname)
264 if not Pool.ping_timeout_option:
265 (status, osname) = subprocess.getstatusoutput("uname -s")
267 raise Exception("TestPool: Cannot figure your OS name")
268 if osname == "Linux":
269 Pool.ping_timeout_option = "-w"
270 elif osname == "Darwin":
271 Pool.ping_timeout_option = "-t"
273 command = "ping -c 1 {} 1 {}".format(Pool.ping_timeout_option, hostname)
274 (status, output) = subprocess.getstatusoutput(command)
279 def __init__ (self, hostname):
280 self.hostname = hostname
283 return "<Box {}>".format(self.hostname)
284 def shortname (self):
285 return short_hostname(self.hostname)
287 return TestSsh(self.hostname, username='root', unknown_host=False)
288 def reboot (self, options):
289 self.test_ssh().run("shutdown -r now",
290 message="Rebooting {}".format(self.hostname),
291 dry_run=options.dry_run)
293 def hostname_fedora (self, virt=None):
294 # this truly is an opening bracket
295 result = "{}".format(self.hostname) + " {"
297 result += "{}-".format(virt)
298 result += "{} {}".format(self.fedora(), self.memory())
299 # too painful to propagate this cleanly
302 result += "-{}".format(self.uname())
303 # and the matching closing bracket
307 separator = "===composite==="
310 # take this chance to gather useful stuff
313 if self._probed is not None: return self._probed
314 composite_command = [ ]
315 composite_command += [ "hostname" ]
316 composite_command += [ ";" , "echo", Box.separator , ";" ]
317 composite_command += [ "uptime" ]
318 composite_command += [ ";" , "echo", Box.separator , ";" ]
319 composite_command += [ "uname", "-r"]
320 composite_command += [ ";" , "echo", Box.separator , ";" ]
321 composite_command += [ "cat" , "/etc/fedora-release" ]
322 composite_command += [ ";" , "echo", Box.separator , ";" ]
323 composite_command += [ "grep", "MemTotal", "/proc/meminfo" ]
325 # due to colons and all, this is going wrong on the local box (typically testmaster)
326 # I am reluctant to change TestSsh as it might break all over the place, so
327 if self.test_ssh().is_local():
328 probe_argv = [ "bash", "-c", " ".join (composite_command) ]
330 probe_argv = self.test_ssh().actual_argv(composite_command)
331 composite = self.backquote ( probe_argv, trash_err=True )
332 self._hostname = self._uptime = self._uname = self._fedora = self._memory = "** Unknown **"
334 print("root@{} unreachable".format(self.hostname))
338 pieces = composite.split(Box.separator)
339 pieces = [ x.strip() for x in pieces ]
341 [hostname, uptime, uname, fedora, memory] = pieces
343 self._hostname = hostname
344 self._uptime = ', '.join([ x.strip() for x in uptime.split(',')[2:]]).replace("load average", "load")
346 self._fedora = fedora.replace("Fedora release ","f").split(" ")[0]
348 self._memory = int(memory.split()[1])/(1024)
349 except Exception as e:
351 print('BEG issue with pieces')
352 traceback.print_exc()
353 self._probed = self._hostname
356 # use argv=['bash','-c',"the command line"]
359 if hasattr(self,'_uptime') and self._uptime: return self._uptime
360 return '*unprobed* uptime'
363 if hasattr(self,'_uname') and self._uname: return self._uname
364 return '*unprobed* uname'
367 if hasattr(self,'_fedora') and self._fedora: return self._fedora
368 return '*unprobed* fedora'
371 if hasattr(self,'_memory') and self._memory: return "{} Mb".format(self._memory)
372 return '*unprobed* memory'
374 def run(self, argv, message=None, trash_err=False, dry_run=False):
376 print('DRY_RUN:', end=' ')
377 print(" ".join(argv))
382 return subprocess.call(argv)
384 with open('/dev/null', 'w') as null:
385 return subprocess.call(argv, stderr=null)
387 def run_ssh (self, argv, message, trash_err=False, dry_run=False):
388 ssh_argv = self.test_ssh().actual_argv(argv)
389 result = self.run (ssh_argv, message, trash_err, dry_run=dry_run)
391 print("WARNING: failed to run {} on {}".format(" ".join(argv), self.hostname))
394 def backquote (self, argv, trash_err=False):
395 # in python3 we need to set universal_newlines=True
397 out_err = subprocess.Popen(argv, stdout=subprocess.PIPE,
398 universal_newlines=True).communicate()
400 with open('/dev/null', 'w') as null:
401 out_err = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=null,
402 universal_newlines=True).communicate()
403 # only interested in stdout here
406 # if you have any shell-expanded arguments like *
407 # and if there's any chance the command is addressed to the local host
408 def backquote_ssh (self, argv, trash_err=False):
409 if not self.probe(): return ''
410 return self.backquote(self.test_ssh().actual_argv(argv), trash_err)
412 ############################################################
414 def __init__ (self, buildname, pid, buildbox):
415 self.buildname = buildname
416 self.buildbox = buildbox
419 return "<BuildInstance {} in {}>".format(self.buildname, self.buildbox)
421 def add_pid(self,pid):
422 self.pids.append(pid)
425 return "== {} == (pids={})".format(self.buildname, self.pids)
427 class BuildBox (Box):
428 def __init__ (self, hostname):
429 Box.__init__(self, hostname)
430 self.build_instances = []
432 return "<BuildBox {}>".format(self.hostname)
434 def add_build(self, buildname, pid):
435 for build in self.build_instances:
436 if build.buildname == buildname:
439 self.build_instances.append(BuildInstance(buildname, pid, self))
441 def list(self, verbose=False):
442 if not self.build_instances:
443 header ('No build process on {} ({})'.format(self.hostname_fedora(), self.uptime()))
445 header ("Builds on {} ({})".format(self.hostname_fedora(), self.uptime()))
446 for b in self.build_instances:
447 header (b.line(), banner=False)
449 def reboot (self, options):
451 Box.reboot(self, options)
453 self.soft_reboot (options)
455 build_matcher=re.compile("\s*(?P<pid>[0-9]+).*-[bo]\s+(?P<buildname>[^\s]+)(\s|\Z)")
456 build_matcher_initvm=re.compile("\s*(?P<pid>[0-9]+).*initvm.*\s+(?P<buildname>[^\s]+)\s*\Z")
458 class BuildLxcBox (BuildBox):
459 def soft_reboot (self, options):
460 command=['pkill','lbuild']
461 self.run_ssh(command, "Terminating vbuild processes", dry_run=options.dry_run)
463 # inspect box and find currently running builds
464 def sense(self, options):
467 pids = self.backquote_ssh(['pgrep','lbuild'], trash_err=True)
469 command = ['ps', '-o', 'pid,command'] + [ pid for pid in pids.split("\n") if pid]
470 ps_lines = self.backquote_ssh(command).split('\n')
471 for line in ps_lines:
472 if not line.strip() or line.find('PID') >= 0: continue
473 m = build_matcher.match(line)
475 date = time.strftime('%Y-%m-%d', time.localtime(time.time()))
476 buildname = m.group('buildname').replace('@DATE@', date)
477 self.add_build(buildname, m.group('pid'))
479 m = build_matcher_initvm.match(line)
481 # buildname is expanded here
482 self.add_build(buildname, m.group('pid'))
484 header('BuildLxcBox.sense: command {} returned line that failed to match'.format(command))
485 header(">>{}<<".format(line))
487 ############################################################
489 def __init__ (self, plcbox):
490 self.plc_box = plcbox
494 return "<PlcInstance {}>".format(self.plc_box)
496 def set_timestamp (self,timestamp):
497 self.timestamp = timestamp
499 self.timestamp = int(time.time())
500 def pretty_timestamp (self):
501 return time.strftime("%Y-%m-%d:%H-%M", time.localtime(self.timestamp))
503 class PlcLxcInstance (PlcInstance):
504 # does lxc have a context id of any kind ?
505 def __init__ (self, plcbox, lxcname, pid):
506 PlcInstance.__init__(self, plcbox)
507 self.lxcname = lxcname
510 return "<PlcLxcInstance {}>".format(self.lxcname)
513 return self.lxcname.split('-')[-1]
514 def buildname (self):
515 return self.lxcname.rsplit('-',2)[0]
518 msg="== {} ==".format(self.vplcname())
519 msg += " [={}]".format(self.lxcname)
520 if self.pid==-1: msg+=" not (yet?) running"
521 else: msg+=" (pid={})".format(self.pid)
522 if self.timestamp: msg += " @ {}".format(self.pretty_timestamp())
523 else: msg += " *unknown timestamp*"
527 command="rsync lxc-driver.sh {}:/root".format(self.plc_box.hostname)
528 subprocess.getstatusoutput(command)
529 msg="lxc container stopping {} on {}".format(self.lxcname, self.plc_box.hostname)
530 self.plc_box.run_ssh(['/root/lxc-driver.sh', '-c', 'stop_lxc', '-n', self.lxcname], msg)
531 self.plc_box.forget(self)
535 def __init__ (self, hostname, max_plcs):
536 Box.__init__(self, hostname)
537 self.plc_instances = []
538 self.max_plcs = max_plcs
540 return "<PlcBox {}>".format(self.hostname)
542 def free_slots (self):
543 return self.max_plcs - len(self.plc_instances)
545 # fill one slot even though this one is not started yet
546 def add_dummy (self, plcname):
547 dummy=PlcLxcInstance(self, 'dummy_'+plcname, 0)
549 self.plc_instances.append(dummy)
551 def forget (self, plc_instance):
552 self.plc_instances.remove(plc_instance)
554 def reboot (self, options):
556 Box.reboot(self, options)
558 self.soft_reboot (options)
560 def list(self, verbose=False):
561 if not self.plc_instances:
562 header ('No plc running on {}'.format(self.line()))
564 header ("Active plc VMs on {}".format(self.line()))
565 self.plc_instances.sort(key=timestamp_key)
566 for p in self.plc_instances:
567 header (p.line(), banner=False)
569 ## we do not do this at INRIA any more
570 class PlcLxcBox (PlcBox):
572 def add_lxc (self, lxcname, pid):
573 for plc in self.plc_instances:
574 if plc.lxcname == lxcname:
575 header("WARNING, duplicate myplc {} running on {}"\
576 .format(lxcname, self.hostname), banner=False)
578 self.plc_instances.append(PlcLxcInstance(self, lxcname, pid))
581 # a line describing the box
583 return "{} [max={},free={}] ({})".format(self.hostname_fedora(virt="lxc"),
584 self.max_plcs, self.free_slots(),
587 def plc_instance_by_lxcname(self, lxcname):
588 for p in self.plc_instances:
589 if p.lxcname == lxcname:
593 # essentially shutdown all running containers
594 def soft_reboot(self, options):
595 command="rsync lxc-driver.sh {}:/root".format(self.hostname)
596 subprocess.getstatusoutput(command)
597 self.run_ssh( ['/root/lxc-driver.sh','-c','stop_all'],
598 "Stopping all running lxc containers on {}".format(self.hostname),
599 dry_run=options.dry_run)
602 # sense is expected to fill self.plc_instances with PlcLxcInstance's
603 # to describe the currently running VM's
604 def sense(self, options):
607 command = "rsync lxc-driver.sh {}:/root".format(self.hostname)
608 subprocess.getstatusoutput(command)
609 command = ['/root/lxc-driver.sh', '-c', 'sense_all']
610 lxc_stat = self.backquote_ssh (command)
611 for lxc_line in lxc_stat.split("\n"):
614 # we mix build and plc VMs
615 if 'vplc' not in lxc_line:
617 lxcname = lxc_line.split(";")[0]
618 pid = lxc_line.split(";")[1]
619 timestamp = lxc_line.split(";")[2]
620 self.add_lxc(lxcname,pid)
621 try: timestamp = int(timestamp)
622 except: timestamp = 0
623 p = self.plc_instance_by_lxcname(lxcname)
625 print('WARNING zombie plc',self.hostname,lxcname)
626 print('... was expecting',lxcname,'in',[i.lxcname for i in self.plc_instances])
628 p.set_timestamp(timestamp)
630 ############################################################
632 def __init__(self, nodename, pid, qemubox):
633 self.nodename = nodename
635 self.qemu_box = qemubox
637 self.buildname = None
640 return "<QemuInstance {}>".format(self.nodename)
642 def set_buildname (self, buildname):
643 self.buildname = buildname
644 def set_timestamp (self, timestamp):
645 self.timestamp = timestamp
647 self.timestamp = int(time.time())
648 def pretty_timestamp (self):
649 return time.strftime("%Y-%m-%d:%H-%M", time.localtime(self.timestamp))
652 msg = "== {} ==".format(short_hostname(self.nodename))
653 msg += " [={}]".format(self.buildname)
654 if self.pid: msg += " (pid={})".format(self.pid)
655 else: msg += " not (yet?) running"
656 if self.timestamp: msg += " @ {}".format(self.pretty_timestamp())
657 else: msg += " *unknown timestamp*"
662 print("cannot kill qemu {} with pid==0".format(self.nodename))
664 msg = "Killing qemu {} with pid={} on box {}".format(self.nodename, self.pid, self.qemu_box.hostname)
665 self.qemu_box.run_ssh(['kill', "{}".format(self.pid)], msg)
666 self.qemu_box.forget(self)
670 def __init__ (self, hostname, max_qemus):
671 Box.__init__(self, hostname)
672 self.qemu_instances = []
673 self.max_qemus = max_qemus
675 return "<QemuBox {}>".format(self.hostname)
677 def add_node(self, nodename, pid):
678 for qemu in self.qemu_instances:
679 if qemu.nodename == nodename:
680 header("WARNING, duplicate qemu {} running on {}"\
681 .format(nodename,self.hostname), banner=False)
683 self.qemu_instances.append(QemuInstance(nodename, pid, self))
685 def node_names (self):
686 return [ qi.nodename for qi in self.qemu_instances ]
688 def forget (self, qemu_instance):
689 self.qemu_instances.remove(qemu_instance)
691 # fill one slot even though this one is not started yet
692 def add_dummy(self, nodename):
693 dummy=QemuInstance('dummy_'+nodename, 0, self)
695 self.qemu_instances.append(dummy)
698 return "{} [max={},free={}] ({}) {}"\
699 .format(self.hostname_fedora(virt="qemu"),
700 self.max_qemus, self.free_slots(),
701 self.uptime(), self.driver())
703 def list(self, verbose=False):
704 if not self.qemu_instances:
705 header ('No qemu on {}'.format(self.line()))
707 header ("Qemus on {}".format(self.line()))
708 self.qemu_instances.sort(key=timestamp_key)
709 for q in self.qemu_instances:
710 header (q.line(), banner=False)
712 def free_slots (self):
713 return self.max_qemus - len(self.qemu_instances)
716 if hasattr(self,'_driver') and self._driver:
718 return '*undef* driver'
720 def qemu_instance_by_pid(self, pid):
721 for q in self.qemu_instances:
726 def qemu_instance_by_nodename_buildname (self, nodename, buildname):
727 for q in self.qemu_instances:
728 if q.nodename == nodename and q.buildname == buildname:
732 def reboot (self, options):
734 Box.reboot(self, options)
736 self.run_ssh(['pkill','qemu'], "Killing qemu instances",
737 dry_run=options.dry_run)
739 matcher=re.compile("\s*(?P<pid>[0-9]+).*-cdrom\s+(?P<nodename>[^\s]+)\.iso")
741 def sense(self, options):
744 modules = self.backquote_ssh(['lsmod']).split('\n')
745 self._driver = '*NO kqemu/kvm_intel MODULE LOADED*'
746 for module in modules:
747 if module.find('kqemu') == 0:
748 self._driver = 'kqemu module loaded'
749 # kvm might be loaded without kvm_intel (we dont have AMD)
750 elif module.find('kvm_intel') == 0:
751 self._driver = 'kvm_intel OK'
752 ########## find out running pids
753 pids = self.backquote_ssh(['pgrep','qemu'])
756 command = ['ps','-o','pid,command'] + [ pid for pid in pids.split("\n") if pid]
757 ps_lines = self.backquote_ssh(command).split("\n")
758 for line in ps_lines:
759 if not line.strip() or line.find('PID') >=0 :
761 m = QemuBox.matcher.match(line)
763 self.add_node(m.group('nodename'), m.group('pid'))
765 header('QemuBox.sense: command {} returned line that failed to match'.format(command))
766 header(">>{}<<".format(line))
767 ########## retrieve alive instances and map to build
769 command = ['grep', '.', '/vservers/*/*/qemu.pid', '/dev/null']
770 pid_lines = self.backquote_ssh(command, trash_err=True).split('\n')
771 for pid_line in pid_lines:
772 if not pid_line.strip():
774 # expect <build>/<nodename>/qemu.pid:<pid>
776 (_, __, buildname, nodename, tail) = pid_line.split('/')
777 (_,pid) = tail.split(':')
778 q = self.qemu_instance_by_pid(pid)
781 q.set_buildname(buildname)
782 live_builds.append(buildname)
784 print('WARNING, could not parse pid line', pid_line)
785 # retrieve timestamps
788 command = ['grep','.']
789 command += ['/vservers/{}/*/timestamp'.format(b) for b in live_builds]
790 command += ['/dev/null']
791 ts_lines = self.backquote_ssh(command, trash_err=True).split('\n')
792 for ts_line in ts_lines:
793 if not ts_line.strip():
795 # expect <build>/<nodename>/timestamp:<timestamp>
797 (_, __, buildname, nodename, tail) = ts_line.split('/')
798 nodename = nodename.replace('qemu-', '')
799 (_, timestamp) = tail.split(':')
800 timestamp = int(timestamp)
801 q = self.qemu_instance_by_nodename_buildname(nodename, buildname)
803 # this warning corresponds to qemu instances that were not killed properly
804 # and that have a dangling qemu.pid - and not even all of them as they need
805 # to be attached to a build that has a node running...
806 # it is more confusing than helpful, so let's just trash it
807 #print 'WARNING zombie qemu',self.hostname,ts_line
808 #print '... was expecting (',short_hostname(nodename),buildname,') in',\
809 # [ (short_hostname(i.nodename),i.buildname) for i in self.qemu_instances ]
811 q.set_timestamp(timestamp)
813 print('WARNING, could not parse ts line',ts_line)
817 def __init__(self, buildname, pid=0):
821 self.buildname = buildname
825 self.broken_steps = []
828 return "<TestInstance {}>".format(self.buildname)
830 def set_timestamp(self, timestamp):
831 self.timestamp = timestamp
833 self.timestamp = int(time.time())
834 def pretty_timestamp(self):
835 return time.strftime("%Y-%m-%d:%H-%M", time.localtime(self.timestamp))
836 def is_running (self):
837 return len(self.pids) != 0
838 def add_pid(self, pid):
839 self.pids.append(pid)
840 def set_broken(self, plcindex, step):
841 self.broken_steps.append( (plcindex, step,) )
843 def second_letter(self):
844 if not self.broken_steps:
847 really_broken = [ step for (i,step) in self.broken_steps if '_ignore' not in step ]
848 # W is for warning like what's in the build mail
849 if len(really_broken) == 0:
855 # make up a 2-letter sign
856 # first letter : '=', unless build is running : '*'
857 double = '*' if self.pids else '='
858 # second letter : '=' if fine, 'W' for warnings (only ignored steps) 'B' for broken
859 letter2 = self.second_letter()
861 msg = " {} {} ==".format(double, self.buildname)
864 elif len(self.pids)==1:
865 msg += " (pid={})".format(self.pids[0])
867 msg += " !!!pids={}!!!".format(self.pids)
868 msg += " @{}".format(self.pretty_timestamp())
870 msg2 = ( ' BROKEN' if letter2 == 'B' else ' WARNING' )
871 # sometimes we have an empty plcindex
872 msg += " [{}=".format(msg2) \
873 + " ".join(["{}@{}".format(s, i) if i else s for (i, s) in self.broken_steps]) \
878 def __init__(self, hostname):
879 Box.__init__(self, hostname)
880 self.starting_ips = []
881 self.test_instances = []
883 return "<TestBox {}>".format(self.hostname)
885 def reboot(self, options):
886 # can't reboot a vserver VM
887 self.run_ssh(['pkill', 'run_log'], "Terminating current runs",
888 dry_run=options.dry_run)
889 self.run_ssh(['rm', '-f', Starting.location], "Cleaning {}".format(Starting.location),
890 dry_run=options.dry_run)
892 def get_test(self, buildname):
893 for i in self.test_instances:
894 if i.buildname == buildname:
897 # we scan ALL remaining test results, even the ones not running
898 def add_timestamp(self, buildname, timestamp):
899 i = self.get_test(buildname)
901 i.set_timestamp(timestamp)
903 i = TestInstance(buildname, 0)
904 i.set_timestamp(timestamp)
905 self.test_instances.append(i)
907 def add_running_test(self, pid, buildname):
908 i = self.get_test(buildname)
910 self.test_instances.append(TestInstance(buildname, pid))
913 print("WARNING: 2 concurrent tests run on same build {}".format(buildname))
916 def add_broken(self, buildname, plcindex, step):
917 i = self.get_test(buildname)
919 i = TestInstance(buildname)
920 self.test_instances.append(i)
921 i.set_broken(plcindex, step)
923 matcher_proc=re.compile (".*/proc/(?P<pid>[0-9]+)/cwd.*/root/(?P<buildname>[^/]+)$")
924 matcher_grep=re.compile ("/root/(?P<buildname>[^/]+)/logs/trace.*:TRACE:\s*(?P<plcindex>[0-9]+).*step=(?P<step>\S+).*")
925 matcher_grep_missing=re.compile ("grep: /root/(?P<buildname>[^/]+)/logs/trace: No such file or directory")
927 def sense(self, options):
929 self.starting_ips = [ x for x in self.backquote_ssh( ['cat', Starting.location], trash_err=True).strip().split('\n') if x ]
931 # scan timestamps on all tests
932 # this is likely to not invoke ssh so we need to be a bit smarter to get * expanded
933 # xxx would make sense above too
934 command = ['bash', '-c', "grep . /root/*/timestamp /dev/null"]
935 ts_lines = self.backquote_ssh(command, trash_err=True).split('\n')
936 for ts_line in ts_lines:
937 if not ts_line.strip():
939 # expect /root/<buildname>/timestamp:<timestamp>
941 (ts_file, timestamp) = ts_line.split(':')
942 ts_file = os.path.dirname(ts_file)
943 buildname = os.path.basename(ts_file)
944 timestamp = int(timestamp)
945 t = self.add_timestamp(buildname, timestamp)
947 print('WARNING, could not parse ts line', ts_line)
949 # let's try to be robust here -- tests that fail very early like e.g.
950 # "Cannot make space for a PLC instance: vplc IP pool exhausted", that occurs as part of provision
951 # will result in a 'trace' symlink to an inexisting 'trace-<>.txt' because no step has gone through
952 # simple 'trace' should exist though as it is created by run_log
953 command = ['bash', '-c', "grep KO /root/*/logs/trace /dev/null 2>&1" ]
954 trace_lines = self.backquote_ssh(command).split('\n')
955 for line in trace_lines:
958 m = TestBox.matcher_grep_missing.match(line)
960 buildname = m.group('buildname')
961 self.add_broken(buildname, '', 'NO STEP DONE')
963 m = TestBox.matcher_grep.match(line)
965 buildname = m.group('buildname')
966 plcindex = m.group('plcindex')
967 step = m.group('step')
968 self.add_broken(buildname, plcindex, step)
970 header("TestBox.sense: command {} returned line that failed to match\n{}".format(command, line))
971 header(">>{}<<".format(line))
973 pids = self.backquote_ssh (['pgrep', 'run_log'], trash_err=True)
976 command = ['ls','-ld'] + ["/proc/{}/cwd".format(pid) for pid in pids.split("\n") if pid]
977 ps_lines = self.backquote_ssh(command).split('\n')
978 for line in ps_lines:
981 m = TestBox.matcher_proc.match(line)
984 buildname = m.group('buildname')
985 self.add_running_test(pid, buildname)
987 header("TestBox.sense: command {} returned line that failed to match\n{}".format(command, line))
988 header(">>{}<<".format(line))
992 return self.hostname_fedora()
994 def list (self, verbose=False):
995 # verbose shows all tests
997 instances = self.test_instances
1000 instances = [ i for i in self.test_instances if i.is_running() ]
1004 header ("No {} on {}".format(msg, self.line()))
1006 header ("{} on {}".format(msg, self.line()))
1007 instances.sort(key=timestamp_key)
1010 # show 'starting' regardless of verbose
1011 if self.starting_ips:
1012 header("Starting IP addresses on {}".format(self.line()))
1013 self.starting_ips.sort()
1014 for starting in self.starting_ips:
1017 header("Empty 'starting' on {}".format(self.line()))
1019 ############################################################
1025 self.options = Options()
1026 self.options.dry_run = False
1027 self.options.verbose = False
1028 self.options.reboot = False
1029 self.options.soft = False
1030 self.test_box = TestBox (self.test_box_spec())
1031 self.build_lxc_boxes = [ BuildLxcBox(h) for h in self.build_lxc_boxes_spec() ]
1032 self.plc_lxc_boxes = [ PlcLxcBox (h, m) for (h, m) in self.plc_lxc_boxes_spec ()]
1033 self.qemu_boxes = [ QemuBox (h, m) for (h, m) in self.qemu_boxes_spec ()]
1034 self._sensed = False
1036 self.vplc_pool = Pool(self.vplc_ips(), "for vplcs", self)
1037 self.vnode_pool = Pool(self.vnode_ips(), "for vnodes", self)
1039 self.build_boxes = self.build_lxc_boxes
1040 self.plc_boxes = self.plc_lxc_boxes
1041 self.default_boxes = self.plc_boxes + self.qemu_boxes
1042 self.all_boxes = self.build_boxes + [ self.test_box ] + self.plc_boxes + self.qemu_boxes
1044 return "<Substrate {}>".format(self.summary_line())
1046 def summary_line (self):
1048 msg += " {} xp".format(len(self.plc_lxc_boxes))
1049 msg += " {} xq".format(len(self.qemu_boxes))
1053 def fqdn (self, hostname):
1054 if hostname.find('.') < 0:
1055 return "{}.{}".format(hostname, self.domain())
1058 # return True if actual sensing takes place
1059 def sense(self, force=False):
1060 if self._sensed and not force:
1062 print('Sensing local substrate...', end=' ')
1064 for b in self.default_boxes:
1065 b.sense(self.options)
1070 def list(self, verbose=False):
1071 for b in self.default_boxes:
1074 def add_dummy_plc(self, plc_boxname, plcname):
1075 for pb in self.plc_boxes:
1076 if pb.hostname == plc_boxname:
1077 pb.add_dummy(plcname)
1079 def add_dummy_qemu(self, qemu_boxname, qemuname):
1080 for qb in self.qemu_boxes:
1081 if qb.hostname == qemu_boxname:
1082 qb.add_dummy(qemuname)
1085 def add_starting_dummy(self, bname, vname):
1086 return self.add_dummy_plc(bname, vname) or self.add_dummy_qemu(bname, vname)
1089 def provision(self, plcs, options):
1091 # attach each plc to a plc box and an IP address
1092 plcs = [ self.provision_plc(plc, options) for plc in plcs ]
1093 # attach each node/qemu to a qemu box with an IP address
1094 plcs = [ self.provision_qemus(plc,options) for plc in plcs ]
1095 # update the SFA spec accordingly
1096 plcs = [ self.localize_sfa_rspec(plc, options) for plc in plcs ]
1099 except Exception as e:
1100 print('* Could not provision this test on current substrate','--',e,'--','exiting')
1101 traceback.print_exc()
1104 # it is expected that a couple of options like ips_bplc and ips_vplc
1105 # are set or unset together
1107 def check_options(x, y):
1110 return len(x) == len(y)
# find an available plc box (or make space)
# and a free IP address (using options if present)
def provision_plc(self, plc, options):
    """Pick a plc box and a free vplc IP for *plc*, evicting the oldest instance if needed.

    Returns the plc spec with host_box / vserver / IP settings mapped in.
    NOTE(review): several source lines were lost in extraction; statements
    marked '(reconstructed)' were filled back in and need confirmation.
    """
    assert Substrate.check_options(options.ips_bplc, options.ips_vplc)

    #### let's find an IP address for that plc
    if options.ips_vplc:
        # it's a rerun: resources were specified on the command line
        # we don't check anything here,
        # it is the caller's responsability to cleanup and make sure this makes sense
        plc_boxname = options.ips_bplc.pop()
        vplc_hostname = options.ips_vplc.pop()
    else:                                                   # (reconstructed)
        plc_boxname = None                                  # (reconstructed)
        vplc_hostname = None
        # try to find an available IP
        self.vplc_pool.sense()
        couple = self.vplc_pool.next_free()
        if couple:                                          # (reconstructed)
            (vplc_hostname, unused) = couple
        #### we need to find one plc box that still has a slot
        max_free = 0                                        # (reconstructed)
        # use the box that has max free spots for load balancing
        for pb in self.plc_boxes:
            free = pb.free_slots()
            if free > max_free:                             # (reconstructed)
                plc_boxname = pb.hostname
                max_free = free                             # (reconstructed)
        # if there's no available slot in the plc_boxes, or we need a free IP address
        # make space by killing the oldest running instance
        if not plc_boxname or not vplc_hostname:
            # find the oldest of all our instances
            all_plc_instances = reduce(lambda x, y: x+y,
                                       [ pb.plc_instances for pb in self.plc_boxes ],
                                       [])                  # (reconstructed)
            all_plc_instances.sort(key=timestamp_key)
            try:                                            # (reconstructed)
                plc_instance_to_kill = all_plc_instances[0]
            except:                                         # (reconstructed) nothing to kill
                msg = ""                                    # (reconstructed)
                if not plc_boxname:                         # (reconstructed)
                    msg += " PLC boxes are full"
                if not vplc_hostname:
                    msg += " vplc IP pool exhausted"
                msg += " {}".format(self.summary_line())
                raise Exception("Cannot make space for a PLC instance:" + msg)
            freed_plc_boxname = plc_instance_to_kill.plc_box.hostname
            freed_vplc_hostname = plc_instance_to_kill.vplcname()
            # 'message' kept for parity with the original; the line that
            # displayed it is not visible in this chunk
            message = 'killing oldest plc instance = {} on {}'\
                .format(plc_instance_to_kill.line(), freed_plc_boxname)
            plc_instance_to_kill.kill()
            # use this new plcbox if that was the problem
            if not plc_boxname:                             # (reconstructed)
                plc_boxname = freed_plc_boxname
            # ditto for the IP address
            if not vplc_hostname:
                vplc_hostname = freed_vplc_hostname
                # record in pool as mine
                self.vplc_pool.set_mine(vplc_hostname)

    # mark the allocation in our in-memory model and in the shared pool
    self.add_dummy_plc(plc_boxname, plc['name'])
    vplc_ip = self.vplc_pool.get_ip(vplc_hostname)
    self.vplc_pool.add_starting(vplc_hostname, plc_boxname)

    #### compute a helpful vserver name
    # remove domain in hostname
    vplc_short = short_hostname(vplc_hostname)
    vservername = "{}-{}-{}".format(options.buildname, plc['index'], vplc_short)
    plc_name = "{}_{}".format(plc['name'], vplc_short)

    utils.header('PROVISION plc {} in box {} at IP {} as {}'\
                 .format(plc['name'], plc_boxname, vplc_hostname, vservername))

    #### apply in the plc_spec
    # label = options.personality.replace("linux","")
    mapper = {'plc' : [ ('*' , {'host_box' : plc_boxname,
                                'name' : plc_name,          # (reconstructed key)
                                'vservername' : vservername,
                                'vserverip' : vplc_ip,
                                'settings:PLC_DB_HOST' : vplc_hostname,
                                'settings:PLC_API_HOST' : vplc_hostname,
                                'settings:PLC_BOOT_HOST' : vplc_hostname,
                                'settings:PLC_WWW_HOST' : vplc_hostname,
                                'settings:PLC_NET_DNS1' : self.network_settings() [ 'interface_fields:dns1' ],
                                'settings:PLC_NET_DNS2' : self.network_settings() [ 'interface_fields:dns2' ],
                                }) ] }                      # (reconstructed closer)
    # mappers only work on a list of plcs
    return TestMapper([plc], options).map(mapper)[0]
def provision_qemus(self, plc, options):
    """Attach each node of *plc* to a qemu box and a free vnode IP, evicting if needed.

    Returns the plc spec with per-node host_box / hostname / IP / MAC mapped in.
    NOTE(review): several source lines were lost in extraction; statements
    marked '(reconstructed)' were filled back in and need confirmation.
    """
    assert Substrate.check_options(options.ips_bnode, options.ips_vnode)

    test_mapper = TestMapper([plc], options)
    nodenames = test_mapper.node_names()
    maps = []                                               # (reconstructed)
    for nodename in nodenames:

        if options.ips_vnode:
            # as above, it's a rerun, take it for granted
            qemu_boxname = options.ips_bnode.pop()
            vnode_hostname = options.ips_vnode.pop()
        else:                                               # (reconstructed)
            qemu_boxname = None                             # (reconstructed)
            vnode_hostname = None
            # try to find an available IP
            self.vnode_pool.sense()
            couple = self.vnode_pool.next_free()
            if couple:                                      # (reconstructed)
                (vnode_hostname, unused) = couple
            # find a physical box
            max_free = 0                                    # (reconstructed)
            # use the box that has max free spots for load balancing
            for qb in self.qemu_boxes:
                free = qb.free_slots()
                if free > max_free:                         # (reconstructed)
                    qemu_boxname = qb.hostname
                    max_free = free                         # (reconstructed)
            # if we miss the box or the IP, kill the oldest instance
            if not qemu_boxname or not vnode_hostname:
                # find the oldest of all our instances
                all_qemu_instances = reduce(lambda x, y: x+y,
                                            [ qb.qemu_instances for qb in self.qemu_boxes ],
                                            [])             # (reconstructed)
                all_qemu_instances.sort(key=timestamp_key)
                try:                                        # (reconstructed)
                    qemu_instance_to_kill = all_qemu_instances[0]
                except:                                     # (reconstructed) nothing to kill
                    msg = ""                                # (reconstructed)
                    if not qemu_boxname:
                        msg += " QEMU boxes are full"
                    if not vnode_hostname:
                        msg += " vnode IP pool exhausted"
                    msg += " {}".format(self.summary_line())
                    raise Exception("Cannot make space for a QEMU instance:"+msg)
                freed_qemu_boxname = qemu_instance_to_kill.qemu_box.hostname
                freed_vnode_hostname = short_hostname(qemu_instance_to_kill.nodename)
                # 'message' kept for parity with the original; the line that
                # displayed it is not visible in this chunk
                message = 'killing oldest qemu node = {} on {}'.format(qemu_instance_to_kill.line(),
                                                                       freed_qemu_boxname)
                qemu_instance_to_kill.kill()
                # use these freed resources where needed
                if not qemu_boxname:
                    qemu_boxname = freed_qemu_boxname
                if not vnode_hostname:
                    vnode_hostname = freed_vnode_hostname
                    self.vnode_pool.set_mine(vnode_hostname)

        # mark the allocation and gather the node's network identity
        self.add_dummy_qemu(qemu_boxname, vnode_hostname)
        mac = self.vnode_pool.retrieve_userdata(vnode_hostname)
        ip = self.vnode_pool.get_ip(vnode_hostname)
        self.vnode_pool.add_starting(vnode_hostname, qemu_boxname)

        vnode_fqdn = self.fqdn(vnode_hostname)
        nodemap = {'host_box' : qemu_boxname,
                   'node_fields:hostname' : vnode_fqdn,
                   'interface_fields:ip' : ip,
                   'ipaddress_fields:ip_addr' : ip,
                   'interface_fields:mac' : mac,
                   }                                        # (reconstructed closer)
        nodemap.update(self.network_settings())
        maps.append( (nodename, nodemap) )

        utils.header("PROVISION node {} in box {} at IP {} with MAC {}"\
                     .format(nodename, qemu_boxname, vnode_hostname, mac))

    return test_mapper.map({'node':maps})[0]
def localize_sfa_rspec(self, plc, options):
    """Point the SFA settings of *plc* at its freshly-allocated PLC hosts.

    Returns the modified plc spec - required, since provision() uses the
    result in a list comprehension.
    """
    plc['sfa']['settings']['SFA_REGISTRY_HOST'] = plc['settings']['PLC_DB_HOST']
    plc['sfa']['settings']['SFA_AGGREGATE_HOST'] = plc['settings']['PLC_DB_HOST']
    plc['sfa']['settings']['SFA_SM_HOST'] = plc['settings']['PLC_DB_HOST']
    plc['sfa']['settings']['SFA_DB_HOST'] = plc['settings']['PLC_DB_HOST']
    plc['sfa']['settings']['SFA_PLC_URL'] = 'https://{}:443/PLCAPI/'.format(plc['settings']['PLC_API_HOST'])
    # (reconstructed - dropped in extraction)
    return plc
#################### release:
def release(self, options):
    """Relinquish the 'starting' marks this test had set in both IP pools."""
    # plc pool first, then node pool - same order as before
    for pool in (self.vplc_pool, self.vnode_pool):
        pool.release_my_starting()
#################### show results for interactive mode
def get_box(self, boxname):
    """Return the Box object whose short name matches *boxname*, or None.

    Accepts either a short name or an FQDN (only the first label is compared
    in that case). NOTE(review): the returns and the try/except were dropped
    in extraction and reconstructed - gap sizes fit exactly.
    """
    for b in self.build_boxes + self.plc_boxes + self.qemu_boxes + [self.test_box] :
        if b.shortname() == boxname:
            return b                                        # (reconstructed)
        try:                                                # (reconstructed)
            if b.shortname() == boxname.split('.')[0]:
                return b                                    # (reconstructed)
        except:                                             # (reconstructed)
            pass
    print("Could not find box {}".format(boxname))
    # (reconstructed) explicit miss
    return None
# deal with the mix of boxes and names and stores the current focus
# as a list of Box instances in self.focus_all
def normalize(self, box_or_names):
    """Resolve *box_or_names* (Box objects or name strings) into self.focus_all,
    then partition the focus by box family."""
    self.focus_all = []                                     # (reconstructed)
    for box in box_or_names:
        if not isinstance(box, Box):
            box = self.get_box(box)
            # (reconstructed guard - placement inside the isinstance branch
            # is an assumption, TODO confirm)
            if not box:
                print('Warning - could not handle box',box)
        self.focus_all.append(box)
    # per-family views of the focus
    self.focus_build = [ x for x in self.focus_all if isinstance(x, BuildBox) ]
    self.focus_plc = [ x for x in self.focus_all if isinstance(x, PlcBox) ]
    self.focus_qemu = [ x for x in self.focus_all if isinstance(x, QemuBox) ]
def list_boxes(self):
    """Sense every focused box, then print a listing for each."""
    print('Sensing', end=' ')
    for box in self.focus_all:
        box.sense(self.options)
    # NOTE(review): the line between the two loops was dropped in extraction;
    # presumably it closed the 'Sensing' progress line - TODO confirm
    print('Done')
    for box in self.focus_all:
        box.list(self.options.verbose)
def reboot_boxes(self):
    """Reboot every box currently in focus, honouring self.options (soft/dry-run)."""
    for focused_box in self.focus_all:
        focused_box.reboot(self.options)
def sanity_check(self):
    """Run the per-family consistency checks, plcs first."""
    print('Sanity check')
    for checker in (self.sanity_check_plc, self.sanity_check_qemu):
        checker()
def sanity_check_plc(self):
    # NOTE(review): the body was dropped in extraction; the gap before
    # sanity_check_qemu fits a bare no-op, so reconstructed as such - TODO confirm
    pass
def sanity_check_qemu(self):
    """Warn about node names hosted by more than one focused qemu box."""
    all_nodes = []                                          # (reconstructed)
    for box in self.focus_qemu:
        all_nodes += box.node_names()
    # count occurrences per node name; 'counts' avoids shadowing the
    # builtin 'hash' that the partially-visible original used
    counts = {}
    for node in all_nodes:
        counts[node] = counts.get(node, 0) + 1
    for (node, count) in list(counts.items()):
        if count != 1:                                      # (reconstructed)
            print('WARNING - duplicate node', node)
1370 ####################
1371 # can be run as a utility to probe/display/manage the local infrastructure
1373 parser = OptionParser()
1374 parser.add_option('-r', "--reboot", action='store_true', dest='reboot', default=False,
1375 help='reboot mode (use shutdown -r)')
1376 parser.add_option('-s', "--soft", action='store_true', dest='soft', default=False,
1377 help='soft mode for reboot (terminates processes)')
1378 parser.add_option('-t', "--testbox", action='store_true', dest='testbox', default=False,
1379 help='add test box')
1380 parser.add_option('-b', "--build", action='store_true', dest='builds', default=False,
1381 help='add build boxes')
1382 parser.add_option('-p', "--plc", action='store_true', dest='plcs', default=False,
1383 help='add plc boxes')
1384 parser.add_option('-q', "--qemu", action='store_true', dest='qemus', default=False,
1385 help='add qemu boxes')
1386 parser.add_option('-a', "--all", action='store_true', dest='all', default=False,
1387 help='address all known boxes, like -b -t -p -q')
1388 parser.add_option('-v', "--verbose", action='store_true', dest='verbose', default=False,
1389 help='verbose mode')
1390 parser.add_option('-n', "--dry_run", action='store_true', dest='dry_run', default=False,
1391 help='dry run mode')
1392 (self.options, args) = parser.parse_args()
1395 if self.options.testbox: boxes += [self.test_box]
1396 if self.options.builds: boxes += self.build_boxes
1397 if self.options.plcs: boxes += self.plc_boxes
1398 if self.options.qemus: boxes += self.qemu_boxes
1399 if self.options.all: boxes += self.all_boxes
1402 verbose = self.options.verbose
1403 # default scope is -b -p -q -t
1405 boxes = self.build_boxes + self.plc_boxes + self.qemu_boxes + [self.test_box]
1407 self.normalize(boxes)
1409 if self.options.reboot: