more changes towards a dual vs/lxc test infra for plc's
[tests.git] / system / Substrate.py
1 #
2 # Thierry Parmentelat <thierry.parmentelat@inria.fr>
3 # Copyright (C) 2010 INRIA 
4 #
5 # #################### history
6 #
7 # see also Substrate.readme
8 #
9 # This is a complete rewrite of TestResources/Tracker/Pool
10 # we don't use trackers anymore and just probe/sense the running 
11 # boxes to figure out where we are
12 # in order to implement some fairness in the round-robin allocation scheme
13 # we need an indication of the 'age' of each running entity, 
14 # hence the 'timestamp-*' steps in TestPlc
15
16 # this should be much more flexible:
17 # * supports several plc boxes 
18 # * supports several qemu guests per host
19 # * no need to worry about tracker being in sync or not
20 #
21 # #################### howto use
22 #
23 # each site is to write its own LocalSubstrate.py, 
24 # (see e.g. LocalSubstrate.inria.py)
25 # LocalSubstrate.py is expected to be in /root on the testmaster box
26 # and needs to define
27 # MYPLCs
28 # . the vserver-capable boxes used for hosting myplcs
29 # .  and their admissible load (max # of myplcs)
30 # . the pool of DNS-names and IP-addresses available for myplcs
31 # QEMU nodes
32 # . the kvm-qemu capable boxes to host qemu instances
33 # .  and their admissible load (max # of myplcs)
34 # . the pool of DNS-names and IP-addresses available for nodes
35
36 # #################### implem. note
37
38 # this model relies on 'sensing' the substrate, 
39 # i.e. probing all the boxes for their running instances of vservers and qemu
40 # this is how we get rid of tracker inconsistencies 
41 # however there is a 'black hole' between the time where a given address is 
42 # allocated and when it actually gets used/pingable
43 # this is why we still need a shared knowledge among running tests
44 # in a file named /root/starting
45 # this is connected to the Pool class 
46
47 # ####################
48
49 import os.path, sys
50 import time
51 import re
52 import traceback
53 import subprocess
54 import commands
55 import socket
56 from optparse import OptionParser
57
58 import utils
59 from TestSsh import TestSsh
60 from TestMapper import TestMapper
61
62 def header (message,banner=True):
63     if not message: return
64     if banner: print "===============",
65     print message
66     sys.stdout.flush()
67
68 def timestamp_sort(o1,o2): return o1.timestamp-o2.timestamp
69
70 def short_hostname (hostname):
71     return hostname.split('.')[0]
72
73 ####################
74 # the place were other test instances tell about their not-yet-started
75 # instances, that go undetected through sensing
76 class Starting:
77
78     location='/root/starting'
79     def __init__ (self):
80         self.tuples=[]
81
82     def load (self):
83         try:    self.tuples=[line.strip().split('@') 
84                              for line in file(Starting.location).readlines()]
85         except: self.tuples=[]
86
87     def vnames (self) : 
88         self.load()
89         return [ x for (x,_) in self.tuples ]
90
91     def add (self, vname, bname):
92         if not vname in self.vnames():
93             file(Starting.location,'a').write("%s@%s\n"%(vname,bname))
94             
95     def delete_vname (self, vname):
96         self.load()
97         if vname in self.vnames():
98             f=file(Starting.location,'w')
99             for (v,b) in self.tuples: 
100                 if v != vname: f.write("%s@%s\n"%(v,b))
101             f.close()
102     
103 ####################
104 # pool class
105 # allows to pick an available IP among a pool
106 # input is expressed as a list of tuples (hostname,ip,user_data)
107 # that can be searched iteratively for a free slot
108 # e.g.
109 # pool = [ (hostname1,user_data1),  
110 #          (hostname2,user_data2),  
111 #          (hostname3,user_data2),  
112 #          (hostname4,user_data4) ]
113 # assuming that ip1 and ip3 are taken (pingable), then we'd get
114 # pool=Pool(pool)
115 # pool.next_free() -> entry2
116 # pool.next_free() -> entry4
117 # pool.next_free() -> None
118 # that is, even if ip2 is not busy/pingable when the second next_free() is issued
119
120 class PoolItem:
121     def __init__ (self,hostname,userdata):
122         self.hostname=hostname
123         self.userdata=userdata
124         # slot holds 'busy' or 'free' or 'mine' or 'starting' or None
125         # 'mine' is for our own stuff, 'starting' from the concurrent tests
126         self.status=None
127         self.ip=None
128
129     def line(self):
130         return "Pooled %s (%s) -> %s"%(self.hostname,self.userdata, self.status)
131
132     def char (self):
133         if   self.status==None:       return '?'
134         elif self.status=='busy':     return '+'
135         elif self.status=='free':     return '-'
136         elif self.status=='mine':     return 'M'
137         elif self.status=='starting': return 'S'
138
139     def get_ip(self):
140         if self.ip: return self.ip
141         ip=socket.gethostbyname(self.hostname)
142         self.ip=ip
143         return ip
144
145 class Pool:
146
147     def __init__ (self, tuples,message, substrate):
148         self.pool_items= [ PoolItem (hostname,userdata) for (hostname,userdata) in tuples ] 
149         self.message=message
150         # where to send notifications upon load_starting
151         self.substrate=substrate
152
153     def list (self):
154         for i in self.pool_items: print i.line()
155
156     def line (self):
157         line=self.message
158         for i in self.pool_items: line += ' ' + i.char()
159         return line
160
161     def _item (self, hostname):
162         for i in self.pool_items: 
163             if i.hostname==hostname: return i
164         raise Exception ("Could not locate hostname %s in pool %s"%(hostname,self.message))
165
166     def retrieve_userdata (self, hostname): 
167         return self._item(hostname).userdata
168
169     def get_ip (self, hostname):
170         try:    return self._item(hostname).get_ip()
171         except: return socket.gethostbyname(hostname)
172         
173     def set_mine (self, hostname):
174         try:
175             self._item(hostname).status='mine'
176         except:
177             print 'WARNING: host %s not found in IP pool %s'%(hostname,self.message)
178
179     def next_free (self):
180         for i in self.pool_items:
181             if i.status == 'free':
182                 i.status='mine'
183                 return (i.hostname,i.userdata)
184         return None
185
186     ####################
187     # we have a starting instance of our own
188     def add_starting (self, vname, bname):
189         Starting().add(vname,bname)
190         for i in self.pool_items:
191             if i.hostname==vname: i.status='mine'
192
193     # load the starting instances from the common file
194     # remember that might be ours
195     # return the list of (vname,bname) that are not ours
196     def load_starting (self):
197         starting=Starting()
198         starting.load()
199         new_tuples=[]
200         for (v,b) in starting.tuples:
201             for i in self.pool_items:
202                 if i.hostname==v and i.status=='free':
203                     i.status='starting'
204                     new_tuples.append( (v,b,) )
205         return new_tuples
206
207     def release_my_starting (self):
208         for i in self.pool_items:
209             if i.status=='mine':
210                 Starting().delete_vname (i.hostname)
211                 i.status=None
212
213
214     ##########
215     def _sense (self):
216         for item in self.pool_items:
217             if item.status is not None: 
218                 print item.char(),
219                 continue
220             if self.check_ping (item.hostname): 
221                 item.status='busy'
222                 print '*',
223             else:
224                 item.status='free'
225                 print '.',
226     
227     def sense (self):
228         print 'Sensing IP pool',self.message,
229         self._sense()
230         print 'Done'
231         for (vname,bname) in self.load_starting():
232             self.substrate.add_starting_dummy (bname, vname)
233         print 'After starting: IP pool'
234         print self.line()
235     # OS-dependent ping option (support for macos, for convenience)
236     ping_timeout_option = None
237     # returns True when a given hostname/ip responds to ping
238     def check_ping (self,hostname):
239         if not Pool.ping_timeout_option:
240             (status,osname) = commands.getstatusoutput("uname -s")
241             if status != 0:
242                 raise Exception, "TestPool: Cannot figure your OS name"
243             if osname == "Linux":
244                 Pool.ping_timeout_option="-w"
245             elif osname == "Darwin":
246                 Pool.ping_timeout_option="-t"
247
248         command="ping -c 1 %s 1 %s"%(Pool.ping_timeout_option,hostname)
249         (status,output) = commands.getstatusoutput(command)
250         return status == 0
251
252 ####################
253 class Box:
254     def __init__ (self,hostname):
255         self.hostname=hostname
256         self._probed=None
257     def shortname (self):
258         return short_hostname(self.hostname)
259     def test_ssh (self): return TestSsh(self.hostname,username='root',unknown_host=False)
260     def reboot (self, options):
261         self.test_ssh().run("shutdown -r now",message="Rebooting %s"%self.hostname,
262                             dry_run=options.dry_run)
263
264     def uptime(self):
265         if hasattr(self,'_uptime') and self._uptime: return self._uptime
266         return '*undef* uptime'
267     def sense_uptime (self):
268         command=['uptime']
269         self._uptime=self.backquote_ssh(command,trash_err=True).strip()
270         if not self._uptime: self._uptime='unreachable'
271
272     def run(self,argv,message=None,trash_err=False,dry_run=False):
273         if dry_run:
274             print 'DRY_RUN:',
275             print " ".join(argv)
276             return 0
277         else:
278             header(message)
279             if not trash_err:
280                 return subprocess.call(argv)
281             else:
282                 return subprocess.call(argv,stderr=file('/dev/null','w'))
283                 
284     def run_ssh (self, argv, message, trash_err=False, dry_run=False):
285         ssh_argv = self.test_ssh().actual_argv(argv)
286         result=self.run (ssh_argv, message, trash_err, dry_run=dry_run)
287         if result!=0:
288             print "WARNING: failed to run %s on %s"%(" ".join(argv),self.hostname)
289         return result
290
291     def backquote (self, argv, trash_err=False):
292         # print 'running backquote',argv
293         if not trash_err:
294             result= subprocess.Popen(argv,stdout=subprocess.PIPE).communicate()[0]
295         else:
296             result= subprocess.Popen(argv,stdout=subprocess.PIPE,stderr=file('/dev/null','w')).communicate()[0]
297         return result
298
299     def probe (self):
300         if self._probed is not None: return self._probed
301         # first probe the ssh link
302         probe_argv=self.test_ssh().actual_argv(['hostname'])
303         self._probed=self.backquote ( probe_argv, trash_err=True )
304         if not self._probed: print "root@%s unreachable"%self.hostname
305         return self._probed
306
307     # use argv=['bash','-c',"the command line"]
308     # if you have any shell-expanded arguments like *
309     # and if there's any chance the command is adressed to the local host
310     def backquote_ssh (self, argv, trash_err=False):
311         if not self.probe(): return ''
312         return self.backquote( self.test_ssh().actual_argv(argv), trash_err)
313
314 ############################################################
315 class BuildInstance:
316     def __init__ (self, buildname, pid, buildbox):
317         self.buildname=buildname
318         self.buildbox=buildbox
319         self.pids=[pid]
320
321     def add_pid(self,pid):
322         self.pids.append(pid)
323
324     def line (self):
325         return "== %s == (pids=%r)"%(self.buildname,self.pids)
326
327 class BuildBox (Box):
328     def __init__ (self,hostname):
329         Box.__init__(self,hostname)
330         self.build_instances=[]
331
332     def add_build (self,buildname,pid):
333         for build in self.build_instances:
334             if build.buildname==buildname: 
335                 build.add_pid(pid)
336                 return
337         self.build_instances.append(BuildInstance(buildname, pid, self))
338
339     def list(self):
340         if not self.build_instances: 
341             header ('No build process on %s (%s)'%(self.hostname,self.uptime()))
342         else:
343             header ("Builds on %s (%s)"%(self.hostname,self.uptime()))
344             for b in self.build_instances: 
345                 header (b.line(),banner=False)
346
347     def reboot (self, options):
348         if not options.soft:
349             Box.reboot(self,options)
350         else:
351             command=['pkill','vbuild']
352             self.run_ssh(command,"Terminating vbuild processes",dry_run=options.dry_run)
353
354     # inspect box and find currently running builds
355     matcher=re.compile("\s*(?P<pid>[0-9]+).*-[bo]\s+(?P<buildname>[^\s]+)(\s|\Z)")
356     matcher_building_vm=re.compile("\s*(?P<pid>[0-9]+).*init-vserver.*-i\s+eth.\s+(?P<buildname>[^\s]+)\s*\Z")
357     def sense(self, options):
358         print 'b',
359         self.sense_uptime()
360         pids=self.backquote_ssh(['pgrep','vbuild'],trash_err=True)
361         if not pids: return
362         command=['ps','-o','pid,command'] + [ pid for pid in pids.split("\n") if pid]
363         ps_lines=self.backquote_ssh (command).split('\n')
364         for line in ps_lines:
365             if not line.strip() or line.find('PID')>=0: continue
366             m=BuildBox.matcher.match(line)
367             if m: 
368                 date=time.strftime('%Y-%m-%d',time.localtime(time.time()))
369                 buildname=m.group('buildname').replace('@DATE@',date)
370                 self.add_build (buildname,m.group('pid'))
371                 continue
372             m=BuildBox.matcher_building_vm.match(line)
373             if m: 
374                 # buildname is expansed here
375                 self.add_build (buildname,m.group('pid'))
376                 continue
377             header('BuildBox.sense: command %r returned line that failed to match'%command)
378             header(">>%s<<"%line)
379
380 ############################################################
381 class PlcInstance:
382     def __init__ (self, plcbox):
383         self.plc_box=plcbox
384         # unknown yet
385         self.timestamp=0
386         
387     def set_timestamp (self,timestamp): self.timestamp=timestamp
388     def set_now (self): self.timestamp=int(time.time())
389     def pretty_timestamp (self): return time.strftime("%Y-%m-%d:%H-%M",time.localtime(self.timestamp))
390
391 class PlcVsInstance (PlcInstance):
392     def __init__ (self, plcbox, vservername, ctxid):
393         PlcInstance.__init__(self,plcbox)
394         self.vservername=vservername
395         self.ctxid=ctxid
396
397     def vplcname (self):
398         return self.vservername.split('-')[-1]
399     def buildname (self):
400         return self.vservername.rsplit('-',2)[0]
401
402     def line (self):
403         msg="== %s =="%(self.vplcname())
404         msg += " [=%s]"%self.vservername
405         if self.ctxid==0:  msg+=" not (yet?) running"
406         else:              msg+=" (ctx=%s)"%self.ctxid     
407         if self.timestamp: msg += " @ %s"%self.pretty_timestamp()
408         else:              msg += " *unknown timestamp*"
409         return msg
410
411     def kill (self):
412         msg="vserver stopping %s on %s"%(self.vservername,self.plc_box.hostname)
413         self.plc_box.run_ssh(['vserver',self.vservername,'stop'],msg)
414         self.plc_box.forget(self)
415
416 class PlcLxcInstance (PlcInstance):
417     # does lxc have a context id of any kind ?
418     def __init__ (self, plcbox, lxcname):
419         PlcInstance.__init__(self, plcbox)
420         self.lxcname = lxcname
421
422     def kill (self):
423         print "TODO lxc PlcLxcInstance.kill ..."
424
425     def line (self):
426         return "TODO lxc PlcLxcInstance.line with lxcname=%s"%(self.lxcname)
427
428 ##########
429 class PlcBox (Box):
430     def __init__ (self, hostname, max_plcs):
431         Box.__init__(self,hostname)
432         self.plc_instances=[]
433         self.max_plcs=max_plcs
434
435     def free_slots (self):
436         return self.max_plcs - len(self.plc_instances)
437
438     # fill one slot even though this one is not started yet
439     def add_dummy (self, plcname):
440         dummy=PlcVsInstance(self,'dummy_'+plcname,0)
441         dummy.set_now()
442         self.plc_instances.append(dummy)
443
444     def reboot (self, options):
445         if not options.soft:
446             self.reboot(options)
447         else:
448             self.soft_reboot (options)
449
450     def list(self):
451         if not self.plc_instances: 
452             header ('No plc running on %s'%(self.line()))
453         else:
454             header ("Active plc VMs on %s"%self.line())
455             self.plc_instances.sort(timestamp_sort)
456             for p in self.plc_instances: 
457                 header (p.line(),banner=False)
458
459     def get_uname(self):
460         self._uname=self.backquote_ssh(['uname','-r']).strip()
461
462     # expecting sense () to have filled self._uname
463     def uname(self):
464         if hasattr(self,'_uname') and self._uname: return self._uname
465         return '*undef* uname'
466
467 class PlcVsBox (PlcBox):
468
469     def add_vserver (self,vservername,ctxid):
470         for plc in self.plc_instances:
471             if plc.vservername==vservername: 
472                 header("WARNING, duplicate myplc %s running on %s"%\
473                            (vservername,self.hostname),banner=False)
474                 return
475         self.plc_instances.append(PlcVsInstance(self,vservername,ctxid))
476     
477     def forget (self, plc_instance):
478         self.plc_instances.remove(plc_instance)
479
480     def line(self): 
481         msg="%s [max=%d,%d free, VS-based] (%s)"%(self.hostname, self.max_plcs,self.free_slots(),self.uname())
482         return msg
483         
484     def plc_instance_by_vservername (self, vservername):
485         for p in self.plc_instances:
486             if p.vservername==vservername: return p
487         return None
488
489     def soft_reboot (self, options):
490         self.run_ssh(['service','util-vserver','stop'],"Stopping all running vservers",
491                      dry_run=options.dry_run)
492
493     def sense (self, options):
494         print 'p',
495         self.get_uname()
496         # try to find fullname (vserver_stat truncates to a ridiculously short name)
497         # fetch the contexts for all vservers on that box
498         map_command=['grep','.','/etc/vservers/*/context','/dev/null',]
499         context_map=self.backquote_ssh (map_command)
500         # at this point we have a set of lines like
501         # /etc/vservers/2010.01.20--k27-f12-32-vplc03/context:40144
502         ctx_dict={}
503         for map_line in context_map.split("\n"):
504             if not map_line: continue
505             [path,xid] = map_line.split(':')
506             ctx_dict[xid]=os.path.basename(os.path.dirname(path))
507         # at this point ctx_id maps context id to vservername
508
509         command=['vserver-stat']
510         vserver_stat = self.backquote_ssh (command)
511         for vserver_line in vserver_stat.split("\n"):
512             if not vserver_line: continue
513             context=vserver_line.split()[0]
514             if context=="CTX": continue
515             try:
516                 longname=ctx_dict[context]
517                 self.add_vserver(longname,context)
518             except:
519                 print 'WARNING: found ctx %s in vserver_stat but was unable to figure a corresp. vserver'%context
520
521         # scan timestamps 
522         running_vsnames = [ i.vservername for i in self.plc_instances ]
523         command=   ['grep','.']
524         command += ['/vservers/%s.timestamp'%vs for vs in running_vsnames]
525         command += ['/dev/null']
526         ts_lines=self.backquote_ssh(command,trash_err=True).split('\n')
527         for ts_line in ts_lines:
528             if not ts_line.strip(): continue
529             # expect /vservers/<vservername>.timestamp:<timestamp>
530             try:
531                 (ts_file,timestamp)=ts_line.split(':')
532                 ts_file=os.path.basename(ts_file)
533                 (vservername,_)=os.path.splitext(ts_file)
534                 timestamp=int(timestamp)
535                 p=self.plc_instance_by_vservername(vservername)
536                 if not p: 
537                     print 'WARNING zombie plc',self.hostname,ts_line
538                     print '... was expecting',vservername,'in',[i.vservername for i in self.plc_instances]
539                     continue
540                 p.set_timestamp(timestamp)
541             except:  print 'WARNING, could not parse ts line',ts_line
542         
543
544 class PlcLxcBox (PlcBox):
545
546     # a line describing the box
547     def line(self): 
548         msg="%s [max=%d,%d free, LXC-based] (%s)"%(self.hostname, self.max_plcs,self.free_slots(),self.uname())
549         return msg
550         
551     # essentially shutdown all running containers
552     def soft_reboot (self, options):
553         print "TODO lxc PlcLxcBox.soft_reboot"
554
555     # sense is expected to fill self.plc_instances with PlcLxcInstance's 
556     # to describe the currently running VM's
557     # as well as to call  self.get_uname() once
558     def sense (self, options):
559         print "p(lxc) - todo (PlcLxcBox.sense)",
560         self.get_uname()
561
562
563 ############################################################
564 class QemuInstance: 
565     def __init__ (self, nodename, pid, qemubox):
566         self.nodename=nodename
567         self.pid=pid
568         self.qemu_box=qemubox
569         # not known yet
570         self.buildname=None
571         self.timestamp=0
572         
573     def set_buildname (self,buildname): self.buildname=buildname
574     def set_timestamp (self,timestamp): self.timestamp=timestamp
575     def set_now (self): self.timestamp=int(time.time())
576     def pretty_timestamp (self): return time.strftime("%Y-%m-%d:%H-%M",time.localtime(self.timestamp))
577     
578     def line (self):
579         msg = "== %s =="%(short_hostname(self.nodename))
580         msg += " [=%s]"%self.buildname
581         if self.pid:       msg += " (pid=%s)"%self.pid
582         else:              msg += " not (yet?) running"
583         if self.timestamp: msg += " @ %s"%self.pretty_timestamp()
584         else:              msg += " *unknown timestamp*"
585         return msg
586     
587     def kill(self):
588         if self.pid==0: 
589             print "cannot kill qemu %s with pid==0"%self.nodename
590             return
591         msg="Killing qemu %s with pid=%s on box %s"%(self.nodename,self.pid,self.qemu_box.hostname)
592         self.qemu_box.run_ssh(['kill',"%s"%self.pid],msg)
593         self.qemu_box.forget(self)
594
595
596 class QemuBox (Box):
597     def __init__ (self, hostname, max_qemus):
598         Box.__init__(self,hostname)
599         self.qemu_instances=[]
600         self.max_qemus=max_qemus
601
602     def add_node (self,nodename,pid):
603         for qemu in self.qemu_instances:
604             if qemu.nodename==nodename: 
605                 header("WARNING, duplicate qemu %s running on %s"%\
606                            (nodename,self.hostname), banner=False)
607                 return
608         self.qemu_instances.append(QemuInstance(nodename,pid,self))
609
610     def forget (self, qemu_instance):
611         self.qemu_instances.remove(qemu_instance)
612
613     # fill one slot even though this one is not started yet
614     def add_dummy (self, nodename):
615         dummy=QemuInstance('dummy_'+nodename,0,self)
616         dummy.set_now()
617         self.qemu_instances.append(dummy)
618
619     def line (self):
620         msg="%s [max=%d,%d free] (%s)"%(self.hostname, self.max_qemus,self.free_slots(),self.driver())
621         return msg
622
623     def list(self):
624         if not self.qemu_instances: 
625             header ('No qemu process on %s'%(self.line()))
626         else:
627             header ("Active qemu processes on %s"%(self.line()))
628             self.qemu_instances.sort(timestamp_sort)
629             for q in self.qemu_instances: 
630                 header (q.line(),banner=False)
631
632     def free_slots (self):
633         return self.max_qemus - len(self.qemu_instances)
634
635     def driver(self):
636         if hasattr(self,'_driver') and self._driver: return self._driver
637         return '*undef* driver'
638
639     def qemu_instance_by_pid (self,pid):
640         for q in self.qemu_instances:
641             if q.pid==pid: return q
642         return None
643
644     def qemu_instance_by_nodename_buildname (self,nodename,buildname):
645         for q in self.qemu_instances:
646             if q.nodename==nodename and q.buildname==buildname:
647                 return q
648         return None
649
650     def reboot (self, options):
651         if not options.soft:
652             self.reboot(options)
653         else:
654             self.run_ssh(['pkill','qemu'],"Killing qemu instances",
655                          dry_run=options.dry_run)
656
657     matcher=re.compile("\s*(?P<pid>[0-9]+).*-cdrom\s+(?P<nodename>[^\s]+)\.iso")
658     def sense(self, options):
659         print 'q',
660         modules=self.backquote_ssh(['lsmod']).split('\n')
661         self._driver='*NO kqemu/kmv_intel MODULE LOADED*'
662         for module in modules:
663             if module.find('kqemu')==0:
664                 self._driver='kqemu module loaded'
665             # kvm might be loaded without vkm_intel (we dont have AMD)
666             elif module.find('kvm_intel')==0:
667                 self._driver='kvm_intel module loaded'
668         ########## find out running pids
669         pids=self.backquote_ssh(['pgrep','qemu'])
670         if not pids: return
671         command=['ps','-o','pid,command'] + [ pid for pid in pids.split("\n") if pid]
672         ps_lines = self.backquote_ssh (command).split("\n")
673         for line in ps_lines:
674             if not line.strip() or line.find('PID') >=0 : continue
675             m=QemuBox.matcher.match(line)
676             if m: 
677                 self.add_node (m.group('nodename'),m.group('pid'))
678                 continue
679             header('QemuBox.sense: command %r returned line that failed to match'%command)
680             header(">>%s<<"%line)
681         ########## retrieve alive instances and map to build
682         live_builds=[]
683         command=['grep','.','*/*/qemu.pid','/dev/null']
684         pid_lines=self.backquote_ssh(command,trash_err=True).split('\n')
685         for pid_line in pid_lines:
686             if not pid_line.strip(): continue
687             # expect <build>/<nodename>/qemu.pid:<pid>pid
688             try:
689                 (buildname,nodename,tail)=pid_line.split('/')
690                 (_,pid)=tail.split(':')
691                 q=self.qemu_instance_by_pid (pid)
692                 if not q: continue
693                 q.set_buildname(buildname)
694                 live_builds.append(buildname)
695             except: print 'WARNING, could not parse pid line',pid_line
696         # retrieve timestamps
697         if not live_builds: return
698         command=   ['grep','.']
699         command += ['%s/*/timestamp'%b for b in live_builds]
700         command += ['/dev/null']
701         ts_lines=self.backquote_ssh(command,trash_err=True).split('\n')
702         for ts_line in ts_lines:
703             if not ts_line.strip(): continue
704             # expect <build>/<nodename>/timestamp:<timestamp>
705             try:
706                 (buildname,nodename,tail)=ts_line.split('/')
707                 nodename=nodename.replace('qemu-','')
708                 (_,timestamp)=tail.split(':')
709                 timestamp=int(timestamp)
710                 q=self.qemu_instance_by_nodename_buildname(nodename,buildname)
711                 if not q: 
712                     print 'WARNING zombie qemu',self.hostname,ts_line
713                     print '... was expecting (',short_hostname(nodename),buildname,') in',\
714                         [ (short_hostname(i.nodename),i.buildname) for i in self.qemu_instances ]
715                     continue
716                 q.set_timestamp(timestamp)
717             except:  print 'WARNING, could not parse ts line',ts_line
718
719 ####################
720 class TestInstance:
721     def __init__ (self, buildname, pid=0):
722         self.pids=[]
723         if pid!=0: self.pid.append(pid)
724         self.buildname=buildname
725         # latest trace line
726         self.trace=''
727         # has a KO test
728         self.broken_steps=[]
729         self.timestamp = 0
730
731     def set_timestamp (self,timestamp): self.timestamp=timestamp
732     def set_now (self): self.timestamp=int(time.time())
733     def pretty_timestamp (self): return time.strftime("%Y-%m-%d:%H-%M",time.localtime(self.timestamp))
734
735
736     def add_pid (self,pid):
737         self.pids.append(pid)
738     def set_broken (self, plcindex, step): 
739         self.broken_steps.append ( (plcindex, step,) )
740
741     def line (self):
742         double='=='
743         if self.pids: double='*'+double[1]
744         if self.broken_steps: double=double[0]+'B'
745         msg = " %s %s =="%(double,self.buildname)
746         if not self.pids:       pass
747         elif len(self.pids)==1: msg += " (pid=%s)"%self.pids[0]
748         else:                   msg += " !!!pids=%s!!!"%self.pids
749         msg += " @%s"%self.pretty_timestamp()
750         if self.broken_steps:
751             msg += " [BROKEN=" + " ".join( [ "%s@%s"%(s,i) for (i,s) in self.broken_steps ] ) + "]"
752         return msg
753
754 class TestBox (Box):
755     def __init__ (self,hostname):
756         Box.__init__(self,hostname)
757         self.starting_ips=[]
758         self.test_instances=[]
759
760     def reboot (self, options):
761         # can't reboot a vserver VM
762         self.run_ssh (['pkill','run_log'],"Terminating current runs",
763                       dry_run=options.dry_run)
764         self.run_ssh (['rm','-f',Starting.location],"Cleaning %s"%Starting.location,
765                       dry_run=options.dry_run)
766
767     def get_test (self, buildname):
768         for i in self.test_instances:
769             if i.buildname==buildname: return i
770
771     # we scan ALL remaining test results, even the ones not running
772     def add_timestamp (self, buildname, timestamp):
773         i=self.get_test(buildname)
774         if i:   
775             i.set_timestamp(timestamp)
776         else:   
777             i=TestInstance(buildname,0)
778             i.set_timestamp(timestamp)
779             self.test_instances.append(i)
780
781     def add_running_test (self, pid, buildname):
782         i=self.get_test(buildname)
783         if not i:
784             self.test_instances.append (TestInstance (buildname,pid))
785             return
786         if i.pids:
787             print "WARNING: 2 concurrent tests run on same build %s"%buildname
788         i.add_pid (pid)
789
790     def add_broken (self, buildname, plcindex, step):
791         i=self.get_test(buildname)
792         if not i:
793             i=TestInstance(buildname)
794             self.test_instances.append(i)
795         i.set_broken(plcindex, step)
796
797     matcher_proc=re.compile (".*/proc/(?P<pid>[0-9]+)/cwd.*/root/(?P<buildname>[^/]+)$")
798     matcher_grep=re.compile ("/root/(?P<buildname>[^/]+)/logs/trace.*:TRACE:\s*(?P<plcindex>[0-9]+).*step=(?P<step>\S+).*")
799     def sense (self, options):
800         print 't',
801         self.sense_uptime()
802         self.starting_ips=[x for x in self.backquote_ssh(['cat',Starting.location], trash_err=True).strip().split('\n') if x]
803
804         # scan timestamps on all tests
805         # this is likely to not invoke ssh so we need to be a bit smarter to get * expanded
806         # xxx would make sense above too
807         command=['bash','-c',"grep . /root/*/timestamp /dev/null"]
808         ts_lines=self.backquote_ssh(command,trash_err=True).split('\n')
809         for ts_line in ts_lines:
810             if not ts_line.strip(): continue
811             # expect /root/<buildname>/timestamp:<timestamp>
812             try:
813                 (ts_file,timestamp)=ts_line.split(':')
814                 ts_file=os.path.dirname(ts_file)
815                 buildname=os.path.basename(ts_file)
816                 timestamp=int(timestamp)
817                 t=self.add_timestamp(buildname,timestamp)
818             except:  print 'WARNING, could not parse ts line',ts_line
819
820         command=['bash','-c',"grep KO /root/*/logs/trace-* /dev/null" ]
821         trace_lines=self.backquote_ssh (command).split('\n')
822         for line in trace_lines:
823             if not line.strip(): continue
824             m=TestBox.matcher_grep.match(line)
825             if m: 
826                 buildname=m.group('buildname')
827                 plcindex=m.group('plcindex')
828                 step=m.group('step')
829                 self.add_broken(buildname,plcindex, step)
830                 continue
831             header("TestBox.sense: command %r returned line that failed to match\n%s"%(command,line))
832             header(">>%s<<"%line)
833
834         pids = self.backquote_ssh (['pgrep','run_log'],trash_err=True)
835         if not pids: return
836         command=['ls','-ld'] + ["/proc/%s/cwd"%pid for pid in pids.split("\n") if pid]
837         ps_lines=self.backquote_ssh (command).split('\n')
838         for line in ps_lines:
839             if not line.strip(): continue
840             m=TestBox.matcher_proc.match(line)
841             if m: 
842                 pid=m.group('pid')
843                 buildname=m.group('buildname')
844                 self.add_running_test(pid, buildname)
845                 continue
846             header("TestBox.sense: command %r returned line that failed to match\n%s"%(command,line))
847             header(">>%s<<"%line)
848         
849         
850     def line (self):
851         return "%s (%s)"%(self.hostname,self.uptime())
852
853     def list (self):
854         if not self.test_instances:
855             header ("No known tests on %s"%self.line())
856         else:
857             header ("Known tests on %s"%self.line())
858             self.test_instances.sort(timestamp_sort)
859             for i in self.test_instances: print i.line()
860         if self.starting_ips:
861             header ("Starting IP addresses on %s"%self.line())
862             self.starting_ips.sort()
863             for starting in self.starting_ips: print starting
864
865 ############################################################
866 class Options: pass
867
868 class Substrate:
869
870     def __init__ (self, plcs_on_vs=True, plcs_on_lxc=False):
871         self.options=Options()
872         self.options.dry_run=False
873         self.options.verbose=False
874         self.options.reboot=False
875         self.options.soft=False
876         self.test_box = TestBox (self.test_box_spec())
877         self.build_boxes = [ BuildBox(h) for h in self.build_boxes_spec() ]
878         # for compat with older LocalSubstrate
879         try:
880             self.plc_vs_boxes = [ PlcVsBox (h,m) for (h,m) in self.plc_vs_boxes_spec ()]
881             self.plc_lxc_boxes = [ PlcLxcBox (h,m) for (h,m) in self.plc_lxc_boxes_spec ()]
882         except:
883             self.plc_vs_boxes = [ PlcVsBox (h,m) for (h,m) in self.plc_boxes_spec ()]
884             self.plc_lxc_boxes = [ ]
885         self.qemu_boxes = [ QemuBox (h,m) for (h,m) in self.qemu_boxes_spec ()]
886         self._sensed=False
887
888         self.vplc_pool = Pool (self.vplc_ips(),"for vplcs",self)
889         self.vnode_pool = Pool (self.vnode_ips(),"for vnodes",self)
890         
891         self.rescope (plcs_on_vs=plcs_on_vs, plcs_on_lxc=plcs_on_lxc)
892
893     def rescope(self, plcs_on_vs, plcs_on_lxc):
894         self.plc_boxes=[]
895         if plcs_on_vs: self.plc_boxes += self.plc_vs_boxes
896         if plcs_on_lxc: self.plc_boxes += self.plc_lxc_boxes
897         self.default_boxes = self.plc_boxes + self.qemu_boxes
898         self.all_boxes = self.build_boxes + [ self.test_box ] + self.plc_boxes + self.qemu_boxes
899
900     def fqdn (self, hostname):
901         if hostname.find('.')<0: return "%s.%s"%(hostname,self.domain())
902         return hostname
903
904     # return True if actual sensing takes place
905     def sense (self,force=False):
906         if self._sensed and not force: return False
907         print 'Sensing local substrate...',
908         for b in self.default_boxes: b.sense(self.options)
909         print 'Done'
910         self._sensed=True
911         return True
912
913     def list (self):
914         for b in self.default_boxes:
915             b.list()
916
917     def add_dummy_plc (self, plc_boxname, plcname):
918         for pb in self.plc_boxes:
919             if pb.hostname==plc_boxname:
920                 pb.add_dummy(plcname)
921                 return True
922     def add_dummy_qemu (self, qemu_boxname, qemuname):
923         for qb in self.qemu_boxes:
924             if qb.hostname==qemu_boxname:
925                 qb.add_dummy(qemuname)
926                 return True
927
928     def add_starting_dummy (self, bname, vname):
929         return self.add_dummy_plc (bname, vname) or self.add_dummy_qemu (bname, vname)
930
931     ########## 
932     def provision (self,plcs,options):
933         try:
934             # attach each plc to a plc box and an IP address
935             plcs = [ self.provision_plc (plc,options) for plc in plcs ]
936             # attach each node/qemu to a qemu box with an IP address
937             plcs = [ self.provision_qemus (plc,options) for plc in plcs ]
938             # update the SFA spec accordingly
939             plcs = [ self.localize_sfa_rspec(plc,options) for plc in plcs ]
940             self.list()
941             return plcs
942         except Exception, e:
943             print '* Could not provision this test on current substrate','--',e,'--','exiting'
944             traceback.print_exc()
945             sys.exit(1)
946
947     # it is expected that a couple of options like ips_bplc and ips_vplc 
948     # are set or unset together
949     @staticmethod
950     def check_options (x,y):
951         if not x and not y: return True
952         return len(x)==len(y)
953
954     # find an available plc box (or make space)
955     # and a free IP address (using options if present)
956     def provision_plc (self, plc, options):
957         
958         assert Substrate.check_options (options.ips_bplc, options.ips_vplc)
959
960         #### let's find an IP address for that plc
961         # look in options 
962         if options.ips_vplc:
963             # this is a rerun
964             # we don't check anything here, 
965             # it is the caller's responsability to cleanup and make sure this makes sense
966             plc_boxname = options.ips_bplc.pop()
967             vplc_hostname=options.ips_vplc.pop()
968         else:
969             if self.sense(): self.list()
970             plc_boxname=None
971             vplc_hostname=None
972             # try to find an available IP 
973             self.vplc_pool.sense()
974             couple=self.vplc_pool.next_free()
975             if couple:
976                 (vplc_hostname,unused)=couple
977             #### we need to find one plc box that still has a slot
978             max_free=0
979             # use the box that has max free spots for load balancing
980             for pb in self.plc_boxes:
981                 free=pb.free_slots()
982                 if free>max_free:
983                     plc_boxname=pb.hostname
984                     max_free=free
985             # if there's no available slot in the plc_boxes, or we need a free IP address
986             # make space by killing the oldest running instance
987             if not plc_boxname or not vplc_hostname:
988                 # find the oldest of all our instances
989                 all_plc_instances=reduce(lambda x, y: x+y, 
990                                          [ pb.plc_instances for pb in self.plc_boxes ],
991                                          [])
992                 all_plc_instances.sort(timestamp_sort)
993                 try:
994                     plc_instance_to_kill=all_plc_instances[0]
995                 except:
996                     msg=""
997                     if not plc_boxname: msg += " PLC boxes are full"
998                     if not vplc_hostname: msg += " vplc IP pool exhausted" 
999                     raise Exception,"Could not make space for a PLC instance:"+msg
1000                 freed_plc_boxname=plc_instance_to_kill.plc_box.hostname
1001                 freed_vplc_hostname=plc_instance_to_kill.vplcname()
1002                 message='killing oldest plc instance = %s on %s'%(plc_instance_to_kill.line(),
1003                                                                   freed_plc_boxname)
1004                 plc_instance_to_kill.kill()
1005                 # use this new plcbox if that was the problem
1006                 if not plc_boxname:
1007                     plc_boxname=freed_plc_boxname
1008                 # ditto for the IP address
1009                 if not vplc_hostname:
1010                     vplc_hostname=freed_vplc_hostname
1011                     # record in pool as mine
1012                     self.vplc_pool.set_mine(vplc_hostname)
1013
1014         # 
1015         self.add_dummy_plc(plc_boxname,plc['name'])
1016         vplc_ip = self.vplc_pool.get_ip(vplc_hostname)
1017         self.vplc_pool.add_starting(vplc_hostname, plc_boxname)
1018
1019         #### compute a helpful vserver name
1020         # remove domain in hostname
1021         vplc_short = short_hostname(vplc_hostname)
1022         vservername = "%s-%d-%s" % (options.buildname,plc['index'],vplc_short)
1023         plc_name = "%s_%s"%(plc['name'],vplc_short)
1024
1025         utils.header( 'PROVISION plc %s in box %s at IP %s as %s'%\
1026                           (plc['name'],plc_boxname,vplc_hostname,vservername))
1027
1028         #### apply in the plc_spec
1029         # # informative
1030         # label=options.personality.replace("linux","")
1031         mapper = {'plc': [ ('*' , {'host_box':plc_boxname,
1032                                    # 'name':'%s-'+label,
1033                                    'name': plc_name,
1034                                    'vservername':vservername,
1035                                    'vserverip':vplc_ip,
1036                                    'PLC_DB_HOST':vplc_hostname,
1037                                    'PLC_API_HOST':vplc_hostname,
1038                                    'PLC_BOOT_HOST':vplc_hostname,
1039                                    'PLC_WWW_HOST':vplc_hostname,
1040                                    'PLC_NET_DNS1' : self.network_settings() [ 'interface_fields:dns1' ],
1041                                    'PLC_NET_DNS2' : self.network_settings() [ 'interface_fields:dns2' ],
1042                                    } ) ]
1043                   }
1044
1045
1046         # mappers only work on a list of plcs
1047         return TestMapper([plc],options).map(mapper)[0]
1048
1049     ##########
1050     def provision_qemus (self, plc, options):
1051
1052         assert Substrate.check_options (options.ips_bnode, options.ips_vnode)
1053
1054         test_mapper = TestMapper ([plc], options)
1055         nodenames = test_mapper.node_names()
1056         maps=[]
1057         for nodename in nodenames:
1058
1059             if options.ips_vnode:
1060                 # as above, it's a rerun, take it for granted
1061                 qemu_boxname=options.ips_bnode.pop()
1062                 vnode_hostname=options.ips_vnode.pop()
1063             else:
1064                 if self.sense(): self.list()
1065                 qemu_boxname=None
1066                 vnode_hostname=None
1067                 # try to find an available IP 
1068                 self.vnode_pool.sense()
1069                 couple=self.vnode_pool.next_free()
1070                 if couple:
1071                     (vnode_hostname,unused)=couple
1072                 # find a physical box
1073                 max_free=0
1074                 # use the box that has max free spots for load balancing
1075                 for qb in self.qemu_boxes:
1076                     free=qb.free_slots()
1077                     if free>max_free:
1078                         qemu_boxname=qb.hostname
1079                         max_free=free
1080                 # if we miss the box or the IP, kill the oldest instance
1081                 if not qemu_boxname or not vnode_hostname:
1082                 # find the oldest of all our instances
1083                     all_qemu_instances=reduce(lambda x, y: x+y, 
1084                                               [ qb.qemu_instances for qb in self.qemu_boxes ],
1085                                               [])
1086                     all_qemu_instances.sort(timestamp_sort)
1087                     try:
1088                         qemu_instance_to_kill=all_qemu_instances[0]
1089                     except:
1090                         msg=""
1091                         if not qemu_boxname: msg += " QEMU boxes are full"
1092                         if not vnode_hostname: msg += " vnode IP pool exhausted" 
1093                         raise Exception,"Could not make space for a QEMU instance:"+msg
1094                     freed_qemu_boxname=qemu_instance_to_kill.qemu_box.hostname
1095                     freed_vnode_hostname=short_hostname(qemu_instance_to_kill.nodename)
1096                     # kill it
1097                     message='killing oldest qemu node = %s on %s'%(qemu_instance_to_kill.line(),
1098                                                                    freed_qemu_boxname)
1099                     qemu_instance_to_kill.kill()
1100                     # use these freed resources where needed
1101                     if not qemu_boxname:
1102                         qemu_boxname=freed_qemu_boxname
1103                     if not vnode_hostname:
1104                         vnode_hostname=freed_vnode_hostname
1105                         self.vnode_pool.set_mine(vnode_hostname)
1106
1107             self.add_dummy_qemu (qemu_boxname,vnode_hostname)
1108             mac=self.vnode_pool.retrieve_userdata(vnode_hostname)
1109             ip=self.vnode_pool.get_ip (vnode_hostname)
1110             self.vnode_pool.add_starting(vnode_hostname,qemu_boxname)
1111
1112             vnode_fqdn = self.fqdn(vnode_hostname)
1113             nodemap={'host_box':qemu_boxname,
1114                      'node_fields:hostname':vnode_fqdn,
1115                      'interface_fields:ip':ip, 
1116                      'interface_fields:mac':mac,
1117                      }
1118             nodemap.update(self.network_settings())
1119             maps.append ( (nodename, nodemap) )
1120
1121             utils.header("PROVISION node %s in box %s at IP %s with MAC %s"%\
1122                              (nodename,qemu_boxname,vnode_hostname,mac))
1123
1124         return test_mapper.map({'node':maps})[0]
1125
1126     def localize_sfa_rspec (self,plc,options):
1127        
1128         plc['sfa']['SFA_REGISTRY_HOST'] = plc['PLC_DB_HOST']
1129         plc['sfa']['SFA_AGGREGATE_HOST'] = plc['PLC_DB_HOST']
1130         plc['sfa']['SFA_SM_HOST'] = plc['PLC_DB_HOST']
1131         plc['sfa']['SFA_DB_HOST'] = plc['PLC_DB_HOST']
1132         plc['sfa']['SFA_PLC_URL'] = 'https://' + plc['PLC_API_HOST'] + ':443/PLCAPI/' 
1133         return plc
1134
1135     #################### release:
1136     def release (self,options):
1137         self.vplc_pool.release_my_starting()
1138         self.vnode_pool.release_my_starting()
1139         pass
1140
1141     #################### show results for interactive mode
1142     def get_box (self,boxname):
1143         for b in self.build_boxes + self.plc_boxes + self.qemu_boxes + [self.test_box] :
1144             if b.shortname()==boxname:
1145                 return b
1146         print "Could not find box %s"%boxname
1147         return None
1148
1149     def list_boxes(self,box_or_names):
1150         print 'Sensing',
1151         for box in box_or_names:
1152             if not isinstance(box,Box): box=self.get_box(box)
1153             if not box: continue
1154             box.sense(self.options)
1155         print 'Done'
1156         for box in box_or_names:
1157             if not isinstance(box,Box): box=self.get_box(box)
1158             if not box: continue
1159             box.list()
1160
1161     def reboot_boxes(self,box_or_names):
1162         for box in box_or_names:
1163             if not isinstance(box,Box): box=self.get_box(box)
1164             if not box: continue
1165             box.reboot(self.options)
1166
1167     ####################
1168     # can be run as a utility to manage the local infrastructure
1169     def main (self):
1170         parser=OptionParser()
1171         parser.add_option ('-r',"--reboot",action='store_true',dest='reboot',default=False,
1172                            help='reboot mode (use shutdown -r)')
1173         parser.add_option ('-s',"--soft",action='store_true',dest='soft',default=False,
1174                            help='soft mode for reboot (vserver stop or kill qemus)')
1175         parser.add_option ('-t',"--testbox",action='store_true',dest='testbox',default=False,
1176                            help='add test box') 
1177         parser.add_option ('-b',"--build",action='store_true',dest='builds',default=False,
1178                            help='add build boxes')
1179         parser.add_option ('-p',"--plc",action='store_true',dest='plcs',default=False,
1180                            help='add plc boxes')
1181         parser.add_option ('-X', "--lxc",action='store_true',dest='plcs_use_lxc',
1182                            help='use lxc-enabled plc boxes instead of vs-enabled ones')
1183         parser.add_option ('-q',"--qemu",action='store_true',dest='qemus',default=False,
1184                            help='add qemu boxes') 
1185         parser.add_option ('-a',"--all",action='store_true',dest='all',default=False,
1186                            help='address all known  boxes, like -b -t -p -q')
1187         parser.add_option ('-v',"--verbose",action='store_true',dest='verbose',default=False,
1188                            help='verbose mode')
1189         parser.add_option ('-n',"--dry_run",action='store_true',dest='dry_run',default=False,
1190                            help='dry run mode')
1191         (self.options,args)=parser.parse_args()
1192
1193         if self.options.plcs_use_lxc:
1194             self.rescope (plcs_on_vs=False, plcs_on_lxc=True)
1195
1196         boxes=args
1197         if self.options.testbox: boxes += [self.test_box]
1198         if self.options.builds: boxes += self.build_boxes
1199         if self.options.plcs: boxes += self.plc_boxes
1200         if self.options.qemus: boxes += self.qemu_boxes
1201         if self.options.all: boxes += self.all_boxes
1202         
1203         # default scope is -b -p -q
1204         if not boxes:
1205             boxes = self.build_boxes + self.plc_boxes + self.qemu_boxes
1206
1207         if self.options.reboot: self.reboot_boxes (boxes)
1208         else:                   self.list_boxes (boxes)