cosmetic
[tests.git] / system / Substrate.py
1 #
2 # Thierry Parmentelat <thierry.parmentelat@inria.fr>
3 # Copyright (C) 2010 INRIA 
4 #
5 # #################### history
6 #
7 # see also Substrate.readme
8 #
9 # This is a complete rewrite of TestResources/Tracker/Pool
10 # we don't use trackers anymore and just probe/sense the running 
11 # boxes to figure out where we are
12 # in order to implement some fairness in the round-robin allocation scheme
13 # we need an indication of the 'age' of each running entity, 
14 # hence the 'timestamp-*' steps in TestPlc
15
16 # this should be much more flexible:
17 # * supports several plc boxes 
18 # * supports several qemu guests per host
19 # * no need to worry about tracker being in sync or not
20 #
21 # #################### howto use
22 #
23 # each site is to write its own LocalSubstrate.py, 
24 # (see e.g. LocalSubstrate.inria.py)
25 # LocalSubstrate.py is expected to be in /root on the testmaster box
26 # and needs to define
27 # MYPLCs
28 # . the vserver-capable boxes used for hosting myplcs
29 # .  and their admissible load (max # of myplcs)
30 # . the pool of DNS-names and IP-addresses available for myplcs
31 # QEMU nodes
32 # . the kvm-qemu capable boxes to host qemu instances
33 # .  and their admissible load (max # of myplcs)
34 # . the pool of DNS-names and IP-addresses available for nodes
35
36 # #################### implem. note
37
38 # this model relies on 'sensing' the substrate, 
39 # i.e. probing all the boxes for their running instances of vservers and qemu
40 # this is how we get rid of tracker inconsistencies 
41 # however there is a 'black hole' between the time where a given address is 
42 # allocated and when it actually gets used/pingable
43 # this is why we still need a shared knowledge among running tests
44 # in a file named /root/starting
45 # this is connected to the Pool class 
46
47 # ####################
48
49 import os.path, sys
50 import time
51 import re
52 import traceback
53 import subprocess
54 import commands
55 import socket
56 from optparse import OptionParser
57
58 import utils
59 from TestSsh import TestSsh
60 from TestMapper import TestMapper
61
62 def header (message,banner=True):
63     if not message: return
64     if banner: print "===============",
65     print message
66     sys.stdout.flush()
67
68 def timestamp_sort(o1,o2): return o1.timestamp-o2.timestamp
69
70 def short_hostname (hostname):
71     return hostname.split('.')[0]
72
73 ####################
74 # the place were other test instances tell about their not-yet-started
75 # instances, that go undetected through sensing
76 class Starting:
77
78     location='/root/starting'
79     def __init__ (self):
80         self.tuples=[]
81
82     def load (self):
83         try:    self.tuples=[line.strip().split('@') 
84                              for line in file(Starting.location).readlines()]
85         except: self.tuples=[]
86
87     def vnames (self) : 
88         self.load()
89         return [ x for (x,_) in self.tuples ]
90
91     def add (self, vname, bname):
92         if not vname in self.vnames():
93             file(Starting.location,'a').write("%s@%s\n"%(vname,bname))
94             
95     def delete_vname (self, vname):
96         self.load()
97         if vname in self.vnames():
98             f=file(Starting.location,'w')
99             for (v,b) in self.tuples: 
100                 if v != vname: f.write("%s@%s\n"%(v,b))
101             f.close()
102     
103 ####################
104 # pool class
105 # allows to pick an available IP among a pool
106 # input is expressed as a list of tuples (hostname,ip,user_data)
107 # that can be searched iteratively for a free slot
108 # e.g.
109 # pool = [ (hostname1,user_data1),  
110 #          (hostname2,user_data2),  
111 #          (hostname3,user_data2),  
112 #          (hostname4,user_data4) ]
113 # assuming that ip1 and ip3 are taken (pingable), then we'd get
114 # pool=Pool(pool)
115 # pool.next_free() -> entry2
116 # pool.next_free() -> entry4
117 # pool.next_free() -> None
118 # that is, even if ip2 is not busy/pingable when the second next_free() is issued
119
120 class PoolItem:
121     def __init__ (self,hostname,userdata):
122         self.hostname=hostname
123         self.userdata=userdata
124         # slot holds 'busy' or 'free' or 'mine' or 'starting' or None
125         # 'mine' is for our own stuff, 'starting' from the concurrent tests
126         self.status=None
127         self.ip=None
128
129     def line(self):
130         return "Pooled %s (%s) -> %s"%(self.hostname,self.userdata, self.status)
131
132     def char (self):
133         if   self.status==None:       return '?'
134         elif self.status=='busy':     return '+'
135         elif self.status=='free':     return '-'
136         elif self.status=='mine':     return 'M'
137         elif self.status=='starting': return 'S'
138
139     def get_ip(self):
140         if self.ip: return self.ip
141         ip=socket.gethostbyname(self.hostname)
142         self.ip=ip
143         return ip
144
145 class Pool:
146
147     def __init__ (self, tuples,message, substrate):
148         self.pool_items= [ PoolItem (hostname,userdata) for (hostname,userdata) in tuples ] 
149         self.message=message
150         # where to send notifications upon load_starting
151         self.substrate=substrate
152
153     def list (self):
154         for i in self.pool_items: print i.line()
155
156     def line (self):
157         line=self.message
158         for i in self.pool_items: line += ' ' + i.char()
159         return line
160
161     def _item (self, hostname):
162         for i in self.pool_items: 
163             if i.hostname==hostname: return i
164         raise Exception ("Could not locate hostname %s in pool %s"%(hostname,self.message))
165
166     def retrieve_userdata (self, hostname): 
167         return self._item(hostname).userdata
168
169     def get_ip (self, hostname):
170         try:    return self._item(hostname).get_ip()
171         except: return socket.gethostbyname(hostname)
172         
173     def set_mine (self, hostname):
174         try:
175             self._item(hostname).status='mine'
176         except:
177             print 'WARNING: host %s not found in IP pool %s'%(hostname,self.message)
178
179     def next_free (self):
180         for i in self.pool_items:
181             if i.status == 'free':
182                 i.status='mine'
183                 return (i.hostname,i.userdata)
184         return None
185
186     ####################
187     # we have a starting instance of our own
188     def add_starting (self, vname, bname):
189         Starting().add(vname,bname)
190         for i in self.pool_items:
191             if i.hostname==vname: i.status='mine'
192
193     # load the starting instances from the common file
194     # remember that might be ours
195     # return the list of (vname,bname) that are not ours
196     def load_starting (self):
197         starting=Starting()
198         starting.load()
199         new_tuples=[]
200         for (v,b) in starting.tuples:
201             for i in self.pool_items:
202                 if i.hostname==v and i.status=='free':
203                     i.status='starting'
204                     new_tuples.append( (v,b,) )
205         return new_tuples
206
207     def release_my_starting (self):
208         for i in self.pool_items:
209             if i.status=='mine':
210                 Starting().delete_vname (i.hostname)
211                 i.status=None
212
213
214     ##########
215     def _sense (self):
216         for item in self.pool_items:
217             if item.status is not None: 
218                 print item.char(),
219                 continue
220             if self.check_ping (item.hostname): 
221                 item.status='busy'
222                 print '*',
223             else:
224                 item.status='free'
225                 print '.',
226     
227     def sense (self):
228         print 'Sensing IP pool',self.message,
229         self._sense()
230         print 'Done'
231         for (vname,bname) in self.load_starting():
232             self.substrate.add_starting_dummy (bname, vname)
233         print 'After starting: IP pool'
234         print self.line()
235     # OS-dependent ping option (support for macos, for convenience)
236     ping_timeout_option = None
237     # returns True when a given hostname/ip responds to ping
238     def check_ping (self,hostname):
239         if not Pool.ping_timeout_option:
240             (status,osname) = commands.getstatusoutput("uname -s")
241             if status != 0:
242                 raise Exception, "TestPool: Cannot figure your OS name"
243             if osname == "Linux":
244                 Pool.ping_timeout_option="-w"
245             elif osname == "Darwin":
246                 Pool.ping_timeout_option="-t"
247
248         command="ping -c 1 %s 1 %s"%(Pool.ping_timeout_option,hostname)
249         (status,output) = commands.getstatusoutput(command)
250         return status == 0
251
252 ####################
253 class Box:
254     def __init__ (self,hostname):
255         self.hostname=hostname
256         self._probed=None
257     def shortname (self):
258         return short_hostname(self.hostname)
259     def test_ssh (self): return TestSsh(self.hostname,username='root',unknown_host=False)
260     def reboot (self, options):
261         self.test_ssh().run("shutdown -r now",message="Rebooting %s"%self.hostname,
262                             dry_run=options.dry_run)
263
264     def uptime(self):
265         if hasattr(self,'_uptime') and self._uptime: return self._uptime
266         return '*undef* uptime'
267     def sense_uptime (self):
268         command=['uptime']
269         self._uptime=self.backquote_ssh(command,trash_err=True).strip()
270         if not self._uptime: self._uptime='unreachable'
271
272     def run(self,argv,message=None,trash_err=False,dry_run=False):
273         if dry_run:
274             print 'DRY_RUN:',
275             print " ".join(argv)
276             return 0
277         else:
278             header(message)
279             if not trash_err:
280                 return subprocess.call(argv)
281             else:
282                 return subprocess.call(argv,stderr=file('/dev/null','w'))
283                 
284     def run_ssh (self, argv, message, trash_err=False, dry_run=False):
285         ssh_argv = self.test_ssh().actual_argv(argv)
286         result=self.run (ssh_argv, message, trash_err, dry_run=dry_run)
287         if result!=0:
288             print "WARNING: failed to run %s on %s"%(" ".join(argv),self.hostname)
289         return result
290
291     def backquote (self, argv, trash_err=False):
292         # print 'running backquote',argv
293         if not trash_err:
294             result= subprocess.Popen(argv,stdout=subprocess.PIPE).communicate()[0]
295         else:
296             result= subprocess.Popen(argv,stdout=subprocess.PIPE,stderr=file('/dev/null','w')).communicate()[0]
297         return result
298
299     def probe (self):
300         if self._probed is not None: return self._probed
301         # first probe the ssh link
302         probe_argv=self.test_ssh().actual_argv(['hostname'])
303         self._probed=self.backquote ( probe_argv, trash_err=True )
304         if not self._probed: print "root@%s unreachable"%self.hostname
305         return self._probed
306
307     # use argv=['bash','-c',"the command line"]
308     # if you have any shell-expanded arguments like *
309     # and if there's any chance the command is adressed to the local host
310     def backquote_ssh (self, argv, trash_err=False):
311         if not self.probe(): return ''
312         return self.backquote( self.test_ssh().actual_argv(argv), trash_err)
313
314 ############################################################
315 class BuildInstance:
316     def __init__ (self, buildname, pid, buildbox):
317         self.buildname=buildname
318         self.buildbox=buildbox
319         self.pids=[pid]
320
321     def add_pid(self,pid):
322         self.pids.append(pid)
323
324     def line (self):
325         return "== %s == (pids=%r)"%(self.buildname,self.pids)
326
327 class BuildBox (Box):
328     def __init__ (self,hostname):
329         Box.__init__(self,hostname)
330         self.build_instances=[]
331
332     def add_build (self,buildname,pid):
333         for build in self.build_instances:
334             if build.buildname==buildname: 
335                 build.add_pid(pid)
336                 return
337         self.build_instances.append(BuildInstance(buildname, pid, self))
338
339     def list(self):
340         if not self.build_instances: 
341             header ('No build process on %s (%s)'%(self.hostname,self.uptime()))
342         else:
343             header ("Builds on %s (%s)"%(self.hostname,self.uptime()))
344             for b in self.build_instances: 
345                 header (b.line(),banner=False)
346
347     def reboot (self, options):
348         if not options.soft:
349             self.reboot(options)
350         else:
351             command=['pkill','vbuild']
352             self.run_ssh(command,"Terminating vbuild processes",dry_run=options.dry_run)
353
354     # inspect box and find currently running builds
355     matcher=re.compile("\s*(?P<pid>[0-9]+).*-[bo]\s+(?P<buildname>[^\s]+)(\s|\Z)")
356     matcher_building_vm=re.compile("\s*(?P<pid>[0-9]+).*init-vserver.*-i\s+eth.\s+(?P<buildname>[^\s]+)\s*\Z")
357     def sense(self, options):
358         print 'b',
359         self.sense_uptime()
360         pids=self.backquote_ssh(['pgrep','vbuild'],trash_err=True)
361         if not pids: return
362         command=['ps','-o','pid,command'] + [ pid for pid in pids.split("\n") if pid]
363         ps_lines=self.backquote_ssh (command).split('\n')
364         for line in ps_lines:
365             if not line.strip() or line.find('PID')>=0: continue
366             m=BuildBox.matcher.match(line)
367             if m: 
368                 date=time.strftime('%Y-%m-%d',time.localtime(time.time()))
369                 buildname=m.group('buildname').replace('@DATE@',date)
370                 self.add_build (buildname,m.group('pid'))
371                 continue
372             m=BuildBox.matcher_building_vm.match(line)
373             if m: 
374                 # buildname is expansed here
375                 self.add_build (buildname,m.group('pid'))
376                 continue
377             header('BuildBox.sense: command %r returned line that failed to match'%command)
378             header(">>%s<<"%line)
379
380 ############################################################
381 class PlcInstance:
382     def __init__ (self, vservername, ctxid, plcbox):
383         self.vservername=vservername
384         self.ctxid=ctxid
385         self.plc_box=plcbox
386         # unknown yet
387         self.timestamp=0
388
389     def set_timestamp (self,timestamp): self.timestamp=timestamp
390     def set_now (self): self.timestamp=int(time.time())
391     def pretty_timestamp (self): return time.strftime("%Y-%m-%d:%H-%M",time.localtime(self.timestamp))
392
393     def vplcname (self):
394         return self.vservername.split('-')[-1]
395     def buildname (self):
396         return self.vservername.rsplit('-',2)[0]
397
398     def line (self):
399         msg="== %s =="%(self.vplcname())
400         msg += " [=%s]"%self.vservername
401         if self.ctxid==0:  msg+=" not (yet?) running"
402         else:              msg+=" (ctx=%s)"%self.ctxid     
403         if self.timestamp: msg += " @ %s"%self.pretty_timestamp()
404         else:              msg += " *unknown timestamp*"
405         return msg
406
407     def kill (self):
408         msg="vserver stopping %s on %s"%(self.vservername,self.plc_box.hostname)
409         self.plc_box.run_ssh(['vserver',self.vservername,'stop'],msg)
410         self.plc_box.forget(self)
411
412 class PlcBox (Box):
413     def __init__ (self, hostname, max_plcs):
414         Box.__init__(self,hostname)
415         self.plc_instances=[]
416         self.max_plcs=max_plcs
417
418     def add_vserver (self,vservername,ctxid):
419         for plc in self.plc_instances:
420             if plc.vservername==vservername: 
421                 header("WARNING, duplicate myplc %s running on %s"%\
422                            (vservername,self.hostname),banner=False)
423                 return
424         self.plc_instances.append(PlcInstance(vservername,ctxid,self))
425     
426     def forget (self, plc_instance):
427         self.plc_instances.remove(plc_instance)
428
429     # fill one slot even though this one is not started yet
430     def add_dummy (self, plcname):
431         dummy=PlcInstance('dummy_'+plcname,0,self)
432         dummy.set_now()
433         self.plc_instances.append(dummy)
434
435     def line(self): 
436         msg="%s [max=%d,%d free] (%s)"%(self.hostname, self.max_plcs,self.free_spots(),self.uname())
437         return msg
438         
439     def list(self):
440         if not self.plc_instances: 
441             header ('No vserver running on %s'%(self.line()))
442         else:
443             header ("Active plc VMs on %s"%self.line())
444             self.plc_instances.sort(timestamp_sort)
445             for p in self.plc_instances: 
446                 header (p.line(),banner=False)
447
448     def free_spots (self):
449         return self.max_plcs - len(self.plc_instances)
450
451     def uname(self):
452         if hasattr(self,'_uname') and self._uname: return self._uname
453         return '*undef* uname'
454
455     def plc_instance_by_vservername (self, vservername):
456         for p in self.plc_instances:
457             if p.vservername==vservername: return p
458         return None
459
460     def reboot (self, options):
461         if not options.soft:
462             self.reboot(options)
463         else:
464             self.run_ssh(['service','util-vserver','stop'],"Stopping all running vservers",
465                          dry_run=options.dry_run)
466
467     def sense (self, options):
468         print 'p',
469         self._uname=self.backquote_ssh(['uname','-r']).strip()
470         # try to find fullname (vserver_stat truncates to a ridiculously short name)
471         # fetch the contexts for all vservers on that box
472         map_command=['grep','.','/etc/vservers/*/context','/dev/null',]
473         context_map=self.backquote_ssh (map_command)
474         # at this point we have a set of lines like
475         # /etc/vservers/2010.01.20--k27-f12-32-vplc03/context:40144
476         ctx_dict={}
477         for map_line in context_map.split("\n"):
478             if not map_line: continue
479             [path,xid] = map_line.split(':')
480             ctx_dict[xid]=os.path.basename(os.path.dirname(path))
481         # at this point ctx_id maps context id to vservername
482
483         command=['vserver-stat']
484         vserver_stat = self.backquote_ssh (command)
485         for vserver_line in vserver_stat.split("\n"):
486             if not vserver_line: continue
487             context=vserver_line.split()[0]
488             if context=="CTX": continue
489             try:
490                 longname=ctx_dict[context]
491                 self.add_vserver(longname,context)
492             except:
493                 print 'WARNING: found ctx %s in vserver_stat but was unable to figure a corresp. vserver'%context
494
495         # scan timestamps 
496         running_vsnames = [ i.vservername for i in self.plc_instances ]
497         command=   ['grep','.']
498         command += ['/vservers/%s.timestamp'%vs for vs in running_vsnames]
499         command += ['/dev/null']
500         ts_lines=self.backquote_ssh(command,trash_err=True).split('\n')
501         for ts_line in ts_lines:
502             if not ts_line.strip(): continue
503             # expect /vservers/<vservername>.timestamp:<timestamp>
504             try:
505                 (ts_file,timestamp)=ts_line.split(':')
506                 ts_file=os.path.basename(ts_file)
507                 (vservername,_)=os.path.splitext(ts_file)
508                 timestamp=int(timestamp)
509                 p=self.plc_instance_by_vservername(vservername)
510                 if not p: 
511                     print 'WARNING zombie plc',self.hostname,ts_line
512                     print '... was expecting',vservername,'in',[i.vservername for i in self.plc_instances]
513                     continue
514                 p.set_timestamp(timestamp)
515             except:  print 'WARNING, could not parse ts line',ts_line
516         
517
518
519
520 ############################################################
521 class QemuInstance: 
522     def __init__ (self, nodename, pid, qemubox):
523         self.nodename=nodename
524         self.pid=pid
525         self.qemu_box=qemubox
526         # not known yet
527         self.buildname=None
528         self.timestamp=0
529         
530     def set_buildname (self,buildname): self.buildname=buildname
531     def set_timestamp (self,timestamp): self.timestamp=timestamp
532     def set_now (self): self.timestamp=int(time.time())
533     def pretty_timestamp (self): return time.strftime("%Y-%m-%d:%H-%M",time.localtime(self.timestamp))
534     
535     def line (self):
536         msg = "== %s =="%(short_hostname(self.nodename))
537         msg += " [=%s]"%self.buildname
538         if self.pid:       msg += " (pid=%s)"%self.pid
539         else:              msg += " not (yet?) running"
540         if self.timestamp: msg += " @ %s"%self.pretty_timestamp()
541         else:              msg += " *unknown timestamp*"
542         return msg
543     
544     def kill(self):
545         if self.pid==0: 
546             print "cannot kill qemu %s with pid==0"%self.nodename
547             return
548         msg="Killing qemu %s with pid=%s on box %s"%(self.nodename,self.pid,self.qemu_box.hostname)
549         self.qemu_box.run_ssh(['kill',"%s"%self.pid],msg)
550         self.qemu_box.forget(self)
551
552
553 class QemuBox (Box):
554     def __init__ (self, hostname, max_qemus):
555         Box.__init__(self,hostname)
556         self.qemu_instances=[]
557         self.max_qemus=max_qemus
558
559     def add_node (self,nodename,pid):
560         for qemu in self.qemu_instances:
561             if qemu.nodename==nodename: 
562                 header("WARNING, duplicate qemu %s running on %s"%\
563                            (nodename,self.hostname), banner=False)
564                 return
565         self.qemu_instances.append(QemuInstance(nodename,pid,self))
566
567     def forget (self, qemu_instance):
568         self.qemu_instances.remove(qemu_instance)
569
570     # fill one slot even though this one is not started yet
571     def add_dummy (self, nodename):
572         dummy=QemuInstance('dummy_'+nodename,0,self)
573         dummy.set_now()
574         self.qemu_instances.append(dummy)
575
576     def line (self):
577         msg="%s [max=%d,%d free] (%s)"%(self.hostname, self.max_qemus,self.free_spots(),self.driver())
578         return msg
579
580     def list(self):
581         if not self.qemu_instances: 
582             header ('No qemu process on %s'%(self.line()))
583         else:
584             header ("Active qemu processes on %s"%(self.line()))
585             self.qemu_instances.sort(timestamp_sort)
586             for q in self.qemu_instances: 
587                 header (q.line(),banner=False)
588
589     def free_spots (self):
590         return self.max_qemus - len(self.qemu_instances)
591
592     def driver(self):
593         if hasattr(self,'_driver') and self._driver: return self._driver
594         return '*undef* driver'
595
596     def qemu_instance_by_pid (self,pid):
597         for q in self.qemu_instances:
598             if q.pid==pid: return q
599         return None
600
601     def qemu_instance_by_nodename_buildname (self,nodename,buildname):
602         for q in self.qemu_instances:
603             if q.nodename==nodename and q.buildname==buildname:
604                 return q
605         return None
606
607     def reboot (self, options):
608         if not options.soft:
609             self.reboot(options)
610         else:
611             self.run_ssh(['pkill','qemu'],"Killing qemu instances",
612                          dry_run=options.dry_run)
613
614     matcher=re.compile("\s*(?P<pid>[0-9]+).*-cdrom\s+(?P<nodename>[^\s]+)\.iso")
615     def sense(self, options):
616         print 'q',
617         modules=self.backquote_ssh(['lsmod']).split('\n')
618         self._driver='*NO kqemu/kmv_intel MODULE LOADED*'
619         for module in modules:
620             if module.find('kqemu')==0:
621                 self._driver='kqemu module loaded'
622             # kvm might be loaded without vkm_intel (we dont have AMD)
623             elif module.find('kvm_intel')==0:
624                 self._driver='kvm_intel module loaded'
625         ########## find out running pids
626         pids=self.backquote_ssh(['pgrep','qemu'])
627         if not pids: return
628         command=['ps','-o','pid,command'] + [ pid for pid in pids.split("\n") if pid]
629         ps_lines = self.backquote_ssh (command).split("\n")
630         for line in ps_lines:
631             if not line.strip() or line.find('PID') >=0 : continue
632             m=QemuBox.matcher.match(line)
633             if m: 
634                 self.add_node (m.group('nodename'),m.group('pid'))
635                 continue
636             header('QemuBox.sense: command %r returned line that failed to match'%command)
637             header(">>%s<<"%line)
638         ########## retrieve alive instances and map to build
639         live_builds=[]
640         command=['grep','.','*/*/qemu.pid','/dev/null']
641         pid_lines=self.backquote_ssh(command,trash_err=True).split('\n')
642         for pid_line in pid_lines:
643             if not pid_line.strip(): continue
644             # expect <build>/<nodename>/qemu.pid:<pid>pid
645             try:
646                 (buildname,nodename,tail)=pid_line.split('/')
647                 (_,pid)=tail.split(':')
648                 q=self.qemu_instance_by_pid (pid)
649                 if not q: continue
650                 q.set_buildname(buildname)
651                 live_builds.append(buildname)
652             except: print 'WARNING, could not parse pid line',pid_line
653         # retrieve timestamps
654         if not live_builds: return
655         command=   ['grep','.']
656         command += ['%s/*/timestamp'%b for b in live_builds]
657         command += ['/dev/null']
658         ts_lines=self.backquote_ssh(command,trash_err=True).split('\n')
659         for ts_line in ts_lines:
660             if not ts_line.strip(): continue
661             # expect <build>/<nodename>/timestamp:<timestamp>
662             try:
663                 (buildname,nodename,tail)=ts_line.split('/')
664                 nodename=nodename.replace('qemu-','')
665                 (_,timestamp)=tail.split(':')
666                 timestamp=int(timestamp)
667                 q=self.qemu_instance_by_nodename_buildname(nodename,buildname)
668                 if not q: 
669                     print 'WARNING zombie qemu',self.hostname,ts_line
670                     print '... was expecting (',short_hostname(nodename),buildname,') in',\
671                         [ (short_hostname(i.nodename),i.buildname) for i in self.qemu_instances ]
672                     continue
673                 q.set_timestamp(timestamp)
674             except:  print 'WARNING, could not parse ts line',ts_line
675
676 ####################
677 class TestInstance:
678     def __init__ (self, buildname, pid=0):
679         self.pids=[]
680         if pid!=0: self.pid.append(pid)
681         self.buildname=buildname
682         # latest trace line
683         self.trace=''
684         # has a KO test
685         self.broken_steps=[]
686         self.timestamp = 0
687
688     def set_timestamp (self,timestamp): self.timestamp=timestamp
689     def set_now (self): self.timestamp=int(time.time())
690     def pretty_timestamp (self): return time.strftime("%Y-%m-%d:%H-%M",time.localtime(self.timestamp))
691
692
693     def add_pid (self,pid):
694         self.pids.append(pid)
695     def set_broken (self, plcindex, step): 
696         self.broken_steps.append ( (plcindex, step,) )
697
698     def line (self):
699         double='=='
700         if self.pids: double='*'+double[1]
701         if self.broken_steps: double=double[0]+'B'
702         msg = " %s %s =="%(double,self.buildname)
703         if not self.pids:       pass
704         elif len(self.pids)==1: msg += " (pid=%s)"%self.pids[0]
705         else:                   msg += " !!!pids=%s!!!"%self.pids
706         msg += " @%s"%self.pretty_timestamp()
707         if self.broken_steps:
708             msg += " [BROKEN=" + " ".join( [ "%s@%s"%(s,i) for (i,s) in self.broken_steps ] ) + "]"
709         return msg
710
711 class TestBox (Box):
712     def __init__ (self,hostname):
713         Box.__init__(self,hostname)
714         self.starting_ips=[]
715         self.test_instances=[]
716
717     def reboot (self, options):
718         # can't reboot a vserver VM
719         self.run_ssh (['pkill','run_log'],"Terminating current runs",
720                       dry_run=options.dry_run)
721         self.run_ssh (['rm','-f',Starting.location],"Cleaning %s"%Starting.location,
722                       dry_run=options.dry_run)
723
724     def get_test (self, buildname):
725         for i in self.test_instances:
726             if i.buildname==buildname: return i
727
728     # we scan ALL remaining test results, even the ones not running
729     def add_timestamp (self, buildname, timestamp):
730         i=self.get_test(buildname)
731         if i:   
732             i.set_timestamp(timestamp)
733         else:   
734             i=TestInstance(buildname,0)
735             i.set_timestamp(timestamp)
736             self.test_instances.append(i)
737
738     def add_running_test (self, pid, buildname):
739         i=self.get_test(buildname)
740         if not i:
741             self.test_instances.append (TestInstance (buildname,pid))
742             return
743         if i.pids:
744             print "WARNING: 2 concurrent tests run on same build %s"%buildname
745         i.add_pid (pid)
746
747     def add_broken (self, buildname, plcindex, step):
748         i=self.get_test(buildname)
749         if not i:
750             i=TestInstance(buildname)
751             self.test_instances.append(i)
752         i.set_broken(plcindex, step)
753
754     matcher_proc=re.compile (".*/proc/(?P<pid>[0-9]+)/cwd.*/root/(?P<buildname>[^/]+)$")
755     matcher_grep=re.compile ("/root/(?P<buildname>[^/]+)/logs/trace.*:TRACE:\s*(?P<plcindex>[0-9]+).*step=(?P<step>\S+).*")
756     def sense (self, options):
757         print 't',
758         self.sense_uptime()
759         self.starting_ips=[x for x in self.backquote_ssh(['cat',Starting.location], trash_err=True).strip().split('\n') if x]
760
761         # scan timestamps on all tests
762         # this is likely to not invoke ssh so we need to be a bit smarter to get * expanded
763         # xxx would make sense above too
764         command=['bash','-c',"grep . /root/*/timestamp /dev/null"]
765         ts_lines=self.backquote_ssh(command,trash_err=True).split('\n')
766         for ts_line in ts_lines:
767             if not ts_line.strip(): continue
768             # expect /root/<buildname>/timestamp:<timestamp>
769             try:
770                 (ts_file,timestamp)=ts_line.split(':')
771                 ts_file=os.path.dirname(ts_file)
772                 buildname=os.path.basename(ts_file)
773                 timestamp=int(timestamp)
774                 t=self.add_timestamp(buildname,timestamp)
775             except:  print 'WARNING, could not parse ts line',ts_line
776
777         command=['bash','-c',"grep KO /root/*/logs/trace-* /dev/null" ]
778         trace_lines=self.backquote_ssh (command).split('\n')
779         for line in trace_lines:
780             if not line.strip(): continue
781             m=TestBox.matcher_grep.match(line)
782             if m: 
783                 buildname=m.group('buildname')
784                 plcindex=m.group('plcindex')
785                 step=m.group('step')
786                 self.add_broken(buildname,plcindex, step)
787                 continue
788             header("TestBox.sense: command %r returned line that failed to match\n%s"%(command,line))
789             header(">>%s<<"%line)
790
791         pids = self.backquote_ssh (['pgrep','run_log'],trash_err=True)
792         if not pids: return
793         command=['ls','-ld'] + ["/proc/%s/cwd"%pid for pid in pids.split("\n") if pid]
794         ps_lines=self.backquote_ssh (command).split('\n')
795         for line in ps_lines:
796             if not line.strip(): continue
797             m=TestBox.matcher_proc.match(line)
798             if m: 
799                 pid=m.group('pid')
800                 buildname=m.group('buildname')
801                 self.add_running_test(pid, buildname)
802                 continue
803             header("TestBox.sense: command %r returned line that failed to match\n%s"%(command,line))
804             header(">>%s<<"%line)
805         
806         
807     def line (self):
808         return "%s (%s)"%(self.hostname,self.uptime())
809
810     def list (self):
811         if not self.test_instances:
812             header ("No known tests on %s"%self.line())
813         else:
814             header ("Known tests on %s"%self.line())
815             self.test_instances.sort(timestamp_sort)
816             for i in self.test_instances: print i.line()
817         if self.starting_ips:
818             header ("Starting IP addresses on %s"%self.line())
819             self.starting_ips.sort()
820             for starting in self.starting_ips: print starting
821
822 ############################################################
823 class Options: pass
824
825 class Substrate:
826
827     def __init__ (self):
828         self.options=Options()
829         self.options.dry_run=False
830         self.options.verbose=False
831         self.options.reboot=False
832         self.options.soft=False
833         self.test_box = TestBox (self.test_box_spec())
834         self.build_boxes = [ BuildBox(h) for h in self.build_boxes_spec() ]
835         self.plc_boxes = [ PlcBox (h,m) for (h,m) in self.plc_boxes_spec ()]
836         self.qemu_boxes = [ QemuBox (h,m) for (h,m) in self.qemu_boxes_spec ()]
837         self.default_boxes = self.plc_boxes + self.qemu_boxes
838         self.all_boxes = self.build_boxes + [ self.test_box ] + self.plc_boxes + self.qemu_boxes
839         self._sensed=False
840
841         self.vplc_pool = Pool (self.vplc_ips(),"for vplcs",self)
842         self.vnode_pool = Pool (self.vnode_ips(),"for vnodes",self)
843
844     def fqdn (self, hostname):
845         if hostname.find('.')<0: return "%s.%s"%(hostname,self.domain())
846         return hostname
847
848     # return True if actual sensing takes place
849     def sense (self,force=False):
850         if self._sensed and not force: return False
851         print 'Sensing local substrate...',
852         for b in self.default_boxes: b.sense(self.options)
853         print 'Done'
854         self._sensed=True
855         return True
856
857     def list (self):
858         for b in self.default_boxes:
859             b.list()
860
861     def add_dummy_plc (self, plc_boxname, plcname):
862         for pb in self.plc_boxes:
863             if pb.hostname==plc_boxname:
864                 pb.add_dummy(plcname)
865                 return True
866     def add_dummy_qemu (self, qemu_boxname, qemuname):
867         for qb in self.qemu_boxes:
868             if qb.hostname==qemu_boxname:
869                 qb.add_dummy(qemuname)
870                 return True
871
872     def add_starting_dummy (self, bname, vname):
873         return self.add_dummy_plc (bname, vname) or self.add_dummy_qemu (bname, vname)
874
875     ########## 
876     def provision (self,plcs,options):
877         try:
878             # attach each plc to a plc box and an IP address
879             plcs = [ self.provision_plc (plc,options) for plc in plcs ]
880             # attach each node/qemu to a qemu box with an IP address
881             plcs = [ self.provision_qemus (plc,options) for plc in plcs ]
882             # update the SFA spec accordingly
883             plcs = [ self.localize_sfa_rspec(plc,options) for plc in plcs ]
884             self.list()
885             return plcs
886         except Exception, e:
887             print '* Could not provision this test on current substrate','--',e,'--','exiting'
888             traceback.print_exc()
889             sys.exit(1)
890
891     # it is expected that a couple of options like ips_bplc and ips_vplc 
892     # are set or unset together
893     @staticmethod
894     def check_options (x,y):
895         if not x and not y: return True
896         return len(x)==len(y)
897
898     # find an available plc box (or make space)
899     # and a free IP address (using options if present)
900     def provision_plc (self, plc, options):
901         
902         assert Substrate.check_options (options.ips_bplc, options.ips_vplc)
903
904         #### let's find an IP address for that plc
905         # look in options 
906         if options.ips_vplc:
907             # this is a rerun
908             # we don't check anything here, 
909             # it is the caller's responsability to cleanup and make sure this makes sense
910             plc_boxname = options.ips_bplc.pop()
911             vplc_hostname=options.ips_vplc.pop()
912         else:
913             if self.sense(): self.list()
914             plc_boxname=None
915             vplc_hostname=None
916             # try to find an available IP 
917             self.vplc_pool.sense()
918             couple=self.vplc_pool.next_free()
919             if couple:
920                 (vplc_hostname,unused)=couple
921             #### we need to find one plc box that still has a slot
922             max_free=0
923             # use the box that has max free spots for load balancing
924             for pb in self.plc_boxes:
925                 free=pb.free_spots()
926                 if free>max_free:
927                     plc_boxname=pb.hostname
928                     max_free=free
929             # if there's no available slot in the plc_boxes, or we need a free IP address
930             # make space by killing the oldest running instance
931             if not plc_boxname or not vplc_hostname:
932                 # find the oldest of all our instances
933                 all_plc_instances=reduce(lambda x, y: x+y, 
934                                          [ pb.plc_instances for pb in self.plc_boxes ],
935                                          [])
936                 all_plc_instances.sort(timestamp_sort)
937                 try:
938                     plc_instance_to_kill=all_plc_instances[0]
939                 except:
940                     msg=""
941                     if not plc_boxname: msg += " PLC boxes are full"
942                     if not vplc_hostname: msg += " vplc IP pool exhausted" 
943                     raise Exception,"Could not make space for a PLC instance:"+msg
944                 freed_plc_boxname=plc_instance_to_kill.plc_box.hostname
945                 freed_vplc_hostname=plc_instance_to_kill.vplcname()
946                 message='killing oldest plc instance = %s on %s'%(plc_instance_to_kill.line(),
947                                                                   freed_plc_boxname)
948                 plc_instance_to_kill.kill()
949                 # use this new plcbox if that was the problem
950                 if not plc_boxname:
951                     plc_boxname=freed_plc_boxname
952                 # ditto for the IP address
953                 if not vplc_hostname:
954                     vplc_hostname=freed_vplc_hostname
955                     # record in pool as mine
956                     self.vplc_pool.set_mine(vplc_hostname)
957
958         # 
959         self.add_dummy_plc(plc_boxname,plc['name'])
960         vplc_ip = self.vplc_pool.get_ip(vplc_hostname)
961         self.vplc_pool.add_starting(vplc_hostname, plc_boxname)
962
963         #### compute a helpful vserver name
964         # remove domain in hostname
965         vplc_short = short_hostname(vplc_hostname)
966         vservername = "%s-%d-%s" % (options.buildname,plc['index'],vplc_short)
967         plc_name = "%s_%s"%(plc['name'],vplc_short)
968
969         utils.header( 'PROVISION plc %s in box %s at IP %s as %s'%\
970                           (plc['name'],plc_boxname,vplc_hostname,vservername))
971
972         #### apply in the plc_spec
973         # # informative
974         # label=options.personality.replace("linux","")
975         mapper = {'plc': [ ('*' , {'host_box':plc_boxname,
976                                    # 'name':'%s-'+label,
977                                    'name': plc_name,
978                                    'vservername':vservername,
979                                    'vserverip':vplc_ip,
980                                    'PLC_DB_HOST':vplc_hostname,
981                                    'PLC_API_HOST':vplc_hostname,
982                                    'PLC_BOOT_HOST':vplc_hostname,
983                                    'PLC_WWW_HOST':vplc_hostname,
984                                    'PLC_NET_DNS1' : self.network_settings() [ 'interface_fields:dns1' ],
985                                    'PLC_NET_DNS2' : self.network_settings() [ 'interface_fields:dns2' ],
986                                    } ) ]
987                   }
988
989
990         # mappers only work on a list of plcs
991         return TestMapper([plc],options).map(mapper)[0]
992
993     ##########
994     def provision_qemus (self, plc, options):
995
996         assert Substrate.check_options (options.ips_bnode, options.ips_vnode)
997
998         test_mapper = TestMapper ([plc], options)
999         nodenames = test_mapper.node_names()
1000         maps=[]
1001         for nodename in nodenames:
1002
1003             if options.ips_vnode:
1004                 # as above, it's a rerun, take it for granted
1005                 qemu_boxname=options.ips_bnode.pop()
1006                 vnode_hostname=options.ips_vnode.pop()
1007             else:
1008                 if self.sense(): self.list()
1009                 qemu_boxname=None
1010                 vnode_hostname=None
1011                 # try to find an available IP 
1012                 self.vnode_pool.sense()
1013                 couple=self.vnode_pool.next_free()
1014                 if couple:
1015                     (vnode_hostname,unused)=couple
1016                 # find a physical box
1017                 max_free=0
1018                 # use the box that has max free spots for load balancing
1019                 for qb in self.qemu_boxes:
1020                     free=qb.free_spots()
1021                     if free>max_free:
1022                         qemu_boxname=qb.hostname
1023                         max_free=free
1024                 # if we miss the box or the IP, kill the oldest instance
1025                 if not qemu_boxname or not vnode_hostname:
1026                 # find the oldest of all our instances
1027                     all_qemu_instances=reduce(lambda x, y: x+y, 
1028                                               [ qb.qemu_instances for qb in self.qemu_boxes ],
1029                                               [])
1030                     all_qemu_instances.sort(timestamp_sort)
1031                     try:
1032                         qemu_instance_to_kill=all_qemu_instances[0]
1033                     except:
1034                         msg=""
1035                         if not qemu_boxname: msg += " QEMU boxes are full"
1036                         if not vnode_hostname: msg += " vnode IP pool exhausted" 
1037                         raise Exception,"Could not make space for a QEMU instance:"+msg
1038                     freed_qemu_boxname=qemu_instance_to_kill.qemu_box.hostname
1039                     freed_vnode_hostname=short_hostname(qemu_instance_to_kill.nodename)
1040                     # kill it
1041                     message='killing oldest qemu node = %s on %s'%(qemu_instance_to_kill.line(),
1042                                                                    freed_qemu_boxname)
1043                     qemu_instance_to_kill.kill()
1044                     # use these freed resources where needed
1045                     if not qemu_boxname:
1046                         qemu_boxname=freed_qemu_boxname
1047                     if not vnode_hostname:
1048                         vnode_hostname=freed_vnode_hostname
1049                         self.vnode_pool.set_mine(vnode_hostname)
1050
1051             self.add_dummy_qemu (qemu_boxname,vnode_hostname)
1052             mac=self.vnode_pool.retrieve_userdata(vnode_hostname)
1053             ip=self.vnode_pool.get_ip (vnode_hostname)
1054             self.vnode_pool.add_starting(vnode_hostname,qemu_boxname)
1055
1056             vnode_fqdn = self.fqdn(vnode_hostname)
1057             nodemap={'host_box':qemu_boxname,
1058                      'node_fields:hostname':vnode_fqdn,
1059                      'interface_fields:ip':ip, 
1060                      'interface_fields:mac':mac,
1061                      }
1062             nodemap.update(self.network_settings())
1063             maps.append ( (nodename, nodemap) )
1064
1065             utils.header("PROVISION node %s in box %s at IP %s with MAC %s"%\
1066                              (nodename,qemu_boxname,vnode_hostname,mac))
1067
1068         return test_mapper.map({'node':maps})[0]
1069
1070     def localize_sfa_rspec (self,plc,options):
1071        
1072         plc['sfa']['SFA_REGISTRY_HOST'] = plc['PLC_DB_HOST']
1073         plc['sfa']['SFA_AGGREGATE_HOST'] = plc['PLC_DB_HOST']
1074         plc['sfa']['SFA_SM_HOST'] = plc['PLC_DB_HOST']
1075         plc['sfa']['SFA_DB_HOST'] = plc['PLC_DB_HOST']
1076         plc['sfa']['SFA_PLC_URL'] = 'https://' + plc['PLC_API_HOST'] + ':443/PLCAPI/' 
1077         return plc
1078
1079     #################### release:
1080     def release (self,options):
1081         self.vplc_pool.release_my_starting()
1082         self.vnode_pool.release_my_starting()
1083         pass
1084
1085     #################### show results for interactive mode
1086     def get_box (self,boxname):
1087         for b in self.build_boxes + self.plc_boxes + self.qemu_boxes + [self.test_box] :
1088             if b.shortname()==boxname:
1089                 return b
1090         print "Could not find box %s"%boxname
1091         return None
1092
1093     def list_boxes(self,box_or_names):
1094         print 'Sensing',
1095         for box in box_or_names:
1096             if not isinstance(box,Box): box=self.get_box(box)
1097             if not box: continue
1098             box.sense(self.options)
1099         print 'Done'
1100         for box in box_or_names:
1101             if not isinstance(box,Box): box=self.get_box(box)
1102             if not box: continue
1103             box.list()
1104
1105     def reboot_boxes(self,box_or_names):
1106         for box in box_or_names:
1107             if not isinstance(box,Box): box=self.get_box(box)
1108             if not box: continue
1109             box.reboot(self.options)
1110
1111     ####################
1112     # can be run as a utility to manage the local infrastructure
1113     def main (self):
1114         parser=OptionParser()
1115         parser.add_option ('-r',"--reboot",action='store_true',dest='reboot',default=False,
1116                            help='reboot mode (use shutdown -r)')
1117         parser.add_option ('-s',"--soft",action='store_true',dest='soft',default=False,
1118                            help='soft mode for reboot (vserver stop or kill qemus)')
1119         parser.add_option ('-t',"--testbox",action='store_true',dest='testbox',default=False,
1120                            help='add test box') 
1121         parser.add_option ('-b',"--build",action='store_true',dest='builds',default=False,
1122                            help='add build boxes')
1123         parser.add_option ('-p',"--plc",action='store_true',dest='plcs',default=False,
1124                            help='add plc boxes')
1125         parser.add_option ('-q',"--qemu",action='store_true',dest='qemus',default=False,
1126                            help='add qemu boxes') 
1127         parser.add_option ('-a',"--all",action='store_true',dest='all',default=False,
1128                            help='address all known  boxes, like -b -t -p -q')
1129         parser.add_option ('-v',"--verbose",action='store_true',dest='verbose',default=False,
1130                            help='verbose mode')
1131         parser.add_option ('-n',"--dry_run",action='store_true',dest='dry_run',default=False,
1132                            help='dry run mode')
1133         (self.options,args)=parser.parse_args()
1134
1135         boxes=args
1136         if self.options.testbox: boxes += [self.test_box]
1137         if self.options.builds: boxes += self.build_boxes
1138         if self.options.plcs: boxes += self.plc_boxes
1139         if self.options.qemus: boxes += self.qemu_boxes
1140         if self.options.all: boxes += self.all_boxes
1141         
1142         # default scope is -b -p -q
1143         if not boxes:
1144             boxes = self.build_boxes + self.plc_boxes + self.qemu_boxes
1145
1146         if self.options.reboot: self.reboot_boxes (boxes)
1147         else:                   self.list_boxes (boxes)