again
[tests.git] / system / Substrate.py
1 #
2 # Thierry Parmentelat <thierry.parmentelat@inria.fr>
3 # Copyright (C) 2010 INRIA 
4 #
5 # #################### history
6 #
7 # see also Substrate.readme
8 #
9 # This is a complete rewrite of TestResources/Tracker/Pool
10 # we don't use trackers anymore and just probe/sense the running 
11 # boxes to figure out where we are
12 # in order to implement some fairness in the round-robin allocation scheme
13 # we need an indication of the 'age' of each running entity, 
14 # hence the 'timestamp-*' steps in TestPlc
15
16 # this should be much more flexible:
17 # * supports several plc boxes 
18 # * supports several qemu guests per host
19 # * no need to worry about tracker being in sync or not
20 #
21 # #################### howto use
22 #
23 # each site is to write its own LocalSubstrate.py, 
24 # (see e.g. LocalSubstrate.inria.py)
25 # LocalSubstrate.py is expected to be in /root on the testmaster box
26 # and needs to define
27 # MYPLCs
28 # . the vserver-capable boxes used for hosting myplcs
29 # .  and their admissible load (max # of myplcs)
30 # . the pool of DNS-names and IP-addresses available for myplcs
31 # QEMU nodes
32 # . the kvm-qemu capable boxes to host qemu instances
33 # .  and their admissible load (max # of myplcs)
34 # . the pool of DNS-names and IP-addresses available for nodes
35
36 # #################### implem. note
37
38 # this model relies on 'sensing' the substrate, 
39 # i.e. probing all the boxes for their running instances of vservers and qemu
40 # this is how we get rid of tracker inconsistencies 
41 # however there is a 'black hole' between the time where a given address is 
42 # allocated and when it actually gets used/pingable
43 # this is why we still need a shared knowledge among running tests
44 # in a file named /root/starting
45 # this is connected to the Pool class 
46
47 # ####################
48
49 import os.path, sys
50 import time
51 import re
52 import traceback
53 import subprocess
54 import commands
55 import socket
56 from optparse import OptionParser
57
58 import utils
59 from TestSsh import TestSsh
60 from TestMapper import TestMapper
61
62 def header (message,banner=True):
63     if not message: return
64     if banner: print "===============",
65     print message
66     sys.stdout.flush()
67
68 def timestamp_sort(o1,o2): return o1.timestamp-o2.timestamp
69
70 def short_hostname (hostname):
71     return hostname.split('.')[0]
72
73 ####################
74 # the place were other test instances tell about their not-yet-started
75 # instances, that go undetected through sensing
76 class Starting:
77
78     location='/root/starting'
79     def __init__ (self):
80         self.tuples=[]
81
82     def load (self):
83         try:    self.tuples=[line.strip().split('@') 
84                              for line in file(Starting.location).readlines()]
85         except: self.tuples=[]
86
87     def vnames (self) : 
88         self.load()
89         return [ x for (x,_) in self.tuples ]
90
91     def add (self, vname, bname):
92         if not vname in self.vnames():
93             file(Starting.location,'a').write("%s@%s\n"%(vname,bname))
94             
95     def delete_vname (self, vname):
96         self.load()
97         if vname in self.vnames():
98             f=file(Starting.location,'w')
99             for (v,b) in self.tuples: 
100                 if v != vname: f.write("%s@%s\n"%(v,b))
101             f.close()
102     
103 ####################
104 # pool class
105 # allows to pick an available IP among a pool
106 # input is expressed as a list of tuples (hostname,ip,user_data)
107 # that can be searched iteratively for a free slot
108 # e.g.
109 # pool = [ (hostname1,user_data1),  
110 #          (hostname2,user_data2),  
111 #          (hostname3,user_data2),  
112 #          (hostname4,user_data4) ]
113 # assuming that ip1 and ip3 are taken (pingable), then we'd get
114 # pool=Pool(pool)
115 # pool.next_free() -> entry2
116 # pool.next_free() -> entry4
117 # pool.next_free() -> None
118 # that is, even if ip2 is not busy/pingable when the second next_free() is issued
119
120 class PoolItem:
121     def __init__ (self,hostname,userdata):
122         self.hostname=hostname
123         self.userdata=userdata
124         # slot holds 'busy' or 'free' or 'mine' or 'starting' or None
125         # 'mine' is for our own stuff, 'starting' from the concurrent tests
126         self.status=None
127         self.ip=None
128
129     def line(self):
130         return "Pooled %s (%s) -> %s"%(self.hostname,self.userdata, self.status)
131
132     def char (self):
133         if   self.status==None:       return '?'
134         elif self.status=='busy':     return '+'
135         elif self.status=='free':     return '-'
136         elif self.status=='mine':     return 'M'
137         elif self.status=='starting': return 'S'
138
139     def get_ip(self):
140         if self.ip: return self.ip
141         ip=socket.gethostbyname(self.hostname)
142         self.ip=ip
143         return ip
144
145 class Pool:
146
147     def __init__ (self, tuples,message, substrate):
148         self.pool_items= [ PoolItem (hostname,userdata) for (hostname,userdata) in tuples ] 
149         self.message=message
150         # where to send notifications upon load_starting
151         self.substrate=substrate
152
153     def list (self):
154         for i in self.pool_items: print i.line()
155
156     def line (self):
157         line=self.message
158         for i in self.pool_items: line += ' ' + i.char()
159         return line
160
161     def _item (self, hostname):
162         for i in self.pool_items: 
163             if i.hostname==hostname: return i
164         raise Exception ("Could not locate hostname %s in pool %s"%(hostname,self.message))
165
166     def retrieve_userdata (self, hostname): 
167         return self._item(hostname).userdata
168
169     def get_ip (self, hostname):
170         try:    return self._item(hostname).get_ip()
171         except: return socket.gethostbyname(hostname)
172         
173     def set_mine (self, hostname):
174         try:
175             self._item(hostname).status='mine'
176         except:
177             print 'WARNING: host %s not found in IP pool %s'%(hostname,self.message)
178
179     def next_free (self):
180         for i in self.pool_items:
181             if i.status == 'free':
182                 i.status='mine'
183                 return (i.hostname,i.userdata)
184         return None
185
186     ####################
187     # we have a starting instance of our own
188     def add_starting (self, vname, bname):
189         Starting().add(vname,bname)
190         for i in self.pool_items:
191             if i.hostname==vname: i.status='mine'
192
193     # load the starting instances from the common file
194     # remember that might be ours
195     # return the list of (vname,bname) that are not ours
196     def load_starting (self):
197         starting=Starting()
198         starting.load()
199         new_tuples=[]
200         for (v,b) in starting.tuples:
201             for i in self.pool_items:
202                 if i.hostname==v and i.status=='free':
203                     i.status='starting'
204                     new_tuples.append( (v,b,) )
205         return new_tuples
206
207     def release_my_starting (self):
208         for i in self.pool_items:
209             if i.status=='mine':
210                 Starting().delete_vname (i.hostname)
211                 i.status=None
212
213
214     ##########
215     def _sense (self):
216         for item in self.pool_items:
217             if item.status is not None: 
218                 print item.char(),
219                 continue
220             if self.check_ping (item.hostname): 
221                 item.status='busy'
222                 print '*',
223             else:
224                 item.status='free'
225                 print '.',
226     
227     def sense (self):
228         print 'Sensing IP pool',self.message,
229         self._sense()
230         print 'Done'
231         for (vname,bname) in self.load_starting():
232             self.substrate.add_starting_dummy (bname, vname)
233         print 'After starting: IP pool'
234         print self.line()
235     # OS-dependent ping option (support for macos, for convenience)
236     ping_timeout_option = None
237     # returns True when a given hostname/ip responds to ping
238     def check_ping (self,hostname):
239         if not Pool.ping_timeout_option:
240             (status,osname) = commands.getstatusoutput("uname -s")
241             if status != 0:
242                 raise Exception, "TestPool: Cannot figure your OS name"
243             if osname == "Linux":
244                 Pool.ping_timeout_option="-w"
245             elif osname == "Darwin":
246                 Pool.ping_timeout_option="-t"
247
248         command="ping -c 1 %s 1 %s"%(Pool.ping_timeout_option,hostname)
249         (status,output) = commands.getstatusoutput(command)
250         return status == 0
251
252 ####################
253 class Box:
254     def __init__ (self,hostname):
255         self.hostname=hostname
256         self._probed=None
257     def shortname (self):
258         return short_hostname(self.hostname)
259     def test_ssh (self): return TestSsh(self.hostname,username='root',unknown_host=False)
260     def reboot (self, options):
261         self.test_ssh().run("shutdown -r now",message="Rebooting %s"%self.hostname,
262                             dry_run=options.dry_run)
263
264     def uptime(self):
265         if hasattr(self,'_uptime') and self._uptime: return self._uptime
266         return '*undef* uptime'
267     def sense_uptime (self):
268         command=['uptime']
269         self._uptime=self.backquote_ssh(command,trash_err=True).strip()
270         if not self._uptime: self._uptime='unreachable'
271
272     def run(self,argv,message=None,trash_err=False,dry_run=False):
273         if dry_run:
274             print 'DRY_RUN:',
275             print " ".join(argv)
276             return 0
277         else:
278             header(message)
279             if not trash_err:
280                 return subprocess.call(argv)
281             else:
282                 return subprocess.call(argv,stderr=file('/dev/null','w'))
283                 
284     def run_ssh (self, argv, message, trash_err=False, dry_run=False):
285         ssh_argv = self.test_ssh().actual_argv(argv)
286         result=self.run (ssh_argv, message, trash_err, dry_run=dry_run)
287         if result!=0:
288             print "WARNING: failed to run %s on %s"%(" ".join(argv),self.hostname)
289         return result
290
291     def backquote (self, argv, trash_err=False):
292         # print 'running backquote',argv
293         if not trash_err:
294             result= subprocess.Popen(argv,stdout=subprocess.PIPE).communicate()[0]
295         else:
296             result= subprocess.Popen(argv,stdout=subprocess.PIPE,stderr=file('/dev/null','w')).communicate()[0]
297         return result
298
299     def probe (self):
300         if self._probed is not None: return self._probed
301         # first probe the ssh link
302         probe_argv=self.test_ssh().actual_argv(['hostname'])
303         self._probed=self.backquote ( probe_argv, trash_err=True )
304         if not self._probed: print "root@%s unreachable"%self.hostname
305         return self._probed
306
307     # use argv=['bash','-c',"the command line"]
308     # if you have any shell-expanded arguments like *
309     # and if there's any chance the command is adressed to the local host
310     def backquote_ssh (self, argv, trash_err=False):
311         if not self.probe(): return ''
312         return self.backquote( self.test_ssh().actual_argv(argv), trash_err)
313
314 ############################################################
315 class BuildInstance:
316     def __init__ (self, buildname, pid, buildbox):
317         self.buildname=buildname
318         self.buildbox=buildbox
319         self.pids=[pid]
320
321     def add_pid(self,pid):
322         self.pids.append(pid)
323
324     def line (self):
325         return "== %s == (pids=%r)"%(self.buildname,self.pids)
326
327 class BuildBox (Box):
328     def __init__ (self,hostname):
329         Box.__init__(self,hostname)
330         self.build_instances=[]
331
332     def add_build (self,buildname,pid):
333         for build in self.build_instances:
334             if build.buildname==buildname: 
335                 build.add_pid(pid)
336                 return
337         self.build_instances.append(BuildInstance(buildname, pid, self))
338
339     def list(self):
340         if not self.build_instances: 
341             header ('No build process on %s (%s)'%(self.hostname,self.uptime()))
342         else:
343             header ("Builds on %s (%s)"%(self.hostname,self.uptime()))
344             for b in self.build_instances: 
345                 header (b.line(),banner=False)
346
347     def reboot (self, options):
348         if not options.soft:
349             self.reboot(options)
350         else:
351             command=['pkill','vbuild']
352             self.run_ssh(command,"Terminating vbuild processes",dry_run=options.dry_run)
353
354     # inspect box and find currently running builds
355     matcher=re.compile("\s*(?P<pid>[0-9]+).*-[bo]\s+(?P<buildname>[^\s]+)(\s|\Z)")
356     matcher_building_vm=re.compile("\s*(?P<pid>[0-9]+).*init-vserver.*-i\s+eth.\s+(?P<buildname>[^\s]+)\s*\Z")
357     def sense(self, options):
358         print 'b',
359         self.sense_uptime()
360         pids=self.backquote_ssh(['pgrep','vbuild'],trash_err=True)
361         if not pids: return
362         command=['ps','-o','pid,command'] + [ pid for pid in pids.split("\n") if pid]
363         ps_lines=self.backquote_ssh (command).split('\n')
364         for line in ps_lines:
365             if not line.strip() or line.find('PID')>=0: continue
366             m=BuildBox.matcher.match(line)
367             if m: 
368                 date=time.strftime('%Y-%m-%d',time.localtime(time.time()))
369                 buildname=m.group('buildname').replace('@DATE@',date)
370                 self.add_build (buildname,m.group('pid'))
371                 continue
372             m=BuildBox.matcher_building_vm.match(line)
373             if m: 
374                 # buildname is expansed here
375                 self.add_build (buildname,m.group('pid'))
376                 continue
377             header('BuildBox.sense: command %r returned line that failed to match'%command)
378             header(">>%s<<"%line)
379
380 ############################################################
381 class PlcInstance:
382     def __init__ (self, plcbox):
383         self.plc_box=plcbox
384         # unknown yet
385         self.timestamp=0
386         
387     def set_timestamp (self,timestamp): self.timestamp=timestamp
388     def set_now (self): self.timestamp=int(time.time())
389     def pretty_timestamp (self): return time.strftime("%Y-%m-%d:%H-%M",time.localtime(self.timestamp))
390
391 class PlcVsInstance (PlcInstance):
392     def __init__ (self, plcbox, vservername, ctxid):
393         PlcInstance.__init__(self,plcbox)
394         self.vservername=vservername
395         self.ctxid=ctxid
396
397     def vplcname (self):
398         return self.vservername.split('-')[-1]
399     def buildname (self):
400         return self.vservername.rsplit('-',2)[0]
401
402     def line (self):
403         msg="== %s =="%(self.vplcname())
404         msg += " [=%s]"%self.vservername
405         if self.ctxid==0:  msg+=" not (yet?) running"
406         else:              msg+=" (ctx=%s)"%self.ctxid     
407         if self.timestamp: msg += " @ %s"%self.pretty_timestamp()
408         else:              msg += " *unknown timestamp*"
409         return msg
410
411     def kill (self):
412         msg="vserver stopping %s on %s"%(self.vservername,self.plc_box.hostname)
413         self.plc_box.run_ssh(['vserver',self.vservername,'stop'],msg)
414         self.plc_box.forget(self)
415
416 class PlcLxcInstance (PlcInstance):
417     # does lxc have a context id of any kind ?
418     def __init__ (self, plcbox, lxcname):
419         PlcInstance.__init__(self, plcbox)
420         self.lxcname = lxcname
421
422     def kill (self):
423         print "TODO PlcLxcInstance.kill"
424
425     def line (self):
426         return "TODO PlcLxcInstance.line"
427
428 ##########
429 class PlcBox (Box):
430     def __init__ (self, hostname, max_plcs):
431         Box.__init__(self,hostname)
432         self.plc_instances=[]
433         self.max_plcs=max_plcs
434
435 class PlcVsBox (PlcBox):
436
437     def add_vserver (self,vservername,ctxid):
438         for plc in self.plc_instances:
439             if plc.vservername==vservername: 
440                 header("WARNING, duplicate myplc %s running on %s"%\
441                            (vservername,self.hostname),banner=False)
442                 return
443         self.plc_instances.append(PlcVsInstance(self,vservername,ctxid))
444     
445     def forget (self, plc_instance):
446         self.plc_instances.remove(plc_instance)
447
448     # fill one slot even though this one is not started yet
449     def add_dummy (self, plcname):
450         dummy=PlcVsInstance(self,'dummy_'+plcname,0)
451         dummy.set_now()
452         self.plc_instances.append(dummy)
453
454     def line(self): 
455         msg="%s [max=%d,%d free] (%s)"%(self.hostname, self.max_plcs,self.free_slots(),self.uname())
456         return msg
457         
458     def list(self):
459         if not self.plc_instances: 
460             header ('No vserver running on %s'%(self.line()))
461         else:
462             header ("Active plc VMs on %s"%self.line())
463             self.plc_instances.sort(timestamp_sort)
464             for p in self.plc_instances: 
465                 header (p.line(),banner=False)
466
467     def free_slots (self):
468         return self.max_plcs - len(self.plc_instances)
469
470     def uname(self):
471         if hasattr(self,'_uname') and self._uname: return self._uname
472         return '*undef* uname'
473
474     def plc_instance_by_vservername (self, vservername):
475         for p in self.plc_instances:
476             if p.vservername==vservername: return p
477         return None
478
479     def reboot (self, options):
480         if not options.soft:
481             self.reboot(options)
482         else:
483             self.run_ssh(['service','util-vserver','stop'],"Stopping all running vservers",
484                          dry_run=options.dry_run)
485
486     def sense (self, options):
487         print 'p',
488         self._uname=self.backquote_ssh(['uname','-r']).strip()
489         # try to find fullname (vserver_stat truncates to a ridiculously short name)
490         # fetch the contexts for all vservers on that box
491         map_command=['grep','.','/etc/vservers/*/context','/dev/null',]
492         context_map=self.backquote_ssh (map_command)
493         # at this point we have a set of lines like
494         # /etc/vservers/2010.01.20--k27-f12-32-vplc03/context:40144
495         ctx_dict={}
496         for map_line in context_map.split("\n"):
497             if not map_line: continue
498             [path,xid] = map_line.split(':')
499             ctx_dict[xid]=os.path.basename(os.path.dirname(path))
500         # at this point ctx_id maps context id to vservername
501
502         command=['vserver-stat']
503         vserver_stat = self.backquote_ssh (command)
504         for vserver_line in vserver_stat.split("\n"):
505             if not vserver_line: continue
506             context=vserver_line.split()[0]
507             if context=="CTX": continue
508             try:
509                 longname=ctx_dict[context]
510                 self.add_vserver(longname,context)
511             except:
512                 print 'WARNING: found ctx %s in vserver_stat but was unable to figure a corresp. vserver'%context
513
514         # scan timestamps 
515         running_vsnames = [ i.vservername for i in self.plc_instances ]
516         command=   ['grep','.']
517         command += ['/vservers/%s.timestamp'%vs for vs in running_vsnames]
518         command += ['/dev/null']
519         ts_lines=self.backquote_ssh(command,trash_err=True).split('\n')
520         for ts_line in ts_lines:
521             if not ts_line.strip(): continue
522             # expect /vservers/<vservername>.timestamp:<timestamp>
523             try:
524                 (ts_file,timestamp)=ts_line.split(':')
525                 ts_file=os.path.basename(ts_file)
526                 (vservername,_)=os.path.splitext(ts_file)
527                 timestamp=int(timestamp)
528                 p=self.plc_instance_by_vservername(vservername)
529                 if not p: 
530                     print 'WARNING zombie plc',self.hostname,ts_line
531                     print '... was expecting',vservername,'in',[i.vservername for i in self.plc_instances]
532                     continue
533                 p.set_timestamp(timestamp)
534             except:  print 'WARNING, could not parse ts line',ts_line
535         
536
537 class PlcLxcBox (PlcBox):
538
539     def add_dummy (self, plcname):
540         print "TODO PlcLxcBox.add_dummy"
541
542     def free_slots (self):
543         print "TODO PlcLxcBox.free_slots"
544
545     def list (self):
546         print "TODO PlcLxcBox.list"
547
548     def reboot (self, options):
549         print "TODO PlcLxcBox.reboot"
550
551     def sense (self, options):
552         print "TODO PlcLxcBox.sense"
553
554
555 ############################################################
556 class QemuInstance: 
557     def __init__ (self, nodename, pid, qemubox):
558         self.nodename=nodename
559         self.pid=pid
560         self.qemu_box=qemubox
561         # not known yet
562         self.buildname=None
563         self.timestamp=0
564         
565     def set_buildname (self,buildname): self.buildname=buildname
566     def set_timestamp (self,timestamp): self.timestamp=timestamp
567     def set_now (self): self.timestamp=int(time.time())
568     def pretty_timestamp (self): return time.strftime("%Y-%m-%d:%H-%M",time.localtime(self.timestamp))
569     
570     def line (self):
571         msg = "== %s =="%(short_hostname(self.nodename))
572         msg += " [=%s]"%self.buildname
573         if self.pid:       msg += " (pid=%s)"%self.pid
574         else:              msg += " not (yet?) running"
575         if self.timestamp: msg += " @ %s"%self.pretty_timestamp()
576         else:              msg += " *unknown timestamp*"
577         return msg
578     
579     def kill(self):
580         if self.pid==0: 
581             print "cannot kill qemu %s with pid==0"%self.nodename
582             return
583         msg="Killing qemu %s with pid=%s on box %s"%(self.nodename,self.pid,self.qemu_box.hostname)
584         self.qemu_box.run_ssh(['kill',"%s"%self.pid],msg)
585         self.qemu_box.forget(self)
586
587
588 class QemuBox (Box):
589     def __init__ (self, hostname, max_qemus):
590         Box.__init__(self,hostname)
591         self.qemu_instances=[]
592         self.max_qemus=max_qemus
593
594     def add_node (self,nodename,pid):
595         for qemu in self.qemu_instances:
596             if qemu.nodename==nodename: 
597                 header("WARNING, duplicate qemu %s running on %s"%\
598                            (nodename,self.hostname), banner=False)
599                 return
600         self.qemu_instances.append(QemuInstance(nodename,pid,self))
601
602     def forget (self, qemu_instance):
603         self.qemu_instances.remove(qemu_instance)
604
605     # fill one slot even though this one is not started yet
606     def add_dummy (self, nodename):
607         dummy=QemuInstance('dummy_'+nodename,0,self)
608         dummy.set_now()
609         self.qemu_instances.append(dummy)
610
611     def line (self):
612         msg="%s [max=%d,%d free] (%s)"%(self.hostname, self.max_qemus,self.free_slots(),self.driver())
613         return msg
614
615     def list(self):
616         if not self.qemu_instances: 
617             header ('No qemu process on %s'%(self.line()))
618         else:
619             header ("Active qemu processes on %s"%(self.line()))
620             self.qemu_instances.sort(timestamp_sort)
621             for q in self.qemu_instances: 
622                 header (q.line(),banner=False)
623
624     def free_slots (self):
625         return self.max_qemus - len(self.qemu_instances)
626
627     def driver(self):
628         if hasattr(self,'_driver') and self._driver: return self._driver
629         return '*undef* driver'
630
631     def qemu_instance_by_pid (self,pid):
632         for q in self.qemu_instances:
633             if q.pid==pid: return q
634         return None
635
636     def qemu_instance_by_nodename_buildname (self,nodename,buildname):
637         for q in self.qemu_instances:
638             if q.nodename==nodename and q.buildname==buildname:
639                 return q
640         return None
641
642     def reboot (self, options):
643         if not options.soft:
644             self.reboot(options)
645         else:
646             self.run_ssh(['pkill','qemu'],"Killing qemu instances",
647                          dry_run=options.dry_run)
648
649     matcher=re.compile("\s*(?P<pid>[0-9]+).*-cdrom\s+(?P<nodename>[^\s]+)\.iso")
650     def sense(self, options):
651         print 'q',
652         modules=self.backquote_ssh(['lsmod']).split('\n')
653         self._driver='*NO kqemu/kmv_intel MODULE LOADED*'
654         for module in modules:
655             if module.find('kqemu')==0:
656                 self._driver='kqemu module loaded'
657             # kvm might be loaded without vkm_intel (we dont have AMD)
658             elif module.find('kvm_intel')==0:
659                 self._driver='kvm_intel module loaded'
660         ########## find out running pids
661         pids=self.backquote_ssh(['pgrep','qemu'])
662         if not pids: return
663         command=['ps','-o','pid,command'] + [ pid for pid in pids.split("\n") if pid]
664         ps_lines = self.backquote_ssh (command).split("\n")
665         for line in ps_lines:
666             if not line.strip() or line.find('PID') >=0 : continue
667             m=QemuBox.matcher.match(line)
668             if m: 
669                 self.add_node (m.group('nodename'),m.group('pid'))
670                 continue
671             header('QemuBox.sense: command %r returned line that failed to match'%command)
672             header(">>%s<<"%line)
673         ########## retrieve alive instances and map to build
674         live_builds=[]
675         command=['grep','.','*/*/qemu.pid','/dev/null']
676         pid_lines=self.backquote_ssh(command,trash_err=True).split('\n')
677         for pid_line in pid_lines:
678             if not pid_line.strip(): continue
679             # expect <build>/<nodename>/qemu.pid:<pid>pid
680             try:
681                 (buildname,nodename,tail)=pid_line.split('/')
682                 (_,pid)=tail.split(':')
683                 q=self.qemu_instance_by_pid (pid)
684                 if not q: continue
685                 q.set_buildname(buildname)
686                 live_builds.append(buildname)
687             except: print 'WARNING, could not parse pid line',pid_line
688         # retrieve timestamps
689         if not live_builds: return
690         command=   ['grep','.']
691         command += ['%s/*/timestamp'%b for b in live_builds]
692         command += ['/dev/null']
693         ts_lines=self.backquote_ssh(command,trash_err=True).split('\n')
694         for ts_line in ts_lines:
695             if not ts_line.strip(): continue
696             # expect <build>/<nodename>/timestamp:<timestamp>
697             try:
698                 (buildname,nodename,tail)=ts_line.split('/')
699                 nodename=nodename.replace('qemu-','')
700                 (_,timestamp)=tail.split(':')
701                 timestamp=int(timestamp)
702                 q=self.qemu_instance_by_nodename_buildname(nodename,buildname)
703                 if not q: 
704                     print 'WARNING zombie qemu',self.hostname,ts_line
705                     print '... was expecting (',short_hostname(nodename),buildname,') in',\
706                         [ (short_hostname(i.nodename),i.buildname) for i in self.qemu_instances ]
707                     continue
708                 q.set_timestamp(timestamp)
709             except:  print 'WARNING, could not parse ts line',ts_line
710
711 ####################
712 class TestInstance:
713     def __init__ (self, buildname, pid=0):
714         self.pids=[]
715         if pid!=0: self.pid.append(pid)
716         self.buildname=buildname
717         # latest trace line
718         self.trace=''
719         # has a KO test
720         self.broken_steps=[]
721         self.timestamp = 0
722
723     def set_timestamp (self,timestamp): self.timestamp=timestamp
724     def set_now (self): self.timestamp=int(time.time())
725     def pretty_timestamp (self): return time.strftime("%Y-%m-%d:%H-%M",time.localtime(self.timestamp))
726
727
728     def add_pid (self,pid):
729         self.pids.append(pid)
730     def set_broken (self, plcindex, step): 
731         self.broken_steps.append ( (plcindex, step,) )
732
733     def line (self):
734         double='=='
735         if self.pids: double='*'+double[1]
736         if self.broken_steps: double=double[0]+'B'
737         msg = " %s %s =="%(double,self.buildname)
738         if not self.pids:       pass
739         elif len(self.pids)==1: msg += " (pid=%s)"%self.pids[0]
740         else:                   msg += " !!!pids=%s!!!"%self.pids
741         msg += " @%s"%self.pretty_timestamp()
742         if self.broken_steps:
743             msg += " [BROKEN=" + " ".join( [ "%s@%s"%(s,i) for (i,s) in self.broken_steps ] ) + "]"
744         return msg
745
746 class TestBox (Box):
747     def __init__ (self,hostname):
748         Box.__init__(self,hostname)
749         self.starting_ips=[]
750         self.test_instances=[]
751
752     def reboot (self, options):
753         # can't reboot a vserver VM
754         self.run_ssh (['pkill','run_log'],"Terminating current runs",
755                       dry_run=options.dry_run)
756         self.run_ssh (['rm','-f',Starting.location],"Cleaning %s"%Starting.location,
757                       dry_run=options.dry_run)
758
759     def get_test (self, buildname):
760         for i in self.test_instances:
761             if i.buildname==buildname: return i
762
763     # we scan ALL remaining test results, even the ones not running
764     def add_timestamp (self, buildname, timestamp):
765         i=self.get_test(buildname)
766         if i:   
767             i.set_timestamp(timestamp)
768         else:   
769             i=TestInstance(buildname,0)
770             i.set_timestamp(timestamp)
771             self.test_instances.append(i)
772
773     def add_running_test (self, pid, buildname):
774         i=self.get_test(buildname)
775         if not i:
776             self.test_instances.append (TestInstance (buildname,pid))
777             return
778         if i.pids:
779             print "WARNING: 2 concurrent tests run on same build %s"%buildname
780         i.add_pid (pid)
781
782     def add_broken (self, buildname, plcindex, step):
783         i=self.get_test(buildname)
784         if not i:
785             i=TestInstance(buildname)
786             self.test_instances.append(i)
787         i.set_broken(plcindex, step)
788
789     matcher_proc=re.compile (".*/proc/(?P<pid>[0-9]+)/cwd.*/root/(?P<buildname>[^/]+)$")
790     matcher_grep=re.compile ("/root/(?P<buildname>[^/]+)/logs/trace.*:TRACE:\s*(?P<plcindex>[0-9]+).*step=(?P<step>\S+).*")
791     def sense (self, options):
792         print 't',
793         self.sense_uptime()
794         self.starting_ips=[x for x in self.backquote_ssh(['cat',Starting.location], trash_err=True).strip().split('\n') if x]
795
796         # scan timestamps on all tests
797         # this is likely to not invoke ssh so we need to be a bit smarter to get * expanded
798         # xxx would make sense above too
799         command=['bash','-c',"grep . /root/*/timestamp /dev/null"]
800         ts_lines=self.backquote_ssh(command,trash_err=True).split('\n')
801         for ts_line in ts_lines:
802             if not ts_line.strip(): continue
803             # expect /root/<buildname>/timestamp:<timestamp>
804             try:
805                 (ts_file,timestamp)=ts_line.split(':')
806                 ts_file=os.path.dirname(ts_file)
807                 buildname=os.path.basename(ts_file)
808                 timestamp=int(timestamp)
809                 t=self.add_timestamp(buildname,timestamp)
810             except:  print 'WARNING, could not parse ts line',ts_line
811
812         command=['bash','-c',"grep KO /root/*/logs/trace-* /dev/null" ]
813         trace_lines=self.backquote_ssh (command).split('\n')
814         for line in trace_lines:
815             if not line.strip(): continue
816             m=TestBox.matcher_grep.match(line)
817             if m: 
818                 buildname=m.group('buildname')
819                 plcindex=m.group('plcindex')
820                 step=m.group('step')
821                 self.add_broken(buildname,plcindex, step)
822                 continue
823             header("TestBox.sense: command %r returned line that failed to match\n%s"%(command,line))
824             header(">>%s<<"%line)
825
826         pids = self.backquote_ssh (['pgrep','run_log'],trash_err=True)
827         if not pids: return
828         command=['ls','-ld'] + ["/proc/%s/cwd"%pid for pid in pids.split("\n") if pid]
829         ps_lines=self.backquote_ssh (command).split('\n')
830         for line in ps_lines:
831             if not line.strip(): continue
832             m=TestBox.matcher_proc.match(line)
833             if m: 
834                 pid=m.group('pid')
835                 buildname=m.group('buildname')
836                 self.add_running_test(pid, buildname)
837                 continue
838             header("TestBox.sense: command %r returned line that failed to match\n%s"%(command,line))
839             header(">>%s<<"%line)
840         
841         
842     def line (self):
843         return "%s (%s)"%(self.hostname,self.uptime())
844
845     def list (self):
846         if not self.test_instances:
847             header ("No known tests on %s"%self.line())
848         else:
849             header ("Known tests on %s"%self.line())
850             self.test_instances.sort(timestamp_sort)
851             for i in self.test_instances: print i.line()
852         if self.starting_ips:
853             header ("Starting IP addresses on %s"%self.line())
854             self.starting_ips.sort()
855             for starting in self.starting_ips: print starting
856
857 ############################################################
858 class Options: pass
859
860 class Substrate:
861
862     def __init__ (self, plcs_on_vs=True, plcs_on_lxc=False):
863         self.options=Options()
864         self.options.dry_run=False
865         self.options.verbose=False
866         self.options.reboot=False
867         self.options.soft=False
868         self.test_box = TestBox (self.test_box_spec())
869         self.build_boxes = [ BuildBox(h) for h in self.build_boxes_spec() ]
870         # for compat with older LocalSubstrate
871         try:
872             self.plc_vs_boxes = [ PlcVsBox (h,m) for (h,m) in self.plc_vs_boxes_spec ()]
873             self.plc_lxc_boxes = [ PlcLxcBox (h,m) for (h,m) in self.plc_lxc_boxes_spec ()]
874         except:
875             self.plc_vs_boxes = [ PlcVsBox (h,m) for (h,m) in self.plc_boxes_spec ()]
876             self.plc_lxc_boxes = [ ]
877         self.qemu_boxes = [ QemuBox (h,m) for (h,m) in self.qemu_boxes_spec ()]
878         self._sensed=False
879
880         self.vplc_pool = Pool (self.vplc_ips(),"for vplcs",self)
881         self.vnode_pool = Pool (self.vnode_ips(),"for vnodes",self)
882         
883         self.rescope (plcs_on_vs=plcs_on_vs, plcs_on_lxc=plcs_on_lxc)
884
885     def rescope(self, plcs_on_vs, plcs_on_lxc):
886         self.plc_boxes=[]
887         if plcs_on_vs: self.plc_boxes += self.plc_vs_boxes
888         if plcs_on_lxc: self.plc_boxes += self.plc_lxc_boxes
889         self.default_boxes = self.plc_boxes + self.qemu_boxes
890         self.all_boxes = self.build_boxes + [ self.test_box ] + self.plc_boxes + self.qemu_boxes
891
892     def fqdn (self, hostname):
893         if hostname.find('.')<0: return "%s.%s"%(hostname,self.domain())
894         return hostname
895
896     # return True if actual sensing takes place
897     def sense (self,force=False):
898         if self._sensed and not force: return False
899         print 'Sensing local substrate...',
900         for b in self.default_boxes: b.sense(self.options)
901         print 'Done'
902         self._sensed=True
903         return True
904
905     def list (self):
906         for b in self.default_boxes:
907             b.list()
908
909     def add_dummy_plc (self, plc_boxname, plcname):
910         for pb in self.plc_boxes:
911             if pb.hostname==plc_boxname:
912                 pb.add_dummy(plcname)
913                 return True
914     def add_dummy_qemu (self, qemu_boxname, qemuname):
915         for qb in self.qemu_boxes:
916             if qb.hostname==qemu_boxname:
917                 qb.add_dummy(qemuname)
918                 return True
919
920     def add_starting_dummy (self, bname, vname):
921         return self.add_dummy_plc (bname, vname) or self.add_dummy_qemu (bname, vname)
922
923     ########## 
924     def provision (self,plcs,options):
925         try:
926             # attach each plc to a plc box and an IP address
927             plcs = [ self.provision_plc (plc,options) for plc in plcs ]
928             # attach each node/qemu to a qemu box with an IP address
929             plcs = [ self.provision_qemus (plc,options) for plc in plcs ]
930             # update the SFA spec accordingly
931             plcs = [ self.localize_sfa_rspec(plc,options) for plc in plcs ]
932             self.list()
933             return plcs
934         except Exception, e:
935             print '* Could not provision this test on current substrate','--',e,'--','exiting'
936             traceback.print_exc()
937             sys.exit(1)
938
939     # it is expected that a couple of options like ips_bplc and ips_vplc 
940     # are set or unset together
941     @staticmethod
942     def check_options (x,y):
943         if not x and not y: return True
944         return len(x)==len(y)
945
946     # find an available plc box (or make space)
947     # and a free IP address (using options if present)
948     def provision_plc (self, plc, options):
949         
950         assert Substrate.check_options (options.ips_bplc, options.ips_vplc)
951
952         #### let's find an IP address for that plc
953         # look in options 
954         if options.ips_vplc:
955             # this is a rerun
956             # we don't check anything here, 
957             # it is the caller's responsability to cleanup and make sure this makes sense
958             plc_boxname = options.ips_bplc.pop()
959             vplc_hostname=options.ips_vplc.pop()
960         else:
961             if self.sense(): self.list()
962             plc_boxname=None
963             vplc_hostname=None
964             # try to find an available IP 
965             self.vplc_pool.sense()
966             couple=self.vplc_pool.next_free()
967             if couple:
968                 (vplc_hostname,unused)=couple
969             #### we need to find one plc box that still has a slot
970             max_free=0
971             # use the box that has max free spots for load balancing
972             for pb in self.plc_boxes:
973                 free=pb.free_slots()
974                 if free>max_free:
975                     plc_boxname=pb.hostname
976                     max_free=free
977             # if there's no available slot in the plc_boxes, or we need a free IP address
978             # make space by killing the oldest running instance
979             if not plc_boxname or not vplc_hostname:
980                 # find the oldest of all our instances
981                 all_plc_instances=reduce(lambda x, y: x+y, 
982                                          [ pb.plc_instances for pb in self.plc_boxes ],
983                                          [])
984                 all_plc_instances.sort(timestamp_sort)
985                 try:
986                     plc_instance_to_kill=all_plc_instances[0]
987                 except:
988                     msg=""
989                     if not plc_boxname: msg += " PLC boxes are full"
990                     if not vplc_hostname: msg += " vplc IP pool exhausted" 
991                     raise Exception,"Could not make space for a PLC instance:"+msg
992                 freed_plc_boxname=plc_instance_to_kill.plc_box.hostname
993                 freed_vplc_hostname=plc_instance_to_kill.vplcname()
994                 message='killing oldest plc instance = %s on %s'%(plc_instance_to_kill.line(),
995                                                                   freed_plc_boxname)
996                 plc_instance_to_kill.kill()
997                 # use this new plcbox if that was the problem
998                 if not plc_boxname:
999                     plc_boxname=freed_plc_boxname
1000                 # ditto for the IP address
1001                 if not vplc_hostname:
1002                     vplc_hostname=freed_vplc_hostname
1003                     # record in pool as mine
1004                     self.vplc_pool.set_mine(vplc_hostname)
1005
1006         # 
1007         self.add_dummy_plc(plc_boxname,plc['name'])
1008         vplc_ip = self.vplc_pool.get_ip(vplc_hostname)
1009         self.vplc_pool.add_starting(vplc_hostname, plc_boxname)
1010
1011         #### compute a helpful vserver name
1012         # remove domain in hostname
1013         vplc_short = short_hostname(vplc_hostname)
1014         vservername = "%s-%d-%s" % (options.buildname,plc['index'],vplc_short)
1015         plc_name = "%s_%s"%(plc['name'],vplc_short)
1016
1017         utils.header( 'PROVISION plc %s in box %s at IP %s as %s'%\
1018                           (plc['name'],plc_boxname,vplc_hostname,vservername))
1019
1020         #### apply in the plc_spec
1021         # # informative
1022         # label=options.personality.replace("linux","")
1023         mapper = {'plc': [ ('*' , {'host_box':plc_boxname,
1024                                    # 'name':'%s-'+label,
1025                                    'name': plc_name,
1026                                    'vservername':vservername,
1027                                    'vserverip':vplc_ip,
1028                                    'PLC_DB_HOST':vplc_hostname,
1029                                    'PLC_API_HOST':vplc_hostname,
1030                                    'PLC_BOOT_HOST':vplc_hostname,
1031                                    'PLC_WWW_HOST':vplc_hostname,
1032                                    'PLC_NET_DNS1' : self.network_settings() [ 'interface_fields:dns1' ],
1033                                    'PLC_NET_DNS2' : self.network_settings() [ 'interface_fields:dns2' ],
1034                                    } ) ]
1035                   }
1036
1037
1038         # mappers only work on a list of plcs
1039         return TestMapper([plc],options).map(mapper)[0]
1040
1041     ##########
1042     def provision_qemus (self, plc, options):
1043
1044         assert Substrate.check_options (options.ips_bnode, options.ips_vnode)
1045
1046         test_mapper = TestMapper ([plc], options)
1047         nodenames = test_mapper.node_names()
1048         maps=[]
1049         for nodename in nodenames:
1050
1051             if options.ips_vnode:
1052                 # as above, it's a rerun, take it for granted
1053                 qemu_boxname=options.ips_bnode.pop()
1054                 vnode_hostname=options.ips_vnode.pop()
1055             else:
1056                 if self.sense(): self.list()
1057                 qemu_boxname=None
1058                 vnode_hostname=None
1059                 # try to find an available IP 
1060                 self.vnode_pool.sense()
1061                 couple=self.vnode_pool.next_free()
1062                 if couple:
1063                     (vnode_hostname,unused)=couple
1064                 # find a physical box
1065                 max_free=0
1066                 # use the box that has max free spots for load balancing
1067                 for qb in self.qemu_boxes:
1068                     free=qb.free_slots()
1069                     if free>max_free:
1070                         qemu_boxname=qb.hostname
1071                         max_free=free
1072                 # if we miss the box or the IP, kill the oldest instance
1073                 if not qemu_boxname or not vnode_hostname:
1074                 # find the oldest of all our instances
1075                     all_qemu_instances=reduce(lambda x, y: x+y, 
1076                                               [ qb.qemu_instances for qb in self.qemu_boxes ],
1077                                               [])
1078                     all_qemu_instances.sort(timestamp_sort)
1079                     try:
1080                         qemu_instance_to_kill=all_qemu_instances[0]
1081                     except:
1082                         msg=""
1083                         if not qemu_boxname: msg += " QEMU boxes are full"
1084                         if not vnode_hostname: msg += " vnode IP pool exhausted" 
1085                         raise Exception,"Could not make space for a QEMU instance:"+msg
1086                     freed_qemu_boxname=qemu_instance_to_kill.qemu_box.hostname
1087                     freed_vnode_hostname=short_hostname(qemu_instance_to_kill.nodename)
1088                     # kill it
1089                     message='killing oldest qemu node = %s on %s'%(qemu_instance_to_kill.line(),
1090                                                                    freed_qemu_boxname)
1091                     qemu_instance_to_kill.kill()
1092                     # use these freed resources where needed
1093                     if not qemu_boxname:
1094                         qemu_boxname=freed_qemu_boxname
1095                     if not vnode_hostname:
1096                         vnode_hostname=freed_vnode_hostname
1097                         self.vnode_pool.set_mine(vnode_hostname)
1098
1099             self.add_dummy_qemu (qemu_boxname,vnode_hostname)
1100             mac=self.vnode_pool.retrieve_userdata(vnode_hostname)
1101             ip=self.vnode_pool.get_ip (vnode_hostname)
1102             self.vnode_pool.add_starting(vnode_hostname,qemu_boxname)
1103
1104             vnode_fqdn = self.fqdn(vnode_hostname)
1105             nodemap={'host_box':qemu_boxname,
1106                      'node_fields:hostname':vnode_fqdn,
1107                      'interface_fields:ip':ip, 
1108                      'interface_fields:mac':mac,
1109                      }
1110             nodemap.update(self.network_settings())
1111             maps.append ( (nodename, nodemap) )
1112
1113             utils.header("PROVISION node %s in box %s at IP %s with MAC %s"%\
1114                              (nodename,qemu_boxname,vnode_hostname,mac))
1115
1116         return test_mapper.map({'node':maps})[0]
1117
1118     def localize_sfa_rspec (self,plc,options):
1119        
1120         plc['sfa']['SFA_REGISTRY_HOST'] = plc['PLC_DB_HOST']
1121         plc['sfa']['SFA_AGGREGATE_HOST'] = plc['PLC_DB_HOST']
1122         plc['sfa']['SFA_SM_HOST'] = plc['PLC_DB_HOST']
1123         plc['sfa']['SFA_DB_HOST'] = plc['PLC_DB_HOST']
1124         plc['sfa']['SFA_PLC_URL'] = 'https://' + plc['PLC_API_HOST'] + ':443/PLCAPI/' 
1125         return plc
1126
1127     #################### release:
1128     def release (self,options):
1129         self.vplc_pool.release_my_starting()
1130         self.vnode_pool.release_my_starting()
1131         pass
1132
1133     #################### show results for interactive mode
1134     def get_box (self,boxname):
1135         for b in self.build_boxes + self.plc_boxes + self.qemu_boxes + [self.test_box] :
1136             if b.shortname()==boxname:
1137                 return b
1138         print "Could not find box %s"%boxname
1139         return None
1140
1141     def list_boxes(self,box_or_names):
1142         print 'Sensing',
1143         for box in box_or_names:
1144             if not isinstance(box,Box): box=self.get_box(box)
1145             if not box: continue
1146             box.sense(self.options)
1147         print 'Done'
1148         for box in box_or_names:
1149             if not isinstance(box,Box): box=self.get_box(box)
1150             if not box: continue
1151             box.list()
1152
1153     def reboot_boxes(self,box_or_names):
1154         for box in box_or_names:
1155             if not isinstance(box,Box): box=self.get_box(box)
1156             if not box: continue
1157             box.reboot(self.options)
1158
1159     ####################
1160     # can be run as a utility to manage the local infrastructure
1161     def main (self):
1162         parser=OptionParser()
1163         parser.add_option ('-r',"--reboot",action='store_true',dest='reboot',default=False,
1164                            help='reboot mode (use shutdown -r)')
1165         parser.add_option ('-s',"--soft",action='store_true',dest='soft',default=False,
1166                            help='soft mode for reboot (vserver stop or kill qemus)')
1167         parser.add_option ('-t',"--testbox",action='store_true',dest='testbox',default=False,
1168                            help='add test box') 
1169         parser.add_option ('-b',"--build",action='store_true',dest='builds',default=False,
1170                            help='add build boxes')
1171         parser.add_option ('-p',"--plc",action='store_true',dest='plcs',default=False,
1172                            help='add plc boxes')
1173         parser.add_option ('-X', "--lxc",action='store_true',dest='plcs_use_lxc',
1174                            help='use lxc-enabled plc boxes instead of vs-enabled ones')
1175         parser.add_option ('-q',"--qemu",action='store_true',dest='qemus',default=False,
1176                            help='add qemu boxes') 
1177         parser.add_option ('-a',"--all",action='store_true',dest='all',default=False,
1178                            help='address all known  boxes, like -b -t -p -q')
1179         parser.add_option ('-v',"--verbose",action='store_true',dest='verbose',default=False,
1180                            help='verbose mode')
1181         parser.add_option ('-n',"--dry_run",action='store_true',dest='dry_run',default=False,
1182                            help='dry run mode')
1183         (self.options,args)=parser.parse_args()
1184
1185         if self.options.plcs_use_lxc:
1186             self.rescope (plcs_on_vs=False, plcs_on_lxc=True)
1187
1188         boxes=args
1189         if self.options.testbox: boxes += [self.test_box]
1190         if self.options.builds: boxes += self.build_boxes
1191         if self.options.plcs: boxes += self.plc_boxes
1192         if self.options.qemus: boxes += self.qemu_boxes
1193         if self.options.all: boxes += self.all_boxes
1194         
1195         # default scope is -b -p -q
1196         if not boxes:
1197             boxes = self.build_boxes + self.plc_boxes + self.qemu_boxes
1198
1199         if self.options.reboot: self.reboot_boxes (boxes)
1200         else:                   self.list_boxes (boxes)