#!/usr/bin/env python
# -*- coding: utf-8 -*-

from constants import TESTBED_ID, TESTBED_VERSION
from nepi.core import testbed_impl
from nepi.core.metadata import Parallel
from nepi.util.constants import TIME_NOW
from nepi.util.graphtools import mst
from nepi.util import ipaddr2
from nepi.util import environ
from nepi.util.parallel import ParallelRun
import threading
import sys
import os
import os.path
import time
import resourcealloc
import collections
import operator
import functools
import socket
import struct
import tempfile
import subprocess
import random
import shutil
import logging
import metadata
import weakref

class TempKeyError(Exception):
    pass

class TestbedController(testbed_impl.TestbedController):
    def __init__(self):
        super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
        self._home_directory = None
        self.slicename = None
        self._traces = dict()

        import node, interfaces, application, multicast
        self._node = node
        self._interfaces = interfaces
        self._app = application
        self._multicast = multicast

        self._blacklist = set()
        self._just_provisioned = set()

        self._load_blacklist()

        self._logger = logging.getLogger('nepi.testbeds.planetlab')

        self.recovering = False

    @property
    def home_directory(self):
        return self._home_directory

    @property
    def plapi(self):
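        """Lazily-built, cached PLCAPI proxy (authenticated when credentials are set)."""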
        if not hasattr(self, '_plapi'):
            import plcapi

            if self.authUser:
                self._plapi = plcapi.PLCAPI(
                    username = self.authUser,
                    password = self.authString,
                    hostname = self.plcHost,
                    urlpattern = self.plcUrl
                    )
            else:
                # anonymous access - may not be enough for much
                self._plapi = plcapi.PLCAPI()
        return self._plapi

    @property
    def slice_id(self):
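        """Numeric PLC id of the slice, cached on first successful lookup."""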
        if not hasattr(self, '_slice_id'):
            slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
            if slices:
                self._slice_id = slices[0]['slice_id']
            else:
                # If it wasn't found, don't remember this failure, keep trying
                return None
        return self._slice_id

    @property
    def vsys_vnet(self):
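        """Value of the slice's 'vsys_vnet' tag, cached on first successful lookup."""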
        if not hasattr(self, '_vsys_vnet'):
            slicetags = self.plapi.GetSliceTags(
                name = self.slicename,
                tagname = 'vsys_vnet',
                fields=('value',))
            if slicetags:
                self._vsys_vnet = slicetags[0]['value']
            else:
                # If it wasn't found, don't remember this failure, keep trying
                return None
        return self._vsys_vnet

    def _load_blacklist(self):
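        """Load the persistent node blacklist (one PlanetLab node_id per line) from the NEPI home path."""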
        blpath = environ.homepath('plblacklist')

        try:
            bl = open(blpath, "r")
        except:
            self._blacklist = set()
            return

        try:
            self._blacklist = set(
                map(int,
                    map(str.strip, bl.readlines())
                )
            )
        finally:
            bl.close()

    def _save_blacklist(self):
        blpath = environ.homepath('plblacklist')
        bl = open(blpath, "w")
        try:
            bl.writelines(
                map('%s\n'.__mod__, self._blacklist))
        finally:
            bl.close()

    def do_setup(self):
        self._home_directory = self._attributes.\
            get_attribute_value("homeDirectory")
        self.slicename = self._attributes.\
            get_attribute_value("slice")
        self.authUser = self._attributes.\
            get_attribute_value("authUser")
        self.authString = self._attributes.\
            get_attribute_value("authPass")
        self.sliceSSHKey = self._attributes.\
            get_attribute_value("sliceSSHKey")
        self.sliceSSHKeyPass = None
        self.plcHost = self._attributes.\
            get_attribute_value("plcHost")
        self.plcUrl = self._attributes.\
            get_attribute_value("plcUrl")
        self.logLevel = self._attributes.\
            get_attribute_value("plLogLevel")
        self.tapPortBase = self._attributes.\
            get_attribute_value("tapPortBase")
        self.p2pDeployment = self._attributes.\
            get_attribute_value("p2pDeployment")
        self.dedicatedSlice = self._attributes.\
            get_attribute_value("dedicatedSlice")

        if not self.slicename:
            raise RuntimeError, "Slice not set"
        if not self.authUser:
            raise RuntimeError, "PlanetLab account username not set"
        if not self.authString:
            raise RuntimeError, "PlanetLab account passphrase not set"
        if not self.sliceSSHKey:
            raise RuntimeError, "PlanetLab account key not specified"
        if not os.path.exists(self.sliceSSHKey):
            raise RuntimeError, "PlanetLab account key cannot be opened: %s" % (self.sliceSSHKey,)

        self._logger.setLevel(getattr(logging, self.logLevel))

        super(TestbedController, self).do_setup()

    def do_post_asynclaunch(self, guid):
        # Dependencies were launched asynchronously,
        # so wait for them
        dep = self._elements[guid]
        if isinstance(dep, self._app.Dependency):
            dep.async_setup_wait()

    # Two-phase configuration for asynchronous launch
    do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
    do_poststep_configure = staticmethod(do_post_asynclaunch)

    def do_preconfigure(self):
        while True:
            # Perform resource discovery if we don't have
            # specific resources assigned yet
            self.do_resource_discovery()

            # Create PlanetLab slivers
            self.do_provisioning()

            try:
                # Wait for provisioning
                self.do_wait_nodes()

                # Okay, provisioning succeeded
                break
            except self._node.UnresponsiveNodeError:
                # Oh... retry...
                pass

        if self.p2pDeployment:
            # Plan application deployment
            self.do_spanning_deployment_plan()

        # Configure elements per XML data
        super(TestbedController, self).do_preconfigure()

    def do_resource_discovery(self, recover = False):
        to_provision = self._to_provision = set()

        reserved = set(self._blacklist)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is not None:
                reserved.add(node._node_id)

        # Initial algo:
        #   look for perfectly defined nodes
        #   (ie: those with only one candidate)
        reserve_lock = threading.Lock()
        def assignifunique(guid, node):
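            """Assign a node_id outright only when it is unambiguous: first
            restrict candidates to nodes already in the slice; if none are
            left, consider all nodes (marking the pick for provisioning).
            Ambiguous cases are left to the backtracking allocator below."""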
            # Try existing nodes first
            # If we have only one candidate, simply use it
            candidates = node.find_candidates(
                filter_slice_id = self.slice_id)

            node_id = None
            reserve_lock.acquire()
            try:
                candidates -= reserved
                if len(candidates) == 1:
                    node_id = iter(candidates).next()
                    reserved.add(node_id)
                elif not candidates:
                    # Try again including unassigned nodes
                    reserve_lock.release()
                    try:
                        candidates = node.find_candidates()
                    finally:
                        reserve_lock.acquire()
                    candidates -= reserved
                    if len(candidates) > 1:
                        return
                    if len(candidates) == 1:
                        node_id = iter(candidates).next()
                        to_provision.add(node_id)
                        reserved.add(node_id)
                    elif not candidates:
                        raise RuntimeError, "Cannot assign resources for node %s, no candidates with %s" % (guid,
                            node.make_filter_description())
            finally:
                reserve_lock.release()

            if node_id is not None:
                node.assign_node_id(node_id)

        runner = ParallelRun(maxthreads=4) # don't overload the PLC API; four threads are enough to hide latencies
        runner.start()
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                runner.put(assignifunique, guid, node)
        runner.sync()

        # Now do the backtracking search for a suitable solution
        # First with existing slice nodes
        reqs = []
        nodes = []
        def genreqs(node, filter_slice_id=None):
            # Collect this node's candidate set (restricted to the
            # slice when filter_slice_id is given) for the allocator
            candidates = node.find_candidates(
                filter_slice_id = filter_slice_id)
            candidates -= reserved
            reqs.append(candidates)
            nodes.append(node)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                runner.put(genreqs, node, self.slice_id)
        runner.sync()

        if nodes and reqs:
            if recover:
                raise RuntimeError, "Impossible to recover: unassigned host for Nodes %r" % (nodes,)

            def pickbest(fullset, nreq, node=nodes[0]):
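                """Trim an oversized candidate set down to the nreq best-rated
                nodes (per node.rate_nodes), bounding the allocation search."""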
                if len(fullset) > nreq:
                    fullset = zip(node.rate_nodes(fullset), fullset)
                    fullset.sort(reverse=True)
                    del fullset[nreq:]
                    return set(map(operator.itemgetter(1), fullset))
                else:
                    return fullset

            try:
                solution = resourcealloc.alloc(reqs, sample=pickbest)
            except resourcealloc.ResourceAllocationError:
                # Failed, try again considering all nodes.
                # genreqs appends to both reqs and nodes, so reset them
                # and iterate over a snapshot to avoid extending the
                # very list we're iterating over.
                retry_nodes = list(nodes)
                reqs = []
                nodes = []
                for node in retry_nodes:
                    runner.put(genreqs, node)
                runner.sync()
                solution = resourcealloc.alloc(reqs, sample=pickbest)
                # These picks may not be in the slice yet, so mark
                # them all for provisioning
                to_provision.update(solution)

            # Do assign nodes
            for node, node_id in zip(nodes, solution):
                runner.put(node.assign_node_id, node_id)
            runner.join()

    def do_provisioning(self):
        if self._to_provision:
            # Add new nodes to the slice
            cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
            new_nodes = list(set(cur_nodes) | self._to_provision)
            self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)

        # cleanup
        self._just_provisioned = self._to_provision
        del self._to_provision

    def do_wait_nodes(self):
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node):
                # Just inject configuration stuff
                node.home_path = "nepi-node-%s" % (guid,)
                node.ident_path = self.sliceSSHKey
                node.slicename = self.slicename

                # Show the magic
                self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)

        try:
            runner = ParallelRun(maxqueue=1)
            abort = []
            def waitforit(guid, node):
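                """Wait for the sliver to come up (20 minutes if the node was
                just provisioned, 1 minute otherwise), then pre-stage its
                dependency installer; any failure flags abort so that the
                remaining waits are skipped."""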
                try:
                    node.wait_provisioning(
                        (20*60 if node._node_id in self._just_provisioned else 60)
                    )

                    self._logger.info("READY Node %s at %s", guid, node.hostname)

                    # Prepare dependency installer now
                    node.prepare_dependencies()
                except:
                    abort.append(None)
                    raise

            for guid, node in self._elements.iteritems():
                if abort:
                    break
                if isinstance(node, self._node.Node):
                    self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)
                    runner.put(waitforit, guid, node)
            runner.join()

        except self._node.UnresponsiveNodeError:
            # Uh...
            self._logger.warn("UNRESPONSIVE Nodes")

            # Add all unresponsive nodes to the blacklist,
            # then re-raise
            for guid, node in self._elements.iteritems():
                if isinstance(node, self._node.Node):
                    if not node.is_alive():
                        self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname)
                        self._blacklist.add(node._node_id)
                        node.unassign_node()

            try:
                self._save_blacklist()
            except:
                # not important...
                import traceback
                traceback.print_exc()

            raise

    def do_spanning_deployment_plan(self):
        # Create application groups by collecting all applications
        # based on their hash - the hash should contain everything that
        # defines them and the platform they're built for

        def dephash(app):
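            """Key identifying interchangeable builds: same packages, sources,
            build/install commands, and target platform (arch, OS, distro)."""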
            return (
                frozenset((app.depends or "").split(' ')),
                frozenset((app.sources or "").split(' ')),
                app.build,
                app.install,
                app.node.architecture,
                app.node.operatingSystem,
                app.node.pl_distro,
            )

        depgroups = collections.defaultdict(list)

        for element in self._elements.itervalues():
            if isinstance(element, self._app.Dependency):
                depgroups[dephash(element)].append(element)
            elif isinstance(element, self._node.Node):
                deps = element._yum_dependencies
                if deps:
                    depgroups[dephash(deps)].append(deps)

        # Set up spanning deployment for those applications that
        # have been deployed in several nodes.
        for dh, group in depgroups.iteritems():
            if len(group) > 1:
                # Pick root (deterministically)
                root = min(group, key=lambda app: app.node.hostname)

                # Obtain all IPs in numeric format
                # (which means faster distance computations)
                for dep in group:
                    dep._ip = socket.gethostbyname(dep.node.hostname)
                    dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]

                # Compute plan
                # NOTE: the plan is an iterator
                plan = mst.mst(
                    group,
                    lambda a, b: ipaddr2.ipdistn(a._ip_n, b._ip_n),
                    root = root,
                    maxbranching = 2)
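                # maxbranching=2 caps each node's fan-out in the deployment
                # tree, keeping per-master upload load bounded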

                # Re-cipher the user's private key under a temporary passphrase
                try:
                    tempprk, temppuk, tmppass = self._make_temp_private_key()
                except TempKeyError:
                    continue

                # Set up slaves
                plan = list(plan)
                for slave, master in plan:
                    slave.set_master(master)
                    slave.install_keys(tempprk, temppuk, tmppass)

        # We don't need the user's passphrase anymore
        self.sliceSSHKeyPass = None

    def _make_temp_private_key(self):
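        """Create temporary copies of the user's SSH identity, re-ciphered
        under a random one-time passphrase so they can be pushed to slice
        nodes for chained deployment without exposing the user's own
        passphrase. Returns (private_key_file, public_key_file, passphrase)."""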
        # Get the user's key's passphrase
        if not self.sliceSSHKeyPass:
            if 'SSH_ASKPASS' in os.environ:
                proc = subprocess.Popen(
                    [ os.environ['SSH_ASKPASS'],
                      "Please type the passphrase for the %s SSH identity file. "
                      "The passphrase will be used to re-cipher the identity file with "
                      "a random 256-bit key for automated chain deployment on the "
                      "%s PlanetLab slice" % (
                        os.path.basename(self.sliceSSHKey),
                        self.slicename
                    ) ],
                    stdin = open("/dev/null"),
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE)
                out, err = proc.communicate()
                self.sliceSSHKeyPass = out.strip()

        if not self.sliceSSHKeyPass:
            raise TempKeyError

        # Create temporary key files
        prk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = "")

        puk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = ".pub")

        # Create a secure 256-bit temporary passphrase
        # (32 bytes from the OS CSPRNG, hex-encoded)
        passphrase = ''.join(map(chr, [rng.randint(0, 255)
                                       for rng in (random.SystemRandom(),)
                                       for i in xrange(32)])).encode("hex")

        # Copy keys
        oprk = open(self.sliceSSHKey, "rb")
        opuk = open(self.sliceSSHKey+".pub", "rb")
        shutil.copymode(oprk.name, prk.name)
        shutil.copymode(opuk.name, puk.name)
        shutil.copyfileobj(oprk, prk)
        shutil.copyfileobj(opuk, puk)
        prk.flush()
        puk.flush()
        oprk.close()
        opuk.close()

        # A descriptive comment
        comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)

        # Recipher keys
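        # ssh-keygen -p rewrites the key file in place (-f), replacing the
        # user's passphrase (-P) with the fresh random one (-N) and setting
        # the new comment (-C)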
        proc = subprocess.Popen(
            ["ssh-keygen", "-p",
             "-f", prk.name,
             "-P", self.sliceSSHKeyPass,
             "-N", passphrase,
             "-C", comment ],
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE,
            stdin = subprocess.PIPE
        )
        out, err = proc.communicate()

        if err:
            raise RuntimeError, "Problem generating keys: \n%s\n%r" % (
                out, err)

        prk.seek(0)
        puk.seek(0)

        # Change comment on public key
        puklines = puk.readlines()
        puklines[0] = puklines[0].split(' ')
        puklines[0][-1] = comment+'\n'
        puklines[0] = ' '.join(puklines[0])
        puk.seek(0)
        puk.truncate()
        puk.writelines(puklines)
        del puklines
        puk.flush()

        return prk, puk, passphrase

    def set(self, guid, name, value, time = TIME_NOW):
        super(TestbedController, self).set(guid, name, value, time)
        # TODO: take into account the scheduled time for the task
        element = self._elements[guid]
        if element:
            try:
                setattr(element, name, value)
            except:
                # We ignore these errors while recovering.
                # Some attributes are immutable, and setting
                # them is necessary (to recover the state), but
                # some are not (they throw an exception).
                if not self.recovering:
                    raise

            if hasattr(element, 'refresh'):
                # invoke attribute refresh hook
                element.refresh()

    def get(self, guid, name, time = TIME_NOW):
        value = super(TestbedController, self).get(guid, name, time)
        # TODO: take into account the scheduled time for the task
        factory_id = self._create[guid]
        factory = self._factories[factory_id]
        element = self._elements.get(guid)
        try:
            return getattr(element, name)
        except (KeyError, AttributeError):
            return value

    def get_address(self, guid, index, attribute='Address'):
        index = int(index)

        # try the real stuff
        iface = self._elements.get(guid)
        if iface and index == 0:
            if attribute == 'Address':
                return iface.address
            elif attribute == 'NetPrefix':
                return iface.netprefix
            elif attribute == 'Broadcast':
                return iface.broadcast

        # if all else fails, query box
        return super(TestbedController, self).get_address(guid, index, attribute)

    def action(self, time, guid, action):
        raise NotImplementedError

    def shutdown(self):
        for trace in self._traces.itervalues():
            trace.close()

        def invokeif(action, testbed, guid):
            element = self._elements[guid]
            if hasattr(element, action):
                getattr(element, action)()

        self._do_in_factory_order(
            functools.partial(invokeif, 'cleanup'),
            metadata.shutdown_order)

        self._do_in_factory_order(
            functools.partial(invokeif, 'destroy'),
            metadata.shutdown_order)

        self._elements.clear()
        self._traces.clear()

    def trace(self, guid, trace_id, attribute='value'):
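        """Return a trace's contents ('value'), its remote path ('path'),
        or its remote file name ('name'); None for unknown attributes."""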
        elem = self._elements[guid]

        if attribute == 'value':
            path = elem.sync_trace(self.home_directory, trace_id)
            if path:
                fd = open(path, "r")
                content = fd.read()
                fd.close()
            else:
                content = None
        elif attribute == 'path':
            content = elem.remote_trace_path(trace_id)
        elif attribute == 'name':
            content = elem.remote_trace_name(trace_id)
        else:
            content = None
        return content

    def follow_trace(self, trace_id, trace):
        self._traces[trace_id] = trace

    def recover(self):
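        """Rebuild controller state for an already-deployed experiment,
        re-running only those steps that have no side effects on the nodes."""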
        try:
            # An internal flag, so we know to behave differently in
            # a few corner cases.
            self.recovering = True

            # Create and connect do not perform any real tasks against
            # the nodes; they only set up the object hierarchy,
            # so we can run them normally
            self.do_create()
            self.do_connect_init()
            self.do_connect_compl()

            # Manually recover nodes, to mark dependencies installed
            # and clean up mutable attributes
            self._do_in_factory_order(
                lambda self, guid : self._elements[guid].recover(),
                [
                    metadata.NODE,
                ])

            # Assign nodes - since we're working off the execute XML, nodes
            # have specific hostnames assigned and we don't need to do
            # real assignment, only find out node ids and check liveness
            self.do_resource_discovery(recover = True)
            self.do_wait_nodes()

            # Pre/post configure, however, tends to set up tunnels.
            # Execute configuration steps only for those object
            # kinds that do not have side effects.

            # Do the ones without side effects,
            # including nodes that need to set up home
            # folders and all that
            self._do_in_factory_order(
                "preconfigure_function",
                [
                    metadata.INTERNET,
                    Parallel(metadata.NODE),
                    metadata.NODEIFACE,
                ])

            # Tunnels require a home path that is configured
            # at this step. Since we cannot run the step itself,
            # we need to inject this homepath ourselves
            for guid, element in self._elements.iteritems():
                if isinstance(element, self._interfaces.TunIface):
                    element._home_path = "tun-%s" % (guid,)

            # Manually recover tunnels, applications and
            # netpipes, negating the side effects
            self._do_in_factory_order(
                lambda self, guid : self._elements[guid].recover(),
                [
                    Parallel(metadata.TAPIFACE),
                    Parallel(metadata.TUNIFACE),
                    metadata.NETPIPE,
                    Parallel(metadata.NEPIDEPENDENCY),
                    Parallel(metadata.NS3DEPENDENCY),
                    Parallel(metadata.DEPENDENCY),
                    Parallel(metadata.APPLICATION),
                ])

            # Tunnels are not harmed by configuration after
            # recovery, and some attributes (like external_iface)
            # only get set this way
            self._do_in_factory_order(
                "preconfigure_function",
                [
                    Parallel(metadata.TAPIFACE),
                    Parallel(metadata.TUNIFACE),
                ])

            # Post-do the ones without side effects
            self._do_in_factory_order(
                "configure_function",
                [
                    metadata.INTERNET,
                    Parallel(metadata.NODE),
                    metadata.NODEIFACE,
                    Parallel(metadata.TAPIFACE),
                    Parallel(metadata.TUNIFACE),
                ])

            # There are no required prestart steps
            # to call upon recovery, so we're done
        finally:
            self.recovering = False

    def _make_generic(self, parameters, kind):
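        """Instantiate `kind` bound to the PLC API and copy `parameters`
        onto it as attributes; the element keeps only a weak reference
        back to the controller to avoid reference cycles."""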
        app = kind(self.plapi)
        app.testbed = weakref.ref(self)

        # Note: parameter names are assumed to correspond 1-to-1
        #   to element attribute names.
        #   If that changes, this has to change as well
        for attr, val in parameters.iteritems():
            try:
                setattr(app, attr, val)
            except:
                # We ignore these errors while recovering.
                # Some attributes are immutable, and setting
                # them is necessary (to recover the state), but
                # some are not (they throw an exception).
                if not self.recovering:
                    raise

        return app

    def _make_node(self, parameters):
        node = self._make_generic(parameters, self._node.Node)
        node.enable_cleanup = self.dedicatedSlice
        return node

    def _make_node_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.NodeIface)

    def _make_tun_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TunIface)

    def _make_tap_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TapIface)

    def _make_netpipe(self, parameters):
        return self._make_generic(parameters, self._interfaces.NetPipe)

    def _make_internet(self, parameters):
        return self._make_generic(parameters, self._interfaces.Internet)

    def _make_application(self, parameters):
        return self._make_generic(parameters, self._app.Application)

    def _make_dependency(self, parameters):
        return self._make_generic(parameters, self._app.Dependency)

    def _make_nepi_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NepiDependency)

    def _make_ns3_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NS3Dependency)

    def _make_tun_filter(self, parameters):
        return self._make_generic(parameters, self._interfaces.TunFilter)

    def _make_class_queue_filter(self, parameters):
        return self._make_generic(parameters, self._interfaces.ClassQueueFilter)

    def _make_tos_queue_filter(self, parameters):
        return self._make_generic(parameters, self._interfaces.ToSQueueFilter)

    def _make_multicast_forwarder(self, parameters):
        return self._make_generic(parameters, self._multicast.MulticastForwarder)

    def _make_multicast_announcer(self, parameters):
        return self._make_generic(parameters, self._multicast.MulticastAnnouncer)

    def _make_multicast_router(self, parameters):
        return self._make_generic(parameters, self._multicast.MulticastRouter)