Avoid deadlocks
[nepi.git] src/nepi/testbeds/planetlab/execute.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from constants import TESTBED_ID, TESTBED_VERSION
from nepi.core import testbed_impl
from nepi.core.metadata import Parallel
from nepi.util.constants import TIME_NOW
from nepi.util.graphtools import mst
from nepi.util import ipaddr2
from nepi.util import environ
from nepi.util.parallel import ParallelRun
import threading
import sys
import os
import os.path
import time
import resourcealloc
import collections
import operator
import functools
import socket
import struct
import tempfile
import subprocess
import random
import shutil
import logging
import metadata
import weakref

class TempKeyError(Exception):
    pass

class TestbedController(testbed_impl.TestbedController):
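    """
    Testbed controller for PlanetLab: discovers and provisions slice
    nodes, deploys dependencies and applications, and supports recovery
    of partially executed experiments.
    """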
    def __init__(self):
        super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
        self._home_directory = None
        self.slicename = None
        self._traces = dict()

        import node, interfaces, application, multicast
        self._node = node
        self._interfaces = interfaces
        self._app = application
        self._multicast = multicast

        self._blacklist = set()
        self._just_provisioned = set()

        self._load_blacklist()

        self._logger = logging.getLogger('nepi.testbeds.planetlab')

        self.recovering = False

    @property
    def home_directory(self):
        return self._home_directory

    @property
    def plapi(self):
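        # Lazily construct the PLC API proxy on first access, and cache
        # it for the lifetime of the controller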
        if not hasattr(self, '_plapi'):
            import plcapi

            if self.authUser:
                self._plapi = plcapi.PLCAPI(
                    username = self.authUser,
                    password = self.authString,
                    hostname = self.plcHost,
                    urlpattern = self.plcUrl
                    )
            else:
                # anonymous access - may not be enough for much
                self._plapi = plcapi.PLCAPI()
        return self._plapi

    @property
    def slice_id(self):
        if not hasattr(self, '_slice_id'):
            slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
            if slices:
                self._slice_id = slices[0]['slice_id']
            else:
                # If it wasn't found, don't remember this failure, keep trying
                return None
        return self._slice_id

    @property
    def vsys_vnet(self):
        if not hasattr(self, '_vsys_vnet'):
            slicetags = self.plapi.GetSliceTags(
                name = self.slicename,
                tagname = 'vsys_vnet',
                fields=('value',))
            if slicetags:
                self._vsys_vnet = slicetags[0]['value']
            else:
                # If it wasn't found, don't remember this failure, keep trying
                return None
        return self._vsys_vnet

    def _load_blacklist(self):
        blpath = environ.homepath('plblacklist')

        try:
            bl = open(blpath, "r")
        except EnvironmentError:
            # No blacklist file yet - start with an empty one
            self._blacklist = set()
            return

        try:
            # One node_id per line; ignore blank lines
            self._blacklist = set(
                map(int,
                    filter(None,
                        map(str.strip, bl.readlines()))
                )
            )
        finally:
            bl.close()

    def _save_blacklist(self):
        blpath = environ.homepath('plblacklist')
        bl = open(blpath, "w")
        try:
            bl.writelines(
                map('%s\n'.__mod__, self._blacklist))
        finally:
            bl.close()

    def do_setup(self):
        self._home_directory = self._attributes.\
            get_attribute_value("homeDirectory")
        self.slicename = self._attributes.\
            get_attribute_value("slice")
        self.authUser = self._attributes.\
            get_attribute_value("authUser")
        self.authString = self._attributes.\
            get_attribute_value("authPass")
        self.sliceSSHKey = self._attributes.\
            get_attribute_value("sliceSSHKey")
        self.sliceSSHKeyPass = None
        self.plcHost = self._attributes.\
            get_attribute_value("plcHost")
        self.plcUrl = self._attributes.\
            get_attribute_value("plcUrl")
        self.logLevel = self._attributes.\
            get_attribute_value("plLogLevel")
        self.tapPortBase = self._attributes.\
            get_attribute_value("tapPortBase")
        self.p2pDeployment = self._attributes.\
            get_attribute_value("p2pDeployment")
        self.dedicatedSlice = self._attributes.\
            get_attribute_value("dedicatedSlice")

        if not self.slicename:
            raise RuntimeError, "Slice not set"
        if not self.authUser:
            raise RuntimeError, "PlanetLab account username not set"
        if not self.authString:
            raise RuntimeError, "PlanetLab account passphrase not set"
        if not self.sliceSSHKey:
            raise RuntimeError, "PlanetLab account key not specified"
        if not os.path.exists(self.sliceSSHKey):
            raise RuntimeError, "PlanetLab account key cannot be opened: %s" % (self.sliceSSHKey,)

        self._logger.setLevel(getattr(logging, self.logLevel))

        super(TestbedController, self).do_setup()

    def do_post_asynclaunch(self, guid):
        # Dependencies were launched asynchronously,
        # so wait for them
        dep = self._elements[guid]
        if isinstance(dep, self._app.Dependency):
            dep.async_setup_wait()

    # Two-phase configuration for asynchronous launch
    do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
    do_poststep_configure = staticmethod(do_post_asynclaunch)

    def do_preconfigure(self):
        while True:
            # Perform resource discovery if we don't have
            # specific resources assigned yet
            self.do_resource_discovery()

            # Create PlanetLab slivers
            self.do_provisioning()

            try:
                # Wait for provisioning
                self.do_wait_nodes()

                # All nodes are up - we're done
                break
            except self._node.UnresponsiveNodeError:
                # Dead nodes were blacklisted and unassigned
                # in do_wait_nodes, so retry discovery
                pass

        if self.p2pDeployment:
            # Plan application deployment
            self.do_spanning_deployment_plan()

        # Configure elements per XML data
        super(TestbedController, self).do_preconfigure()

    def do_resource_discovery(self, recover = False):
        to_provision = self._to_provision = set()

        reserved = set(self._blacklist)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is not None:
                reserved.add(node._node_id)

        # Initial algo:
        #   look for perfectly defined nodes
        #   (ie: those with only one candidate)
        reserve_lock = threading.RLock()
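        # reserve_lock guards the shared "reserved" and "to_provision"
        # sets, which the worker threads below mutate concurrently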
        def assignifunique(guid, node):
            # Try existing nodes first
            # If we have only one candidate, simply use it
            candidates = node.find_candidates(
                filter_slice_id = self.slice_id)

            node_id = None
            reserve_lock.acquire()
            try:
                candidates -= reserved
                if len(candidates) == 1:
                    node_id = iter(candidates).next()
                    reserved.add(node_id)
                elif not candidates:
                    # Try again including unassigned nodes
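                    # Release the lock around the slow find_candidates
                    # call (it queries the PLC API), so other workers can
                    # make progress and we avoid deadlocks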
                    reserve_lock.release()
                    try:
                        candidates = node.find_candidates()
                    finally:
                        reserve_lock.acquire()
                    candidates -= reserved
                    if len(candidates) > 1:
                        return
                    if len(candidates) == 1:
                        node_id = iter(candidates).next()
                        to_provision.add(node_id)
                        reserved.add(node_id)
                    elif not candidates:
                        raise RuntimeError, "Cannot assign resources for node %s, no candidates with %s" % (guid,
                            node.make_filter_description())
            finally:
                reserve_lock.release()

            if node_id is not None:
                node.assign_node_id(node_id)

        runner = ParallelRun(maxthreads=4) # don't overload the PLC API, just 4 threads to hide latencies and that's it
        runner.start()
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                runner.put(assignifunique, guid, node)
        runner.sync()

        # Now do the backtracking search for a suitable solution
        # First with existing slice nodes
        reqs = []
        nodes = []
        nodes_lock = threading.Lock()
        def genreqs(node, filter_slice_id=None):
            # Gather the candidate set for this node
            candidates = node.find_candidates(
                filter_slice_id = filter_slice_id)
            candidates -= reserved
            # Append as a unit, so the reqs/nodes correspondence
            # survives concurrent appends from the worker threads
            nodes_lock.acquire()
            try:
                reqs.append(candidates)
                nodes.append(node)
            finally:
                nodes_lock.release()
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                runner.put(genreqs, node, self.slice_id)
        runner.sync()

        if nodes and reqs:
            if recover:
                raise RuntimeError, "Impossible to recover: unassigned host for Nodes %r" % (nodes,)

            def pickbest(fullset, nreq, node=nodes[0]):
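                # Trim oversized candidate sets to the nreq best-rated
                # nodes, keeping the allocator's search space tractable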
                if len(fullset) > nreq:
                    fullset = zip(node.rate_nodes(fullset), fullset)
                    fullset.sort(reverse=True)
                    del fullset[nreq:]
                    return set(map(operator.itemgetter(1), fullset))
                else:
                    return fullset

            try:
                solution = resourcealloc.alloc(reqs, sample=pickbest)
            except resourcealloc.ResourceAllocationError:
                # Failed, try again with all nodes.
                # Rebind reqs/nodes so genreqs repopulates both lists in
                # matching order, instead of appending to nodes twice.
                retrynodes = nodes
                reqs = []
                nodes = []
                for node in retrynodes:
                    runner.put(genreqs, node)
                runner.sync()
                solution = resourcealloc.alloc(reqs, sample=pickbest)
                to_provision.update(solution)

            # Do assign nodes
            for node, node_id in zip(nodes, solution):
                runner.put(node.assign_node_id, node_id)
            runner.join()

    def do_provisioning(self):
        if self._to_provision:
            # Add new nodes to the slice
            cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
            new_nodes = list(set(cur_nodes) | self._to_provision)
            self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)

        # Remember which nodes were just added, so do_wait_nodes
        # can grant them extra time to come up
        self._just_provisioned = self._to_provision
        del self._to_provision

    def do_wait_nodes(self):
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node):
                # Just inject configuration stuff
                node.home_path = "nepi-node-%s" % (guid,)
                node.ident_path = self.sliceSSHKey
                node.slicename = self.slicename

                # Show the magic
                self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)

        try:
            runner = ParallelRun(maxthreads=64, maxqueue=1)
            runner.start() # start workers explicitly, as at the other ParallelRun call site
            abort = []
            def waitforit(guid, node):
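                # Just-provisioned nodes may still be booting, so allow
                # them up to 20 minutes; nodes already in the slice only
                # get 60 seconds to respond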
                try:
                    node.wait_provisioning(
                        (20*60 if node._node_id in self._just_provisioned else 60)
                    )

                    self._logger.info("READY Node %s at %s", guid, node.hostname)

                    # Prepare dependency installer now
                    node.prepare_dependencies()
                except:
                    abort.append(None)
                    raise

            for guid, node in self._elements.iteritems():
                if abort:
                    break
                if isinstance(node, self._node.Node):
                    self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)
                    runner.put(waitforit, guid, node)
            runner.join()

        except self._node.UnresponsiveNodeError:
            self._logger.warn("UNRESPONSIVE Nodes")

            # Mark all dead nodes (which are unresponsive) on the blacklist
            # and re-raise
            for guid, node in self._elements.iteritems():
                if isinstance(node, self._node.Node):
                    if not node.is_alive():
                        self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname)
                        self._blacklist.add(node._node_id)
                        node.unassign_node()

            try:
                self._save_blacklist()
            except:
                # not critical - failing to persist the blacklist
                # shouldn't mask the original error
                import traceback
                traceback.print_exc()

            raise

    def do_spanning_deployment_plan(self):
        # Create application groups by collecting all applications
        # based on their hash - the hash should contain everything that
        # defines them and the platform they're built for

        def dephash(app):
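            # Group key: dependencies, sources, build/install commands
            # and the target platform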
            return (
                frozenset((app.depends or "").split(' ')),
                frozenset((app.sources or "").split(' ')),
                app.build,
                app.install,
                app.node.architecture,
                app.node.operatingSystem,
                app.node.pl_distro,
                app.__class__,
            )

        depgroups = collections.defaultdict(list)

        for element in self._elements.itervalues():
            if isinstance(element, self._app.Dependency):
                depgroups[dephash(element)].append(element)
            elif isinstance(element, self._node.Node):
                deps = element._yum_dependencies
                if deps:
                    depgroups[dephash(deps)].append(deps)

        # Set up spanning deployment for those applications that
        # have been deployed in several nodes.
        for dh, group in depgroups.iteritems():
            if len(group) > 1:
                # Pick root (deterministically)
                root = min(group, key=lambda app:app.node.hostname)

                # Obtain all IPs in numeric format
                # (which means faster distance computations)
                for dep in group:
                    dep._ip = socket.gethostbyname(dep.node.hostname)
                    dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]

                # Compute plan
                # NOTE: the plan is an iterator
                plan = mst.mst(
                    group,
                    lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
                    root = root,
                    maxbranching = 2)
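                # maxbranching=2 keeps the distribution tree binary, so
                # no master has to serve more than two slaves at once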

                # Re-cipher the user's private key with a temporary passphrase
                try:
                    tempprk, temppuk, tmppass = self._make_temp_private_key()
                except TempKeyError:
                    continue

                # Set up slaves
                plan = list(plan)
                for slave, master in plan:
                    slave.set_master(master)
                    slave.install_keys(tempprk, temppuk, tmppass)

        # We don't need the user's passphrase anymore
        self.sliceSSHKeyPass = None

    def _make_temp_private_key(self):
        # Get the user's key's passphrase
        if not self.sliceSSHKeyPass:
            if 'SSH_ASKPASS' in os.environ:
                proc = subprocess.Popen(
                    [ os.environ['SSH_ASKPASS'],
                      "Please type the passphrase for the %s SSH identity file. "
                      "The passphrase will be used to re-cipher the identity file with "
                      "a random 256-bit key for automated chain deployment on the "
                      "%s PlanetLab slice" % (
                        os.path.basename(self.sliceSSHKey),
                        self.slicename
                    ) ],
                    stdin = open("/dev/null"),
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE)
                out, err = proc.communicate()
                self.sliceSSHKeyPass = out.strip()

        if not self.sliceSSHKeyPass:
            raise TempKeyError

        # Create temporary key files
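        # (NamedTemporaryFile unlinks the file on close, so the caller
        # must keep the returned objects alive while the keys are in use)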
        prk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = "")

        puk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = ".pub")

        # Create secure 256-bit temporary passphrase
        passphrase = os.urandom(32).encode("hex")

        # Copy keys
        oprk = open(self.sliceSSHKey, "rb")
        opuk = open(self.sliceSSHKey+".pub", "rb")
        shutil.copymode(oprk.name, prk.name)
        shutil.copymode(opuk.name, puk.name)
        shutil.copyfileobj(oprk, prk)
        shutil.copyfileobj(opuk, puk)
        prk.flush()
        puk.flush()
        oprk.close()
        opuk.close()

        # A descriptive comment
        comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)

        # Recipher keys
        proc = subprocess.Popen(
            ["ssh-keygen", "-p",
             "-f", prk.name,
             "-P", self.sliceSSHKeyPass,
             "-N", passphrase,
             "-C", comment ],
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE,
            stdin = subprocess.PIPE
        )
        out, err = proc.communicate()

        if err:
            raise RuntimeError, "Problem generating keys: \n%s\n%r" % (
                out, err)

        prk.seek(0)
        puk.seek(0)

        # Change comment on public key
        puklines = puk.readlines()
        puklines[0] = puklines[0].split(' ')
        puklines[0][-1] = comment+'\n'
        puklines[0] = ' '.join(puklines[0])
        puk.seek(0)
        puk.truncate()
        puk.writelines(puklines)
        del puklines
        puk.flush()

        return prk, puk, passphrase

    def set(self, guid, name, value, time = TIME_NOW):
        super(TestbedController, self).set(guid, name, value, time)
        # TODO: take into account schedule time for the task
        element = self._elements[guid]
        if element:
            try:
                setattr(element, name, value)
            except:
                # We ignore these errors while recovering.
                # Some attributes are immutable, and setting
                # them is necessary (to recover the state), but
                # some are not (they throw an exception).
                if not self.recovering:
                    raise

            if hasattr(element, 'refresh'):
                # invoke attribute refresh hook
                element.refresh()

    def get(self, guid, name, time = TIME_NOW):
        value = super(TestbedController, self).get(guid, name, time)
        # TODO: take into account schedule time for the task
        factory_id = self._create[guid]
        factory = self._factories[factory_id]
        element = self._elements.get(guid)
        try:
            return getattr(element, name)
        except (KeyError, AttributeError):
            return value

    def get_address(self, guid, index, attribute='Address'):
        index = int(index)

        # try the real stuff
        iface = self._elements.get(guid)
        if iface and index == 0:
            if attribute == 'Address':
                return iface.address
            elif attribute == 'NetPrefix':
                return iface.netprefix
            elif attribute == 'Broadcast':
                return iface.broadcast

        # if all else fails, query box
        return super(TestbedController, self).get_address(guid, index, attribute)

    def action(self, time, guid, action):
        raise NotImplementedError

    def shutdown(self):
        for trace in self._traces.itervalues():
            trace.close()

        def invokeif(action, testbed, guid):
            element = self._elements[guid]
            if hasattr(element, action):
                getattr(element, action)()

        self._do_in_factory_order(
            functools.partial(invokeif, 'cleanup'),
            metadata.shutdown_order)

        self._do_in_factory_order(
            functools.partial(invokeif, 'destroy'),
            metadata.shutdown_order)

        self._elements.clear()
        self._traces.clear()

    def trace(self, guid, trace_id, attribute='value'):
        elem = self._elements[guid]

        if attribute == 'value':
            path = elem.sync_trace(self.home_directory, trace_id)
            if path:
                fd = open(path, "r")
                content = fd.read()
                fd.close()
            else:
                content = None
        elif attribute == 'path':
            content = elem.remote_trace_path(trace_id)
        elif attribute == 'name':
            content = elem.remote_trace_name(trace_id)
        else:
            content = None
        return content

    def follow_trace(self, trace_id, trace):
        self._traces[trace_id] = trace

    def recover(self):
        try:
            # An internal flag, so we know to behave differently in
            # a few corner cases.
            self.recovering = True

            # Create and connect do not perform any real tasks against
            # the nodes; they only set up the object hierarchy,
            # so we can run them normally
            self.do_create()
            self.do_connect_init()
            self.do_connect_compl()

            # Manually recover nodes, to mark dependencies installed
            # and clean up mutable attributes
            self._do_in_factory_order(
                lambda self, guid : self._elements[guid].recover(),
                [
                    metadata.NODE,
                ])

            # Assign nodes - since we're working off execute XML, nodes
            # have specific hostnames assigned and we don't need to do
            # real assignment, only find out node ids and check liveness
            self.do_resource_discovery(recover = True)
            self.do_wait_nodes()

            # Pre/post configure, however, tends to set up tunnels
            # Execute configuration steps only for those object
            # kinds that do not have side effects

            # Do the ones without side effects,
            # including nodes that need to set up home
            # folders and all that
            self._do_in_factory_order(
                "preconfigure_function",
                [
                    metadata.INTERNET,
                    Parallel(metadata.NODE),
                    metadata.NODEIFACE,
                ])

            # Tunnels require a home path that is configured
            # at this step. Since we cannot run the step itself,
            # we need to inject this homepath ourselves
            for guid, element in self._elements.iteritems():
                if isinstance(element, self._interfaces.TunIface):
                    element._home_path = "tun-%s" % (guid,)

            # Manually recover tunnels, applications and
            # netpipes, negating the side effects
            self._do_in_factory_order(
                lambda self, guid : self._elements[guid].recover(),
                [
                    Parallel(metadata.TAPIFACE),
                    Parallel(metadata.TUNIFACE),
                    metadata.NETPIPE,
                    Parallel(metadata.NEPIDEPENDENCY),
                    Parallel(metadata.NS3DEPENDENCY),
                    Parallel(metadata.DEPENDENCY),
                    Parallel(metadata.APPLICATION),
                ])

            # Tunnels are not harmed by configuration after
            # recovery, and some attributes get set this way
            # like external_iface
            self._do_in_factory_order(
                "preconfigure_function",
                [
                    Parallel(metadata.TAPIFACE),
                    Parallel(metadata.TUNIFACE),
                ])

            # Post-do the ones without side effects
            self._do_in_factory_order(
                "configure_function",
                [
                    metadata.INTERNET,
                    Parallel(metadata.NODE),
                    metadata.NODEIFACE,
                    Parallel(metadata.TAPIFACE),
                    Parallel(metadata.TUNIFACE),
                ])

            # There are no required prestart steps
            # to call upon recovery, so we're done
        finally:
            self.recovering = False

    def _make_generic(self, parameters, kind):
        app = kind(self.plapi)
        app.testbed = weakref.ref(self)
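        # A weak reference avoids a reference cycle between the testbed
        # and the elements it owns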

        # Note: there is a 1-to-1 correspondence between parameter names
        # and element attribute names.
        # If that changes, this has to change as well
        for attr, val in parameters.iteritems():
            try:
                setattr(app, attr, val)
            except:
                # We ignore these errors while recovering.
                # Some attributes are immutable, and setting
                # them is necessary (to recover the state), but
                # some are not (they throw an exception).
                if not self.recovering:
                    raise

        return app

    def _make_node(self, parameters):
        node = self._make_generic(parameters, self._node.Node)
        node.enable_cleanup = self.dedicatedSlice
        return node

    def _make_node_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.NodeIface)

    def _make_tun_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TunIface)

    def _make_tap_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TapIface)

    def _make_netpipe(self, parameters):
        return self._make_generic(parameters, self._interfaces.NetPipe)

    def _make_internet(self, parameters):
        return self._make_generic(parameters, self._interfaces.Internet)

    def _make_application(self, parameters):
        return self._make_generic(parameters, self._app.Application)

    def _make_dependency(self, parameters):
        return self._make_generic(parameters, self._app.Dependency)

    def _make_nepi_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NepiDependency)

    def _make_ns3_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NS3Dependency)

    def _make_tun_filter(self, parameters):
        return self._make_generic(parameters, self._interfaces.TunFilter)

    def _make_class_queue_filter(self, parameters):
        return self._make_generic(parameters, self._interfaces.ClassQueueFilter)

    def _make_tos_queue_filter(self, parameters):
        return self._make_generic(parameters, self._interfaces.ToSQueueFilter)

    def _make_multicast_forwarder(self, parameters):
        return self._make_generic(parameters, self._multicast.MulticastForwarder)

    def _make_multicast_announcer(self, parameters):
        return self._make_generic(parameters, self._multicast.MulticastAnnouncer)

    def _make_multicast_router(self, parameters):
        return self._make_generic(parameters, self._multicast.MulticastRouter)