#!/usr/bin/env python
# -*- coding: utf-8 -*-
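"""Execution backend for the PlanetLab testbed.

Implements the PlanetLab TestbedController: slice-node discovery and
provisioning through the PLC API, parallel waiting for slivers, optional
peer-to-peer (spanning-tree) deployment of application dependencies, and
recovery of already-deployed experiments.
"""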

from constants import TESTBED_ID, TESTBED_VERSION
from nepi.core import testbed_impl
from nepi.core.metadata import Parallel
from nepi.util.constants import TIME_NOW
from nepi.util.graphtools import mst
from nepi.util import ipaddr2
from nepi.util import environ
from nepi.util.parallel import ParallelRun
import threading
import sys
import os
import os.path
import time
import resourcealloc
import collections
import operator
import functools
import socket
import struct
import tempfile
import subprocess
import random
import shutil
import logging
import metadata
import weakref
import util as plutil

class TempKeyError(Exception):
    pass

class TestbedController(testbed_impl.TestbedController):
    def __init__(self):
        super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
        self._home_directory = None
        self.slicename = None
        self._traces = dict()

        import node, interfaces, application, multicast
        self._node = node
        self._interfaces = interfaces
        self._app = application
        self._multicast = multicast

        self._blacklist = set()
        self._just_provisioned = set()

        self._load_blacklist()

        self._logger = logging.getLogger('nepi.testbeds.planetlab')

        self.recovering = False

    @property
    def home_directory(self):
        return self._home_directory

    @property
    def plapi(self):
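        """PLCAPI client for this controller.

        Created lazily on first access: authenticated with the configured
        credentials when authUser is set, anonymous otherwise.
        """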
        if not hasattr(self, '_plapi'):
            import plcapi

            if self.authUser:
                self._plapi = plcapi.PLCAPI(
                    username = self.authUser,
                    password = self.authString,
                    hostname = self.plcHost,
                    urlpattern = self.plcUrl
                    )
            else:
                # anonymous access - may not be enough for much
                self._plapi = plcapi.PLCAPI()
        return self._plapi

    @property
    def slice_id(self):
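        """Numeric PLC identifier of the configured slice.

        Looked up through the PLC API and cached on success; a failed
        lookup is not cached, so it is retried on later access.
        """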
        if not hasattr(self, '_slice_id'):
            slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
            if slices:
                self._slice_id = slices[0]['slice_id']
            else:
                # If it wasn't found, don't remember this failure, keep trying
                return None
        return self._slice_id

    @property
    def vsys_vnet(self):
        if not hasattr(self, '_vsys_vnet'):
            self._vsys_vnet = plutil.getVnet(
                self.plapi,
                self.slicename)
        return self._vsys_vnet

    def _load_blacklist(self):
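        """Load the node blacklist (one PLC node_id per line) from the
        'plblacklist' file in the NEPI home path; start with an empty
        blacklist if the file cannot be read."""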
        blpath = environ.homepath('plblacklist')

        try:
            bl = open(blpath, "r")
        except:
            self._blacklist = set()
            return

        try:
            self._blacklist = set(
                map(int,
                    map(str.strip, bl.readlines())
                )
            )
        finally:
            bl.close()

    def _save_blacklist(self):
        blpath = environ.homepath('plblacklist')
        bl = open(blpath, "w")
        try:
            bl.writelines(
                map('%s\n'.__mod__, self._blacklist))
        finally:
            bl.close()

    def do_setup(self):
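        """Read the testbed attributes (slice, credentials, SSH key, PLC
        host, deployment options), validate the mandatory ones, set the
        log level and run the generic setup."""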
        self._home_directory = self._attributes.\
            get_attribute_value("homeDirectory")
        self.slicename = self._attributes.\
            get_attribute_value("slice")
        self.authUser = self._attributes.\
            get_attribute_value("authUser")
        self.authString = self._attributes.\
            get_attribute_value("authPass")
        self.sliceSSHKey = self._attributes.\
            get_attribute_value("sliceSSHKey")
        self.sliceSSHKeyPass = None
        self.plcHost = self._attributes.\
            get_attribute_value("plcHost")
        self.plcUrl = self._attributes.\
            get_attribute_value("plcUrl")
        self.logLevel = self._attributes.\
            get_attribute_value("plLogLevel")
        self.tapPortBase = self._attributes.\
            get_attribute_value("tapPortBase")
        self.p2pDeployment = self._attributes.\
            get_attribute_value("p2pDeployment")
        self.dedicatedSlice = self._attributes.\
            get_attribute_value("dedicatedSlice")

        if not self.slicename:
            raise RuntimeError("Slice not set")
        if not self.authUser:
            raise RuntimeError("PlanetLab account username not set")
        if not self.authString:
            raise RuntimeError("PlanetLab account passphrase not set")
        if not self.sliceSSHKey:
            raise RuntimeError("PlanetLab account key not specified")
        if not os.path.exists(self.sliceSSHKey):
            raise RuntimeError("PlanetLab account key cannot be opened: %s" % (self.sliceSSHKey,))

        self._logger.setLevel(getattr(logging, self.logLevel))

        super(TestbedController, self).do_setup()

    def do_post_asynclaunch(self, guid):
        # Dependencies were launched asynchronously,
        # so wait for them
        dep = self._elements[guid]
        if isinstance(dep, self._app.Dependency):
            dep.async_setup_wait()

    # Two-phase configuration for asynchronous launch
    do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
    do_poststep_configure = staticmethod(do_post_asynclaunch)

    def do_preconfigure(self):
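        """Discover and provision nodes, retrying until every node becomes
        responsive, then (optionally) plan peer-to-peer dependency
        deployment and run the generic per-element preconfiguration."""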
        while True:
            # Perform resource discovery if we don't have
            # specific resources assigned yet
            self.do_resource_discovery()

            # Create PlanetLab slivers
            self.do_provisioning()

            try:
                # Wait for provisioning
                self.do_wait_nodes()

                # All nodes are up, we're done
                break
            except self._node.UnresponsiveNodeError:
                # Some nodes were unresponsive and got blacklisted,
                # retry with new candidates
                pass

        if self.p2pDeployment:
            # Plan application deployment
            self.do_spanning_deployment_plan()

        # Configure elements per XML data
        super(TestbedController, self).do_preconfigure()

    def do_resource_discovery(self, recover = False):
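        """Assign PLC node ids to every Node element that lacks one.

        First, in parallel, assign nodes whose candidate set (restricted to
        the slice, or failing that, unrestricted) has exactly one member;
        then solve the remaining assignments with the resourcealloc
        backtracking allocator, preferring nodes already in the slice.
        With recover=True, any still-unassigned node is an error.
        """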
        to_provision = self._to_provision = set()

        reserved = set(self._blacklist)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is not None:
                reserved.add(node._node_id)

        # Initial algorithm:
        #   look for perfectly defined nodes
        #   (ie: those with only one candidate)
        reserve_lock = threading.RLock()
        def assignifunique(guid, node):
            # Try existing slice nodes first
            # If we have only one candidate, simply use it
            candidates = node.find_candidates(
                filter_slice_id = self.slice_id)

            node_id = None
            reserve_lock.acquire()
            try:
                candidates -= reserved
                if len(candidates) == 1:
                    node_id = iter(candidates).next()
                    reserved.add(node_id)
                elif not candidates:
                    # Try again including unassigned nodes
                    reserve_lock.release()
                    try:
                        candidates = node.find_candidates()
                    finally:
                        reserve_lock.acquire()
                    candidates -= reserved
                    if len(candidates) > 1:
                        return
                    if len(candidates) == 1:
                        node_id = iter(candidates).next()
                        to_provision.add(node_id)
                        reserved.add(node_id)
                    elif not candidates:
                        raise RuntimeError("Cannot assign resources for node %s, no candidates with %s" % (guid,
                            node.make_filter_description()))
            finally:
                reserve_lock.release()

            if node_id is not None:
                node.assign_node_id(node_id)

        runner = ParallelRun(maxthreads=4) # don't overload the PLC API; 4 threads are enough to hide latencies
        runner.start()
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                runner.put(assignifunique, guid, node)
        runner.sync()

        # Now do the backtracking search for a suitable solution
        # First with existing slice nodes
        reqs = []
        nodes = []
        def genreqs(node, filter_slice_id=None):
            # Collect each still-unassigned node's candidate set,
            # to be fed to the backtracking allocator
            candidates = node.find_candidates(
                filter_slice_id = filter_slice_id)
            candidates -= reserved
            reqs.append(candidates)
            nodes.append(node)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                runner.put(genreqs, node, self.slice_id)
        runner.sync()

        if nodes and reqs:
            if recover:
                raise RuntimeError("Impossible to recover: unassigned host for Nodes %r" % (nodes,))

            def pickbest(fullset, nreq, node=nodes[0]):
                if len(fullset) > nreq:
                    fullset = zip(node.rate_nodes(fullset), fullset)
                    fullset.sort(reverse=True)
                    del fullset[nreq:]
                    return set(map(operator.itemgetter(1), fullset))
                else:
                    return fullset

            try:
                solution = resourcealloc.alloc(reqs, sample=pickbest)
            except resourcealloc.ResourceAllocationError:
                # Failed; try again considering all nodes, not just slice nodes.
                # Rebuild both lists so reqs and nodes stay aligned.
                oldnodes = nodes
                reqs = []
                nodes = []
                for node in oldnodes:
                    runner.put(genreqs, node)
                runner.sync()
                solution = resourcealloc.alloc(reqs, sample=pickbest)
                to_provision.update(solution)

            # Do assign nodes
            for node, node_id in zip(nodes, solution):
                runner.put(node.assign_node_id, node_id)
            runner.join()

    def do_provisioning(self):
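        """Add any newly selected nodes to the slice through the PLC API
        and remember them as just provisioned (they are granted a longer
        setup timeout later)."""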
        if self._to_provision:
            # Add new nodes to the slice
            cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
            new_nodes = list(set(cur_nodes) | self._to_provision)
            self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)

        # cleanup
        self._just_provisioned = self._to_provision
        del self._to_provision

    def do_wait_nodes(self):
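        """Push per-node configuration (home path, SSH identity, slice
        name) and wait, in parallel, for every sliver to become reachable.
        Unresponsive nodes are blacklisted and unassigned, and the error
        is re-raised so the caller can retry discovery."""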
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node):
                # Inject the deployment configuration
                node.home_path = "nepi-node-%s" % (guid,)
                node.ident_path = self.sliceSSHKey
                node.slicename = self.slicename

                # Report the assignment
                self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)

        try:
            runner = ParallelRun(maxthreads=64, maxqueue=1)
            abort = []
            def waitforit(guid, node):
                try:
                    node.wait_provisioning(
                        (20*60 if node._node_id in self._just_provisioned else 60)
                    )

                    self._logger.info("READY Node %s at %s", guid, node.hostname)

                    # Prepare dependency installer now
                    node.prepare_dependencies()
                except:
                    abort.append(None)
                    raise

            for guid, node in self._elements.iteritems():
                if abort:
                    break
                if isinstance(node, self._node.Node):
                    self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)
                    runner.put(waitforit, guid, node)
            runner.join()

        except self._node.UnresponsiveNodeError:
            self._logger.warn("UNRESPONSIVE Nodes")

            # Mark all dead (unresponsive) nodes on the blacklist
            # and re-raise
            for guid, node in self._elements.iteritems():
                if isinstance(node, self._node.Node):
                    if not node.is_alive():
                        self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname)
                        self._blacklist.add(node._node_id)
                        node.unassign_node()

            try:
                self._save_blacklist()
            except:
                # not critical - log and continue
                import traceback
                traceback.print_exc()

            raise

    def do_spanning_deployment_plan(self):
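        """Group dependencies that build identically (same depends,
        sources, build/install commands and node platform), pick a
        deterministic root per group, and compute a minimum spanning tree
        over IP distance so builds can be propagated peer-to-peer instead
        of being uploaded to every node.  The slice key is re-ciphered
        with a temporary passphrase so slaves can fetch from their
        masters."""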
        # Create application groups by collecting all applications
        # based on their hash - the hash should contain everything that
        # defines them and the platform they're built for

        def dephash(app):
            return (
                frozenset((app.depends or "").split(' ')),
                frozenset((app.sources or "").split(' ')),
                app.build,
                app.install,
                app.node.architecture,
                app.node.operatingSystem,
                app.node.pl_distro,
                app.__class__,
            )

        depgroups = collections.defaultdict(list)

        for element in self._elements.itervalues():
            if isinstance(element, self._app.Dependency):
                depgroups[dephash(element)].append(element)
            elif isinstance(element, self._node.Node):
                deps = element._yum_dependencies
                if deps:
                    depgroups[dephash(deps)].append(deps)

        # Set up spanning deployment for those applications that
        # are deployed on several nodes.
        for dh, group in depgroups.iteritems():
            if len(group) > 1:
                # Pick root (deterministically)
                root = min(group, key=lambda app:app.node.hostname)

                # Obtain all IPs in numeric format
                # (which means faster distance computations)
                for dep in group:
                    dep._ip = socket.gethostbyname(dep.node.hostname)
                    dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]

                # Compute plan
                # NOTE: the plan is an iterator
                plan = mst.mst(
                    group,
                    lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
                    root = root,
                    maxbranching = 2)

                # Re-cipher the private key with a temporary passphrase
                try:
                    tempprk, temppuk, tmppass = self._make_temp_private_key()
                except TempKeyError:
                    continue

                # Set up slaves
                plan = list(plan)
                for slave, master in plan:
                    slave.set_master(master)
                    slave.install_keys(tempprk, temppuk, tmppass)

        # We don't need the user's passphrase anymore
        self.sliceSSHKeyPass = None

    def _make_temp_private_key(self):
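        """Create a temporary copy of the slice SSH key pair, re-ciphered
        with a random 256-bit passphrase (the user's passphrase is obtained
        through SSH_ASKPASS if needed).  Returns (private_file, public_file,
        passphrase); raises TempKeyError if no passphrase is available."""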
        # Get the user's key's passphrase
        if not self.sliceSSHKeyPass:
            if 'SSH_ASKPASS' in os.environ:
                proc = subprocess.Popen(
                    [ os.environ['SSH_ASKPASS'],
                      "Please type the passphrase for the %s SSH identity file. "
                      "The passphrase will be used to re-cipher the identity file with "
                      "a random 256-bit key for automated chain deployment on the "
                      "%s PlanetLab slice" % (
                        os.path.basename(self.sliceSSHKey),
                        self.slicename
                    ) ],
                    stdin = open("/dev/null"),
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE)
                out,err = proc.communicate()
                self.sliceSSHKeyPass = out.strip()

        if not self.sliceSSHKeyPass:
            raise TempKeyError

        # Create temporary key files
        prk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = "")

        puk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = ".pub")

        # Create a secure 256-bit temporary passphrase
        passphrase = os.urandom(32).encode("hex")

        # Copy keys
        oprk = open(self.sliceSSHKey, "rb")
        opuk = open(self.sliceSSHKey+".pub", "rb")
        shutil.copymode(oprk.name, prk.name)
        shutil.copymode(opuk.name, puk.name)
        shutil.copyfileobj(oprk, prk)
        shutil.copyfileobj(opuk, puk)
        prk.flush()
        puk.flush()
        oprk.close()
        opuk.close()

        # A descriptive comment
        comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)

        # Re-cipher the keys
        proc = subprocess.Popen(
            ["ssh-keygen", "-p",
             "-f", prk.name,
             "-P", self.sliceSSHKeyPass,
             "-N", passphrase,
             "-C", comment ],
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE,
            stdin = subprocess.PIPE
        )
        out, err = proc.communicate()

        if err:
            raise RuntimeError("Problem generating keys: \n%s\n%r" % (
                out, err))

        prk.seek(0)
        puk.seek(0)

        # Change the comment on the public key
        puklines = puk.readlines()
        puklines[0] = puklines[0].split(' ')
        puklines[0][-1] = comment+'\n'
        puklines[0] = ' '.join(puklines[0])
        puk.seek(0)
        puk.truncate()
        puk.writelines(puklines)
        del puklines
        puk.flush()

        return prk, puk, passphrase

    def set(self, guid, name, value, time = TIME_NOW):
        super(TestbedController, self).set(guid, name, value, time)
        # TODO: take into account the scheduled time for the task
        element = self._elements[guid]
        if element:
            try:
                setattr(element, name, value)
            except:
                # We ignore these errors while recovering.
                # Some attributes are immutable, and setting
                # them is necessary (to recover the state), but
                # some are not (they throw an exception).
                if not self.recovering:
                    raise

            if hasattr(element, 'refresh'):
                # invoke attribute refresh hook
                element.refresh()

    def get(self, guid, name, time = TIME_NOW):
        value = super(TestbedController, self).get(guid, name, time)
        # TODO: take into account the scheduled time for the task
        factory_id = self._create[guid]
        factory = self._factories[factory_id]
        element = self._elements.get(guid)
        try:
            return getattr(element, name)
        except (KeyError, AttributeError):
            return value

    def get_address(self, guid, index, attribute='Address'):
        index = int(index)

        # try the real stuff
        iface = self._elements.get(guid)
        if iface and index == 0:
            if attribute == 'Address':
                return iface.address
            elif attribute == 'NetPrefix':
                return iface.netprefix
            elif attribute == 'Broadcast':
                return iface.broadcast

        # if all else fails, query box
        return super(TestbedController, self).get_address(guid, index, attribute)

    def action(self, time, guid, action):
        raise NotImplementedError

    def shutdown(self):
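        """Close followed traces, then call the elements' cleanup and
        destroy hooks in shutdown order and drop all references."""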
        for trace in self._traces.itervalues():
            trace.close()

        def invokeif(action, testbed, guid):
            element = self._elements[guid]
            if hasattr(element, action):
                getattr(element, action)()

        self._do_in_factory_order(
            functools.partial(invokeif, 'cleanup'),
            metadata.shutdown_order)

        self._do_in_factory_order(
            functools.partial(invokeif, 'destroy'),
            metadata.shutdown_order)

        self._elements.clear()
        self._traces.clear()

    def trace(self, guid, trace_id, attribute='value'):
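        """Return trace data for an element: its synced contents for
        'value', or the remote path/name for 'path'/'name'; None
        otherwise."""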
        elem = self._elements[guid]

        if attribute == 'value':
            path = elem.sync_trace(self.home_directory, trace_id)
            if path:
                fd = open(path, "r")
                content = fd.read()
                fd.close()
            else:
                content = None
        elif attribute == 'path':
            content = elem.remote_trace_path(trace_id)
        elif attribute == 'name':
            content = elem.remote_trace_name(trace_id)
        else:
            content = None
        return content

    def follow_trace(self, trace_id, trace):
        self._traces[trace_id] = trace

    def recover(self):
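        """Rebuild controller state for an already-deployed experiment.

        Re-creates the object hierarchy, recovers nodes, re-discovers the
        assigned hosts, and re-runs only those configuration steps that
        have no side effects on the slivers, injecting by hand whatever
        the skipped steps would have produced (e.g. tunnel home paths).
        """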
        try:
            # An internal flag, so we know to behave differently in
            # a few corner cases.
            self.recovering = True

            # Create and connect do not perform any real tasks against
            # the nodes, they only set up the object hierarchy,
            # so we can run them normally
            self.do_create()
            self.do_connect_init()
            self.do_connect_compl()

            # Manually recover nodes, to mark dependencies installed
            # and clean up mutable attributes
            self._do_in_factory_order(
                lambda self, guid : self._elements[guid].recover(),
                [
                    metadata.NODE,
                ])

            # Assign nodes - since we're working off execute XML, nodes
            # have specific hostnames assigned and we don't need to do
            # real assignment, only find out node ids and check liveness
            self.do_resource_discovery(recover = True)
            self.do_wait_nodes()

            # Pre/post configure, however, tends to set up tunnels
            # Execute configuration steps only for those object
            # kinds that do not have side effects

            # Do the ones without side effects,
            # including nodes that need to set up home
            # folders and all that
            self._do_in_factory_order(
                "preconfigure_function",
                [
                    metadata.INTERNET,
                    Parallel(metadata.NODE),
                    metadata.NODEIFACE,
                ])

            # Tunnels require a home path that is configured
            # at this step. Since we cannot run the step itself,
            # we need to inject this homepath ourselves
            for guid, element in self._elements.iteritems():
                if isinstance(element, self._interfaces.TunIface):
                    element._home_path = "tun-%s" % (guid,)

            # Manually recover tunnels, applications and
            # netpipes, negating the side effects
            self._do_in_factory_order(
                lambda self, guid : self._elements[guid].recover(),
                [
                    Parallel(metadata.TAPIFACE),
                    Parallel(metadata.TUNIFACE),
                    metadata.NETPIPE,
                    Parallel(metadata.NEPIDEPENDENCY),
                    Parallel(metadata.NS3DEPENDENCY),
                    Parallel(metadata.DEPENDENCY),
                    Parallel(metadata.APPLICATION),
                ])

            # Tunnels are not harmed by configuration after
            # recovery, and some attributes get set this way,
            # like external_iface
            self._do_in_factory_order(
                "preconfigure_function",
                [
                    Parallel(metadata.TAPIFACE),
                    Parallel(metadata.TUNIFACE),
                ])

            # Post-do the ones without side effects
            self._do_in_factory_order(
                "configure_function",
                [
                    metadata.INTERNET,
                    Parallel(metadata.NODE),
                    metadata.NODEIFACE,
                    Parallel(metadata.TAPIFACE),
                    Parallel(metadata.TUNIFACE),
                ])

            # There are no required prestart steps
            # to call upon recovery, so we're done
        finally:
            self.recovering = False

    def _make_generic(self, parameters, kind):
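        """Instantiate a backing object of the given kind, give it a weak
        reference to this testbed, and copy the factory parameters onto
        it attribute by attribute."""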
        app = kind(self.plapi)
        app.testbed = weakref.ref(self)

        # Note: there is a 1-to-1 correspondence between factory parameter
        #   names and the backing object's attribute names.
        #   If that changes, this has to change as well.
        for attr,val in parameters.iteritems():
            try:
                setattr(app, attr, val)
            except:
                # We ignore these errors while recovering.
                # Some attributes are immutable, and setting
                # them is necessary (to recover the state), but
                # some are not (they throw an exception).
                if not self.recovering:
                    raise

        return app

    def _make_node(self, parameters):
        node = self._make_generic(parameters, self._node.Node)
        node.enable_cleanup = self.dedicatedSlice
        return node

    def _make_node_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.NodeIface)

    def _make_tun_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TunIface)

    def _make_tap_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TapIface)

    def _make_netpipe(self, parameters):
        return self._make_generic(parameters, self._interfaces.NetPipe)

    def _make_internet(self, parameters):
        return self._make_generic(parameters, self._interfaces.Internet)

    def _make_application(self, parameters):
        return self._make_generic(parameters, self._app.Application)

    def _make_dependency(self, parameters):
        return self._make_generic(parameters, self._app.Dependency)

    def _make_nepi_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NepiDependency)

    def _make_ns3_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NS3Dependency)

    def _make_tun_filter(self, parameters):
        return self._make_generic(parameters, self._interfaces.TunFilter)

    def _make_class_queue_filter(self, parameters):
        return self._make_generic(parameters, self._interfaces.ClassQueueFilter)

    def _make_tos_queue_filter(self, parameters):
        return self._make_generic(parameters, self._interfaces.ToSQueueFilter)

    def _make_multicast_forwarder(self, parameters):
        return self._make_generic(parameters, self._multicast.MulticastForwarder)

    def _make_multicast_announcer(self, parameters):
        return self._make_generic(parameters, self._multicast.MulticastAnnouncer)

    def _make_multicast_router(self, parameters):
        return self._make_generic(parameters, self._multicast.MulticastRouter)