Source code for internutopia.core.vec_env

from typing import Any, Dict, List, OrderedDict, Tuple, Union

from internutopia.core.config import Config, DistributedConfig
from internutopia.core.distribution.launcher import Launcher
from internutopia.core.task_config_manager.base import create_task_config_manager
from internutopia.core.util import extensions_utils, log


class Env:
    """
    Vectorized environment that runs multiple independent environments simultaneously with multiple agents.

    **NOT gymnasium compatible**.

    Parameters:
        config (Config): The config instance used for simulation management.
    """

    def __init__(self, config: Config) -> None:
        self._render = None
        self._config = config
        self.task_config_manager = create_task_config_manager(self._config)
        self._runner_list = []
        self.env_num = self._config.env_num
        self.proc_num = 1
        self.is_remote = False
        if isinstance(config, DistributedConfig):
            import ray

            extensions = extensions_utils.dump_extensions()
            self._config.distribution_config.extensions = extensions
            self.is_remote = True
            self.proc_num = self._config.distribution_config.proc_num
            # Fail fast if the Ray cluster cannot satisfy the total GPU request.
            cluster_gpu_count = ray.cluster_resources().get('GPU', 0)
            request_gpu_count = self.proc_num * self._config.distribution_config.gpu_num_per_proc
            if cluster_gpu_count < request_gpu_count:
                description = (
                    f'Insufficient cluster resources: requested GPU: {request_gpu_count}, '
                    f'total GPU: {cluster_gpu_count}. '
                    'Please adjust proc_num and gpu_num_per_proc so that the request can be satisfied.'
                )
                raise RuntimeError(description)
            # One launcher (and thus one runner) per process, each with its own runner_id.
            for runner_id in range(self.proc_num):
                self._config.distribution_config.runner_id = runner_id
                self._runner_list.append(Launcher(self._config, self.task_config_manager).start())
        else:
            self._runner_list.append(Launcher(self._config, self.task_config_manager).start())
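
    # Construction sketch (comment-only so the class body stays valid Python; assumes
    # `config` and `dist_config` are fully populated Config / DistributedConfig instances):
    #
    #     env = Env(config)        # local mode: one runner, proc_num == 1
    #     env = Env(dist_config)   # distributed mode: proc_num runners launched via Ray
    #
    # In both modes the total number of environments is env_num * proc_num.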

    def reset(self, env_ids: List[int] = None) -> Tuple[List, List]:
        """
        Reset the environments specified by the given environment IDs and return the initial
        observations and task configs. If no environment IDs are provided, all environments
        are reset. If no tasks are running after the reset, a log message is emitted and
        empty lists are returned.

        Parameters:
            env_ids (List[int]): A list of environment IDs to reset. If None, all environments
                will be reset.

        Returns:
            Tuple[List, List]: A tuple containing two lists: the initial observations and task
            configs for the reset environments. If no tasks are running, both lists will be empty.
        """
        obs = []
        task_configs = []
        # Group global env ids by runner: runner_id = env_id // env_num, local id = env_id % env_num.
        new_env_ids = [None for _ in range(self.proc_num)]
        if env_ids is not None:
            result_list = []
            for env_id in env_ids:
                runner_id = env_id // self.env_num
                if new_env_ids[runner_id] is None:
                    new_env_ids[runner_id] = [env_id % self.env_num]
                else:
                    new_env_ids[runner_id].append(env_id % self.env_num)
            for runner_id in range(self.proc_num):
                if new_env_ids[runner_id]:
                    result_list.append(self._runner_list[runner_id].reset(env_ids=new_env_ids[runner_id]))
        else:
            result_list = [
                self._runner_list[runner_id].reset(env_ids=new_env_ids[runner_id])
                for runner_id in range(self.proc_num)
            ]
        if self.is_remote:
            import ray

            result_list = ray.get(result_list)
        for _obs, _task_configs in result_list:
            obs.extend(_obs)
            task_configs.extend(_task_configs)
        if all(task_config is None for task_config in task_configs):
            log.info('No more episodes left')
        return obs, task_configs
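
    # Reset sketch (hypothetical numbers): with env_num == 2 and proc_num == 2, global
    # env ids run 0..3; global id 3 maps to runner_id = 3 // 2 = 1, local id = 3 % 2 = 1.
    #
    #     obs, task_configs = env.reset()             # reset every environment
    #     obs, task_configs = env.reset(env_ids=[3])  # reset only local env 1 of runner 1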

    def warm_up(self, steps: int = 10, render: bool = True, physics: bool = True):
        """
        Warm up the env by running a specified number of steps.

        Args:
            steps (int): The number of warm-up steps to perform. Defaults to 10.
            render (bool): Whether to render the scene during warm-up. Defaults to True.
            physics (bool): Whether to enable physics during warm-up. Defaults to True.
        """
        result_list = [
            self._runner_list[index].warm_up(steps, render, physics) for index in range(len(self._runner_list))
        ]
        if self.is_remote:
            import ray

            result_list = ray.get(result_list)

    def step(self, action: List[Union[Dict, OrderedDict]]) -> Tuple[List, List, List, List, List]:
        """
        Perform a single step in the environment using the provided actions.

        This method takes a list of actions, validates its type and length, then delegates
        the step execution to the internal runners. It returns the observations, rewards,
        termination status, truncation status, and additional information.

        Args:
            action (List[Union[Dict, OrderedDict]]): A list of actions to be executed in the
                environment. Each action is either a dictionary or an ordered dictionary.

        Returns:
            Tuple[List, List, List, List, List]: A tuple containing the following elements:

            - obs (List): The observations resulting from the actions.
            - reward (List): The rewards obtained from the actions.
            - terminated (List): The termination status of the environments.
            - truncated (List): The truncation status of the environments.
            - info (List): Additional information about the step execution.
        """
        assert isinstance(action, list)
        assert len(action) == self.env_num * self.proc_num
        obs = []
        reward = []
        terminated = []
        truncated = [False for _ in action]
        info = [None for _ in action]
        # Slice the flat action list so each runner receives the actions for its own envs.
        result_list = [
            self._runner_list[index].step(action[index * self.env_num : (index + 1) * self.env_num])
            for index in range(len(self._runner_list))
        ]
        if self.is_remote:
            import ray

            result_list = ray.get(result_list)
        for _obs, _terminated, _reward in result_list:
            obs.extend(_obs)
            terminated.extend(_terminated)
            reward.extend(_reward)
        return obs, reward, terminated, truncated, info
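
    # Step sketch: the flat action list must hold exactly env_num * proc_num entries,
    # ordered so runner i consumes the slice [i * env_num : (i + 1) * env_num]. The
    # agent/controller names and `cmd` payload below are hypothetical placeholders for
    # whatever the configured controllers expect:
    #
    #     action = [{'robot': {'controller': [cmd]}} for _ in range(env.env_num * env.proc_num)]
    #     obs, reward, terminated, truncated, info = env.step(action)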

    @property
    def runner(self):
        """
        The runner property provides access to the internal runner instance.
        """
        if not self.is_remote:
            return self._runner_list[0].runner
        raise NotImplementedError('not implemented in distribution mode.')

    @property
    def is_render(self):
        """
        Get render state.
        """
        return self._render

    @property
    def active_task_configs(self):
        """
        Get active task configs with env id as key.
        """
        if not self.is_remote:
            return self.task_config_manager.get_active_task_configs()
        else:
            import ray

            return ray.get(self.task_config_manager.get_active_task_configs.remote())

    def get_dt(self):
        """
        Get dt of the simulation environment.
        """
        if not self.is_remote:
            return self._runner_list[0].runner.dt
        raise NotImplementedError('not implemented in distribution mode.')

    def get_observations(self) -> List | Any:
        """
        Get observations from the Isaac environment.
        """
        obs = []
        result_list = [self._runner_list[index].get_obs() for index in range(len(self._runner_list))]
        if self.is_remote:
            import ray

            result_list = ray.get(result_list)
        for _obs in result_list:
            obs.extend(_obs)
        return obs

    def close(self):
        """Close the environment."""
        if not self.is_remote:
            self._runner_list[0].runner.simulation_app.close()
        else:
            import ray

            for proxy in self._runner_list:
                ray.kill(proxy.runner)
            ray.kill(self.task_config_manager)

    @property
    def simulation_app(self):
        """Simulation app instance."""
        if not self.is_remote:
            return self._runner_list[0].runner.simulation_app
        raise NotImplementedError('not implemented in distribution mode.')

    def finished(self) -> bool:
        """Check if all tasks are finished."""
        if not self.is_remote:
            return len(self._runner_list[0].runner.current_tasks) == 0
        raise NotImplementedError('not implemented in distribution mode.')
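

# End-to-end sketch (comment-only; assumes local mode, since finished() raises in
# distribution mode, and `policy` is a hypothetical stand-in that maps one env's
# observation to its action dict):
#
#     env = Env(config)
#     obs, task_configs = env.reset()
#     env.warm_up(steps=10)
#     while not env.finished():
#         actions = [policy(o) for o in obs]
#         obs, reward, terminated, truncated, info = env.step(actions)
#     env.close()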