scaffolding for lxc-based plcs
[tests.git] / system / Substrate.py
1 #
2 # Thierry Parmentelat <thierry.parmentelat@inria.fr>
3 # Copyright (C) 2010 INRIA 
4 #
5 # #################### history
6 #
7 # see also Substrate.readme
8 #
9 # This is a complete rewrite of TestResources/Tracker/Pool
10 # we don't use trackers anymore and just probe/sense the running 
11 # boxes to figure out where we are
12 # in order to implement some fairness in the round-robin allocation scheme
13 # we need an indication of the 'age' of each running entity, 
14 # hence the 'timestamp-*' steps in TestPlc
15
16 # this should be much more flexible:
17 # * supports several plc boxes 
18 # * supports several qemu guests per host
19 # * no need to worry about tracker being in sync or not
20 #
21 # #################### howto use
22 #
23 # each site is to write its own LocalSubstrate.py, 
24 # (see e.g. LocalSubstrate.inria.py)
25 # LocalSubstrate.py is expected to be in /root on the testmaster box
26 # and needs to define
27 # MYPLCs
28 # . the vserver-capable boxes used for hosting myplcs
29 # .  and their admissible load (max # of myplcs)
30 # . the pool of DNS-names and IP-addresses available for myplcs
31 # QEMU nodes
32 # . the kvm-qemu capable boxes to host qemu instances
33 # .  and their admissible load (max # of myplcs)
34 # . the pool of DNS-names and IP-addresses available for nodes
35
36 # #################### implem. note
37
38 # this model relies on 'sensing' the substrate, 
39 # i.e. probing all the boxes for their running instances of vservers and qemu
40 # this is how we get rid of tracker inconsistencies 
41 # however there is a 'black hole' between the time where a given address is 
42 # allocated and when it actually gets used/pingable
43 # this is why we still need a shared knowledge among running tests
44 # in a file named /root/starting
45 # this is connected to the Pool class 
46
47 # ####################
48
49 import os.path, sys
50 import time
51 import re
52 import traceback
53 import subprocess
54 import commands
55 import socket
56 from optparse import OptionParser
57
58 import utils
59 from TestSsh import TestSsh
60 from TestMapper import TestMapper
61
62 def header (message,banner=True):
63     if not message: return
64     if banner: print "===============",
65     print message
66     sys.stdout.flush()
67
68 def timestamp_sort(o1,o2): return o1.timestamp-o2.timestamp
69
70 def short_hostname (hostname):
71     return hostname.split('.')[0]
72
73 ####################
74 # the place were other test instances tell about their not-yet-started
75 # instances, that go undetected through sensing
76 class Starting:
77
78     location='/root/starting'
79     def __init__ (self):
80         self.tuples=[]
81
82     def load (self):
83         try:    self.tuples=[line.strip().split('@') 
84                              for line in file(Starting.location).readlines()]
85         except: self.tuples=[]
86
87     def vnames (self) : 
88         self.load()
89         return [ x for (x,_) in self.tuples ]
90
91     def add (self, vname, bname):
92         if not vname in self.vnames():
93             file(Starting.location,'a').write("%s@%s\n"%(vname,bname))
94             
95     def delete_vname (self, vname):
96         self.load()
97         if vname in self.vnames():
98             f=file(Starting.location,'w')
99             for (v,b) in self.tuples: 
100                 if v != vname: f.write("%s@%s\n"%(v,b))
101             f.close()
102     
103 ####################
104 # pool class
105 # allows to pick an available IP among a pool
106 # input is expressed as a list of tuples (hostname,ip,user_data)
107 # that can be searched iteratively for a free slot
108 # e.g.
109 # pool = [ (hostname1,user_data1),  
110 #          (hostname2,user_data2),  
111 #          (hostname3,user_data2),  
112 #          (hostname4,user_data4) ]
113 # assuming that ip1 and ip3 are taken (pingable), then we'd get
114 # pool=Pool(pool)
115 # pool.next_free() -> entry2
116 # pool.next_free() -> entry4
117 # pool.next_free() -> None
118 # that is, even if ip2 is not busy/pingable when the second next_free() is issued
119
120 class PoolItem:
121     def __init__ (self,hostname,userdata):
122         self.hostname=hostname
123         self.userdata=userdata
124         # slot holds 'busy' or 'free' or 'mine' or 'starting' or None
125         # 'mine' is for our own stuff, 'starting' from the concurrent tests
126         self.status=None
127         self.ip=None
128
129     def line(self):
130         return "Pooled %s (%s) -> %s"%(self.hostname,self.userdata, self.status)
131
132     def char (self):
133         if   self.status==None:       return '?'
134         elif self.status=='busy':     return '+'
135         elif self.status=='free':     return '-'
136         elif self.status=='mine':     return 'M'
137         elif self.status=='starting': return 'S'
138
139     def get_ip(self):
140         if self.ip: return self.ip
141         ip=socket.gethostbyname(self.hostname)
142         self.ip=ip
143         return ip
144
145 class Pool:
146
147     def __init__ (self, tuples,message, substrate):
148         self.pool_items= [ PoolItem (hostname,userdata) for (hostname,userdata) in tuples ] 
149         self.message=message
150         # where to send notifications upon load_starting
151         self.substrate=substrate
152
153     def list (self):
154         for i in self.pool_items: print i.line()
155
156     def line (self):
157         line=self.message
158         for i in self.pool_items: line += ' ' + i.char()
159         return line
160
161     def _item (self, hostname):
162         for i in self.pool_items: 
163             if i.hostname==hostname: return i
164         raise Exception ("Could not locate hostname %s in pool %s"%(hostname,self.message))
165
166     def retrieve_userdata (self, hostname): 
167         return self._item(hostname).userdata
168
169     def get_ip (self, hostname):
170         try:    return self._item(hostname).get_ip()
171         except: return socket.gethostbyname(hostname)
172         
173     def set_mine (self, hostname):
174         try:
175             self._item(hostname).status='mine'
176         except:
177             print 'WARNING: host %s not found in IP pool %s'%(hostname,self.message)
178
179     def next_free (self):
180         for i in self.pool_items:
181             if i.status == 'free':
182                 i.status='mine'
183                 return (i.hostname,i.userdata)
184         return None
185
186     ####################
187     # we have a starting instance of our own
188     def add_starting (self, vname, bname):
189         Starting().add(vname,bname)
190         for i in self.pool_items:
191             if i.hostname==vname: i.status='mine'
192
193     # load the starting instances from the common file
194     # remember that might be ours
195     # return the list of (vname,bname) that are not ours
196     def load_starting (self):
197         starting=Starting()
198         starting.load()
199         new_tuples=[]
200         for (v,b) in starting.tuples:
201             for i in self.pool_items:
202                 if i.hostname==v and i.status=='free':
203                     i.status='starting'
204                     new_tuples.append( (v,b,) )
205         return new_tuples
206
207     def release_my_starting (self):
208         for i in self.pool_items:
209             if i.status=='mine':
210                 Starting().delete_vname (i.hostname)
211                 i.status=None
212
213
214     ##########
215     def _sense (self):
216         for item in self.pool_items:
217             if item.status is not None: 
218                 print item.char(),
219                 continue
220             if self.check_ping (item.hostname): 
221                 item.status='busy'
222                 print '*',
223             else:
224                 item.status='free'
225                 print '.',
226     
227     def sense (self):
228         print 'Sensing IP pool',self.message,
229         self._sense()
230         print 'Done'
231         for (vname,bname) in self.load_starting():
232             self.substrate.add_starting_dummy (bname, vname)
233         print 'After starting: IP pool'
234         print self.line()
235     # OS-dependent ping option (support for macos, for convenience)
236     ping_timeout_option = None
237     # returns True when a given hostname/ip responds to ping
238     def check_ping (self,hostname):
239         if not Pool.ping_timeout_option:
240             (status,osname) = commands.getstatusoutput("uname -s")
241             if status != 0:
242                 raise Exception, "TestPool: Cannot figure your OS name"
243             if osname == "Linux":
244                 Pool.ping_timeout_option="-w"
245             elif osname == "Darwin":
246                 Pool.ping_timeout_option="-t"
247
248         command="ping -c 1 %s 1 %s"%(Pool.ping_timeout_option,hostname)
249         (status,output) = commands.getstatusoutput(command)
250         return status == 0
251
252 ####################
253 class Box:
254     def __init__ (self,hostname):
255         self.hostname=hostname
256         self._probed=None
257     def shortname (self):
258         return short_hostname(self.hostname)
259     def test_ssh (self): return TestSsh(self.hostname,username='root',unknown_host=False)
260     def reboot (self, options):
261         self.test_ssh().run("shutdown -r now",message="Rebooting %s"%self.hostname,
262                             dry_run=options.dry_run)
263
264     def uptime(self):
265         if hasattr(self,'_uptime') and self._uptime: return self._uptime
266         return '*undef* uptime'
267     def sense_uptime (self):
268         command=['uptime']
269         self._uptime=self.backquote_ssh(command,trash_err=True).strip()
270         if not self._uptime: self._uptime='unreachable'
271
272     def run(self,argv,message=None,trash_err=False,dry_run=False):
273         if dry_run:
274             print 'DRY_RUN:',
275             print " ".join(argv)
276             return 0
277         else:
278             header(message)
279             if not trash_err:
280                 return subprocess.call(argv)
281             else:
282                 return subprocess.call(argv,stderr=file('/dev/null','w'))
283                 
284     def run_ssh (self, argv, message, trash_err=False, dry_run=False):
285         ssh_argv = self.test_ssh().actual_argv(argv)
286         result=self.run (ssh_argv, message, trash_err, dry_run=dry_run)
287         if result!=0:
288             print "WARNING: failed to run %s on %s"%(" ".join(argv),self.hostname)
289         return result
290
291     def backquote (self, argv, trash_err=False):
292         # print 'running backquote',argv
293         if not trash_err:
294             result= subprocess.Popen(argv,stdout=subprocess.PIPE).communicate()[0]
295         else:
296             result= subprocess.Popen(argv,stdout=subprocess.PIPE,stderr=file('/dev/null','w')).communicate()[0]
297         return result
298
299     def probe (self):
300         if self._probed is not None: return self._probed
301         # first probe the ssh link
302         probe_argv=self.test_ssh().actual_argv(['hostname'])
303         self._probed=self.backquote ( probe_argv, trash_err=True )
304         if not self._probed: print "root@%s unreachable"%self.hostname
305         return self._probed
306
307     # use argv=['bash','-c',"the command line"]
308     # if you have any shell-expanded arguments like *
309     # and if there's any chance the command is adressed to the local host
310     def backquote_ssh (self, argv, trash_err=False):
311         if not self.probe(): return ''
312         return self.backquote( self.test_ssh().actual_argv(argv), trash_err)
313
314 ############################################################
315 class BuildInstance:
316     def __init__ (self, buildname, pid, buildbox):
317         self.buildname=buildname
318         self.buildbox=buildbox
319         self.pids=[pid]
320
321     def add_pid(self,pid):
322         self.pids.append(pid)
323
324     def line (self):
325         return "== %s == (pids=%r)"%(self.buildname,self.pids)
326
327 class BuildBox (Box):
328     def __init__ (self,hostname):
329         Box.__init__(self,hostname)
330         self.build_instances=[]
331
332     def add_build (self,buildname,pid):
333         for build in self.build_instances:
334             if build.buildname==buildname: 
335                 build.add_pid(pid)
336                 return
337         self.build_instances.append(BuildInstance(buildname, pid, self))
338
339     def list(self):
340         if not self.build_instances: 
341             header ('No build process on %s (%s)'%(self.hostname,self.uptime()))
342         else:
343             header ("Builds on %s (%s)"%(self.hostname,self.uptime()))
344             for b in self.build_instances: 
345                 header (b.line(),banner=False)
346
347     def reboot (self, options):
348         if not options.soft:
349             self.reboot(options)
350         else:
351             command=['pkill','vbuild']
352             self.run_ssh(command,"Terminating vbuild processes",dry_run=options.dry_run)
353
354     # inspect box and find currently running builds
355     matcher=re.compile("\s*(?P<pid>[0-9]+).*-[bo]\s+(?P<buildname>[^\s]+)(\s|\Z)")
356     matcher_building_vm=re.compile("\s*(?P<pid>[0-9]+).*init-vserver.*-i\s+eth.\s+(?P<buildname>[^\s]+)\s*\Z")
357     def sense(self, options):
358         print 'b',
359         self.sense_uptime()
360         pids=self.backquote_ssh(['pgrep','vbuild'],trash_err=True)
361         if not pids: return
362         command=['ps','-o','pid,command'] + [ pid for pid in pids.split("\n") if pid]
363         ps_lines=self.backquote_ssh (command).split('\n')
364         for line in ps_lines:
365             if not line.strip() or line.find('PID')>=0: continue
366             m=BuildBox.matcher.match(line)
367             if m: 
368                 date=time.strftime('%Y-%m-%d',time.localtime(time.time()))
369                 buildname=m.group('buildname').replace('@DATE@',date)
370                 self.add_build (buildname,m.group('pid'))
371                 continue
372             m=BuildBox.matcher_building_vm.match(line)
373             if m: 
374                 # buildname is expansed here
375                 self.add_build (buildname,m.group('pid'))
376                 continue
377             header('BuildBox.sense: command %r returned line that failed to match'%command)
378             header(">>%s<<"%line)
379
380 ############################################################
381 class PlcInstance:
382     def __init__ (self, plcbox):
383         self.plc_box=plcbox
384         # unknown yet
385         self.timestamp=0
386         
387     def set_timestamp (self,timestamp): self.timestamp=timestamp
388     def set_now (self): self.timestamp=int(time.time())
389     def pretty_timestamp (self): return time.strftime("%Y-%m-%d:%H-%M",time.localtime(self.timestamp))
390
391 class PlcVsInstance (PlcInstance):
392     def __init__ (self, plcbox, vservername, ctxid):
393         PlcInstance.__init__(self,plcbox)
394         self.vservername=vservername
395         self.ctxid=ctxid
396
397     def vplcname (self):
398         return self.vservername.split('-')[-1]
399     def buildname (self):
400         return self.vservername.rsplit('-',2)[0]
401
402     def line (self):
403         msg="== %s =="%(self.vplcname())
404         msg += " [=%s]"%self.vservername
405         if self.ctxid==0:  msg+=" not (yet?) running"
406         else:              msg+=" (ctx=%s)"%self.ctxid     
407         if self.timestamp: msg += " @ %s"%self.pretty_timestamp()
408         else:              msg += " *unknown timestamp*"
409         return msg
410
411     def kill (self):
412         msg="vserver stopping %s on %s"%(self.vservername,self.plc_box.hostname)
413         self.plc_box.run_ssh(['vserver',self.vservername,'stop'],msg)
414         self.plc_box.forget(self)
415
416 class PlcLxcInstance (PlcInstance):
417     # does lxc have a context id of any kind ?
418     def __init__ (self, plcbox, lxcname):
419         PlcInstance.__init__(self, plcbox)
420         self.lxcname = lxcname
421
422     def kill (self):
423         print "TODO PlcLxcInstance.kill"
424
425     def line (self):
426         return "TODO PlcLxcInstance.line"
427
428 ##########
429 class PlcBox (Box):
430     def __init__ (self, hostname, max_plcs):
431         Box.__init__(self,hostname)
432         self.plc_instances=[]
433         self.max_plcs=max_plcs
434
435 class PlcVsBox (PlcBox):
436
437     def add_vserver (self,vservername,ctxid):
438         for plc in self.plc_instances:
439             if plc.vservername==vservername: 
440                 header("WARNING, duplicate myplc %s running on %s"%\
441                            (vservername,self.hostname),banner=False)
442                 return
443         self.plc_instances.append(PlcVsInstance(self,vservername,ctxid))
444     
445     def forget (self, plc_instance):
446         self.plc_instances.remove(plc_instance)
447
448     # fill one slot even though this one is not started yet
449     def add_dummy (self, plcname):
450         dummy=PlcVsInstance(self,'dummy_'+plcname,0)
451         dummy.set_now()
452         self.plc_instances.append(dummy)
453
454     def line(self): 
455         msg="%s [max=%d,%d free] (%s)"%(self.hostname, self.max_plcs,self.free_slots(),self.uname())
456         return msg
457         
458     def list(self):
459         if not self.plc_instances: 
460             header ('No vserver running on %s'%(self.line()))
461         else:
462             header ("Active plc VMs on %s"%self.line())
463             self.plc_instances.sort(timestamp_sort)
464             for p in self.plc_instances: 
465                 header (p.line(),banner=False)
466
467     def free_slots (self):
468         return self.max_plcs - len(self.plc_instances)
469
470     def uname(self):
471         if hasattr(self,'_uname') and self._uname: return self._uname
472         return '*undef* uname'
473
474     def plc_instance_by_vservername (self, vservername):
475         for p in self.plc_instances:
476             if p.vservername==vservername: return p
477         return None
478
479     def reboot (self, options):
480         if not options.soft:
481             self.reboot(options)
482         else:
483             self.run_ssh(['service','util-vserver','stop'],"Stopping all running vservers",
484                          dry_run=options.dry_run)
485
486     def sense (self, options):
487         print 'p',
488         self._uname=self.backquote_ssh(['uname','-r']).strip()
489         # try to find fullname (vserver_stat truncates to a ridiculously short name)
490         # fetch the contexts for all vservers on that box
491         map_command=['grep','.','/etc/vservers/*/context','/dev/null',]
492         context_map=self.backquote_ssh (map_command)
493         # at this point we have a set of lines like
494         # /etc/vservers/2010.01.20--k27-f12-32-vplc03/context:40144
495         ctx_dict={}
496         for map_line in context_map.split("\n"):
497             if not map_line: continue
498             [path,xid] = map_line.split(':')
499             ctx_dict[xid]=os.path.basename(os.path.dirname(path))
500         # at this point ctx_id maps context id to vservername
501
502         command=['vserver-stat']
503         vserver_stat = self.backquote_ssh (command)
504         for vserver_line in vserver_stat.split("\n"):
505             if not vserver_line: continue
506             context=vserver_line.split()[0]
507             if context=="CTX": continue
508             try:
509                 longname=ctx_dict[context]
510                 self.add_vserver(longname,context)
511             except:
512                 print 'WARNING: found ctx %s in vserver_stat but was unable to figure a corresp. vserver'%context
513
514         # scan timestamps 
515         running_vsnames = [ i.vservername for i in self.plc_instances ]
516         command=   ['grep','.']
517         command += ['/vservers/%s.timestamp'%vs for vs in running_vsnames]
518         command += ['/dev/null']
519         ts_lines=self.backquote_ssh(command,trash_err=True).split('\n')
520         for ts_line in ts_lines:
521             if not ts_line.strip(): continue
522             # expect /vservers/<vservername>.timestamp:<timestamp>
523             try:
524                 (ts_file,timestamp)=ts_line.split(':')
525                 ts_file=os.path.basename(ts_file)
526                 (vservername,_)=os.path.splitext(ts_file)
527                 timestamp=int(timestamp)
528                 p=self.plc_instance_by_vservername(vservername)
529                 if not p: 
530                     print 'WARNING zombie plc',self.hostname,ts_line
531                     print '... was expecting',vservername,'in',[i.vservername for i in self.plc_instances]
532                     continue
533                 p.set_timestamp(timestamp)
534             except:  print 'WARNING, could not parse ts line',ts_line
535         
536
537 class PlcLxcBox (PlcBox):
538
539     def add_dummy (self, plcname):
540         print "TODO PlcLxcBox.add_dummy"
541
542     def free_slots (self):
543         print "TODO PlcLxcBox.free_slots"
544
545     def list (self):
546         print "TODO PlcLxcBox.list"
547
548     def reboot (self, options):
549         print "TODO PlcLxcBox.reboot"
550
551     def sense (self, options):
552         print "TODO PlcLxcBox.sense"
553
554
555 ############################################################
556 class QemuInstance: 
557     def __init__ (self, nodename, pid, qemubox):
558         self.nodename=nodename
559         self.pid=pid
560         self.qemu_box=qemubox
561         # not known yet
562         self.buildname=None
563         self.timestamp=0
564         
565     def set_buildname (self,buildname): self.buildname=buildname
566     def set_timestamp (self,timestamp): self.timestamp=timestamp
567     def set_now (self): self.timestamp=int(time.time())
568     def pretty_timestamp (self): return time.strftime("%Y-%m-%d:%H-%M",time.localtime(self.timestamp))
569     
570     def line (self):
571         msg = "== %s =="%(short_hostname(self.nodename))
572         msg += " [=%s]"%self.buildname
573         if self.pid:       msg += " (pid=%s)"%self.pid
574         else:              msg += " not (yet?) running"
575         if self.timestamp: msg += " @ %s"%self.pretty_timestamp()
576         else:              msg += " *unknown timestamp*"
577         return msg
578     
579     def kill(self):
580         if self.pid==0: 
581             print "cannot kill qemu %s with pid==0"%self.nodename
582             return
583         msg="Killing qemu %s with pid=%s on box %s"%(self.nodename,self.pid,self.qemu_box.hostname)
584         self.qemu_box.run_ssh(['kill',"%s"%self.pid],msg)
585         self.qemu_box.forget(self)
586
587
588 class QemuBox (Box):
589     def __init__ (self, hostname, max_qemus):
590         Box.__init__(self,hostname)
591         self.qemu_instances=[]
592         self.max_qemus=max_qemus
593
594     def add_node (self,nodename,pid):
595         for qemu in self.qemu_instances:
596             if qemu.nodename==nodename: 
597                 header("WARNING, duplicate qemu %s running on %s"%\
598                            (nodename,self.hostname), banner=False)
599                 return
600         self.qemu_instances.append(QemuInstance(nodename,pid,self))
601
602     def forget (self, qemu_instance):
603         self.qemu_instances.remove(qemu_instance)
604
605     # fill one slot even though this one is not started yet
606     def add_dummy (self, nodename):
607         dummy=QemuInstance('dummy_'+nodename,0,self)
608         dummy.set_now()
609         self.qemu_instances.append(dummy)
610
611     def line (self):
612         msg="%s [max=%d,%d free] (%s)"%(self.hostname, self.max_qemus,self.free_slots(),self.driver())
613         return msg
614
615     def list(self):
616         if not self.qemu_instances: 
617             header ('No qemu process on %s'%(self.line()))
618         else:
619             header ("Active qemu processes on %s"%(self.line()))
620             self.qemu_instances.sort(timestamp_sort)
621             for q in self.qemu_instances: 
622                 header (q.line(),banner=False)
623
624     def free_slots (self):
625         return self.max_qemus - len(self.qemu_instances)
626
627     def driver(self):
628         if hasattr(self,'_driver') and self._driver: return self._driver
629         return '*undef* driver'
630
631     def qemu_instance_by_pid (self,pid):
632         for q in self.qemu_instances:
633             if q.pid==pid: return q
634         return None
635
636     def qemu_instance_by_nodename_buildname (self,nodename,buildname):
637         for q in self.qemu_instances:
638             if q.nodename==nodename and q.buildname==buildname:
639                 return q
640         return None
641
642     def reboot (self, options):
643         if not options.soft:
644             self.reboot(options)
645         else:
646             self.run_ssh(['pkill','qemu'],"Killing qemu instances",
647                          dry_run=options.dry_run)
648
649     matcher=re.compile("\s*(?P<pid>[0-9]+).*-cdrom\s+(?P<nodename>[^\s]+)\.iso")
650     def sense(self, options):
651         print 'q',
652         modules=self.backquote_ssh(['lsmod']).split('\n')
653         self._driver='*NO kqemu/kmv_intel MODULE LOADED*'
654         for module in modules:
655             if module.find('kqemu')==0:
656                 self._driver='kqemu module loaded'
657             # kvm might be loaded without vkm_intel (we dont have AMD)
658             elif module.find('kvm_intel')==0:
659                 self._driver='kvm_intel module loaded'
660         ########## find out running pids
661         pids=self.backquote_ssh(['pgrep','qemu'])
662         if not pids: return
663         command=['ps','-o','pid,command'] + [ pid for pid in pids.split("\n") if pid]
664         ps_lines = self.backquote_ssh (command).split("\n")
665         for line in ps_lines:
666             if not line.strip() or line.find('PID') >=0 : continue
667             m=QemuBox.matcher.match(line)
668             if m: 
669                 self.add_node (m.group('nodename'),m.group('pid'))
670                 continue
671             header('QemuBox.sense: command %r returned line that failed to match'%command)
672             header(">>%s<<"%line)
673         ########## retrieve alive instances and map to build
674         live_builds=[]
675         command=['grep','.','*/*/qemu.pid','/dev/null']
676         pid_lines=self.backquote_ssh(command,trash_err=True).split('\n')
677         for pid_line in pid_lines:
678             if not pid_line.strip(): continue
679             # expect <build>/<nodename>/qemu.pid:<pid>pid
680             try:
681                 (buildname,nodename,tail)=pid_line.split('/')
682                 (_,pid)=tail.split(':')
683                 q=self.qemu_instance_by_pid (pid)
684                 if not q: continue
685                 q.set_buildname(buildname)
686                 live_builds.append(buildname)
687             except: print 'WARNING, could not parse pid line',pid_line
688         # retrieve timestamps
689         if not live_builds: return
690         command=   ['grep','.']
691         command += ['%s/*/timestamp'%b for b in live_builds]
692         command += ['/dev/null']
693         ts_lines=self.backquote_ssh(command,trash_err=True).split('\n')
694         for ts_line in ts_lines:
695             if not ts_line.strip(): continue
696             # expect <build>/<nodename>/timestamp:<timestamp>
697             try:
698                 (buildname,nodename,tail)=ts_line.split('/')
699                 nodename=nodename.replace('qemu-','')
700                 (_,timestamp)=tail.split(':')
701                 timestamp=int(timestamp)
702                 q=self.qemu_instance_by_nodename_buildname(nodename,buildname)
703                 if not q: 
704                     print 'WARNING zombie qemu',self.hostname,ts_line
705                     print '... was expecting (',short_hostname(nodename),buildname,') in',\
706                         [ (short_hostname(i.nodename),i.buildname) for i in self.qemu_instances ]
707                     continue
708                 q.set_timestamp(timestamp)
709             except:  print 'WARNING, could not parse ts line',ts_line
710
711 ####################
712 class TestInstance:
713     def __init__ (self, buildname, pid=0):
714         self.pids=[]
715         if pid!=0: self.pid.append(pid)
716         self.buildname=buildname
717         # latest trace line
718         self.trace=''
719         # has a KO test
720         self.broken_steps=[]
721         self.timestamp = 0
722
723     def set_timestamp (self,timestamp): self.timestamp=timestamp
724     def set_now (self): self.timestamp=int(time.time())
725     def pretty_timestamp (self): return time.strftime("%Y-%m-%d:%H-%M",time.localtime(self.timestamp))
726
727
728     def add_pid (self,pid):
729         self.pids.append(pid)
730     def set_broken (self, plcindex, step): 
731         self.broken_steps.append ( (plcindex, step,) )
732
733     def line (self):
734         double='=='
735         if self.pids: double='*'+double[1]
736         if self.broken_steps: double=double[0]+'B'
737         msg = " %s %s =="%(double,self.buildname)
738         if not self.pids:       pass
739         elif len(self.pids)==1: msg += " (pid=%s)"%self.pids[0]
740         else:                   msg += " !!!pids=%s!!!"%self.pids
741         msg += " @%s"%self.pretty_timestamp()
742         if self.broken_steps:
743             msg += " [BROKEN=" + " ".join( [ "%s@%s"%(s,i) for (i,s) in self.broken_steps ] ) + "]"
744         return msg
745
746 class TestBox (Box):
747     def __init__ (self,hostname):
748         Box.__init__(self,hostname)
749         self.starting_ips=[]
750         self.test_instances=[]
751
752     def reboot (self, options):
753         # can't reboot a vserver VM
754         self.run_ssh (['pkill','run_log'],"Terminating current runs",
755                       dry_run=options.dry_run)
756         self.run_ssh (['rm','-f',Starting.location],"Cleaning %s"%Starting.location,
757                       dry_run=options.dry_run)
758
759     def get_test (self, buildname):
760         for i in self.test_instances:
761             if i.buildname==buildname: return i
762
763     # we scan ALL remaining test results, even the ones not running
764     def add_timestamp (self, buildname, timestamp):
765         i=self.get_test(buildname)
766         if i:   
767             i.set_timestamp(timestamp)
768         else:   
769             i=TestInstance(buildname,0)
770             i.set_timestamp(timestamp)
771             self.test_instances.append(i)
772
773     def add_running_test (self, pid, buildname):
774         i=self.get_test(buildname)
775         if not i:
776             self.test_instances.append (TestInstance (buildname,pid))
777             return
778         if i.pids:
779             print "WARNING: 2 concurrent tests run on same build %s"%buildname
780         i.add_pid (pid)
781
782     def add_broken (self, buildname, plcindex, step):
783         i=self.get_test(buildname)
784         if not i:
785             i=TestInstance(buildname)
786             self.test_instances.append(i)
787         i.set_broken(plcindex, step)
788
789     matcher_proc=re.compile (".*/proc/(?P<pid>[0-9]+)/cwd.*/root/(?P<buildname>[^/]+)$")
790     matcher_grep=re.compile ("/root/(?P<buildname>[^/]+)/logs/trace.*:TRACE:\s*(?P<plcindex>[0-9]+).*step=(?P<step>\S+).*")
791     def sense (self, options):
792         print 't',
793         self.sense_uptime()
794         self.starting_ips=[x for x in self.backquote_ssh(['cat',Starting.location], trash_err=True).strip().split('\n') if x]
795
796         # scan timestamps on all tests
797         # this is likely to not invoke ssh so we need to be a bit smarter to get * expanded
798         # xxx would make sense above too
799         command=['bash','-c',"grep . /root/*/timestamp /dev/null"]
800         ts_lines=self.backquote_ssh(command,trash_err=True).split('\n')
801         for ts_line in ts_lines:
802             if not ts_line.strip(): continue
803             # expect /root/<buildname>/timestamp:<timestamp>
804             try:
805                 (ts_file,timestamp)=ts_line.split(':')
806                 ts_file=os.path.dirname(ts_file)
807                 buildname=os.path.basename(ts_file)
808                 timestamp=int(timestamp)
809                 t=self.add_timestamp(buildname,timestamp)
810             except:  print 'WARNING, could not parse ts line',ts_line
811
812         command=['bash','-c',"grep KO /root/*/logs/trace-* /dev/null" ]
813         trace_lines=self.backquote_ssh (command).split('\n')
814         for line in trace_lines:
815             if not line.strip(): continue
816             m=TestBox.matcher_grep.match(line)
817             if m: 
818                 buildname=m.group('buildname')
819                 plcindex=m.group('plcindex')
820                 step=m.group('step')
821                 self.add_broken(buildname,plcindex, step)
822                 continue
823             header("TestBox.sense: command %r returned line that failed to match\n%s"%(command,line))
824             header(">>%s<<"%line)
825
826         pids = self.backquote_ssh (['pgrep','run_log'],trash_err=True)
827         if not pids: return
828         command=['ls','-ld'] + ["/proc/%s/cwd"%pid for pid in pids.split("\n") if pid]
829         ps_lines=self.backquote_ssh (command).split('\n')
830         for line in ps_lines:
831             if not line.strip(): continue
832             m=TestBox.matcher_proc.match(line)
833             if m: 
834                 pid=m.group('pid')
835                 buildname=m.group('buildname')
836                 self.add_running_test(pid, buildname)
837                 continue
838             header("TestBox.sense: command %r returned line that failed to match\n%s"%(command,line))
839             header(">>%s<<"%line)
840         
841         
842     def line (self):
843         return "%s (%s)"%(self.hostname,self.uptime())
844
845     def list (self):
846         if not self.test_instances:
847             header ("No known tests on %s"%self.line())
848         else:
849             header ("Known tests on %s"%self.line())
850             self.test_instances.sort(timestamp_sort)
851             for i in self.test_instances: print i.line()
852         if self.starting_ips:
853             header ("Starting IP addresses on %s"%self.line())
854             self.starting_ips.sort()
855             for starting in self.starting_ips: print starting
856
857 ############################################################
858 class Options: pass
859
860 class Substrate:
861
862     def __init__ (self, use_plc_vs_boxes=True, use_plc_lxc_boxes=False):
863         self.options=Options()
864         self.options.dry_run=False
865         self.options.verbose=False
866         self.options.reboot=False
867         self.options.soft=False
868         self.test_box = TestBox (self.test_box_spec())
869         self.build_boxes = [ BuildBox(h) for h in self.build_boxes_spec() ]
870         self.plc_vs_boxes = [ PlcVsBox (h,m) for (h,m) in self.plc_vs_boxes_spec ()]
871         self.plc_lxc_boxes = [ PlcLxcBox (h,m) for (h,m) in self.plc_lxc_boxes_spec ()]
872         self.qemu_boxes = [ QemuBox (h,m) for (h,m) in self.qemu_boxes_spec ()]
873         self._sensed=False
874
875         self.vplc_pool = Pool (self.vplc_ips(),"for vplcs",self)
876         self.vnode_pool = Pool (self.vnode_ips(),"for vnodes",self)
877         
878         self.rescope (use_plc_vs_boxes, use_plc_lxc_boxes)
879
880     def rescope(self, plcs_on_vs, plcs_on_lxc):
881         self.plc_boxes=[]
882         if plcs_on_vs: self.plc_boxes += self.plc_vs_boxes
883         if plcs_on_lxc: self.plc_boxes += self.plc_lxc_boxes
884         self.default_boxes = self.plc_boxes + self.qemu_boxes
885         self.all_boxes = self.build_boxes + [ self.test_box ] + self.plc_boxes + self.qemu_boxes
886
887     def fqdn (self, hostname):
888         if hostname.find('.')<0: return "%s.%s"%(hostname,self.domain())
889         return hostname
890
891     # return True if actual sensing takes place
892     def sense (self,force=False):
893         if self._sensed and not force: return False
894         print 'Sensing local substrate...',
895         for b in self.default_boxes: b.sense(self.options)
896         print 'Done'
897         self._sensed=True
898         return True
899
900     def list (self):
901         for b in self.default_boxes:
902             b.list()
903
904     def add_dummy_plc (self, plc_boxname, plcname):
905         for pb in self.plc_boxes:
906             if pb.hostname==plc_boxname:
907                 pb.add_dummy(plcname)
908                 return True
909     def add_dummy_qemu (self, qemu_boxname, qemuname):
910         for qb in self.qemu_boxes:
911             if qb.hostname==qemu_boxname:
912                 qb.add_dummy(qemuname)
913                 return True
914
915     def add_starting_dummy (self, bname, vname):
916         return self.add_dummy_plc (bname, vname) or self.add_dummy_qemu (bname, vname)
917
918     ########## 
919     def provision (self,plcs,options):
920         try:
921             # attach each plc to a plc box and an IP address
922             plcs = [ self.provision_plc (plc,options) for plc in plcs ]
923             # attach each node/qemu to a qemu box with an IP address
924             plcs = [ self.provision_qemus (plc,options) for plc in plcs ]
925             # update the SFA spec accordingly
926             plcs = [ self.localize_sfa_rspec(plc,options) for plc in plcs ]
927             self.list()
928             return plcs
929         except Exception, e:
930             print '* Could not provision this test on current substrate','--',e,'--','exiting'
931             traceback.print_exc()
932             sys.exit(1)
933
934     # it is expected that a couple of options like ips_bplc and ips_vplc 
935     # are set or unset together
936     @staticmethod
937     def check_options (x,y):
938         if not x and not y: return True
939         return len(x)==len(y)
940
941     # find an available plc box (or make space)
942     # and a free IP address (using options if present)
943     def provision_plc (self, plc, options):
944         
945         assert Substrate.check_options (options.ips_bplc, options.ips_vplc)
946
947         #### let's find an IP address for that plc
948         # look in options 
949         if options.ips_vplc:
950             # this is a rerun
951             # we don't check anything here, 
952             # it is the caller's responsability to cleanup and make sure this makes sense
953             plc_boxname = options.ips_bplc.pop()
954             vplc_hostname=options.ips_vplc.pop()
955         else:
956             if self.sense(): self.list()
957             plc_boxname=None
958             vplc_hostname=None
959             # try to find an available IP 
960             self.vplc_pool.sense()
961             couple=self.vplc_pool.next_free()
962             if couple:
963                 (vplc_hostname,unused)=couple
964             #### we need to find one plc box that still has a slot
965             max_free=0
966             # use the box that has max free spots for load balancing
967             for pb in self.plc_boxes:
968                 free=pb.free_slots()
969                 if free>max_free:
970                     plc_boxname=pb.hostname
971                     max_free=free
972             # if there's no available slot in the plc_boxes, or we need a free IP address
973             # make space by killing the oldest running instance
974             if not plc_boxname or not vplc_hostname:
975                 # find the oldest of all our instances
976                 all_plc_instances=reduce(lambda x, y: x+y, 
977                                          [ pb.plc_instances for pb in self.plc_boxes ],
978                                          [])
979                 all_plc_instances.sort(timestamp_sort)
980                 try:
981                     plc_instance_to_kill=all_plc_instances[0]
982                 except:
983                     msg=""
984                     if not plc_boxname: msg += " PLC boxes are full"
985                     if not vplc_hostname: msg += " vplc IP pool exhausted" 
986                     raise Exception,"Could not make space for a PLC instance:"+msg
987                 freed_plc_boxname=plc_instance_to_kill.plc_box.hostname
988                 freed_vplc_hostname=plc_instance_to_kill.vplcname()
989                 message='killing oldest plc instance = %s on %s'%(plc_instance_to_kill.line(),
990                                                                   freed_plc_boxname)
991                 plc_instance_to_kill.kill()
992                 # use this new plcbox if that was the problem
993                 if not plc_boxname:
994                     plc_boxname=freed_plc_boxname
995                 # ditto for the IP address
996                 if not vplc_hostname:
997                     vplc_hostname=freed_vplc_hostname
998                     # record in pool as mine
999                     self.vplc_pool.set_mine(vplc_hostname)
1000
1001         # 
1002         self.add_dummy_plc(plc_boxname,plc['name'])
1003         vplc_ip = self.vplc_pool.get_ip(vplc_hostname)
1004         self.vplc_pool.add_starting(vplc_hostname, plc_boxname)
1005
1006         #### compute a helpful vserver name
1007         # remove domain in hostname
1008         vplc_short = short_hostname(vplc_hostname)
1009         vservername = "%s-%d-%s" % (options.buildname,plc['index'],vplc_short)
1010         plc_name = "%s_%s"%(plc['name'],vplc_short)
1011
1012         utils.header( 'PROVISION plc %s in box %s at IP %s as %s'%\
1013                           (plc['name'],plc_boxname,vplc_hostname,vservername))
1014
1015         #### apply in the plc_spec
1016         # # informative
1017         # label=options.personality.replace("linux","")
1018         mapper = {'plc': [ ('*' , {'host_box':plc_boxname,
1019                                    # 'name':'%s-'+label,
1020                                    'name': plc_name,
1021                                    'vservername':vservername,
1022                                    'vserverip':vplc_ip,
1023                                    'PLC_DB_HOST':vplc_hostname,
1024                                    'PLC_API_HOST':vplc_hostname,
1025                                    'PLC_BOOT_HOST':vplc_hostname,
1026                                    'PLC_WWW_HOST':vplc_hostname,
1027                                    'PLC_NET_DNS1' : self.network_settings() [ 'interface_fields:dns1' ],
1028                                    'PLC_NET_DNS2' : self.network_settings() [ 'interface_fields:dns2' ],
1029                                    } ) ]
1030                   }
1031
1032
1033         # mappers only work on a list of plcs
1034         return TestMapper([plc],options).map(mapper)[0]
1035
1036     ##########
1037     def provision_qemus (self, plc, options):
1038
1039         assert Substrate.check_options (options.ips_bnode, options.ips_vnode)
1040
1041         test_mapper = TestMapper ([plc], options)
1042         nodenames = test_mapper.node_names()
1043         maps=[]
1044         for nodename in nodenames:
1045
1046             if options.ips_vnode:
1047                 # as above, it's a rerun, take it for granted
1048                 qemu_boxname=options.ips_bnode.pop()
1049                 vnode_hostname=options.ips_vnode.pop()
1050             else:
1051                 if self.sense(): self.list()
1052                 qemu_boxname=None
1053                 vnode_hostname=None
1054                 # try to find an available IP 
1055                 self.vnode_pool.sense()
1056                 couple=self.vnode_pool.next_free()
1057                 if couple:
1058                     (vnode_hostname,unused)=couple
1059                 # find a physical box
1060                 max_free=0
1061                 # use the box that has max free spots for load balancing
1062                 for qb in self.qemu_boxes:
1063                     free=qb.free_slots()
1064                     if free>max_free:
1065                         qemu_boxname=qb.hostname
1066                         max_free=free
1067                 # if we miss the box or the IP, kill the oldest instance
1068                 if not qemu_boxname or not vnode_hostname:
1069                 # find the oldest of all our instances
1070                     all_qemu_instances=reduce(lambda x, y: x+y, 
1071                                               [ qb.qemu_instances for qb in self.qemu_boxes ],
1072                                               [])
1073                     all_qemu_instances.sort(timestamp_sort)
1074                     try:
1075                         qemu_instance_to_kill=all_qemu_instances[0]
1076                     except:
1077                         msg=""
1078                         if not qemu_boxname: msg += " QEMU boxes are full"
1079                         if not vnode_hostname: msg += " vnode IP pool exhausted" 
1080                         raise Exception,"Could not make space for a QEMU instance:"+msg
1081                     freed_qemu_boxname=qemu_instance_to_kill.qemu_box.hostname
1082                     freed_vnode_hostname=short_hostname(qemu_instance_to_kill.nodename)
1083                     # kill it
1084                     message='killing oldest qemu node = %s on %s'%(qemu_instance_to_kill.line(),
1085                                                                    freed_qemu_boxname)
1086                     qemu_instance_to_kill.kill()
1087                     # use these freed resources where needed
1088                     if not qemu_boxname:
1089                         qemu_boxname=freed_qemu_boxname
1090                     if not vnode_hostname:
1091                         vnode_hostname=freed_vnode_hostname
1092                         self.vnode_pool.set_mine(vnode_hostname)
1093
1094             self.add_dummy_qemu (qemu_boxname,vnode_hostname)
1095             mac=self.vnode_pool.retrieve_userdata(vnode_hostname)
1096             ip=self.vnode_pool.get_ip (vnode_hostname)
1097             self.vnode_pool.add_starting(vnode_hostname,qemu_boxname)
1098
1099             vnode_fqdn = self.fqdn(vnode_hostname)
1100             nodemap={'host_box':qemu_boxname,
1101                      'node_fields:hostname':vnode_fqdn,
1102                      'interface_fields:ip':ip, 
1103                      'interface_fields:mac':mac,
1104                      }
1105             nodemap.update(self.network_settings())
1106             maps.append ( (nodename, nodemap) )
1107
1108             utils.header("PROVISION node %s in box %s at IP %s with MAC %s"%\
1109                              (nodename,qemu_boxname,vnode_hostname,mac))
1110
1111         return test_mapper.map({'node':maps})[0]
1112
1113     def localize_sfa_rspec (self,plc,options):
1114        
1115         plc['sfa']['SFA_REGISTRY_HOST'] = plc['PLC_DB_HOST']
1116         plc['sfa']['SFA_AGGREGATE_HOST'] = plc['PLC_DB_HOST']
1117         plc['sfa']['SFA_SM_HOST'] = plc['PLC_DB_HOST']
1118         plc['sfa']['SFA_DB_HOST'] = plc['PLC_DB_HOST']
1119         plc['sfa']['SFA_PLC_URL'] = 'https://' + plc['PLC_API_HOST'] + ':443/PLCAPI/' 
1120         return plc
1121
1122     #################### release:
1123     def release (self,options):
1124         self.vplc_pool.release_my_starting()
1125         self.vnode_pool.release_my_starting()
1126         pass
1127
1128     #################### show results for interactive mode
1129     def get_box (self,boxname):
1130         for b in self.build_boxes + self.plc_boxes + self.qemu_boxes + [self.test_box] :
1131             if b.shortname()==boxname:
1132                 return b
1133         print "Could not find box %s"%boxname
1134         return None
1135
1136     def list_boxes(self,box_or_names):
1137         print 'Sensing',
1138         for box in box_or_names:
1139             if not isinstance(box,Box): box=self.get_box(box)
1140             if not box: continue
1141             box.sense(self.options)
1142         print 'Done'
1143         for box in box_or_names:
1144             if not isinstance(box,Box): box=self.get_box(box)
1145             if not box: continue
1146             box.list()
1147
1148     def reboot_boxes(self,box_or_names):
1149         for box in box_or_names:
1150             if not isinstance(box,Box): box=self.get_box(box)
1151             if not box: continue
1152             box.reboot(self.options)
1153
1154     ####################
1155     # can be run as a utility to manage the local infrastructure
1156     def main (self):
1157         parser=OptionParser()
1158         parser.add_option ('-r',"--reboot",action='store_true',dest='reboot',default=False,
1159                            help='reboot mode (use shutdown -r)')
1160         parser.add_option ('-s',"--soft",action='store_true',dest='soft',default=False,
1161                            help='soft mode for reboot (vserver stop or kill qemus)')
1162         parser.add_option ('-t',"--testbox",action='store_true',dest='testbox',default=False,
1163                            help='add test box') 
1164         parser.add_option ('-b',"--build",action='store_true',dest='builds',default=False,
1165                            help='add build boxes')
1166         parser.add_option ('-p',"--plc",action='store_true',dest='plcs',default=False,
1167                            help='add plc boxes')
1168         parser.add_option ('-X', "--lxc",action='store_true',dest='plcs_use_lxc',
1169                            help='use lxc-enabled plc boxes instead of vs-enabled ones')
1170         parser.add_option ('-q',"--qemu",action='store_true',dest='qemus',default=False,
1171                            help='add qemu boxes') 
1172         parser.add_option ('-a',"--all",action='store_true',dest='all',default=False,
1173                            help='address all known  boxes, like -b -t -p -q')
1174         parser.add_option ('-v',"--verbose",action='store_true',dest='verbose',default=False,
1175                            help='verbose mode')
1176         parser.add_option ('-n',"--dry_run",action='store_true',dest='dry_run',default=False,
1177                            help='dry run mode')
1178         (self.options,args)=parser.parse_args()
1179
1180         if self.options.plcs_use_lxc:
1181             self.rescope (plcs_on_vs=False, plcs_on_lxc=True)
1182
1183         boxes=args
1184         if self.options.testbox: boxes += [self.test_box]
1185         if self.options.builds: boxes += self.build_boxes
1186         if self.options.plcs: boxes += self.plc_boxes
1187         if self.options.qemus: boxes += self.qemu_boxes
1188         if self.options.all: boxes += self.all_boxes
1189         
1190         # default scope is -b -p -q
1191         if not boxes:
1192             boxes = self.build_boxes + self.plc_boxes + self.qemu_boxes
1193
1194         if self.options.reboot: self.reboot_boxes (boxes)
1195         else:                   self.list_boxes (boxes)