# Tasks
self._tasks = dict()
+ # RM groups
+ self._groups = dict()
+
+ # generator of globally unique id for groups
+ self._group_id_generator = guid.GuidGenerator()
+
# Event processing thread
self._cond = threading.Condition()
self._thread = threading.Thread(target = self._process)
rm1.register_connection(guid2)
rm2.register_connection(guid1)
- def register_condition(self, group1, action, group2, state,
+ def register_condition(self, guids1, action, guids2, state,
time = None):
- """ Registers an action START or STOP for all RM on group1 to occur
- time 'time' after all elements in group2 reached state 'state'.
+ """ Registers an action START or STOP for all RM on guids1 to occur
+ time 'time' after all elements in guids2 reached state 'state'.
- :param group1: List of guids of RMs subjected to action
- :type group1: list
+ :param guids1: List of guids of RMs subjected to action
+ :type guids1: list
:param action: Action to register (either START or STOP)
:type action: ResourceAction
- :param group2: List of guids of RMs to we waited for
- :type group2: list
+ :param guids2: List of guids of RMs to we waited for
+ :type guids2: list
:param state: State to wait for on RMs (STARTED, STOPPED, etc)
:type state: ResourceState
- :param time: Time to wait after group2 has reached status
+ :param time: Time to wait after guids2 has reached status
:type time: string
"""
- if isinstance(group1, int):
- group1 = [group1]
- if isinstance(group2, int):
- group2 = [group2]
+ if isinstance(guids1, int):
+ guids1 = [guids1]
+ if isinstance(guids2, int):
+ guids2 = [guids2]
- for guid1 in group1:
+ for guid1 in guids1:
rm = self.get_resource(guid1)
- rm.register_condition(action, group2, state, time)
+ rm.register_condition(action, guids2, state, time)
def enable_trace(self, guid, name):
""" Enable trace
rm = self.get_resource(guid)
return rm.start()
- def set_with_conditions(self, name, value, group1, group2, state,
+ def set_with_conditions(self, name, value, guids1, guids2, state,
time = None):
""" Set value 'value' on attribute with name 'name' on all RMs of
- group1 when 'time' has elapsed since all elements in group2
+ guids1 when 'time' has elapsed since all elements in guids2
have reached state 'state'.
:param name: Name of attribute to set in RM
:param value: Value of attribute to set in RM
:type name: string
- :param group1: List of guids of RMs subjected to action
- :type group1: list
+ :param guids1: List of guids of RMs subjected to action
+ :type guids1: list
:param action: Action to register (either START or STOP)
:type action: ResourceAction
- :param group2: List of guids of RMs to we waited for
- :type group2: list
+ :param guids2: List of guids of RMs to we waited for
+ :type guids2: list
:param state: State to wait for on RMs (STARTED, STOPPED, etc)
:type state: ResourceState
- :param time: Time to wait after group2 has reached status
+ :param time: Time to wait after guids2 has reached status
:type time: string
"""
- if isinstance(group1, int):
- group1 = [group1]
- if isinstance(group2, int):
- group2 = [group2]
+ if isinstance(guids1, int):
+ guids1 = [guids1]
+ if isinstance(guids2, int):
+ guids2 = [guids2]
- for guid1 in group1:
+ for guid1 in guids1:
rm = self.get_resource(guid)
- rm.set_with_conditions(name, value, group2, state, time)
+ rm.set_with_conditions(name, value, guids2, state, time)
def stop_with_conditions(self, guid):
""" Stop a specific RM defined by its 'guid' only if all the conditions are true
rm = self.get_resource(guid)
return rm.start_with_conditions()
- def deploy(self, group = None, wait_all_ready = True):
- """ Deploy all resource manager in group
+ def deploy(self, guids = None, wait_all_ready = True, group = None):
+ """ Deploy all resource manager in guids list
- :param group: List of guids of RMs to deploy
- :type group: list
+ :param guids: List of guids of RMs to deploy
+ :type guids: list
:param wait_all_ready: Wait until all RMs are ready in
order to start the RMs
:type guid: int
+ :param group: Id of deployment group in which to deploy RMs
+ :type group: int
+
"""
self.logger.debug(" ------- DEPLOY START ------ ")
- if not group:
- # By default, if not deployment group is indicated,
- # all RMs that are undeployed will be deployed
- group = []
+ if not guids:
+ # If no guids list was indicated, all 'NEW' RMs will be deployed
+ guids = []
for guid in self.resources:
if self.state(guid) == ResourceState.NEW:
- group.append(guid)
+ guids.append(guid)
- if isinstance(group, int):
- group = [group]
+ if isinstance(guids, int):
+ guids = [guids]
+
+ # Create deployment group
+ if not group:
+ group = self._group_id_generator.next(guid)
+
+ if group not in self._groups:
+ self._groups[group] = []
- # Before starting deployment we disorder the group list with the
+ self._groups[group].extend(guids)
+
+ # Before starting deployment we disorder the guids list with the
# purpose of speeding up the whole deployment process.
- # It is likely that the user inserted in the 'group' list closely
+ # It is likely that the user inserted in the 'guids' list closely
# resources one after another (e.g. all applications
# connected to the same node can likely appear one after another).
# This can originate a slow down in the deployment since the N
# be taken up by the same family of resources waiting for the
# same conditions (e.g. LinuxApplications running on a same
# node share a single lock, so they will tend to be serialized).
- # If we disorder the group list, this problem can be mitigated.
- random.shuffle(group)
+ # If we disorder the guids list, this problem can be mitigated.
+ random.shuffle(guids)
def wait_all_and_start(group):
reschedule = False
- for guid in group:
+
+ # Get all guids in group
+ guids = self._groups[group]
+
+ for guid in guids:
if self.state(guid) < ResourceState.READY:
reschedule = True
break
self.schedule("1s", callback)
else:
# If all resources are read, we schedule the start
- for guid in group:
+ for guid in guids:
rm = self.get_resource(guid)
self.schedule("0s", rm.start_with_conditions)
if wait_all_ready:
- # Schedule the function that will check all resources are
- # READY, and only then it will schedule the start.
- # This is aimed to reduce the number of tasks looping in the scheduler.
- # Intead of having N start tasks, we will have only one
+ # Schedule a function to check that all resources are
+ # READY, and only then schedule the start.
+ # This aimes at reducing the number of tasks looping in the
+ # scheduler.
+ # Intead of having N start tasks, we will have only one for
+ # the whole group.
callback = functools.partial(wait_all_and_start, group)
self.schedule("1s", callback)
- for guid in group:
+ for guid in guids:
rm = self.get_resource(guid)
+ rm.deployment_group = group
self.schedule("0s", rm.deploy)
if not wait_all_ready:
# schedule a stop. Otherwise the RM will stop immediately
self.schedule("2s", rm.stop_with_conditions)
- def release(self, group = None):
- """ Release the elements of the list 'group' or
- all the resources if any group is specified
+ def release(self, guids = None):
+ """ Release al RMs on the guids list or
+ all the resources if no list is specified
- :param group: List of RM
- :type group: list
+ :param guids: List of RM guids
+ :type guids: list
"""
- if not group:
- group = self.resources
+ if not guids:
+ guids = self.resources
- for guid in group:
+ for guid in guids:
rm = self.get_resource(guid)
self.schedule("0s", rm.release)
- self.wait_released(group)
+ self.wait_released(guids)
def shutdown(self):
""" Shutdown the Experiment Controller.
self._state = ResourceState.NEW
+ self.deployment_group = None
+
self._start_time = None
self._stop_time = None
self._discover_time = None
result = self.ec.trace(rm.guid, trace_name)
fpath = os.path.join(self.store_path, "%d.%s" % (rm.guid,
rename))
- f = open(fpath, "w")
- f.write(result)
- f.close()
+ try:
+ f = open(fpath, "w")
+ f.write(result)
+ f.close()
+ except:
+ msg = "Couldn't retrieve trace %s for %d at %s " % (trace_name,
+ rm.guid, fpath)
+ self.error(msg)
+ continue
super(Collector, self).release()
if self.state == ResourceState.STARTED:
- stopped = True
-
self.info("Stopping command '%s'" % command)
# If the command is running in foreground (it was launched using
# the node 'execute' method), then we use the handler to the Popen
# process to kill it. Else we send a kill signal using the pid and ppid
# retrieved after running the command with the node 'run' method
+ stopped = True
if self._proc:
self._proc.kill()
msg = " Failed to STOP command '%s' " % self.get("command")
self.error(msg, out, err)
self.fail()
- stopped = False
- if stopped:
- super(LinuxApplication, self).stop()
+ if self.state == ResourceState.STARTED:
+ super(LinuxApplication, self).stop()
def release(self):
self.info("Releasing resource")
self.stop()
if self.state == ResourceState.STOPPED:
+ self.info("Resource released")
+
super(LinuxApplication, self).release()
@property
command = self.get("command")
env = self.get("env")
- if command:
- self.info("Uploading command '%s'" % command)
-
- # We want to make sure the content is published
- # before the experiment starts.
- # Run the command as a bash script in the background,
- # in the host ( but wait until the command has
- # finished to continue )
- env = self.replace_paths(env)
- command = self.replace_paths(command)
-
- (out, err), proc = self.execute_command(command, env,
- blocking = True)
-
- if proc.poll():
- self.fail()
- msg = "Failed to execute command"
- self.error(msg, out, err)
- raise RuntimeError, msg
+ self.info("Uploading command '%s'" % command)
+
+ # We want to make sure the content is published
+ # before the experiment starts.
+ # Run the command as a bash script in the background,
+ # in the host ( but wait until the command has
+ # finished to continue )
+ env = self.replace_paths(env)
+ command = self.replace_paths(command)
+
+ (out, err), proc = self.execute_command(command, env,
+ blocking = True)
+
+ if proc.poll():
+ self.fail()
+ msg = "Failed to execute command"
+ self.error(msg, out, err)
+ raise RuntimeError, msg
def start(self):
if self._state == ResourceState.READY:
"Sets the CCNS_SYNC_SCOPE environmental variable. ",
flags = Flags.ExecReadOnly)
+ repo_file = Attribute("repoFile1",
+ "The Repository uses $CCNR_DIRECTORY/repoFile1 for "
+ "persistent storage of CCN Content Objects",
+ flags = Flags.ExecReadOnly)
+
cls._register_attribute(max_fanout)
cls._register_attribute(max_leaf_entries)
cls._register_attribute(max_node_bytes)
cls._register_attribute(ccns_root_advise_lifetime)
cls._register_attribute(ccns_stable_enabled)
cls._register_attribute(ccns_sync_scope)
+ cls._register_attribute(repo_file)
@classmethod
def _register_traces(cls):
command = self.get("command")
env = self.get("env")
- if command:
- # We want to make sure the repository is running
- # before the experiment starts.
- # Run the command as a bash script in background,
- # in the host ( but wait until the command has
- # finished to continue )
- env = self.replace_paths(env)
- command = self.replace_paths(command)
-
- shfile = os.path.join(self.app_home, "start.sh")
- self.node.run_and_wait(command, self.run_home,
- shfile = shfile,
- overwrite = False,
- env = env,
- raise_on_error = True)
-
+ if self.get("repoFile1"):
+ # upload repoFile1
+ local_file = self.get("repoFile1")
+ remote_file = "${RUN_HOME}/repoFile1"
+ remote_file = self.replace_paths(remote_file)
+ self.node.upload(local_file,
+ remote_file,
+ overwrite = False)
+
+ # We want to make sure the repository is running
+ # before the experiment starts.
+ # Run the command as a bash script in background,
+ # in the host ( but wait until the command has
+ # finished to continue )
+ env = self.replace_paths(env)
+ command = self.replace_paths(command)
+
+ shfile = os.path.join(self.app_home, "start.sh")
+ self.node.run_and_wait(command, self.run_home,
+ shfile = shfile,
+ overwrite = False,
+ env = env,
+ raise_on_error = True)
+
def start(self):
if self._state == ResourceState.READY:
command = self.get("command")
command = self.get("command")
env = self.get("env")
- if command:
- # We want to make sure the FIB entries are created
- # before the experiment starts.
- # Run the command as a bash script in the background,
- # in the host ( but wait until the command has
- # finished to continue )
- env = env and self.replace_paths(env)
- command = self.replace_paths(command)
+ # We want to make sure the FIB entries are created
+ # before the experiment starts.
+ # Run the command as a bash script in the background,
+ # in the host ( but wait until the command has
+ # finished to continue )
+ env = env and self.replace_paths(env)
+ command = self.replace_paths(command)
- (out, err), proc = self.execute_command(command, env)
+ (out, err), proc = self.execute_command(command, env)
- if proc.poll():
- self._state = ResourceState.FAILED
- msg = "Failed to execute command"
- self.error(msg, out, err)
- raise RuntimeError, msg
+ if proc.poll():
+ self._state = ResourceState.FAILED
+ msg = "Failed to execute command"
+ self.error(msg, out, err)
+ raise RuntimeError, msg
def configure(self):
if self.trace_enabled("ping"):
self.ec.set(self._ping, "printTimestamp", True)
self.ec.set(self._ping, "target", self.get("host"))
self.ec.register_connection(self._ping, self.node.guid)
- # force waiting until ping is READY before we starting the FIB
- self.ec.register_condition(self.guid, ResourceAction.START,
- self._ping, ResourceState.READY)
# schedule ping deploy
- self.ec.deploy(group=[self._ping])
+ self.ec.deploy(guids=[self._ping], group = self.deployment_group)
if self.trace_enabled("mtr"):
self.info("Configuring MTR trace")
self.ec.set(self._mtr, "continuous", True)
self.ec.set(self._mtr, "target", self.get("host"))
self.ec.register_connection(self._mtr, self.node.guid)
- # force waiting until mtr is READY before we starting the FIB
- self.ec.register_condition(self.guid, ResourceAction.START,
- self._mtr, ResourceState.READY)
# schedule mtr deploy
- self.ec.deploy(group=[self._mtr])
+ self.ec.deploy(guids=[self._mtr], group = self.deployment_group)
if self.trace_enabled("traceroute"):
self.info("Configuring TRACEROUTE trace")
self.ec.set(self._traceroute, "continuous", True)
self.ec.set(self._traceroute, "target", self.get("host"))
self.ec.register_connection(self._traceroute, self.node.guid)
- # force waiting until mtr is READY before we starting the FIB
- self.ec.register_condition(self.guid, ResourceAction.START,
- self._traceroute, ResourceState.READY)
# schedule mtr deploy
- self.ec.deploy(group=[self._traceroute])
+ self.ec.deploy(guids=[self._traceroute], group = self.deployment_group)
def start(self):
if self._state in [ResourceState.READY, ResourceState.STARTED]:
args.append("traceroute")
args.append(self.get("target"))
if self.get("continuous") == True:
- args.append("; sleep 5 ; done ")
+ args.append("; sleep 2 ; done ")
command = " ".join(args)