cosmetic
[tests.git] / system / Substrate.py
1 #
2 # Thierry Parmentelat <thierry.parmentelat@inria.fr>
3 # Copyright (C) 2010 INRIA 
4 #
5 # #################### history
6 #
7 # see also Substrate.readme
8 #
9 # This is a complete rewrite of TestResources/Tracker/Pool
10 # we don't use trackers anymore and just probe/sense the running 
11 # boxes to figure out where we are
12 # in order to implement some fairness in the round-robin allocation scheme
13 # we need an indication of the 'age' of each running entity, 
14 # hence the 'timestamp-*' steps in TestPlc
15
16 # this should be much more flexible:
17 # * supports several plc boxes 
18 # * supports several qemu guests per host
19 # * no need to worry about tracker being in sync or not
20 #
21 # #################### howto use
22 #
23 # each site is to write its own LocalSubstrate.py, 
24 # (see e.g. LocalSubstrate.inria.py)
25 # LocalSubstrate.py is expected to be in /root on the testmaster box
26 # and needs to define
27 # MYPLCs
28 # . the vserver-capable boxes used for hosting myplcs
29 # .  and their admissible load (max # of myplcs)
30 # . the pool of DNS-names and IP-addresses available for myplcs
31 # QEMU nodes
32 # . the kvm-qemu capable boxes to host qemu instances
33 # .  and their admissible load (max # of myplcs)
34 # . the pool of DNS-names and IP-addresses available for nodes
35
36 # #################### implem. note
37
38 # this model relies on 'sensing' the substrate, 
39 # i.e. probing all the boxes for their running instances of vservers and qemu
40 # this is how we get rid of tracker inconsistencies 
41 # however there is a 'black hole' between the time where a given address is 
42 # allocated and when it actually gets used/pingable
43 # this is why we still need a shared knowledge among running tests
44 # in a file named /root/starting
45 # this is connected to the Pool class 
46
47 # ####################
48
49 import os.path, sys
50 import time
51 import re
52 import traceback
53 import subprocess
54 import commands
55 import socket
56 from optparse import OptionParser
57
58 import utils
59 from TestSsh import TestSsh
60 from TestMapper import TestMapper
61
62 def header (message,banner=True):
63     if not message: return
64     if banner: print "===============",
65     print message
66     sys.stdout.flush()
67
68 def timestamp_sort(o1,o2): return o1.timestamp-o2.timestamp
69
70 def short_hostname (hostname):
71     return hostname.split('.')[0]
72 ####################
73 # pool class
74 # allows to pick an available IP among a pool
75 # input is expressed as a list of tuples (hostname,ip,user_data)
76 # that can be searched iteratively for a free slot
77 # e.g.
78 # pool = [ (hostname1,user_data1),  
79 #          (hostname2,user_data2),  
80 #          (hostname3,user_data2),  
81 #          (hostname4,user_data4) ]
82 # assuming that ip1 and ip3 are taken (pingable), then we'd get
83 # pool=Pool(pool)
84 # pool.next_free() -> entry2
85 # pool.next_free() -> entry4
86 # pool.next_free() -> None
87 # that is, even if ip2 is not busy/pingable when the second next_free() is issued
88
89 class PoolItem:
90     def __init__ (self,hostname,userdata):
91         self.hostname=hostname
92         self.userdata=userdata
93         # slot holds 'busy' or 'free' or 'mine' or 'starting' or None
94         # 'mine' is for our own stuff, 'starting' from the concurrent tests
95         self.status=None
96         self.ip=None
97
98     def line(self):
99         return "Pooled %s (%s) -> %s"%(self.hostname,self.userdata, self.status)
100
101     def char (self):
102         if   self.status==None:       return '?'
103         elif self.status=='busy':     return '+'
104         elif self.status=='free':     return '-'
105         elif self.status=='mine':     return 'M'
106         elif self.status=='starting': return 'S'
107
108     def get_ip(self):
109         if self.ip: return self.ip
110         ip=socket.gethostbyname(self.hostname)
111         self.ip=ip
112         return ip
113
114 class Pool:
115
116     def __init__ (self, tuples,message):
117         self.pool_items= [ PoolItem (hostname,userdata) for (hostname,userdata) in tuples ] 
118         self.message=message
119
120     def list (self):
121         for i in self.pool_items: print i.line()
122
123     def line (self):
124         line=self.message
125         for i in self.pool_items: line += ' ' + i.char()
126         return line
127
128     def _item (self, hostname):
129         for i in self.pool_items: 
130             if i.hostname==hostname: return i
131         raise Exception ("Could not locate hostname %s in pool %s"%(hostname,self.message))
132
133     def retrieve_userdata (self, hostname): 
134         return self._item(hostname).userdata
135
136     def get_ip (self, hostname):
137         try:    return self._item(hostname).get_ip()
138         except: return socket.gethostbyname(hostname)
139         
140     def set_mine (self, hostname):
141         try:
142             self._item(hostname).status='mine'
143         except:
144             print 'WARNING: host %s not found in IP pool %s'%(hostname,self.message)
145
146     def next_free (self):
147         for i in self.pool_items:
148             if i.status == 'free':
149                 i.status='mine'
150                 return (i.hostname,i.userdata)
151         return None
152
153     # the place were other test instances tell about their not-yet-started
154     # instances, that go undetected through sensing
155     starting='/root/starting'
156     def add_starting (self, name):
157         try:    items=[line.strip() for line in file(Pool.starting).readlines()]
158         except: items=[]
159         if not name in items:
160             file(Pool.starting,'a').write(name+'\n')
161         for i in self.pool_items:
162             if i.hostname==name: i.status='mine'
163             
164     # we load this after actual sensing; 
165     def load_starting (self):
166         try:    items=[line.strip() for line in file(Pool.starting).readlines()]
167         except: items=[]
168         for i in self.pool_items:
169             if i.hostname in items:
170                 if i.status=='free' : i.status='starting'
171
172     def release_my_starting (self):
173         for i in self.pool_items:
174             if i.status=='mine': 
175                 self.del_starting(i.hostname)
176                 i.status=None
177
178     def del_starting (self, name):
179         try:    items=[line.strip() for line in file(Pool.starting).readlines()]
180         except: items=[]
181         if name in items:
182             f=file(Pool.starting,'w')
183             for item in items: 
184                 if item != name: f.write(item+'\n')
185             f.close()
186     
187     ##########
188     def _sense (self):
189         for item in self.pool_items:
190             if item.status is not None: 
191                 print item.char(),
192                 continue
193             if self.check_ping (item.hostname): 
194                 item.status='busy'
195                 print '*',
196             else:
197                 item.status='free'
198                 print '.',
199     
200     def sense (self):
201         print 'Sensing IP pool',self.message,
202         self._sense()
203         print 'Done'
204         self.load_starting()
205         print 'After starting: IP pool'
206         print self.line()
207
208     # OS-dependent ping option (support for macos, for convenience)
209     ping_timeout_option = None
210     # returns True when a given hostname/ip responds to ping
211     def check_ping (self,hostname):
212         if not Pool.ping_timeout_option:
213             (status,osname) = commands.getstatusoutput("uname -s")
214             if status != 0:
215                 raise Exception, "TestPool: Cannot figure your OS name"
216             if osname == "Linux":
217                 Pool.ping_timeout_option="-w"
218             elif osname == "Darwin":
219                 Pool.ping_timeout_option="-t"
220
221         command="ping -c 1 %s 1 %s"%(Pool.ping_timeout_option,hostname)
222         (status,output) = commands.getstatusoutput(command)
223         return status == 0
224
225 ####################
226 class Box:
227     def __init__ (self,hostname):
228         self.hostname=hostname
229         self._probed=None
230     def shortname (self):
231         return short_hostname(self.hostname)
232     def test_ssh (self): return TestSsh(self.hostname,username='root',unknown_host=False)
233     def reboot (self, options):
234         self.test_ssh().run("shutdown -r now",message="Rebooting %s"%self.hostname,
235                             dry_run=options.dry_run)
236
237     def uptime(self):
238         if hasattr(self,'_uptime') and self._uptime: return self._uptime
239         return '*undef* uptime'
240     def sense_uptime (self):
241         command=['uptime']
242         self._uptime=self.backquote_ssh(command,trash_err=True).strip()
243         if not self._uptime: self._uptime='unreachable'
244
245     def run(self,argv,message=None,trash_err=False,dry_run=False):
246         if dry_run:
247             print 'DRY_RUN:',
248             print " ".join(argv)
249             return 0
250         else:
251             header(message)
252             if not trash_err:
253                 return subprocess.call(argv)
254             else:
255                 return subprocess.call(argv,stderr=file('/dev/null','w'))
256                 
257     def run_ssh (self, argv, message, trash_err=False, dry_run=False):
258         ssh_argv = self.test_ssh().actual_argv(argv)
259         result=self.run (ssh_argv, message, trash_err, dry_run=dry_run)
260         if result!=0:
261             print "WARNING: failed to run %s on %s"%(" ".join(argv),self.hostname)
262         return result
263
264     def backquote (self, argv, trash_err=False):
265         # print 'running backquote',argv
266         if not trash_err:
267             result= subprocess.Popen(argv,stdout=subprocess.PIPE).communicate()[0]
268         else:
269             result= subprocess.Popen(argv,stdout=subprocess.PIPE,stderr=file('/dev/null','w')).communicate()[0]
270         return result
271
272     def probe (self):
273         if self._probed is not None: return self._probed
274         # first probe the ssh link
275         probe_argv=self.test_ssh().actual_argv(['hostname'])
276         self._probed=self.backquote ( probe_argv, trash_err=True )
277         if not self._probed: print "root@%s unreachable"%self.hostname
278         return self._probed
279
280     def backquote_ssh (self, argv, trash_err=False):
281         if not self.probe(): return ''
282         return self.backquote( self.test_ssh().actual_argv(argv), trash_err)
283
284 ############################################################
285 class BuildInstance:
286     def __init__ (self, buildname, pid, buildbox):
287         self.buildname=buildname
288         self.buildbox=buildbox
289         self.pids=[pid]
290
291     def add_pid(self,pid):
292         self.pids.append(pid)
293
294     def line (self):
295         return "== %s == (pids=%r)"%(self.buildname,self.pids)
296
297 class BuildBox (Box):
298     def __init__ (self,hostname):
299         Box.__init__(self,hostname)
300         self.build_instances=[]
301
302     def add_build (self,buildname,pid):
303         for build in self.build_instances:
304             if build.buildname==buildname: 
305                 build.add_pid(pid)
306                 return
307         self.build_instances.append(BuildInstance(buildname, pid, self))
308
309     def list(self):
310         if not self.build_instances: 
311             header ('No build process on %s (%s)'%(self.hostname,self.uptime()))
312         else:
313             header ("Builds on %s (%s)"%(self.hostname,self.uptime()))
314             for b in self.build_instances: 
315                 header (b.line(),banner=False)
316
317     def reboot (self, options):
318         if not options.soft:
319             self.reboot(options)
320         else:
321             command=['pkill','vbuild']
322             self.run_ssh(command,"Terminating vbuild processes",dry_run=options.dry_run)
323
324     # inspect box and find currently running builds
325     matcher=re.compile("\s*(?P<pid>[0-9]+).*-[bo]\s+(?P<buildname>[^\s]+)(\s|\Z)")
326     def sense(self, options):
327         print 'b',
328         self.sense_uptime()
329         pids=self.backquote_ssh(['pgrep','vbuild'],trash_err=True)
330         if not pids: return
331         command=['ps','-o','pid,command'] + [ pid for pid in pids.split("\n") if pid]
332         ps_lines=self.backquote_ssh (command).split('\n')
333         for line in ps_lines:
334             if not line.strip() or line.find('PID')>=0: continue
335             m=BuildBox.matcher.match(line)
336             if m: 
337                 date=time.strftime('%Y-%m-%d',time.localtime(time.time()))
338                 buildname=m.group('buildname').replace('@DATE@',date)
339                 self.add_build (buildname,m.group('pid'))
340             else: header('command %r returned line that failed to match'%command)
341
342 ############################################################
343 class PlcInstance:
344     def __init__ (self, vservername, ctxid, plcbox):
345         self.vservername=vservername
346         self.ctxid=ctxid
347         self.plc_box=plcbox
348         # unknown yet
349         self.timestamp=0
350
351     def set_timestamp (self,timestamp): self.timestamp=timestamp
352     def set_now (self): self.timestamp=int(time.time())
353     def pretty_timestamp (self): return time.strftime("%Y-%m-%d:%H-%M",time.localtime(self.timestamp))
354
355     def vplcname (self):
356         return self.vservername.split('-')[-1]
357     def buildname (self):
358         return self.vservername.rsplit('-',2)[0]
359
360     def line (self):
361         msg="== %s =="%(self.vplcname())
362         msg += " [=%s]"%self.vservername
363         if self.ctxid==0:  msg+=" not (yet?) running"
364         else:              msg+=" (ctx=%s)"%self.ctxid     
365         if self.timestamp: msg += " @ %s"%self.pretty_timestamp()
366         else:              msg += " *unknown timestamp*"
367         return msg
368
369     def kill (self):
370         msg="vserver stopping %s on %s"%(self.vservername,self.plc_box.hostname)
371         self.plc_box.run_ssh(['vserver',self.vservername,'stop'],msg)
372         self.plc_box.forget(self)
373
374 class PlcBox (Box):
375     def __init__ (self, hostname, max_plcs):
376         Box.__init__(self,hostname)
377         self.plc_instances=[]
378         self.max_plcs=max_plcs
379
380     def add_vserver (self,vservername,ctxid):
381         for plc in self.plc_instances:
382             if plc.vservername==vservername: 
383                 header("WARNING, duplicate myplc %s running on %s"%\
384                            (vservername,self.hostname),banner=False)
385                 return
386         self.plc_instances.append(PlcInstance(vservername,ctxid,self))
387     
388     def forget (self, plc_instance):
389         self.plc_instances.remove(plc_instance)
390
391     # fill one slot even though this one is not started yet
392     def add_dummy (self, plcname):
393         dummy=PlcInstance('dummy_'+plcname,0,self)
394         dummy.set_now()
395         self.plc_instances.append(dummy)
396
397     def line(self): 
398         msg="%s [max=%d,%d free] (%s)"%(self.hostname, self.max_plcs,self.free_spots(),self.uname())
399         return msg
400         
401     def list(self):
402         if not self.plc_instances: 
403             header ('No vserver running on %s'%(self.line()))
404         else:
405             header ("Active plc VMs on %s"%self.line())
406             self.plc_instances.sort(timestamp_sort)
407             for p in self.plc_instances: 
408                 header (p.line(),banner=False)
409
410     def free_spots (self):
411         return self.max_plcs - len(self.plc_instances)
412
413     def uname(self):
414         if hasattr(self,'_uname') and self._uname: return self._uname
415         return '*undef* uname'
416
417     def plc_instance_by_vservername (self, vservername):
418         for p in self.plc_instances:
419             if p.vservername==vservername: return p
420         return None
421
422     def reboot (self, options):
423         if not options.soft:
424             self.reboot(options)
425         else:
426             self.run_ssh(['service','util-vserver','stop'],"Stopping all running vservers",
427                          dry_run=options.dry_run)
428
429     def sense (self, options):
430         print 'p',
431         self._uname=self.backquote_ssh(['uname','-r']).strip()
432         # try to find fullname (vserver_stat truncates to a ridiculously short name)
433         # fetch the contexts for all vservers on that box
434         map_command=['grep','.','/etc/vservers/*/context','/dev/null',]
435         context_map=self.backquote_ssh (map_command)
436         # at this point we have a set of lines like
437         # /etc/vservers/2010.01.20--k27-f12-32-vplc03/context:40144
438         ctx_dict={}
439         for map_line in context_map.split("\n"):
440             if not map_line: continue
441             [path,xid] = map_line.split(':')
442             ctx_dict[xid]=os.path.basename(os.path.dirname(path))
443         # at this point ctx_id maps context id to vservername
444
445         command=['vserver-stat']
446         vserver_stat = self.backquote_ssh (command)
447         for vserver_line in vserver_stat.split("\n"):
448             if not vserver_line: continue
449             context=vserver_line.split()[0]
450             if context=="CTX": continue
451             longname=ctx_dict[context]
452             self.add_vserver(longname,context)
453 #            print self.margin_outline(self.vplcname(longname)),"%(vserver_line)s [=%(longname)s]"%locals()
454
455         # scan timestamps 
456         running_vsnames = [ i.vservername for i in self.plc_instances ]
457         command=   ['grep','.']
458         command += ['/vservers/%s.timestamp'%vs for vs in running_vsnames]
459         command += ['/dev/null']
460         ts_lines=self.backquote_ssh(command,trash_err=True).split('\n')
461         for ts_line in ts_lines:
462             if not ts_line.strip(): continue
463             # expect /vservers/<vservername>.timestamp:<timestamp>
464             try:
465                 (ts_file,timestamp)=ts_line.split(':')
466                 ts_file=os.path.basename(ts_file)
467                 (vservername,_)=os.path.splitext(ts_file)
468                 timestamp=int(timestamp)
469                 p=self.plc_instance_by_vservername(vservername)
470                 if not p: 
471                     print 'WARNING zombie plc',self.hostname,ts_line
472                     print '... was expecting',vservername,'in',[i.vservername for i in self.plc_instances]
473                     continue
474                 p.set_timestamp(timestamp)
475             except:  print 'WARNING, could not parse ts line',ts_line
476         
477
478
479
480 ############################################################
481 class QemuInstance: 
482     def __init__ (self, nodename, pid, qemubox):
483         self.nodename=nodename
484         self.pid=pid
485         self.qemu_box=qemubox
486         # not known yet
487         self.buildname=None
488         self.timestamp=0
489         
490     def set_buildname (self,buildname): self.buildname=buildname
491     def set_timestamp (self,timestamp): self.timestamp=timestamp
492     def set_now (self): self.timestamp=int(time.time())
493     def pretty_timestamp (self): return time.strftime("%Y-%m-%d:%H-%M",time.localtime(self.timestamp))
494     
495     def line (self):
496         msg = "== %s =="%(short_hostname(self.nodename))
497         msg += " [=%s]"%self.buildname
498         if self.pid:       msg += " (pid=%s)"%self.pid
499         else:              msg += " not (yet?) running"
500         if self.timestamp: msg += " @ %s"%self.pretty_timestamp()
501         else:              msg += " *unknown timestamp*"
502         return msg
503     
504     def kill(self):
505         if self.pid==0: 
506             print "cannot kill qemu %s with pid==0"%self.nodename
507             return
508         msg="Killing qemu %s with pid=%s on box %s"%(self.nodename,self.pid,self.qemu_box.hostname)
509         self.qemu_box.run_ssh(['kill',"%s"%self.pid],msg)
510         self.qemu_box.forget(self)
511
512
513 class QemuBox (Box):
514     def __init__ (self, hostname, max_qemus):
515         Box.__init__(self,hostname)
516         self.qemu_instances=[]
517         self.max_qemus=max_qemus
518
519     def add_node (self,nodename,pid):
520         for qemu in self.qemu_instances:
521             if qemu.nodename==nodename: 
522                 header("WARNING, duplicate qemu %s running on %s"%\
523                            (nodename,self.hostname), banner=False)
524                 return
525         self.qemu_instances.append(QemuInstance(nodename,pid,self))
526
527     def forget (self, qemu_instance):
528         self.qemu_instances.remove(qemu_instance)
529
530     # fill one slot even though this one is not started yet
531     def add_dummy (self, nodename):
532         dummy=QemuInstance('dummy_'+nodename,0,self)
533         dummy.set_now()
534         self.qemu_instances.append(dummy)
535
536     def line (self):
537         msg="%s [max=%d,%d free] (%s)"%(self.hostname, self.max_qemus,self.free_spots(),self.driver())
538         return msg
539
540     def list(self):
541         if not self.qemu_instances: 
542             header ('No qemu process on %s'%(self.line()))
543         else:
544             header ("Active qemu processes on %s"%(self.line()))
545             self.qemu_instances.sort(timestamp_sort)
546             for q in self.qemu_instances: 
547                 header (q.line(),banner=False)
548
549     def free_spots (self):
550         return self.max_qemus - len(self.qemu_instances)
551
552     def driver(self):
553         if hasattr(self,'_driver') and self._driver: return self._driver
554         return '*undef* driver'
555
556     def qemu_instance_by_pid (self,pid):
557         for q in self.qemu_instances:
558             if q.pid==pid: return q
559         return None
560
561     def qemu_instance_by_nodename_buildname (self,nodename,buildname):
562         for q in self.qemu_instances:
563             if q.nodename==nodename and q.buildname==buildname:
564                 return q
565         return None
566
567     def reboot (self, options):
568         if not options.soft:
569             self.reboot(options)
570         else:
571             self.run_ssh(['pkill','qemu'],"Killing qemu instances",
572                          dry_run=options.dry_run)
573
574     matcher=re.compile("\s*(?P<pid>[0-9]+).*-cdrom\s+(?P<nodename>[^\s]+)\.iso")
575     def sense(self, options):
576         print 'q',
577         modules=self.backquote_ssh(['lsmod']).split('\n')
578         self._driver='*NO kqemu/kmv_intel MODULE LOADED*'
579         for module in modules:
580             if module.find('kqemu')==0:
581                 self._driver='kqemu module loaded'
582             # kvm might be loaded without vkm_intel (we dont have AMD)
583             elif module.find('kvm_intel')==0:
584                 self._driver='kvm_intel module loaded'
585         ########## find out running pids
586         pids=self.backquote_ssh(['pgrep','qemu'])
587         if not pids: return
588         command=['ps','-o','pid,command'] + [ pid for pid in pids.split("\n") if pid]
589         ps_lines = self.backquote_ssh (command).split("\n")
590         for line in ps_lines:
591             if not line.strip() or line.find('PID') >=0 : continue
592             m=QemuBox.matcher.match(line)
593             if m: self.add_node (m.group('nodename'),m.group('pid'))
594             else: header('command %r returned line that failed to match'%command)
595         ########## retrieve alive instances and map to build
596         live_builds=[]
597         command=['grep','.','*/*/qemu.pid','/dev/null']
598         pid_lines=self.backquote_ssh(command,trash_err=True).split('\n')
599         for pid_line in pid_lines:
600             if not pid_line.strip(): continue
601             # expect <build>/<nodename>/qemu.pid:<pid>pid
602             try:
603                 (buildname,nodename,tail)=pid_line.split('/')
604                 (_,pid)=tail.split(':')
605                 q=self.qemu_instance_by_pid (pid)
606                 if not q: continue
607                 q.set_buildname(buildname)
608                 live_builds.append(buildname)
609             except: print 'WARNING, could not parse pid line',pid_line
610         # retrieve timestamps
611         command=   ['grep','.']
612         command += ['%s/*/timestamp'%b for b in live_builds]
613         command += ['/dev/null']
614         ts_lines=self.backquote_ssh(command,trash_err=True).split('\n')
615         for ts_line in ts_lines:
616             if not ts_line.strip(): continue
617             # expect <build>/<nodename>/timestamp:<timestamp>
618             try:
619                 (buildname,nodename,tail)=ts_line.split('/')
620                 nodename=nodename.replace('qemu-','')
621                 (_,timestamp)=tail.split(':')
622                 timestamp=int(timestamp)
623                 q=self.qemu_instance_by_nodename_buildname(nodename,buildname)
624                 if not q: 
625                     print 'WARNING zombie qemu',self.hostname,ts_line
626                     print '... was expecting (',short_hostname(nodename),buildname,') in',\
627                         [ (short_hostname(i.nodename),i.buildname) for i in self.qemu_instances ]
628                     continue
629                 q.set_timestamp(timestamp)
630             except:  print 'WARNING, could not parse ts line',ts_line
631
632 ####################
633 class TestInstance:
634     def __init__ (self, buildname, pid=0):
635         self.pids=[]
636         if pid!=0: self.pid.append(pid)
637         self.buildname=buildname
638         # latest trace line
639         self.trace=''
640         # has a KO test
641         self.broken_steps=[]
642         self.timestamp = 0
643
644     def set_timestamp (self,timestamp): self.timestamp=timestamp
645     def set_now (self): self.timestamp=int(time.time())
646     def pretty_timestamp (self): return time.strftime("%Y-%m-%d:%H-%M",time.localtime(self.timestamp))
647
648
649     def add_pid (self,pid):
650         self.pids.append(pid)
651     def set_broken (self,plcindex, step): 
652         self.broken_steps.append ( (plcindex, step,) )
653
654     def line (self):
655         double='=='
656         if self.pids: double='*'+double[1]
657         if self.broken_steps: double=double[0]+'B'
658         msg = " %s %s =="%(double,self.buildname)
659         if not self.pids:       pass
660         elif len(self.pids)==1: msg += " (pid=%s)"%self.pids[0]
661         else:                   msg += " !!!pids=%s!!!"%self.pids
662         msg += " @%s"%self.pretty_timestamp()
663         if self.broken_steps:
664             msg += "\n BROKEN IN STEPS"
665             for (i,s) in self.broken_steps: msg += " %s@%s"%(s,i)
666         return msg
667
668 class TestBox (Box):
669     def __init__ (self,hostname):
670         Box.__init__(self,hostname)
671         self.starting_ips=[]
672         self.test_instances=[]
673
674     def reboot (self, options):
675         # can't reboot a vserver VM
676         self.run_ssh (['pkill','run_log'],"Terminating current runs",
677                       dry_run=options.dry_run)
678         self.run_ssh (['rm','-f',Pool.starting],"Cleaning %s"%Pool.starting,
679                       dry_run=options.dry_run)
680
681     def get_test (self, buildname):
682         for i in self.test_instances:
683             if i.buildname==buildname: return i
684
685     # we scan ALL remaining test results, even the ones not running
686     def add_timestamp (self, buildname, timestamp):
687         i=self.get_test(buildname)
688         if i:   
689             i.set_timestamp(timestamp)
690         else:   
691             i=TestInstance(buildname,0)
692             i.set_timestamp(timestamp)
693             self.test_instances.append(i)
694
695     def add_running_test (self, pid, buildname):
696         i=self.get_test(buildname)
697         if not i:
698             self.test_instances.append (TestInstance (buildname,pid))
699             return
700         if i.pids:
701             print "WARNING: 2 concurrent tests run on same build %s"%buildname
702         i.add_pid (pid)
703
704     def add_broken (self, buildname, plcindex, step):
705         i=self.get_test(buildname)
706         if not i:
707             i=TestInstance(buildname)
708             self.test_instances.append(i)
709         i.set_broken(plcindex, step)
710
711     matcher_proc=re.compile (".*/proc/(?P<pid>[0-9]+)/cwd.*/root/(?P<buildname>[^/]+)$")
712     matcher_grep=re.compile ("/root/(?P<buildname>[^/]+)/logs/trace.*:TRACE:\s*(?P<plcindex>[0-9]+).*step=(?P<step>\S+).*")
713     def sense (self, options):
714         print 't',
715         self.sense_uptime()
716         self.starting_ips=[x for x in self.backquote_ssh(['cat',Pool.starting], trash_err=True).strip().split('\n') if x]
717
718         # scan timestamps on all tests
719         # this is likely to not invoke ssh so we need to be a bit smarter to get * expanded
720         # xxx would make sense above too
721         command=['bash','-c',"grep . /root/*/timestamp /dev/null"]
722         #ts_lines=self.backquote_ssh(command,trash_err=True).split('\n')
723         ts_lines=self.backquote_ssh(command).split('\n')
724         for ts_line in ts_lines:
725             if not ts_line.strip(): continue
726             # expect /root/<buildname>/timestamp:<timestamp>
727             try:
728                 (ts_file,timestamp)=ts_line.split(':')
729                 ts_file=os.path.dirname(ts_file)
730                 buildname=os.path.basename(ts_file)
731                 timestamp=int(timestamp)
732                 t=self.add_timestamp(buildname,timestamp)
733             except:  print 'WARNING, could not parse ts line',ts_line
734
735         command=['bash','-c',"grep KO /root/*/logs/trace* /dev/null" ]
736         trace_lines=self.backquote_ssh (command).split('\n')
737         for line in trace_lines:
738             if not line.strip(): continue
739             m=TestBox.matcher_grep.match(line)
740             if m: 
741                 buildname=m.group('buildname')
742                 plcindex=m.group('plcindex')
743                 step=m.group('step')
744                 self.add_broken(buildname,plcindex, step)
745             else: header("command %r returned line that failed to match\n%s"%(command,line))
746
747         pids = self.backquote_ssh (['pgrep','run_log'],trash_err=True)
748         if not pids: return
749         command=['ls','-ld'] + ["/proc/%s/cwd"%pid for pid in pids.split("\n") if pid]
750         ps_lines=self.backquote_ssh (command).split('\n')
751         for line in ps_lines:
752             if not line.strip(): continue
753             m=TestBox.matcher_proc.match(line)
754             if m: 
755                 pid=m.group('pid')
756                 buildname=m.group('buildname')
757                 self.add_running_test(pid, buildname)
758             else: header("command %r returned line that failed to match\n%s"%(command,line))
759         
760         
761     def line (self):
762         return "%s (%s)"%(self.hostname,self.uptime())
763
764     def list (self):
765         if not self.test_instances:
766             header ("No known tests on %s"%self.line())
767         else:
768             header ("Known tests on %s"%self.line())
769             self.test_instances.sort(timestamp_sort)
770             for i in self.test_instances: print i.line()
771         if self.starting_ips:
772             header ("Starting IP addresses on %s"%self.line())
773             self.starting_ips.sort()
774             for starting in self.starting_ips: print starting
775
776 ############################################################
777 class Options: pass
778
779 class Substrate:
780
781     def __init__ (self):
782         self.options=Options()
783         self.options.dry_run=False
784         self.options.verbose=False
785         self.options.reboot=False
786         self.options.soft=False
787         self.test_box = TestBox (self.test_box_spec())
788         self.build_boxes = [ BuildBox(h) for h in self.build_boxes_spec() ]
789         self.plc_boxes = [ PlcBox (h,m) for (h,m) in self.plc_boxes_spec ()]
790         self.qemu_boxes = [ QemuBox (h,m) for (h,m) in self.qemu_boxes_spec ()]
791         self.all_boxes = self.plc_boxes + self.qemu_boxes
792         self._sensed=False
793
794         self.vplc_pool = Pool (self.vplc_ips(),"for vplcs")
795         self.vnode_pool = Pool (self.vnode_ips(),"for vnodes")
796
797     def fqdn (self, hostname):
798         if hostname.find('.')<0: return "%s.%s"%(hostname,self.domain())
799         return hostname
800
801     # return True if actual sensing takes place
802     def sense (self,force=False):
803         if self._sensed and not force: return False
804         print 'Sensing local substrate...',
805         for b in self.all_boxes: b.sense(self.options)
806         print 'Done'
807         self._sensed=True
808         return True
809
810     def list (self):
811         for b in self.all_boxes:
812             b.list()
813
814     def add_dummy_plc (self, plc_boxname, plcname):
815         for pb in self.plc_boxes:
816             if pb.hostname==plc_boxname:
817                 pb.add_dummy(plcname)
818     def add_dummy_qemu (self, qemu_boxname, qemuname):
819         for qb in self.qemu_boxes:
820             if qb.hostname==qemu_boxname:
821                 qb.add_dummy(qemuname)
822
823     ########## 
824     def provision (self,plcs,options):
825         try:
826             # attach each plc to a plc box and an IP address
827             plcs = [ self.provision_plc (plc,options) for plc in plcs ]
828             # attach each node/qemu to a qemu box with an IP address
829             plcs = [ self.provision_qemus (plc,options) for plc in plcs ]
830             # update the SFA spec accordingly
831             plcs = [ self.localize_sfa_rspec(plc,options) for plc in plcs ]
832             return plcs
833         except Exception, e:
834             print '* Could not provision this test on current substrate','--',e,'--','exiting'
835             traceback.print_exc()
836             sys.exit(1)
837
838     # it is expected that a couple of options like ips_bplc and ips_vplc 
839     # are set or unset together
840     @staticmethod
841     def check_options (x,y):
842         if not x and not y: return True
843         return len(x)==len(y)
844
845     # find an available plc box (or make space)
846     # and a free IP address (using options if present)
847     def provision_plc (self, plc, options):
848         
849         assert Substrate.check_options (options.ips_bplc, options.ips_vplc)
850
851         #### let's find an IP address for that plc
852         # look in options 
853         if options.ips_vplc:
854             # this is a rerun
855             # we don't check anything here, 
856             # it is the caller's responsability to cleanup and make sure this makes sense
857             plc_boxname = options.ips_bplc.pop()
858             vplc_hostname=options.ips_vplc.pop()
859         else:
860             if self.sense(): self.list()
861             plc_boxname=None
862             vplc_hostname=None
863             # try to find an available IP 
864             self.vplc_pool.sense()
865             couple=self.vplc_pool.next_free()
866             if couple:
867                 (vplc_hostname,unused)=couple
868             #### we need to find one plc box that still has a slot
869             max_free=0
870             # use the box that has max free spots for load balancing
871             for pb in self.plc_boxes:
872                 free=pb.free_spots()
873                 if free>max_free:
874                     plc_boxname=pb.hostname
875                     max_free=free
876             # if there's no available slot in the plc_boxes, or we need a free IP address
877             # make space by killing the oldest running instance
878             if not plc_boxname or not vplc_hostname:
879                 # find the oldest of all our instances
880                 all_plc_instances=reduce(lambda x, y: x+y, 
881                                          [ pb.plc_instances for pb in self.plc_boxes ],
882                                          [])
883                 all_plc_instances.sort(timestamp_sort)
884                 try:
885                     plc_instance_to_kill=all_plc_instances[0]
886                 except:
887                     msg=""
888                     if not plc_boxname: msg += " PLC boxes are full"
889                     if not vplc_hostname: msg += " vplc IP pool exhausted" 
890                     raise Exception,"Could not make space for a PLC instance:"+msg
891                 freed_plc_boxname=plc_instance_to_kill.plc_box.hostname
892                 freed_vplc_hostname=plc_instance_to_kill.vplcname()
893                 message='killing oldest plc instance = %s on %s'%(plc_instance_to_kill.line(),
894                                                                   freed_plc_boxname)
895                 plc_instance_to_kill.kill()
896                 # use this new plcbox if that was the problem
897                 if not plc_boxname:
898                     plc_boxname=freed_plc_boxname
899                 # ditto for the IP address
900                 if not vplc_hostname:
901                     vplc_hostname=freed_vplc_hostname
902                     # record in pool as mine
903                     self.vplc_pool.set_mine(vplc_hostname)
904
905         # 
906         self.add_dummy_plc(plc_boxname,plc['name'])
907         vplc_ip = self.vplc_pool.get_ip(vplc_hostname)
908         self.vplc_pool.add_starting(vplc_hostname)
909
910         #### compute a helpful vserver name
911         # remove domain in hostname
912         vplc_short = short_hostname(vplc_hostname)
913         vservername = "%s-%d-%s" % (options.buildname,plc['index'],vplc_short)
914         plc_name = "%s_%s"%(plc['name'],vplc_short)
915
916         utils.header( 'PROVISION plc %s in box %s at IP %s as %s'%\
917                           (plc['name'],plc_boxname,vplc_hostname,vservername))
918
919         #### apply in the plc_spec
920         # # informative
921         # label=options.personality.replace("linux","")
922         mapper = {'plc': [ ('*' , {'host_box':plc_boxname,
923                                    # 'name':'%s-'+label,
924                                    'name': plc_name,
925                                    'vservername':vservername,
926                                    'vserverip':vplc_ip,
927                                    'PLC_DB_HOST':vplc_hostname,
928                                    'PLC_API_HOST':vplc_hostname,
929                                    'PLC_BOOT_HOST':vplc_hostname,
930                                    'PLC_WWW_HOST':vplc_hostname,
931                                    'PLC_NET_DNS1' : self.network_settings() [ 'interface_fields:dns1' ],
932                                    'PLC_NET_DNS2' : self.network_settings() [ 'interface_fields:dns2' ],
933                                    } ) ]
934                   }
935
936
937         # mappers only work on a list of plcs
938         return TestMapper([plc],options).map(mapper)[0]
939
940     ##########
941     def provision_qemus (self, plc, options):
942
943         assert Substrate.check_options (options.ips_bnode, options.ips_vnode)
944
945         test_mapper = TestMapper ([plc], options)
946         nodenames = test_mapper.node_names()
947         maps=[]
948         for nodename in nodenames:
949
950             if options.ips_vnode:
951                 # as above, it's a rerun, take it for granted
952                 qemu_boxname=options.ips_bnode.pop()
953                 vnode_hostname=options.ips_vnode.pop()
954             else:
955                 if self.sense(): self.list()
956                 qemu_boxname=None
957                 vnode_hostname=None
958                 # try to find an available IP 
959                 self.vnode_pool.sense()
960                 couple=self.vnode_pool.next_free()
961                 if couple:
962                     (vnode_hostname,unused)=couple
963                 # find a physical box
964                 max_free=0
965                 # use the box that has max free spots for load balancing
966                 for qb in self.qemu_boxes:
967                     free=qb.free_spots()
968                     if free>max_free:
969                         qemu_boxname=qb.hostname
970                         max_free=free
971                 # if we miss the box or the IP, kill the oldest instance
972                 if not qemu_boxname or not vnode_hostname:
973                 # find the oldest of all our instances
974                     all_qemu_instances=reduce(lambda x, y: x+y, 
975                                               [ qb.qemu_instances for qb in self.qemu_boxes ],
976                                               [])
977                     all_qemu_instances.sort(timestamp_sort)
978                     try:
979                         qemu_instance_to_kill=all_qemu_instances[0]
980                     except:
981                         msg=""
982                         if not qemu_boxname: msg += " QEMU boxes are full"
983                         if not vnode_hostname: msg += " vnode IP pool exhausted" 
984                         raise Exception,"Could not make space for a QEMU instance:"+msg
985                     freed_qemu_boxname=qemu_instance_to_kill.qemu_box.hostname
986                     freed_vnode_hostname=short_hostname(qemu_instance_to_kill.nodename)
987                     # kill it
988                     message='killing oldest qemu node = %s on %s'%(qemu_instance_to_kill.line(),
989                                                                    freed_qemu_boxname)
990                     qemu_instance_to_kill.kill()
991                     # use these freed resources where needed
992                     if not qemu_boxname:
993                         qemu_boxname=freed_qemu_boxname
994                     if not vnode_hostname:
995                         vnode_hostname=freed_vnode_hostname
996                         self.vnode_pool.set_mine(vnode_hostname)
997
998             self.add_dummy_qemu (qemu_boxname,nodename)
999             mac=self.vnode_pool.retrieve_userdata(vnode_hostname)
1000             ip=self.vnode_pool.get_ip (vnode_hostname)
1001             self.vnode_pool.add_starting(vnode_hostname)
1002
1003             vnode_fqdn = self.fqdn(vnode_hostname)
1004             nodemap={'host_box':qemu_boxname,
1005                      'node_fields:hostname':vnode_fqdn,
1006                      'interface_fields:ip':ip, 
1007                      'interface_fields:mac':mac,
1008                      }
1009             nodemap.update(self.network_settings())
1010             maps.append ( (nodename, nodemap) )
1011
1012             utils.header("PROVISION node %s in box %s at IP %s with MAC %s"%\
1013                              (nodename,qemu_boxname,vnode_hostname,mac))
1014
1015         return test_mapper.map({'node':maps})[0]
1016
1017     def localize_sfa_rspec (self,plc,options):
1018        
1019         plc['sfa']['SFA_REGISTRY_HOST'] = plc['PLC_DB_HOST']
1020         plc['sfa']['SFA_AGGREGATE_HOST'] = plc['PLC_DB_HOST']
1021         plc['sfa']['SFA_SM_HOST'] = plc['PLC_DB_HOST']
1022         plc['sfa']['SFA_PLC_DB_HOST'] = plc['PLC_DB_HOST']
1023         plc['sfa']['SFA_PLC_URL'] = 'https://' + plc['PLC_API_HOST'] + ':443/PLCAPI/' 
1024         for site in plc['sites']:
1025             for node in site['nodes']:
1026                 plc['sfa']['sfa_slice_rspec']['part4'] = node['node_fields']['hostname']
1027         return plc
1028
1029     #################### release:
1030     def release (self,options):
1031         self.vplc_pool.release_my_starting()
1032         self.vnode_pool.release_my_starting()
1033         pass
1034
1035     #################### show results for interactive mode
1036     def get_box (self,boxname):
1037         for b in self.build_boxes + self.plc_boxes + self.qemu_boxes + [self.test_box] :
1038             if b.shortname()==boxname:
1039                 return b
1040         print "Could not find box %s"%boxname
1041         return None
1042
1043     def list_boxes(self,boxnames):
1044         print 'Sensing',
1045         for boxname in boxnames:
1046             b=self.get_box(boxname)
1047             if not b: continue
1048             b.sense(self.options)
1049         print 'Done'
1050         for boxname in boxnames:
1051             b=self.get_box(boxname)
1052             if not b: continue
1053             b.list()
1054
1055     def reboot_boxes(self,boxnames):
1056         for boxname in boxnames:
1057             b=self.get_box(boxname)
1058             if not b: continue
1059             b.reboot(self.options)
1060
1061     ####################
1062     # can be run as a utility to manage the local infrastructure
1063     def main (self):
1064         parser=OptionParser()
1065         parser.add_option ('-r',"--reboot",action='store_true',dest='reboot',default=False,
1066                            help='reboot mode (use shutdown -r)')
1067         parser.add_option ('-s',"--soft",action='store_true',dest='soft',default=False,
1068                            help='soft mode for reboot (vserver stop or kill qemus)')
1069         parser.add_option ('-t',"--testbox",action='store_true',dest='testbox',default=False,
1070                            help='add test box') 
1071         parser.add_option ('-b',"--build",action='store_true',dest='builds',default=False,
1072                            help='add build boxes')
1073         parser.add_option ('-p',"--plc",action='store_true',dest='plcs',default=False,
1074                            help='add plc boxes')
1075         parser.add_option ('-q',"--qemu",action='store_true',dest='qemus',default=False,
1076                            help='add qemu boxes') 
1077         parser.add_option ('-v',"--verbose",action='store_true',dest='verbose',default=False,
1078                            help='verbose mode')
1079         parser.add_option ('-n',"--dry_run",action='store_true',dest='dry_run',default=False,
1080                            help='dry run mode')
1081         (self.options,args)=parser.parse_args()
1082
1083         boxes=args
1084         if self.options.testbox: boxes += [self.test_box.hostname]
1085         if self.options.builds: boxes += [b.hostname for b in self.build_boxes]
1086         if self.options.plcs: boxes += [b.hostname for b in self.plc_boxes]
1087         if self.options.qemus: boxes += [b.hostname for b in self.qemu_boxes]
1088         boxes=list(set(boxes))
1089         
1090         # default scope
1091         if not boxes:
1092             boxes = [ b.hostname for b in \
1093                           self.build_boxes + self.plc_boxes + self.qemu_boxes ]
1094
1095         if self.options.reboot: self.reboot_boxes (boxes)
1096         else:                   self.list_boxes (boxes)