tweaks
[tests.git] / system / Substrate.py
1 #
2 # Thierry Parmentelat <thierry.parmentelat@inria.fr>
3 # Copyright (C) 2010 INRIA 
4 #
5 # #################### history
6 #
7 # see also Substrate.readme
8 #
9 # This is a complete rewrite of TestResources/Tracker/Pool
10 # we don't use trackers anymore and just probe/sense the running 
11 # boxes to figure out where we are
12 # in order to implement some fairness in the round-robin allocation scheme
13 # we need an indication of the 'age' of each running entity, 
14 # hence the 'timestamp-*' steps in TestPlc
15
16 # this should be much more flexible:
17 # * supports several plc boxes 
18 # * supports several qemu guests per host
19 # * no need to worry about tracker being in sync or not
20 #
21 # #################### howto use
22 #
23 # each site is to write its own LocalSubstrate.py, 
24 # (see e.g. LocalSubstrate.inria.py)
25 # LocalSubstrate.py is expected to be in /root on the testmaster box
26 # and needs to define
27 # MYPLCs
28 # . the vserver-capable boxes used for hosting myplcs
29 # .  and their admissible load (max # of myplcs)
30 # . the pool of DNS-names and IP-addresses available for myplcs
31 # QEMU nodes
32 # . the kvm-qemu capable boxes to host qemu instances
33 # .  and their admissible load (max # of myplcs)
34 # . the pool of DNS-names and IP-addresses available for nodes
35
36 # #################### implem. note
37
38 # this model relies on 'sensing' the substrate, 
39 # i.e. probing all the boxes for their running instances of vservers and qemu
40 # this is how we get rid of tracker inconsistencies 
41 # however there is a 'black hole' between the time where a given address is 
42 # allocated and when it actually gets used/pingable
43 # this is why we still need a shared knowledge among running tests
44 # in a file named /root/starting
45 # this is connected to the Pool class 
46
47 # ####################
48
49 import os.path, sys
50 import time
51 import re
52 import traceback
53 import subprocess
54 import commands
55 import socket
56 from optparse import OptionParser
57
58 import utils
59 from TestSsh import TestSsh
60 from TestMapper import TestMapper
61
62 def header (message,banner=True):
63     if not message: return
64     if banner: print "===============",
65     print message
66     sys.stdout.flush()
67
68 def timestamp_sort(o1,o2): return o1.timestamp-o2.timestamp
69
70 def short_hostname (hostname):
71     return hostname.split('.')[0]
72 ####################
73 # pool class
74 # allows to pick an available IP among a pool
75 # input is expressed as a list of tuples (hostname,ip,user_data)
76 # that can be searched iteratively for a free slot
77 # e.g.
78 # pool = [ (hostname1,user_data1),  
79 #          (hostname2,user_data2),  
80 #          (hostname3,user_data2),  
81 #          (hostname4,user_data4) ]
82 # assuming that ip1 and ip3 are taken (pingable), then we'd get
83 # pool=Pool(pool)
84 # pool.next_free() -> entry2
85 # pool.next_free() -> entry4
86 # pool.next_free() -> None
87 # that is, even if ip2 is not busy/pingable when the second next_free() is issued
88
89 class PoolItem:
90     def __init__ (self,hostname,userdata):
91         self.hostname=hostname
92         self.userdata=userdata
93         # slot holds 'busy' or 'free' or 'mine' or 'starting' or None
94         # 'mine' is for our own stuff, 'starting' from the concurrent tests
95         self.status=None
96         self.ip=None
97
98     def line(self):
99         return "Pooled %s (%s) -> %s"%(self.hostname,self.userdata, self.status)
100
101     def char (self):
102         if   self.status==None:       return '?'
103         elif self.status=='busy':     return '+'
104         elif self.status=='free':     return '-'
105         elif self.status=='mine':     return 'M'
106         elif self.status=='starting': return 'S'
107
108     def get_ip(self):
109         if self.ip: return self.ip
110         ip=socket.gethostbyname(self.hostname)
111         self.ip=ip
112         return ip
113
114 class Pool:
115
116     def __init__ (self, tuples,message):
117         self.pool_items= [ PoolItem (hostname,userdata) for (hostname,userdata) in tuples ] 
118         self.message=message
119
120     def list (self):
121         for i in self.pool_items: print i.line()
122
123     def line (self):
124         line=self.message
125         for i in self.pool_items: line += ' ' + i.char()
126         return line
127
128     def _item (self, hostname):
129         for i in self.pool_items: 
130             if i.hostname==hostname: return i
131         raise Exception ("Could not locate hostname %s in pool %s"%(hostname,self.message))
132
133     def retrieve_userdata (self, hostname): 
134         return self._item(hostname).userdata
135
136     def get_ip (self, hostname):
137         try:    return self._item(hostname).get_ip()
138         except: return socket.gethostbyname(hostname)
139         
140     def set_mine (self, hostname):
141         try:
142             self._item(hostname).status='mine'
143         except:
144             print 'WARNING: host %s not found in IP pool %s'%(hostname,self.message)
145
146     def next_free (self):
147         for i in self.pool_items:
148             if i.status == 'free':
149                 i.status='mine'
150                 return (i.hostname,i.userdata)
151         return None
152
153     # the place were other test instances tell about their not-yet-started
154     # instances, that go undetected through sensing
155     starting='/root/starting'
156     def add_starting (self, name):
157         try:    items=[line.strip() for line in file(Pool.starting).readlines()]
158         except: items=[]
159         if not name in items:
160             file(Pool.starting,'a').write(name+'\n')
161         for i in self.pool_items:
162             if i.hostname==name: i.status='mine'
163             
164     # we load this after actual sensing; 
165     def load_starting (self):
166         try:    items=[line.strip() for line in file(Pool.starting).readlines()]
167         except: items=[]
168         for i in self.pool_items:
169             if i.hostname in items:
170                 if i.status=='free' : i.status='starting'
171
172     def release_my_starting (self):
173         for i in self.pool_items:
174             if i.status=='mine': 
175                 self.del_starting(i.hostname)
176                 i.status=None
177
178     def del_starting (self, name):
179         try:    items=[line.strip() for line in file(Pool.starting).readlines()]
180         except: items=[]
181         if name in items:
182             f=file(Pool.starting,'w')
183             for item in items: 
184                 if item != name: f.write(item+'\n')
185             f.close()
186     
187     ##########
188     def _sense (self):
189         for item in self.pool_items:
190             if item.status is not None: 
191                 print item.char(),
192                 continue
193             if self.check_ping (item.hostname): 
194                 item.status='busy'
195                 print '*',
196             else:
197                 item.status='free'
198                 print '.',
199     
200     def sense (self):
201         print 'Sensing IP pool',self.message,
202         self._sense()
203         print 'Done'
204         self.load_starting()
205         print 'After starting: IP pool'
206         print self.line()
207
208     # OS-dependent ping option (support for macos, for convenience)
209     ping_timeout_option = None
210     # returns True when a given hostname/ip responds to ping
211     def check_ping (self,hostname):
212         if not Pool.ping_timeout_option:
213             (status,osname) = commands.getstatusoutput("uname -s")
214             if status != 0:
215                 raise Exception, "TestPool: Cannot figure your OS name"
216             if osname == "Linux":
217                 Pool.ping_timeout_option="-w"
218             elif osname == "Darwin":
219                 Pool.ping_timeout_option="-t"
220
221         command="ping -c 1 %s 1 %s"%(Pool.ping_timeout_option,hostname)
222         (status,output) = commands.getstatusoutput(command)
223         return status == 0
224
225 ####################
226 class Box:
227     def __init__ (self,hostname):
228         self.hostname=hostname
229     def shortname (self):
230         return short_hostname(self.hostname)
231     def test_ssh (self): return TestSsh(self.hostname,username='root',unknown_host=False)
232     def reboot (self, options):
233         self.test_ssh().run("shutdown -r now",message="Rebooting %s"%self.hostname,
234                             dry_run=options.dry_run)
235
236     def uptime(self):
237         if hasattr(self,'_uptime') and self._uptime: return self._uptime
238         return '*undef* uptime'
239     def sense_uptime (self):
240         command=['uptime']
241         self._uptime=self.backquote_ssh(command,trash_err=True).strip()
242         if not self._uptime: self._uptime='unreachable'
243
244     def run(self,argv,message=None,trash_err=False,dry_run=False):
245         if dry_run:
246             print 'DRY_RUN:',
247             print " ".join(argv)
248             return 0
249         else:
250             header(message)
251             if not trash_err:
252                 return subprocess.call(argv)
253             else:
254                 return subprocess.call(argv,stderr=file('/dev/null','w'))
255                 
256     def run_ssh (self, argv, message, trash_err=False, dry_run=False):
257         ssh_argv = self.test_ssh().actual_argv(argv)
258         result=self.run (ssh_argv, message, trash_err, dry_run=dry_run)
259         if result!=0:
260             print "WARNING: failed to run %s on %s"%(" ".join(argv),self.hostname)
261         return result
262
263     def backquote (self, argv, trash_err=False):
264         if not trash_err:
265             result= subprocess.Popen(argv,stdout=subprocess.PIPE).communicate()[0]
266         else:
267             result= subprocess.Popen(argv,stdout=subprocess.PIPE,stderr=file('/dev/null','w')).communicate()[0]
268         return result
269
270     def backquote_ssh (self, argv, trash_err=False):
271         # first probe the ssh link
272         probe_argv=self.test_ssh().actual_argv(['hostname'])
273         hostname=self.backquote ( probe_argv, trash_err=True )
274         if not hostname:
275             print "root@%s unreachable"%self.hostname
276             return ''
277         else:
278             return self.backquote( self.test_ssh().actual_argv(argv), trash_err)
279
280 ############################################################
281 class BuildInstance:
282     def __init__ (self, buildname, pid, buildbox):
283         self.buildname=buildname
284         self.buildbox=buildbox
285         self.pids=[pid]
286
287     def add_pid(self,pid):
288         self.pids.append(pid)
289
290     def line (self):
291         return "== %s == (pids=%r)"%(self.buildname,self.pids)
292
293 class BuildBox (Box):
294     def __init__ (self,hostname):
295         Box.__init__(self,hostname)
296         self.build_instances=[]
297
298     def add_build (self,buildname,pid):
299         for build in self.build_instances:
300             if build.buildname==buildname: 
301                 build.add_pid(pid)
302                 return
303         self.build_instances.append(BuildInstance(buildname, pid, self))
304
305     def list(self):
306         if not self.build_instances: 
307             header ('No build process on %s (%s)'%(self.hostname,self.uptime()))
308         else:
309             header ("Builds on %s (%s)"%(self.hostname,self.uptime()))
310             for b in self.build_instances: 
311                 header (b.line(),banner=False)
312
313     def reboot (self, options):
314         if not options.soft:
315             self.reboot(options)
316         else:
317             command=['pkill','vbuild']
318             self.run_ssh(command,"Terminating vbuild processes",dry_run=options.dry_run)
319
320     # inspect box and find currently running builds
321     matcher=re.compile("\s*(?P<pid>[0-9]+).*-[bo]\s+(?P<buildname>[^\s]+)(\s|\Z)")
322     def sense(self,options):
323         print 'b',
324         self.sense_uptime()
325         pids=self.backquote_ssh(['pgrep','vbuild'],trash_err=True)
326         if not pids: return
327         command=['ps','-o','pid,command'] + [ pid for pid in pids.split("\n") if pid]
328         ps_lines=self.backquote_ssh (command).split('\n')
329         for line in ps_lines:
330             if not line.strip() or line.find('PID')>=0: continue
331             m=BuildBox.matcher.match(line)
332             if m: 
333                 date=time.strftime('%Y-%m-%d',time.localtime(time.time()))
334                 buildname=m.group('buildname').replace('@DATE@',date)
335                 self.add_build (buildname,m.group('pid'))
336             else: header('command %r returned line that failed to match'%command)
337
338 ############################################################
339 class PlcInstance:
340     def __init__ (self, vservername, ctxid, plcbox):
341         self.vservername=vservername
342         self.ctxid=ctxid
343         self.plc_box=plcbox
344         # unknown yet
345         self.timestamp=0
346
347     def set_timestamp (self,timestamp): self.timestamp=timestamp
348     def set_now (self): self.timestamp=int(time.time())
349     def pretty_timestamp (self): return time.strftime("%Y-%m-%d:%H-%M",time.localtime(self.timestamp))
350
351     def vplcname (self):
352         return self.vservername.split('-')[-1]
353     def buildname (self):
354         return self.vservername.rsplit('-',2)[0]
355
356     def line (self):
357         msg="== %s =="%(self.vplcname())
358         msg += " [=%s]"%self.vservername
359         if self.ctxid==0:  msg+=" not (yet?) running"
360         else:              msg+=" (ctx=%s)"%self.ctxid     
361         if self.timestamp: msg += " @ %s"%self.pretty_timestamp()
362         else:              msg += " *unknown timestamp*"
363         return msg
364
365     def kill (self):
366         msg="vserver stopping %s on %s"%(self.vservername,self.plc_box.hostname)
367         self.plc_box.run_ssh(['vserver',self.vservername,'stop'],msg)
368         self.plc_box.forget(self)
369
370 class PlcBox (Box):
371     def __init__ (self, hostname, max_plcs):
372         Box.__init__(self,hostname)
373         self.plc_instances=[]
374         self.max_plcs=max_plcs
375
376     def add_vserver (self,vservername,ctxid):
377         for plc in self.plc_instances:
378             if plc.vservername==vservername: 
379                 header("WARNING, duplicate myplc %s running on %s"%\
380                            (vservername,self.hostname),banner=False)
381                 return
382         self.plc_instances.append(PlcInstance(vservername,ctxid,self))
383     
384     def forget (self, plc_instance):
385         self.plc_instances.remove(plc_instance)
386
387     # fill one slot even though this one is not started yet
388     def add_dummy (self, plcname):
389         dummy=PlcInstance('dummy_'+plcname,0,self)
390         dummy.set_now()
391         self.plc_instances.append(dummy)
392
393     def line(self): 
394         msg="%s [max=%d,%d free] (%s)"%(self.hostname, self.max_plcs,self.free_spots(),self.uname())
395         return msg
396         
397     def list(self):
398         if not self.plc_instances: 
399             header ('No vserver running on %s'%(self.line()))
400         else:
401             header ("Active plc VMs on %s"%self.line())
402             self.plc_instances.sort(timestamp_sort)
403             for p in self.plc_instances: 
404                 header (p.line(),banner=False)
405
406     def free_spots (self):
407         return self.max_plcs - len(self.plc_instances)
408
409     def uname(self):
410         if hasattr(self,'_uname') and self._uname: return self._uname
411         return '*undef* uname'
412
413     def plc_instance_by_vservername (self, vservername):
414         for p in self.plc_instances:
415             if p.vservername==vservername: return p
416         return None
417
418     def reboot (self, options):
419         if not options.soft:
420             self.reboot(options)
421         else:
422             self.run_ssh(['service','util-vserver','stop'],"Stopping all running vservers",
423                          dry_run=options.dry_run)
424
425     def sense (self, options):
426         print 'p',
427         self._uname=self.backquote_ssh(['uname','-r']).strip()
428         # try to find fullname (vserver_stat truncates to a ridiculously short name)
429         # fetch the contexts for all vservers on that box
430         map_command=['grep','.','/etc/vservers/*/context','/dev/null',]
431         context_map=self.backquote_ssh (map_command)
432         # at this point we have a set of lines like
433         # /etc/vservers/2010.01.20--k27-f12-32-vplc03/context:40144
434         ctx_dict={}
435         for map_line in context_map.split("\n"):
436             if not map_line: continue
437             [path,xid] = map_line.split(':')
438             ctx_dict[xid]=os.path.basename(os.path.dirname(path))
439         # at this point ctx_id maps context id to vservername
440
441         command=['vserver-stat']
442         vserver_stat = self.backquote_ssh (command)
443         for vserver_line in vserver_stat.split("\n"):
444             if not vserver_line: continue
445             context=vserver_line.split()[0]
446             if context=="CTX": continue
447             longname=ctx_dict[context]
448             self.add_vserver(longname,context)
449 #            print self.margin_outline(self.vplcname(longname)),"%(vserver_line)s [=%(longname)s]"%locals()
450
451         # scan timestamps 
452         running_vsnames = [ i.vservername for i in self.plc_instances ]
453         command=   ['grep','.']
454         command += ['/vservers/%s.timestamp'%vs for vs in running_vsnames]
455         command += ['/dev/null']
456         ts_lines=self.backquote_ssh(command,trash_err=True).split('\n')
457         for ts_line in ts_lines:
458             if not ts_line.strip(): continue
459             # expect /vservers/<vservername>.timestamp:<timestamp>
460             try:
461                 (ts_file,timestamp)=ts_line.split(':')
462                 ts_file=os.path.basename(ts_file)
463                 (vservername,_)=os.path.splitext(ts_file)
464                 timestamp=int(timestamp)
465                 p=self.plc_instance_by_vservername(vservername)
466                 if not p: 
467                     print 'WARNING zombie plc',self.hostname,ts_line
468                     print '... was expecting',vservername,'in',[i.vservername for i in self.plc_instances]
469                     continue
470                 p.set_timestamp(timestamp)
471             except:  print 'WARNING, could not parse ts line',ts_line
472         
473
474
475
476 ############################################################
477 class QemuInstance: 
478     def __init__ (self, nodename, pid, qemubox):
479         self.nodename=nodename
480         self.pid=pid
481         self.qemu_box=qemubox
482         # not known yet
483         self.buildname=None
484         self.timestamp=0
485         
486     def set_buildname (self,buildname): self.buildname=buildname
487     def set_timestamp (self,timestamp): self.timestamp=timestamp
488     def set_now (self): self.timestamp=int(time.time())
489     def pretty_timestamp (self): return time.strftime("%Y-%m-%d:%H-%M",time.localtime(self.timestamp))
490     
491     def line (self):
492         msg = "== %s =="%(short_hostname(self.nodename))
493         msg += " [=%s]"%self.buildname
494         if self.pid:       msg += " (pid=%s)"%self.pid
495         else:              msg += " not (yet?) running"
496         if self.timestamp: msg += " @ %s"%self.pretty_timestamp()
497         else:              msg += " *unknown timestamp*"
498         return msg
499     
500     def kill(self):
501         if self.pid==0: 
502             print "cannot kill qemu %s with pid==0"%self.nodename
503             return
504         msg="Killing qemu %s with pid=%s on box %s"%(self.nodename,self.pid,self.qemu_box.hostname)
505         self.qemu_box.run_ssh(['kill',"%s"%self.pid],msg)
506         self.qemu_box.forget(self)
507
508
509 class QemuBox (Box):
510     def __init__ (self, hostname, max_qemus):
511         Box.__init__(self,hostname)
512         self.qemu_instances=[]
513         self.max_qemus=max_qemus
514
515     def add_node (self,nodename,pid):
516         for qemu in self.qemu_instances:
517             if qemu.nodename==nodename: 
518                 header("WARNING, duplicate qemu %s running on %s"%\
519                            (nodename,self.hostname), banner=False)
520                 return
521         self.qemu_instances.append(QemuInstance(nodename,pid,self))
522
523     def forget (self, qemu_instance):
524         self.qemu_instances.remove(qemu_instance)
525
526     # fill one slot even though this one is not started yet
527     def add_dummy (self, nodename):
528         dummy=QemuInstance('dummy_'+nodename,0,self)
529         dummy.set_now()
530         self.qemu_instances.append(dummy)
531
532     def line (self):
533         msg="%s [max=%d,%d free] (%s)"%(self.hostname, self.max_qemus,self.free_spots(),self.driver())
534         return msg
535
536     def list(self):
537         if not self.qemu_instances: 
538             header ('No qemu process on %s'%(self.line()))
539         else:
540             header ("Active qemu processes on %s"%(self.line()))
541             self.qemu_instances.sort(timestamp_sort)
542             for q in self.qemu_instances: 
543                 header (q.line(),banner=False)
544
545     def free_spots (self):
546         return self.max_qemus - len(self.qemu_instances)
547
548     def driver(self):
549         if hasattr(self,'_driver') and self._driver: return self._driver
550         return '*undef* driver'
551
552     def qemu_instance_by_pid (self,pid):
553         for q in self.qemu_instances:
554             if q.pid==pid: return q
555         return None
556
557     def qemu_instance_by_nodename_buildname (self,nodename,buildname):
558         for q in self.qemu_instances:
559             if q.nodename==nodename and q.buildname==buildname:
560                 return q
561         return None
562
563     def reboot (self, options):
564         if not options.soft:
565             self.reboot(options)
566         else:
567             self.run_ssh(['pkill','qemu'],"Killing qemu instances",
568                          dry_run=options.dry_run)
569
570     matcher=re.compile("\s*(?P<pid>[0-9]+).*-cdrom\s+(?P<nodename>[^\s]+)\.iso")
571     def sense(self, options):
572         print 'q',
573         modules=self.backquote_ssh(['lsmod']).split('\n')
574         self._driver='*NO kqemu/kmv_intel MODULE LOADED*'
575         for module in modules:
576             if module.find('kqemu')==0:
577                 self._driver='kqemu module loaded'
578             # kvm might be loaded without vkm_intel (we dont have AMD)
579             elif module.find('kvm_intel')==0:
580                 self._driver='kvm_intel module loaded'
581         ########## find out running pids
582         pids=self.backquote_ssh(['pgrep','qemu'])
583         if not pids: return
584         command=['ps','-o','pid,command'] + [ pid for pid in pids.split("\n") if pid]
585         ps_lines = self.backquote_ssh (command).split("\n")
586         for line in ps_lines:
587             if not line.strip() or line.find('PID') >=0 : continue
588             m=QemuBox.matcher.match(line)
589             if m: self.add_node (m.group('nodename'),m.group('pid'))
590             else: header('command %r returned line that failed to match'%command)
591         ########## retrieve alive instances and map to build
592         live_builds=[]
593         command=['grep','.','*/*/qemu.pid','/dev/null']
594         pid_lines=self.backquote_ssh(command,trash_err=True).split('\n')
595         for pid_line in pid_lines:
596             if not pid_line.strip(): continue
597             # expect <build>/<nodename>/qemu.pid:<pid>pid
598             try:
599                 (buildname,nodename,tail)=pid_line.split('/')
600                 (_,pid)=tail.split(':')
601                 q=self.qemu_instance_by_pid (pid)
602                 if not q: continue
603                 q.set_buildname(buildname)
604                 live_builds.append(buildname)
605             except: print 'WARNING, could not parse pid line',pid_line
606         # retrieve timestamps
607         command=   ['grep','.']
608         command += ['%s/*/timestamp'%b for b in live_builds]
609         command += ['/dev/null']
610         ts_lines=self.backquote_ssh(command,trash_err=True).split('\n')
611         for ts_line in ts_lines:
612             if not ts_line.strip(): continue
613             # expect <build>/<nodename>/timestamp:<timestamp>
614             try:
615                 (buildname,nodename,tail)=ts_line.split('/')
616                 nodename=nodename.replace('qemu-','')
617                 (_,timestamp)=tail.split(':')
618                 timestamp=int(timestamp)
619                 q=self.qemu_instance_by_nodename_buildname(nodename,buildname)
620                 if not q: 
621                     print 'WARNING zombie qemu',self.hostname,ts_line
622                     print '... was expecting (',short_hostname(nodename),buildname,') in',\
623                         [ (short_hostname(i.nodename),i.buildname) for i in self.qemu_instances ]
624                     continue
625                 q.set_timestamp(timestamp)
626             except:  print 'WARNING, could not parse ts line',ts_line
627
628 ####################
629 class TestInstance:
630     def __init__ (self, pid, buildname):
631         self.pids=[pid]
632         self.buildname=buildname
633         # latest trace line
634         self.trace=''
635         # has a KO test
636         self.broken_steps=[]
637     def add_pid (self,pid):
638         self.pids.append(pid)
639     def set_broken (self,plcindex, step): 
640         self.broken_steps.append ( (plcindex, step,) )
641
642     def line (self):
643         msg = " == %s =="%self.buildname
644         msg += " (pid=%s)"%(self.pids)
645         if self.broken_steps:
646             msg += "\n BROKEN IN STEPS "
647             for (i,s) in self.broken_steps: msg += "step=%s,plc=%s"%(s,i)
648         return msg
649
650 class TestBox (Box):
651     def __init__ (self,hostname):
652         Box.__init__(self,hostname)
653         self.starting_ips=[]
654         self.test_instances=[]
655
656     def reboot (self, options):
657         # can't reboot a vserver VM
658         self.run_ssh (['pkill','run_log'],"Terminating current runs",
659                       dry_run=options.dry_run)
660         self.run_ssh (['rm','-f',Pool.starting],"Cleaning /root/starting",
661                       dry_run=options.dry_run)
662
663     def get_test (self, buildname):
664         for i in self.test_instances:
665             if i.buildname==buildname: return i
666
667     def add_test (self, pid, buildname):
668         i=self.get_test(buildname)
669         if i:
670             print "WARNING: 2 concurrent tests run on same build %s"%buildname
671             i.add_pid (pid)
672             return
673         self.test_instances.append (TestInstance (pid,buildname))
674
675     matcher_proc=re.compile (".*/proc/(?P<pid>[0-9]+)/cwd.*/root/(?P<buildname>[^/]+)$")
676     matcher_grep=re.compile ("/root/(?P<buildname>[^/]+)/logs/trace:TRACE:\s*(?P<plcindex>[0-9]+).*step=(?P<step>\S+).*")
677     def sense (self, options):
678         print 't',
679         self.sense_uptime()
680         self.starting_ips=self.backquote_ssh(['cat',Pool.starting], trash_err=True).strip().split('\n')
681         pids = self.backquote_ssh (['pgrep','run_log'],trash_err=True)
682         if not pids: return
683         command=['ls','-ld'] + ["/proc/%s/cwd"%pid for pid in pids.split("\n") if pid]
684         ps_lines=self.backquote_ssh (command).split('\n')
685         for line in ps_lines:
686             if not line.strip(): continue
687             m=TestBox.matcher_proc.match(line)
688             if m: 
689                 pid=m.group('pid')
690                 buildname=m.group('buildname')
691                 self.add_test(pid, buildname)
692             else: header("command %r returned line that failed to match\n%s"%(command,line))
693         buildnames=[i.buildname for i in self.test_instances]
694         if not buildnames: return
695
696 # messy - tail has different output if one or several args
697 #        command=['tail','-n','1'] + [ "/root/%s/logs/trace"%b for b in buildnames ]
698 #        trace_lines=self.backquote_ssh (command).split('\n\n')
699 #        header('TAIL')
700 #        for line in trace_lines:
701 #            if not line.strip(): continue
702 #            print 'line [[[%s]]]'%line
703         command=['grep','KO'] + [ "/root/%s/logs/trace"%b for b in buildnames ] + [ "/dev/null" ]
704         trace_lines=self.backquote_ssh (command).split('\n')
705         for line in trace_lines:
706             if not line.strip(): continue
707             m=TestBox.matcher_grep.match(line)
708             if m: 
709                 buildname=m.group('buildname')
710                 plcindex=m.group('plcindex')
711                 step=m.group('step')
712                 self.get_test(buildname).set_broken (plcindex, step)
713             else: header("command %r returned line that failed to match\n%s"%(command,line))
714             
715         
716         
717     def line (self):
718         return "%s (%s)"%(self.hostname,self.uptime())
719
720     def list (self):
721         if not self.starting_ips:
722             header ("No starting IP addresses on %s"%self.line())
723         else:
724             header ("IP addresses currently starting up on %s"%self.line())
725             self.starting_ips.sort()
726             for starting in self.starting_ips: print starting
727         if not self.test_instances:
728             header ("No running tests on %s"%self.line())
729         else:
730             header ("Running tests on %s"%self.line())
731             for i in self.test_instances: print i.line()
732
733 ############################################################
734 class Options: pass
735
736 class Substrate:
737
738     def __init__ (self):
739         self.options=Options()
740         self.options.dry_run=False
741         self.options.verbose=False
742         self.options.reboot=False
743         self.options.soft=False
744         self.test_box = TestBox (self.test_box_spec())
745         self.build_boxes = [ BuildBox(h) for h in self.build_boxes_spec() ]
746         self.plc_boxes = [ PlcBox (h,m) for (h,m) in self.plc_boxes_spec ()]
747         self.qemu_boxes = [ QemuBox (h,m) for (h,m) in self.qemu_boxes_spec ()]
748         self.all_boxes = self.plc_boxes + self.qemu_boxes
749         self._sensed=False
750
751         self.vplc_pool = Pool (self.vplc_ips(),"for vplcs")
752         self.vnode_pool = Pool (self.vnode_ips(),"for vnodes")
753
754     def fqdn (self, hostname):
755         if hostname.find('.')<0: return "%s.%s"%(hostname,self.domain())
756         return hostname
757
758     # return True if actual sensing takes place
759     def sense (self,force=False):
760         if self._sensed and not force: return False
761         print 'Sensing local substrate...',
762         for b in self.all_boxes: b.sense(self.options)
763         print 'Done'
764         self._sensed=True
765         return True
766
767     def add_dummy_plc (self, plc_boxname, plcname):
768         for pb in self.plc_boxes:
769             if pb.hostname==plc_boxname:
770                 pb.add_dummy(plcname)
771     def add_dummy_qemu (self, qemu_boxname, qemuname):
772         for qb in self.qemu_boxes:
773             if qb.hostname==qemu_boxname:
774                 qb.add_dummy(qemuname)
775
776     ########## 
777     def provision (self,plcs,options):
778         try:
779             # attach each plc to a plc box and an IP address
780             plcs = [ self.provision_plc (plc,options) for plc in plcs ]
781             # attach each node/qemu to a qemu box with an IP address
782             plcs = [ self.provision_qemus (plc,options) for plc in plcs ]
783             # update the SFA spec accordingly
784             plcs = [ self.localize_sfa_rspec(plc,options) for plc in plcs ]
785             return plcs
786         except Exception, e:
787             print '* Could not provision this test on current substrate','--',e,'--','exiting'
788             traceback.print_exc()
789             sys.exit(1)
790
791     # it is expected that a couple of options like ips_bplc and ips_vplc 
792     # are set or unset together
793     @staticmethod
794     def check_options (x,y):
795         if not x and not y: return True
796         return len(x)==len(y)
797
798     # find an available plc box (or make space)
799     # and a free IP address (using options if present)
800     def provision_plc (self, plc, options):
801         
802         assert Substrate.check_options (options.ips_bplc, options.ips_vplc)
803
804         #### let's find an IP address for that plc
805         # look in options 
806         if options.ips_vplc:
807             # this is a rerun
808             # we don't check anything here, 
809             # it is the caller's responsability to cleanup and make sure this makes sense
810             plc_boxname = options.ips_bplc.pop()
811             vplc_hostname=options.ips_vplc.pop()
812         else:
813             if self.sense(): self.list_all()
814             plc_boxname=None
815             vplc_hostname=None
816             # try to find an available IP 
817             self.vplc_pool.sense()
818             couple=self.vplc_pool.next_free()
819             if couple:
820                 (vplc_hostname,unused)=couple
821             #### we need to find one plc box that still has a slot
822             max_free=0
823             # use the box that has max free spots for load balancing
824             for pb in self.plc_boxes:
825                 free=pb.free_spots()
826                 if free>max_free:
827                     plc_boxname=pb.hostname
828                     max_free=free
829             # if there's no available slot in the plc_boxes, or we need a free IP address
830             # make space by killing the oldest running instance
831             if not plc_boxname or not vplc_hostname:
832                 # find the oldest of all our instances
833                 all_plc_instances=reduce(lambda x, y: x+y, 
834                                          [ pb.plc_instances for pb in self.plc_boxes ],
835                                          [])
836                 all_plc_instances.sort(timestamp_sort)
837                 try:
838                     plc_instance_to_kill=all_plc_instances[0]
839                 except:
840                     msg=""
841                     if not plc_boxname: msg += " PLC boxes are full"
842                     if not vplc_hostname: msg += " vplc IP pool exhausted" 
843                     raise Exception,"Could not make space for a PLC instance:"+msg
844                 freed_plc_boxname=plc_instance_to_kill.plc_box.hostname
845                 freed_vplc_hostname=plc_instance_to_kill.vplcname()
846                 message='killing oldest plc instance = %s on %s'%(plc_instance_to_kill.line(),
847                                                                   freed_plc_boxname)
848                 plc_instance_to_kill.kill()
849                 # use this new plcbox if that was the problem
850                 if not plc_boxname:
851                     plc_boxname=freed_plc_boxname
852                 # ditto for the IP address
853                 if not vplc_hostname:
854                     vplc_hostname=freed_vplc_hostname
855                     # record in pool as mine
856                     self.vplc_pool.set_mine(vplc_hostname)
857
858         # 
859         self.add_dummy_plc(plc_boxname,plc['name'])
860         vplc_ip = self.vplc_pool.get_ip(vplc_hostname)
861         self.vplc_pool.add_starting(vplc_hostname)
862
863         #### compute a helpful vserver name
864         # remove domain in hostname
865         vplc_short = short_hostname(vplc_hostname)
866         vservername = "%s-%d-%s" % (options.buildname,plc['index'],vplc_short)
867         plc_name = "%s_%s"%(plc['name'],vplc_short)
868
869         utils.header( 'PROVISION plc %s in box %s at IP %s as %s'%\
870                           (plc['name'],plc_boxname,vplc_hostname,vservername))
871
872         #### apply in the plc_spec
873         # # informative
874         # label=options.personality.replace("linux","")
875         mapper = {'plc': [ ('*' , {'host_box':plc_boxname,
876                                    # 'name':'%s-'+label,
877                                    'name': plc_name,
878                                    'vservername':vservername,
879                                    'vserverip':vplc_ip,
880                                    'PLC_DB_HOST':vplc_hostname,
881                                    'PLC_API_HOST':vplc_hostname,
882                                    'PLC_BOOT_HOST':vplc_hostname,
883                                    'PLC_WWW_HOST':vplc_hostname,
884                                    'PLC_NET_DNS1' : self.network_settings() [ 'interface_fields:dns1' ],
885                                    'PLC_NET_DNS2' : self.network_settings() [ 'interface_fields:dns2' ],
886                                    } ) ]
887                   }
888
889
890         # mappers only work on a list of plcs
891         return TestMapper([plc],options).map(mapper)[0]
892
893     ##########
894     def provision_qemus (self, plc, options):
895
896         assert Substrate.check_options (options.ips_bnode, options.ips_vnode)
897
898         test_mapper = TestMapper ([plc], options)
899         nodenames = test_mapper.node_names()
900         maps=[]
901         for nodename in nodenames:
902
903             if options.ips_vnode:
904                 # as above, it's a rerun, take it for granted
905                 qemu_boxname=options.ips_bnode.pop()
906                 vnode_hostname=options.ips_vnode.pop()
907             else:
908                 if self.sense(): self.list_all()
909                 qemu_boxname=None
910                 vnode_hostname=None
911                 # try to find an available IP 
912                 self.vnode_pool.sense()
913                 couple=self.vnode_pool.next_free()
914                 if couple:
915                     (vnode_hostname,unused)=couple
916                 # find a physical box
917                 max_free=0
918                 # use the box that has max free spots for load balancing
919                 for qb in self.qemu_boxes:
920                     free=qb.free_spots()
921                     if free>max_free:
922                         qemu_boxname=qb.hostname
923                         max_free=free
924                 # if we miss the box or the IP, kill the oldest instance
925                 if not qemu_boxname or not vnode_hostname:
926                 # find the oldest of all our instances
927                     all_qemu_instances=reduce(lambda x, y: x+y, 
928                                               [ qb.qemu_instances for qb in self.qemu_boxes ],
929                                               [])
930                     all_qemu_instances.sort(timestamp_sort)
931                     try:
932                         qemu_instance_to_kill=all_qemu_instances[0]
933                     except:
934                         msg=""
935                         if not qemu_boxname: msg += " QEMU boxes are full"
936                         if not vnode_hostname: msg += " vnode IP pool exhausted" 
937                         raise Exception,"Could not make space for a QEMU instance:"+msg
938                     freed_qemu_boxname=qemu_instance_to_kill.qemu_box.hostname
939                     freed_vnode_hostname=short_hostname(qemu_instance_to_kill.nodename)
940                     # kill it
941                     message='killing oldest qemu node = %s on %s'%(qemu_instance_to_kill.line(),
942                                                                    freed_qemu_boxname)
943                     qemu_instance_to_kill.kill()
944                     # use these freed resources where needed
945                     if not qemu_boxname:
946                         qemu_boxname=freed_qemu_boxname
947                     if not vnode_hostname:
948                         vnode_hostname=freed_vnode_hostname
949                         self.vnode_pool.set_mine(vnode_hostname)
950
951             self.add_dummy_qemu (qemu_boxname,nodename)
952             mac=self.vnode_pool.retrieve_userdata(vnode_hostname)
953             ip=self.vnode_pool.get_ip (vnode_hostname)
954             self.vnode_pool.add_starting(vnode_hostname)
955
956             vnode_fqdn = self.fqdn(vnode_hostname)
957             nodemap={'host_box':qemu_boxname,
958                      'node_fields:hostname':vnode_fqdn,
959                      'interface_fields:ip':ip, 
960                      'interface_fields:mac':mac,
961                      }
962             nodemap.update(self.network_settings())
963             maps.append ( (nodename, nodemap) )
964
965             utils.header("PROVISION node %s in box %s at IP %s with MAC %s"%\
966                              (nodename,qemu_boxname,vnode_hostname,mac))
967
968         return test_mapper.map({'node':maps})[0]
969
970     def localize_sfa_rspec (self,plc,options):
971        
972         plc['sfa']['SFA_REGISTRY_HOST'] = plc['PLC_DB_HOST']
973         plc['sfa']['SFA_AGGREGATE_HOST'] = plc['PLC_DB_HOST']
974         plc['sfa']['SFA_SM_HOST'] = plc['PLC_DB_HOST']
975         plc['sfa']['SFA_PLC_DB_HOST'] = plc['PLC_DB_HOST']
976         plc['sfa']['SFA_PLC_URL'] = 'https://' + plc['PLC_API_HOST'] + ':443/PLCAPI/' 
977         for site in plc['sites']:
978             for node in site['nodes']:
979                 plc['sfa']['sfa_slice_rspec']['part4'] = node['node_fields']['hostname']
980         return plc
981
982     #################### release:
983     def release (self,options):
984         self.vplc_pool.release_my_starting()
985         self.vnode_pool.release_my_starting()
986         pass
987
988     #################### show results for interactive mode
989     def get_box (self,boxname):
990         for b in self.build_boxes + self.plc_boxes + self.qemu_boxes + [self.test_box] :
991             if b.shortname()==boxname:
992                 return b
993         print "Could not find box %s"%boxname
994         return None
995
996     def list_boxes(self,boxes):
997         print 'Sensing',
998         for box in boxes:
999             b=self.get_box(box)
1000             if not b: continue
1001             b.sense(self.options)
1002         print 'Done'
1003         for box in boxes:
1004             b=self.get_box(box)
1005             if not b: continue
1006             b.list()
1007
1008     def reboot_boxes(self,boxes):
1009         for box in boxes:
1010             b=self.get_box(box)
1011             if not b: continue
1012             b.reboot(self.options)
1013
1014     ####################
1015     # can be run as a utility to manage the local infrastructure
1016     def main (self):
1017         parser=OptionParser()
1018         parser.add_option ('-r',"--reboot",action='store_true',dest='reboot',default=False,
1019                            help='reboot mode (use shutdown -r)')
1020         parser.add_option ('-s',"--soft",action='store_true',dest='soft',default=False,
1021                            help='soft mode for reboot (vserver stop or kill qemus)')
1022         parser.add_option ('-t',"--testbox",action='store_true',dest='testbox',default=False,
1023                            help='add test box') 
1024         parser.add_option ('-b',"--build",action='store_true',dest='builds',default=False,
1025                            help='add build boxes')
1026         parser.add_option ('-p',"--plc",action='store_true',dest='plcs',default=False,
1027                            help='add plc boxes')
1028         parser.add_option ('-q',"--qemu",action='store_true',dest='qemus',default=False,
1029                            help='add qemu boxes') 
1030         parser.add_option ('-v',"--verbose",action='store_true',dest='verbose',default=False,
1031                            help='verbose mode')
1032         parser.add_option ('-n',"--dry_run",action='store_true',dest='dry_run',default=False,
1033                            help='dry run mode')
1034         (self.options,args)=parser.parse_args()
1035
1036         boxes=args
1037         if self.options.testbox: boxes += [self.test_box.hostname]
1038         if self.options.builds: boxes += [b.hostname for b in self.build_boxes]
1039         if self.options.plcs: boxes += [b.hostname for b in self.plc_boxes]
1040         if self.options.qemus: boxes += [b.hostname for b in self.qemu_boxes]
1041         boxes=list(set(boxes))
1042         
1043         # default scope
1044         if not boxes:
1045             boxes = [ b.hostname for b in \
1046                           self.build_boxes + [ self.test_box ] + \
1047                           self.plc_boxes + self.qemu_boxes ]
1048
1049         if self.options.reboot: self.reboot_boxes (boxes)
1050         else:                   self.list_boxes (boxes)