Source code for internutopia.core.robot.controller
from abc import ABC, abstractmethod
from collections import OrderedDict
from typing import Any, Dict, List, Union
import numpy as np
from internutopia.core.config.robot import ControllerCfg, RobotCfg
from internutopia.core.robot.articulation_action import ArticulationAction
from internutopia.core.robot.articulation_subset import ArticulationSubset
from internutopia.core.robot.robot import BaseRobot
from internutopia.core.scene.scene import IScene
from internutopia.core.util import log
[docs]class BaseController(ABC):
"""Base class of controller."""
controllers = {}
def __init__(self, config: ControllerCfg, robot: BaseRobot, scene: IScene):
"""Initialize the controller.
Args:
config (ControllerCfg): controller configuration.
robot (BaseRobot): robot owning the controller.
scene (IScene): scene interface.
"""
self.sub_controllers = None
self.scene = scene
if config.name is None:
raise ValueError('must specify controller name.')
self._obs = {}
self._robot = robot
self.config = config
self.sub_controllers: List[BaseController]
self.obs_keys = []
[docs] @abstractmethod
def action_to_control(self, action: Union[np.ndarray, List]) -> ArticulationAction:
"""Convert input action (in 1d array format) to joint signals to apply.
Args:
action (Union[np.ndarray, List]): input control action.
Returns:
ArticulationAction: joint signals to apply
"""
raise NotImplementedError()
[docs] def get_obs(self) -> OrderedDict[str, Any]:
"""Get observation of controller.
Returns:
OrderedDict[str, Any]: observation key and value.
"""
return OrderedDict()
[docs] @classmethod
def register(cls, name: str):
"""
Register an controller class with the given name(decorator).
Args:
name(str): name of the controller
"""
def wrapper(controller_class):
"""
Register the controller class.
"""
cls.controllers[name] = controller_class
return controller_class
return wrapper
@property
def robot(self):
return self._robot
@robot.setter
def robot(self, value):
self._robot = value
[docs] def cleanup(self):
"""
Operations that need to be cleaned up before switching scenes (or resetting)
"""
pass
def _make_ordered(self, obs: Dict = None) -> OrderedDict:
if obs is None:
return OrderedDict()
if not self.obs_keys:
self.obs_keys = [i for i in obs.keys()]
return OrderedDict((key, obs[key]) for key in self.obs_keys)
[docs] def get_joint_subset(self) -> ArticulationSubset | None:
"""Get the joint subset controlled by the controller.
Returns:
ArticulationSubset: joint subset.
"""
if hasattr(self, 'joint_subset'):
return self.joint_subset
if not hasattr(self, 'sub_controllers'):
return None
if self.sub_controllers is None or len(self.sub_controllers) == 0:
return None
return self.sub_controllers[0].get_joint_subset()
[docs]def create_controllers(robot_cfg: RobotCfg, robot: BaseRobot, scene: IScene) -> OrderedDict[str, BaseController]:
"""Create all controllers of one robot.
Args:
robot_cfg (RobotCfg): config of the robot.
robot (BaseRobot): robot instance.
scene (Scene): scene from isaac sim.
Returns:
Dict[str, BaseController]: dict of controllers with controller name as key.
"""
controller_map = {}
if robot_cfg.controllers is None:
return OrderedDict(controller_map)
for controller_cfg in robot_cfg.controllers:
controller_name = controller_cfg.name
controller_cls = BaseController.controllers[controller_cfg.type]
controller_ins: BaseController = controller_cls(config=controller_cfg, robot=robot, scene=scene)
if controller_cfg.sub_controllers is not None:
inject_sub_controllers(
parent=controller_ins,
configs=controller_cfg.sub_controllers,
robot=robot,
scene=scene,
)
controller_map[controller_name] = controller_ins
log.debug(f'[create_controllers] {controller_name} loaded')
return OrderedDict(
(controller_cfg.name, controller_map[controller_cfg.name]) for controller_cfg in robot_cfg.controllers
)
[docs]def inject_sub_controllers(
parent: BaseController,
configs: List[ControllerCfg],
robot: BaseRobot,
scene: IScene,
):
"""Recursively create and inject sub-controllers into parent controller.
Args:
parent (BaseController): parent controller instance.
configs (List[ControllerParams]): user configs of sub-controllers.
robot (BaseRobot): robot instance.
scene (Scene): scene from isaac sim.
"""
if len(configs) == 0:
return
sub_controllers: List[BaseController] = []
for config in configs:
controller_cls = BaseController.controllers[config.type]
controller_ins = controller_cls(config=config, robot=robot, scene=scene)
if config.sub_controllers is not None:
inject_sub_controllers(controller_ins, configs=config.sub_controllers, robot=robot, scene=scene)
sub_controllers.append(controller_ins)
parent.sub_controllers = sub_controllers