properly reports vserver building processes
[tests.git] / system / Substrate.py
1 #
2 # Thierry Parmentelat <thierry.parmentelat@inria.fr>
3 # Copyright (C) 2010 INRIA 
4 #
5 # #################### history
6 #
7 # see also Substrate.readme
8 #
9 # This is a complete rewrite of TestResources/Tracker/Pool
10 # we don't use trackers anymore and just probe/sense the running 
11 # boxes to figure out where we are
12 # in order to implement some fairness in the round-robin allocation scheme
13 # we need an indication of the 'age' of each running entity, 
14 # hence the 'timestamp-*' steps in TestPlc
15
16 # this should be much more flexible:
17 # * supports several plc boxes 
18 # * supports several qemu guests per host
19 # * no need to worry about tracker being in sync or not
20 #
21 # #################### howto use
22 #
23 # each site is to write its own LocalSubstrate.py, 
24 # (see e.g. LocalSubstrate.inria.py)
25 # LocalSubstrate.py is expected to be in /root on the testmaster box
26 # and needs to define
27 # MYPLCs
28 # . the vserver-capable boxes used for hosting myplcs
29 # .  and their admissible load (max # of myplcs)
30 # . the pool of DNS-names and IP-addresses available for myplcs
31 # QEMU nodes
32 # . the kvm-qemu capable boxes to host qemu instances
33 # .  and their admissible load (max # of myplcs)
34 # . the pool of DNS-names and IP-addresses available for nodes
35
36 # #################### implem. note
37
38 # this model relies on 'sensing' the substrate, 
39 # i.e. probing all the boxes for their running instances of vservers and qemu
40 # this is how we get rid of tracker inconsistencies 
41 # however there is a 'black hole' between the time where a given address is 
42 # allocated and when it actually gets used/pingable
43 # this is why we still need a shared knowledge among running tests
44 # in a file named /root/starting
45 # this is connected to the Pool class 
46
47 # ####################
48
49 import os.path, sys
50 import time
51 import re
52 import traceback
53 import subprocess
54 import commands
55 import socket
56 from optparse import OptionParser
57
58 import utils
59 from TestSsh import TestSsh
60 from TestMapper import TestMapper
61
62 def header (message,banner=True):
63     if not message: return
64     if banner: print "===============",
65     print message
66     sys.stdout.flush()
67
68 def timestamp_sort(o1,o2): return o1.timestamp-o2.timestamp
69
70 def short_hostname (hostname):
71     return hostname.split('.')[0]
72
73 ####################
74 # the place were other test instances tell about their not-yet-started
75 # instances, that go undetected through sensing
76 class Starting:
77
78     location='/root/starting'
79     def __init__ (self):
80         self.tuples=[]
81
82     def load (self):
83         try:    self.tuples=[line.strip().split('@') 
84                              for line in file(Starting.location).readlines()]
85         except: self.tuples=[]
86
87     def vnames (self) : 
88         self.load()
89         return [ x for (x,_) in self.tuples ]
90
91     def add (self, vname, bname):
92         if not vname in self.vnames():
93             file(Starting.location,'a').write("%s@%s\n"%(vname,bname))
94             
95     def delete_vname (self, vname):
96         self.load()
97         if vname in self.vnames():
98             f=file(Starting.location,'w')
99             for (v,b) in self.tuples: 
100                 if v != vname: f.write("%s@%s\n"%(v,b))
101             f.close()
102     
103 ####################
104 # pool class
105 # allows to pick an available IP among a pool
106 # input is expressed as a list of tuples (hostname,ip,user_data)
107 # that can be searched iteratively for a free slot
108 # e.g.
109 # pool = [ (hostname1,user_data1),  
110 #          (hostname2,user_data2),  
111 #          (hostname3,user_data2),  
112 #          (hostname4,user_data4) ]
113 # assuming that ip1 and ip3 are taken (pingable), then we'd get
114 # pool=Pool(pool)
115 # pool.next_free() -> entry2
116 # pool.next_free() -> entry4
117 # pool.next_free() -> None
118 # that is, even if ip2 is not busy/pingable when the second next_free() is issued
119
120 class PoolItem:
121     def __init__ (self,hostname,userdata):
122         self.hostname=hostname
123         self.userdata=userdata
124         # slot holds 'busy' or 'free' or 'mine' or 'starting' or None
125         # 'mine' is for our own stuff, 'starting' from the concurrent tests
126         self.status=None
127         self.ip=None
128
129     def line(self):
130         return "Pooled %s (%s) -> %s"%(self.hostname,self.userdata, self.status)
131
132     def char (self):
133         if   self.status==None:       return '?'
134         elif self.status=='busy':     return '+'
135         elif self.status=='free':     return '-'
136         elif self.status=='mine':     return 'M'
137         elif self.status=='starting': return 'S'
138
139     def get_ip(self):
140         if self.ip: return self.ip
141         ip=socket.gethostbyname(self.hostname)
142         self.ip=ip
143         return ip
144
145 class Pool:
146
147     def __init__ (self, tuples,message, substrate):
148         self.pool_items= [ PoolItem (hostname,userdata) for (hostname,userdata) in tuples ] 
149         self.message=message
150         # where to send notifications upon load_starting
151         self.substrate=substrate
152
153     def list (self):
154         for i in self.pool_items: print i.line()
155
156     def line (self):
157         line=self.message
158         for i in self.pool_items: line += ' ' + i.char()
159         return line
160
161     def _item (self, hostname):
162         for i in self.pool_items: 
163             if i.hostname==hostname: return i
164         raise Exception ("Could not locate hostname %s in pool %s"%(hostname,self.message))
165
166     def retrieve_userdata (self, hostname): 
167         return self._item(hostname).userdata
168
169     def get_ip (self, hostname):
170         try:    return self._item(hostname).get_ip()
171         except: return socket.gethostbyname(hostname)
172         
173     def set_mine (self, hostname):
174         try:
175             self._item(hostname).status='mine'
176         except:
177             print 'WARNING: host %s not found in IP pool %s'%(hostname,self.message)
178
179     def next_free (self):
180         for i in self.pool_items:
181             if i.status == 'free':
182                 i.status='mine'
183                 return (i.hostname,i.userdata)
184         return None
185
186     ####################
187     # we have a starting instance of our own
188     def add_starting (self, vname, bname):
189         Starting().add(vname,bname)
190         for i in self.pool_items:
191             if i.hostname==vname: i.status='mine'
192
193     # load the starting instances from the common file
194     # remember that might be ours
195     # return the list of (vname,bname) that are not ours
196     def load_starting (self):
197         starting=Starting()
198         starting.load()
199         new_tuples=[]
200         for (v,b) in starting.tuples:
201             for i in self.pool_items:
202                 if i.hostname==v and i.status=='free':
203                     i.status='starting'
204                     new_tuples.append( (v,b,) )
205         return new_tuples
206
207     def release_my_starting (self):
208         for i in self.pool_items:
209             if i.status=='mine':
210                 Starting().delete_vname (i.hostname)
211                 i.status=None
212
213
214     ##########
215     def _sense (self):
216         for item in self.pool_items:
217             if item.status is not None: 
218                 print item.char(),
219                 continue
220             if self.check_ping (item.hostname): 
221                 item.status='busy'
222                 print '*',
223             else:
224                 item.status='free'
225                 print '.',
226     
227     def sense (self):
228         print 'Sensing IP pool',self.message,
229         self._sense()
230         print 'Done'
231         for (vname,bname) in self.load_starting():
232             self.substrate.add_starting_dummy (bname, vname)
233         print 'After starting: IP pool'
234         print self.line()
235     # OS-dependent ping option (support for macos, for convenience)
236     ping_timeout_option = None
237     # returns True when a given hostname/ip responds to ping
238     def check_ping (self,hostname):
239         if not Pool.ping_timeout_option:
240             (status,osname) = commands.getstatusoutput("uname -s")
241             if status != 0:
242                 raise Exception, "TestPool: Cannot figure your OS name"
243             if osname == "Linux":
244                 Pool.ping_timeout_option="-w"
245             elif osname == "Darwin":
246                 Pool.ping_timeout_option="-t"
247
248         command="ping -c 1 %s 1 %s"%(Pool.ping_timeout_option,hostname)
249         (status,output) = commands.getstatusoutput(command)
250         return status == 0
251
252 ####################
253 class Box:
254     def __init__ (self,hostname):
255         self.hostname=hostname
256         self._probed=None
257     def shortname (self):
258         return short_hostname(self.hostname)
259     def test_ssh (self): return TestSsh(self.hostname,username='root',unknown_host=False)
260     def reboot (self, options):
261         self.test_ssh().run("shutdown -r now",message="Rebooting %s"%self.hostname,
262                             dry_run=options.dry_run)
263
264     def uptime(self):
265         if hasattr(self,'_uptime') and self._uptime: return self._uptime
266         return '*undef* uptime'
267     def sense_uptime (self):
268         command=['uptime']
269         self._uptime=self.backquote_ssh(command,trash_err=True).strip()
270         if not self._uptime: self._uptime='unreachable'
271
272     def run(self,argv,message=None,trash_err=False,dry_run=False):
273         if dry_run:
274             print 'DRY_RUN:',
275             print " ".join(argv)
276             return 0
277         else:
278             header(message)
279             if not trash_err:
280                 return subprocess.call(argv)
281             else:
282                 return subprocess.call(argv,stderr=file('/dev/null','w'))
283                 
284     def run_ssh (self, argv, message, trash_err=False, dry_run=False):
285         ssh_argv = self.test_ssh().actual_argv(argv)
286         result=self.run (ssh_argv, message, trash_err, dry_run=dry_run)
287         if result!=0:
288             print "WARNING: failed to run %s on %s"%(" ".join(argv),self.hostname)
289         return result
290
291     def backquote (self, argv, trash_err=False):
292         # print 'running backquote',argv
293         if not trash_err:
294             result= subprocess.Popen(argv,stdout=subprocess.PIPE).communicate()[0]
295         else:
296             result= subprocess.Popen(argv,stdout=subprocess.PIPE,stderr=file('/dev/null','w')).communicate()[0]
297         return result
298
299     def probe (self):
300         if self._probed is not None: return self._probed
301         # first probe the ssh link
302         probe_argv=self.test_ssh().actual_argv(['hostname'])
303         self._probed=self.backquote ( probe_argv, trash_err=True )
304         if not self._probed: print "root@%s unreachable"%self.hostname
305         return self._probed
306
307     # use argv=['bash','-c',"the command line"]
308     # if you have any shell-expanded arguments like *
309     # and if there's any chance the command is adressed to the local host
310     def backquote_ssh (self, argv, trash_err=False):
311         if not self.probe(): return ''
312         return self.backquote( self.test_ssh().actual_argv(argv), trash_err)
313
314 ############################################################
315 class BuildInstance:
316     def __init__ (self, buildname, pid, buildbox):
317         self.buildname=buildname
318         self.buildbox=buildbox
319         self.pids=[pid]
320
321     def add_pid(self,pid):
322         self.pids.append(pid)
323
324     def line (self):
325         return "== %s == (pids=%r)"%(self.buildname,self.pids)
326
327 class BuildBox (Box):
328     def __init__ (self,hostname):
329         Box.__init__(self,hostname)
330         self.build_instances=[]
331
332     def add_build (self,buildname,pid):
333         for build in self.build_instances:
334             if build.buildname==buildname: 
335                 build.add_pid(pid)
336                 return
337         self.build_instances.append(BuildInstance(buildname, pid, self))
338
339     def list(self):
340         if not self.build_instances: 
341             header ('No build process on %s (%s)'%(self.hostname,self.uptime()))
342         else:
343             header ("Builds on %s (%s)"%(self.hostname,self.uptime()))
344             for b in self.build_instances: 
345                 header (b.line(),banner=False)
346
347     def reboot (self, options):
348         if not options.soft:
349             self.reboot(options)
350         else:
351             command=['pkill','vbuild']
352             self.run_ssh(command,"Terminating vbuild processes",dry_run=options.dry_run)
353
354     # inspect box and find currently running builds
355     matcher=re.compile("\s*(?P<pid>[0-9]+).*-[bo]\s+(?P<buildname>[^\s]+)(\s|\Z)")
356     matcher_building_vm=re.compile("\s*(?P<pid>[0-9]+).*init-vserver.*-i\s+eth.\s+(?P<buildname>[^\s]+)\s*\Z")
357     def sense(self, options):
358         print 'b',
359         self.sense_uptime()
360         pids=self.backquote_ssh(['pgrep','vbuild'],trash_err=True)
361         if not pids: return
362         command=['ps','-o','pid,command'] + [ pid for pid in pids.split("\n") if pid]
363         ps_lines=self.backquote_ssh (command).split('\n')
364         for line in ps_lines:
365             if not line.strip() or line.find('PID')>=0: continue
366             m=BuildBox.matcher.match(line)
367             if m: 
368                 date=time.strftime('%Y-%m-%d',time.localtime(time.time()))
369                 buildname=m.group('buildname').replace('@DATE@',date)
370                 self.add_build (buildname,m.group('pid'))
371                 continue
372             m=BuildBox.matcher_building_vm.match(line)
373             if m: 
374                 # buildname is expansed here
375                 self.add_build (buildname,m.group('pid'))
376                 continue
377             header('BuildBox.sense: command %r returned line that failed to match'%command)
378             header(">>%s<<"%line)
379
380 ############################################################
381 class PlcInstance:
382     def __init__ (self, vservername, ctxid, plcbox):
383         self.vservername=vservername
384         self.ctxid=ctxid
385         self.plc_box=plcbox
386         # unknown yet
387         self.timestamp=0
388
389     def set_timestamp (self,timestamp): self.timestamp=timestamp
390     def set_now (self): self.timestamp=int(time.time())
391     def pretty_timestamp (self): return time.strftime("%Y-%m-%d:%H-%M",time.localtime(self.timestamp))
392
393     def vplcname (self):
394         return self.vservername.split('-')[-1]
395     def buildname (self):
396         return self.vservername.rsplit('-',2)[0]
397
398     def line (self):
399         msg="== %s =="%(self.vplcname())
400         msg += " [=%s]"%self.vservername
401         if self.ctxid==0:  msg+=" not (yet?) running"
402         else:              msg+=" (ctx=%s)"%self.ctxid     
403         if self.timestamp: msg += " @ %s"%self.pretty_timestamp()
404         else:              msg += " *unknown timestamp*"
405         return msg
406
407     def kill (self):
408         msg="vserver stopping %s on %s"%(self.vservername,self.plc_box.hostname)
409         self.plc_box.run_ssh(['vserver',self.vservername,'stop'],msg)
410         self.plc_box.forget(self)
411
412 class PlcBox (Box):
413     def __init__ (self, hostname, max_plcs):
414         Box.__init__(self,hostname)
415         self.plc_instances=[]
416         self.max_plcs=max_plcs
417
418     def add_vserver (self,vservername,ctxid):
419         for plc in self.plc_instances:
420             if plc.vservername==vservername: 
421                 header("WARNING, duplicate myplc %s running on %s"%\
422                            (vservername,self.hostname),banner=False)
423                 return
424         self.plc_instances.append(PlcInstance(vservername,ctxid,self))
425     
426     def forget (self, plc_instance):
427         self.plc_instances.remove(plc_instance)
428
429     # fill one slot even though this one is not started yet
430     def add_dummy (self, plcname):
431         dummy=PlcInstance('dummy_'+plcname,0,self)
432         dummy.set_now()
433         self.plc_instances.append(dummy)
434
435     def line(self): 
436         msg="%s [max=%d,%d free] (%s)"%(self.hostname, self.max_plcs,self.free_spots(),self.uname())
437         return msg
438         
439     def list(self):
440         if not self.plc_instances: 
441             header ('No vserver running on %s'%(self.line()))
442         else:
443             header ("Active plc VMs on %s"%self.line())
444             self.plc_instances.sort(timestamp_sort)
445             for p in self.plc_instances: 
446                 header (p.line(),banner=False)
447
448     def free_spots (self):
449         return self.max_plcs - len(self.plc_instances)
450
451     def uname(self):
452         if hasattr(self,'_uname') and self._uname: return self._uname
453         return '*undef* uname'
454
455     def plc_instance_by_vservername (self, vservername):
456         for p in self.plc_instances:
457             if p.vservername==vservername: return p
458         return None
459
460     def reboot (self, options):
461         if not options.soft:
462             self.reboot(options)
463         else:
464             self.run_ssh(['service','util-vserver','stop'],"Stopping all running vservers",
465                          dry_run=options.dry_run)
466
467     def sense (self, options):
468         print 'p',
469         self._uname=self.backquote_ssh(['uname','-r']).strip()
470         # try to find fullname (vserver_stat truncates to a ridiculously short name)
471         # fetch the contexts for all vservers on that box
472         map_command=['grep','.','/etc/vservers/*/context','/dev/null',]
473         context_map=self.backquote_ssh (map_command)
474         # at this point we have a set of lines like
475         # /etc/vservers/2010.01.20--k27-f12-32-vplc03/context:40144
476         ctx_dict={}
477         for map_line in context_map.split("\n"):
478             if not map_line: continue
479             [path,xid] = map_line.split(':')
480             ctx_dict[xid]=os.path.basename(os.path.dirname(path))
481         # at this point ctx_id maps context id to vservername
482
483         command=['vserver-stat']
484         vserver_stat = self.backquote_ssh (command)
485         for vserver_line in vserver_stat.split("\n"):
486             if not vserver_line: continue
487             context=vserver_line.split()[0]
488             if context=="CTX": continue
489             try:
490                 longname=ctx_dict[context]
491                 self.add_vserver(longname,context)
492             except:
493                 print 'WARNING: found ctx %s in vserver_stat but was unable to figure a corresp. vserver'%context
494
495         # scan timestamps 
496         running_vsnames = [ i.vservername for i in self.plc_instances ]
497         command=   ['grep','.']
498         command += ['/vservers/%s.timestamp'%vs for vs in running_vsnames]
499         command += ['/dev/null']
500         ts_lines=self.backquote_ssh(command,trash_err=True).split('\n')
501         for ts_line in ts_lines:
502             if not ts_line.strip(): continue
503             # expect /vservers/<vservername>.timestamp:<timestamp>
504             try:
505                 (ts_file,timestamp)=ts_line.split(':')
506                 ts_file=os.path.basename(ts_file)
507                 (vservername,_)=os.path.splitext(ts_file)
508                 timestamp=int(timestamp)
509                 p=self.plc_instance_by_vservername(vservername)
510                 if not p: 
511                     print 'WARNING zombie plc',self.hostname,ts_line
512                     print '... was expecting',vservername,'in',[i.vservername for i in self.plc_instances]
513                     continue
514                 p.set_timestamp(timestamp)
515             except:  print 'WARNING, could not parse ts line',ts_line
516         
517
518
519
520 ############################################################
521 class QemuInstance: 
522     def __init__ (self, nodename, pid, qemubox):
523         self.nodename=nodename
524         self.pid=pid
525         self.qemu_box=qemubox
526         # not known yet
527         self.buildname=None
528         self.timestamp=0
529         
530     def set_buildname (self,buildname): self.buildname=buildname
531     def set_timestamp (self,timestamp): self.timestamp=timestamp
532     def set_now (self): self.timestamp=int(time.time())
533     def pretty_timestamp (self): return time.strftime("%Y-%m-%d:%H-%M",time.localtime(self.timestamp))
534     
535     def line (self):
536         msg = "== %s =="%(short_hostname(self.nodename))
537         msg += " [=%s]"%self.buildname
538         if self.pid:       msg += " (pid=%s)"%self.pid
539         else:              msg += " not (yet?) running"
540         if self.timestamp: msg += " @ %s"%self.pretty_timestamp()
541         else:              msg += " *unknown timestamp*"
542         return msg
543     
544     def kill(self):
545         if self.pid==0: 
546             print "cannot kill qemu %s with pid==0"%self.nodename
547             return
548         msg="Killing qemu %s with pid=%s on box %s"%(self.nodename,self.pid,self.qemu_box.hostname)
549         self.qemu_box.run_ssh(['kill',"%s"%self.pid],msg)
550         self.qemu_box.forget(self)
551
552
553 class QemuBox (Box):
554     def __init__ (self, hostname, max_qemus):
555         Box.__init__(self,hostname)
556         self.qemu_instances=[]
557         self.max_qemus=max_qemus
558
559     def add_node (self,nodename,pid):
560         for qemu in self.qemu_instances:
561             if qemu.nodename==nodename: 
562                 header("WARNING, duplicate qemu %s running on %s"%\
563                            (nodename,self.hostname), banner=False)
564                 return
565         self.qemu_instances.append(QemuInstance(nodename,pid,self))
566
567     def forget (self, qemu_instance):
568         self.qemu_instances.remove(qemu_instance)
569
570     # fill one slot even though this one is not started yet
571     def add_dummy (self, nodename):
572         dummy=QemuInstance('dummy_'+nodename,0,self)
573         dummy.set_now()
574         self.qemu_instances.append(dummy)
575
576     def line (self):
577         msg="%s [max=%d,%d free] (%s)"%(self.hostname, self.max_qemus,self.free_spots(),self.driver())
578         return msg
579
580     def list(self):
581         if not self.qemu_instances: 
582             header ('No qemu process on %s'%(self.line()))
583         else:
584             header ("Active qemu processes on %s"%(self.line()))
585             self.qemu_instances.sort(timestamp_sort)
586             for q in self.qemu_instances: 
587                 header (q.line(),banner=False)
588
589     def free_spots (self):
590         return self.max_qemus - len(self.qemu_instances)
591
592     def driver(self):
593         if hasattr(self,'_driver') and self._driver: return self._driver
594         return '*undef* driver'
595
596     def qemu_instance_by_pid (self,pid):
597         for q in self.qemu_instances:
598             if q.pid==pid: return q
599         return None
600
601     def qemu_instance_by_nodename_buildname (self,nodename,buildname):
602         for q in self.qemu_instances:
603             if q.nodename==nodename and q.buildname==buildname:
604                 return q
605         return None
606
607     def reboot (self, options):
608         if not options.soft:
609             self.reboot(options)
610         else:
611             self.run_ssh(['pkill','qemu'],"Killing qemu instances",
612                          dry_run=options.dry_run)
613
614     matcher=re.compile("\s*(?P<pid>[0-9]+).*-cdrom\s+(?P<nodename>[^\s]+)\.iso")
615     def sense(self, options):
616         print 'q',
617         modules=self.backquote_ssh(['lsmod']).split('\n')
618         self._driver='*NO kqemu/kmv_intel MODULE LOADED*'
619         for module in modules:
620             if module.find('kqemu')==0:
621                 self._driver='kqemu module loaded'
622             # kvm might be loaded without vkm_intel (we dont have AMD)
623             elif module.find('kvm_intel')==0:
624                 self._driver='kvm_intel module loaded'
625         ########## find out running pids
626         pids=self.backquote_ssh(['pgrep','qemu'])
627         if not pids: return
628         command=['ps','-o','pid,command'] + [ pid for pid in pids.split("\n") if pid]
629         ps_lines = self.backquote_ssh (command).split("\n")
630         for line in ps_lines:
631             if not line.strip() or line.find('PID') >=0 : continue
632             m=QemuBox.matcher.match(line)
633             if m: 
634                 self.add_node (m.group('nodename'),m.group('pid'))
635                 continue
636             header('QemuBox.sense: command %r returned line that failed to match'%command)
637             header(">>%s<<"%line)
638         ########## retrieve alive instances and map to build
639         live_builds=[]
640         command=['grep','.','*/*/qemu.pid','/dev/null']
641         pid_lines=self.backquote_ssh(command,trash_err=True).split('\n')
642         for pid_line in pid_lines:
643             if not pid_line.strip(): continue
644             # expect <build>/<nodename>/qemu.pid:<pid>pid
645             try:
646                 (buildname,nodename,tail)=pid_line.split('/')
647                 (_,pid)=tail.split(':')
648                 q=self.qemu_instance_by_pid (pid)
649                 if not q: continue
650                 q.set_buildname(buildname)
651                 live_builds.append(buildname)
652             except: print 'WARNING, could not parse pid line',pid_line
653         # retrieve timestamps
654         if not live_builds: return
655         command=   ['grep','.']
656         command += ['%s/*/timestamp'%b for b in live_builds]
657         command += ['/dev/null']
658         ts_lines=self.backquote_ssh(command,trash_err=True).split('\n')
659         for ts_line in ts_lines:
660             if not ts_line.strip(): continue
661             # expect <build>/<nodename>/timestamp:<timestamp>
662             try:
663                 (buildname,nodename,tail)=ts_line.split('/')
664                 nodename=nodename.replace('qemu-','')
665                 (_,timestamp)=tail.split(':')
666                 timestamp=int(timestamp)
667                 q=self.qemu_instance_by_nodename_buildname(nodename,buildname)
668                 if not q: 
669                     print 'WARNING zombie qemu',self.hostname,ts_line
670                     print '... was expecting (',short_hostname(nodename),buildname,') in',\
671                         [ (short_hostname(i.nodename),i.buildname) for i in self.qemu_instances ]
672                     continue
673                 q.set_timestamp(timestamp)
674             except:  print 'WARNING, could not parse ts line',ts_line
675
676 ####################
677 class TestInstance:
678     def __init__ (self, buildname, pid=0):
679         self.pids=[]
680         if pid!=0: self.pid.append(pid)
681         self.buildname=buildname
682         # latest trace line
683         self.trace=''
684         # has a KO test
685         self.broken_steps=[]
686         self.timestamp = 0
687
688     def set_timestamp (self,timestamp): self.timestamp=timestamp
689     def set_now (self): self.timestamp=int(time.time())
690     def pretty_timestamp (self): return time.strftime("%Y-%m-%d:%H-%M",time.localtime(self.timestamp))
691
692
693     def add_pid (self,pid):
694         self.pids.append(pid)
695     def set_broken (self,plcindex, step): 
696         self.broken_steps.append ( (plcindex, step,) )
697
698     def line (self):
699         double='=='
700         if self.pids: double='*'+double[1]
701         if self.broken_steps: double=double[0]+'B'
702         msg = " %s %s =="%(double,self.buildname)
703         if not self.pids:       pass
704         elif len(self.pids)==1: msg += " (pid=%s)"%self.pids[0]
705         else:                   msg += " !!!pids=%s!!!"%self.pids
706         msg += " @%s"%self.pretty_timestamp()
707         if self.broken_steps:
708             msg += "\n BROKEN IN STEPS"
709             for (i,s) in self.broken_steps: msg += " %s@%s"%(s,i)
710         return msg
711
712 class TestBox (Box):
713     def __init__ (self,hostname):
714         Box.__init__(self,hostname)
715         self.starting_ips=[]
716         self.test_instances=[]
717
718     def reboot (self, options):
719         # can't reboot a vserver VM
720         self.run_ssh (['pkill','run_log'],"Terminating current runs",
721                       dry_run=options.dry_run)
722         self.run_ssh (['rm','-f',Starting.location],"Cleaning %s"%Starting.location,
723                       dry_run=options.dry_run)
724
725     def get_test (self, buildname):
726         for i in self.test_instances:
727             if i.buildname==buildname: return i
728
729     # we scan ALL remaining test results, even the ones not running
730     def add_timestamp (self, buildname, timestamp):
731         i=self.get_test(buildname)
732         if i:   
733             i.set_timestamp(timestamp)
734         else:   
735             i=TestInstance(buildname,0)
736             i.set_timestamp(timestamp)
737             self.test_instances.append(i)
738
739     def add_running_test (self, pid, buildname):
740         i=self.get_test(buildname)
741         if not i:
742             self.test_instances.append (TestInstance (buildname,pid))
743             return
744         if i.pids:
745             print "WARNING: 2 concurrent tests run on same build %s"%buildname
746         i.add_pid (pid)
747
748     def add_broken (self, buildname, plcindex, step):
749         i=self.get_test(buildname)
750         if not i:
751             i=TestInstance(buildname)
752             self.test_instances.append(i)
753         i.set_broken(plcindex, step)
754
755     matcher_proc=re.compile (".*/proc/(?P<pid>[0-9]+)/cwd.*/root/(?P<buildname>[^/]+)$")
756     matcher_grep=re.compile ("/root/(?P<buildname>[^/]+)/logs/trace.*:TRACE:\s*(?P<plcindex>[0-9]+).*step=(?P<step>\S+).*")
757     def sense (self, options):
758         print 't',
759         self.sense_uptime()
760         self.starting_ips=[x for x in self.backquote_ssh(['cat',Starting.location], trash_err=True).strip().split('\n') if x]
761
762         # scan timestamps on all tests
763         # this is likely to not invoke ssh so we need to be a bit smarter to get * expanded
764         # xxx would make sense above too
765         command=['bash','-c',"grep . /root/*/timestamp /dev/null"]
766         ts_lines=self.backquote_ssh(command,trash_err=True).split('\n')
767         for ts_line in ts_lines:
768             if not ts_line.strip(): continue
769             # expect /root/<buildname>/timestamp:<timestamp>
770             try:
771                 (ts_file,timestamp)=ts_line.split(':')
772                 ts_file=os.path.dirname(ts_file)
773                 buildname=os.path.basename(ts_file)
774                 timestamp=int(timestamp)
775                 t=self.add_timestamp(buildname,timestamp)
776             except:  print 'WARNING, could not parse ts line',ts_line
777
778         command=['bash','-c',"grep KO /root/*/logs/trace* /dev/null" ]
779         trace_lines=self.backquote_ssh (command).split('\n')
780         for line in trace_lines:
781             if not line.strip(): continue
782             m=TestBox.matcher_grep.match(line)
783             if m: 
784                 buildname=m.group('buildname')
785                 plcindex=m.group('plcindex')
786                 step=m.group('step')
787                 self.add_broken(buildname,plcindex, step)
788                 continue
789             header("TestBox.sense: command %r returned line that failed to match\n%s"%(command,line))
790             header(">>%s<<"%line)
791
792         pids = self.backquote_ssh (['pgrep','run_log'],trash_err=True)
793         if not pids: return
794         command=['ls','-ld'] + ["/proc/%s/cwd"%pid for pid in pids.split("\n") if pid]
795         ps_lines=self.backquote_ssh (command).split('\n')
796         for line in ps_lines:
797             if not line.strip(): continue
798             m=TestBox.matcher_proc.match(line)
799             if m: 
800                 pid=m.group('pid')
801                 buildname=m.group('buildname')
802                 self.add_running_test(pid, buildname)
803                 continue
804             header("TestBox.sense: command %r returned line that failed to match\n%s"%(command,line))
805             header(">>%s<<"%line)
806         
807         
808     def line (self):
809         return "%s (%s)"%(self.hostname,self.uptime())
810
811     def list (self):
812         if not self.test_instances:
813             header ("No known tests on %s"%self.line())
814         else:
815             header ("Known tests on %s"%self.line())
816             self.test_instances.sort(timestamp_sort)
817             for i in self.test_instances: print i.line()
818         if self.starting_ips:
819             header ("Starting IP addresses on %s"%self.line())
820             self.starting_ips.sort()
821             for starting in self.starting_ips: print starting
822
823 ############################################################
824 class Options: pass
825
826 class Substrate:
827
828     def __init__ (self):
829         self.options=Options()
830         self.options.dry_run=False
831         self.options.verbose=False
832         self.options.reboot=False
833         self.options.soft=False
834         self.test_box = TestBox (self.test_box_spec())
835         self.build_boxes = [ BuildBox(h) for h in self.build_boxes_spec() ]
836         self.plc_boxes = [ PlcBox (h,m) for (h,m) in self.plc_boxes_spec ()]
837         self.qemu_boxes = [ QemuBox (h,m) for (h,m) in self.qemu_boxes_spec ()]
838         self.default_boxes = self.plc_boxes + self.qemu_boxes
839         self.all_boxes = self.build_boxes + [ self.test_box ] + self.plc_boxes + self.qemu_boxes
840         self._sensed=False
841
842         self.vplc_pool = Pool (self.vplc_ips(),"for vplcs",self)
843         self.vnode_pool = Pool (self.vnode_ips(),"for vnodes",self)
844
845     def fqdn (self, hostname):
846         if hostname.find('.')<0: return "%s.%s"%(hostname,self.domain())
847         return hostname
848
849     # return True if actual sensing takes place
850     def sense (self,force=False):
851         if self._sensed and not force: return False
852         print 'Sensing local substrate...',
853         for b in self.default_boxes: b.sense(self.options)
854         print 'Done'
855         self._sensed=True
856         return True
857
858     def list (self):
859         for b in self.default_boxes:
860             b.list()
861
862     def add_dummy_plc (self, plc_boxname, plcname):
863         for pb in self.plc_boxes:
864             if pb.hostname==plc_boxname:
865                 pb.add_dummy(plcname)
866                 return True
867     def add_dummy_qemu (self, qemu_boxname, qemuname):
868         for qb in self.qemu_boxes:
869             if qb.hostname==qemu_boxname:
870                 qb.add_dummy(qemuname)
871                 return True
872
873     def add_starting_dummy (self, bname, vname):
874         return self.add_dummy_plc (bname, vname) or self.add_dummy_qemu (bname, vname)
875
876     ########## 
877     def provision (self,plcs,options):
878         try:
879             # attach each plc to a plc box and an IP address
880             plcs = [ self.provision_plc (plc,options) for plc in plcs ]
881             # attach each node/qemu to a qemu box with an IP address
882             plcs = [ self.provision_qemus (plc,options) for plc in plcs ]
883             # update the SFA spec accordingly
884             plcs = [ self.localize_sfa_rspec(plc,options) for plc in plcs ]
885             self.list()
886             return plcs
887         except Exception, e:
888             print '* Could not provision this test on current substrate','--',e,'--','exiting'
889             traceback.print_exc()
890             sys.exit(1)
891
892     # it is expected that a couple of options like ips_bplc and ips_vplc 
893     # are set or unset together
894     @staticmethod
895     def check_options (x,y):
896         if not x and not y: return True
897         return len(x)==len(y)
898
899     # find an available plc box (or make space)
900     # and a free IP address (using options if present)
901     def provision_plc (self, plc, options):
902         
903         assert Substrate.check_options (options.ips_bplc, options.ips_vplc)
904
905         #### let's find an IP address for that plc
906         # look in options 
907         if options.ips_vplc:
908             # this is a rerun
909             # we don't check anything here, 
910             # it is the caller's responsability to cleanup and make sure this makes sense
911             plc_boxname = options.ips_bplc.pop()
912             vplc_hostname=options.ips_vplc.pop()
913         else:
914             if self.sense(): self.list()
915             plc_boxname=None
916             vplc_hostname=None
917             # try to find an available IP 
918             self.vplc_pool.sense()
919             couple=self.vplc_pool.next_free()
920             if couple:
921                 (vplc_hostname,unused)=couple
922             #### we need to find one plc box that still has a slot
923             max_free=0
924             # use the box that has max free spots for load balancing
925             for pb in self.plc_boxes:
926                 free=pb.free_spots()
927                 if free>max_free:
928                     plc_boxname=pb.hostname
929                     max_free=free
930             # if there's no available slot in the plc_boxes, or we need a free IP address
931             # make space by killing the oldest running instance
932             if not plc_boxname or not vplc_hostname:
933                 # find the oldest of all our instances
934                 all_plc_instances=reduce(lambda x, y: x+y, 
935                                          [ pb.plc_instances for pb in self.plc_boxes ],
936                                          [])
937                 all_plc_instances.sort(timestamp_sort)
938                 try:
939                     plc_instance_to_kill=all_plc_instances[0]
940                 except:
941                     msg=""
942                     if not plc_boxname: msg += " PLC boxes are full"
943                     if not vplc_hostname: msg += " vplc IP pool exhausted" 
944                     raise Exception,"Could not make space for a PLC instance:"+msg
945                 freed_plc_boxname=plc_instance_to_kill.plc_box.hostname
946                 freed_vplc_hostname=plc_instance_to_kill.vplcname()
947                 message='killing oldest plc instance = %s on %s'%(plc_instance_to_kill.line(),
948                                                                   freed_plc_boxname)
949                 plc_instance_to_kill.kill()
950                 # use this new plcbox if that was the problem
951                 if not plc_boxname:
952                     plc_boxname=freed_plc_boxname
953                 # ditto for the IP address
954                 if not vplc_hostname:
955                     vplc_hostname=freed_vplc_hostname
956                     # record in pool as mine
957                     self.vplc_pool.set_mine(vplc_hostname)
958
959         # 
960         self.add_dummy_plc(plc_boxname,plc['name'])
961         vplc_ip = self.vplc_pool.get_ip(vplc_hostname)
962         self.vplc_pool.add_starting(vplc_hostname, plc_boxname)
963
964         #### compute a helpful vserver name
965         # remove domain in hostname
966         vplc_short = short_hostname(vplc_hostname)
967         vservername = "%s-%d-%s" % (options.buildname,plc['index'],vplc_short)
968         plc_name = "%s_%s"%(plc['name'],vplc_short)
969
970         utils.header( 'PROVISION plc %s in box %s at IP %s as %s'%\
971                           (plc['name'],plc_boxname,vplc_hostname,vservername))
972
973         #### apply in the plc_spec
974         # # informative
975         # label=options.personality.replace("linux","")
976         mapper = {'plc': [ ('*' , {'host_box':plc_boxname,
977                                    # 'name':'%s-'+label,
978                                    'name': plc_name,
979                                    'vservername':vservername,
980                                    'vserverip':vplc_ip,
981                                    'PLC_DB_HOST':vplc_hostname,
982                                    'PLC_API_HOST':vplc_hostname,
983                                    'PLC_BOOT_HOST':vplc_hostname,
984                                    'PLC_WWW_HOST':vplc_hostname,
985                                    'PLC_NET_DNS1' : self.network_settings() [ 'interface_fields:dns1' ],
986                                    'PLC_NET_DNS2' : self.network_settings() [ 'interface_fields:dns2' ],
987                                    } ) ]
988                   }
989
990
991         # mappers only work on a list of plcs
992         return TestMapper([plc],options).map(mapper)[0]
993
994     ##########
995     def provision_qemus (self, plc, options):
996
997         assert Substrate.check_options (options.ips_bnode, options.ips_vnode)
998
999         test_mapper = TestMapper ([plc], options)
1000         nodenames = test_mapper.node_names()
1001         maps=[]
1002         for nodename in nodenames:
1003
1004             if options.ips_vnode:
1005                 # as above, it's a rerun, take it for granted
1006                 qemu_boxname=options.ips_bnode.pop()
1007                 vnode_hostname=options.ips_vnode.pop()
1008             else:
1009                 if self.sense(): self.list()
1010                 qemu_boxname=None
1011                 vnode_hostname=None
1012                 # try to find an available IP 
1013                 self.vnode_pool.sense()
1014                 couple=self.vnode_pool.next_free()
1015                 if couple:
1016                     (vnode_hostname,unused)=couple
1017                 # find a physical box
1018                 max_free=0
1019                 # use the box that has max free spots for load balancing
1020                 for qb in self.qemu_boxes:
1021                     free=qb.free_spots()
1022                     if free>max_free:
1023                         qemu_boxname=qb.hostname
1024                         max_free=free
1025                 # if we miss the box or the IP, kill the oldest instance
1026                 if not qemu_boxname or not vnode_hostname:
1027                 # find the oldest of all our instances
1028                     all_qemu_instances=reduce(lambda x, y: x+y, 
1029                                               [ qb.qemu_instances for qb in self.qemu_boxes ],
1030                                               [])
1031                     all_qemu_instances.sort(timestamp_sort)
1032                     try:
1033                         qemu_instance_to_kill=all_qemu_instances[0]
1034                     except:
1035                         msg=""
1036                         if not qemu_boxname: msg += " QEMU boxes are full"
1037                         if not vnode_hostname: msg += " vnode IP pool exhausted" 
1038                         raise Exception,"Could not make space for a QEMU instance:"+msg
1039                     freed_qemu_boxname=qemu_instance_to_kill.qemu_box.hostname
1040                     freed_vnode_hostname=short_hostname(qemu_instance_to_kill.nodename)
1041                     # kill it
1042                     message='killing oldest qemu node = %s on %s'%(qemu_instance_to_kill.line(),
1043                                                                    freed_qemu_boxname)
1044                     qemu_instance_to_kill.kill()
1045                     # use these freed resources where needed
1046                     if not qemu_boxname:
1047                         qemu_boxname=freed_qemu_boxname
1048                     if not vnode_hostname:
1049                         vnode_hostname=freed_vnode_hostname
1050                         self.vnode_pool.set_mine(vnode_hostname)
1051
1052             self.add_dummy_qemu (qemu_boxname,vnode_hostname)
1053             mac=self.vnode_pool.retrieve_userdata(vnode_hostname)
1054             ip=self.vnode_pool.get_ip (vnode_hostname)
1055             self.vnode_pool.add_starting(vnode_hostname,qemu_boxname)
1056
1057             vnode_fqdn = self.fqdn(vnode_hostname)
1058             nodemap={'host_box':qemu_boxname,
1059                      'node_fields:hostname':vnode_fqdn,
1060                      'interface_fields:ip':ip, 
1061                      'interface_fields:mac':mac,
1062                      }
1063             nodemap.update(self.network_settings())
1064             maps.append ( (nodename, nodemap) )
1065
1066             utils.header("PROVISION node %s in box %s at IP %s with MAC %s"%\
1067                              (nodename,qemu_boxname,vnode_hostname,mac))
1068
1069         return test_mapper.map({'node':maps})[0]
1070
1071     def localize_sfa_rspec (self,plc,options):
1072        
1073         plc['sfa']['SFA_REGISTRY_HOST'] = plc['PLC_DB_HOST']
1074         plc['sfa']['SFA_AGGREGATE_HOST'] = plc['PLC_DB_HOST']
1075         plc['sfa']['SFA_SM_HOST'] = plc['PLC_DB_HOST']
1076         plc['sfa']['SFA_DB_HOST'] = plc['PLC_DB_HOST']
1077         plc['sfa']['SFA_PLC_URL'] = 'https://' + plc['PLC_API_HOST'] + ':443/PLCAPI/' 
1078         return plc
1079
1080     #################### release:
1081     def release (self,options):
1082         self.vplc_pool.release_my_starting()
1083         self.vnode_pool.release_my_starting()
1084         pass
1085
1086     #################### show results for interactive mode
1087     def get_box (self,boxname):
1088         for b in self.build_boxes + self.plc_boxes + self.qemu_boxes + [self.test_box] :
1089             if b.shortname()==boxname:
1090                 return b
1091         print "Could not find box %s"%boxname
1092         return None
1093
1094     def list_boxes(self,box_or_names):
1095         print 'Sensing',
1096         for box in box_or_names:
1097             if not isinstance(box,Box): box=self.get_box(box)
1098             if not box: continue
1099             box.sense(self.options)
1100         print 'Done'
1101         for box in box_or_names:
1102             if not isinstance(box,Box): box=self.get_box(box)
1103             if not box: continue
1104             box.list()
1105
1106     def reboot_boxes(self,box_or_names):
1107         for box in box_or_names:
1108             if not isinstance(box,Box): box=self.get_box(box)
1109             if not box: continue
1110             box.reboot(self.options)
1111
1112     ####################
1113     # can be run as a utility to manage the local infrastructure
1114     def main (self):
1115         parser=OptionParser()
1116         parser.add_option ('-r',"--reboot",action='store_true',dest='reboot',default=False,
1117                            help='reboot mode (use shutdown -r)')
1118         parser.add_option ('-s',"--soft",action='store_true',dest='soft',default=False,
1119                            help='soft mode for reboot (vserver stop or kill qemus)')
1120         parser.add_option ('-t',"--testbox",action='store_true',dest='testbox',default=False,
1121                            help='add test box') 
1122         parser.add_option ('-b',"--build",action='store_true',dest='builds',default=False,
1123                            help='add build boxes')
1124         parser.add_option ('-p',"--plc",action='store_true',dest='plcs',default=False,
1125                            help='add plc boxes')
1126         parser.add_option ('-q',"--qemu",action='store_true',dest='qemus',default=False,
1127                            help='add qemu boxes') 
1128         parser.add_option ('-a',"--all",action='store_true',dest='all',default=False,
1129                            help='address all known  boxes, like -b -t -p -q')
1130         parser.add_option ('-v',"--verbose",action='store_true',dest='verbose',default=False,
1131                            help='verbose mode')
1132         parser.add_option ('-n',"--dry_run",action='store_true',dest='dry_run',default=False,
1133                            help='dry run mode')
1134         (self.options,args)=parser.parse_args()
1135
1136         boxes=args
1137         if self.options.testbox: boxes += [self.test_box]
1138         if self.options.builds: boxes += self.build_boxes
1139         if self.options.plcs: boxes += self.plc_boxes
1140         if self.options.qemus: boxes += self.qemu_boxes
1141         if self.options.all: boxes += self.all_boxes
1142         
1143         # default scope is -b -p -q
1144         if not boxes:
1145             boxes = self.build_boxes + self.plc_boxes + self.qemu_boxes
1146
1147         if self.options.reboot: self.reboot_boxes (boxes)
1148         else:                   self.list_boxes (boxes)