testmaster sensing
[tests.git] / system / Substrate.py
1 #
2 # Thierry Parmentelat <thierry.parmentelat@inria.fr>
3 # Copyright (C) 2010 INRIA 
4 #
5 # #################### history
6 #
7 # see also Substrate.readme
8 #
9 # This is a complete rewrite of TestResources/Tracker/Pool
10 # we don't use trackers anymore and just probe/sense the running 
11 # boxes to figure out where we are
12 # in order to implement some fairness in the round-robin allocation scheme
13 # we need an indication of the 'age' of each running entity, 
14 # hence the 'timestamp-*' steps in TestPlc
15
16 # this should be much more flexible:
17 # * supports several plc boxes 
18 # * supports several qemu guests per host
19 # * no need to worry about tracker being in sync or not
20 #
21 # #################### howto use
22 #
23 # each site is to write its own LocalSubstrate.py, 
24 # (see e.g. LocalSubstrate.inria.py)
25 # LocalSubstrate.py is expected to be in /root on the testmaster box
26 # and needs to define
27 # MYPLCs
28 # . the vserver-capable boxes used for hosting myplcs
29 # .  and their admissible load (max # of myplcs)
30 # . the pool of DNS-names and IP-addresses available for myplcs
31 # QEMU nodes
32 # . the kvm-qemu capable boxes to host qemu instances
33 # .  and their admissible load (max # of myplcs)
34 # . the pool of DNS-names and IP-addresses available for nodes
35
36 # #################### implem. note
37
38 # this model relies on 'sensing' the substrate, 
39 # i.e. probing all the boxes for their running instances of vservers and qemu
40 # this is how we get rid of tracker inconsistencies 
41 # however there is a 'black hole' between the time where a given address is 
42 # allocated and when it actually gets used/pingable
43 # this is why we still need a shared knowledge among running tests
44 # in a file named /root/starting
45 # this is connected to the Pool class 
46
47 # ####################
48
49 import os.path, sys
50 import time
51 import re
52 import traceback
53 import subprocess
54 import commands
55 import socket
56 from optparse import OptionParser
57
58 import utils
59 from TestSsh import TestSsh
60 from TestMapper import TestMapper
61
62 def header (message,banner=True):
63     if not message: return
64     if banner: print "===============",
65     print message
66     sys.stdout.flush()
67
68 def timestamp_sort(o1,o2): return o1.timestamp-o2.timestamp
69
70 def short_hostname (hostname):
71     return hostname.split('.')[0]
72 ####################
73 # pool class
74 # allows to pick an available IP among a pool
75 # input is expressed as a list of tuples (hostname,ip,user_data)
76 # that can be searched iteratively for a free slot
77 # e.g.
78 # pool = [ (hostname1,user_data1),  
79 #          (hostname2,user_data2),  
80 #          (hostname3,user_data2),  
81 #          (hostname4,user_data4) ]
82 # assuming that ip1 and ip3 are taken (pingable), then we'd get
83 # pool=Pool(pool)
84 # pool.next_free() -> entry2
85 # pool.next_free() -> entry4
86 # pool.next_free() -> None
87 # that is, even if ip2 is not busy/pingable when the second next_free() is issued
88
89 class PoolItem:
90     def __init__ (self,hostname,userdata):
91         self.hostname=hostname
92         self.userdata=userdata
93         # slot holds 'busy' or 'free' or 'mine' or 'starting' or None
94         # 'mine' is for our own stuff, 'starting' from the concurrent tests
95         self.status=None
96         self.ip=None
97
98     def line(self):
99         return "Pooled %s (%s) -> %s"%(self.hostname,self.userdata, self.status)
100
101     def char (self):
102         if   self.status==None:       return '?'
103         elif self.status=='busy':     return '+'
104         elif self.status=='free':     return '-'
105         elif self.status=='mine':     return 'M'
106         elif self.status=='starting': return 'S'
107
108     def get_ip(self):
109         if self.ip: return self.ip
110         ip=socket.gethostbyname(self.hostname)
111         self.ip=ip
112         return ip
113
114 class Pool:
115
116     def __init__ (self, tuples,message):
117         self.pool_items= [ PoolItem (hostname,userdata) for (hostname,userdata) in tuples ] 
118         self.message=message
119
120     def list (self):
121         for i in self.pool_items: print i.line()
122
123     def line (self):
124         line=self.message
125         for i in self.pool_items: line += ' ' + i.char()
126         return line
127
128     def _item (self, hostname):
129         for i in self.pool_items: 
130             if i.hostname==hostname: return i
131         raise Exception ("Could not locate hostname %s in pool %s"%(hostname,self.message))
132
133     def retrieve_userdata (self, hostname): 
134         return self._item(hostname).userdata
135
136     def get_ip (self, hostname):
137         try:    return self._item(hostname).get_ip()
138         except: return socket.gethostbyname(hostname)
139         
140     def set_mine (self, hostname):
141         try:
142             self._item(hostname).status='mine'
143         except:
144             print 'WARNING: host %s not found in IP pool %s'%(hostname,self.message)
145
146     def next_free (self):
147         for i in self.pool_items:
148             if i.status == 'free':
149                 i.status='mine'
150                 return (i.hostname,i.userdata)
151         return None
152
153     # the place were other test instances tell about their not-yet-started
154     # instances, that go undetected through sensing
155     starting='/root/starting'
156     def add_starting (self, name):
157         try:    items=[line.strip() for line in file(Pool.starting).readlines()]
158         except: items=[]
159         if not name in items:
160             file(Pool.starting,'a').write(name+'\n')
161         for i in self.pool_items:
162             if i.hostname==name: i.status='mine'
163             
164     # we load this after actual sensing; 
165     def load_starting (self):
166         try:    items=[line.strip() for line in file(Pool.starting).readlines()]
167         except: items=[]
168         for i in self.pool_items:
169             if i.hostname in items:
170                 if i.status=='free' : i.status='starting'
171
172     def release_my_starting (self):
173         for i in self.pool_items:
174             if i.status=='mine': 
175                 self.del_starting(i.hostname)
176                 i.status=None
177
178     def del_starting (self, name):
179         try:    items=[line.strip() for line in file(Pool.starting).readlines()]
180         except: items=[]
181         if name in items:
182             f=file(Pool.starting,'w')
183             for item in items: 
184                 if item != name: f.write(item+'\n')
185             f.close()
186     
187     ##########
188     def _sense (self):
189         for item in self.pool_items:
190             if item.status is not None: 
191                 print item.char(),
192                 continue
193             if self.check_ping (item.hostname): 
194                 item.status='busy'
195                 print '*',
196             else:
197                 item.status='free'
198                 print '.',
199     
200     def sense (self):
201         print 'Sensing IP pool',self.message,
202         self._sense()
203         print 'Done'
204         self.load_starting()
205         print 'After starting: IP pool'
206         print self.line()
207
208     # OS-dependent ping option (support for macos, for convenience)
209     ping_timeout_option = None
210     # returns True when a given hostname/ip responds to ping
211     def check_ping (self,hostname):
212         if not Pool.ping_timeout_option:
213             (status,osname) = commands.getstatusoutput("uname -s")
214             if status != 0:
215                 raise Exception, "TestPool: Cannot figure your OS name"
216             if osname == "Linux":
217                 Pool.ping_timeout_option="-w"
218             elif osname == "Darwin":
219                 Pool.ping_timeout_option="-t"
220
221         command="ping -c 1 %s 1 %s"%(Pool.ping_timeout_option,hostname)
222         (status,output) = commands.getstatusoutput(command)
223         return status == 0
224
225 ####################
226 class Box:
227     def __init__ (self,hostname):
228         self.hostname=hostname
229         self._probed=None
230     def shortname (self):
231         return short_hostname(self.hostname)
232     def test_ssh (self): return TestSsh(self.hostname,username='root',unknown_host=False)
233     def reboot (self, options):
234         self.test_ssh().run("shutdown -r now",message="Rebooting %s"%self.hostname,
235                             dry_run=options.dry_run)
236
237     def uptime(self):
238         if hasattr(self,'_uptime') and self._uptime: return self._uptime
239         return '*undef* uptime'
240     def sense_uptime (self):
241         command=['uptime']
242         self._uptime=self.backquote_ssh(command,trash_err=True).strip()
243         if not self._uptime: self._uptime='unreachable'
244
245     def run(self,argv,message=None,trash_err=False,dry_run=False):
246         if dry_run:
247             print 'DRY_RUN:',
248             print " ".join(argv)
249             return 0
250         else:
251             header(message)
252             if not trash_err:
253                 return subprocess.call(argv)
254             else:
255                 return subprocess.call(argv,stderr=file('/dev/null','w'))
256                 
257     def run_ssh (self, argv, message, trash_err=False, dry_run=False):
258         ssh_argv = self.test_ssh().actual_argv(argv)
259         result=self.run (ssh_argv, message, trash_err, dry_run=dry_run)
260         if result!=0:
261             print "WARNING: failed to run %s on %s"%(" ".join(argv),self.hostname)
262         return result
263
264     def backquote (self, argv, trash_err=False):
265         # print 'running backquote',argv
266         if not trash_err:
267             result= subprocess.Popen(argv,stdout=subprocess.PIPE).communicate()[0]
268         else:
269             result= subprocess.Popen(argv,stdout=subprocess.PIPE,stderr=file('/dev/null','w')).communicate()[0]
270         return result
271
272     def probe (self):
273         if self._probed is not None: return self._probed
274         # first probe the ssh link
275         probe_argv=self.test_ssh().actual_argv(['hostname'])
276         self._probed=self.backquote ( probe_argv, trash_err=True )
277         if not self._probed: print "root@%s unreachable"%self.hostname
278         return self._probed
279
280     def backquote_ssh (self, argv, trash_err=False):
281         if not self.probe(): return ''
282         return self.backquote( self.test_ssh().actual_argv(argv), trash_err)
283
284 ############################################################
285 class BuildInstance:
286     def __init__ (self, buildname, pid, buildbox):
287         self.buildname=buildname
288         self.buildbox=buildbox
289         self.pids=[pid]
290
291     def add_pid(self,pid):
292         self.pids.append(pid)
293
294     def line (self):
295         return "== %s == (pids=%r)"%(self.buildname,self.pids)
296
297 class BuildBox (Box):
298     def __init__ (self,hostname):
299         Box.__init__(self,hostname)
300         self.build_instances=[]
301
302     def add_build (self,buildname,pid):
303         for build in self.build_instances:
304             if build.buildname==buildname: 
305                 build.add_pid(pid)
306                 return
307         self.build_instances.append(BuildInstance(buildname, pid, self))
308
309     def list(self):
310         if not self.build_instances: 
311             header ('No build process on %s (%s)'%(self.hostname,self.uptime()))
312         else:
313             header ("Builds on %s (%s)"%(self.hostname,self.uptime()))
314             for b in self.build_instances: 
315                 header (b.line(),banner=False)
316
317     def reboot (self, options):
318         if not options.soft:
319             self.reboot(options)
320         else:
321             command=['pkill','vbuild']
322             self.run_ssh(command,"Terminating vbuild processes",dry_run=options.dry_run)
323
324     # inspect box and find currently running builds
325     matcher=re.compile("\s*(?P<pid>[0-9]+).*-[bo]\s+(?P<buildname>[^\s]+)(\s|\Z)")
326     def sense(self, options):
327         print 'b',
328         self.sense_uptime()
329         pids=self.backquote_ssh(['pgrep','vbuild'],trash_err=True)
330         if not pids: return
331         command=['ps','-o','pid,command'] + [ pid for pid in pids.split("\n") if pid]
332         ps_lines=self.backquote_ssh (command).split('\n')
333         for line in ps_lines:
334             if not line.strip() or line.find('PID')>=0: continue
335             m=BuildBox.matcher.match(line)
336             if m: 
337                 date=time.strftime('%Y-%m-%d',time.localtime(time.time()))
338                 buildname=m.group('buildname').replace('@DATE@',date)
339                 self.add_build (buildname,m.group('pid'))
340             else: header('command %r returned line that failed to match'%command)
341
342 ############################################################
343 class PlcInstance:
344     def __init__ (self, vservername, ctxid, plcbox):
345         self.vservername=vservername
346         self.ctxid=ctxid
347         self.plc_box=plcbox
348         # unknown yet
349         self.timestamp=0
350
351     def set_timestamp (self,timestamp): self.timestamp=timestamp
352     def set_now (self): self.timestamp=int(time.time())
353     def pretty_timestamp (self): return time.strftime("%Y-%m-%d:%H-%M",time.localtime(self.timestamp))
354
355     def vplcname (self):
356         return self.vservername.split('-')[-1]
357     def buildname (self):
358         return self.vservername.rsplit('-',2)[0]
359
360     def line (self):
361         msg="== %s =="%(self.vplcname())
362         msg += " [=%s]"%self.vservername
363         if self.ctxid==0:  msg+=" not (yet?) running"
364         else:              msg+=" (ctx=%s)"%self.ctxid     
365         if self.timestamp: msg += " @ %s"%self.pretty_timestamp()
366         else:              msg += " *unknown timestamp*"
367         return msg
368
369     def kill (self):
370         msg="vserver stopping %s on %s"%(self.vservername,self.plc_box.hostname)
371         self.plc_box.run_ssh(['vserver',self.vservername,'stop'],msg)
372         self.plc_box.forget(self)
373
374 class PlcBox (Box):
375     def __init__ (self, hostname, max_plcs):
376         Box.__init__(self,hostname)
377         self.plc_instances=[]
378         self.max_plcs=max_plcs
379
380     def add_vserver (self,vservername,ctxid):
381         for plc in self.plc_instances:
382             if plc.vservername==vservername: 
383                 header("WARNING, duplicate myplc %s running on %s"%\
384                            (vservername,self.hostname),banner=False)
385                 return
386         self.plc_instances.append(PlcInstance(vservername,ctxid,self))
387     
388     def forget (self, plc_instance):
389         self.plc_instances.remove(plc_instance)
390
391     # fill one slot even though this one is not started yet
392     def add_dummy (self, plcname):
393         dummy=PlcInstance('dummy_'+plcname,0,self)
394         dummy.set_now()
395         self.plc_instances.append(dummy)
396
397     def line(self): 
398         msg="%s [max=%d,%d free] (%s)"%(self.hostname, self.max_plcs,self.free_spots(),self.uname())
399         return msg
400         
401     def list(self):
402         if not self.plc_instances: 
403             header ('No vserver running on %s'%(self.line()))
404         else:
405             header ("Active plc VMs on %s"%self.line())
406             self.plc_instances.sort(timestamp_sort)
407             for p in self.plc_instances: 
408                 header (p.line(),banner=False)
409
410     def free_spots (self):
411         return self.max_plcs - len(self.plc_instances)
412
413     def uname(self):
414         if hasattr(self,'_uname') and self._uname: return self._uname
415         return '*undef* uname'
416
417     def plc_instance_by_vservername (self, vservername):
418         for p in self.plc_instances:
419             if p.vservername==vservername: return p
420         return None
421
422     def reboot (self, options):
423         if not options.soft:
424             self.reboot(options)
425         else:
426             self.run_ssh(['service','util-vserver','stop'],"Stopping all running vservers",
427                          dry_run=options.dry_run)
428
429     def sense (self, options):
430         print 'p',
431         self._uname=self.backquote_ssh(['uname','-r']).strip()
432         # try to find fullname (vserver_stat truncates to a ridiculously short name)
433         # fetch the contexts for all vservers on that box
434         map_command=['grep','.','/etc/vservers/*/context','/dev/null',]
435         context_map=self.backquote_ssh (map_command)
436         # at this point we have a set of lines like
437         # /etc/vservers/2010.01.20--k27-f12-32-vplc03/context:40144
438         ctx_dict={}
439         for map_line in context_map.split("\n"):
440             if not map_line: continue
441             [path,xid] = map_line.split(':')
442             ctx_dict[xid]=os.path.basename(os.path.dirname(path))
443         # at this point ctx_id maps context id to vservername
444
445         command=['vserver-stat']
446         vserver_stat = self.backquote_ssh (command)
447         for vserver_line in vserver_stat.split("\n"):
448             if not vserver_line: continue
449             context=vserver_line.split()[0]
450             if context=="CTX": continue
451             longname=ctx_dict[context]
452             self.add_vserver(longname,context)
453 #            print self.margin_outline(self.vplcname(longname)),"%(vserver_line)s [=%(longname)s]"%locals()
454
455         # scan timestamps 
456         running_vsnames = [ i.vservername for i in self.plc_instances ]
457         command=   ['grep','.']
458         command += ['/vservers/%s.timestamp'%vs for vs in running_vsnames]
459         command += ['/dev/null']
460         ts_lines=self.backquote_ssh(command,trash_err=True).split('\n')
461         for ts_line in ts_lines:
462             if not ts_line.strip(): continue
463             # expect /vservers/<vservername>.timestamp:<timestamp>
464             try:
465                 (ts_file,timestamp)=ts_line.split(':')
466                 ts_file=os.path.basename(ts_file)
467                 (vservername,_)=os.path.splitext(ts_file)
468                 timestamp=int(timestamp)
469                 p=self.plc_instance_by_vservername(vservername)
470                 if not p: 
471                     print 'WARNING zombie plc',self.hostname,ts_line
472                     print '... was expecting',vservername,'in',[i.vservername for i in self.plc_instances]
473                     continue
474                 p.set_timestamp(timestamp)
475             except:  print 'WARNING, could not parse ts line',ts_line
476         
477
478
479
480 ############################################################
481 class QemuInstance: 
482     def __init__ (self, nodename, pid, qemubox):
483         self.nodename=nodename
484         self.pid=pid
485         self.qemu_box=qemubox
486         # not known yet
487         self.buildname=None
488         self.timestamp=0
489         
490     def set_buildname (self,buildname): self.buildname=buildname
491     def set_timestamp (self,timestamp): self.timestamp=timestamp
492     def set_now (self): self.timestamp=int(time.time())
493     def pretty_timestamp (self): return time.strftime("%Y-%m-%d:%H-%M",time.localtime(self.timestamp))
494     
495     def line (self):
496         msg = "== %s =="%(short_hostname(self.nodename))
497         msg += " [=%s]"%self.buildname
498         if self.pid:       msg += " (pid=%s)"%self.pid
499         else:              msg += " not (yet?) running"
500         if self.timestamp: msg += " @ %s"%self.pretty_timestamp()
501         else:              msg += " *unknown timestamp*"
502         return msg
503     
504     def kill(self):
505         if self.pid==0: 
506             print "cannot kill qemu %s with pid==0"%self.nodename
507             return
508         msg="Killing qemu %s with pid=%s on box %s"%(self.nodename,self.pid,self.qemu_box.hostname)
509         self.qemu_box.run_ssh(['kill',"%s"%self.pid],msg)
510         self.qemu_box.forget(self)
511
512
513 class QemuBox (Box):
514     def __init__ (self, hostname, max_qemus):
515         Box.__init__(self,hostname)
516         self.qemu_instances=[]
517         self.max_qemus=max_qemus
518
519     def add_node (self,nodename,pid):
520         for qemu in self.qemu_instances:
521             if qemu.nodename==nodename: 
522                 header("WARNING, duplicate qemu %s running on %s"%\
523                            (nodename,self.hostname), banner=False)
524                 return
525         self.qemu_instances.append(QemuInstance(nodename,pid,self))
526
527     def forget (self, qemu_instance):
528         self.qemu_instances.remove(qemu_instance)
529
530     # fill one slot even though this one is not started yet
531     def add_dummy (self, nodename):
532         dummy=QemuInstance('dummy_'+nodename,0,self)
533         dummy.set_now()
534         self.qemu_instances.append(dummy)
535
536     def line (self):
537         msg="%s [max=%d,%d free] (%s)"%(self.hostname, self.max_qemus,self.free_spots(),self.driver())
538         return msg
539
540     def list(self):
541         if not self.qemu_instances: 
542             header ('No qemu process on %s'%(self.line()))
543         else:
544             header ("Active qemu processes on %s"%(self.line()))
545             self.qemu_instances.sort(timestamp_sort)
546             for q in self.qemu_instances: 
547                 header (q.line(),banner=False)
548
549     def free_spots (self):
550         return self.max_qemus - len(self.qemu_instances)
551
552     def driver(self):
553         if hasattr(self,'_driver') and self._driver: return self._driver
554         return '*undef* driver'
555
556     def qemu_instance_by_pid (self,pid):
557         for q in self.qemu_instances:
558             if q.pid==pid: return q
559         return None
560
561     def qemu_instance_by_nodename_buildname (self,nodename,buildname):
562         for q in self.qemu_instances:
563             if q.nodename==nodename and q.buildname==buildname:
564                 return q
565         return None
566
567     def reboot (self, options):
568         if not options.soft:
569             self.reboot(options)
570         else:
571             self.run_ssh(['pkill','qemu'],"Killing qemu instances",
572                          dry_run=options.dry_run)
573
574     matcher=re.compile("\s*(?P<pid>[0-9]+).*-cdrom\s+(?P<nodename>[^\s]+)\.iso")
575     def sense(self, options):
576         print 'q',
577         modules=self.backquote_ssh(['lsmod']).split('\n')
578         self._driver='*NO kqemu/kmv_intel MODULE LOADED*'
579         for module in modules:
580             if module.find('kqemu')==0:
581                 self._driver='kqemu module loaded'
582             # kvm might be loaded without vkm_intel (we dont have AMD)
583             elif module.find('kvm_intel')==0:
584                 self._driver='kvm_intel module loaded'
585         ########## find out running pids
586         pids=self.backquote_ssh(['pgrep','qemu'])
587         if not pids: return
588         command=['ps','-o','pid,command'] + [ pid for pid in pids.split("\n") if pid]
589         ps_lines = self.backquote_ssh (command).split("\n")
590         for line in ps_lines:
591             if not line.strip() or line.find('PID') >=0 : continue
592             m=QemuBox.matcher.match(line)
593             if m: self.add_node (m.group('nodename'),m.group('pid'))
594             else: header('command %r returned line that failed to match'%command)
595         ########## retrieve alive instances and map to build
596         live_builds=[]
597         command=['grep','.','*/*/qemu.pid','/dev/null']
598         pid_lines=self.backquote_ssh(command,trash_err=True).split('\n')
599         for pid_line in pid_lines:
600             if not pid_line.strip(): continue
601             # expect <build>/<nodename>/qemu.pid:<pid>pid
602             try:
603                 (buildname,nodename,tail)=pid_line.split('/')
604                 (_,pid)=tail.split(':')
605                 q=self.qemu_instance_by_pid (pid)
606                 if not q: continue
607                 q.set_buildname(buildname)
608                 live_builds.append(buildname)
609             except: print 'WARNING, could not parse pid line',pid_line
610         # retrieve timestamps
611         command=   ['grep','.']
612         command += ['%s/*/timestamp'%b for b in live_builds]
613         command += ['/dev/null']
614         ts_lines=self.backquote_ssh(command,trash_err=True).split('\n')
615         for ts_line in ts_lines:
616             if not ts_line.strip(): continue
617             # expect <build>/<nodename>/timestamp:<timestamp>
618             try:
619                 (buildname,nodename,tail)=ts_line.split('/')
620                 nodename=nodename.replace('qemu-','')
621                 (_,timestamp)=tail.split(':')
622                 timestamp=int(timestamp)
623                 q=self.qemu_instance_by_nodename_buildname(nodename,buildname)
624                 if not q: 
625                     print 'WARNING zombie qemu',self.hostname,ts_line
626                     print '... was expecting (',short_hostname(nodename),buildname,') in',\
627                         [ (short_hostname(i.nodename),i.buildname) for i in self.qemu_instances ]
628                     continue
629                 q.set_timestamp(timestamp)
630             except:  print 'WARNING, could not parse ts line',ts_line
631
632 ####################
633 class TestInstance:
634     def __init__ (self, buildname, pid=0):
635         self.pids=[]
636         if pid!=0: self.pid.append(pid)
637         self.buildname=buildname
638         # latest trace line
639         self.trace=''
640         # has a KO test
641         self.broken_steps=[]
642         self.timestamp = 0
643
644     def set_timestamp (self,timestamp): self.timestamp=timestamp
645     def set_now (self): self.timestamp=int(time.time())
646     def pretty_timestamp (self): return time.strftime("%Y-%m-%d:%H-%M",time.localtime(self.timestamp))
647
648
649     def add_pid (self,pid):
650         self.pids.append(pid)
651     def set_broken (self,plcindex, step): 
652         self.broken_steps.append ( (plcindex, step,) )
653
654     def line (self):
655         double='=='
656         if self.pids: double='*'+double[1]
657         if self.broken_steps: double=double[0]+'B'
658         msg = " %s %s =="%(double,self.buildname)
659         if not self.pids:       pass
660         elif len(self.pids)==1: msg += " (pid=%s)"%self.pids[0]
661         else:                   msg += " !!!pids=%s!!!"%self.pids
662         msg += " @%s"%self.pretty_timestamp()
663         if self.broken_steps:
664             msg += "\n BROKEN IN STEPS "
665             for (i,s) in self.broken_steps: msg += "%s@%s"%(s,i)
666         return msg
667
668 class TestBox (Box):
669     def __init__ (self,hostname):
670         Box.__init__(self,hostname)
671         self.starting_ips=[]
672         self.test_instances=[]
673
674     def reboot (self, options):
675         # can't reboot a vserver VM
676         self.run_ssh (['pkill','run_log'],"Terminating current runs",
677                       dry_run=options.dry_run)
678         self.run_ssh (['rm','-f',Pool.starting],"Cleaning %s"%Pool.starting,
679                       dry_run=options.dry_run)
680
681     def get_test (self, buildname):
682         for i in self.test_instances:
683             if i.buildname==buildname: return i
684
685     # we scan ALL remaining test results, even the ones not running
686     def add_timestamp (self, buildname, timestamp):
687         i=self.get_test(buildname)
688         if i:   
689             i.set_timestamp(timestamp)
690         else:   
691             i=TestInstance(buildname,0)
692             i.set_timestamp(timestamp)
693             self.test_instances.append(i)
694
695     def add_running_test (self, pid, buildname):
696         i=self.get_test(buildname)
697         if not i:
698             self.test_instances.append (TestInstance (buildname,pid))
699             return
700         if i.pid!=0:
701             print "WARNING: 2 concurrent tests run on same build %s"%buildname
702             i.add_pid (pid)
703             return
704         else:
705             i.pid=pid
706
707     def add_broken (self, buildname, plcindex, step):
708         i=self.get_test(buildname)
709         if not i:
710             i=TestInstance(buildname)
711             self.test_instances.append(i)
712         i.set_broken(plcindex, step)
713
714     matcher_proc=re.compile (".*/proc/(?P<pid>[0-9]+)/cwd.*/root/(?P<buildname>[^/]+)$")
715     matcher_grep=re.compile ("/root/(?P<buildname>[^/]+)/logs/trace.*:TRACE:\s*(?P<plcindex>[0-9]+).*step=(?P<step>\S+).*")
716     def sense (self, options):
717         print 't',
718         self.sense_uptime()
719         self.starting_ips=self.backquote_ssh(['cat',Pool.starting], trash_err=True).strip().split('\n')
720
721         # scan timestamps on all tests
722         # this is likely to not invoke ssh so we need to be a bit smarter to get * expanded
723         # xxx would make sense above too
724         command=['bash','-c',"grep . /root/*/timestamp /dev/null"]
725         #ts_lines=self.backquote_ssh(command,trash_err=True).split('\n')
726         ts_lines=self.backquote_ssh(command).split('\n')
727         for ts_line in ts_lines:
728             if not ts_line.strip(): continue
729             # expect /root/<buildname>/timestamp:<timestamp>
730             try:
731                 (ts_file,timestamp)=ts_line.split(':')
732                 ts_file=os.path.dirname(ts_file)
733                 buildname=os.path.basename(ts_file)
734                 timestamp=int(timestamp)
735                 t=self.add_timestamp(buildname,timestamp)
736             except:  print 'WARNING, could not parse ts line',ts_line
737
738         command=['bash','-c',"grep KO /root/*/logs/trace* /dev/null" ]
739         trace_lines=self.backquote_ssh (command).split('\n')
740         for line in trace_lines:
741             if not line.strip(): continue
742             m=TestBox.matcher_grep.match(line)
743             if m: 
744                 buildname=m.group('buildname')
745                 plcindex=m.group('plcindex')
746                 step=m.group('step')
747                 self.add_broken(buildname,plcindex, step)
748             else: header("command %r returned line that failed to match\n%s"%(command,line))
749
750         pids = self.backquote_ssh (['pgrep','run_log'],trash_err=True)
751         if not pids: return
752         command=['ls','-ld'] + ["/proc/%s/cwd"%pid for pid in pids.split("\n") if pid]
753         ps_lines=self.backquote_ssh (command).split('\n')
754         for line in ps_lines:
755             if not line.strip(): continue
756             m=TestBox.matcher_proc.match(line)
757             if m: 
758                 pid=m.group('pid')
759                 buildname=m.group('buildname')
760                 self.add_running_test(pid, buildname)
761             else: header("command %r returned line that failed to match\n%s"%(command,line))
762         
763         
764     def line (self):
765         return "%s (%s)"%(self.hostname,self.uptime())
766
767     def list (self):
768         if not self.test_instances:
769             header ("No known tests on %s"%self.line())
770         else:
771             header ("Known tests on %s"%self.line())
772             self.test_instances.sort(timestamp_sort)
773             for i in self.test_instances: print i.line()
774         if self.starting_ips:
775             header ("Starting IP addresses on %s"%self.line())
776             self.starting_ips.sort()
777             for starting in self.starting_ips: print starting
778
779 ############################################################
780 class Options: pass
781
782 class Substrate:
783
784     def __init__ (self):
785         self.options=Options()
786         self.options.dry_run=False
787         self.options.verbose=False
788         self.options.reboot=False
789         self.options.soft=False
790         self.test_box = TestBox (self.test_box_spec())
791         self.build_boxes = [ BuildBox(h) for h in self.build_boxes_spec() ]
792         self.plc_boxes = [ PlcBox (h,m) for (h,m) in self.plc_boxes_spec ()]
793         self.qemu_boxes = [ QemuBox (h,m) for (h,m) in self.qemu_boxes_spec ()]
794         self.all_boxes = self.plc_boxes + self.qemu_boxes
795         self._sensed=False
796
797         self.vplc_pool = Pool (self.vplc_ips(),"for vplcs")
798         self.vnode_pool = Pool (self.vnode_ips(),"for vnodes")
799
800     def fqdn (self, hostname):
801         if hostname.find('.')<0: return "%s.%s"%(hostname,self.domain())
802         return hostname
803
804     # return True if actual sensing takes place
805     def sense (self,force=False):
806         if self._sensed and not force: return False
807         print 'Sensing local substrate...',
808         for b in self.all_boxes: b.sense(self.options)
809         print 'Done'
810         self._sensed=True
811         return True
812
813     def list (self):
814         for b in self.all_boxes:
815             b.list()
816
817     def add_dummy_plc (self, plc_boxname, plcname):
818         for pb in self.plc_boxes:
819             if pb.hostname==plc_boxname:
820                 pb.add_dummy(plcname)
821     def add_dummy_qemu (self, qemu_boxname, qemuname):
822         for qb in self.qemu_boxes:
823             if qb.hostname==qemu_boxname:
824                 qb.add_dummy(qemuname)
825
826     ########## 
827     def provision (self,plcs,options):
828         try:
829             # attach each plc to a plc box and an IP address
830             plcs = [ self.provision_plc (plc,options) for plc in plcs ]
831             # attach each node/qemu to a qemu box with an IP address
832             plcs = [ self.provision_qemus (plc,options) for plc in plcs ]
833             # update the SFA spec accordingly
834             plcs = [ self.localize_sfa_rspec(plc,options) for plc in plcs ]
835             return plcs
836         except Exception, e:
837             print '* Could not provision this test on current substrate','--',e,'--','exiting'
838             traceback.print_exc()
839             sys.exit(1)
840
841     # it is expected that a couple of options like ips_bplc and ips_vplc 
842     # are set or unset together
843     @staticmethod
844     def check_options (x,y):
845         if not x and not y: return True
846         return len(x)==len(y)
847
848     # find an available plc box (or make space)
849     # and a free IP address (using options if present)
850     def provision_plc (self, plc, options):
851         
852         assert Substrate.check_options (options.ips_bplc, options.ips_vplc)
853
854         #### let's find an IP address for that plc
855         # look in options 
856         if options.ips_vplc:
857             # this is a rerun
858             # we don't check anything here, 
859             # it is the caller's responsability to cleanup and make sure this makes sense
860             plc_boxname = options.ips_bplc.pop()
861             vplc_hostname=options.ips_vplc.pop()
862         else:
863             if self.sense(): self.list()
864             plc_boxname=None
865             vplc_hostname=None
866             # try to find an available IP 
867             self.vplc_pool.sense()
868             couple=self.vplc_pool.next_free()
869             if couple:
870                 (vplc_hostname,unused)=couple
871             #### we need to find one plc box that still has a slot
872             max_free=0
873             # use the box that has max free spots for load balancing
874             for pb in self.plc_boxes:
875                 free=pb.free_spots()
876                 if free>max_free:
877                     plc_boxname=pb.hostname
878                     max_free=free
879             # if there's no available slot in the plc_boxes, or we need a free IP address
880             # make space by killing the oldest running instance
881             if not plc_boxname or not vplc_hostname:
882                 # find the oldest of all our instances
883                 all_plc_instances=reduce(lambda x, y: x+y, 
884                                          [ pb.plc_instances for pb in self.plc_boxes ],
885                                          [])
886                 all_plc_instances.sort(timestamp_sort)
887                 try:
888                     plc_instance_to_kill=all_plc_instances[0]
889                 except:
890                     msg=""
891                     if not plc_boxname: msg += " PLC boxes are full"
892                     if not vplc_hostname: msg += " vplc IP pool exhausted" 
893                     raise Exception,"Could not make space for a PLC instance:"+msg
894                 freed_plc_boxname=plc_instance_to_kill.plc_box.hostname
895                 freed_vplc_hostname=plc_instance_to_kill.vplcname()
896                 message='killing oldest plc instance = %s on %s'%(plc_instance_to_kill.line(),
897                                                                   freed_plc_boxname)
898                 plc_instance_to_kill.kill()
899                 # use this new plcbox if that was the problem
900                 if not plc_boxname:
901                     plc_boxname=freed_plc_boxname
902                 # ditto for the IP address
903                 if not vplc_hostname:
904                     vplc_hostname=freed_vplc_hostname
905                     # record in pool as mine
906                     self.vplc_pool.set_mine(vplc_hostname)
907
908         # 
909         self.add_dummy_plc(plc_boxname,plc['name'])
910         vplc_ip = self.vplc_pool.get_ip(vplc_hostname)
911         self.vplc_pool.add_starting(vplc_hostname)
912
913         #### compute a helpful vserver name
914         # remove domain in hostname
915         vplc_short = short_hostname(vplc_hostname)
916         vservername = "%s-%d-%s" % (options.buildname,plc['index'],vplc_short)
917         plc_name = "%s_%s"%(plc['name'],vplc_short)
918
919         utils.header( 'PROVISION plc %s in box %s at IP %s as %s'%\
920                           (plc['name'],plc_boxname,vplc_hostname,vservername))
921
922         #### apply in the plc_spec
923         # # informative
924         # label=options.personality.replace("linux","")
925         mapper = {'plc': [ ('*' , {'host_box':plc_boxname,
926                                    # 'name':'%s-'+label,
927                                    'name': plc_name,
928                                    'vservername':vservername,
929                                    'vserverip':vplc_ip,
930                                    'PLC_DB_HOST':vplc_hostname,
931                                    'PLC_API_HOST':vplc_hostname,
932                                    'PLC_BOOT_HOST':vplc_hostname,
933                                    'PLC_WWW_HOST':vplc_hostname,
934                                    'PLC_NET_DNS1' : self.network_settings() [ 'interface_fields:dns1' ],
935                                    'PLC_NET_DNS2' : self.network_settings() [ 'interface_fields:dns2' ],
936                                    } ) ]
937                   }
938
939
940         # mappers only work on a list of plcs
941         return TestMapper([plc],options).map(mapper)[0]
942
943     ##########
944     def provision_qemus (self, plc, options):
945
946         assert Substrate.check_options (options.ips_bnode, options.ips_vnode)
947
948         test_mapper = TestMapper ([plc], options)
949         nodenames = test_mapper.node_names()
950         maps=[]
951         for nodename in nodenames:
952
953             if options.ips_vnode:
954                 # as above, it's a rerun, take it for granted
955                 qemu_boxname=options.ips_bnode.pop()
956                 vnode_hostname=options.ips_vnode.pop()
957             else:
958                 if self.sense(): self.list()
959                 qemu_boxname=None
960                 vnode_hostname=None
961                 # try to find an available IP 
962                 self.vnode_pool.sense()
963                 couple=self.vnode_pool.next_free()
964                 if couple:
965                     (vnode_hostname,unused)=couple
966                 # find a physical box
967                 max_free=0
968                 # use the box that has max free spots for load balancing
969                 for qb in self.qemu_boxes:
970                     free=qb.free_spots()
971                     if free>max_free:
972                         qemu_boxname=qb.hostname
973                         max_free=free
974                 # if we miss the box or the IP, kill the oldest instance
975                 if not qemu_boxname or not vnode_hostname:
976                 # find the oldest of all our instances
977                     all_qemu_instances=reduce(lambda x, y: x+y, 
978                                               [ qb.qemu_instances for qb in self.qemu_boxes ],
979                                               [])
980                     all_qemu_instances.sort(timestamp_sort)
981                     try:
982                         qemu_instance_to_kill=all_qemu_instances[0]
983                     except:
984                         msg=""
985                         if not qemu_boxname: msg += " QEMU boxes are full"
986                         if not vnode_hostname: msg += " vnode IP pool exhausted" 
987                         raise Exception,"Could not make space for a QEMU instance:"+msg
988                     freed_qemu_boxname=qemu_instance_to_kill.qemu_box.hostname
989                     freed_vnode_hostname=short_hostname(qemu_instance_to_kill.nodename)
990                     # kill it
991                     message='killing oldest qemu node = %s on %s'%(qemu_instance_to_kill.line(),
992                                                                    freed_qemu_boxname)
993                     qemu_instance_to_kill.kill()
994                     # use these freed resources where needed
995                     if not qemu_boxname:
996                         qemu_boxname=freed_qemu_boxname
997                     if not vnode_hostname:
998                         vnode_hostname=freed_vnode_hostname
999                         self.vnode_pool.set_mine(vnode_hostname)
1000
1001             self.add_dummy_qemu (qemu_boxname,nodename)
1002             mac=self.vnode_pool.retrieve_userdata(vnode_hostname)
1003             ip=self.vnode_pool.get_ip (vnode_hostname)
1004             self.vnode_pool.add_starting(vnode_hostname)
1005
1006             vnode_fqdn = self.fqdn(vnode_hostname)
1007             nodemap={'host_box':qemu_boxname,
1008                      'node_fields:hostname':vnode_fqdn,
1009                      'interface_fields:ip':ip, 
1010                      'interface_fields:mac':mac,
1011                      }
1012             nodemap.update(self.network_settings())
1013             maps.append ( (nodename, nodemap) )
1014
1015             utils.header("PROVISION node %s in box %s at IP %s with MAC %s"%\
1016                              (nodename,qemu_boxname,vnode_hostname,mac))
1017
1018         return test_mapper.map({'node':maps})[0]
1019
1020     def localize_sfa_rspec (self,plc,options):
1021        
1022         plc['sfa']['SFA_REGISTRY_HOST'] = plc['PLC_DB_HOST']
1023         plc['sfa']['SFA_AGGREGATE_HOST'] = plc['PLC_DB_HOST']
1024         plc['sfa']['SFA_SM_HOST'] = plc['PLC_DB_HOST']
1025         plc['sfa']['SFA_PLC_DB_HOST'] = plc['PLC_DB_HOST']
1026         plc['sfa']['SFA_PLC_URL'] = 'https://' + plc['PLC_API_HOST'] + ':443/PLCAPI/' 
1027         for site in plc['sites']:
1028             for node in site['nodes']:
1029                 plc['sfa']['sfa_slice_rspec']['part4'] = node['node_fields']['hostname']
1030         return plc
1031
1032     #################### release:
1033     def release (self,options):
1034         self.vplc_pool.release_my_starting()
1035         self.vnode_pool.release_my_starting()
1036         pass
1037
1038     #################### show results for interactive mode
1039     def get_box (self,boxname):
1040         for b in self.build_boxes + self.plc_boxes + self.qemu_boxes + [self.test_box] :
1041             if b.shortname()==boxname:
1042                 return b
1043         print "Could not find box %s"%boxname
1044         return None
1045
1046     def list_boxes(self,boxnames):
1047         print 'Sensing',
1048         for boxname in boxnames:
1049             b=self.get_box(boxname)
1050             if not b: continue
1051             b.sense(self.options)
1052         print 'Done'
1053         for boxname in boxnames:
1054             b=self.get_box(boxname)
1055             if not b: continue
1056             b.list()
1057
1058     def reboot_boxes(self,boxnames):
1059         for boxname in boxnames:
1060             b=self.get_box(boxname)
1061             if not b: continue
1062             b.reboot(self.options)
1063
1064     ####################
1065     # can be run as a utility to manage the local infrastructure
1066     def main (self):
1067         parser=OptionParser()
1068         parser.add_option ('-r',"--reboot",action='store_true',dest='reboot',default=False,
1069                            help='reboot mode (use shutdown -r)')
1070         parser.add_option ('-s',"--soft",action='store_true',dest='soft',default=False,
1071                            help='soft mode for reboot (vserver stop or kill qemus)')
1072         parser.add_option ('-t',"--testbox",action='store_true',dest='testbox',default=False,
1073                            help='add test box') 
1074         parser.add_option ('-b',"--build",action='store_true',dest='builds',default=False,
1075                            help='add build boxes')
1076         parser.add_option ('-p',"--plc",action='store_true',dest='plcs',default=False,
1077                            help='add plc boxes')
1078         parser.add_option ('-q',"--qemu",action='store_true',dest='qemus',default=False,
1079                            help='add qemu boxes') 
1080         parser.add_option ('-v',"--verbose",action='store_true',dest='verbose',default=False,
1081                            help='verbose mode')
1082         parser.add_option ('-n',"--dry_run",action='store_true',dest='dry_run',default=False,
1083                            help='dry run mode')
1084         (self.options,args)=parser.parse_args()
1085
1086         boxes=args
1087         if self.options.testbox: boxes += [self.test_box.hostname]
1088         if self.options.builds: boxes += [b.hostname for b in self.build_boxes]
1089         if self.options.plcs: boxes += [b.hostname for b in self.plc_boxes]
1090         if self.options.qemus: boxes += [b.hostname for b in self.qemu_boxes]
1091         boxes=list(set(boxes))
1092         
1093         # default scope
1094         if not boxes:
1095             boxes = [ b.hostname for b in \
1096                           self.build_boxes + [ self.test_box ] + \
1097                           self.plc_boxes + self.qemu_boxes ]
1098
1099         if self.options.reboot: self.reboot_boxes (boxes)
1100         else:                   self.list_boxes (boxes)