need to keep track of the assgned box for starting instances to avoid overprovisioning
[tests.git] / system / Substrate.py
1 #
2 # Thierry Parmentelat <thierry.parmentelat@inria.fr>
3 # Copyright (C) 2010 INRIA 
4 #
5 # #################### history
6 #
7 # see also Substrate.readme
8 #
9 # This is a complete rewrite of TestResources/Tracker/Pool
10 # we don't use trackers anymore and just probe/sense the running 
11 # boxes to figure out where we are
12 # in order to implement some fairness in the round-robin allocation scheme
13 # we need an indication of the 'age' of each running entity, 
14 # hence the 'timestamp-*' steps in TestPlc
15
16 # this should be much more flexible:
17 # * supports several plc boxes 
18 # * supports several qemu guests per host
19 # * no need to worry about tracker being in sync or not
20 #
21 # #################### howto use
22 #
23 # each site is to write its own LocalSubstrate.py, 
24 # (see e.g. LocalSubstrate.inria.py)
25 # LocalSubstrate.py is expected to be in /root on the testmaster box
26 # and needs to define
27 # MYPLCs
28 # . the vserver-capable boxes used for hosting myplcs
29 # .  and their admissible load (max # of myplcs)
30 # . the pool of DNS-names and IP-addresses available for myplcs
31 # QEMU nodes
32 # . the kvm-qemu capable boxes to host qemu instances
33 # .  and their admissible load (max # of myplcs)
34 # . the pool of DNS-names and IP-addresses available for nodes
35
36 # #################### implem. note
37
38 # this model relies on 'sensing' the substrate, 
39 # i.e. probing all the boxes for their running instances of vservers and qemu
40 # this is how we get rid of tracker inconsistencies 
41 # however there is a 'black hole' between the time where a given address is 
42 # allocated and when it actually gets used/pingable
43 # this is why we still need a shared knowledge among running tests
44 # in a file named /root/starting
45 # this is connected to the Pool class 
46
47 # ####################
48
49 import os.path, sys
50 import time
51 import re
52 import traceback
53 import subprocess
54 import commands
55 import socket
56 from optparse import OptionParser
57
58 import utils
59 from TestSsh import TestSsh
60 from TestMapper import TestMapper
61
62 def header (message,banner=True):
63     if not message: return
64     if banner: print "===============",
65     print message
66     sys.stdout.flush()
67
68 def timestamp_sort(o1,o2): return o1.timestamp-o2.timestamp
69
70 def short_hostname (hostname):
71     return hostname.split('.')[0]
72
73 ####################
74 # the place were other test instances tell about their not-yet-started
75 # instances, that go undetected through sensing
76 class Starting:
77
78     location='/root/starting'
79     def __init__ (self):
80         self.tuples=[]
81
82     def load (self):
83         try:    self.tuples=[line.strip().split('@') 
84                              for line in file(Starting.location).readlines()]
85         except: self.tuples=[]
86
87     def vnames (self) : 
88         self.load()
89         return [ x for (x,_) in self.tuples ]
90
91     def add (self, vname, bname):
92         if not vname in self.vnames():
93             file(Starting.location,'a').write("%s@%s\n"%(vname,bname))
94             
95     def delete_vname (self, vname):
96         self.load()
97         if vname in self.vnames():
98             f=file(Starting.location,'w')
99             for (v,b) in self.tuples: 
100                 if v != vname: f.write("%s@%s\n"%(v,b))
101             f.close()
102     
103 ####################
104 # pool class
105 # allows to pick an available IP among a pool
106 # input is expressed as a list of tuples (hostname,ip,user_data)
107 # that can be searched iteratively for a free slot
108 # e.g.
109 # pool = [ (hostname1,user_data1),  
110 #          (hostname2,user_data2),  
111 #          (hostname3,user_data2),  
112 #          (hostname4,user_data4) ]
113 # assuming that ip1 and ip3 are taken (pingable), then we'd get
114 # pool=Pool(pool)
115 # pool.next_free() -> entry2
116 # pool.next_free() -> entry4
117 # pool.next_free() -> None
118 # that is, even if ip2 is not busy/pingable when the second next_free() is issued
119
120 class PoolItem:
121     def __init__ (self,hostname,userdata):
122         self.hostname=hostname
123         self.userdata=userdata
124         # slot holds 'busy' or 'free' or 'mine' or 'starting' or None
125         # 'mine' is for our own stuff, 'starting' from the concurrent tests
126         self.status=None
127         self.ip=None
128
129     def line(self):
130         return "Pooled %s (%s) -> %s"%(self.hostname,self.userdata, self.status)
131
132     def char (self):
133         if   self.status==None:       return '?'
134         elif self.status=='busy':     return '+'
135         elif self.status=='free':     return '-'
136         elif self.status=='mine':     return 'M'
137         elif self.status=='starting': return 'S'
138
139     def get_ip(self):
140         if self.ip: return self.ip
141         ip=socket.gethostbyname(self.hostname)
142         self.ip=ip
143         return ip
144
145 class Pool:
146
147     def __init__ (self, tuples,message, substrate):
148         self.pool_items= [ PoolItem (hostname,userdata) for (hostname,userdata) in tuples ] 
149         self.message=message
150         # where to send notifications upon load_starting
151         self.substrate=substrate
152
153     def list (self):
154         for i in self.pool_items: print i.line()
155
156     def line (self):
157         line=self.message
158         for i in self.pool_items: line += ' ' + i.char()
159         return line
160
161     def _item (self, hostname):
162         for i in self.pool_items: 
163             if i.hostname==hostname: return i
164         raise Exception ("Could not locate hostname %s in pool %s"%(hostname,self.message))
165
166     def retrieve_userdata (self, hostname): 
167         return self._item(hostname).userdata
168
169     def get_ip (self, hostname):
170         try:    return self._item(hostname).get_ip()
171         except: return socket.gethostbyname(hostname)
172         
173     def set_mine (self, hostname):
174         try:
175             self._item(hostname).status='mine'
176         except:
177             print 'WARNING: host %s not found in IP pool %s'%(hostname,self.message)
178
179     def next_free (self):
180         for i in self.pool_items:
181             if i.status == 'free':
182                 i.status='mine'
183                 return (i.hostname,i.userdata)
184         return None
185
186     ####################
187     # we have a starting instance of our own
188     def add_starting (self, vname, bname):
189         Starting().add(vname,bname)
190         for i in self.pool_items:
191             if i.hostname==vname: i.status='mine'
192
193     # load the starting instances from the common file
194     # remember that might be ours
195     # return the list of (vname,bname) that are not ours
196     def load_starting (self):
197         starting=Starting()
198         starting.load()
199         new_tuples=[]
200         for (v,b) in starting.tuples:
201             for i in self.pool_items:
202                 if i.hostname==v and i.status=='free':
203                     i.status='starting'
204                     new_tuples.append( (v,b,) )
205         return new_tuples
206
207     def release_my_starting (self):
208         for i in self.pool_items:
209             if i.status=='mine':
210                 Starting().delete_vname (i.hostname)
211                 i.status=None
212
213
214     ##########
215     def _sense (self):
216         for item in self.pool_items:
217             if item.status is not None: 
218                 print item.char(),
219                 continue
220             if self.check_ping (item.hostname): 
221                 item.status='busy'
222                 print '*',
223             else:
224                 item.status='free'
225                 print '.',
226     
227     def sense (self):
228         print 'Sensing IP pool',self.message,
229         self._sense()
230         print 'Done'
231         for (vname,bname) in self.load_starting():
232             self.substrate.add_starting_dummy (bname, vname)
233         print 'After starting: IP pool'
234         print self.line()
235     # OS-dependent ping option (support for macos, for convenience)
236     ping_timeout_option = None
237     # returns True when a given hostname/ip responds to ping
238     def check_ping (self,hostname):
239         if not Pool.ping_timeout_option:
240             (status,osname) = commands.getstatusoutput("uname -s")
241             if status != 0:
242                 raise Exception, "TestPool: Cannot figure your OS name"
243             if osname == "Linux":
244                 Pool.ping_timeout_option="-w"
245             elif osname == "Darwin":
246                 Pool.ping_timeout_option="-t"
247
248         command="ping -c 1 %s 1 %s"%(Pool.ping_timeout_option,hostname)
249         (status,output) = commands.getstatusoutput(command)
250         return status == 0
251
252 ####################
253 class Box:
254     def __init__ (self,hostname):
255         self.hostname=hostname
256         self._probed=None
257     def shortname (self):
258         return short_hostname(self.hostname)
259     def test_ssh (self): return TestSsh(self.hostname,username='root',unknown_host=False)
260     def reboot (self, options):
261         self.test_ssh().run("shutdown -r now",message="Rebooting %s"%self.hostname,
262                             dry_run=options.dry_run)
263
264     def uptime(self):
265         if hasattr(self,'_uptime') and self._uptime: return self._uptime
266         return '*undef* uptime'
267     def sense_uptime (self):
268         command=['uptime']
269         self._uptime=self.backquote_ssh(command,trash_err=True).strip()
270         if not self._uptime: self._uptime='unreachable'
271
272     def run(self,argv,message=None,trash_err=False,dry_run=False):
273         if dry_run:
274             print 'DRY_RUN:',
275             print " ".join(argv)
276             return 0
277         else:
278             header(message)
279             if not trash_err:
280                 return subprocess.call(argv)
281             else:
282                 return subprocess.call(argv,stderr=file('/dev/null','w'))
283                 
284     def run_ssh (self, argv, message, trash_err=False, dry_run=False):
285         ssh_argv = self.test_ssh().actual_argv(argv)
286         result=self.run (ssh_argv, message, trash_err, dry_run=dry_run)
287         if result!=0:
288             print "WARNING: failed to run %s on %s"%(" ".join(argv),self.hostname)
289         return result
290
291     def backquote (self, argv, trash_err=False):
292         # print 'running backquote',argv
293         if not trash_err:
294             result= subprocess.Popen(argv,stdout=subprocess.PIPE).communicate()[0]
295         else:
296             result= subprocess.Popen(argv,stdout=subprocess.PIPE,stderr=file('/dev/null','w')).communicate()[0]
297         return result
298
299     def probe (self):
300         if self._probed is not None: return self._probed
301         # first probe the ssh link
302         probe_argv=self.test_ssh().actual_argv(['hostname'])
303         self._probed=self.backquote ( probe_argv, trash_err=True )
304         if not self._probed: print "root@%s unreachable"%self.hostname
305         return self._probed
306
307     # use argv=['bash','-c',"the command line"]
308     # if you have any shell-expanded arguments like *
309     # and if there's any chance the command is adressed to the local host
310     def backquote_ssh (self, argv, trash_err=False):
311         if not self.probe(): return ''
312         return self.backquote( self.test_ssh().actual_argv(argv), trash_err)
313
314 ############################################################
315 class BuildInstance:
316     def __init__ (self, buildname, pid, buildbox):
317         self.buildname=buildname
318         self.buildbox=buildbox
319         self.pids=[pid]
320
321     def add_pid(self,pid):
322         self.pids.append(pid)
323
324     def line (self):
325         return "== %s == (pids=%r)"%(self.buildname,self.pids)
326
327 class BuildBox (Box):
328     def __init__ (self,hostname):
329         Box.__init__(self,hostname)
330         self.build_instances=[]
331
332     def add_build (self,buildname,pid):
333         for build in self.build_instances:
334             if build.buildname==buildname: 
335                 build.add_pid(pid)
336                 return
337         self.build_instances.append(BuildInstance(buildname, pid, self))
338
339     def list(self):
340         if not self.build_instances: 
341             header ('No build process on %s (%s)'%(self.hostname,self.uptime()))
342         else:
343             header ("Builds on %s (%s)"%(self.hostname,self.uptime()))
344             for b in self.build_instances: 
345                 header (b.line(),banner=False)
346
347     def reboot (self, options):
348         if not options.soft:
349             self.reboot(options)
350         else:
351             command=['pkill','vbuild']
352             self.run_ssh(command,"Terminating vbuild processes",dry_run=options.dry_run)
353
354     # inspect box and find currently running builds
355     matcher=re.compile("\s*(?P<pid>[0-9]+).*-[bo]\s+(?P<buildname>[^\s]+)(\s|\Z)")
356     def sense(self, options):
357         print 'b',
358         self.sense_uptime()
359         pids=self.backquote_ssh(['pgrep','vbuild'],trash_err=True)
360         if not pids: return
361         command=['ps','-o','pid,command'] + [ pid for pid in pids.split("\n") if pid]
362         ps_lines=self.backquote_ssh (command).split('\n')
363         for line in ps_lines:
364             if not line.strip() or line.find('PID')>=0: continue
365             m=BuildBox.matcher.match(line)
366             if m: 
367                 date=time.strftime('%Y-%m-%d',time.localtime(time.time()))
368                 buildname=m.group('buildname').replace('@DATE@',date)
369                 self.add_build (buildname,m.group('pid'))
370             else: header('command %r returned line that failed to match'%command)
371
372 ############################################################
373 class PlcInstance:
374     def __init__ (self, vservername, ctxid, plcbox):
375         self.vservername=vservername
376         self.ctxid=ctxid
377         self.plc_box=plcbox
378         # unknown yet
379         self.timestamp=0
380
381     def set_timestamp (self,timestamp): self.timestamp=timestamp
382     def set_now (self): self.timestamp=int(time.time())
383     def pretty_timestamp (self): return time.strftime("%Y-%m-%d:%H-%M",time.localtime(self.timestamp))
384
385     def vplcname (self):
386         return self.vservername.split('-')[-1]
387     def buildname (self):
388         return self.vservername.rsplit('-',2)[0]
389
390     def line (self):
391         msg="== %s =="%(self.vplcname())
392         msg += " [=%s]"%self.vservername
393         if self.ctxid==0:  msg+=" not (yet?) running"
394         else:              msg+=" (ctx=%s)"%self.ctxid     
395         if self.timestamp: msg += " @ %s"%self.pretty_timestamp()
396         else:              msg += " *unknown timestamp*"
397         return msg
398
399     def kill (self):
400         msg="vserver stopping %s on %s"%(self.vservername,self.plc_box.hostname)
401         self.plc_box.run_ssh(['vserver',self.vservername,'stop'],msg)
402         self.plc_box.forget(self)
403
404 class PlcBox (Box):
405     def __init__ (self, hostname, max_plcs):
406         Box.__init__(self,hostname)
407         self.plc_instances=[]
408         self.max_plcs=max_plcs
409
410     def add_vserver (self,vservername,ctxid):
411         for plc in self.plc_instances:
412             if plc.vservername==vservername: 
413                 header("WARNING, duplicate myplc %s running on %s"%\
414                            (vservername,self.hostname),banner=False)
415                 return
416         self.plc_instances.append(PlcInstance(vservername,ctxid,self))
417     
418     def forget (self, plc_instance):
419         self.plc_instances.remove(plc_instance)
420
421     # fill one slot even though this one is not started yet
422     def add_dummy (self, plcname):
423         dummy=PlcInstance('dummy_'+plcname,0,self)
424         dummy.set_now()
425         self.plc_instances.append(dummy)
426
427     def line(self): 
428         msg="%s [max=%d,%d free] (%s)"%(self.hostname, self.max_plcs,self.free_spots(),self.uname())
429         return msg
430         
431     def list(self):
432         if not self.plc_instances: 
433             header ('No vserver running on %s'%(self.line()))
434         else:
435             header ("Active plc VMs on %s"%self.line())
436             self.plc_instances.sort(timestamp_sort)
437             for p in self.plc_instances: 
438                 header (p.line(),banner=False)
439
440     def free_spots (self):
441         return self.max_plcs - len(self.plc_instances)
442
443     def uname(self):
444         if hasattr(self,'_uname') and self._uname: return self._uname
445         return '*undef* uname'
446
447     def plc_instance_by_vservername (self, vservername):
448         for p in self.plc_instances:
449             if p.vservername==vservername: return p
450         return None
451
452     def reboot (self, options):
453         if not options.soft:
454             self.reboot(options)
455         else:
456             self.run_ssh(['service','util-vserver','stop'],"Stopping all running vservers",
457                          dry_run=options.dry_run)
458
459     def sense (self, options):
460         print 'p',
461         self._uname=self.backquote_ssh(['uname','-r']).strip()
462         # try to find fullname (vserver_stat truncates to a ridiculously short name)
463         # fetch the contexts for all vservers on that box
464         map_command=['grep','.','/etc/vservers/*/context','/dev/null',]
465         context_map=self.backquote_ssh (map_command)
466         # at this point we have a set of lines like
467         # /etc/vservers/2010.01.20--k27-f12-32-vplc03/context:40144
468         ctx_dict={}
469         for map_line in context_map.split("\n"):
470             if not map_line: continue
471             [path,xid] = map_line.split(':')
472             ctx_dict[xid]=os.path.basename(os.path.dirname(path))
473         # at this point ctx_id maps context id to vservername
474
475         command=['vserver-stat']
476         vserver_stat = self.backquote_ssh (command)
477         for vserver_line in vserver_stat.split("\n"):
478             if not vserver_line: continue
479             context=vserver_line.split()[0]
480             if context=="CTX": continue
481             try:
482                 longname=ctx_dict[context]
483                 self.add_vserver(longname,context)
484             except:
485                 print 'WARNING: found ctx %s in vserver_stat but was unable to figure a corresp. vserver'%context
486
487         # scan timestamps 
488         running_vsnames = [ i.vservername for i in self.plc_instances ]
489         command=   ['grep','.']
490         command += ['/vservers/%s.timestamp'%vs for vs in running_vsnames]
491         command += ['/dev/null']
492         ts_lines=self.backquote_ssh(command,trash_err=True).split('\n')
493         for ts_line in ts_lines:
494             if not ts_line.strip(): continue
495             # expect /vservers/<vservername>.timestamp:<timestamp>
496             try:
497                 (ts_file,timestamp)=ts_line.split(':')
498                 ts_file=os.path.basename(ts_file)
499                 (vservername,_)=os.path.splitext(ts_file)
500                 timestamp=int(timestamp)
501                 p=self.plc_instance_by_vservername(vservername)
502                 if not p: 
503                     print 'WARNING zombie plc',self.hostname,ts_line
504                     print '... was expecting',vservername,'in',[i.vservername for i in self.plc_instances]
505                     continue
506                 p.set_timestamp(timestamp)
507             except:  print 'WARNING, could not parse ts line',ts_line
508         
509
510
511
512 ############################################################
513 class QemuInstance: 
514     def __init__ (self, nodename, pid, qemubox):
515         self.nodename=nodename
516         self.pid=pid
517         self.qemu_box=qemubox
518         # not known yet
519         self.buildname=None
520         self.timestamp=0
521         
522     def set_buildname (self,buildname): self.buildname=buildname
523     def set_timestamp (self,timestamp): self.timestamp=timestamp
524     def set_now (self): self.timestamp=int(time.time())
525     def pretty_timestamp (self): return time.strftime("%Y-%m-%d:%H-%M",time.localtime(self.timestamp))
526     
527     def line (self):
528         msg = "== %s =="%(short_hostname(self.nodename))
529         msg += " [=%s]"%self.buildname
530         if self.pid:       msg += " (pid=%s)"%self.pid
531         else:              msg += " not (yet?) running"
532         if self.timestamp: msg += " @ %s"%self.pretty_timestamp()
533         else:              msg += " *unknown timestamp*"
534         return msg
535     
536     def kill(self):
537         if self.pid==0: 
538             print "cannot kill qemu %s with pid==0"%self.nodename
539             return
540         msg="Killing qemu %s with pid=%s on box %s"%(self.nodename,self.pid,self.qemu_box.hostname)
541         self.qemu_box.run_ssh(['kill',"%s"%self.pid],msg)
542         self.qemu_box.forget(self)
543
544
545 class QemuBox (Box):
546     def __init__ (self, hostname, max_qemus):
547         Box.__init__(self,hostname)
548         self.qemu_instances=[]
549         self.max_qemus=max_qemus
550
551     def add_node (self,nodename,pid):
552         for qemu in self.qemu_instances:
553             if qemu.nodename==nodename: 
554                 header("WARNING, duplicate qemu %s running on %s"%\
555                            (nodename,self.hostname), banner=False)
556                 return
557         self.qemu_instances.append(QemuInstance(nodename,pid,self))
558
559     def forget (self, qemu_instance):
560         self.qemu_instances.remove(qemu_instance)
561
562     # fill one slot even though this one is not started yet
563     def add_dummy (self, nodename):
564         dummy=QemuInstance('dummy_'+nodename,0,self)
565         dummy.set_now()
566         self.qemu_instances.append(dummy)
567
568     def line (self):
569         msg="%s [max=%d,%d free] (%s)"%(self.hostname, self.max_qemus,self.free_spots(),self.driver())
570         return msg
571
572     def list(self):
573         if not self.qemu_instances: 
574             header ('No qemu process on %s'%(self.line()))
575         else:
576             header ("Active qemu processes on %s"%(self.line()))
577             self.qemu_instances.sort(timestamp_sort)
578             for q in self.qemu_instances: 
579                 header (q.line(),banner=False)
580
581     def free_spots (self):
582         return self.max_qemus - len(self.qemu_instances)
583
584     def driver(self):
585         if hasattr(self,'_driver') and self._driver: return self._driver
586         return '*undef* driver'
587
588     def qemu_instance_by_pid (self,pid):
589         for q in self.qemu_instances:
590             if q.pid==pid: return q
591         return None
592
593     def qemu_instance_by_nodename_buildname (self,nodename,buildname):
594         for q in self.qemu_instances:
595             if q.nodename==nodename and q.buildname==buildname:
596                 return q
597         return None
598
599     def reboot (self, options):
600         if not options.soft:
601             self.reboot(options)
602         else:
603             self.run_ssh(['pkill','qemu'],"Killing qemu instances",
604                          dry_run=options.dry_run)
605
606     matcher=re.compile("\s*(?P<pid>[0-9]+).*-cdrom\s+(?P<nodename>[^\s]+)\.iso")
607     def sense(self, options):
608         print 'q',
609         modules=self.backquote_ssh(['lsmod']).split('\n')
610         self._driver='*NO kqemu/kmv_intel MODULE LOADED*'
611         for module in modules:
612             if module.find('kqemu')==0:
613                 self._driver='kqemu module loaded'
614             # kvm might be loaded without vkm_intel (we dont have AMD)
615             elif module.find('kvm_intel')==0:
616                 self._driver='kvm_intel module loaded'
617         ########## find out running pids
618         pids=self.backquote_ssh(['pgrep','qemu'])
619         if not pids: return
620         command=['ps','-o','pid,command'] + [ pid for pid in pids.split("\n") if pid]
621         ps_lines = self.backquote_ssh (command).split("\n")
622         for line in ps_lines:
623             if not line.strip() or line.find('PID') >=0 : continue
624             m=QemuBox.matcher.match(line)
625             if m: self.add_node (m.group('nodename'),m.group('pid'))
626             else: header('command %r returned line that failed to match'%command)
627         ########## retrieve alive instances and map to build
628         live_builds=[]
629         command=['grep','.','*/*/qemu.pid','/dev/null']
630         pid_lines=self.backquote_ssh(command,trash_err=True).split('\n')
631         for pid_line in pid_lines:
632             if not pid_line.strip(): continue
633             # expect <build>/<nodename>/qemu.pid:<pid>pid
634             try:
635                 (buildname,nodename,tail)=pid_line.split('/')
636                 (_,pid)=tail.split(':')
637                 q=self.qemu_instance_by_pid (pid)
638                 if not q: continue
639                 q.set_buildname(buildname)
640                 live_builds.append(buildname)
641             except: print 'WARNING, could not parse pid line',pid_line
642         # retrieve timestamps
643         if not live_builds: return
644         command=   ['grep','.']
645         command += ['%s/*/timestamp'%b for b in live_builds]
646         command += ['/dev/null']
647         ts_lines=self.backquote_ssh(command,trash_err=True).split('\n')
648         for ts_line in ts_lines:
649             if not ts_line.strip(): continue
650             # expect <build>/<nodename>/timestamp:<timestamp>
651             try:
652                 (buildname,nodename,tail)=ts_line.split('/')
653                 nodename=nodename.replace('qemu-','')
654                 (_,timestamp)=tail.split(':')
655                 timestamp=int(timestamp)
656                 q=self.qemu_instance_by_nodename_buildname(nodename,buildname)
657                 if not q: 
658                     print 'WARNING zombie qemu',self.hostname,ts_line
659                     print '... was expecting (',short_hostname(nodename),buildname,') in',\
660                         [ (short_hostname(i.nodename),i.buildname) for i in self.qemu_instances ]
661                     continue
662                 q.set_timestamp(timestamp)
663             except:  print 'WARNING, could not parse ts line',ts_line
664
665 ####################
666 class TestInstance:
667     def __init__ (self, buildname, pid=0):
668         self.pids=[]
669         if pid!=0: self.pid.append(pid)
670         self.buildname=buildname
671         # latest trace line
672         self.trace=''
673         # has a KO test
674         self.broken_steps=[]
675         self.timestamp = 0
676
677     def set_timestamp (self,timestamp): self.timestamp=timestamp
678     def set_now (self): self.timestamp=int(time.time())
679     def pretty_timestamp (self): return time.strftime("%Y-%m-%d:%H-%M",time.localtime(self.timestamp))
680
681
682     def add_pid (self,pid):
683         self.pids.append(pid)
684     def set_broken (self,plcindex, step): 
685         self.broken_steps.append ( (plcindex, step,) )
686
687     def line (self):
688         double='=='
689         if self.pids: double='*'+double[1]
690         if self.broken_steps: double=double[0]+'B'
691         msg = " %s %s =="%(double,self.buildname)
692         if not self.pids:       pass
693         elif len(self.pids)==1: msg += " (pid=%s)"%self.pids[0]
694         else:                   msg += " !!!pids=%s!!!"%self.pids
695         msg += " @%s"%self.pretty_timestamp()
696         if self.broken_steps:
697             msg += "\n BROKEN IN STEPS"
698             for (i,s) in self.broken_steps: msg += " %s@%s"%(s,i)
699         return msg
700
701 class TestBox (Box):
702     def __init__ (self,hostname):
703         Box.__init__(self,hostname)
704         self.starting_ips=[]
705         self.test_instances=[]
706
707     def reboot (self, options):
708         # can't reboot a vserver VM
709         self.run_ssh (['pkill','run_log'],"Terminating current runs",
710                       dry_run=options.dry_run)
711         self.run_ssh (['rm','-f',Starting.location],"Cleaning %s"%Starting.location,
712                       dry_run=options.dry_run)
713
714     def get_test (self, buildname):
715         for i in self.test_instances:
716             if i.buildname==buildname: return i
717
718     # we scan ALL remaining test results, even the ones not running
719     def add_timestamp (self, buildname, timestamp):
720         i=self.get_test(buildname)
721         if i:   
722             i.set_timestamp(timestamp)
723         else:   
724             i=TestInstance(buildname,0)
725             i.set_timestamp(timestamp)
726             self.test_instances.append(i)
727
728     def add_running_test (self, pid, buildname):
729         i=self.get_test(buildname)
730         if not i:
731             self.test_instances.append (TestInstance (buildname,pid))
732             return
733         if i.pids:
734             print "WARNING: 2 concurrent tests run on same build %s"%buildname
735         i.add_pid (pid)
736
737     def add_broken (self, buildname, plcindex, step):
738         i=self.get_test(buildname)
739         if not i:
740             i=TestInstance(buildname)
741             self.test_instances.append(i)
742         i.set_broken(plcindex, step)
743
744     matcher_proc=re.compile (".*/proc/(?P<pid>[0-9]+)/cwd.*/root/(?P<buildname>[^/]+)$")
745     matcher_grep=re.compile ("/root/(?P<buildname>[^/]+)/logs/trace.*:TRACE:\s*(?P<plcindex>[0-9]+).*step=(?P<step>\S+).*")
746     def sense (self, options):
747         print 't',
748         self.sense_uptime()
749         self.starting_ips=[x for x in self.backquote_ssh(['cat',Starting.location], trash_err=True).strip().split('\n') if x]
750
751         # scan timestamps on all tests
752         # this is likely to not invoke ssh so we need to be a bit smarter to get * expanded
753         # xxx would make sense above too
754         command=['bash','-c',"grep . /root/*/timestamp /dev/null"]
755         ts_lines=self.backquote_ssh(command,trash_err=True).split('\n')
756         for ts_line in ts_lines:
757             if not ts_line.strip(): continue
758             # expect /root/<buildname>/timestamp:<timestamp>
759             try:
760                 (ts_file,timestamp)=ts_line.split(':')
761                 ts_file=os.path.dirname(ts_file)
762                 buildname=os.path.basename(ts_file)
763                 timestamp=int(timestamp)
764                 t=self.add_timestamp(buildname,timestamp)
765             except:  print 'WARNING, could not parse ts line',ts_line
766
767         command=['bash','-c',"grep KO /root/*/logs/trace* /dev/null" ]
768         trace_lines=self.backquote_ssh (command).split('\n')
769         for line in trace_lines:
770             if not line.strip(): continue
771             m=TestBox.matcher_grep.match(line)
772             if m: 
773                 buildname=m.group('buildname')
774                 plcindex=m.group('plcindex')
775                 step=m.group('step')
776                 self.add_broken(buildname,plcindex, step)
777             else: header("command %r returned line that failed to match\n%s"%(command,line))
778
779         pids = self.backquote_ssh (['pgrep','run_log'],trash_err=True)
780         if not pids: return
781         command=['ls','-ld'] + ["/proc/%s/cwd"%pid for pid in pids.split("\n") if pid]
782         ps_lines=self.backquote_ssh (command).split('\n')
783         for line in ps_lines:
784             if not line.strip(): continue
785             m=TestBox.matcher_proc.match(line)
786             if m: 
787                 pid=m.group('pid')
788                 buildname=m.group('buildname')
789                 self.add_running_test(pid, buildname)
790             else: header("command %r returned line that failed to match\n%s"%(command,line))
791         
792         
793     def line (self):
794         return "%s (%s)"%(self.hostname,self.uptime())
795
796     def list (self):
797         if not self.test_instances:
798             header ("No known tests on %s"%self.line())
799         else:
800             header ("Known tests on %s"%self.line())
801             self.test_instances.sort(timestamp_sort)
802             for i in self.test_instances: print i.line()
803         if self.starting_ips:
804             header ("Starting IP addresses on %s"%self.line())
805             self.starting_ips.sort()
806             for starting in self.starting_ips: print starting
807
808 ############################################################
809 class Options: pass
810
811 class Substrate:
812
813     def __init__ (self):
814         self.options=Options()
815         self.options.dry_run=False
816         self.options.verbose=False
817         self.options.reboot=False
818         self.options.soft=False
819         self.test_box = TestBox (self.test_box_spec())
820         self.build_boxes = [ BuildBox(h) for h in self.build_boxes_spec() ]
821         self.plc_boxes = [ PlcBox (h,m) for (h,m) in self.plc_boxes_spec ()]
822         self.qemu_boxes = [ QemuBox (h,m) for (h,m) in self.qemu_boxes_spec ()]
823         self.default_boxes = self.plc_boxes + self.qemu_boxes
824         self.all_boxes = self.build_boxes + [ self.test_box ] + self.plc_boxes + self.qemu_boxes
825         self._sensed=False
826
827         self.vplc_pool = Pool (self.vplc_ips(),"for vplcs",self)
828         self.vnode_pool = Pool (self.vnode_ips(),"for vnodes",self)
829
830     def fqdn (self, hostname):
831         if hostname.find('.')<0: return "%s.%s"%(hostname,self.domain())
832         return hostname
833
834     # return True if actual sensing takes place
835     def sense (self,force=False):
836         if self._sensed and not force: return False
837         print 'Sensing local substrate...',
838         for b in self.default_boxes: b.sense(self.options)
839         print 'Done'
840         self._sensed=True
841         return True
842
843     def list (self):
844         for b in self.default_boxes:
845             b.list()
846
847     def add_dummy_plc (self, plc_boxname, plcname):
848         for pb in self.plc_boxes:
849             if pb.hostname==plc_boxname:
850                 pb.add_dummy(plcname)
851                 return True
852     def add_dummy_qemu (self, qemu_boxname, qemuname):
853         for qb in self.qemu_boxes:
854             if qb.hostname==qemu_boxname:
855                 qb.add_dummy(qemuname)
856                 return True
857
858     def add_starting_dummy (self, bname, vname):
859         return self.add_dummy_plc (bname, vname) or self.add_dummy_qemu (bname, vname)
860
861     ########## 
862     def provision (self,plcs,options):
863         try:
864             # attach each plc to a plc box and an IP address
865             plcs = [ self.provision_plc (plc,options) for plc in plcs ]
866             # attach each node/qemu to a qemu box with an IP address
867             plcs = [ self.provision_qemus (plc,options) for plc in plcs ]
868             # update the SFA spec accordingly
869             plcs = [ self.localize_sfa_rspec(plc,options) for plc in plcs ]
870             self.list()
871             return plcs
872         except Exception, e:
873             print '* Could not provision this test on current substrate','--',e,'--','exiting'
874             traceback.print_exc()
875             sys.exit(1)
876
877     # it is expected that a couple of options like ips_bplc and ips_vplc 
878     # are set or unset together
879     @staticmethod
880     def check_options (x,y):
881         if not x and not y: return True
882         return len(x)==len(y)
883
884     # find an available plc box (or make space)
885     # and a free IP address (using options if present)
886     def provision_plc (self, plc, options):
887         
888         assert Substrate.check_options (options.ips_bplc, options.ips_vplc)
889
890         #### let's find an IP address for that plc
891         # look in options 
892         if options.ips_vplc:
893             # this is a rerun
894             # we don't check anything here, 
895             # it is the caller's responsability to cleanup and make sure this makes sense
896             plc_boxname = options.ips_bplc.pop()
897             vplc_hostname=options.ips_vplc.pop()
898         else:
899             if self.sense(): self.list()
900             plc_boxname=None
901             vplc_hostname=None
902             # try to find an available IP 
903             self.vplc_pool.sense()
904             couple=self.vplc_pool.next_free()
905             if couple:
906                 (vplc_hostname,unused)=couple
907             #### we need to find one plc box that still has a slot
908             max_free=0
909             # use the box that has max free spots for load balancing
910             for pb in self.plc_boxes:
911                 free=pb.free_spots()
912                 if free>max_free:
913                     plc_boxname=pb.hostname
914                     max_free=free
915             # if there's no available slot in the plc_boxes, or we need a free IP address
916             # make space by killing the oldest running instance
917             if not plc_boxname or not vplc_hostname:
918                 # find the oldest of all our instances
919                 all_plc_instances=reduce(lambda x, y: x+y, 
920                                          [ pb.plc_instances for pb in self.plc_boxes ],
921                                          [])
922                 all_plc_instances.sort(timestamp_sort)
923                 try:
924                     plc_instance_to_kill=all_plc_instances[0]
925                 except:
926                     msg=""
927                     if not plc_boxname: msg += " PLC boxes are full"
928                     if not vplc_hostname: msg += " vplc IP pool exhausted" 
929                     raise Exception,"Could not make space for a PLC instance:"+msg
930                 freed_plc_boxname=plc_instance_to_kill.plc_box.hostname
931                 freed_vplc_hostname=plc_instance_to_kill.vplcname()
932                 message='killing oldest plc instance = %s on %s'%(plc_instance_to_kill.line(),
933                                                                   freed_plc_boxname)
934                 plc_instance_to_kill.kill()
935                 # use this new plcbox if that was the problem
936                 if not plc_boxname:
937                     plc_boxname=freed_plc_boxname
938                 # ditto for the IP address
939                 if not vplc_hostname:
940                     vplc_hostname=freed_vplc_hostname
941                     # record in pool as mine
942                     self.vplc_pool.set_mine(vplc_hostname)
943
944         # 
945         self.add_dummy_plc(plc_boxname,plc['name'])
946         vplc_ip = self.vplc_pool.get_ip(vplc_hostname)
947         self.vplc_pool.add_starting(vplc_hostname, plc_boxname)
948
949         #### compute a helpful vserver name
950         # remove domain in hostname
951         vplc_short = short_hostname(vplc_hostname)
952         vservername = "%s-%d-%s" % (options.buildname,plc['index'],vplc_short)
953         plc_name = "%s_%s"%(plc['name'],vplc_short)
954
955         utils.header( 'PROVISION plc %s in box %s at IP %s as %s'%\
956                           (plc['name'],plc_boxname,vplc_hostname,vservername))
957
958         #### apply in the plc_spec
959         # # informative
960         # label=options.personality.replace("linux","")
961         mapper = {'plc': [ ('*' , {'host_box':plc_boxname,
962                                    # 'name':'%s-'+label,
963                                    'name': plc_name,
964                                    'vservername':vservername,
965                                    'vserverip':vplc_ip,
966                                    'PLC_DB_HOST':vplc_hostname,
967                                    'PLC_API_HOST':vplc_hostname,
968                                    'PLC_BOOT_HOST':vplc_hostname,
969                                    'PLC_WWW_HOST':vplc_hostname,
970                                    'PLC_NET_DNS1' : self.network_settings() [ 'interface_fields:dns1' ],
971                                    'PLC_NET_DNS2' : self.network_settings() [ 'interface_fields:dns2' ],
972                                    } ) ]
973                   }
974
975
976         # mappers only work on a list of plcs
977         return TestMapper([plc],options).map(mapper)[0]
978
979     ##########
980     def provision_qemus (self, plc, options):
981
982         assert Substrate.check_options (options.ips_bnode, options.ips_vnode)
983
984         test_mapper = TestMapper ([plc], options)
985         nodenames = test_mapper.node_names()
986         maps=[]
987         for nodename in nodenames:
988
989             if options.ips_vnode:
990                 # as above, it's a rerun, take it for granted
991                 qemu_boxname=options.ips_bnode.pop()
992                 vnode_hostname=options.ips_vnode.pop()
993             else:
994                 if self.sense(): self.list()
995                 qemu_boxname=None
996                 vnode_hostname=None
997                 # try to find an available IP 
998                 self.vnode_pool.sense()
999                 couple=self.vnode_pool.next_free()
1000                 if couple:
1001                     (vnode_hostname,unused)=couple
1002                 # find a physical box
1003                 max_free=0
1004                 # use the box that has max free spots for load balancing
1005                 for qb in self.qemu_boxes:
1006                     free=qb.free_spots()
1007                     if free>max_free:
1008                         qemu_boxname=qb.hostname
1009                         max_free=free
1010                 # if we miss the box or the IP, kill the oldest instance
1011                 if not qemu_boxname or not vnode_hostname:
1012                 # find the oldest of all our instances
1013                     all_qemu_instances=reduce(lambda x, y: x+y, 
1014                                               [ qb.qemu_instances for qb in self.qemu_boxes ],
1015                                               [])
1016                     all_qemu_instances.sort(timestamp_sort)
1017                     try:
1018                         qemu_instance_to_kill=all_qemu_instances[0]
1019                     except:
1020                         msg=""
1021                         if not qemu_boxname: msg += " QEMU boxes are full"
1022                         if not vnode_hostname: msg += " vnode IP pool exhausted" 
1023                         raise Exception,"Could not make space for a QEMU instance:"+msg
1024                     freed_qemu_boxname=qemu_instance_to_kill.qemu_box.hostname
1025                     freed_vnode_hostname=short_hostname(qemu_instance_to_kill.nodename)
1026                     # kill it
1027                     message='killing oldest qemu node = %s on %s'%(qemu_instance_to_kill.line(),
1028                                                                    freed_qemu_boxname)
1029                     qemu_instance_to_kill.kill()
1030                     # use these freed resources where needed
1031                     if not qemu_boxname:
1032                         qemu_boxname=freed_qemu_boxname
1033                     if not vnode_hostname:
1034                         vnode_hostname=freed_vnode_hostname
1035                         self.vnode_pool.set_mine(vnode_hostname)
1036
1037             self.add_dummy_qemu (qemu_boxname,vnode_hostname)
1038             mac=self.vnode_pool.retrieve_userdata(vnode_hostname)
1039             ip=self.vnode_pool.get_ip (vnode_hostname)
1040             self.vnode_pool.add_starting(vnode_hostname,qemu_boxname)
1041
1042             vnode_fqdn = self.fqdn(vnode_hostname)
1043             nodemap={'host_box':qemu_boxname,
1044                      'node_fields:hostname':vnode_fqdn,
1045                      'interface_fields:ip':ip, 
1046                      'interface_fields:mac':mac,
1047                      }
1048             nodemap.update(self.network_settings())
1049             maps.append ( (nodename, nodemap) )
1050
1051             utils.header("PROVISION node %s in box %s at IP %s with MAC %s"%\
1052                              (nodename,qemu_boxname,vnode_hostname,mac))
1053
1054         return test_mapper.map({'node':maps})[0]
1055
1056     def localize_sfa_rspec (self,plc,options):
1057        
1058         plc['sfa']['SFA_REGISTRY_HOST'] = plc['PLC_DB_HOST']
1059         plc['sfa']['SFA_AGGREGATE_HOST'] = plc['PLC_DB_HOST']
1060         plc['sfa']['SFA_SM_HOST'] = plc['PLC_DB_HOST']
1061         plc['sfa']['SFA_PLC_DB_HOST'] = plc['PLC_DB_HOST']
1062         plc['sfa']['SFA_PLC_URL'] = 'https://' + plc['PLC_API_HOST'] + ':443/PLCAPI/' 
1063         for site in plc['sites']:
1064             for node in site['nodes']:
1065                 plc['sfa']['sfa_slice_rspec']['part4'] = node['node_fields']['hostname']
1066         return plc
1067
1068     #################### release:
1069     def release (self,options):
1070         self.vplc_pool.release_my_starting()
1071         self.vnode_pool.release_my_starting()
1072         pass
1073
1074     #################### show results for interactive mode
1075     def get_box (self,boxname):
1076         for b in self.build_boxes + self.plc_boxes + self.qemu_boxes + [self.test_box] :
1077             if b.shortname()==boxname:
1078                 return b
1079         print "Could not find box %s"%boxname
1080         return None
1081
1082     def list_boxes(self,box_or_names):
1083         print 'Sensing',
1084         for box in box_or_names:
1085             if not isinstance(box,Box): box=self.get_box(box)
1086             if not box: continue
1087             box.sense(self.options)
1088         print 'Done'
1089         for box in box_or_names:
1090             if not isinstance(box,Box): box=self.get_box(box)
1091             if not box: continue
1092             box.list()
1093
1094     def reboot_boxes(self,box_or_names):
1095         for box in box_or_names:
1096             if not isinstance(box,Box): box=self.get_box(box)
1097             if not box: continue
1098             box.reboot(self.options)
1099
1100     ####################
1101     # can be run as a utility to manage the local infrastructure
1102     def main (self):
1103         parser=OptionParser()
1104         parser.add_option ('-r',"--reboot",action='store_true',dest='reboot',default=False,
1105                            help='reboot mode (use shutdown -r)')
1106         parser.add_option ('-s',"--soft",action='store_true',dest='soft',default=False,
1107                            help='soft mode for reboot (vserver stop or kill qemus)')
1108         parser.add_option ('-t',"--testbox",action='store_true',dest='testbox',default=False,
1109                            help='add test box') 
1110         parser.add_option ('-b',"--build",action='store_true',dest='builds',default=False,
1111                            help='add build boxes')
1112         parser.add_option ('-p',"--plc",action='store_true',dest='plcs',default=False,
1113                            help='add plc boxes')
1114         parser.add_option ('-q',"--qemu",action='store_true',dest='qemus',default=False,
1115                            help='add qemu boxes') 
1116         parser.add_option ('-a',"--all",action='store_true',dest='all',default=False,
1117                            help='address all known  boxes, like -b -t -p -q')
1118         parser.add_option ('-v',"--verbose",action='store_true',dest='verbose',default=False,
1119                            help='verbose mode')
1120         parser.add_option ('-n',"--dry_run",action='store_true',dest='dry_run',default=False,
1121                            help='dry run mode')
1122         (self.options,args)=parser.parse_args()
1123
1124         boxes=args
1125         if self.options.testbox: boxes += [self.test_box]
1126         if self.options.builds: boxes += self.build_boxes
1127         if self.options.plcs: boxes += self.plc_boxes
1128         if self.options.qemus: boxes += self.qemu_boxes
1129         if self.options.all: boxes += self.all_boxes
1130         
1131         # default scope is -b -p -q
1132         if not boxes:
1133             boxes = self.build_boxes + self.plc_boxes + self.qemu_boxes
1134
1135         if self.options.reboot: self.reboot_boxes (boxes)
1136         else:                   self.list_boxes (boxes)