pycram.language#

Classes#

LanguageMixin

Parent class for language expressions. Implements the operators as well as methods to reduce the resulting language tree.

LanguagePlan

Base class for language plans

SequentialPlan

Creates a plan which executes its children in sequential order

ParallelPlan

Creates a plan which executes all children in parallel in separate threads

TryInOrderPlan

Creates a plan that executes all children in sequential order but does not stop if one of them throws an error

TryAllPLan

Creates a plan which executes all children in parallel but does not abort if one throws an error

RepeatPlan

A plan which repeats all children a number of times

MonitorPlan

A plan which monitors a condition and upon the condition becoming true interrupts all children

CodePlan

A Plan that contains a function to be executed. Mainly intended for debugging purposes

LanguageNode

SequentialNode

ParallelNode

Executes all children in parallel by creating one thread per child and executing each child in its thread. All exceptions raised during execution are caught, saved to a list, and returned at the end.

RepeatNode

MonitorNode

Monitors a Language Expression and interrupts it when the given condition evaluates to True.

TryInOrderNode

Executes all children sequentially; an exception while executing a child does not terminate the whole process.

TryAllNode

Executes all children in parallel by creating one thread per child and executing each child in its thread. All exceptions raised during execution are caught, saved to a list, and returned at the end.

CodeNode

Executable code block in a plan.

Module Contents#

class pycram.language.LanguageMixin#

Parent class for language expressions. Implements the operators as well as methods to reduce the resulting language tree.

add_language_plan(child_plans: typing_extensions.Iterable[pycram.plan.Plan], root_node: LanguageNode) pycram.plan.Plan#
__add__(other: pycram.plan.Plan) pycram.plan.Plan#

Language expression for sequential execution.

Parameters:

other – Another Language expression, either a designator_description or language expression

Returns:

A Sequential() object which is the new root node of the language tree

__sub__(other: pycram.plan.Plan) pycram.plan.Plan#

Language expression for try in order.

Parameters:

other – Another Language expression, either a designator_description or language expression

Returns:

A TryInOrder() object which is the new root node of the language tree

__or__(other: pycram.plan.Plan) pycram.plan.Plan#

Language expression for parallel execution.

Parameters:

other – Another Language expression, either a designator_description or language expression

Returns:

A Parallel() object which is the new root node of the language tree

__xor__(other: pycram.plan.Plan) pycram.plan.Plan#

Language expression for try all execution.

Parameters:

other – Another Language expression, either a designator_description or language expression

Returns:

A TryAll() object which is the new root node of the language tree

__rshift__(other: typing_extensions.Callable)#

Operator for Monitors. This always makes the Monitor the parent of the other expression.

Parameters:

other – Another Language expression

Returns:

The Monitor which is now the new root node.

__mul__(other: int)#

Language expression for repeated execution. The other operand of this operator has to be an integer.

Parameters:

other – An integer which states how often the Language expression should be repeated

Returns:

A Repeat() object which is the new root node of the language tree

__rmul__(other: int)#

Language expression for repeated execution. The other operand of this operator has to be an integer. This is the reflected version of __mul__, which allows writing:

2 * ParkAction()

Parameters:

other – An integer which states how often the Language expression should be repeated

Returns:

A Repeat() object which is the new root node of the language tree
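
The operator mapping documented above can be illustrated with a small stand-in. This sketch does not use pycram: `Expr`, `Leaf`, and `Node` are hypothetical names, and each node is only labelled with the kind of plan the corresponding operator is documented to create. Note that Python's own operator precedence applies, so `*` binds tighter than `+`, which binds tighter than `|`.

```python
from dataclasses import dataclass
from typing import List


class Expr:
    """Hypothetical stand-in for a language expression (not pycram's class)."""

    def _combine(self, kind: str, other: "Expr") -> "Node":
        return Node(kind, [self, other])

    def __add__(self, other):  # sequential execution
        return self._combine("Sequential", other)

    def __sub__(self, other):  # try in order
        return self._combine("TryInOrder", other)

    def __or__(self, other):  # parallel execution
        return self._combine("Parallel", other)

    def __xor__(self, other):  # try all
        return self._combine("TryAll", other)

    def __mul__(self, times: int):  # repeated execution
        if not isinstance(times, int):
            raise TypeError("repeat count must be an integer")
        return Node("Repeat", [self], repeat=times)

    __rmul__ = __mul__  # reflected operator, allows 2 * expr


@dataclass
class Leaf(Expr):
    name: str


@dataclass
class Node(Expr):
    kind: str
    children: List[Expr]
    repeat: int = 1


# Parsed as ((2 * park) + move) | look because of Python's precedence rules.
tree = 2 * Leaf("park") + Leaf("move") | Leaf("look")
```

Here `tree` is a `Parallel` node whose first child is a `Sequential` node, which in turn contains a `Repeat` node with `repeat=2`.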

abstract interrupt() None#

Base method for interrupting the execution of a Language expression. To be overridden in a subclass.

class pycram.language.LanguagePlan(root: LanguageNode, *children: pycram.plan.Plan)#

Bases: pycram.plan.Plan

Base class for language plans

simplify_language_nodes()#

Traverses the plan and merges LanguageNodes of the same type
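
One way to picture this merge is as flattening nested nodes of the same kind. The following is a minimal sketch that uses nested tuples as a hypothetical tree representation; pycram's actual traversal operates on its own plan structure and may differ in detail.

```python
from typing import Tuple, Union

# Hypothetical tree: a leaf is a string, an inner node is (kind, [children]).
Tree = Union[str, Tuple[str, list]]


def simplify(tree: Tree) -> Tree:
    """Merge children of the same kind into their parent, bottom-up."""
    if isinstance(tree, str):
        return tree
    kind, children = tree
    merged = []
    for child in map(simplify, children):
        if not isinstance(child, str) and child[0] == kind:
            merged.extend(child[1])  # same kind: splice the grandchildren in
        else:
            merged.append(child)
    return (kind, merged)


nested = ("Sequential", [("Sequential", ["a", "b"]), ("Parallel", ["c", "d"]), "e"])
flat = simplify(nested)
```

The inner `Sequential` node disappears and its children move up one level, while the `Parallel` node is kept because its kind differs from its parent's.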

class pycram.language.SequentialPlan(*children: pycram.plan.Plan)#

Bases: LanguagePlan

Creates a plan which executes its children in sequential order

class pycram.language.ParallelPlan(*children: pycram.plan.Plan, root: LanguageNode = None)#

Bases: LanguagePlan

Creates a plan which executes all children in parallel in separate threads

parallel_blocklist = ['PickUpAction', 'PlaceAction', 'OpenAction', 'CloseAction', 'TransportAction', 'GraspingAction']#

A list of Actions which can’t be part of a Parallel plan
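
Because the blocklist holds class names as strings, a check against it could compare the class name of each child. The sketch below is an assumption about how such a check might look, not pycram's implementation; `blocked_children` and the two placeholder action classes are hypothetical.

```python
PARALLEL_BLOCKLIST = ["PickUpAction", "PlaceAction", "OpenAction",
                      "CloseAction", "TransportAction", "GraspingAction"]


class NavigateAction:  # hypothetical placeholder, not on the blocklist
    pass


class PickUpAction:  # hypothetical placeholder sharing a blocklisted name
    pass


def blocked_children(children):
    """Return the class names of children that appear in the blocklist."""
    return [type(child).__name__ for child in children
            if type(child).__name__ in PARALLEL_BLOCKLIST]


offenders = blocked_children([NavigateAction(), PickUpAction()])
```

What happens when a blocklisted action is found (an exception, a warning, or something else) is not stated in this reference, so the sketch only reports the offending names.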

class pycram.language.TryInOrderPlan(*children: pycram.plan.Plan)#

Bases: LanguagePlan

Creates a plan that executes all children in sequential order but does not stop if one of them throws an error

class pycram.language.TryAllPLan(*children: pycram.plan.Plan)#

Bases: ParallelPlan

Creates a plan which executes all children in parallel but does not abort if one throws an error

class pycram.language.RepeatPlan(repeat=1, *children: pycram.plan.Plan)#

Bases: LanguagePlan

A plan which repeats all children a number of times

class pycram.language.MonitorPlan(condition, *children: pycram.plan.Plan)#

Bases: LanguagePlan

A plan which monitors a condition and upon the condition becoming true interrupts all children

class pycram.language.CodePlan(func: typing_extensions.Callable, kwargs: typing_extensions.Dict[str, typing_extensions.Any] = None)#

Bases: pycram.plan.Plan

A Plan that contains a function to be executed. Mainly intended for debugging purposes

class pycram.language.LanguageNode#

Bases: pycram.plan.PlanNode

action: typing_extensions.Type[LanguageNode]#

Superclass for language nodes in a plan. Used to distinguish language nodes from other types of nodes.

class pycram.language.SequentialNode#

Bases: LanguageNode

action: typing_extensions.Type[SequentialNode]#

Executes all children sequentially; an exception while executing a child does not terminate the whole process. Instead, the exception is saved to a list of all exceptions thrown during execution and returned.

Behaviour:

Returns a tuple containing the final state of execution (SUCCEEDED, FAILED) and a list of results from each child’s perform() method. The state is SUCCEEDED iff all children are executed without exception. In any other case the State FAILED will be returned.

perform()#

Behaviour of Sequential, calls perform() on each child sequentially

Returns:

The state and list of results according to the behaviour described in Sequential()

perform_sequential(nodes: typing_extensions.List[pycram.plan.PlanNode], raise_exceptions=True) typing_extensions.Any#

Behavior of the sequential node: performs all children in sequence and raises errors if they occur.

Parameters:
  • nodes – A list of nodes which should be performed in sequence

  • raise_exceptions – If True (default) errors will be raised
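
The effect of the raise_exceptions flag can be sketched with plain callables in place of plan nodes. This is an illustration only: the function below is a stand-in that merely shares the documented name, and it assumes that with raise_exceptions=False errors are collected and the remaining steps still run, which this reference does not state explicitly.

```python
from typing import Any, Callable, List, Tuple


def perform_sequential(steps: List[Callable[[], Any]],
                       raise_exceptions: bool = True) -> Tuple[list, list]:
    """Run steps in order; return (results, caught_exceptions)."""
    results, errors = [], []
    for step in steps:
        try:
            results.append(step())
        except Exception as exc:
            if raise_exceptions:
                raise  # propagate; later steps are not run
            errors.append(exc)  # assumption: collect the error and continue
    return results, errors


def fail():
    raise RuntimeError("step failed")


results, errors = perform_sequential([lambda: 1, fail, lambda: 3],
                                     raise_exceptions=False)
```

With the default raise_exceptions=True, the same call would raise the RuntimeError from the second step and never reach the third.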

__hash__()#
__repr__()#

class pycram.language.ParallelNode#

Bases: LanguageNode

Executes all children in parallel by creating one thread per child and executing each child in its thread. All exceptions raised during execution are caught, saved to a list, and returned at the end.

Behaviour:

Returns a tuple containing the final state of execution (SUCCEEDED, FAILED) and a list of results from each child’s perform() method. The state is SUCCEEDED iff all children could be executed without an exception. In any other case the State FAILED will be returned.

perform()#

Behaviour of Parallel, creates a new thread for each child and calls perform() of the child in the respective thread.

Returns:

The state and list of results according to the behaviour described in Parallel()

perform_parallel(nodes: typing_extensions.List[pycram.plan.PlanNode])#

Behaviour of the parallel node: performs the given nodes in parallel, each in its own thread.

Parameters:

nodes – A list of nodes which should be performed in parallel

_lang_call(node: pycram.plan.PlanNode)#

Wrapper method which is executed in the thread. Wraps the given node in a try/except block and performs it.

Parameters:

node – The node which is to be performed
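
The thread-per-child pattern with a catching wrapper can be sketched using the standard threading module. This is a simplified stand-in built on plain callables, not pycram's code: the state strings and the three-element return value are assumptions chosen to mirror the behaviour described for ParallelNode.

```python
import threading
from typing import Any, Callable, List, Tuple


def perform_parallel(steps: List[Callable[[], Any]]) -> Tuple[str, list, list]:
    """Run each step in its own thread; return (state, results, exceptions)."""
    results: list = [None] * len(steps)
    errors: list = []
    lock = threading.Lock()

    def call(index: int, step: Callable[[], Any]) -> None:
        # Wrapper executed inside the thread: catch whatever the step raises.
        try:
            results[index] = step()
        except Exception as exc:
            with lock:
                errors.append(exc)

    threads = [threading.Thread(target=call, args=(i, s))
               for i, s in enumerate(steps)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()  # wait until every child has finished

    # SUCCEEDED only if no child raised an exception.
    state = "SUCCEEDED" if not errors else "FAILED"
    return state, results, errors


def fail():
    raise RuntimeError("child failed")


ok_state, ok_results, ok_errors = perform_parallel([lambda: "a", lambda: "b"])
bad_state, bad_results, bad_errors = perform_parallel([lambda: "a", fail])
```

Results are stored by index so that their order matches the order of the children, regardless of which thread finishes first.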

__hash__()#
__repr__()#

class pycram.language.RepeatNode#

Bases: SequentialNode

repeat: int = 1#

Executes all children a given number of times in sequential order.

perform()#

Behaviour of repeat, executes all children in a loop as often as stated on initialization.

Returns:

__hash__()#

class pycram.language.MonitorNode(condition: typing_extensions.Union[typing_extensions.Callable, pycram.fluent.Fluent] = None)#

Bases: SequentialNode

Monitors a Language Expression and interrupts it when the given condition evaluates to True.

Behaviour:

This Monitor is attached to a language expression. When perform() is called on this Monitor, it starts a new thread which continuously checks whether the condition is True. When the condition is True, the interrupt function of the child is called.

kill_event#
exception_queue#
perform()#

Behavior of the Monitor: starts a new thread which checks the condition, then performs the attached language expression.

Returns:

The state of the attached language expression, as well as a list of the results of the children
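
The monitoring pattern described here, a watcher thread that polls a condition and interrupts the child, can be sketched as follows. `Worker` and `perform_monitored` are hypothetical names, the polling interval is an arbitrary choice, and the `kill_event` variable only borrows the name of the attribute listed above; how pycram uses it internally is an assumption.

```python
import threading
import time


class Worker:
    """Hypothetical interruptible child that counts ticks until interrupted."""

    def __init__(self):
        self.interrupted = threading.Event()
        self.ticks = 0

    def interrupt(self):
        self.interrupted.set()

    def perform(self):
        # Work in small steps so that an interrupt is noticed quickly.
        while not self.interrupted.is_set() and self.ticks < 1000:
            self.ticks += 1
            time.sleep(0.001)
        return self.ticks


def perform_monitored(child, condition, poll=0.001):
    """Perform child while a watcher thread polls condition()."""
    kill_event = threading.Event()

    def monitor():
        while not kill_event.is_set():
            if condition():
                child.interrupt()  # condition became true: stop the child
                return
            time.sleep(poll)

    watcher = threading.Thread(target=monitor, daemon=True)
    watcher.start()
    try:
        return child.perform()
    finally:
        kill_event.set()  # stop the watcher once the child is done
        watcher.join()


worker = Worker()
ticks = perform_monitored(worker, condition=lambda: worker.ticks >= 5)
```

The child is interrupted shortly after its fifth tick; the exact tick count depends on thread scheduling.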

monitor()#
__hash__()#

class pycram.language.TryInOrderNode#

Bases: SequentialNode

Executes all children sequentially; an exception while executing a child does not terminate the whole process. Instead, the exception is saved to a list of all exceptions thrown during execution and returned.

Behaviour:

Returns a tuple containing the final state of execution (SUCCEEDED, FAILED) and a list of results from each child’s perform() method. The state is SUCCEEDED if one or more children are executed without exception. If no child could be executed without exception, the state FAILED will be returned.

perform()#

Behaviour of TryInOrder, calls perform() on each child sequentially and catches raised exceptions.

Returns:

The state and list of results according to the behaviour described in TryInOrder()
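
The "succeeds if at least one child succeeds" rule can be sketched with plain callables. This is a simplified stand-in, not pycram's implementation: `try_in_order` is a hypothetical function, and the state strings and the extra list of exceptions in the return value are assumptions.

```python
from typing import Any, Callable, List, Tuple


def try_in_order(steps: List[Callable[[], Any]]) -> Tuple[str, list, list]:
    """Run every step in order, catching exceptions instead of stopping."""
    results, errors = [], []
    for step in steps:
        try:
            results.append(step())
        except Exception as exc:
            errors.append(exc)  # remember the failure and move on
    # SUCCEEDED as soon as at least one step finished without an exception.
    state = "SUCCEEDED" if results else "FAILED"
    return state, results, errors


def fail():
    raise RuntimeError("child failed")


mixed = try_in_order([fail, lambda: 2])
all_failed = try_in_order([fail, fail])
```

In the first call one child fails and one succeeds, so the state is SUCCEEDED; in the second call every child fails, so the state is FAILED.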

__hash__()#

class pycram.language.TryAllNode#

Bases: ParallelNode

Executes all children in parallel by creating one thread per child and executing each child in its thread. All exceptions raised during execution are caught, saved to a list, and returned at the end.

Behaviour:

Returns a tuple containing the final state of execution (SUCCEEDED, FAILED) and a list of results from each child’s perform() method. The state is SUCCEEDED if one or more children could be executed without raising an exception. If all children fail the State FAILED will be returned.

perform()#

Behaviour of TryAll, creates a new thread for each child and executes all children in their respective threads.

Returns:

The state and list of results according to the behaviour described in TryAll()

__hash__()#

class pycram.language.CodeNode(function: typing_extensions.Optional[typing_extensions.Callable] = None, kwargs: typing_extensions.Optional[typing_extensions.Dict] = None)#

Bases: LanguageNode

Executable code block in a plan.

Variables:
  • function – The function (plan) that was called

  • kwargs – Dictionary holding the keyword arguments of the function

action: LanguageNode#

Superclass for language nodes in a plan. Used to distinguish language nodes from other types of nodes.

function: typing_extensions.Callable = None#
kwargs: typing_extensions.Dict[str, typing_extensions.Any] = None#
perform#
performable#
execute() typing_extensions.Any#

Execute the code with its arguments

Returns:

Anything that the function associated with this object will return.
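
The idea of a node that stores a callable together with its keyword arguments can be sketched with a small dataclass. `CodeBlock` and `describe` are hypothetical names used for illustration; in particular, raising ValueError when no function is set is an assumption, not documented behaviour of CodeNode.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional


@dataclass
class CodeBlock:
    """Hypothetical stand-in: a callable plus the keyword arguments to call it with."""
    function: Optional[Callable[..., Any]] = None
    kwargs: Optional[Dict[str, Any]] = None

    def execute(self) -> Any:
        if self.function is None:
            raise ValueError("no function to execute")  # assumed behaviour
        # Call the stored function with the stored keyword arguments.
        return self.function(**(self.kwargs or {}))


def describe(obj: str, arm: str = "left") -> str:
    return f"grasp {obj} with {arm} arm"


block = CodeBlock(describe, {"obj": "cup", "arm": "right"})
message = block.execute()
```

Whatever the stored function returns is passed through unchanged, matching the documented return value of execute().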

resolve() typing_extensions.Self#
__hash__()#