# Thierry Parmentelat <thierry.parmentelat@inria.fr>
# Copyright (C) 2010 INRIA
#
# #################### history
#
# see also Substrate.readme
#
# This is a complete rewrite of TestResources/Tracker/Pool
# we don't use trackers anymore and just probe/sense the running
# boxes to figure out where we are
# in order to implement some fairness in the round-robin allocation scheme
# we need an indication of the 'age' of each running entity,
# hence the 'timestamp-*' steps in TestPlc
#
# this should be much more flexible:
# * supports several plc boxes
# * supports several qemu guests per host
# * no need to worry about the tracker being in sync or not
#
# #################### howto use
#
# each site is to write its own LocalSubstrate.py
# (see e.g. LocalSubstrate.inria.py)
# LocalSubstrate.py is expected to be in /root on the testmaster box
# and needs to define:
# . the vserver-capable boxes used for hosting myplcs
# . and their admissible load (max # of myplcs)
# . the pool of DNS names and IP addresses available for myplcs
#
# . the kvm-qemu capable boxes used for hosting qemu instances
# . and their admissible load (max # of qemu instances)
# . the pool of DNS names and IP addresses available for nodes
#
# #################### implem. note
#
# this model relies on 'sensing' the substrate,
# i.e. probing all the boxes for their running instances of vservers and qemu
# this is how we get rid of tracker inconsistencies
# however there is a 'black hole' between the time when a given address is
# allocated and when it actually gets used/pingable
# this is why we still need some knowledge shared among the running tests,
# in a file named /root/starting
# this is connected to the Pool class
#
# ####################
import os.path, sys, time, re, traceback, subprocess, socket, commands

from optparse import OptionParser

import utils
from TestSsh import TestSsh
from TestMapper import TestMapper

# too painful to propagate this cleanly
verbose = None
def header (message, banner=True):
    if not message: return
    if banner: print "===============",
    print message

def timestamp_sort(o1, o2):
    return o1.timestamp - o2.timestamp

def short_hostname (hostname):
    return hostname.split('.')[0]
####################
# the place where other test instances tell about their not-yet-started
# instances, that go undetected through sensing
class Starting:

    location = '/root/starting'

    def __init__ (self):
        try:
            self.tuples = [ line.strip().split('@')
                            for line in file(Starting.location).readlines() ]
        except:
            self.tuples = []

    def vnames (self):
        return [ x for (x, _) in self.tuples ]

    def add (self, vname, bname):
        if not vname in self.vnames():
            with open(Starting.location, 'a') as out:
                out.write("%s@%s\n"%(vname, bname))

    def delete_vname (self, vname):
        if vname in self.vnames():
            f = file(Starting.location, 'w')
            for (v, b) in self.tuples:
                if v != vname: f.write("%s@%s\n"%(v, b))
            f.close()
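# a minimal usage sketch, with made-up names -- 'vplc03' for the vserver
# and 'gotan' for the box that hosts it:
#   starting = Starting()
#   starting.add('vplc03', 'gotan')        # appends 'vplc03@gotan' if not already there
#   'vplc03' in Starting().vnames()        # -> True from now on...
#   Starting().delete_vname('vplc03')      # ...until the test releases its entry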
####################
# allows one to pick an available IP among a pool
# input is expressed as a list of tuples (hostname, user_data)
# that can be searched iteratively for a free slot
# e.g.
# pool = [ (hostname1, user_data1),
#          (hostname2, user_data2),
#          (hostname3, user_data3),
#          (hostname4, user_data4) ]
# assuming that ip1 and ip3 are taken (pingable), then we'd get
# pool.next_free() -> entry2
# pool.next_free() -> entry4
# pool.next_free() -> None
# that is, entry2 is not handed out twice, even if ip2 is not busy/pingable
# anymore when the second next_free() is issued
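# a minimal usage sketch -- hostnames made up, 'substrate' being the
# Substrate instance that gets notified upon load_starting:
#   pool = Pool([ ('vplc1.example.org', None),
#                 ('vplc2.example.org', None) ],
#               "for vplcs", substrate)
#   pool.sense()                  # ping everybody, mark each slot 'busy' or 'free'
#   couple = pool.next_free()     # -> ('vplc2.example.org', None), now marked 'mine'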
class PoolItem:
    def __init__ (self, hostname, userdata):
        self.hostname = hostname
        self.userdata = userdata
        # status holds 'busy' or 'free' or 'mine' or 'starting' or None
        # 'mine' is for our own stuff, 'starting' from the concurrent tests
        self.status = None
        self.ip = None

    def line (self):
        return "Pooled %s (%s) -> %s"%(self.hostname, self.userdata, self.status)

    def char (self):
        if   self.status == None:       return '?'
        elif self.status == 'busy':     return '+'
        elif self.status == 'free':     return '-'
        elif self.status == 'mine':     return 'M'
        elif self.status == 'starting': return 'S'

    def get_ip (self):
        if self.ip: return self.ip
        ip = socket.gethostbyname(self.hostname)
        self.ip = ip
        return ip
class Pool:

    def __init__ (self, tuples, message, substrate):
        self.pool_items = [ PoolItem (hostname, userdata) for (hostname, userdata) in tuples ]
        self.message = message
        # where to send notifications upon load_starting
        self.substrate = substrate

    def list (self, verbose=False):
        for i in self.pool_items: print i.line()

    def line (self):
        line = 'Pool %s'%self.message
        for i in self.pool_items: line += ' ' + i.char()
        return line

    def _item (self, hostname):
        for i in self.pool_items:
            if i.hostname == hostname: return i
        raise Exception ("Could not locate hostname %s in pool %s"%(hostname, self.message))

    def retrieve_userdata (self, hostname):
        return self._item(hostname).userdata

    def get_ip (self, hostname):
        try:    return self._item(hostname).get_ip()
        except: return socket.gethostbyname(hostname)

    def set_mine (self, hostname):
        try:
            self._item(hostname).status = 'mine'
        except:
            print 'WARNING: host %s not found in IP pool %s'%(hostname, self.message)

    def next_free (self):
        for i in self.pool_items:
            if i.status == 'free':
                i.status = 'mine'
                return (i.hostname, i.userdata)
        return None
    # we have a starting instance of our own
    def add_starting (self, vname, bname):
        Starting().add(vname, bname)
        for i in self.pool_items:
            if i.hostname == vname: i.status = 'mine'

    # load the starting instances from the common file
    # remember that some of them might be ours (i.e. already marked 'mine')
    # return the list of (vname, bname) tuples that are not ours
    def load_starting (self):
        starting = Starting()
        new_tuples = []
        for (v, b) in starting.tuples:
            for i in self.pool_items:
                if i.hostname == v and i.status == 'free':
                    i.status = 'starting'
                    new_tuples.append( (v, b,) )
        return new_tuples

    def release_my_starting (self):
        for i in self.pool_items:
            if i.status == 'mine':
                Starting().delete_vname (i.hostname)
    def _sense (self):
        for item in self.pool_items:
            if item.status is not None: continue
            if self.check_ping (item.hostname): item.status = 'busy'
            else:                               item.status = 'free'

    def sense (self):
        print 'Sensing IP pool', self.message,
        self._sense()
        print 'Done'
        for (vname, bname) in self.load_starting():
            self.substrate.add_starting_dummy (bname, vname)
        print "After having loaded 'starting': IP pool"
        self.list()
    # OS-dependent ping option (support for macos, for convenience)
    ping_timeout_option = None

    # returns True when a given hostname/ip responds to ping
    def check_ping (self, hostname):
        if not Pool.ping_timeout_option:
            (status, osname) = commands.getstatusoutput("uname -s")
            if status != 0:
                raise Exception, "TestPool: Cannot figure your OS name"
            if osname == "Linux":
                Pool.ping_timeout_option = "-w"
            elif osname == "Darwin":
                Pool.ping_timeout_option = "-t"
        command = "ping -c 1 %s 1 %s"%(Pool.ping_timeout_option, hostname)
        (status, output) = commands.getstatusoutput(command)
        return status == 0
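    # so that on a Linux box, check_ping ends up running e.g.
    #   ping -c 1 -w 1 vplc1.example.org
    # (hostname hypothetical), and a zero exit status means 'pingable'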
####################
class Box:
    def __init__ (self, hostname):
        self.hostname = hostname
        self._probed = None

    def shortname (self):
        return short_hostname(self.hostname)

    def test_ssh (self):
        return TestSsh(self.hostname, username='root', unknown_host=False)

    def reboot (self, options):
        self.test_ssh().run("shutdown -r now",
                            message="Rebooting %s"%self.hostname,
                            dry_run=options.dry_run)

    def hostname_fedora (self, virt=None):
        result = "%s {"%self.hostname
        if virt: result += "%s-"%virt
        result += "%s %s"%(self.fedora(), self.memory())
        # too painful to propagate this cleanly
        if verbose: result += "-%s" % self.uname()
        result += "}"
        return result

    separator = "===composite==="
    # probe the ssh link, and at the same time
    # take this chance to gather useful stuff
    def probe (self):
        if self._probed is not None: return self._probed
        composite_command = [ ]
        composite_command += [ "hostname" ]
        composite_command += [ ";" , "echo", Box.separator , ";" ]
        composite_command += [ "uptime" ]
        composite_command += [ ";" , "echo", Box.separator , ";" ]
        composite_command += [ "uname", "-r" ]
        composite_command += [ ";" , "echo", Box.separator , ";" ]
        composite_command += [ "cat" , "/etc/fedora-release" ]
        composite_command += [ ";" , "echo", Box.separator , ";" ]
        composite_command += [ "grep", "MemTotal", "/proc/meminfo" ]

        # due to colons and all, this goes wrong on the local box (typically testmaster)
        # I am reluctant to change TestSsh as it might break all over the place, so
        if self.test_ssh().is_local():
            probe_argv = [ "bash", "-c", " ".join (composite_command) ]
        else:
            probe_argv = self.test_ssh().actual_argv(composite_command)
        composite = self.backquote ( probe_argv, trash_err=True )
        self._hostname = self._uptime = self._uname = self._fedora = self._memory = "** Unknown **"
        if not composite:
            print "root@%s unreachable"%self.hostname
            self._probed = ''
        else:
            try:
                pieces = composite.split(Box.separator)
                pieces = [ x.strip() for x in pieces ]
                [hostname, uptime, uname, fedora, memory] = pieces
                self._hostname = hostname
                self._uptime = ', '.join([ x.strip() for x in uptime.split(',')[2:] ]).replace("load average", "load")
                self._uname = uname
                self._fedora = fedora.replace("Fedora release ","f").split(" ")[0]
                # translate from kB into Mb
                self._memory = int(memory.split()[1]) / 1024
            except:
                print 'BEG issue with pieces', pieces
                traceback.print_exc()
                print 'END issue with pieces', pieces
            self._probed = self._hostname
        return self._probed
    def uptime (self):
        if hasattr(self,'_uptime') and self._uptime: return self._uptime
        return '*unprobed* uptime'

    def uname (self):
        if hasattr(self,'_uname') and self._uname: return self._uname
        return '*unprobed* uname'

    def fedora (self):
        if hasattr(self,'_fedora') and self._fedora: return self._fedora
        return '*unprobed* fedora'

    def memory (self):
        if hasattr(self,'_memory') and self._memory: return "%s Mb"%self._memory
        return '*unprobed* memory'
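    # for reference, after a successful probe the accessors return values
    # along these lines (all values illustrative, hostname made up):
    #   box.probe()   -> 'gotan.example.org'
    #   box.uptime()  -> '1 user, load: 0.08, 0.03, 0.05'
    #   box.fedora()  -> 'f20'
    #   box.memory()  -> '32059 Mb'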
    def run (self, argv, message=None, trash_err=False, dry_run=False):
        if dry_run:
            print 'DRY_RUN:', " ".join(argv)
            return 0
        header(message)
        if not trash_err:
            return subprocess.call(argv)
        else:
            return subprocess.call(argv, stderr=file('/dev/null','w'))

    def run_ssh (self, argv, message, trash_err=False, dry_run=False):
        ssh_argv = self.test_ssh().actual_argv(argv)
        result = self.run (ssh_argv, message, trash_err, dry_run=dry_run)
        if result != 0:
            print "WARNING: failed to run %s on %s"%(" ".join(argv), self.hostname)
        return result

    def backquote (self, argv, trash_err=False):
        # print 'running backquote', argv
        if not trash_err:
            result = subprocess.Popen(argv, stdout=subprocess.PIPE).communicate()[0]
        else:
            result = subprocess.Popen(argv, stdout=subprocess.PIPE,
                                      stderr=file('/dev/null','w')).communicate()[0]
        return result

    # use argv=['bash','-c',"the command line"] if you have any shell-expanded
    # arguments like *, and if there's any chance the command is addressed to
    # the local host
    def backquote_ssh (self, argv, trash_err=False):
        if not self.probe(): return ''
        return self.backquote(self.test_ssh().actual_argv(argv), trash_err)
############################################################
class BuildInstance:
    def __init__ (self, buildname, pid, buildbox):
        self.buildname = buildname
        self.buildbox = buildbox
        self.pids = [ pid ]

    def add_pid (self, pid):
        self.pids.append(pid)

    def line (self):
        return "== %s == (pids=%r)"%(self.buildname, self.pids)
class BuildBox (Box):
    def __init__ (self, hostname):
        Box.__init__(self, hostname)
        self.build_instances = []

    def add_build (self, buildname, pid):
        for build in self.build_instances:
            if build.buildname == buildname:
                build.add_pid(pid)
                return
        self.build_instances.append(BuildInstance(buildname, pid, self))

    def list (self, verbose=False):
        if not self.build_instances:
            header ('No build process on %s (%s)'%(self.hostname_fedora(), self.uptime()))
        else:
            header ("Builds on %s (%s)"%(self.hostname_fedora(), self.uptime()))
            for b in self.build_instances:
                header (b.line(), banner=False)

    def reboot (self, options):
        if not options.soft:
            Box.reboot(self, options)
        else:
            self.soft_reboot (options)

build_matcher = re.compile("\s*(?P<pid>[0-9]+).*-[bo]\s+(?P<buildname>[^\s]+)(\s|\Z)")
build_matcher_initvm = re.compile("\s*(?P<pid>[0-9]+).*initvm.*\s+(?P<buildname>[^\s]+)\s*\Z")
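# for illustration, ps lines these two regexps are meant to catch look like
# (pids and buildnames made up):
#   12345 /usr/bin/python lbuild.sh ... -b 2015.01.05--f21 ...       -> build_matcher
#   12346 /usr/bin/python lbuild.sh ... initvm ... 2015.01.05--f21   -> build_matcher_initvm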
class BuildLxcBox (BuildBox):
    def soft_reboot (self, options):
        command = ['pkill', 'lbuild']
        self.run_ssh(command, "Terminating vbuild processes", dry_run=options.dry_run)

    # inspect box and find currently running builds
    def sense (self, options):
        pids = self.backquote_ssh(['pgrep', 'lbuild'], trash_err=True)
        if not pids: return
        command = ['ps', '-o', 'pid,command'] + [ pid for pid in pids.split("\n") if pid ]
        ps_lines = self.backquote_ssh(command).split('\n')
        for line in ps_lines:
            if not line.strip() or line.find('PID') >= 0: continue
            m = build_matcher.match(line)
            if m:
                date = time.strftime('%Y-%m-%d', time.localtime(time.time()))
                buildname = m.group('buildname').replace('@DATE@', date)
                self.add_build(buildname, m.group('pid'))
                continue
            m = build_matcher_initvm.match(line)
            if m:
                # buildname is already expanded here
                self.add_build(m.group('buildname'), m.group('pid'))
                continue
            header('BuildLxcBox.sense: command %r returned line that failed to match'%command)
            header(">>%s<<"%line)

############################################################
class PlcInstance:
    def __init__ (self, plcbox):
        self.plc_box = plcbox
        # unknown yet
        self.timestamp = 0

    def set_timestamp (self, timestamp):
        self.timestamp = timestamp

    def set_now (self):
        self.timestamp = int(time.time())

    def pretty_timestamp (self):
        return time.strftime("%Y-%m-%d:%H-%M", time.localtime(self.timestamp))

class PlcLxcInstance (PlcInstance):
    # does lxc have a context id of any kind ?
    def __init__ (self, plcbox, lxcname, pid):
        PlcInstance.__init__(self, plcbox)
        self.lxcname = lxcname
        self.pid = pid

    def vplcname (self):
        return self.lxcname.split('-')[-1]

    def buildname (self):
        return self.lxcname.rsplit('-', 2)[0]

    def line (self):
        msg = "== %s =="%(self.vplcname())
        msg += " [=%s]"%self.lxcname
        if self.pid == -1:  msg += " not (yet?) running"
        else:               msg += " (pid=%s)"%self.pid
        if self.timestamp:  msg += " @ %s"%self.pretty_timestamp()
        else:               msg += " *unknown timestamp*"
        return msg

    def kill (self):
        command = "rsync lxc-driver.sh %s:/root"%self.plc_box.hostname
        commands.getstatusoutput(command)
        msg = "lxc container stopping %s on %s"%(self.lxcname, self.plc_box.hostname)
        self.plc_box.run_ssh(['/root/lxc-driver.sh', '-c', 'stop_lxc', '-n', self.lxcname], msg)
        self.plc_box.forget(self)
####################
class PlcBox (Box):
    def __init__ (self, hostname, max_plcs):
        Box.__init__(self, hostname)
        self.plc_instances = []
        self.max_plcs = max_plcs

    def free_slots (self):
        return self.max_plcs - len(self.plc_instances)

    # fill one slot even though this one is not started yet
    def add_dummy (self, plcname):
        dummy = PlcLxcInstance(self, 'dummy_' + plcname, 0)
        dummy.set_now()
        self.plc_instances.append(dummy)

    def forget (self, plc_instance):
        self.plc_instances.remove(plc_instance)

    def reboot (self, options):
        if not options.soft:
            Box.reboot(self, options)
        else:
            self.soft_reboot (options)

    def list (self, verbose=False):
        if not self.plc_instances:
            header ('No plc running on %s'%(self.line()))
        else:
            header ("Active plc VMs on %s"%self.line())
            self.plc_instances.sort(timestamp_sort)
            for p in self.plc_instances:
                header (p.line(), banner=False)

## we do not do this at INRIA any more
class PlcLxcBox (PlcBox):

    def add_lxc (self, lxcname, pid):
        for plc in self.plc_instances:
            if plc.lxcname == lxcname:
                header("WARNING, duplicate myplc %s running on %s"%\
                           (lxcname, self.hostname), banner=False)
                return
        self.plc_instances.append(PlcLxcInstance(self, lxcname, pid))

    # a line describing the box
    def line (self):
        return "%s [max=%d,free=%d] (%s)"%(self.hostname_fedora(virt="lxc"),
                                           self.max_plcs, self.free_slots(),
                                           self.uptime())

    def plc_instance_by_lxcname (self, lxcname):
        for p in self.plc_instances:
            if p.lxcname == lxcname:
                return p
        return None

    # essentially shutdown all running containers
    def soft_reboot (self, options):
        command = "rsync lxc-driver.sh %s:/root"%self.hostname
        commands.getstatusoutput(command)
        self.run_ssh( ['/root/lxc-driver.sh', '-c', 'stop_all'],
                      "Stopping all running lxc containers on %s"%self.hostname,
                      dry_run=options.dry_run)
    # sense is expected to fill self.plc_instances with PlcLxcInstance's
    # to describe the currently running VMs
    def sense (self, options):
        command = "rsync lxc-driver.sh %s:/root"%self.hostname
        commands.getstatusoutput(command)
        command = ['/root/lxc-driver.sh', '-c', 'sense_all']
        lxc_stat = self.backquote_ssh (command)
        for lxc_line in lxc_stat.split("\n"):
            if not lxc_line: continue
            lxcname = lxc_line.split(";")[0]
            pid = lxc_line.split(";")[1]
            timestamp = lxc_line.split(";")[2]
            self.add_lxc(lxcname, pid)
            try:    timestamp = int(timestamp)
            except: timestamp = 0
            p = self.plc_instance_by_lxcname(lxcname)
            if not p:
                print 'WARNING zombie plc', self.hostname, lxcname
                print '... was expecting', lxcname, 'in', [ i.lxcname for i in self.plc_instances ]
                continue
            p.set_timestamp(timestamp)
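    # for reference, sense_all is expected to output one line per running
    # container, ';'-separated as parsed above -- e.g. (made-up values):
    #   2015.01.05--f21-1-vplc03;1234;1420465800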
############################################################
class QemuInstance:
    def __init__ (self, nodename, pid, qemubox):
        self.nodename = nodename
        self.pid = pid
        self.qemu_box = qemubox
        # not known yet
        self.buildname = None
        self.timestamp = 0

    def set_buildname (self, buildname):
        self.buildname = buildname

    def set_timestamp (self, timestamp):
        self.timestamp = timestamp

    def set_now (self):
        self.timestamp = int(time.time())

    def pretty_timestamp (self):
        return time.strftime("%Y-%m-%d:%H-%M", time.localtime(self.timestamp))

    def line (self):
        msg = "== %s =="%(short_hostname(self.nodename))
        msg += " [=%s]"%self.buildname
        if self.pid:        msg += " (pid=%s)"%self.pid
        else:               msg += " not (yet?) running"
        if self.timestamp:  msg += " @ %s"%self.pretty_timestamp()
        else:               msg += " *unknown timestamp*"
        return msg

    def kill (self):
        if self.pid == 0:
            print "cannot kill qemu %s with pid==0"%self.nodename
            return
        msg = "Killing qemu %s with pid=%s on box %s"%(self.nodename, self.pid, self.qemu_box.hostname)
        self.qemu_box.run_ssh(['kill', "%s"%self.pid], msg)
        self.qemu_box.forget(self)
####################
class QemuBox (Box):
    def __init__ (self, hostname, max_qemus):
        Box.__init__(self, hostname)
        self.qemu_instances = []
        self.max_qemus = max_qemus

    def add_node (self, nodename, pid):
        for qemu in self.qemu_instances:
            if qemu.nodename == nodename:
                header("WARNING, duplicate qemu %s running on %s"%\
                           (nodename, self.hostname), banner=False)
                return
        self.qemu_instances.append(QemuInstance(nodename, pid, self))

    def node_names (self):
        return [ qi.nodename for qi in self.qemu_instances ]

    def forget (self, qemu_instance):
        self.qemu_instances.remove(qemu_instance)

    # fill one slot even though this one is not started yet
    def add_dummy (self, nodename):
        dummy = QemuInstance('dummy_' + nodename, 0, self)
        dummy.set_now()
        self.qemu_instances.append(dummy)

    def line (self):
        return "%s [max=%d,free=%d] (%s) %s"%(
            self.hostname_fedora(virt="qemu"), self.max_qemus, self.free_slots(),
            self.uptime(), self.driver())

    def list (self, verbose=False):
        if not self.qemu_instances:
            header ('No qemu on %s'%(self.line()))
        else:
            header ("Qemus on %s"%(self.line()))
            self.qemu_instances.sort(timestamp_sort)
            for q in self.qemu_instances:
                header (q.line(), banner=False)

    def free_slots (self):
        return self.max_qemus - len(self.qemu_instances)

    def driver (self):
        if hasattr(self,'_driver') and self._driver:
            return self._driver
        return '*undef* driver'

    def qemu_instance_by_pid (self, pid):
        for q in self.qemu_instances:
            if q.pid == pid:
                return q
        return None

    def qemu_instance_by_nodename_buildname (self, nodename, buildname):
        for q in self.qemu_instances:
            if q.nodename == nodename and q.buildname == buildname:
                return q
        return None

    def reboot (self, options):
        if not options.soft:
            Box.reboot(self, options)
        else:
            self.run_ssh(['pkill', 'qemu'], "Killing qemu instances",
                         dry_run=options.dry_run)

    matcher = re.compile("\s*(?P<pid>[0-9]+).*-cdrom\s+(?P<nodename>[^\s]+)\.iso")
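    # for illustration, a ps line that this regexp catches would look like
    # (made-up values):
    #   12345 qemu-system-x86_64 ... -cdrom vnode01.example.org.iso ...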
    def sense (self, options):
        modules = self.backquote_ssh(['lsmod']).split('\n')
        self._driver = '*NO kqemu/kvm_intel MODULE LOADED*'
        for module in modules:
            if module.find('kqemu') == 0:
                self._driver = 'kqemu module loaded'
            # kvm might be loaded without kvm_intel (we don't have AMD)
            elif module.find('kvm_intel') == 0:
                self._driver = 'kvm_intel OK'
        ########## find out running pids
        pids = self.backquote_ssh(['pgrep', 'qemu'])
        if not pids: return
        command = ['ps', '-o', 'pid,command'] + [ pid for pid in pids.split("\n") if pid ]
        ps_lines = self.backquote_ssh(command).split("\n")
        for line in ps_lines:
            if not line.strip() or line.find('PID') >= 0:
                continue
            m = QemuBox.matcher.match(line)
            if m:
                self.add_node(m.group('nodename'), m.group('pid'))
                continue
            header('QemuBox.sense: command %r returned line that failed to match'%command)
            header(">>%s<<"%line)
        ########## retrieve alive instances and map to build
        live_builds = []
        command = ['grep', '.', '/vservers/*/*/qemu.pid', '/dev/null']
        pid_lines = self.backquote_ssh(command, trash_err=True).split('\n')
        for pid_line in pid_lines:
            if not pid_line.strip():
                continue
            # expect <build>/<nodename>/qemu.pid:<pid>
            try:
                (_, __, buildname, nodename, tail) = pid_line.split('/')
                (_, pid) = tail.split(':')
                q = self.qemu_instance_by_pid(pid)
                if not q:
                    continue
                q.set_buildname(buildname)
                live_builds.append(buildname)
            except:
                print 'WARNING, could not parse pid line', pid_line
        # retrieve timestamps
        if not live_builds:
            return
        command  = ['grep', '.']
        command += ['/vservers/%s/*/timestamp'%b for b in live_builds]
        command += ['/dev/null']
        ts_lines = self.backquote_ssh(command, trash_err=True).split('\n')
        for ts_line in ts_lines:
            if not ts_line.strip():
                continue
            # expect <build>/<nodename>/timestamp:<timestamp>
            try:
                (_, __, buildname, nodename, tail) = ts_line.split('/')
                nodename = nodename.replace('qemu-', '')
                (_, timestamp) = tail.split(':')
                timestamp = int(timestamp)
                q = self.qemu_instance_by_nodename_buildname(nodename, buildname)
                if not q:
                    # this warning corresponds to qemu instances that were not killed properly
                    # and that have a dangling qemu.pid - and not even all of them, as they need
                    # to be attached to a build that has a node running...
                    # it is more confusing than helpful, so let's just trash it
                    #print 'WARNING zombie qemu',self.hostname,ts_line
                    #print '... was expecting (',short_hostname(nodename),buildname,') in',\
                    #    [ (short_hostname(i.nodename),i.buildname) for i in self.qemu_instances ]
                    continue
                q.set_timestamp(timestamp)
            except:
                print 'WARNING, could not parse ts line', ts_line
############################################################
class TestInstance:
    def __init__ (self, buildname, pid=0):
        self.pids = []
        if pid != 0:
            self.pids.append(pid)
        self.buildname = buildname
        # list of (plcindex, step) tuples for the steps that failed
        self.broken_steps = []
        self.timestamp = 0

    def set_timestamp (self, timestamp):
        self.timestamp = timestamp

    def set_now (self):
        self.timestamp = int(time.time())

    def pretty_timestamp (self):
        return time.strftime("%Y-%m-%d:%H-%M", time.localtime(self.timestamp))

    def is_running (self):
        return len(self.pids) != 0

    def add_pid (self, pid):
        self.pids.append(pid)

    def set_broken (self, plcindex, step):
        self.broken_steps.append( (plcindex, step,) )

    def second_letter (self):
        if not self.broken_steps:
            return '='
        else:
            really_broken = [ step for (i, step) in self.broken_steps if '_ignore' not in step ]
            # W is for warning, like what's in the build mail
            if len(really_broken) == 0:
                return 'W'
            else:
                return 'B'

    def line (self):
        # make up a 2-letter sign
        # first letter: '=', unless the build is running: '*'
        double = '*' if self.pids else '='
        # second letter: '=' if fine, 'W' for warnings (only ignored steps), 'B' for broken
        letter2 = self.second_letter()
        double += letter2
        msg = " %s %s =="%(double, self.buildname)
        if not self.pids:
            pass
        elif len(self.pids) == 1:
            msg += " (pid=%s)"%self.pids[0]
        else:
            msg += " !!!pids=%s!!!"%self.pids
        msg += " @%s"%self.pretty_timestamp()
        if letter2 != '=':
            msg2 = ' BROKEN' if letter2 == 'B' else ' WARNING'
            # sometimes we have an empty plcindex
            msg += " [%s="%msg2 + " ".join([ "%s@%s"%(s, i) if i else s for (i, s) in self.broken_steps ]) + "]"
        return msg
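    # for reference, line() renders instances like these (all values made up):
    #   " *= 2015.01.05--f21 == (pid=12345) @2015-01-05:14-30"              running, no broken step
    #   " =B 2015.01.05--f21 == @2015-01-05:14-30 [ BROKEN=plc-install@1]"  broken, not running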
############################################################
class TestBox (Box):
    def __init__ (self, hostname):
        Box.__init__(self, hostname)
        self.starting_ips = []
        self.test_instances = []

    def reboot (self, options):
        # can't reboot a vserver VM
        self.run_ssh(['pkill', 'run_log'], "Terminating current runs",
                     dry_run=options.dry_run)
        self.run_ssh(['rm', '-f', Starting.location], "Cleaning %s"%Starting.location,
                     dry_run=options.dry_run)

    def get_test (self, buildname):
        for i in self.test_instances:
            if i.buildname == buildname:
                return i
        return None

    # we scan ALL remaining test results, even the ones not running
    def add_timestamp (self, buildname, timestamp):
        i = self.get_test(buildname)
        if i:
            i.set_timestamp(timestamp)
        else:
            i = TestInstance(buildname, 0)
            i.set_timestamp(timestamp)
            self.test_instances.append(i)

    def add_running_test (self, pid, buildname):
        i = self.get_test(buildname)
        if not i:
            self.test_instances.append(TestInstance(buildname, pid))
            return
        if i.pids:
            print "WARNING: 2 concurrent tests run on same build %s"%buildname
        i.add_pid(pid)

    def add_broken (self, buildname, plcindex, step):
        i = self.get_test(buildname)
        if not i:
            i = TestInstance(buildname)
            self.test_instances.append(i)
        i.set_broken(plcindex, step)
    matcher_proc = re.compile(".*/proc/(?P<pid>[0-9]+)/cwd.*/root/(?P<buildname>[^/]+)$")
    matcher_grep = re.compile("/root/(?P<buildname>[^/]+)/logs/trace.*:TRACE:\s*(?P<plcindex>[0-9]+).*step=(?P<step>\S+).*")
    matcher_grep_missing = re.compile("grep: /root/(?P<buildname>[^/]+)/logs/trace: No such file or directory")
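    # for illustration, lines these three regexps are meant to catch
    # (all values made up):
    #   matcher_proc:         "... /proc/12345/cwd -> /root/2015.01.05--f21"
    #   matcher_grep:         "/root/2015.01.05--f21/logs/trace-01.txt:TRACE: 1 plc KO step=plc-install"
    #   matcher_grep_missing: "grep: /root/2015.01.05--f21/logs/trace: No such file or directory"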
    def sense (self, options):
        self.starting_ips = [ x for x in self.backquote_ssh(['cat', Starting.location],
                                                            trash_err=True).strip().split('\n') if x ]

        # scan timestamps on all tests
        # this is likely not to invoke ssh, so we need to be a bit smarter to get * expanded
        # xxx would make sense above too
        command = ['bash', '-c', "grep . /root/*/timestamp /dev/null"]
        ts_lines = self.backquote_ssh(command, trash_err=True).split('\n')
        for ts_line in ts_lines:
            if not ts_line.strip():
                continue
            # expect /root/<buildname>/timestamp:<timestamp>
            try:
                (ts_file, timestamp) = ts_line.split(':')
                ts_file = os.path.dirname(ts_file)
                buildname = os.path.basename(ts_file)
                timestamp = int(timestamp)
                self.add_timestamp(buildname, timestamp)
            except:
                print 'WARNING, could not parse ts line', ts_line

        # let's try to be robust here -- tests that fail very early, like e.g.
        # "Cannot make space for a PLC instance: vplc IP pool exhausted", which occurs as part of provision,
        # will result in a 'trace' symlink to a nonexistent 'trace-<>.txt' because no step has gone through;
        # a simple 'trace' should exist though, as it is created by run_log
        command = ['bash', '-c', "grep KO /root/*/logs/trace /dev/null 2>&1"]
        trace_lines = self.backquote_ssh(command).split('\n')
        for line in trace_lines:
            if not line.strip():
                continue
            m = TestBox.matcher_grep_missing.match(line)
            if m:
                buildname = m.group('buildname')
                self.add_broken(buildname, '', 'NO STEP DONE')
                continue
            m = TestBox.matcher_grep.match(line)
            if m:
                buildname = m.group('buildname')
                plcindex = m.group('plcindex')
                step = m.group('step')
                self.add_broken(buildname, plcindex, step)
                continue
            header("TestBox.sense: command %r returned line that failed to match\n%s"%(command, line))
            header(">>%s<<"%line)

        pids = self.backquote_ssh(['pgrep', 'run_log'], trash_err=True)
        if not pids: return
        command = ['ls', '-ld'] + [ "/proc/%s/cwd"%pid for pid in pids.split("\n") if pid ]
        ps_lines = self.backquote_ssh(command).split('\n')
        for line in ps_lines:
            if not line.strip():
                continue
            m = TestBox.matcher_proc.match(line)
            if m:
                pid = m.group('pid')
                buildname = m.group('buildname')
                self.add_running_test(pid, buildname)
                continue
            header("TestBox.sense: command %r returned line that failed to match\n%s"%(command, line))
            header(">>%s<<"%line)
    def line (self):
        return self.hostname_fedora()

    def list (self, verbose=False):
        # verbose shows all tests
        if verbose:
            instances = self.test_instances
            msg = "tests"
        else:
            instances = [ i for i in self.test_instances if i.is_running() ]
            msg = "running tests"

        if not instances:
            header ("No %s on %s"%(msg, self.line()))
        else:
            header ("%s on %s"%(msg, self.line()))
            instances.sort(timestamp_sort)
            for i in instances:
                header (i.line(), banner=False)

        # show 'starting' regardless of verbose
        if self.starting_ips:
            header("Starting IP addresses on %s"%self.line())
            self.starting_ips.sort()
            for starting in self.starting_ips:
                header(starting, banner=False)
        else:
            header("Empty 'starting' on %s"%self.line())
############################################################
class Options: pass

class Substrate:

    def __init__ (self):
        self.options = Options()
        self.options.dry_run = False
        self.options.verbose = False
        self.options.reboot = False
        self.options.soft = False
        self.test_box = TestBox (self.test_box_spec())
        self.build_lxc_boxes = [ BuildLxcBox(h) for h in self.build_lxc_boxes_spec() ]
        self.plc_lxc_boxes = [ PlcLxcBox (h, m) for (h, m) in self.plc_lxc_boxes_spec() ]
        self.qemu_boxes = [ QemuBox (h, m) for (h, m) in self.qemu_boxes_spec() ]
        self._sensed = False

        self.vplc_pool = Pool(self.vplc_ips(), "for vplcs", self)
        self.vnode_pool = Pool(self.vnode_ips(), "for vnodes", self)

        self.build_boxes = self.build_lxc_boxes
        self.plc_boxes = self.plc_lxc_boxes
        self.default_boxes = self.plc_boxes + self.qemu_boxes
        self.all_boxes = self.build_boxes + [ self.test_box ] + self.plc_boxes + self.qemu_boxes
    def summary_line (self):
        msg  = "["
        msg += " %d xp"%len(self.plc_lxc_boxes)
        msg += " %d tried plc boxes"%len(self.plc_boxes)
        msg += "]"
        return msg

    def fqdn (self, hostname):
        if hostname.find('.') < 0:
            return "%s.%s" % (hostname, self.domain())
        return hostname
    # return True if actual sensing takes place
    def sense (self, force=False):
        if self._sensed and not force:
            return False
        print 'Sensing local substrate...',
        for b in self.default_boxes:
            b.sense(self.options)
        print 'Done'
        self._sensed = True
        return True

    def list (self, verbose=False):
        for b in self.default_boxes:
            b.list(verbose)

    def add_dummy_plc (self, plc_boxname, plcname):
        for pb in self.plc_boxes:
            if pb.hostname == plc_boxname:
                pb.add_dummy(plcname)
                return True

    def add_dummy_qemu (self, qemu_boxname, qemuname):
        for qb in self.qemu_boxes:
            if qb.hostname == qemu_boxname:
                qb.add_dummy(qemuname)
                return True

    # a not-yet-started instance (as recorded in /root/starting) needs to
    # book a slot on the box that is about to host it
    def add_starting_dummy (self, bname, vname):
        return self.add_dummy_plc(bname, vname) or self.add_dummy_qemu(bname, vname)
    def provision (self, plcs, options):
        try:
            # attach each plc to a plc box and an IP address
            plcs = [ self.provision_plc(plc, options) for plc in plcs ]
            # attach each node/qemu to a qemu box with an IP address
            plcs = [ self.provision_qemus(plc, options) for plc in plcs ]
            # update the SFA spec accordingly
            plcs = [ self.localize_sfa_rspec(plc, options) for plc in plcs ]
            return plcs
        except Exception, e:
            print '* Could not provision this test on current substrate', '--', e, '--', 'exiting'
            traceback.print_exc()
            sys.exit(1)

    # it is expected that a couple of options like ips_bplc and ips_vplc
    # are set or unset together
    @staticmethod
    def check_options (x, y):
        if not x and not y:
            return True
        return len(x) == len(y)
    # find an available plc box (or make space)
    # and a free IP address (using options if present)
    def provision_plc (self, plc, options):

        assert Substrate.check_options(options.ips_bplc, options.ips_vplc)

        #### let's find an IP address for that plc
        # look in options first
        if options.ips_vplc:
            # it's a rerun, take it for granted
            # we don't check anything here,
            # it is the caller's responsibility to cleanup and make sure this makes sense
            plc_boxname = options.ips_bplc.pop()
            vplc_hostname = options.ips_vplc.pop()
        else:
            plc_boxname = None
            vplc_hostname = None
            # try to find an available IP
            self.vplc_pool.sense()
            couple = self.vplc_pool.next_free()
            if couple:
                (vplc_hostname, unused) = couple
            #### we need to find one plc box that still has a slot
            max_free = 0
            # use the box that has max free spots for load balancing
            for pb in self.plc_boxes:
                free = pb.free_slots()
                if free > max_free:
                    plc_boxname = pb.hostname
                    max_free = free
            # if there's no available slot in the plc_boxes, or no free IP address,
            # make space by killing the oldest running instance
            if not plc_boxname or not vplc_hostname:
                # find the oldest of all our instances
                all_plc_instances = reduce(lambda x, y: x + y,
                                           [ pb.plc_instances for pb in self.plc_boxes ],
                                           [])
                all_plc_instances.sort(timestamp_sort)
                try:
                    plc_instance_to_kill = all_plc_instances[0]
                except:
                    msg = ""
                    if not plc_boxname:
                        msg += " PLC boxes are full"
                    if not vplc_hostname:
                        msg += " vplc IP pool exhausted"
                    msg += " %s"%self.summary_line()
                    raise Exception, "Cannot make space for a PLC instance:" + msg
                freed_plc_boxname = plc_instance_to_kill.plc_box.hostname
                freed_vplc_hostname = plc_instance_to_kill.vplcname()
                message = 'killing oldest plc instance = %s on %s'%(plc_instance_to_kill.line(),
                                                                    freed_plc_boxname)
                plc_instance_to_kill.kill()
                # use this new plcbox if that was the problem
                if not plc_boxname:
                    plc_boxname = freed_plc_boxname
                # ditto for the IP address
                if not vplc_hostname:
                    vplc_hostname = freed_vplc_hostname
                    # record in pool as mine
                    self.vplc_pool.set_mine(vplc_hostname)

        # book the slot and tell the other tests we are starting
        self.add_dummy_plc(plc_boxname, plc['name'])
        vplc_ip = self.vplc_pool.get_ip(vplc_hostname)
        self.vplc_pool.add_starting(vplc_hostname, plc_boxname)

        #### compute a helpful vserver name
        # remove domain in hostname
        vplc_short = short_hostname(vplc_hostname)
        vservername = "%s-%d-%s" % (options.buildname, plc['index'], vplc_short)
        plc_name = "%s_%s" % (plc['name'], vplc_short)

        utils.header('PROVISION plc %s in box %s at IP %s as %s' % \
                     (plc['name'], plc_boxname, vplc_hostname, vservername))

        #### apply in the plc_spec
        # label = options.personality.replace("linux","")
        mapper = {'plc' : [ ('*' , {'host_box' : plc_boxname,
                                    'name' : plc_name,
                                    'vservername' : vservername,
                                    'vserverip' : vplc_ip,
                                    'settings:PLC_DB_HOST' : vplc_hostname,
                                    'settings:PLC_API_HOST' : vplc_hostname,
                                    'settings:PLC_BOOT_HOST' : vplc_hostname,
                                    'settings:PLC_WWW_HOST' : vplc_hostname,
                                    'settings:PLC_NET_DNS1' : self.network_settings()['interface_fields:dns1'],
                                    'settings:PLC_NET_DNS2' : self.network_settings()['interface_fields:dns2'],
                                    }) ]
                  }
        # mappers only work on a list of plcs
        return TestMapper([plc], options).map(mapper)[0]
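    # with made-up inputs -- buildname '2015.01.05--f21', plc index 1 and
    # vplc_hostname 'vplc03.example.org' -- the container would be named
    # 2015.01.05--f21-1-vplc03, which PlcLxcInstance.vplcname() and
    # .buildname() above can split back apart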
    # same story for the qemu nodes: find an available qemu box (or make
    # space) and a free IP address (using options if present)
    def provision_qemus (self, plc, options):

        assert Substrate.check_options(options.ips_bnode, options.ips_vnode)

        test_mapper = TestMapper([plc], options)
        nodenames = test_mapper.node_names()
        maps = []
        for nodename in nodenames:

            if options.ips_vnode:
                # as above, it's a rerun, take it for granted
                qemu_boxname = options.ips_bnode.pop()
                vnode_hostname = options.ips_vnode.pop()
            else:
                qemu_boxname = None
                vnode_hostname = None
                # try to find an available IP
                self.vnode_pool.sense()
                couple = self.vnode_pool.next_free()
                if couple:
                    (vnode_hostname, unused) = couple
                # find a physical box
                max_free = 0
                # use the box that has max free spots for load balancing
                for qb in self.qemu_boxes:
                    free = qb.free_slots()
                    if free > max_free:
                        qemu_boxname = qb.hostname
                        max_free = free
                # if we miss the box or the IP, kill the oldest instance
                if not qemu_boxname or not vnode_hostname:
                    # find the oldest of all our instances
                    all_qemu_instances = reduce(lambda x, y: x + y,
                                                [ qb.qemu_instances for qb in self.qemu_boxes ],
                                                [])
                    all_qemu_instances.sort(timestamp_sort)
                    try:
                        qemu_instance_to_kill = all_qemu_instances[0]
                    except:
                        msg = ""
                        if not qemu_boxname:
                            msg += " QEMU boxes are full"
                        if not vnode_hostname:
                            msg += " vnode IP pool exhausted"
                        msg += " %s"%self.summary_line()
                        raise Exception, "Cannot make space for a QEMU instance:" + msg
                    freed_qemu_boxname = qemu_instance_to_kill.qemu_box.hostname
                    freed_vnode_hostname = short_hostname(qemu_instance_to_kill.nodename)
                    message = 'killing oldest qemu node = %s on %s'%(qemu_instance_to_kill.line(),
                                                                     freed_qemu_boxname)
                    qemu_instance_to_kill.kill()
                    # use these freed resources where needed
                    if not qemu_boxname:
                        qemu_boxname = freed_qemu_boxname
                    if not vnode_hostname:
                        vnode_hostname = freed_vnode_hostname
                        self.vnode_pool.set_mine(vnode_hostname)

            self.add_dummy_qemu(qemu_boxname, vnode_hostname)
            mac = self.vnode_pool.retrieve_userdata(vnode_hostname)
            ip = self.vnode_pool.get_ip(vnode_hostname)
            self.vnode_pool.add_starting(vnode_hostname, qemu_boxname)

            vnode_fqdn = self.fqdn(vnode_hostname)
            nodemap = {'host_box' : qemu_boxname,
                       'node_fields:hostname' : vnode_fqdn,
                       'interface_fields:ip' : ip,
                       'ipaddress_fields:ip_addr' : ip,
                       'interface_fields:mac' : mac,
                       }
            nodemap.update(self.network_settings())
            maps.append( (nodename, nodemap) )

            utils.header("PROVISION node %s in box %s at IP %s with MAC %s" % \
                         (nodename, qemu_boxname, vnode_hostname, mac))

        return test_mapper.map({'node': maps})[0]
    def localize_sfa_rspec (self, plc, options):
        plc['sfa']['settings']['SFA_REGISTRY_HOST'] = plc['settings']['PLC_DB_HOST']
        plc['sfa']['settings']['SFA_AGGREGATE_HOST'] = plc['settings']['PLC_DB_HOST']
        plc['sfa']['settings']['SFA_SM_HOST'] = plc['settings']['PLC_DB_HOST']
        plc['sfa']['settings']['SFA_DB_HOST'] = plc['settings']['PLC_DB_HOST']
        plc['sfa']['settings']['SFA_PLC_URL'] = 'https://%s:443/PLCAPI/' % plc['settings']['PLC_API_HOST']
        return plc

    #################### release:
    def release (self, options):
        self.vplc_pool.release_my_starting()
        self.vnode_pool.release_my_starting()
    #################### show results for interactive mode
    def get_box (self, boxname):
        for b in self.build_boxes + self.plc_boxes + self.qemu_boxes + [ self.test_box ]:
            if b.shortname() == boxname:
                return b
            try:
                if b.shortname() == boxname.split('.')[0]:
                    return b
            except:
                pass
        print "Could not find box %s" % boxname
        return None

    # deals with the mix of boxes and names, and stores the current focus
    # as a list of Box instances in self.focus_all
    def normalize (self, box_or_names):
        self.focus_all = []
        for box in box_or_names:
            if not isinstance(box, Box):
                name = box
                box = self.get_box(name)
                if not box:
                    print 'Warning - could not handle box', name
                    continue
            self.focus_all.append(box)
        # elaborate by type
        self.focus_build = [ x for x in self.focus_all if isinstance(x, BuildBox) ]
        self.focus_plc = [ x for x in self.focus_all if isinstance(x, PlcBox) ]
        self.focus_qemu = [ x for x in self.focus_all if isinstance(x, QemuBox) ]
    def list_boxes (self):
        print 'Sensing',
        for box in self.focus_all:
            box.sense(self.options)
        print 'Done'
        for box in self.focus_all:
            box.list(self.options.verbose)

    def reboot_boxes (self):
        for box in self.focus_all:
            box.reboot(self.options)

    def sanity_check (self):
        print 'Sanity check'
        self.sanity_check_plc()
        self.sanity_check_qemu()

    def sanity_check_plc (self):
        pass

    def sanity_check_qemu (self):
        all_nodes = []
        for box in self.focus_qemu:
            all_nodes += box.node_names()
        hash = {}
        for node in all_nodes:
            if node not in hash:
                hash[node] = 0
            hash[node] += 1
        for (node, count) in hash.items():
            if count != 1:
                print 'WARNING - duplicate node', node
    ####################
    # can be run as a utility to probe/display/manage the local infrastructure
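    # example invocations (illustrative):
    #   ./Substrate.py -a -v      probe and list all known boxes, verbosely
    #   ./Substrate.py -q -r -s   soft-reboot (pkill qemu) the qemu boxes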
    def main (self):
        parser = OptionParser()
        parser.add_option('-r', "--reboot", action='store_true', dest='reboot', default=False,
                          help='reboot mode (use shutdown -r)')
        parser.add_option('-s', "--soft", action='store_true', dest='soft', default=False,
                          help='soft mode for reboot (terminates processes)')
        parser.add_option('-t', "--testbox", action='store_true', dest='testbox', default=False,
                          help='add test box')
        parser.add_option('-b', "--build", action='store_true', dest='builds', default=False,
                          help='add build boxes')
        parser.add_option('-p', "--plc", action='store_true', dest='plcs', default=False,
                          help='add plc boxes')
        parser.add_option('-q', "--qemu", action='store_true', dest='qemus', default=False,
                          help='add qemu boxes')
        parser.add_option('-a', "--all", action='store_true', dest='all', default=False,
                          help='address all known boxes, like -b -t -p -q')
        parser.add_option('-v', "--verbose", action='store_true', dest='verbose', default=False,
                          help='verbose mode')
        parser.add_option('-n', "--dry_run", action='store_true', dest='dry_run', default=False,
                          help='dry run mode')
        (self.options, args) = parser.parse_args()

        boxes = args
        if self.options.testbox: boxes += [ self.test_box ]
        if self.options.builds:  boxes += self.build_boxes
        if self.options.plcs:    boxes += self.plc_boxes
        if self.options.qemus:   boxes += self.qemu_boxes
        if self.options.all:     boxes += self.all_boxes

        global verbose
        verbose = self.options.verbose
        # default scope is -b -p -q -t
        if not boxes:
            boxes = self.build_boxes + self.plc_boxes + self.qemu_boxes + [ self.test_box ]

        self.normalize(boxes)

        if self.options.reboot:
            self.reboot_boxes()
        else:
            self.list_boxes()
            self.sanity_check()