pycram.plan

Classes#

Plan

Represents a plan structure, typically a tree, which can be changed at any point in time. Performing the plan will traverse the plan structure in depth-first order and perform each PlanNode.

PlanNode

DesignatorNode

ActionNode

A node in the plan representing an ActionDesignator description

ResolvedActionNode

A node representing a resolved ActionDesignator with fully specified parameters

MotionNode

A node in the plan representing a MotionDesignator

Functions#

managed_node(→ typing_extensions.Callable)

Decorator which manages the state of a node, including the start and end time, status and reason of failure as well as the setting of the current node in the plan.

with_plan(→ typing_extensions.Callable)

Decorator which wraps the decorated designator into a node, creates a new plan with the node as root and returns the plan.

Module Contents#

class pycram.plan.Plan(root: PlanNode)#

Bases: networkx.DiGraph

Represents a plan structure, typically a tree, which can be changed at any point in time. Performing the plan will traverse the plan structure in depth-first order and perform each PlanNode.
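
As a rough illustration of the depth-first order described above, here is a minimal sketch. It does not use pycram: SketchNode and depth_first are hypothetical stand-ins, and the sketch assumes that a node is visited before its children, which the description does not spell out.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class SketchNode:
    """Hypothetical stand-in for a plan node; not the pycram PlanNode."""
    name: str
    children: List["SketchNode"] = field(default_factory=list)


def depth_first(node: SketchNode) -> List[str]:
    """Return node names in depth-first order, parent before children (assumed)."""
    order = [node.name]
    for child in node.children:
        order.extend(depth_first(child))
    return order


root = SketchNode("transport", [
    SketchNode("pick_up", [SketchNode("move_arm")]),
    SketchNode("place"),
])
print(depth_first(root))
```

The whole subtree below pick_up is visited before place, which is the property that matters when later nodes depend on the effects of earlier ones.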

current_plan: Plan = None#
on_start_callback: typing_extensions.Dict[typing_extensions.Optional[typing_extensions.Type[pycram.designator.ActionDescription]], typing_extensions.List[typing_extensions.Callable]]#
on_end_callback: typing_extensions.Dict[typing_extensions.Optional[typing_extensions.Type[pycram.designator.ActionDescription]], typing_extensions.List[typing_extensions.Callable]]#
root: PlanNode#
current_node: PlanNode#
mount(other: Plan, mount_node: PlanNode = None)#

Mounts another plan to this plan. The other plan will be added as a child of the mount_node.

Parameters:
  • other – The plan to be mounted

  • mount_node – A node of this plan to which the other plan will be mounted. If None, the root of this plan will be used.
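
The mounting behaviour described above can be pictured with a small adjacency-list model. This is only a sketch under assumptions: SketchPlan and its children dict are hypothetical and unrelated to the real networkx-based implementation, and nodes are plain strings.

```python
from typing import Dict, List, Optional


class SketchPlan:
    """Hypothetical adjacency-list plan; not the pycram Plan class."""

    def __init__(self, root: str):
        self.root = root
        self.children: Dict[str, List[str]] = {root: []}

    def add_edge(self, parent: str, child: str) -> None:
        self.children.setdefault(parent, []).append(child)
        self.children.setdefault(child, [])

    def mount(self, other: "SketchPlan", mount_node: Optional[str] = None) -> None:
        # Fall back to this plan's root when no mount node is given.
        target = self.root if mount_node is None else mount_node
        if target not in self.children:
            raise KeyError(f"{target!r} is not a node of this plan")
        # Copy the other plan's structure, then hang its root below the target.
        for node, kids in other.children.items():
            self.children.setdefault(node, []).extend(kids)
        self.children[target].append(other.root)


main = SketchPlan("serve_breakfast")
main.add_edge("serve_breakfast", "fetch_bowl")
sub = SketchPlan("fetch_spoon")
sub.add_edge("fetch_spoon", "open_drawer")

main.mount(sub)  # no mount_node given, so the root is used
print(main.children["serve_breakfast"])
```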

merge_nodes(node1: PlanNode, node2: PlanNode)#

Merges two nodes into one. The node2 will be removed and all its children will be added to node1.

Parameters:
  • node1 – Node which will remain in the plan

  • node2 – Node which will be removed from the plan
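
To make the effect of a merge concrete, the following sketch applies the same idea to a plain dict that maps each node to its list of children. The function and the data layout are hypothetical illustrations, not the pycram implementation.

```python
from typing import Dict, List


def merge_nodes(children: Dict[str, List[str]], node1: str, node2: str) -> None:
    """Merge node2 into node1 in a hypothetical adjacency-list tree."""
    # node2's children become children of node1.
    children[node1].extend(children.pop(node2))
    # Drop every edge that still points at the removed node.
    for kids in children.values():
        while node2 in kids:
            kids.remove(node2)


tree = {
    "root": ["a", "b"],
    "a": ["a1"],
    "b": ["b1", "b2"],
    "a1": [], "b1": [], "b2": [],
}
merge_nodes(tree, "a", "b")
print(tree["root"], tree["a"])
```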

add_node(node_for_adding: PlanNode, **attr)#

Adds a node to the plan. The node will not be connected to any other node of the plan.

Parameters:
  • node_for_adding – Node to be added

  • attr – Additional attributes to be added to the node

add_edge(u_of_edge, v_of_edge, **attr)#

Adds an edge to the plan. If one or both nodes are not in the plan, they will be added to the plan.

Parameters:
  • u_of_edge – Origin node of the edge

  • v_of_edge – Target node of the edge

  • attr – Additional attributes to be added to the edge

add_edges_from(ebunch_to_add: typing_extensions.Iterable[typing_extensions.Tuple[PlanNode, PlanNode]], **attr)#

Adds edges to the plan from an iterable of tuples. If one or both nodes are not in the plan, they will be added to the plan.

Parameters:
  • ebunch_to_add – Iterable of tuples of nodes to be added

  • attr – Additional attributes to be added to the edges

add_nodes_from(nodes_for_adding: typing_extensions.Iterable[PlanNode], **attr)#

Adds nodes from an Iterable of nodes.

Parameters:
  • nodes_for_adding – The iterable of nodes

  • attr – Additional attributes to be added

insert_below(insert_node: PlanNode, insert_below: PlanNode)#

Inserts a node below the given node.

Parameters:
  • insert_node – The node to be inserted

  • insert_below – A node of the plan below which the given node should be added

perform() typing_extensions.Any#

Performs the root node of this plan.

Returns:

The return value of the root node

resolve()#

Resolves the root node of this plan if it is a DesignatorNode.

Returns:

The resolved designator

flattened_parameters()#

The atomic parameters of this plan, as a dict with paths as keys and the atomic types as values.

Returns:

A dict of the atomic types

re_perform()#
property actions: typing_extensions.List[ActionNode]#
plot()#

Plots the plan using matplotlib and networkx. The plan is plotted as a tree with the root node at the bottom and the children nodes above.

classmethod add_on_start_callback(callback: typing_extensions.Callable[[PlanNode], None], action_type: typing_extensions.Optional[typing_extensions.Type[pycram.designator.ActionDescription], typing_extensions.Type[PlanNode]] = None)#

Adds a callback to be called when an action of the given type is started.

Parameters:
  • callback – The callback to be called

  • action_type – The type of the action, if None, the callback will be called for all actions

classmethod add_on_end_callback(callback: typing_extensions.Callable[[PlanNode], None], action_type: typing_extensions.Optional[typing_extensions.Type[pycram.designator.ActionDescription], typing_extensions.Type[PlanNode]] = None)#

Adds a callback to be called when an action of the given type has ended.

Parameters:
  • callback – The callback to be called

  • action_type – The type of the action

classmethod remove_on_start_callback(callback: typing_extensions.Callable[[PlanNode], None], action_type: typing_extensions.Optional[typing_extensions.Type[pycram.designator.ActionDescription], typing_extensions.Type[PlanNode]] = None)#

Removes a callback to be called when an action of the given type is started.

Parameters:
  • callback – The callback to be removed

  • action_type – The type of the action

classmethod remove_on_end_callback(callback: typing_extensions.Callable[[PlanNode], None], action_type: typing_extensions.Optional[typing_extensions.Type[pycram.designator.ActionDescription], typing_extensions.Type[PlanNode]] = None)#

Removes a callback to be called when an action of the given type has ended.

Parameters:
  • callback – The callback to be removed

  • action_type – The type of the action
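
The four callback classmethods above operate on dictionaries keyed by action type, with None standing for "all actions". A minimal sketch of such a registry for the start callbacks follows. Everything in it is an assumption made for illustration: the function names mirror the documented ones but are module-level stand-ins, PickUp and Place are hypothetical action types, and notify_start is an invented helper showing how dispatch might work.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Optional


class PickUp:  # hypothetical action type
    pass


class Place:  # hypothetical action type
    pass


# Maps an action type (or None for "all actions") to its callbacks.
on_start: Dict[Optional[type], List[Callable]] = defaultdict(list)


def add_on_start_callback(callback: Callable, action_type: Optional[type] = None) -> None:
    on_start[action_type].append(callback)


def remove_on_start_callback(callback: Callable, action_type: Optional[type] = None) -> None:
    if callback in on_start.get(action_type, []):
        on_start[action_type].remove(callback)


def notify_start(action: object) -> None:
    """Call the catch-all callbacks, then the type-specific ones (order assumed)."""
    for callback in on_start.get(None, []) + on_start.get(type(action), []):
        callback(action)


seen = []


def log_any(action: object) -> None:
    seen.append(("any", type(action).__name__))


def log_pick(action: object) -> None:
    seen.append(("pick", type(action).__name__))


add_on_start_callback(log_any)           # action_type=None: every action
add_on_start_callback(log_pick, PickUp)  # only PickUp actions
notify_start(PickUp())
notify_start(Place())
remove_on_start_callback(log_any)
notify_start(Place())                    # nothing is registered for Place now
print(seen)
```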

_create_pure_networkx_graph(attributes: typing_extensions.List[str]) networkx.DiGraph[int]#

Creates a pure networkx graph of this plan and adds the given attributes of nodes as networkx Node attributes.

Parameters:

attributes – A list of attributes from the nodes which should be contained in the returned graph

Returns:

A NetworkX graph from hash values of the PlanNodes

plot_bokeh(attributes: typing_extensions.List[str] = None)#

Plots the plan using bokeh and networkx. The plan is plotted as a tree with the root node at the bottom and PlanNode.action attributes as labels. The plot features a hover tool showing the attributes of the nodes when the mouse is over them. Shown attributes can be configured using the attributes parameter. The attributes have to be a subset of the PlanNode attributes.

Parameters:

attributes – A list of attributes from the nodes which should be shown in the hover tool.

_create_labels()#

Creates a label set for the plan visualization. Labels are the PlanNode.action attribute.

Returns:

A LabelSet object which can be added to a bokeh plot.

pycram.plan.managed_node(func: typing_extensions.Callable) typing_extensions.Callable#

Decorator which manages the state of a node, including the start and end time, status and reason of failure as well as the setting of the current node in the plan.

Parameters:

func – Reference to the perform function of the node

Returns:

The wrapped perform function
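
A decorator with the responsibilities listed above could look roughly like the sketch below. It is not the pycram source: SketchStatus stands in for the TaskStatus enum with assumed member names, SketchNode is a hypothetical node, and whether the real decorator re-raises the failure is an assumption.

```python
import datetime
import enum
import functools
from types import SimpleNamespace


class SketchStatus(enum.Enum):  # hypothetical; the real enum is TaskStatus
    CREATED = enum.auto()
    RUNNING = enum.auto()
    SUCCEEDED = enum.auto()
    FAILED = enum.auto()


def managed_node_sketch(func):
    """Wrap a perform function so that it records timing, status and failure."""
    @functools.wraps(func)
    def wrapper(node, *args, **kwargs):
        node.status = SketchStatus.RUNNING
        node.start_time = datetime.datetime.now()
        node.plan.current_node = node  # mark this node as the one being executed
        try:
            node.result = func(node, *args, **kwargs)
            node.status = SketchStatus.SUCCEEDED
            return node.result
        except Exception as error:
            node.status = SketchStatus.FAILED
            node.reason = error
            raise  # assumption: the failure is propagated to the caller
        finally:
            node.end_time = datetime.datetime.now()
    return wrapper


class SketchNode:
    def __init__(self, plan, fail=False):
        self.plan, self.fail = plan, fail
        self.status = SketchStatus.CREATED
        self.start_time = self.end_time = self.reason = self.result = None

    @managed_node_sketch
    def perform(self):
        if self.fail:
            raise RuntimeError("gripper blocked")
        return "done"


plan = SimpleNamespace(current_node=None)  # hypothetical plan holder
ok, bad = SketchNode(plan), SketchNode(plan, fail=True)
print(ok.perform(), ok.status)
try:
    bad.perform()
except RuntimeError:
    print(bad.status, bad.reason)
```

Using try/finally keeps the end time accurate on both the success and the failure path.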

class pycram.plan.PlanNode#
status: pycram.datastructures.enums.TaskStatus#

The status of the node from the TaskStatus enum.

start_time: typing_extensions.Optional[datetime.datetime]#

The starting time of the function, optional

end_time: typing_extensions.Optional[datetime.datetime] = None#

The ending time of the function, optional

reason: typing_extensions.Optional[pycram.failures.PlanFailure] = None#

The reason of failure if the action failed.

plan: Plan = None#

Reference to the plan to which this node belongs

result: typing_extensions.Any = None#

Result from the execution of this node

property parent: PlanNode#

The parent node of this node, or None if this is the root node.

Returns:

The parent node

property children: typing_extensions.List[PlanNode]#

All children nodes of this node

Returns:

A list of child nodes

property recursive_children: typing_extensions.List[PlanNode]#

Recursively lists all children and their children.

Returns:

A list of all nodes below this node

property subtree: Plan#

Creates a new plan with this node as the new root

Returns:

A new plan

property all_parents: typing_extensions.List[PlanNode]#

Returns all nodes above this node until the root node. The order is from this node to the root node.

Returns:

A list of all nodes above this

property is_leaf: bool#

Returns True if this node is a leaf node

Returns:

True if this node is a leaf node
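
The navigation properties above can be illustrated with a node that keeps an explicit parent pointer. This is a sketch only: the real PlanNode presumably derives these from the plan graph, and the ordering of recursive_children shown here (each child followed by its own descendants) is an assumption.

```python
from typing import List, Optional


class SketchNode:
    """Hypothetical node with an explicit parent pointer; not the pycram PlanNode."""

    def __init__(self, name: str, parent: Optional["SketchNode"] = None):
        self.name = name
        self.parent = parent
        self.children: List["SketchNode"] = []
        if parent is not None:
            parent.children.append(self)

    @property
    def recursive_children(self) -> List["SketchNode"]:
        found = []
        for child in self.children:
            found.append(child)
            found.extend(child.recursive_children)
        return found

    @property
    def all_parents(self) -> List["SketchNode"]:
        # Walk upwards, so the nearest parent comes first and the root last.
        chain, node = [], self.parent
        while node is not None:
            chain.append(node)
            node = node.parent
        return chain

    @property
    def is_leaf(self) -> bool:
        return not self.children


root = SketchNode("root")
pick = SketchNode("pick_up", root)
grasp = SketchNode("grasp", pick)
place = SketchNode("place", root)

print([n.name for n in root.recursive_children])
print([n.name for n in grasp.all_parents])
print(grasp.is_leaf, pick.is_leaf)
```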

flattened_parameters()#

The atomic types of this node as a dict

Returns:

The flattened parameter

__hash__()#
perform(*args, **kwargs)#
interrupt()#

Interrupts the execution of this node and all nodes below

resume()#

Resumes the execution of this node and all nodes below

pause()#

Suspends the execution of this node and all nodes below.

class pycram.plan.DesignatorNode#

Bases: PlanNode

designator_ref: typing_extensions.Any = None#

Reference to the Designator in this node

action: typing_extensions.Optional[typing_extensions.Any] = None#

The action that is performed, or None if nothing was performed

kwargs: typing_extensions.Dict[str, typing_extensions.Any] = None#

kwargs of the action in this node

__hash__()#
__repr__(*args, **kwargs)#
flattened_parameters() typing_extensions.Dict[str, pycram.has_parameters.leaf_types]#

The atomic types of the parameters of this node as a dict with paths as keys and the atomic types as values. This resolves the parameters to their types, not their actual values.

Returns:

The atomic types of this action

flatten() typing_extensions.Dict[str, pycram.has_parameters.leaf_types]#

Flattens the parameters of this node to a dict with the parameter as key and the value as value.

Returns:

A dict of the flattened parameters
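
The difference between flatten (values) and flattened_parameters (types) is easiest to see on a nested example. The sketch below uses plain nested dicts and dot-separated paths. Both choices are assumptions for illustration, since the page does not specify how pycram builds its paths or which types count as leaf types.

```python
from typing import Any, Dict


def flatten(params: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Flatten nested dicts into {path: value}; dot-separated paths are assumed."""
    flat: Dict[str, Any] = {}
    for key, value in params.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, path))
        else:
            flat[path] = value
    return flat


def flattened_parameters(params: Dict[str, Any]) -> Dict[str, type]:
    """Same paths as flatten, but each entry holds the type instead of the value."""
    return {path: type(value) for path, value in flatten(params).items()}


params = {
    "object": {"name": "milk", "pose": {"x": 1.0, "y": 2.5}},
    "arm": "left",
}
print(flatten(params))
print(flattened_parameters(params))
```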

class pycram.plan.ActionNode#

Bases: DesignatorNode

A node in the plan representing an ActionDesignator description

action_iter: typing_extensions.Iterator[pycram.designator.ActionDescription] = None#

Iterator over the current evaluation state of the ActionDesignator Description

action: pycram.designator.ActionDescription = None#

The action that is performed, or None if nothing was performed

__hash__()#
perform()#

Performs this node by resolving the ActionDesignator description to the next resolution and then performing the result.

Returns:

Return value of the resolved action node
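
The interplay of action_iter and perform described above can be sketched with an ordinary Python iterator. This is a simplified model under assumptions: a "description" is reduced to an iterable of callables, and SketchActionNode and grasp_with are hypothetical names, not part of pycram.

```python
from typing import Callable, Iterable, Iterator, Optional


class SketchActionNode:
    """Hypothetical node; a description here is just an iterable of callables."""

    def __init__(self, description: Iterable[Callable[[], str]]):
        self.action_iter: Iterator[Callable[[], str]] = iter(description)
        self.action: Optional[Callable[[], str]] = None

    def perform(self) -> str:
        # Advance to the next candidate resolution, then perform it.
        self.action = next(self.action_iter)
        return self.action()


def grasp_with(arm: str) -> Callable[[], str]:
    """Build one fully specified candidate action (hypothetical)."""
    return lambda: f"grasped with {arm} arm"


node = SketchActionNode(grasp_with(arm) for arm in ("left", "right"))
print(node.perform())
print(node.perform())
```

In this model each call to perform consumes one resolution, so performing the node again tries the next candidate.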

__repr__(*args, **kwargs)#

class pycram.plan.ResolvedActionNode#

Bases: DesignatorNode

A node representing a resolved ActionDesignator with fully specified parameters

designator_ref: pycram.designator.ActionDescription = None#

Reference to the Designator in this node

action: pycram.designator.ActionDescription = None#

The action that is performed, or None if nothing was performed

__hash__()#
perform()#

Performs this node by performing the resolved action designator in it.

Returns:

The return value of the resolved ActionDesignator

__repr__(*args, **kwargs)#

class pycram.plan.MotionNode#

Bases: DesignatorNode

A node in the plan representing a MotionDesignator

designator_ref: pycram.designator.BaseMotion = None#

Reference to the MotionDesignator

__hash__()#
perform()#

Performs this node by performing the respective MotionDesignator. Additionally, checks if one of the parents has the status INTERRUPTED and aborts the perform if that is the case.

Returns:

The return value of the Motion Designator
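
The interruption check described above amounts to walking up the parent chain before executing. The sketch below shows that idea with hypothetical stand-ins (SketchStatus, SketchNode, perform_motion). What "abort" means concretely is not stated on this page, so returning None without running the motion is an assumption.

```python
import enum
from typing import Any, Callable, Optional


class SketchStatus(enum.Enum):  # hypothetical; the real enum is TaskStatus
    RUNNING = enum.auto()
    INTERRUPTED = enum.auto()


class SketchNode:
    def __init__(self, parent: Optional["SketchNode"] = None):
        self.parent = parent
        self.status = SketchStatus.RUNNING


def any_parent_interrupted(node: SketchNode) -> bool:
    """Return True if any ancestor of the node has the status INTERRUPTED."""
    parent = node.parent
    while parent is not None:
        if parent.status is SketchStatus.INTERRUPTED:
            return True
        parent = parent.parent
    return False


def perform_motion(node: SketchNode, motion: Callable[[], Any]) -> Any:
    if any_parent_interrupted(node):
        return None  # assumption: aborting means skipping the motion
    return motion()


root = SketchNode()
action = SketchNode(parent=root)
motion_node = SketchNode(parent=action)

print(perform_motion(motion_node, lambda: "moved"))
root.status = SketchStatus.INTERRUPTED
print(perform_motion(motion_node, lambda: "moved"))
```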

__repr__(*args, **kwargs)#
flatten()#

Flattens the parameters of this node to a dict with the parameter as key and the value as value.

Returns:

A dict of the flattened parameters

flattened_parameters()#

The atomic types of the parameters of this node as a dict with paths as keys and the atomic types as values. This resolves the parameters to their types, not their actual values.

Returns:

The atomic types of this action

pycram.plan.with_plan(func: typing_extensions.Callable) typing_extensions.Callable#

Decorator which wraps the decorated designator into a node, creates a new plan with the node as root and returns the plan.

Parameters:

func – The designator which should be inserted into a plan

Returns:

A plan with the designator as root node
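
The wrapping step described above can be sketched as an ordinary decorator. This is an illustration under assumptions, not the pycram source: SketchNode and SketchPlan are hypothetical stand-ins, the designator is modelled as a plain dict, and it is assumed that the decorated callable is invoked and its result stored as the node's designator_ref.

```python
import functools
from typing import Any, Callable, Optional


class SketchPlan:
    """Hypothetical plan that only remembers its root node."""

    def __init__(self, root: "SketchNode"):
        self.root = root


class SketchNode:
    """Hypothetical node holding a designator and a back-reference to its plan."""

    def __init__(self, designator_ref: Any):
        self.designator_ref = designator_ref
        self.plan: Optional[SketchPlan] = None


def with_plan_sketch(func: Callable) -> Callable:
    @functools.wraps(func)
    def wrapper(*args, **kwargs) -> SketchPlan:
        designator = func(*args, **kwargs)            # build the designator
        node = SketchNode(designator_ref=designator)  # wrap it in a node
        plan = SketchPlan(root=node)                  # new plan rooted at that node
        node.plan = plan
        return plan
    return wrapper


@with_plan_sketch
def navigate(target: str) -> dict:
    return {"type": "navigate", "target": target}


plan = navigate("kitchen")
print(plan.root.designator_ref)
```

Callers of the decorated function receive a plan rather than a bare designator, so the result can be performed or mounted into a larger plan directly.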