How to Add a Custom Metric#
Because metrics are typically designed for specific tasks, there is rarely a completely “universal” metric.
Therefore, after customizing a task, you usually also need to customize a metric to record the information you need.
1. Defining a New Metric#
Before adding a new metric, we need to settle the following points:
Metric Name: What will the metric be called?
Metric Objective: What specific objective will the metric achieve?
Metric Update Method: How is the state of the metric recorded while the task is running?
Metric Calculation: How are all the records summarized at the end of the task?
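Here is how we define TraveledDistanceMetric based on the points above:
Metric Name: TraveledDistanceMetric
Metric Objective: Record the total distance a robot moves from start to finish.
Metric Update Method: At each step, add the distance between the robot's previous and current positions to a running total.
Metric Calculation: Return the accumulated total when the episode ends.
(Additional metrics as needed.)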
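The quantity this metric records can be sketched independently of the simulator. The snippet below is a minimal illustration, not part of InternUtopia: the function name `traveled_distance` and the sample positions are hypothetical, and it uses only the Python standard library. It sums the straight-line distance between each pair of consecutive positions.

```python
import math
from typing import Sequence

Position = Sequence[float]


def traveled_distance(positions: Sequence[Position]) -> float:
    """Sum the straight-line distances between consecutive positions."""
    total = 0.0
    # Pair each position with the one that follows it.
    for previous, current in zip(positions, positions[1:]):
        total += math.dist(previous, current)
    return total


# Hypothetical sampled positions (x, y, z), one per world step.
samples = [(0.0, 0.0, 0.0), (3.0, 4.0, 0.0), (3.0, 4.0, 12.0)]
# Segment lengths are 5 and 12, so the total should be about 17.
print(traveled_distance(samples))
```

The metric class in section 3 computes the same sum incrementally, one step at a time, because the positions arrive through update rather than as a complete list.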
To add a custom metric, you need to:
Create a config class for the metric, inheriting from internutopia.core.config.metric.MetricCfg.
Create a class for the metric, inheriting from internutopia.core.task.metric.BaseMetric.
2. Create the Metric Config Class#
Here’s an example of a config class for a metric:
# This is also the simplest configuration.
from typing import Optional

from internutopia.core.config.metric import MetricCfg


class TraveledDistanceMetricCfg(MetricCfg):
    name: Optional[str] = 'traveled_distance_metric'
    type: Optional[str] = 'TraveledDistanceMetric'
    robot_name: str
Define a unique type name.
Define new parameters needed by the metric directly in the config.
3. Create the Metric Class#
In this doc, we demonstrate a simple metric that tracks the total distance a robot moves.
import numpy as np
from pydantic import BaseModel
from internutopia.core.config import TaskCfg
from internutopia.core.task.metric import BaseMetric
from internutopia.core.util import log
from internutopia_extension.configs.metrics.traveled_distance_metric import TraveledDistanceMetricCfg


@BaseMetric.register('TraveledDistanceMetric')
class TraveledDistanceMetric(BaseMetric):
    """
    Calculate the total distance a robot moves
    """

    def __init__(self, config: TraveledDistanceMetricCfg, task_config: TaskCfg):
        super().__init__(config, task_config)
        self.distance: float = 0.0
        self.position = None
        self.robot_name = config.robot_name

    def update(self, obs: dict):
        """
        This function is called at each world step.
        """
        if self.position is None:
            # First step: store the starting position; nothing to accumulate yet.
            self.position = obs[self.robot_name]['position']
            return
        # Add the distance moved since the previous step.
        self.distance += np.linalg.norm(self.position - obs[self.robot_name]['position'])
        self.position = obs[self.robot_name]['position']
        return

    def calc(self):
        """
        This function is called to calculate the metrics when the episode is terminated.
        """
        log.info('TraveledDistanceMetric calc() called.')
        return self.distance
The update method is invoked after every step. It runs inside a for loop, so avoid computationally intensive operations in it.
The received obs is a dict in which each key is a robot name and each value is the obs output for that robot from gym_env. It is similar to the vec_env observation format.
The calc method is invoked at the end of an episode.
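To make the call order concrete, here is a minimal, simulator-free sketch of the contract described above. Everything in it is a stand-in: `DistanceMetricSketch` and `run_episode` are hypothetical names, the class does not inherit from BaseMetric, and the loop only approximates how the framework might drive metrics. The observation dicts follow the shape described above (robot name as key, with a 'position' entry as used by the metric in this section).

```python
import math
from typing import Dict, List, Optional, Sequence


class DistanceMetricSketch:
    """Stand-in with the same update()/calc() shape as the metric above.

    Hypothetical: it is not a BaseMetric subclass and is not registered.
    """

    def __init__(self, robot_name: str):
        self.robot_name = robot_name
        self.distance = 0.0
        self.position: Optional[Sequence[float]] = None

    def update(self, obs: Dict[str, dict]) -> None:
        current = obs[self.robot_name]['position']
        if self.position is not None:
            # Accumulate the distance moved since the previous step.
            self.distance += math.dist(self.position, current)
        self.position = current

    def calc(self) -> float:
        return self.distance


def run_episode(metrics: List[DistanceMetricSketch], observations: List[Dict[str, dict]]) -> List[float]:
    """Call update() on every metric after each step, then calc() once at the end."""
    for obs in observations:
        for metric in metrics:
            metric.update(obs)
    return [metric.calc() for metric in metrics]


# Hypothetical observations: one dict per step, keyed by robot name.
fake_obs = [
    {'h1': {'position': (0.0, 0.0, 1.05)}},
    {'h1': {'position': (1.0, 0.0, 1.05)}},
    {'h1': {'position': (1.0, 1.0, 1.05)}},
]
results = run_episode([DistanceMetricSketch('h1')], fake_obs)
print(results)
```

Feeding synthetic observations like this is one way to check a metric's arithmetic before running it in a full simulation. Because update runs for every metric on every step, any expensive work inside it is multiplied by the number of steps.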
4. Metric Usage Preview#
To use a custom metric, include its config in the task configuration as follows:
from internutopia.core.config import Config, SimConfig
from internutopia.core.gym_env import Env
from internutopia.core.util import has_display
from internutopia.macros import gm
from internutopia_extension import import_extensions
from internutopia_extension.configs.metrics.traveled_distance_metric import TraveledDistanceMetricCfg
from internutopia_extension.configs.robots.h1 import (
    H1RobotCfg,
    h1_camera_cfg,
    h1_tp_camera_cfg,
    move_along_path_cfg,
    move_by_speed_cfg,
    rotate_cfg,
)
from internutopia_extension.configs.tasks import FiniteStepTaskCfg

# Run headless when no display is available.
headless = False
if not has_display():
    headless = True

h1_1 = H1RobotCfg(
    position=(0.0, 0.0, 1.05),
    controllers=[
        move_by_speed_cfg,
        move_along_path_cfg,
        rotate_cfg,
    ],
    sensors=[
        h1_camera_cfg.update(name='camera', resolution=(320, 240), enable=True),
        h1_tp_camera_cfg.update(enable=False),
    ],
)

config = Config(
    simulator=SimConfig(physics_dt=1 / 240, rendering_dt=1 / 240, use_fabric=False, headless=headless),
    metrics_save_path='./h1_traveled_distance_metric.jsonl',
    task_configs=[
        FiniteStepTaskCfg(
            max_steps=300,
            # Attach the custom metric to the task.
            metrics=[TraveledDistanceMetricCfg(robot_name='h1')],
            scene_asset_path=gm.ASSET_PATH + '/scenes/empty.usd',
            scene_scale=(0.01, 0.01, 0.01),
            robots=[h1_1],
        ),
    ],
)

import_extensions()
env = Env(config)
obs, _ = env.reset()
print(f'========INIT OBS{obs}=============')
path = [(1.0, 0.0, 0.0), (1.0, 1.0, 0.0), (3.0, 4.0, 0.0)]
i = 0
move_action = {move_along_path_cfg.name: [path]}
while env.simulation_app.is_running():
    i += 1
    action = move_action
    obs, _, terminated, _, _ = env.step(action=action)
    if terminated:
        obs, info = env.reset()
        if info is None:
            break
env.close()
You can check the result in ./h1_traveled_distance_metric.jsonl. Each key in the output JSON is the name of a metric.
{"traveled_distance_metric": 0.7508679775492055}
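The saved file can be read back with the standard library. The sketch below assumes the file holds one JSON object per line, as the .jsonl extension and the sample output suggest; whether each line corresponds to exactly one episode is an assumption. The helper name `load_metric_records` is hypothetical, and the snippet writes the sample line above to a temporary file so that it runs without the simulator.

```python
import json
import tempfile
from pathlib import Path
from typing import Dict, List


def load_metric_records(path: str) -> List[Dict[str, float]]:
    """Parse a JSON Lines file into a list of dicts, skipping blank lines."""
    records = []
    for line in Path(path).read_text(encoding='utf-8').splitlines():
        line = line.strip()
        if line:
            records.append(json.loads(line))
    return records


# Reproduce the sample output in a temporary file for demonstration.
sample_line = '{"traveled_distance_metric": 0.7508679775492055}'
with tempfile.TemporaryDirectory() as tmp:
    sample_path = Path(tmp) / 'h1_traveled_distance_metric.jsonl'
    sample_path.write_text(sample_line + '\n', encoding='utf-8')
    records = load_metric_records(str(sample_path))

# Look up the value by the metric's configured name.
print(records[0]['traveled_distance_metric'])
```

In a real run, pass the path configured in metrics_save_path instead of the temporary file.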