pycram.worlds.multiverse_communication.clients

Contents

pycram.worlds.multiverse_communication.clients#

Classes#

Module Contents#

class pycram.worlds.multiverse_communication.clients.MultiverseClient(name: str, port: int, is_prospection_world: bool = False, simulation_wait_time_factor: float = 1.0, **kwargs)#

Bases: pycram.worlds.multiverse_communication.socket.MultiverseSocket

is_prospection_world = False#
simulation_wait_time_factor = 1.0#
class pycram.worlds.multiverse_communication.clients.MultiverseReader(name: str, port: int, is_prospection_world: bool = False, simulation_wait_time_factor: float = 1.0, **kwargs)#

Bases: MultiverseClient

MAX_WAIT_TIME_FOR_DATA: datetime.timedelta#

The maximum wait time for the data in seconds.

data_lock#
thread#
stop_thread = False#
get_body_pose(name: str, wait: bool = False) typing_extensions.Optional[typing_extensions.Dict[str, typing_extensions.List[float]]]#

Get the body pose from the multiverse server.

Parameters:
  • name – The name of the body.

  • wait – Whether to wait for the data.

Returns:

The position and orientation of the body.

get_multiple_body_poses(body_names: typing_extensions.List[str], wait: bool = False) typing_extensions.Optional[typing_extensions.Dict[str, pycram.datastructures.pose.PoseStamped]]#

Get the body poses from the multiverse server for multiple bodies.

Parameters:
  • body_names – The names of the bodies.

  • wait – Whether to wait for the data.

Returns:

The positions and orientations of the bodies as a dictionary.

get_body_position(name: str, wait: bool = False) typing_extensions.Optional[typing_extensions.List[float]]#

Get the body position from the multiverse server.

Parameters:
  • name – The name of the body.

  • wait – Whether to wait for the data.

Returns:

The position of the body.

get_multiple_body_positions(body_names: typing_extensions.List[str], wait: bool = False) typing_extensions.Optional[typing_extensions.Dict[str, typing_extensions.List[float]]]#

Get the body positions from the multiverse server for multiple bodies.

Parameters:
  • body_names – The names of the bodies.

  • wait – Whether to wait for the data.

Returns:

The positions of the bodies as a dictionary.

get_body_orientation(name: str, wait: bool = False) typing_extensions.Optional[typing_extensions.List[float]]#

Get the body orientation from the multiverse server.

Parameters:
  • name – The name of the body.

  • wait – Whether to wait for the data.

Returns:

The orientation of the body.

get_multiple_body_orientations(body_names: typing_extensions.List[str], wait: bool = False) typing_extensions.Optional[typing_extensions.Dict[str, typing_extensions.List[float]]]#

Get the body orientations from the multiverse server for multiple bodies.

Parameters:
  • body_names – The names of the bodies.

  • wait – Whether to wait for the data.

Returns:

The orientations of the bodies as a dictionary.

get_body_property(name: str, property_: pycram.datastructures.enums.MultiverseProperty, wait: bool = False) typing_extensions.Optional[typing_extensions.List[float]]#

Get the body property from the multiverse server.

Parameters:
  • name – The name of the body.

  • property_ – The property of the body as a MultiverseProperty.

  • wait – Whether to wait for the data.

Returns:

The property of the body.

get_multiple_body_properties(body_names: typing_extensions.List[str], properties: typing_extensions.List[pycram.datastructures.enums.MultiverseProperty], wait: bool = False) typing_extensions.Optional[typing_extensions.Dict[str, typing_extensions.Dict[str, typing_extensions.List[float]]]]#

Get the body properties from the multiverse server for multiple bodies.

Parameters:
  • body_names – The names of the bodies.

  • properties – The properties of the bodies.

  • wait – Whether to wait for the data.

Returns:

The properties of the bodies as a dictionary.

get_body_data(name: str, properties: typing_extensions.Optional[typing_extensions.List[pycram.datastructures.enums.MultiverseProperty]] = None, wait: bool = False) typing_extensions.Optional[typing_extensions.Dict]#

Get the body data from the multiverse server.

Parameters:
  • name – The name of the body.

  • properties – The properties of the body.

  • wait – Whether to wait for the data.

Returns:

The body data as a dictionary.

get_multiple_body_data(body_names: typing_extensions.List[str], properties: typing_extensions.Optional[typing_extensions.Dict[str, typing_extensions.List[pycram.datastructures.enums.MultiverseProperty]]] = None, wait: bool = False) typing_extensions.Optional[typing_extensions.Dict]#

Get the body data from the multiverse server for multiple bodies.

Parameters:
  • body_names – The names of the bodies.

  • properties – The properties of the bodies.

  • wait – Whether to wait for the data.

Returns:

The body data as a dictionary.

wait_for_body_data(name: str, properties: typing_extensions.Optional[typing_extensions.List[pycram.datastructures.enums.MultiverseProperty]] = None) typing_extensions.Dict#

Wait for the body data from the multiverse server.

Parameters:
  • name – The name of the body.

  • properties – The properties of the body.

Returns:

The body data as a dictionary.

wait_for_multiple_body_data(body_names: typing_extensions.List[str], properties: typing_extensions.Optional[typing_extensions.Dict[str, typing_extensions.List[pycram.datastructures.enums.MultiverseProperty]]] = None) typing_extensions.Dict#

Wait for the body data from the multiverse server for multiple bodies.

Parameters:
  • body_names – The names of the bodies.

  • properties – The properties of the bodies.

Returns:

The body data as a dictionary.

_wait_for_body_data_template(body_names: typing_extensions.Union[str, typing_extensions.List[str]], check_func: typing_extensions.Callable[[typing_extensions.Union[str, typing_extensions.List[str]], typing_extensions.Dict, typing_extensions.Union[typing_extensions.Dict, typing_extensions.List]], bool], properties: typing_extensions.Optional[typing_extensions.Union[typing_extensions.Dict, typing_extensions.List]] = None) typing_extensions.Dict#

Wait for the body data from the multiverse server for one or multiple bodies, using the given check function to decide when the data has been received.

Parameters:
  • body_names – The names of the bodies.

  • properties – The properties of the bodies.

  • check_func – The function to check if the data is received.

Returns:

The body data as a dictionary.

check_multiple_body_data(body_names: typing_extensions.List[str], data: typing_extensions.Dict, properties: typing_extensions.Optional[typing_extensions.Dict[str, typing_extensions.List[pycram.datastructures.enums.MultiverseProperty]]] = None) bool#

Check if the body data is received from the multiverse server for multiple bodies.

Parameters:
  • body_names – The names of the bodies.

  • data – The data received from the multiverse server.

  • properties – The properties of the bodies.

Returns:

Whether the body data is received.

static check_for_body_data(name: str, data: typing_extensions.Dict, properties: typing_extensions.Optional[typing_extensions.List[pycram.datastructures.enums.MultiverseProperty]] = None) bool#

Check if the body data is received from the multiverse server.

Parameters:
  • name – The name of the body.

  • data – The data received from the multiverse server.

  • properties – The properties of the body.

Returns:

Whether the body data is received.

get_received_data()#

Get the latest received data from the multiverse server.

receive_all_data_from_server()#

Get all data from the multiverse server.

join()#
class pycram.worlds.multiverse_communication.clients.MultiverseWriter(name: str, port: int, simulation: typing_extensions.Optional[str] = None, is_prospection_world: bool = False, simulation_wait_time_factor: float = 1.0, **kwargs)#

Bases: MultiverseClient

simulation = None#
lock#
spawn_robot_with_actuators(robot_name: str, actuator_joint_commands: typing_extensions.Optional[typing_extensions.Dict[str, typing_extensions.List[str]]] = None) None#

Spawn the robot with controlled actuators in the simulation.

Parameters:
  • robot_name – The name of the robot.

  • actuator_joint_commands – A dictionary mapping actuator names to joint command names.

_reset_request_meta_data(set_simulation_name: bool = True)#

Reset the request metadata.

Parameters:

set_simulation_name – Whether to set the simulation name to the value of self.simulation.

set_body_pose(body_name: str, position: typing_extensions.List[float], orientation: typing_extensions.List[float]) None#

Set the body pose in the simulation.

Parameters:
  • body_name – The name of the body.

  • position – The position of the body.

  • orientation – The orientation of the body.

set_multiple_body_poses(body_data: typing_extensions.Dict[str, typing_extensions.Dict[pycram.datastructures.enums.MultiverseBodyProperty, typing_extensions.List[float]]]) None#

Set the body poses in the simulation for multiple bodies.

Parameters:

body_data – The data to be sent for multiple bodies.

set_body_position(body_name: str, position: typing_extensions.List[float]) None#

Set the body position in the simulation.

Parameters:
  • body_name – The name of the body.

  • position – The position of the body.

set_body_orientation(body_name: str, orientation: typing_extensions.List[float]) None#

Set the body orientation in the simulation.

Parameters:
  • body_name – The name of the body.

  • orientation – The orientation of the body.

set_body_property(body_name: str, property_: pycram.datastructures.enums.MultiverseProperty, value: typing_extensions.List[float]) None#

Set the body property in the simulation.

Parameters:
  • body_name – The name of the body.

  • property_ – The property of the body.

  • value – The value of the property.

remove_body(body_name: str) None#

Remove the body from the simulation.

Parameters:

body_name – The name of the body.

reset_world() None#

Reset the world in the simulation.

send_body_data_to_server(body_name: str, body_data: typing_extensions.Dict[pycram.datastructures.enums.MultiverseProperty, typing_extensions.List[float]]) typing_extensions.Dict#

Send data to the multiverse server.

Parameters:
  • body_name – The name of the body.

  • body_data – The data to be sent.

Returns:

The response from the server.

send_multiple_body_data_to_server(body_data: typing_extensions.Dict[str, typing_extensions.Dict[pycram.datastructures.enums.MultiverseProperty, typing_extensions.List[float]]]) typing_extensions.Dict#

Send data to the multiverse server for multiple bodies.

Parameters:

body_data – The data to be sent for multiple bodies.

Returns:

The response from the server.

send_meta_data_and_get_response(send_meta_data: typing_extensions.Dict) typing_extensions.Dict#

Send metadata to the multiverse server and get the response.

Parameters:

send_meta_data – The metadata to be sent.

Returns:

The response from the server.

send_data_to_server(data: typing_extensions.List, send_meta_data: typing_extensions.Optional[typing_extensions.Dict] = None, receive_meta_data: typing_extensions.Optional[typing_extensions.Dict] = None, set_simulation_name: bool = True) typing_extensions.Dict#

Send data to the multiverse server.

Parameters:
  • data – The data to be sent.

  • send_meta_data – The metadata to be sent.

  • receive_meta_data – The metadata to be received.

  • set_simulation_name – Whether to set the simulation name to the value of self.simulation.

Returns:

The response from the server.

class pycram.worlds.multiverse_communication.clients.MultiverseController(name: str, port: int, is_prospection_world: bool = False, **kwargs)#

Bases: MultiverseWriter

init_controller(actuator_joint_commands: typing_extensions.Dict[str, typing_extensions.List[str]]) None#

Initialize the controller by sending the controller data to the multiverse server.

Parameters:

actuator_joint_commands – A dictionary mapping actuator names to joint command names.

class pycram.worlds.multiverse_communication.clients.MultiverseAPI(name: str, port: int, simulation: str, is_prospection_world: bool = False, simulation_wait_time_factor: float = 1.0)#

Bases: MultiverseClient

API_REQUEST_WAIT_TIME: datetime.timedelta#

The wait time for the API request in seconds.

APIs_THAT_NEED_WAIT_TIME: typing_extensions.List[pycram.datastructures.enums.MultiverseAPIName]#
simulation#
wait: bool = False#
get_body_bounding_box(body_name: str, with_children: bool = False) typing_extensions.Union[pycram.datastructures.dataclasses.AxisAlignedBoundingBox, typing_extensions.List[pycram.datastructures.dataclasses.AxisAlignedBoundingBox]]#

Get the bounding box of the body from the multiverse server, expressed with respect to the body’s frame.

_get_bounding_box(body_name: str, with_children: bool = False) typing_extensions.List[typing_extensions.List[float]]#

Get the body bounding box from the multiverse server.

save(save_name: str, save_directory: typing_extensions.Optional[str] = None) str#

Save the current state of the simulation.

Parameters:
  • save_name – The name of the save.

  • save_directory – The path to save the simulation, can be relative or absolute. If the path is relative, it will be saved in the saved folder in multiverse.

Returns:

The save path.

load(save_name: str, save_directory: typing_extensions.Optional[str] = None) None#

Load the saved state of the simulation.

Parameters:
  • save_name – The name of the save.

  • save_directory – The path to load the simulation, can be relative or absolute. If the path is relative, it will be loaded from the saved folder in multiverse.

static get_save_path(save_name: str, save_directory: typing_extensions.Optional[str] = None) str#

Get the save path.

Parameters:
  • save_name – The save name.

  • save_directory – The save directory.

Returns:

The save path.

attach(constraint: pycram.world_concepts.constraints.Constraint) None#

Request to attach the child link to the parent link.

Parameters:

constraint – The constraint.

_attach(child_link_name: str, parent_link_name: str, attachment_pose: str) None#

Attach the child link to the parent link.

Parameters:
  • child_link_name – The name of the child link.

  • parent_link_name – The name of the parent link.

  • attachment_pose – The attachment pose.

Get the link names of the constraint.

Parameters:

constraint – The constraint.

Returns:

The link names of the constraint.

Get the parent link name of the constraint.

Parameters:

constraint – The constraint.

Returns:

The parent link name of the constraint.

Get the child link name of the constraint.

Parameters:

constraint – The constraint.

Returns:

The child link name of the constraint.

Get the link name from the link object; if the link belongs to a single-link object, return the object name instead.

Parameters:

link – The link.

Returns:

The link name.

detach(constraint: pycram.world_concepts.constraints.Constraint) None#

Request to detach the child link from the parent link.

Parameters:

constraint – The constraint.

_detach(child_link_name: str, parent_link_name: str) None#

Detach the child link from the parent link.

Parameters:
  • child_link_name – The name of the child link.

  • parent_link_name – The name of the parent link.

_get_attachment_pose_as_string(constraint: pycram.world_concepts.constraints.Constraint) str#

Get the attachment pose as a string.

Parameters:

constraint – The constraint.

Returns:

The attachment pose as a string.

static _pose_to_string(pose: pycram.datastructures.pose.PoseStamped) str#

Convert the pose to a string.

Parameters:

pose – The pose.

Returns:

The pose as a string.

check_object_exists(obj: pycram.world_concepts.world_object.Object) bool#

Check if the object exists in the simulation.

Parameters:

obj – The object.

Returns:

Whether the object exists in the simulation.

get_resultant_force_and_torque_on_object(obj: pycram.world_concepts.world_object.Object) typing_extensions.Tuple[typing_extensions.List[float], typing_extensions.List[float]]#

Get the resultant force and torque on the object.

Parameters:

obj – The object.

Returns:

The resultant force and torque on the object.

get_contact_points_between_bodies(body_1_name: str, body_2_name: str) typing_extensions.List[pycram.datastructures.dataclasses.MultiverseContactPoint]#

Request the contact points between two bodies.

Parameters:
  • body_1_name – The name of the first body.

  • body_2_name – The name of the second body.

Returns:

The contact points between the bodies as a list of MultiverseContactPoint.

get_objects_intersected_with_rays(from_positions: typing_extensions.List[typing_extensions.List[float]], to_positions: typing_extensions.List[typing_extensions.List[float]]) typing_extensions.List[pycram.datastructures.dataclasses.MultiverseRayResult]#

Get the intersections with objects of the rays cast from from_positions to to_positions.

Parameters:
  • from_positions – The starting positions of the rays.

  • to_positions – The ending positions of the rays.

Returns:

The rays intersections with the objects as a list of MultiverseRayResult.

_get_rays(from_positions: typing_extensions.List[typing_extensions.List[float]], to_positions: typing_extensions.List[typing_extensions.List[float]]) typing_extensions.List[str]#

Get the intersections with objects of the rays cast from from_positions to to_positions.

Parameters:
  • from_positions – The starting positions of the rays.

  • to_positions – The ending positions of the rays.

Returns:

The ray intersections with the objects as a list of strings.

static _parse_get_rays_response(response: typing_extensions.List[str]) typing_extensions.List[pycram.datastructures.dataclasses.MultiverseRayResult]#

Parse the response of the get rays API.

Parameters:

response – The response of the get rays API as a list of strings.

Returns:

The rays as a list of MultiverseRayResult.

static list_of_positions_to_string(positions: typing_extensions.List[typing_extensions.List[float]]) str#

Convert the list of positions to a string.

Parameters:

positions – The list of positions.

Returns:

The list of positions as a string.

static _parse_constraint_effort(contact_effort: typing_extensions.List[str]) typing_extensions.Tuple[typing_extensions.List[float], typing_extensions.List[float]]#

Parse the contact effort of an object.

Parameters:

contact_effort – The contact effort of the object as a list of strings.

Returns:

The contact effort of the object as a tuple of two lists of floats.

static _parse_contact_points(body_1, body_2, contact_points: typing_extensions.List[str]) typing_extensions.List[pycram.datastructures.dataclasses.MultiverseContactPoint]#

Parse the contact points of an object.

Parameters:
  • body_1 – The name of the first body.

  • body_2 – The name of the second body.

  • contact_points – The contact points of the object as a list of strings.

Returns:

The contact positions and normal vectors as a list of MultiverseContactPoint.

get_contact_points(body_name: str) typing_extensions.List[pycram.datastructures.dataclasses.MultiverseContactPoint]#

Get the contact points of a body from the multiverse server.

Parameters:

body_name – The name of the body.

pause_simulation() None#

Pause the simulation.

unpause_simulation() None#

Unpause the simulation.

_request_single_api_callback(api_name: pycram.datastructures.enums.MultiverseAPIName, *params) typing_extensions.List[str]#

Request a single API callback from the server.

Parameters:

api_name – The name of the API to request the callback for.

Returns:

The API response as a list of strings.

_request_apis_callbacks(api_data: typing_extensions.Dict[pycram.datastructures.enums.MultiverseAPIName, typing_extensions.List]) typing_extensions.Dict[pycram.datastructures.enums.MultiverseAPIName, typing_extensions.List[str]]#

Request the API callbacks from the server.

Parameters:

api_data – The API data to add to the request metadata.

Returns:

The API responses as a dictionary mapping each API name to its response as a list of strings.

static validate_apis_response(api_data: typing_extensions.Dict[pycram.datastructures.enums.MultiverseAPIName, typing_extensions.List], responses: typing_extensions.Dict[pycram.datastructures.enums.MultiverseAPIName, typing_extensions.List[str]])#

Validate the responses from the multiverse server and raise an error if an API request failed.

Parameters:
  • api_data – The data of the api request which has the api name and the arguments.

  • responses – The responses of the given api requests.

Raises:

FailedAPIResponse – when one of the responses reports that the request failed.

_get_all_apis_responses() typing_extensions.Dict[pycram.datastructures.enums.MultiverseAPIName, typing_extensions.List[str]]#

Get all the API responses from the server.

Returns:

The API responses as a dictionary mapping each API name to its response as a list of strings.

_add_api_request(api_name: str, *params)#

Add an API request to the request metadata.

Parameters:
  • api_name – The name of the API.

  • params – The parameters of the API.

_send_api_request()#

Send the API request to the server.

_reset_api_callback()#

Initialize the API callback in the request metadata.