# -*- coding: utf-8 -*-
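"""
Execution backend for the PlanetLab testbed.

Implements TestbedController, which drives a NEPI experiment on a
PlanetLab slice: node discovery and provisioning through the PLC API,
dependency deployment over SSH, trace collection and state recovery.
"""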

from constants import TESTBED_ID, TESTBED_VERSION
from nepi.core import testbed_impl
from nepi.core.metadata import Parallel
from nepi.util.constants import TIME_NOW
from nepi.util.graphtools import mst
from nepi.util import ipaddr2
from nepi.util import environ
from nepi.util.parallel import ParallelRun
import threading
import sys
import os
import os.path
import time
import resourcealloc
import collections
import operator
import functools
import socket
import struct
import tempfile
import subprocess
import random
import shutil
import logging
import metadata
import weakref
import util as plutil

class TempKeyError(Exception):
    pass

class TestbedController(testbed_impl.TestbedController):
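    """Controller for experiments running on a PlanetLab slice.

    Configuration is read from testbed attributes in do_setup()
    (slice, authUser, authPass, sliceSSHKey, plcHost, plcUrl, ...).
    Nodes are then discovered, provisioned and waited on before the
    regular configuration steps run.
    """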
    def __init__(self):
        super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
        self._home_directory = None
        self.slicename = None
        self._traces = dict()

        import node, interfaces, application, multicast
        self._node = node
        self._interfaces = interfaces
        self._app = application
        self._multicast = multicast

        self._blacklist = set()
        self._just_provisioned = set()

        self._load_blacklist()

        self._logger = logging.getLogger('nepi.testbeds.planetlab')

        self.recovering = False

    @property
    def home_directory(self):
        return self._home_directory

    @property
    def plapi(self):
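        """PLC API client, created lazily on first access.

        Uses the configured credentials when available; otherwise falls
        back to anonymous access, which may not be enough for most
        operations.
        """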
        if not hasattr(self, '_plapi'):
            import plcapi

            if self.authUser:
                self._plapi = plcapi.PLCAPI(
                    username = self.authUser,
                    password = self.authString,
                    hostname = self.plcHost,
                    urlpattern = self.plcUrl
                    )
            else:
                # anonymous access - may not be enough for much
                self._plapi = plcapi.PLCAPI()
        return self._plapi

    @property
    def slice_id(self):
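        """Numeric slice_id looked up from the slice name.

        The value is cached once found; lookup failures are not cached,
        so the query is retried on the next access.
        """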
        if not hasattr(self, '_slice_id'):
            slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
            if slices:
                self._slice_id = slices[0]['slice_id']
            else:
                # If it wasn't found, don't remember this failure, keep trying
                return None
        return self._slice_id

    @property
    def vsys_vnet(self):
        if not hasattr(self, '_vsys_vnet'):
            self._vsys_vnet = plutil.getVnet(
                self.plapi,
                self.slicename)
        return self._vsys_vnet

    def _load_blacklist(self):
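        """Load the node blacklist (one node_id per line) from the
        'plblacklist' file in the NEPI home path; a missing or
        unreadable file simply yields an empty blacklist.
        """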
        blpath = environ.homepath('plblacklist')

        try:
            bl = open(blpath, "r")
        except:
            self._blacklist = set()
            return

        try:
            self._blacklist = set(
                map(int,
                    map(str.strip, bl.readlines())
                )
            )
        finally:
            bl.close()

    def _save_blacklist(self):
        blpath = environ.homepath('plblacklist')
        bl = open(blpath, "w")
        try:
            bl.writelines(
                map('%s\n'.__mod__, self._blacklist))
        finally:
            bl.close()

    def do_setup(self):
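        """Read the testbed attributes, validate the credentials and
        key file, set the log level and delegate to the generic setup.
        """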
        self._home_directory = self._attributes.\
            get_attribute_value("homeDirectory")
        self.slicename = self._attributes.\
            get_attribute_value("slice")
        self.authUser = self._attributes.\
            get_attribute_value("authUser")
        self.authString = self._attributes.\
            get_attribute_value("authPass")
        self.sliceSSHKey = self._attributes.\
            get_attribute_value("sliceSSHKey")
        self.sliceSSHKeyPass = None
        self.plcHost = self._attributes.\
            get_attribute_value("plcHost")
        self.plcUrl = self._attributes.\
            get_attribute_value("plcUrl")
        self.logLevel = self._attributes.\
            get_attribute_value("plLogLevel")
        self.tapPortBase = self._attributes.\
            get_attribute_value("tapPortBase")
        self.p2pDeployment = self._attributes.\
            get_attribute_value("p2pDeployment")
        self.dedicatedSlice = self._attributes.\
            get_attribute_value("dedicatedSlice")

        if not self.slicename:
            raise RuntimeError, "Slice not set"
        if not self.authUser:
            raise RuntimeError, "PlanetLab account username not set"
        if not self.authString:
            raise RuntimeError, "PlanetLab account passphrase not set"
        if not self.sliceSSHKey:
            raise RuntimeError, "PlanetLab account key not specified"
        if not os.path.exists(self.sliceSSHKey):
            raise RuntimeError, "PlanetLab account key cannot be opened: %s" % (self.sliceSSHKey,)

        self._logger.setLevel(getattr(logging, self.logLevel))

        super(TestbedController, self).do_setup()

    def do_post_asynclaunch(self, guid):
        # Dependencies were launched asynchronously,
        # so wait for them
        dep = self._elements[guid]
        if isinstance(dep, self._app.Dependency):
            dep.async_setup_wait()

    # Two-phase configuration for asynchronous launch
    do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
    do_poststep_configure = staticmethod(do_post_asynclaunch)

    def do_preconfigure(self):
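        """Discover and provision nodes, retrying the whole cycle while
        provisioning reports unresponsive nodes, then optionally plan
        peer-to-peer deployment before the standard preconfiguration.
        """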
        while True:
            # Perform resource discovery if we don't have
            # specific resources assigned yet
            self.do_resource_discovery()

            # Create PlanetLab slivers
            self.do_provisioning()

            try:
                # Wait for provisioning
                self.do_wait_nodes()

                # All nodes are up, we're done
                break
            except self._node.UnresponsiveNodeError:
                # Some nodes were unresponsive, retry the whole cycle
                pass

        if self.p2pDeployment:
            # Plan application deployment
            self.do_spanning_deployment_plan()

        # Configure elements per XML data
        super(TestbedController, self).do_preconfigure()

    def do_resource_discovery(self, recover = False):
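        """Map abstract Node elements to concrete PlanetLab nodes.

        First assigns nodes whose candidate set (within the slice, then
        globally) has exactly one element; the remaining nodes are
        solved with a backtracking search over their candidate sets
        (resourcealloc.alloc). Newly selected nodes that are not yet in
        the slice are queued in self._to_provision.
        """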
        to_provision = self._to_provision = set()

        reserved = set(self._blacklist)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is not None:
                reserved.add(node._node_id)

        # Initial pass:
        #   look for perfectly defined nodes
        #   (ie: those with only one candidate)
        reserve_lock = threading.RLock()
        def assignifunique(guid, node):
            # Try existing nodes first
            # If we have only one candidate, simply use it
            candidates = node.find_candidates(
                filter_slice_id = self.slice_id)

            node_id = None
            reserve_lock.acquire()
            try:
                candidates -= reserved
                if len(candidates) == 1:
                    node_id = iter(candidates).next()
                    reserved.add(node_id)
                elif not candidates:
                    # Try again including unassigned nodes
                    reserve_lock.release()
                    try:
                        candidates = node.find_candidates()
                    finally:
                        reserve_lock.acquire()
                    candidates -= reserved
                    if len(candidates) > 1:
                        return
                    if len(candidates) == 1:
                        node_id = iter(candidates).next()
                        to_provision.add(node_id)
                        reserved.add(node_id)
                    elif not candidates:
                        raise RuntimeError, "Cannot assign resources for node %s, no candidates with %s" % (guid,
                            node.make_filter_description())
            finally:
                reserve_lock.release()

            if node_id is not None:
                node.assign_node_id(node_id)

        runner = ParallelRun(maxthreads=4) # don't overload the PLC API: 4 threads are enough to hide latencies
        runner.start()
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                runner.put(assignifunique, guid, node)
        runner.sync()

        # Now do the backtracking search for a suitable solution
        # First with existing slice nodes
        reqs = []
        nodes = []
        def genreqs(node, filter_slice_id=None):
            # Try existing nodes first
            # If we have only one candidate, simply use it
            candidates = node.find_candidates(
                filter_slice_id = filter_slice_id)
            candidates -= reserved
            reqs.append(candidates)
            nodes.append(node)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                runner.put(genreqs, node, self.slice_id)
        runner.sync()

        if nodes and reqs:
            if recover:
                raise RuntimeError, "Impossible to recover: unassigned host for Nodes %r" % (nodes,)

            def pickbest(fullset, nreq, node=nodes[0]):
                if len(fullset) > nreq:
                    fullset = zip(node.rate_nodes(fullset), fullset)
                    fullset.sort(reverse=True)
                    del fullset[nreq:]
                    return set(map(operator.itemgetter(1), fullset))
                else:
                    return fullset

            try:
                solution = resourcealloc.alloc(reqs, sample=pickbest)
            except resourcealloc.ResourceAllocationError:
                # Failed, try again with all nodes.
                # Rebuild both lists so reqs and nodes stay aligned
                # (genreqs appends to the lists it closes over).
                retry_nodes = list(nodes)
                reqs = []
                nodes = []
                for node in retry_nodes:
                    runner.put(genreqs, node)
                runner.sync()
                solution = resourcealloc.alloc(reqs, sample=pickbest)
                to_provision.update(solution)

            # Do assign nodes
            for node, node_id in zip(nodes, solution):
                runner.put(node.assign_node_id, node_id)
            runner.join()

    def do_provisioning(self):
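        """Add any newly selected nodes to the slice via the PLC API and
        remember them so do_wait_nodes() can give them a longer timeout.
        """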
        if self._to_provision:
            # Add new nodes to the slice
            cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
            new_nodes = list(set(cur_nodes) | self._to_provision)
            self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)

        # cleanup
        self._just_provisioned = self._to_provision
        del self._to_provision

    def do_wait_nodes(self):
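        """Inject per-node configuration (home path, SSH identity, slice
        name) and wait, in parallel, until every node is reachable.
        Unresponsive nodes are blacklisted and unassigned, and the
        UnresponsiveNodeError is re-raised so the caller can retry.
        """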
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node):
                # Inject per-node configuration
                node.home_path = "nepi-node-%s" % (guid,)
                node.ident_path = self.sliceSSHKey
                node.slicename = self.slicename

                # Report the assignment
                self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)

        try:
            runner = ParallelRun(maxthreads=64, maxqueue=1)
            abort = []
            def waitforit(guid, node):
                try:
                    node.wait_provisioning(
                        (20*60 if node._node_id in self._just_provisioned else 60)
                    )

                    self._logger.info("READY Node %s at %s", guid, node.hostname)

                    # Prepare dependency installer now
                    node.prepare_dependencies()
                except:
                    abort.append(None)
                    raise

            for guid, node in self._elements.iteritems():
                if abort:
                    break
                if isinstance(node, self._node.Node):
                    self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)
                    runner.put(waitforit, guid, node)
            runner.join()

        except self._node.UnresponsiveNodeError:
            self._logger.warn("UNRESPONSIVE Nodes")

            # Add all unresponsive nodes to the blacklist
            # and re-raise
            for guid, node in self._elements.iteritems():
                if isinstance(node, self._node.Node):
                    if not node.is_alive():
                        self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname)
                        self._blacklist.add(node._node_id)
                        node.unassign_node()

            try:
                self._save_blacklist()
            except:
                # not critical, just log it
                import traceback
                traceback.print_exc()

            raise

    def do_spanning_deployment_plan(self):
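        """Group equivalent dependencies (same packages, sources, build
        and install steps, architecture and distro) and, for groups that
        span several nodes, build a minimum-spanning-tree deployment
        plan so builds are propagated node-to-node instead of being
        repeated on every node.
        """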
        # Create application groups by collecting all applications
        # based on their hash - the hash should contain everything that
        # defines them and the platform they're built for

        def dephash(app):
            return (
                frozenset((app.depends or "").split(' ')),
                frozenset((app.sources or "").split(' ')),
                app.build,
                app.install,
                app.node.architecture,
                app.node.operatingSystem,
                app.node.pl_distro,
                app.__class__,
            )

        depgroups = collections.defaultdict(list)

        for element in self._elements.itervalues():
            if isinstance(element, self._app.Dependency):
                depgroups[dephash(element)].append(element)
            elif isinstance(element, self._node.Node):
                deps = element._yum_dependencies
                if deps:
                    depgroups[dephash(deps)].append(deps)

        # Set up spanning deployment for those applications that
        # have been deployed on several nodes.
        for dh, group in depgroups.iteritems():
            if len(group) > 1:
                # Pick root (deterministically)
                root = min(group, key=lambda app:app.node.hostname)

                # Obtain all IPs in numeric format
                # (which means faster distance computations)
                for dep in group:
                    dep._ip = socket.gethostbyname(dep.node.hostname)
                    dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]

                # Compute plan
                # NOTE: the plan is an iterator
                plan = mst.mst(
                    group,
                    lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
                    root = root,
                    maxbranching = 2)

                # Re-cipher the private key
                try:
                    tempprk, temppuk, tmppass = self._make_temp_private_key()
                except TempKeyError:
                    continue

                # Set up slaves
                plan = list(plan)
                for slave, master in plan:
                    slave.set_master(master)
                    slave.install_keys(tempprk, temppuk, tmppass)

        # We don't need the user's passphrase anymore
        self.sliceSSHKeyPass = None

    def _make_temp_private_key(self):
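        """Create a temporary copy of the slice SSH key pair, re-ciphered
        with a random one-time passphrase (via ssh-keygen -p), so it can
        be pushed to master nodes for chained deployment. Returns
        (private_key_file, public_key_file, passphrase) or raises
        TempKeyError if the user's passphrase cannot be obtained.
        """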
        # Get the user's key's passphrase
        if not self.sliceSSHKeyPass:
            if 'SSH_ASKPASS' in os.environ:
                proc = subprocess.Popen(
                    [ os.environ['SSH_ASKPASS'],
                      "Please type the passphrase for the %s SSH identity file. "
                      "The passphrase will be used to re-cipher the identity file with "
                      "a random 256-bit key for automated chain deployment on the "
                      "%s PlanetLab slice" % (
                        os.path.basename(self.sliceSSHKey),
                        self.slicename
                    ) ],
                    stdin = open("/dev/null"),
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE)
                out, err = proc.communicate()
                self.sliceSSHKeyPass = out.strip()

        if not self.sliceSSHKeyPass:
            raise TempKeyError

        # Create temporary key files
        prk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = "")

        puk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = ".pub")

        # Create a secure 256-bit temporary passphrase
        passphrase = os.urandom(32).encode("hex")

        # Copy keys
        oprk = open(self.sliceSSHKey, "rb")
        opuk = open(self.sliceSSHKey+".pub", "rb")
        shutil.copymode(oprk.name, prk.name)
        shutil.copymode(opuk.name, puk.name)
        shutil.copyfileobj(oprk, prk)
        shutil.copyfileobj(opuk, puk)
        prk.flush()
        puk.flush()
        oprk.close()
        opuk.close()

        # A descriptive comment
        comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)

        # Re-cipher the keys
        proc = subprocess.Popen(
            ["ssh-keygen", "-p",
             "-f", prk.name,
             "-P", self.sliceSSHKeyPass,
             "-N", passphrase,
             "-C", comment ],
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE,
            stdin = subprocess.PIPE
        )
        out, err = proc.communicate()

        if err:
            raise RuntimeError, "Problem generating keys: \n%s\n%r" % (
                out, err)

        prk.seek(0)
        puk.seek(0)

        # Change the comment on the public key
        puklines = puk.readlines()
        puklines[0] = puklines[0].split(' ')
        puklines[0][-1] = comment+'\n'
        puklines[0] = ' '.join(puklines[0])
        puk.seek(0)
        puk.truncate()
        puk.writelines(puklines)
        del puklines
        puk.flush()

        return prk, puk, passphrase

    def set(self, guid, name, value, time = TIME_NOW):
        super(TestbedController, self).set(guid, name, value, time)
        # TODO: take into account the schedule time for the task
        element = self._elements[guid]
        if element:
            try:
                setattr(element, name, value)
            except:
                # We ignore these errors while recovering.
                # Some attributes are immutable, and setting
                # them is necessary (to recover the state), but
                # some are not (they throw an exception).
                if not self.recovering:
                    raise

            if hasattr(element, 'refresh'):
                # invoke attribute refresh hook
                element.refresh()

    def get(self, guid, name, time = TIME_NOW):
        value = super(TestbedController, self).get(guid, name, time)
        # TODO: take into account the schedule time for the task
        factory_id = self._create[guid]
        factory = self._factories[factory_id]
        element = self._elements.get(guid)
        try:
            return getattr(element, name)
        except (KeyError, AttributeError):
            return value

    def get_address(self, guid, index, attribute='Address'):
        index = int(index)

        # try the instantiated element first
        iface = self._elements.get(guid)
        if iface and index == 0:
            if attribute == 'Address':
                return iface.address
            elif attribute == 'NetPrefix':
                return iface.netprefix
            elif attribute == 'Broadcast':
                return iface.broadcast

        # if all else fails, fall back to the generic implementation
        return super(TestbedController, self).get_address(guid, index, attribute)

    def action(self, time, guid, action):
        raise NotImplementedError

    def shutdown(self):
        for trace in self._traces.itervalues():
            trace.close()

        def invokeif(action, testbed, guid):
            element = self._elements[guid]
            if hasattr(element, action):
                getattr(element, action)()

        self._do_in_factory_order(
            functools.partial(invokeif, 'cleanup'),
            metadata.shutdown_order)

        self._do_in_factory_order(
            functools.partial(invokeif, 'destroy'),
            metadata.shutdown_order)

        self._elements.clear()
        self._traces.clear()

    def trace(self, guid, trace_id, attribute='value'):
        elem = self._elements[guid]

        if attribute == 'value':
            path = elem.sync_trace(self.home_directory, trace_id)
            if path:
                fd = open(path, "r")
                content = fd.read()
                fd.close()
            else:
                content = None
        elif attribute == 'path':
            content = elem.remote_trace_path(trace_id)
        elif attribute == 'name':
            content = elem.remote_trace_name(trace_id)
        else:
            content = None
        return content

    def follow_trace(self, trace_id, trace):
        self._traces[trace_id] = trace

    def recover(self):
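        """Rebuild controller state from a previously deployed experiment
        without re-executing steps that have side effects on the nodes:
        objects are re-created and re-connected, node assignment is
        re-discovered, and only side-effect-free configuration steps are
        re-run.
        """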
        try:
            # An internal flag, so we know to behave differently in
            # a few corner cases.
            self.recovering = True

            # Create and connect do not perform any real tasks against
            # the nodes, they only set up the object hierarchy,
            # so we can run them normally
            self.do_create()
            self.do_connect_init()
            self.do_connect_compl()

            # Manually recover nodes, to mark dependencies installed
            # and clean up mutable attributes
            self._do_in_factory_order(
                lambda self, guid : self._elements[guid].recover(),
                [
                    metadata.NODE,
                ])

            # Assign nodes - since we're working off the execute XML, nodes
            # have specific hostnames assigned and we don't need to do
            # real assignment, only find out node ids and check liveness
            self.do_resource_discovery(recover = True)
            self.do_wait_nodes()

            # Pre/post configure, however, tends to set up tunnels
            # Execute configuration steps only for those object
            # kinds that do not have side effects

            # Do the ones without side effects,
            # including nodes that need to set up home
            # folders and all that
            self._do_in_factory_order(
                "preconfigure_function",
                [
                    metadata.INTERNET,
                    Parallel(metadata.NODE),
                    metadata.NODEIFACE,
                ])

            # Tunnels require a home path that is configured
            # at this step. Since we cannot run the step itself,
            # we need to inject this homepath ourselves
            for guid, element in self._elements.iteritems():
                if isinstance(element, self._interfaces.TunIface):
                    element._home_path = "tun-%s" % (guid,)

            # Manually recover tunnels, applications and
            # netpipes, negating the side effects
            self._do_in_factory_order(
                lambda self, guid : self._elements[guid].recover(),
                [
                    Parallel(metadata.TAPIFACE),
                    Parallel(metadata.TUNIFACE),
                    metadata.NETPIPE,
                    Parallel(metadata.NEPIDEPENDENCY),
                    Parallel(metadata.NS3DEPENDENCY),
                    Parallel(metadata.DEPENDENCY),
                    Parallel(metadata.APPLICATION),
                ])

            # Tunnels are not harmed by configuration after
            # recovery, and some attributes get set this way
            # (like external_iface)
            self._do_in_factory_order(
                "preconfigure_function",
                [
                    Parallel(metadata.TAPIFACE),
                    Parallel(metadata.TUNIFACE),
                ])

            # Post-do the ones without side effects
            self._do_in_factory_order(
                "configure_function",
                [
                    metadata.INTERNET,
                    Parallel(metadata.NODE),
                    metadata.NODEIFACE,
                    Parallel(metadata.TAPIFACE),
                    Parallel(metadata.TUNIFACE),
                ])

            # There are no required prestart steps
            # to call upon recovery, so we're done
        finally:
            self.recovering = False

    def _make_generic(self, parameters, kind):
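        """Instantiate `kind` with the PLC API client, attach a weak
        reference to this testbed, and copy the creation parameters onto
        same-named attributes of the new object.
        """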
        app = kind(self.plapi)
        app.testbed = weakref.ref(self)

        # Note: there is a 1-to-1 correspondence between parameter names
        #   and element attribute names.
        #   If that changes, this has to change as well.
        for attr, val in parameters.iteritems():
            try:
                setattr(app, attr, val)
            except:
                # We ignore these errors while recovering.
                # Some attributes are immutable, and setting
                # them is necessary (to recover the state), but
                # some are not (they throw an exception).
                if not self.recovering:
                    raise

        return app

    def _make_node(self, parameters):
        node = self._make_generic(parameters, self._node.Node)
        node.enable_cleanup = self.dedicatedSlice
        return node

    def _make_node_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.NodeIface)

    def _make_tun_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TunIface)

    def _make_tap_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TapIface)

    def _make_netpipe(self, parameters):
        return self._make_generic(parameters, self._interfaces.NetPipe)

    def _make_internet(self, parameters):
        return self._make_generic(parameters, self._interfaces.Internet)

    def _make_application(self, parameters):
        return self._make_generic(parameters, self._app.Application)

    def _make_dependency(self, parameters):
        return self._make_generic(parameters, self._app.Dependency)

    def _make_nepi_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NepiDependency)

    def _make_ns3_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NS3Dependency)

    def _make_tun_filter(self, parameters):
        return self._make_generic(parameters, self._interfaces.TunFilter)

    def _make_class_queue_filter(self, parameters):
        return self._make_generic(parameters, self._interfaces.ClassQueueFilter)

    def _make_tos_queue_filter(self, parameters):
        return self._make_generic(parameters, self._interfaces.ToSQueueFilter)

    def _make_multicast_forwarder(self, parameters):
        return self._make_generic(parameters, self._multicast.MulticastForwarder)

    def _make_multicast_announcer(self, parameters):
        return self._make_generic(parameters, self._multicast.MulticastAnnouncer)

    def _make_multicast_router(self, parameters):
        return self._make_generic(parameters, self._multicast.MulticastRouter)