Source code for internutopia.core.runner

import json
from typing import Dict, List, Optional, Tuple, Union

from internutopia.core.config import Config, DistributedConfig, TaskCfg
from internutopia.core.robot.rigid_body import IRigidBody

# Init
from internutopia.core.scene.scene import IScene
from internutopia.core.task.task import BaseTask, create_task
from internutopia.core.task_config_manager.base import BaseTaskConfigManager
from internutopia.core.util import extensions_utils, log
from internutopia.core.util.clear_task import clear_stage_by_prim_path


[docs]class SimulatorRunner: def __init__(self, config: Config, task_config_manager: BaseTaskConfigManager): self.config = config self.task_config_manager = task_config_manager self.env_num = self.config.env_num if isinstance(self.config, DistributedConfig): self.runner_id = self.config.distribution_config.runner_id extensions_utils.reload_extensions(self.config.distribution_config.extensions) self.setup_isaacsim() self.metrics_config = None self.metrics_save_path = self.config.metrics_save_path if self.metrics_save_path != 'console': try: with open(self.metrics_save_path, 'w'): pass except Exception as e: log.error(f'Can not create result file at {self.metrics_save_path}.') raise e self.create_world() self._scene = IScene.create() self._stage = self._world.stage # Map task_name -> env_id: self.task_name_to_env_id_map = {} self.env_id_to_task_name_map = {} # finished_tasks contains all the finished tasks in current tasks dict self.finished_tasks = set() self.render_interval = ( self.config.simulator.rendering_interval if self.config.simulator.rendering_interval is not None else 5 ) log.info(f'rendering interval: {self.render_interval}') self.render_trigger = 0 self.loop = False self._render = False @property def current_tasks(self) -> dict[str, BaseTask]: return self._world._current_tasks
[docs] def warm_up(self, steps: int = 10, render: bool = True, physics: bool = True): """ Warm up the simulation by running a specified number of steps. Args: steps (int): The number of warm-up steps to perform. Defaults to 10. render (bool): Whether to render the scene during warm-up. Defaults to True. physics (bool): Whether to enable physics during warm-up. Defaults to True. Raises: ValueError: If both `render` and `physics` are set to False, or if `steps` is less than or equal to 0. """ from omni.isaac.core.simulation_context import SimulationContext if not render and not physics: raise ValueError('both `render` and `physics` are set to False') if steps <= 0: raise ValueError('steps` is less than or equal to 0') if physics: for _ in range(steps): self._world.step(render=render) else: for _ in range(steps): SimulationContext.render(self._world)
[docs] def step( self, actions: Union[List[Dict], None] = None, render: bool = True ) -> Tuple[List[Dict], List[bool], List[float]]: """ Step function to advance the simulation environment by one time step. This method processes actions for active tasks, steps the simulation world, collects observations, updates metrics, and determines task terminations. It also handles rendering based on specified intervals. Args: actions (Union[List[Dict], None], optional): A dictionary mapping task names to another dictionary of robot names and their respective actions. If None, no actions are applied. Defaults to None. render (bool, optional): Flag indicating whether to render the simulation at this step. True triggers rendering if the render interval is met. Defaults to True. Returns: Tuple[Dict, Dict[str, bool], Dict[str, float]]: - obs (Dict): A dictionary containing observations for each task, further divided by robot names and their observation data. - terminated_status (Dict[str, bool]): A dictionary mapping task names to boolean values indicating whether the task has terminated. - reward (Dict[str, float]): A dictionary that would contain rewards for each task or robot; however, the actual return and computation of rewards is not shown in the provided code snippet. Raises: Exception: If an error occurs when applying an action to a robot, the exception is logged and re-raised, providing context about the task, robot, and current tasks state. Notes: - The `_world.step()` method advances the simulation, optionally rendering the environment based on the `render` flag and the render interval. - `get_obs()` is a method to collect observations from the simulation world, though its implementation details are not shown. - Metrics for each task are updated, and upon task completion, results are saved to a JSON file. This includes a flag 'normally_end' set to True, which seems to indicate normal termination of the task. - The function also manages a mechanism to prevent further action application and metric updates for tasks that have been marked as finished. """ """ ================ TODO: Key optimization interval ================= """ terminated_status = [] reward = [] for env_id, action_dict in enumerate(actions): # terminated tasks will no longer apply action if env_id not in self.env_id_to_task_name_map: continue task_name = self.env_id_to_task_name_map[env_id] if task_name in self.finished_tasks: continue if task_name not in self.current_tasks: continue task = self.current_tasks.get(task_name) for name, action in action_dict.items(): if name in task.robots: try: task.robots[name].apply_action(action) except Exception as e: log.error('task_name : %s', task_name) log.error('robot_name : %s', name) log.error('current_tasks : %s', [i for i in self.current_tasks.keys()]) raise e self.render_trigger += 1 self._render = render and self.render_trigger > self.render_interval if self.render_trigger > self.render_interval: self.render_trigger = 0 # Step self._world.step(render=self._render) # Get obs obs = self.get_obs() # update metrics for task in self.current_tasks.values(): if task.is_done(): self.finished_tasks.add(task.name) log.info(f'Task {task.name} finished.') metrics_results = task.calculate_metrics() if self.metrics_save_path == 'console': print(json.dumps(metrics_results, indent=4)) elif self.metrics_save_path == 'none' or self.metrics_save_path is None: pass else: with open(self.metrics_save_path, 'a') as f: f.write(json.dumps(metrics_results)) f.write('\n') # finished tasks will no longer update metrics if task.name not in self.finished_tasks: for metric in task.metrics.values(): metric.update(obs[self.task_name_to_env_id_map[task.name]]) # update terminated_status and rewards for env_id in range(self.env_num): # terminated tasks will no longer apply action if env_id not in self.env_id_to_task_name_map: terminated_status.append(True) reward.append(-1) continue task_name = self.env_id_to_task_name_map[env_id] if task_name in self.finished_tasks: terminated_status.append(True) reward.append(-1) else: terminated_status.append(False) r = self.current_tasks[task_name].reward reward.append(r.calc() if r is not None else -1) return obs, terminated_status, reward
[docs] def get_obs(self) -> List[Dict | None]: """ Get obs Returns: List[Dict]: obs from isaac sim. """ obs = {} for task_name, task in self.current_tasks.items(): obs[task_name] = task.get_observations() # Add render obs for task_name, task_obs in obs.items(): for robot_name, robot_obs in task_obs.items(): obs[task_name][robot_name]['render'] = self._render _obs = [] for env_id in range(self.env_num): if env_id in self.env_id_to_task_name_map: _obs.append(obs[self.env_id_to_task_name_map[env_id]]) else: _obs.append(None) return _obs
[docs] def stop(self): """ Stop all current operations and clean up the **World** """ self._world.reset() self._world.clear() self._world.stop()
def get_current_time_step_index(self) -> int: return self._world.current_time_step_index
[docs] def reset(self, env_ids: Optional[List[int]] = None) -> Tuple[List, List]: """ Resets the environment for the given environment IDs or initializes it if no IDs are provided. This method handles resetting the simulation context, generating new task configs, and finalizing tasks when necessary. It supports partial resets for specific environments and ensures proper handling of task transitions. Args: env_ids (Optional[List[int]]): A list of environment IDs to reset. If None, all environments are reset or initialized based on the current state. Returns: Tuple[List, List]: A tuple containing two lists. The first list contains observations for the reset environments, and the second list contains the new task configs. Raises: ValueError: If the provided `env_ids` are invalid or don't correspond to any existing tasks. RuntimeError: If the simulation context fails to reset or initialize properly. Notes: - If `env_ids` is None and there are current tasks, all environments are reset. - If `env_ids` is None and there are no current tasks, the simulation context is initialized. - Observations are collected only for environments that are reset or initialized. - Tasks corresponding to the reset environments are transitioned to new episodes. """ from omni.isaac.core.simulation_context import SimulationContext new_task_configs = [] if env_ids is None and self.current_tasks: # reset log.info('==================== reset all env ====================') env_ids = [i for i in range(self.env_num)] if env_ids is None: # init log.info('===================== init reset =====================') SimulationContext.reset(self._world, soft=False) new_task_configs = self._next_episodes() else: # switch to next episodes env_to_reset = [env_id for env_id in env_ids if env_id in self.env_id_to_task_name_map] if not env_to_reset: log.warning(f'Not reset empty envs: {env_ids}.') return [None for _ in env_ids], [None for _ in env_ids] tasks = [self.env_id_to_task_name_map[env_id] for env_id in env_to_reset] _task_configs = self._next_episodes(tasks) for env_id in env_ids: if env_id in self.env_id_to_task_name_map: new_task_configs.append(_task_configs.pop(0)) else: new_task_configs.append(None) [self.finished_tasks.discard(task) for task in tasks] all_obs = self.get_obs() obs = [all_obs[i] for i in env_ids] if env_ids else all_obs if not self.current_tasks: # finished self._finalize() return obs, new_task_configs
def _finalize(self): """ Finalize the tasks and do some post-processing. """ pass def world_clear(self): self._world.clear()
[docs] def clear_single_task(self, task_name: str): """ Clear single task with task_name Args: task_name (str): Task name to clear. """ from omni.isaac.core.loggers import DataLogger if task_name not in self.current_tasks: log.warning(f'Clear task {task_name} fail. The task {task_name} is not in current_tasks.') return old_task = self.current_tasks[task_name] old_task.cleanup() del self.current_tasks[task_name] self._world._task_scene_built = False self._world._data_logger = DataLogger() log.info(f'Clear stage: /World/env_{self.task_name_to_env_id_map[task_name]}') clear_stage_by_prim_path(f'/World/env_{self.task_name_to_env_id_map[task_name]}')
def _next_episodes(self, reset_tasks: List[str] = None) -> List[TaskCfg]: """ Switch tasks that need to be reset to the next episode. This method handles cleaning-up tasks, resetting sim backend, creating new tasks, and restoring states for non-reset environments. Args: reset_tasks (List[str]): a list of task names that need to be reset. If None, the method initializes all environments without resetting specific tasks. Returns: List[TaskCfg]: a list of TaskCfg. Raises: RuntimeError: If a task specified in `reset_tasks` isn't found in the current tasks. """ from isaacsim.core.simulation_manager import SimulationManager env_id_list = [] env_offset_list = [] task_name_list = [] task_configs_list = [] if reset_tasks: # recycling env_id for task_name in reset_tasks: if task_name not in self.current_tasks: raise RuntimeError(f'Task with task_name {task_name} not in `current_tasks`.') old_task = self.current_tasks[task_name] env_id_list.append(old_task.env_id) # save the state of envs that need to be kept for _task_name, task in self.current_tasks.items(): if _task_name in reset_tasks: continue task.save_info() # clean up tasks that need to reset for task_name in reset_tasks: self.clear_single_task(task_name) # clear all rigid bodies in scene register and physics backend for task in self.current_tasks.values(): task.clear_rigid_bodies() SimulationManager._on_stop('reset') else: # init SimulationManager._on_stop('reset') env_id_list = [None for _ in range(self.env_num)] # get next_task_configs for idx, env_id in enumerate(env_id_list): if not isinstance(self.config, DistributedConfig): next_task = self.task_config_manager.get_next(env_id) else: import ray next_task = ray.get(self.task_config_manager.get_next.remote(env_id, self.runner_id)) new_task_name, new_env_id, new_env_offset, task_cfg = next_task if task_cfg is None and env_id in self.env_id_to_task_name_map: del self.env_id_to_task_name_map[env_id] env_id_list[idx] = new_env_id env_offset_list.append(new_env_offset) task_name_list.append(new_task_name) task_configs_list.append(task_cfg) # create tasks with new task configs _new_tasks = [] _new_tasks_names = [] for idx, task_config in enumerate(task_configs_list): if task_config is None: continue task = create_task(task_config, self._scene) task.set_up_runtime(task_name_list[idx], env_id_list[idx], env_offset_list[idx]) self._world.add_task(task) _new_tasks.append(task) _new_tasks_names.append(task.name) task.set_up_scene(self._scene) # map task_name and env_id of new tasks self.task_name_to_env_id_map[task.name] = task.env_id self.env_id_to_task_name_map[task.env_id] = task.name # create sim_view SimulationManager._create_simulation_view('reset') # restore the state of envs that haven't been reset if reset_tasks: for t in self.current_tasks.values(): if t.name in _new_tasks_names: continue t.restore_info() self._scene.unwrap()._finalize(self._world.physics_sim_view) # noqa # post_reset for new tasks for task in _new_tasks: task.post_reset() # log new episodes log.info('===================== episodes ========================') for task in _new_tasks: log.info(f'Next episode: {task.name} at {str(task.env_id)}') log.info('======================================================') return task_configs_list def get_obj(self, name: str) -> IRigidBody: # Only supported in gym_env. # TODO: handle name by a more robust way and maybe support vec env return self._scene.get(name + '_0') def remove_collider(self, prim_path: str): from omni.physx.scripts import utils build = self._world.stage.GetPrimAtPath(prim_path) if build.IsValid(): utils.removeCollider(build) def add_collider(self, prim_path: str): from omni.physx.scripts import utils build = self._world.stage.GetPrimAtPath(prim_path) if build.IsValid(): utils.setCollider(build, approximationShape=None) def create_world(self): physics_dt = self.config.simulator.physics_dt rendering_dt = self.config.simulator.rendering_dt physics_dt = eval(physics_dt) if isinstance(physics_dt, str) else physics_dt self.dt = physics_dt rendering_dt = eval(rendering_dt) if isinstance(rendering_dt, str) else rendering_dt use_fabric = self.config.simulator.use_fabric log.info(f'simulator params: physics dt={physics_dt}, rendering dt={rendering_dt}, use_fabric={use_fabric}') from omni.isaac.core import World self._world: World = World( physics_dt=physics_dt, rendering_dt=rendering_dt, stage_units_in_meters=1.0, sim_params={'use_fabric': use_fabric}, ) def setup_isaacsim(self): # Init Isaac Sim from isaacsim import SimulationApp # noqa headless = self.config.simulator.headless native = self.config.simulator.native webrtc = self.config.simulator.webrtc self._simulation_app = SimulationApp( {'headless': headless, 'anti_aliasing': 0, 'hide_ui': False, 'multi_gpu': False} ) self._simulation_app._carb_settings.set('/physics/cooking/ujitsoCollisionCooking', False) log.debug('SimulationApp init done') # Determine streaming mode based on isaacsim version. try: from isaacsim import util # noqa except ImportError: # isaacsim 420 behavior self.setup_streaming_420(native, webrtc) else: # isaac 450 behavior if native: log.warning('native streaming is DEPRECATED, webrtc streaming is used instead') webrtc = native or webrtc self.setup_streaming_450(webrtc) def setup_streaming_420(self, native: bool, webrtc: bool): if webrtc: from omni.isaac.core.utils.extensions import enable_extension # noqa self._simulation_app.set_setting('/app/window/drawMouse', True) self._simulation_app.set_setting('/app/livestream/proto', 'ws') self._simulation_app.set_setting('/app/livestream/websocket/framerate_limit', 60) self._simulation_app.set_setting('/ngx/enabled', False) enable_extension('omni.services.streamclient.webrtc') elif native: from omni.isaac.core.utils.extensions import enable_extension # noqa self._simulation_app.set_setting('/app/window/drawMouse', True) self._simulation_app.set_setting('/app/livestream/proto', 'ws') self._simulation_app.set_setting('/app/livestream/websocket/framerate_limit', 120) self._simulation_app.set_setting('/ngx/enabled', False) enable_extension('omni.kit.streamsdk.plugins-3.2.1') enable_extension('omni.kit.livestream.core-3.2.0') enable_extension('omni.kit.livestream.native') def setup_streaming_450(self, webrtc: bool): if webrtc: from omni.isaac.core.utils.extensions import enable_extension self._simulation_app.set_setting('/app/window/drawMouse', True) enable_extension('omni.kit.livestream.webrtc') @property def simulation_app(self): return self._simulation_app