Source code for internutopia.core.task.task

import traceback
from abc import ABC
from typing import Any, Dict, List, Union

from internutopia.core.config import TaskCfg
from internutopia.core.object import init_objects
from internutopia.core.robot.rigid_body import IRigidBody
from internutopia.core.robot.robot import BaseRobot, init_robots
from internutopia.core.scene.scene import IScene
from internutopia.core.task.metric import BaseMetric, create_metric
from internutopia.core.util import log
from internutopia.core.util.pose_mixin import PoseMixin


[docs]class BaseTask(ABC): """ wrap of omniverse isaac sim's base task * enable register for auto register task * contains scene/robots/objects. """ tasks = {} def __init__(self, config: TaskCfg, scene: IScene): self.name = None self.env_id = 0 self.env_offset = [] self.root_path = None self.scene_prim = None self.objects = None self.robots: Union[Dict[str, BaseRobot], None] = None self._scene = scene self.scene_rigid_bodies: Dict[str, IRigidBody] = {} self.config = config self.metrics: Dict[str, BaseMetric] = {} self.steps = 0 self.work = True self.loaded = False for metric_config in config.metrics: self.metrics[metric_config.name] = create_metric(metric_config, self.config) from internutopia.core.task.reward import BaseReward, create_reward # noqa self.reward: Union[BaseReward, None] = create_reward(config.reward, self) if config.reward is not None else None
[docs] def set_up_runtime(self, task_name, env_id, env_offset): """ Sets up info (task_name, env_id, env_offset) for this task. TODO: refactor and rename Args: task_name (str): The name of the task. env_id (int): The env ID of this task. env_offset (List[float]): The env offset for the task. """ self.env_id: int = env_id self.env_offset: List[float] = env_offset self.root_path = f'/World/env_{str(self.env_id)}' if env_id not in PoseMixin.env_offset_map: PoseMixin.env_offset_map[str(env_id)] = env_offset log.info(f'env {env_id} at {env_offset}') self.name = task_name for metric in self.metrics.values(): metric.set_up_runtime(task_name, env_id, env_offset)
[docs] def load(self): """ Loads the environment scene and initializes robots and objects. This method first checks if a scene asset path is defined in the task config. If so, it creates a scene using the provided path and specified parameters such as scaling and positioning. The scene is then populated with robots and objects based on the configurations stored within `self.config`. Raises: - Exceptions may be raised during file operations or USD scene creation, but specific exceptions are not documented here. Attributes Modified: - **self.robots**: A collection of initialized robots set up within the scene. - **self.objects**: A dictionary mapping object names to their respective initialized instances within the scene. - **self.loaded**: A boolean flag indicating whether the environment has been successfully loaded, set to `True` upon successful completion of this method. Logs: - Information about the initialized robots and objects is logged using the `log.info` method after successful setup. """ from pxr import Usd if self.config.scene_asset_path is not None: self._scene.load(self.config, self.env_id, self.env_offset) for prim in Usd.PrimRange.AllPrims(self._scene.scene_prim): if prim.GetAttribute('physics:rigidBodyEnabled'): log.debug(f'[BaseTask.load] found rigid body at path: {prim.GetPath()}') _rb = IRigidBody.create(prim_path=str(prim.GetPath()), name=str(prim.GetPath())) self.scene_rigid_bodies[str(prim.GetPath())] = _rb self.robots = init_robots(self.config, self._scene) self.objects = init_objects(self.config, self._scene) self.loaded = True
def clear_rigid_bodies(self): for rigid_body_name in self.scene_rigid_bodies.keys(): if self._scene.object_exists(rigid_body_name): self._scene.remove(target=rigid_body_name)
[docs] def save_info(self): """ Saves the robot information and rigidbody statuses. """ self.save_robot_info() self._save_rigidbody_statuses()
def _save_rigidbody_statuses(self): """ Saves the current status of all rigid bodies in the scene by querying their physics properties excluding those in the robot. Note: rigid prims within articulations aren't included since those RigidBody' physical status (transform, velocity, etc) can't be set individually. """ for rigid_body_name, rigid_body in self.scene_rigid_bodies.items(): if not self._scene.object_exists(rigid_body_name): log.error(f'[cache_info] {rigid_body_name} does not exist.') continue rigid_body.save_status() def _restore_rigidbody_statuses(self): """ Restores the statuses of all rigid bodies in the scene based on their stored status data excluding those in the robot. """ for rigid_body_name, rigid_body in self.scene_rigid_bodies.items(): if rigid_body.status is None or not self._scene.object_exists(rigid_body_name): continue rigid_body.restore_status()
[docs] def set_up_scene(self, scene: IScene) -> None: """ Adding assets to the stage as well as adding the encapsulated objects such as XFormPrim..etc to the task_objects happens here. Args: scene (Scene): [description] """ self._scene = scene if not self.loaded: self.load()
[docs] def get_observations(self) -> Dict[str, Any]: """ Returns current observations from the objects needed for the behavioral layer. Return: Dict[str, Any]: observation of robots in this task """ if not self.work: return {} obs = {} for robot_name, robot in self.robots.items(): try: _obs = robot.get_obs() if _obs: obs[robot_name] = _obs except Exception as e: log.error(self.name) log.error(e) traceback.print_exc() return {} return obs
[docs] def update_metrics(self): """ Updates all metrics stored within the instance. Scans through the dictionary of metrics kept by the current instance, invoking the 'update' method on each one. This facilitates the aggregation or recalculation of metric values as needed. Note: This method does not return any value; its purpose is to modify the state of the metric objects internally. """ for _, metric in self.metrics.items(): metric.update()
[docs] def calculate_metrics(self) -> dict: """ Calculates and aggregates the results of all metrics registered within the instance. This method iterates over the stored metrics, calling their respective `calc` methods to compute the metric values. The computed values are then compiled into a dictionary, where each key corresponds to the metrics' name, and each value is the result of the metric calculation. Returns: dict: A dictionary containing the calculated results of all metrics, with metric names as keys. Note: Ensure that all metrics added to the instance have a `calc` method implemented. Example Usage: ```python # Assuming `self.metrics` is populated with metric instances. results = calculate_metrics() print(results) # Output: {'metric1': 0.85, 'metric2': 0.92, 'metric3': 0.78} ``` """ metrics_res = {} for name, metric in self.metrics.items(): metrics_res[name] = metric.calc() return metrics_res
[docs] def is_done(self) -> bool: """ Returns True of the task is done. The result should be decided by the state of the task. """ raise NotImplementedError
[docs] def pre_step(self, time_step_index: int, simulation_time: float) -> None: """ Called before stepping the physics simulation. Args: time_step_index (int): [description] simulation_time (float): [description] """ self.steps += 1 return
[docs] def save_robot_info(self): """ Saves information of all robots in the task instance. """ for robot in self.robots.values(): robot.save_robot_info()
[docs] def restore_info(self): """ Restores the information and statuses of rigid bodies and robots. """ self._restore_rigidbody_statuses() for robot in self.robots.values(): robot.restore_robot_info()
[docs] def post_reset(self) -> None: """Calls while doing a .reset() on the world.""" self.steps = 0 for robot in self.robots.values(): robot.post_reset() # TODO: Verify whether RigidPrims' post_reset need to be called return
[docs] def cleanup(self) -> None: """ Used to clean up the resources loaded in the task. """ for obj in self.objects.values(): # Using try here because we want to ignore all exceptions try: self._scene.remove(obj.name) finally: log.info('[cleanup] objs cleaned.') for robot in self.robots.values(): # Using try here because we want to ignore all exceptions log.info(f'[cleanup] cleanup robot {robot.articulation.name}') try: robot.cleanup() self._scene.remove(robot.articulation.name, registry_only=True) finally: log.info('[cleanup] robots cleaned.')
[docs] @classmethod def register(cls, name: str): """ Register an task class with the given name(decorator). Args: name(str): name of the task """ def wrapper(task_class): """ Register the task class. """ cls.tasks[name] = task_class return task_class return wrapper
def create_task(config: TaskCfg, scene: IScene): task_cls: BaseTask = BaseTask.tasks[config.type](config, scene) return task_cls