.
[tests.git] / system / Substrate.py
1 #
2 # Thierry Parmentelat <thierry.parmentelat@inria.fr>
3 # Copyright (C) 2010 INRIA 
4 #
5 # #################### history
6 #
7 # see also Substrate.readme
8 #
9 # This is a complete rewrite of TestResources/Tracker/Pool
10 # we don't use trackers anymore and just probe/sense the running 
11 # boxes to figure out where we are
12 # in order to implement some fairness in the round-robin allocation scheme
13 # we need an indication of the 'age' of each running entity, 
14 # hence the 'timestamp-*' steps in TestPlc
15
16 # this should be much more flexible:
17 # * supports several plc boxes 
18 # * supports several qemu guests per host
19 # * no need to worry about tracker being in sync or not
20 #
21 # #################### howto use
22 #
23 # each site is to write its own LocalSubstrate.py, 
24 # (see e.g. LocalSubstrate.inria.py)
25 # LocalSubstrate.py is expected to be in /root on the testmaster box
26 # and needs to define
27 # MYPLCs
28 # . the vserver-capable boxes used for hosting myplcs
29 # .  and their admissible load (max # of myplcs)
30 # . the pool of DNS-names and IP-addresses available for myplcs
31 # QEMU nodes
32 # . the kvm-qemu capable boxes to host qemu instances
33 # .  and their admissible load (max # of myplcs)
34 # . the pool of DNS-names and IP-addresses available for nodes
35
36 # #################### implem. note
37
38 # this model relies on 'sensing' the substrate, 
39 # i.e. probing all the boxes for their running instances of vservers and qemu
40 # this is how we get rid of tracker inconsistencies 
41 # however there is a 'black hole' between the time where a given address is 
42 # allocated and when it actually gets used/pingable
43 # this is why we still need a shared knowledge among running tests
44 # in a file named /root/starting
45 # this is connected to the Pool class 
46
47 # ####################
48
49 import os.path, sys
50 import time
51 import re
52 import traceback
53 import subprocess
54 import commands
55 import socket
56 from optparse import OptionParser
57
58 import utils
59 from TestSsh import TestSsh
60 from TestMapper import TestMapper
61
62 def header (message,banner=True):
63     if not message: return
64     if banner: print "===============",
65     print message
66     sys.stdout.flush()
67
68 def timestamp_sort(o1,o2): return o1.timestamp-o2.timestamp
69
70 def short_hostname (hostname):
71     return hostname.split('.')[0]
72 ####################
73 # pool class
74 # allows to pick an available IP among a pool
75 # input is expressed as a list of tuples (hostname,ip,user_data)
76 # that can be searched iteratively for a free slot
77 # e.g.
78 # pool = [ (hostname1,user_data1),  
79 #          (hostname2,user_data2),  
80 #          (hostname3,user_data2),  
81 #          (hostname4,user_data4) ]
82 # assuming that ip1 and ip3 are taken (pingable), then we'd get
83 # pool=Pool(pool)
84 # pool.next_free() -> entry2
85 # pool.next_free() -> entry4
86 # pool.next_free() -> None
87 # that is, even if ip2 is not busy/pingable when the second next_free() is issued
88
89 class PoolItem:
90     def __init__ (self,hostname,userdata):
91         self.hostname=hostname
92         self.userdata=userdata
93         # slot holds 'busy' or 'free' or 'mine' or 'starting' or None
94         # 'mine' is for our own stuff, 'starting' from the concurrent tests
95         self.status=None
96         self.ip=None
97
98     def line(self):
99         return "Pooled %s (%s) -> %s"%(self.hostname,self.userdata, self.status)
100
101     def char (self):
102         if   self.status==None:       return '?'
103         elif self.status=='busy':     return '+'
104         elif self.status=='free':     return '-'
105         elif self.status=='mine':     return 'M'
106         elif self.status=='starting': return 'S'
107
108     def get_ip(self):
109         if self.ip: return self.ip
110         ip=socket.gethostbyname(self.hostname)
111         self.ip=ip
112         return ip
113
114 class Pool:
115
116     def __init__ (self, tuples,message):
117         self.pool_items= [ PoolItem (hostname,userdata) for (hostname,userdata) in tuples ] 
118         self.message=message
119
120     def list (self):
121         for i in self.pool_items: print i.line()
122
123     def line (self):
124         line=self.message
125         for i in self.pool_items: line += ' ' + i.char()
126         return line
127
128     def _item (self, hostname):
129         for i in self.pool_items: 
130             if i.hostname==hostname: return i
131         raise Exception ("Could not locate hostname %s in pool %s"%(hostname,self.message))
132
133     def retrieve_userdata (self, hostname): 
134         return self._item(hostname).userdata
135
136     def get_ip (self, hostname):
137         try:    return self._item(hostname).get_ip()
138         except: return socket.gethostbyname(hostname)
139         
140     def set_mine (self, hostname):
141         try:
142             self._item(hostname).status='mine'
143         except:
144             print 'WARNING: host %s not found in IP pool %s'%(hostname,self.message)
145
146     def next_free (self):
147         for i in self.pool_items:
148             if i.status == 'free':
149                 i.status='mine'
150                 return (i.hostname,i.userdata)
151         return None
152
153     # the place were other test instances tell about their not-yet-started
154     # instances, that go undetected through sensing
155     starting='/root/starting'
156     def add_starting (self, name):
157         try:    items=[line.strip() for line in file(Pool.starting).readlines()]
158         except: items=[]
159         if not name in items:
160             file(Pool.starting,'a').write(name+'\n')
161         for i in self.pool_items:
162             if i.hostname==name: i.status='mine'
163             
164     # we load this after actual sensing; 
165     def load_starting (self):
166         try:    items=[line.strip() for line in file(Pool.starting).readlines()]
167         except: items=[]
168         for i in self.pool_items:
169             if i.hostname in items:
170                 if i.status=='free' : i.status='starting'
171
172     def release_my_starting (self):
173         for i in self.pool_items:
174             if i.status=='mine': 
175                 self.del_starting(i.hostname)
176                 i.status=None
177
178     def del_starting (self, name):
179         try:    items=[line.strip() for line in file(Pool.starting).readlines()]
180         except: items=[]
181         if name in items:
182             f=file(Pool.starting,'w')
183             for item in items: 
184                 if item != name: f.write(item+'\n')
185             f.close()
186     
187     ##########
188     def _sense (self):
189         for item in self.pool_items:
190             if item.status is not None: 
191                 print item.char(),
192                 continue
193             if self.check_ping (item.hostname): 
194                 item.status='busy'
195                 print '*',
196             else:
197                 item.status='free'
198                 print '.',
199     
200     def sense (self):
201         print 'Sensing IP pool',self.message,
202         self._sense()
203         print 'Done'
204         self.load_starting()
205         print 'After starting: IP pool'
206         print self.line()
207     # OS-dependent ping option (support for macos, for convenience)
208     ping_timeout_option = None
209     # returns True when a given hostname/ip responds to ping
210     def check_ping (self,hostname):
211         if not Pool.ping_timeout_option:
212             (status,osname) = commands.getstatusoutput("uname -s")
213             if status != 0:
214                 raise Exception, "TestPool: Cannot figure your OS name"
215             if osname == "Linux":
216                 Pool.ping_timeout_option="-w"
217             elif osname == "Darwin":
218                 Pool.ping_timeout_option="-t"
219
220         command="ping -c 1 %s 1 %s"%(Pool.ping_timeout_option,hostname)
221         (status,output) = commands.getstatusoutput(command)
222         return status == 0
223
224 ####################
225 class Box:
226     def __init__ (self,hostname):
227         self.hostname=hostname
228         self._probed=None
229     def shortname (self):
230         return short_hostname(self.hostname)
231     def test_ssh (self): return TestSsh(self.hostname,username='root',unknown_host=False)
232     def reboot (self, options):
233         self.test_ssh().run("shutdown -r now",message="Rebooting %s"%self.hostname,
234                             dry_run=options.dry_run)
235
236     def uptime(self):
237         if hasattr(self,'_uptime') and self._uptime: return self._uptime
238         return '*undef* uptime'
239     def sense_uptime (self):
240         command=['uptime']
241         self._uptime=self.backquote_ssh(command,trash_err=True).strip()
242         if not self._uptime: self._uptime='unreachable'
243
244     def run(self,argv,message=None,trash_err=False,dry_run=False):
245         if dry_run:
246             print 'DRY_RUN:',
247             print " ".join(argv)
248             return 0
249         else:
250             header(message)
251             if not trash_err:
252                 return subprocess.call(argv)
253             else:
254                 return subprocess.call(argv,stderr=file('/dev/null','w'))
255                 
256     def run_ssh (self, argv, message, trash_err=False, dry_run=False):
257         ssh_argv = self.test_ssh().actual_argv(argv)
258         result=self.run (ssh_argv, message, trash_err, dry_run=dry_run)
259         if result!=0:
260             print "WARNING: failed to run %s on %s"%(" ".join(argv),self.hostname)
261         return result
262
263     def backquote (self, argv, trash_err=False):
264         # print 'running backquote',argv
265         if not trash_err:
266             result= subprocess.Popen(argv,stdout=subprocess.PIPE).communicate()[0]
267         else:
268             result= subprocess.Popen(argv,stdout=subprocess.PIPE,stderr=file('/dev/null','w')).communicate()[0]
269         return result
270
271     def probe (self):
272         if self._probed is not None: return self._probed
273         # first probe the ssh link
274         probe_argv=self.test_ssh().actual_argv(['hostname'])
275         self._probed=self.backquote ( probe_argv, trash_err=True )
276         if not self._probed: print "root@%s unreachable"%self.hostname
277         return self._probed
278
279     # use argv=['bash','-c',"the command line"]
280     # if you have any shell-expanded arguments like *
281     # and if there's any chance the command is adressed to the local host
282     def backquote_ssh (self, argv, trash_err=False):
283         if not self.probe(): return ''
284         return self.backquote( self.test_ssh().actual_argv(argv), trash_err)
285
286 ############################################################
287 class BuildInstance:
288     def __init__ (self, buildname, pid, buildbox):
289         self.buildname=buildname
290         self.buildbox=buildbox
291         self.pids=[pid]
292
293     def add_pid(self,pid):
294         self.pids.append(pid)
295
296     def line (self):
297         return "== %s == (pids=%r)"%(self.buildname,self.pids)
298
299 class BuildBox (Box):
300     def __init__ (self,hostname):
301         Box.__init__(self,hostname)
302         self.build_instances=[]
303
304     def add_build (self,buildname,pid):
305         for build in self.build_instances:
306             if build.buildname==buildname: 
307                 build.add_pid(pid)
308                 return
309         self.build_instances.append(BuildInstance(buildname, pid, self))
310
311     def list(self):
312         if not self.build_instances: 
313             header ('No build process on %s (%s)'%(self.hostname,self.uptime()))
314         else:
315             header ("Builds on %s (%s)"%(self.hostname,self.uptime()))
316             for b in self.build_instances: 
317                 header (b.line(),banner=False)
318
319     def reboot (self, options):
320         if not options.soft:
321             self.reboot(options)
322         else:
323             command=['pkill','vbuild']
324             self.run_ssh(command,"Terminating vbuild processes",dry_run=options.dry_run)
325
326     # inspect box and find currently running builds
327     matcher=re.compile("\s*(?P<pid>[0-9]+).*-[bo]\s+(?P<buildname>[^\s]+)(\s|\Z)")
328     def sense(self, options):
329         print 'b',
330         self.sense_uptime()
331         pids=self.backquote_ssh(['pgrep','vbuild'],trash_err=True)
332         if not pids: return
333         command=['ps','-o','pid,command'] + [ pid for pid in pids.split("\n") if pid]
334         ps_lines=self.backquote_ssh (command).split('\n')
335         for line in ps_lines:
336             if not line.strip() or line.find('PID')>=0: continue
337             m=BuildBox.matcher.match(line)
338             if m: 
339                 date=time.strftime('%Y-%m-%d',time.localtime(time.time()))
340                 buildname=m.group('buildname').replace('@DATE@',date)
341                 self.add_build (buildname,m.group('pid'))
342             else: header('command %r returned line that failed to match'%command)
343
344 ############################################################
345 class PlcInstance:
346     def __init__ (self, vservername, ctxid, plcbox):
347         self.vservername=vservername
348         self.ctxid=ctxid
349         self.plc_box=plcbox
350         # unknown yet
351         self.timestamp=0
352
353     def set_timestamp (self,timestamp): self.timestamp=timestamp
354     def set_now (self): self.timestamp=int(time.time())
355     def pretty_timestamp (self): return time.strftime("%Y-%m-%d:%H-%M",time.localtime(self.timestamp))
356
357     def vplcname (self):
358         return self.vservername.split('-')[-1]
359     def buildname (self):
360         return self.vservername.rsplit('-',2)[0]
361
362     def line (self):
363         msg="== %s =="%(self.vplcname())
364         msg += " [=%s]"%self.vservername
365         if self.ctxid==0:  msg+=" not (yet?) running"
366         else:              msg+=" (ctx=%s)"%self.ctxid     
367         if self.timestamp: msg += " @ %s"%self.pretty_timestamp()
368         else:              msg += " *unknown timestamp*"
369         return msg
370
371     def kill (self):
372         msg="vserver stopping %s on %s"%(self.vservername,self.plc_box.hostname)
373         self.plc_box.run_ssh(['vserver',self.vservername,'stop'],msg)
374         self.plc_box.forget(self)
375
376 class PlcBox (Box):
377     def __init__ (self, hostname, max_plcs):
378         Box.__init__(self,hostname)
379         self.plc_instances=[]
380         self.max_plcs=max_plcs
381
382     def add_vserver (self,vservername,ctxid):
383         for plc in self.plc_instances:
384             if plc.vservername==vservername: 
385                 header("WARNING, duplicate myplc %s running on %s"%\
386                            (vservername,self.hostname),banner=False)
387                 return
388         self.plc_instances.append(PlcInstance(vservername,ctxid,self))
389     
390     def forget (self, plc_instance):
391         self.plc_instances.remove(plc_instance)
392
393     # fill one slot even though this one is not started yet
394     def add_dummy (self, plcname):
395         dummy=PlcInstance('dummy_'+plcname,0,self)
396         dummy.set_now()
397         self.plc_instances.append(dummy)
398
399     def line(self): 
400         msg="%s [max=%d,%d free] (%s)"%(self.hostname, self.max_plcs,self.free_spots(),self.uname())
401         return msg
402         
403     def list(self):
404         if not self.plc_instances: 
405             header ('No vserver running on %s'%(self.line()))
406         else:
407             header ("Active plc VMs on %s"%self.line())
408             self.plc_instances.sort(timestamp_sort)
409             for p in self.plc_instances: 
410                 header (p.line(),banner=False)
411
412     def free_spots (self):
413         return self.max_plcs - len(self.plc_instances)
414
415     def uname(self):
416         if hasattr(self,'_uname') and self._uname: return self._uname
417         return '*undef* uname'
418
419     def plc_instance_by_vservername (self, vservername):
420         for p in self.plc_instances:
421             if p.vservername==vservername: return p
422         return None
423
424     def reboot (self, options):
425         if not options.soft:
426             self.reboot(options)
427         else:
428             self.run_ssh(['service','util-vserver','stop'],"Stopping all running vservers",
429                          dry_run=options.dry_run)
430
431     def sense (self, options):
432         print 'p',
433         self._uname=self.backquote_ssh(['uname','-r']).strip()
434         # try to find fullname (vserver_stat truncates to a ridiculously short name)
435         # fetch the contexts for all vservers on that box
436         map_command=['grep','.','/etc/vservers/*/context','/dev/null',]
437         context_map=self.backquote_ssh (map_command)
438         # at this point we have a set of lines like
439         # /etc/vservers/2010.01.20--k27-f12-32-vplc03/context:40144
440         ctx_dict={}
441         for map_line in context_map.split("\n"):
442             if not map_line: continue
443             [path,xid] = map_line.split(':')
444             ctx_dict[xid]=os.path.basename(os.path.dirname(path))
445         # at this point ctx_id maps context id to vservername
446
447         command=['vserver-stat']
448         vserver_stat = self.backquote_ssh (command)
449         for vserver_line in vserver_stat.split("\n"):
450             if not vserver_line: continue
451             context=vserver_line.split()[0]
452             if context=="CTX": continue
453             try:
454                 longname=ctx_dict[context]
455                 self.add_vserver(longname,context)
456             except:
457                 print 'WARNING: found ctx %s in vserver_stat but was unable to figure a corresp. vserver'%context
458
459         # scan timestamps 
460         running_vsnames = [ i.vservername for i in self.plc_instances ]
461         command=   ['grep','.']
462         command += ['/vservers/%s.timestamp'%vs for vs in running_vsnames]
463         command += ['/dev/null']
464         ts_lines=self.backquote_ssh(command,trash_err=True).split('\n')
465         for ts_line in ts_lines:
466             if not ts_line.strip(): continue
467             # expect /vservers/<vservername>.timestamp:<timestamp>
468             try:
469                 (ts_file,timestamp)=ts_line.split(':')
470                 ts_file=os.path.basename(ts_file)
471                 (vservername,_)=os.path.splitext(ts_file)
472                 timestamp=int(timestamp)
473                 p=self.plc_instance_by_vservername(vservername)
474                 if not p: 
475                     print 'WARNING zombie plc',self.hostname,ts_line
476                     print '... was expecting',vservername,'in',[i.vservername for i in self.plc_instances]
477                     continue
478                 p.set_timestamp(timestamp)
479             except:  print 'WARNING, could not parse ts line',ts_line
480         
481
482
483
484 ############################################################
485 class QemuInstance: 
486     def __init__ (self, nodename, pid, qemubox):
487         self.nodename=nodename
488         self.pid=pid
489         self.qemu_box=qemubox
490         # not known yet
491         self.buildname=None
492         self.timestamp=0
493         
494     def set_buildname (self,buildname): self.buildname=buildname
495     def set_timestamp (self,timestamp): self.timestamp=timestamp
496     def set_now (self): self.timestamp=int(time.time())
497     def pretty_timestamp (self): return time.strftime("%Y-%m-%d:%H-%M",time.localtime(self.timestamp))
498     
499     def line (self):
500         msg = "== %s =="%(short_hostname(self.nodename))
501         msg += " [=%s]"%self.buildname
502         if self.pid:       msg += " (pid=%s)"%self.pid
503         else:              msg += " not (yet?) running"
504         if self.timestamp: msg += " @ %s"%self.pretty_timestamp()
505         else:              msg += " *unknown timestamp*"
506         return msg
507     
508     def kill(self):
509         if self.pid==0: 
510             print "cannot kill qemu %s with pid==0"%self.nodename
511             return
512         msg="Killing qemu %s with pid=%s on box %s"%(self.nodename,self.pid,self.qemu_box.hostname)
513         self.qemu_box.run_ssh(['kill',"%s"%self.pid],msg)
514         self.qemu_box.forget(self)
515
516
517 class QemuBox (Box):
518     def __init__ (self, hostname, max_qemus):
519         Box.__init__(self,hostname)
520         self.qemu_instances=[]
521         self.max_qemus=max_qemus
522
523     def add_node (self,nodename,pid):
524         for qemu in self.qemu_instances:
525             if qemu.nodename==nodename: 
526                 header("WARNING, duplicate qemu %s running on %s"%\
527                            (nodename,self.hostname), banner=False)
528                 return
529         self.qemu_instances.append(QemuInstance(nodename,pid,self))
530
531     def forget (self, qemu_instance):
532         self.qemu_instances.remove(qemu_instance)
533
534     # fill one slot even though this one is not started yet
535     def add_dummy (self, nodename):
536         dummy=QemuInstance('dummy_'+nodename,0,self)
537         dummy.set_now()
538         self.qemu_instances.append(dummy)
539
540     def line (self):
541         msg="%s [max=%d,%d free] (%s)"%(self.hostname, self.max_qemus,self.free_spots(),self.driver())
542         return msg
543
544     def list(self):
545         if not self.qemu_instances: 
546             header ('No qemu process on %s'%(self.line()))
547         else:
548             header ("Active qemu processes on %s"%(self.line()))
549             self.qemu_instances.sort(timestamp_sort)
550             for q in self.qemu_instances: 
551                 header (q.line(),banner=False)
552
553     def free_spots (self):
554         return self.max_qemus - len(self.qemu_instances)
555
556     def driver(self):
557         if hasattr(self,'_driver') and self._driver: return self._driver
558         return '*undef* driver'
559
560     def qemu_instance_by_pid (self,pid):
561         for q in self.qemu_instances:
562             if q.pid==pid: return q
563         return None
564
565     def qemu_instance_by_nodename_buildname (self,nodename,buildname):
566         for q in self.qemu_instances:
567             if q.nodename==nodename and q.buildname==buildname:
568                 return q
569         return None
570
571     def reboot (self, options):
572         if not options.soft:
573             self.reboot(options)
574         else:
575             self.run_ssh(['pkill','qemu'],"Killing qemu instances",
576                          dry_run=options.dry_run)
577
578     matcher=re.compile("\s*(?P<pid>[0-9]+).*-cdrom\s+(?P<nodename>[^\s]+)\.iso")
579     def sense(self, options):
580         print 'q',
581         modules=self.backquote_ssh(['lsmod']).split('\n')
582         self._driver='*NO kqemu/kmv_intel MODULE LOADED*'
583         for module in modules:
584             if module.find('kqemu')==0:
585                 self._driver='kqemu module loaded'
586             # kvm might be loaded without vkm_intel (we dont have AMD)
587             elif module.find('kvm_intel')==0:
588                 self._driver='kvm_intel module loaded'
589         ########## find out running pids
590         pids=self.backquote_ssh(['pgrep','qemu'])
591         if not pids: return
592         command=['ps','-o','pid,command'] + [ pid for pid in pids.split("\n") if pid]
593         ps_lines = self.backquote_ssh (command).split("\n")
594         for line in ps_lines:
595             if not line.strip() or line.find('PID') >=0 : continue
596             m=QemuBox.matcher.match(line)
597             if m: self.add_node (m.group('nodename'),m.group('pid'))
598             else: header('command %r returned line that failed to match'%command)
599         ########## retrieve alive instances and map to build
600         live_builds=[]
601         command=['grep','.','*/*/qemu.pid','/dev/null']
602         pid_lines=self.backquote_ssh(command,trash_err=True).split('\n')
603         for pid_line in pid_lines:
604             if not pid_line.strip(): continue
605             # expect <build>/<nodename>/qemu.pid:<pid>pid
606             try:
607                 (buildname,nodename,tail)=pid_line.split('/')
608                 (_,pid)=tail.split(':')
609                 q=self.qemu_instance_by_pid (pid)
610                 if not q: continue
611                 q.set_buildname(buildname)
612                 live_builds.append(buildname)
613             except: print 'WARNING, could not parse pid line',pid_line
614         # retrieve timestamps
615         if not live_builds: return
616         command=   ['grep','.']
617         command += ['%s/*/timestamp'%b for b in live_builds]
618         command += ['/dev/null']
619         ts_lines=self.backquote_ssh(command,trash_err=True).split('\n')
620         for ts_line in ts_lines:
621             if not ts_line.strip(): continue
622             # expect <build>/<nodename>/timestamp:<timestamp>
623             try:
624                 (buildname,nodename,tail)=ts_line.split('/')
625                 nodename=nodename.replace('qemu-','')
626                 (_,timestamp)=tail.split(':')
627                 timestamp=int(timestamp)
628                 q=self.qemu_instance_by_nodename_buildname(nodename,buildname)
629                 if not q: 
630                     print 'WARNING zombie qemu',self.hostname,ts_line
631                     print '... was expecting (',short_hostname(nodename),buildname,') in',\
632                         [ (short_hostname(i.nodename),i.buildname) for i in self.qemu_instances ]
633                     continue
634                 q.set_timestamp(timestamp)
635             except:  print 'WARNING, could not parse ts line',ts_line
636
637 ####################
638 class TestInstance:
639     def __init__ (self, buildname, pid=0):
640         self.pids=[]
641         if pid!=0: self.pid.append(pid)
642         self.buildname=buildname
643         # latest trace line
644         self.trace=''
645         # has a KO test
646         self.broken_steps=[]
647         self.timestamp = 0
648
649     def set_timestamp (self,timestamp): self.timestamp=timestamp
650     def set_now (self): self.timestamp=int(time.time())
651     def pretty_timestamp (self): return time.strftime("%Y-%m-%d:%H-%M",time.localtime(self.timestamp))
652
653
654     def add_pid (self,pid):
655         self.pids.append(pid)
656     def set_broken (self,plcindex, step): 
657         self.broken_steps.append ( (plcindex, step,) )
658
659     def line (self):
660         double='=='
661         if self.pids: double='*'+double[1]
662         if self.broken_steps: double=double[0]+'B'
663         msg = " %s %s =="%(double,self.buildname)
664         if not self.pids:       pass
665         elif len(self.pids)==1: msg += " (pid=%s)"%self.pids[0]
666         else:                   msg += " !!!pids=%s!!!"%self.pids
667         msg += " @%s"%self.pretty_timestamp()
668         if self.broken_steps:
669             msg += "\n BROKEN IN STEPS"
670             for (i,s) in self.broken_steps: msg += " %s@%s"%(s,i)
671         return msg
672
673 class TestBox (Box):
674     def __init__ (self,hostname):
675         Box.__init__(self,hostname)
676         self.starting_ips=[]
677         self.test_instances=[]
678
679     def reboot (self, options):
680         # can't reboot a vserver VM
681         self.run_ssh (['pkill','run_log'],"Terminating current runs",
682                       dry_run=options.dry_run)
683         self.run_ssh (['rm','-f',Pool.starting],"Cleaning %s"%Pool.starting,
684                       dry_run=options.dry_run)
685
686     def get_test (self, buildname):
687         for i in self.test_instances:
688             if i.buildname==buildname: return i
689
690     # we scan ALL remaining test results, even the ones not running
691     def add_timestamp (self, buildname, timestamp):
692         i=self.get_test(buildname)
693         if i:   
694             i.set_timestamp(timestamp)
695         else:   
696             i=TestInstance(buildname,0)
697             i.set_timestamp(timestamp)
698             self.test_instances.append(i)
699
700     def add_running_test (self, pid, buildname):
701         i=self.get_test(buildname)
702         if not i:
703             self.test_instances.append (TestInstance (buildname,pid))
704             return
705         if i.pids:
706             print "WARNING: 2 concurrent tests run on same build %s"%buildname
707         i.add_pid (pid)
708
709     def add_broken (self, buildname, plcindex, step):
710         i=self.get_test(buildname)
711         if not i:
712             i=TestInstance(buildname)
713             self.test_instances.append(i)
714         i.set_broken(plcindex, step)
715
716     matcher_proc=re.compile (".*/proc/(?P<pid>[0-9]+)/cwd.*/root/(?P<buildname>[^/]+)$")
717     matcher_grep=re.compile ("/root/(?P<buildname>[^/]+)/logs/trace.*:TRACE:\s*(?P<plcindex>[0-9]+).*step=(?P<step>\S+).*")
718     def sense (self, options):
719         print 't',
720         self.sense_uptime()
721         self.starting_ips=[x for x in self.backquote_ssh(['cat',Pool.starting], trash_err=True).strip().split('\n') if x]
722
723         # scan timestamps on all tests
724         # this is likely to not invoke ssh so we need to be a bit smarter to get * expanded
725         # xxx would make sense above too
726         command=['bash','-c',"grep . /root/*/timestamp /dev/null"]
727         ts_lines=self.backquote_ssh(command,trash_err=True).split('\n')
728         for ts_line in ts_lines:
729             if not ts_line.strip(): continue
730             # expect /root/<buildname>/timestamp:<timestamp>
731             try:
732                 (ts_file,timestamp)=ts_line.split(':')
733                 ts_file=os.path.dirname(ts_file)
734                 buildname=os.path.basename(ts_file)
735                 timestamp=int(timestamp)
736                 t=self.add_timestamp(buildname,timestamp)
737             except:  print 'WARNING, could not parse ts line',ts_line
738
739         command=['bash','-c',"grep KO /root/*/logs/trace* /dev/null" ]
740         trace_lines=self.backquote_ssh (command).split('\n')
741         for line in trace_lines:
742             if not line.strip(): continue
743             m=TestBox.matcher_grep.match(line)
744             if m: 
745                 buildname=m.group('buildname')
746                 plcindex=m.group('plcindex')
747                 step=m.group('step')
748                 self.add_broken(buildname,plcindex, step)
749             else: header("command %r returned line that failed to match\n%s"%(command,line))
750
751         pids = self.backquote_ssh (['pgrep','run_log'],trash_err=True)
752         if not pids: return
753         command=['ls','-ld'] + ["/proc/%s/cwd"%pid for pid in pids.split("\n") if pid]
754         ps_lines=self.backquote_ssh (command).split('\n')
755         for line in ps_lines:
756             if not line.strip(): continue
757             m=TestBox.matcher_proc.match(line)
758             if m: 
759                 pid=m.group('pid')
760                 buildname=m.group('buildname')
761                 self.add_running_test(pid, buildname)
762             else: header("command %r returned line that failed to match\n%s"%(command,line))
763         
764         
765     def line (self):
766         return "%s (%s)"%(self.hostname,self.uptime())
767
768     def list (self):
769         if not self.test_instances:
770             header ("No known tests on %s"%self.line())
771         else:
772             header ("Known tests on %s"%self.line())
773             self.test_instances.sort(timestamp_sort)
774             for i in self.test_instances: print i.line()
775         if self.starting_ips:
776             header ("Starting IP addresses on %s"%self.line())
777             self.starting_ips.sort()
778             for starting in self.starting_ips: print starting
779
780 ############################################################
781 class Options: pass
782
783 class Substrate:
784
785     def __init__ (self):
786         self.options=Options()
787         self.options.dry_run=False
788         self.options.verbose=False
789         self.options.reboot=False
790         self.options.soft=False
791         self.test_box = TestBox (self.test_box_spec())
792         self.build_boxes = [ BuildBox(h) for h in self.build_boxes_spec() ]
793         self.plc_boxes = [ PlcBox (h,m) for (h,m) in self.plc_boxes_spec ()]
794         self.qemu_boxes = [ QemuBox (h,m) for (h,m) in self.qemu_boxes_spec ()]
795         self.default_boxes = self.plc_boxes + self.qemu_boxes
796         self.all_boxes = self.build_boxes + [ self.test_box ] + self.plc_boxes + self.qemu_boxes
797         self._sensed=False
798
799         self.vplc_pool = Pool (self.vplc_ips(),"for vplcs")
800         self.vnode_pool = Pool (self.vnode_ips(),"for vnodes")
801
802     def fqdn (self, hostname):
803         if hostname.find('.')<0: return "%s.%s"%(hostname,self.domain())
804         return hostname
805
806     # return True if actual sensing takes place
807     def sense (self,force=False):
808         if self._sensed and not force: return False
809         print 'Sensing local substrate...',
810         for b in self.default_boxes: b.sense(self.options)
811         print 'Done'
812         self._sensed=True
813         return True
814
815     def list (self):
816         for b in self.default_boxes:
817             b.list()
818
819     def add_dummy_plc (self, plc_boxname, plcname):
820         for pb in self.plc_boxes:
821             if pb.hostname==plc_boxname:
822                 pb.add_dummy(plcname)
823     def add_dummy_qemu (self, qemu_boxname, qemuname):
824         for qb in self.qemu_boxes:
825             if qb.hostname==qemu_boxname:
826                 qb.add_dummy(qemuname)
827
828     ########## 
829     def provision (self,plcs,options):
830         try:
831             # attach each plc to a plc box and an IP address
832             plcs = [ self.provision_plc (plc,options) for plc in plcs ]
833             # attach each node/qemu to a qemu box with an IP address
834             plcs = [ self.provision_qemus (plc,options) for plc in plcs ]
835             # update the SFA spec accordingly
836             plcs = [ self.localize_sfa_rspec(plc,options) for plc in plcs ]
837             return plcs
838         except Exception, e:
839             print '* Could not provision this test on current substrate','--',e,'--','exiting'
840             traceback.print_exc()
841             sys.exit(1)
842
843     # it is expected that a couple of options like ips_bplc and ips_vplc 
844     # are set or unset together
845     @staticmethod
846     def check_options (x,y):
847         if not x and not y: return True
848         return len(x)==len(y)
849
850     # find an available plc box (or make space)
851     # and a free IP address (using options if present)
852     def provision_plc (self, plc, options):
853         
854         assert Substrate.check_options (options.ips_bplc, options.ips_vplc)
855
856         #### let's find an IP address for that plc
857         # look in options 
858         if options.ips_vplc:
859             # this is a rerun
860             # we don't check anything here, 
861             # it is the caller's responsability to cleanup and make sure this makes sense
862             plc_boxname = options.ips_bplc.pop()
863             vplc_hostname=options.ips_vplc.pop()
864         else:
865             if self.sense(): self.list()
866             plc_boxname=None
867             vplc_hostname=None
868             # try to find an available IP 
869             self.vplc_pool.sense()
870             couple=self.vplc_pool.next_free()
871             if couple:
872                 (vplc_hostname,unused)=couple
873             #### we need to find one plc box that still has a slot
874             max_free=0
875             # use the box that has max free spots for load balancing
876             for pb in self.plc_boxes:
877                 free=pb.free_spots()
878                 if free>max_free:
879                     plc_boxname=pb.hostname
880                     max_free=free
881             # if there's no available slot in the plc_boxes, or we need a free IP address
882             # make space by killing the oldest running instance
883             if not plc_boxname or not vplc_hostname:
884                 # find the oldest of all our instances
885                 all_plc_instances=reduce(lambda x, y: x+y, 
886                                          [ pb.plc_instances for pb in self.plc_boxes ],
887                                          [])
888                 all_plc_instances.sort(timestamp_sort)
889                 try:
890                     plc_instance_to_kill=all_plc_instances[0]
891                 except:
892                     msg=""
893                     if not plc_boxname: msg += " PLC boxes are full"
894                     if not vplc_hostname: msg += " vplc IP pool exhausted" 
895                     raise Exception,"Could not make space for a PLC instance:"+msg
896                 freed_plc_boxname=plc_instance_to_kill.plc_box.hostname
897                 freed_vplc_hostname=plc_instance_to_kill.vplcname()
898                 message='killing oldest plc instance = %s on %s'%(plc_instance_to_kill.line(),
899                                                                   freed_plc_boxname)
900                 plc_instance_to_kill.kill()
901                 # use this new plcbox if that was the problem
902                 if not plc_boxname:
903                     plc_boxname=freed_plc_boxname
904                 # ditto for the IP address
905                 if not vplc_hostname:
906                     vplc_hostname=freed_vplc_hostname
907                     # record in pool as mine
908                     self.vplc_pool.set_mine(vplc_hostname)
909
910         # 
911         self.add_dummy_plc(plc_boxname,plc['name'])
912         vplc_ip = self.vplc_pool.get_ip(vplc_hostname)
913         self.vplc_pool.add_starting(vplc_hostname)
914
915         #### compute a helpful vserver name
916         # remove domain in hostname
917         vplc_short = short_hostname(vplc_hostname)
918         vservername = "%s-%d-%s" % (options.buildname,plc['index'],vplc_short)
919         plc_name = "%s_%s"%(plc['name'],vplc_short)
920
921         utils.header( 'PROVISION plc %s in box %s at IP %s as %s'%\
922                           (plc['name'],plc_boxname,vplc_hostname,vservername))
923
924         #### apply in the plc_spec
925         # # informative
926         # label=options.personality.replace("linux","")
927         mapper = {'plc': [ ('*' , {'host_box':plc_boxname,
928                                    # 'name':'%s-'+label,
929                                    'name': plc_name,
930                                    'vservername':vservername,
931                                    'vserverip':vplc_ip,
932                                    'PLC_DB_HOST':vplc_hostname,
933                                    'PLC_API_HOST':vplc_hostname,
934                                    'PLC_BOOT_HOST':vplc_hostname,
935                                    'PLC_WWW_HOST':vplc_hostname,
936                                    'PLC_NET_DNS1' : self.network_settings() [ 'interface_fields:dns1' ],
937                                    'PLC_NET_DNS2' : self.network_settings() [ 'interface_fields:dns2' ],
938                                    } ) ]
939                   }
940
941
942         # mappers only work on a list of plcs
943         return TestMapper([plc],options).map(mapper)[0]
944
945     ##########
946     def provision_qemus (self, plc, options):
947
948         assert Substrate.check_options (options.ips_bnode, options.ips_vnode)
949
950         test_mapper = TestMapper ([plc], options)
951         nodenames = test_mapper.node_names()
952         maps=[]
953         for nodename in nodenames:
954
955             if options.ips_vnode:
956                 # as above, it's a rerun, take it for granted
957                 qemu_boxname=options.ips_bnode.pop()
958                 vnode_hostname=options.ips_vnode.pop()
959             else:
960                 if self.sense(): self.list()
961                 qemu_boxname=None
962                 vnode_hostname=None
963                 # try to find an available IP 
964                 self.vnode_pool.sense()
965                 couple=self.vnode_pool.next_free()
966                 if couple:
967                     (vnode_hostname,unused)=couple
968                 # find a physical box
969                 max_free=0
970                 # use the box that has max free spots for load balancing
971                 for qb in self.qemu_boxes:
972                     free=qb.free_spots()
973                     if free>max_free:
974                         qemu_boxname=qb.hostname
975                         max_free=free
976                 # if we miss the box or the IP, kill the oldest instance
977                 if not qemu_boxname or not vnode_hostname:
978                 # find the oldest of all our instances
979                     all_qemu_instances=reduce(lambda x, y: x+y, 
980                                               [ qb.qemu_instances for qb in self.qemu_boxes ],
981                                               [])
982                     all_qemu_instances.sort(timestamp_sort)
983                     try:
984                         qemu_instance_to_kill=all_qemu_instances[0]
985                     except:
986                         msg=""
987                         if not qemu_boxname: msg += " QEMU boxes are full"
988                         if not vnode_hostname: msg += " vnode IP pool exhausted" 
989                         raise Exception,"Could not make space for a QEMU instance:"+msg
990                     freed_qemu_boxname=qemu_instance_to_kill.qemu_box.hostname
991                     freed_vnode_hostname=short_hostname(qemu_instance_to_kill.nodename)
992                     # kill it
993                     message='killing oldest qemu node = %s on %s'%(qemu_instance_to_kill.line(),
994                                                                    freed_qemu_boxname)
995                     qemu_instance_to_kill.kill()
996                     # use these freed resources where needed
997                     if not qemu_boxname:
998                         qemu_boxname=freed_qemu_boxname
999                     if not vnode_hostname:
1000                         vnode_hostname=freed_vnode_hostname
1001                         self.vnode_pool.set_mine(vnode_hostname)
1002
1003             self.add_dummy_qemu (qemu_boxname,nodename)
1004             mac=self.vnode_pool.retrieve_userdata(vnode_hostname)
1005             ip=self.vnode_pool.get_ip (vnode_hostname)
1006             self.vnode_pool.add_starting(vnode_hostname)
1007
1008             vnode_fqdn = self.fqdn(vnode_hostname)
1009             nodemap={'host_box':qemu_boxname,
1010                      'node_fields:hostname':vnode_fqdn,
1011                      'interface_fields:ip':ip, 
1012                      'interface_fields:mac':mac,
1013                      }
1014             nodemap.update(self.network_settings())
1015             maps.append ( (nodename, nodemap) )
1016
1017             utils.header("PROVISION node %s in box %s at IP %s with MAC %s"%\
1018                              (nodename,qemu_boxname,vnode_hostname,mac))
1019
1020         return test_mapper.map({'node':maps})[0]
1021
1022     def localize_sfa_rspec (self,plc,options):
1023        
1024         plc['sfa']['SFA_REGISTRY_HOST'] = plc['PLC_DB_HOST']
1025         plc['sfa']['SFA_AGGREGATE_HOST'] = plc['PLC_DB_HOST']
1026         plc['sfa']['SFA_SM_HOST'] = plc['PLC_DB_HOST']
1027         plc['sfa']['SFA_PLC_DB_HOST'] = plc['PLC_DB_HOST']
1028         plc['sfa']['SFA_PLC_URL'] = 'https://' + plc['PLC_API_HOST'] + ':443/PLCAPI/' 
1029         for site in plc['sites']:
1030             for node in site['nodes']:
1031                 plc['sfa']['sfa_slice_rspec']['part4'] = node['node_fields']['hostname']
1032         return plc
1033
1034     #################### release:
1035     def release (self,options):
1036         self.vplc_pool.release_my_starting()
1037         self.vnode_pool.release_my_starting()
1038         pass
1039
1040     #################### show results for interactive mode
1041     def get_box (self,boxname):
1042         for b in self.build_boxes + self.plc_boxes + self.qemu_boxes + [self.test_box] :
1043             if b.shortname()==boxname:
1044                 return b
1045         print "Could not find box %s"%boxname
1046         return None
1047
1048     def list_boxes(self,boxnames):
1049         print 'Sensing',
1050         for boxname in boxnames:
1051             b=self.get_box(boxname)
1052             if not b: continue
1053             b.sense(self.options)
1054         print 'Done'
1055         for boxname in boxnames:
1056             b=self.get_box(boxname)
1057             if not b: continue
1058             b.list()
1059
1060     def reboot_boxes(self,boxnames):
1061         for boxname in boxnames:
1062             b=self.get_box(boxname)
1063             if not b: continue
1064             b.reboot(self.options)
1065
1066     ####################
1067     # can be run as a utility to manage the local infrastructure
1068     def main (self):
1069         parser=OptionParser()
1070         parser.add_option ('-r',"--reboot",action='store_true',dest='reboot',default=False,
1071                            help='reboot mode (use shutdown -r)')
1072         parser.add_option ('-s',"--soft",action='store_true',dest='soft',default=False,
1073                            help='soft mode for reboot (vserver stop or kill qemus)')
1074         parser.add_option ('-t',"--testbox",action='store_true',dest='testbox',default=False,
1075                            help='add test box') 
1076         parser.add_option ('-b',"--build",action='store_true',dest='builds',default=False,
1077                            help='add build boxes')
1078         parser.add_option ('-p',"--plc",action='store_true',dest='plcs',default=False,
1079                            help='add plc boxes')
1080         parser.add_option ('-q',"--qemu",action='store_true',dest='qemus',default=False,
1081                            help='add qemu boxes') 
1082         parser.add_option ('-a',"--all",action='store_true',dest='all',default=False,
1083                            help='address all known  boxes, like -b -t -p -q')
1084         parser.add_option ('-v',"--verbose",action='store_true',dest='verbose',default=False,
1085                            help='verbose mode')
1086         parser.add_option ('-n',"--dry_run",action='store_true',dest='dry_run',default=False,
1087                            help='dry run mode')
1088         (self.options,args)=parser.parse_args()
1089
1090         boxes=args
1091         if self.options.testbox: boxes += [self.test_box.hostname]
1092         if self.options.builds: boxes += [b.hostname for b in self.build_boxes]
1093         if self.options.plcs: boxes += [b.hostname for b in self.plc_boxes]
1094         if self.options.qemus: boxes += [b.hostname for b in self.qemu_boxes]
1095         if self.options.all: boxes += [b.hostname for b in self.all_boxes]
1096         boxes=list(set(boxes))
1097         
1098         # default scope is -b -p -q
1099         if not boxes:
1100             boxes = [ b.hostname for b in \
1101                           self.build_boxes + self.plc_boxes + self.qemu_boxes ]
1102
1103         if self.options.reboot: self.reboot_boxes (boxes)
1104         else:                   self.list_boxes (boxes)