Model (WIP)#
This document provides an overview of the modular model structure used in our robotic manipulation framework. It explains each component's purpose and configuration, enabling users to:
Make customizations: Adjust parameters such as select_layer, tune_llm, or change the action horizon.
Add custom models: Add new backbones, action heads, or policy models by following the provided templates.
Overview:
Backbone: Processes camera observations and natural language instructions into contextual embeddings.
Action Head: Converts backbone features into executable robot actions, often via diffusion-based generative modeling.
Policy Model: Orchestrates the backbone and action head, managing data flow and inference to produce final robot commands.
Minor customizations or full replacements are supported. For developing new policy models that combine or extend these modules, follow the guidance below.
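The three modules compose in a straightforward way: the policy model calls the backbone on the raw observations and feeds the resulting embeddings to the action head. Below is a minimal sketch of that data flow; the class and method names are placeholders for illustration, not the framework's actual API.
import torch.nn as nn

class ToyPolicy(nn.Module):
    """Illustrative sketch of the backbone -> action head data flow (not the real API)."""
    def __init__(self, backbone: nn.Module, action_head: nn.Module):
        super().__init__()
        self.backbone = backbone
        self.action_head = action_head

    def forward(self, batch: dict) -> dict:
        # 1. The backbone turns camera observations and language into contextual embeddings
        backbone_output = self.backbone(batch)
        # 2. The action head turns those embeddings into an action chunk (or a loss during training)
        return self.action_head(backbone_output, batch)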
Backbone#
We provide several backbone variants: EagleBackbone, EagleBackbone1_5, DiffusionRgbEncoder, PaliGemmaWithExpertModel, and ACTBackbone.
EagleBackbone#
The Eagle Backbone serves as the foundational component that processes multimodal inputs (vision and language) and generates unified feature representations suitable for downstream tasks.
It processes the input in several steps:
Input Preparation: Converts raw inputs into model-compatible format
Feature Extraction: Processes visual and textual inputs separately
Multimodal Fusion: Combines visual and textual features using attention mechanisms
Output Generation: Produces unified feature representations
Key Features#
Configurable layer selection for LLM truncation
Visual and language model fine-tuning control
Built-in image preprocessing with EagleProcessor
Support for vision reprojection and resolution scaling
Usage#
Basic Initialization#
from internmanip.model.backbone.eagle_backbone import EagleBackbone
backbone = EagleBackbone(
    select_layer=12,      # Use first 12 LLM layers
    tune_llm=False,       # Freeze LLM parameters
    tune_visual=True,     # Fine-tune vision model
    processor_cfg={
        "model_path": "/path/to/eagle/model",
        "max_input_tiles": 4,
        "model_spec": {...}
    }
)
Forward Pass#
batch_features = backbone.prepare_input(batch_dict)
output = backbone(batch_features)
# Returns: {"backbone_features": embeddings, "backbone_attention_mask": mask}
EagleBackbone1_5#
A simplified version designed for Eagle 1.5 models with streamlined configuration.
Key Features#
Simplified initialization with fewer parameters
Automatic parameter tracking and warnings
Prefix-based input filtering for multi-modal inputs
Fixed 2048 → 1536 dimension projection
Usage#
Basic Initialization#
from internmanip.model.backbone.eagle_backbone import EagleBackbone1_5
# Basic usage
backbone = EagleBackbone1_5(
    select_layer=16,     # Use first 16 LLM layers
    tune_llm=True,       # Fine-tune LLM
    tune_visual=False,   # Freeze vision model
    project_to_dim=1536  # Output dimension
)
Forward Pass#
# Forward pass expects eagle_ prefixed inputs
vl_input = {
    "eagle_input_ids": tensor,
    "eagle_attention_mask": tensor,
    "eagle_pixel_values": tensor,
    "eagle_image_sizes": tensor
}
output = backbone(vl_input)
DiffusionRgbEncoder#
The Diffusion Vision Backbone is designed specifically for diffusion policy implementations. It provides efficient visual feature extraction optimized for robotic manipulation tasks.
It processes the input in several steps:
Image Preprocessing: Optional cropping and normalization
Backbone Feature Extraction: Pretrained CNN feature extraction
Spatial Softmax Pooling: Converts feature maps to spatial keypoints
Final Linear Projection: Maps keypoints to output feature dimensions
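Among these steps, spatial softmax is the least standard: each feature-map channel is converted into the expected (x, y) image coordinate of its activation, yielding one 2D keypoint per channel. The function below is a minimal, framework-independent sketch of the idea; the actual SpatialSoftmax module may differ in details.
import torch
import torch.nn.functional as F

def spatial_softmax(feature_map: torch.Tensor) -> torch.Tensor:
    """feature_map: (B, C, H, W) -> keypoints: (B, C, 2), one expected (x, y) per channel."""
    b, c, h, w = feature_map.shape
    # Softmax over all spatial positions within each channel
    attention = F.softmax(feature_map.view(b, c, h * w), dim=-1).view(b, c, h, w)
    # Normalized coordinate grids in [-1, 1]
    ys = torch.linspace(-1.0, 1.0, h).view(1, 1, h, 1)
    xs = torch.linspace(-1.0, 1.0, w).view(1, 1, 1, w)
    # Expected coordinates under each channel's attention distribution
    expected_x = (attention * xs).sum(dim=(2, 3))
    expected_y = (attention * ys).sum(dim=(2, 3))
    return torch.stack([expected_x, expected_y], dim=-1)

keypoints = spatial_softmax(torch.randn(8, 64, 15, 20))  # -> torch.Size([8, 64, 2])
Flattening the (B, C, 2) keypoints gives the C * 2 feature vector that the final linear projection consumes.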
Key Features#
Automatically adapts to different input image dimensions
Leverages torchvision pretrained models
Generates focused attention points for manipulation tasks
Usage#
Basic Initialization#
from internmanip.configs.model.dp_cfg import DiffusionConfig
from internmanip.model.backbone.diffusion_vision_backbone import DiffusionRgbEncoder
config = DiffusionConfig(
    vision_backbone='resnet50',
    pretrained_backbone_weights=None,  # Train from scratch
    crop_shape=(256, 256),
    crop_is_random=True,
    spatial_softmax_num_keypoints=64,
    use_group_norm=True,               # Replace BatchNorm with GroupNorm
    image_features=(3, 480, 640)       # Original image dimensions
)
encoder = DiffusionRgbEncoder(config)
Forward Pass#
images = torch.randn(8, 3, 480, 640)
features = encoder(images)
# Output shape: [8, 128], i.e. [batch, spatial_softmax_num_keypoints * 2]
PaliGemmaWithExpertModel#
A hybrid backbone combining PaliGemma and Gemma Expert for enhanced multimodal reasoning. This model merges visual grounding with language modeling using shared and expert-specific transformer layers.
Key Features#
Dual Transformer Architecture: Combines PaliGemma (VLM) and Gemma Expert (LLM).
Expert-tuned Attention Layers: Joint attention computation across base and expert models.
Vision-Language Embedding Support: Includes embed_image() and embed_language_tokens() interfaces.
Custom bfloat16 Precision Control: Converts selected submodules to bfloat16 with compatibility for physical intelligence models.
Selective Freezing: Fine-grained control over training of the visual encoder and/or base model.
Usage#
Basic Initialization#
from internmanip.model.paligemma_with_expert import PaliGemmaWithExpertModel
from internmanip.configs.paligemma_with_expert_config import PaliGemmaWithExpertConfig
config = PaliGemmaWithExpertConfig(
    paligemma_config=...,
    gemma_expert_config=...,
    freeze_vision_encoder=True,
    train_expert_only=False,
    attention_implementation='eager',  # or 'fa2', 'flex'
)
model = PaliGemmaWithExpertModel(config)
Forward Pass#
# input_embeds: List of tensors from vision and language encoders
outputs, cache = model(
    attention_mask=...,
    position_ids=...,
    past_key_values=...,  # optional for autoregressive decoding
    inputs_embeds=[vision_embeds, language_embeds],
    use_cache=True,
    fill_kv_cache=False,
)
ACTBackbone#
The ACT (Action Chunking Transformer) Backbone is designed specifically for the ACT framework, which processes multimodal inputs including robot states, environment states, and visual observations to generate contextual embeddings for action prediction.
It processes the input in several steps:
Image Feature Extraction: Uses pretrained CNN backbones (e.g., ResNet) to extract visual features from camera observations
State Processing: Processes robot state and environment state through linear projections
Latent Encoding: Handles latent space encoding for VAE-based training (optional)
Transformer Encoding: Uses a transformer encoder to fuse all features into unified representations
Positional Embedding: Applies 2D sinusoidal positional embeddings for visual features and learned embeddings for state features
Key Features#
Modular Design: Separates image processing, state processing, and transformer encoding
Multi-modal Fusion: Combines visual, state, and latent features through attention mechanisms
Flexible Input Handling: Supports various input tensor shapes and dimensions
VAE Integration: Optional variational autoencoder for latent space modeling
Configurable Architecture: Adjustable transformer layers, attention heads, and feature dimensions
Architecture Components#
Image Backbone: Pretrained CNN (ResNet variants) for visual feature extraction
State Projections: Linear layers for robot and environment state processing
Transformer Encoder: Multi-layer transformer for feature fusion
Positional Embeddings: 2D sinusoidal embeddings for visual features, learned embeddings for states
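For reference, the 2D sinusoidal embeddings apply the standard 1D sine/cosine formula independently along the height and width axes and concatenate the two halves. The sketch below illustrates the construction; the framework's implementation may differ in details such as temperature and coordinate normalization.
import torch

def sinusoid_1d(positions: torch.Tensor, dim: int, temperature: float = 10000.0) -> torch.Tensor:
    """positions: (N,) -> (N, dim) standard sine/cosine encoding."""
    omega = temperature ** (-torch.arange(0, dim, 2, dtype=torch.float32) / dim)
    angles = positions[:, None].float() * omega[None, :]
    return torch.cat([angles.sin(), angles.cos()], dim=-1)

def sinusoid_2d(h: int, w: int, dim: int) -> torch.Tensor:
    """Returns (H*W, dim) embeddings: half of dim encodes y, the other half encodes x."""
    ys, xs = torch.meshgrid(torch.arange(h), torch.arange(w), indexing="ij")
    emb_y = sinusoid_1d(ys.flatten(), dim // 2)
    emb_x = sinusoid_1d(xs.flatten(), dim // 2)
    return torch.cat([emb_y, emb_x], dim=-1)

pos = sinusoid_2d(15, 20, 256)  # one 256-d embedding per feature-map location -> torch.Size([300, 256])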
Usage#
Basic Initialization#
from internmanip.model.backbone.act_backbone import ACTBackbone
from internmanip.model.basemodel.act_detr.configuration_act import ACTConfig
config = ACTConfig()
config.dim_model = 256
config.latent_dim = 64
config.n_encoder_layers = 4
config.n_heads = 8
config.dim_feedforward = 1024
config.dropout = 0.1
config.pre_norm = True
config.feedforward_activation = "relu"
config.use_vae = False
# Set up input/output features
from internmanip.model.types import PolicyFeature, FeatureType
input_features = {
    'observation.state': PolicyFeature(type=FeatureType.STATE, shape=(8,)),
    'observation.environment_state': PolicyFeature(type=FeatureType.ENV, shape=(10,))
}
output_features = {
    'action': PolicyFeature(type=FeatureType.ACTION, shape=(8,))
}
config.set_input_output_features(input_features, output_features)
backbone = ACTBackbone(config)
Forward Pass#
batch = {
    "observation.state": torch.randn(batch_size, 8),
    "observation.environment_state": torch.randn(batch_size, 10),
    "observation.images": [torch.randn(batch_size, 3, 224, 224)],  # Optional
    "action": torch.randn(batch_size, 16, 8),
    "action_is_pad": torch.zeros(batch_size, 16, dtype=torch.bool)
}
latent_sample = torch.randn(batch_size, config.latent_dim)
encoder_out = backbone.encode_features(batch, latent_sample)
# Returns: (seq_len, batch_size, dim_model) tensor
Action Head#
The action head converts the high-level contextual features produced by the backbone into executable robot actions. Depending on the variant, action generation is based on flow matching, denoising diffusion, or transformer decoding.
We provide several action heads: FlowmatchingActionHead, FlowmatchingActionHead_1_5, DiffusionActionHead, PI0FlowMatching, and ACTActionHead.
FlowmatchingActionHead#
The main flow matching action head that uses a DiT (Diffusion Transformer) model to generate robot actions through denoising diffusion.
Key Features#
Flow matching training with Beta distribution noise scheduling (see the sketch after this list)
Multi-embodiment support with category-specific encoders/decoders
Sinusoidal timestep encoding for diffusion process
Cross-attention between state-action embeddings and vision-language features
Configurable projector and diffusion model fine-tuning
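To make the flow matching objective concrete: during training, a noisy interpolation between a Gaussian sample and the ground-truth action chunk is formed, and the network is trained to predict the velocity between the two. The snippet below is a simplified sketch of that objective with a generic velocity-prediction model, not the exact FlowmatchingActionHead internals (which, for example, sample timesteps from a Beta distribution).
import torch
import torch.nn.functional as F

def flow_matching_loss(model, actions: torch.Tensor, condition: torch.Tensor) -> torch.Tensor:
    """actions: (B, horizon, action_dim); condition: backbone features consumed by `model`."""
    noise = torch.randn_like(actions)
    # Sample one timestep in [0, 1] per trajectory
    t = torch.rand(actions.shape[0], 1, 1, device=actions.device)
    # Linear interpolation between noise (t=0) and the clean action chunk (t=1)
    noisy_actions = (1 - t) * noise + t * actions
    # Target velocity of the straight-line path from noise to data
    target_velocity = actions - noise
    predicted_velocity = model(noisy_actions, t, condition)
    return F.mse_loss(predicted_velocity, target_velocity)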
Usage#
Basic Initialization#
from internmanip.model.action_head.flow_matching_action_head import FlowmatchingActionHead, FlowmatchingActionHeadConfig
config = FlowmatchingActionHeadConfig(
    action_dim=7,                # Robot action dimension
    action_horizon=16,           # Action sequence length
    max_num_embodiments=32,      # Number of robot types
    num_inference_timesteps=10,  # Denoising steps
    tune_projector=True,         # Fine-tune encoders/decoders
    tune_diffusion_model=True,   # Fine-tune DiT model
    diffusion_model_cfg={...}    # DiT configuration
)
action_head = FlowmatchingActionHead(config)
Forward Pass#
backbone_output = {"backbone_features": vl_embeddings, "backbone_attention_mask": mask}
action_input = {"action": actions, "state": states, "embodiment_id": robot_ids, "action_mask": mask}
output = action_head(backbone_output, action_input)
# Returns: {"loss": mse_loss, "predictions": predicted_velocity}
Inference#
with torch.no_grad():
    result = action_head.inference(backbone_output, action_input)
    # Returns: {"action_pred": generated_actions}
FlowmatchingActionHead_1_5#
Enhanced version with additional backbone processing capabilities and batch expansion support.
Key Features#
Vision-language layer normalization and self-attention processing
Batch expansion for data augmentation during training
Improved backbone feature processing pipeline
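Batch expansion conceptually repeats each sample expand_batch times so every trajectory is supervised with several independent noise and timestep draws per optimization step; the exact mechanism inside the head may differ, but one way to picture it is a repeat along the batch dimension.
import torch

expand_batch = 2
actions = torch.randn(4, 16, 7)                         # (B, horizon, action_dim)
expanded = actions.repeat_interleave(expand_batch, 0)   # (B * expand_batch, horizon, action_dim)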
Usage#
Basic Initialization#
from internmanip.model.action_head.flow_matching_action_head import FlowmatchingActionHead_1_5, FlowmatchingActionHeadConfig_1_5
# Configuration with additional features
config = FlowmatchingActionHeadConfig_1_5(
    action_dim=7,
    action_horizon=16,
    use_vlln=True,                # Use vision-language layer norm
    expand_batch=2,               # Expand batch by 2x for training
    vl_self_attention_cfg={...},  # Self-attention configuration
    **standard_config_params
)
# Initialize and use similarly to base version
action_head = FlowmatchingActionHead_1_5(config)
Forward pass#
output = action_head(backbone_output, action_input)
# Returns: {"loss": mse_loss, "predictions": predicted_velocity}
Inference#
with torch.no_grad():
    result = action_head.get_action(backbone_output, action_input)
    # Returns: {"action_pred": generated_actions}
DiffusionActionHead#
The DiffusionActionHead class uses a 1D convolutional UNet to generate robot actions through denoising diffusion, following the Diffusion Policy framework.
Key Features#
DDPM/DDIM noise scheduling with configurable timesteps and beta parameters
Multi-modal conditioning supporting vision, state, and language inputs
1D Convolutional UNet with FiLM modulation for temporal action sequence generation (see the sketch after this list)
Language conditioning via CLIP embeddings with trainable projection layers
Flexible observation encoding with support for multiple camera views and separate encoders
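FiLM (feature-wise linear modulation) is how conditioning enters the UNet: the conditioning vector predicts a per-channel scale and bias that modulate each convolutional block. The block below is a minimal sketch of the mechanism, not the framework's exact block definition.
import torch
import torch.nn as nn

class FiLMConv1dBlock(nn.Module):
    """1D conv block whose channels are scaled and shifted by a conditioning vector."""
    def __init__(self, in_channels: int, out_channels: int, cond_dim: int):
        super().__init__()
        self.conv = nn.Conv1d(in_channels, out_channels, kernel_size=3, padding=1)
        self.to_scale_bias = nn.Linear(cond_dim, 2 * out_channels)

    def forward(self, x: torch.Tensor, cond: torch.Tensor) -> torch.Tensor:
        # x: (B, C_in, T) action sequence; cond: (B, cond_dim) step + observation embedding
        h = self.conv(x)
        scale, bias = self.to_scale_bias(cond).chunk(2, dim=-1)
        return h * (1 + scale[..., None]) + bias[..., None]

block = FiLMConv1dBlock(in_channels=7, out_channels=64, cond_dim=128)
out = block(torch.randn(8, 7, 16), torch.randn(8, 128))  # -> torch.Size([8, 64, 16])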
Usage#
Basic Initialization#
from internmanip.model.action_head.diffusion_action_head import DiffusionActionHead
from internmanip.configs.model.dp_cfg import DiffusionConfig
config = DiffusionConfig(
    action_dim=7,                    # Robot action dimension
    horizon=16,                      # Action sequence length
    n_obs_steps=2,                   # Number of observation steps
    n_action_steps=8,                # Number of action steps to execute
    robot_state_dim=14,              # Robot state dimension
    num_train_timesteps=100,         # Training denoising timesteps
    num_inference_steps=10,          # Inference denoising steps
    use_language_conditioning=True,  # Enable language conditioning
    language_model_name="openai/clip-vit-base-patch32",
    noise_scheduler_type="DDPM"      # or "DDIM"
)
action_head = DiffusionActionHead(config)
Training#
batch = {
    "observation.state": robot_states,  # (B, n_obs_steps, state_dim)
    "observation.images": rgb_images,   # (B, n_obs_steps, num_cameras, C, H, W)
    "language": language_instructions,  # List[str] or str
    "action": target_actions,           # (B, horizon, action_dim)
    "action_is_pad": padding_mask       # (B, horizon)
}
loss_dict = action_head.compute_loss(batch)
loss = loss_dict["loss"]
loss.backward()
Inference#
with torch.no_grad():
    all_actions = action_head.generate_all_actions(batch)
    # Returns: (B, horizon, action_dim)
    executable_actions = action_head.generate_actions(batch)
    # Returns: (B, n_action_steps, action_dim)
PI0FlowMatching#
This model enables end-to-end multimodal action generation through diffusion-style flow matching, using PaliGemma for perception and Gemma Expert for reasoning.
Key Features#
Multi-Stage Embedding: Separately processes prefix (image & language) and suffix (robot state, actions, timestep).
Diffusion-Based Control: Implements action prediction via learned denoising steps over time.
Dual-Transformer Backbone: Combines PaliGemma with an Expert Gemma head for expert-guided decoding.
Pretrained Vision-Language Model: Leverages the power of PaliGemma for grounded understanding.
Fine-Grained Attention Control: Per-token control over attention and padding across all modalities.
Usage#
Training Forward Pass#
loss = policy(
    images,       # List[Tensor]: shape [B, C, H, W]
    img_masks,    # List[Tensor]: shape [B] (bool)
    lang_tokens,  # Tensor: [B, T]
    lang_masks,   # Tensor: [B, T] (bool)
    state,        # Tensor: [B, T, state_dim]
    actions,      # Tensor: [B, T, action_dim]
)
# Output: [B, T, action_dim], per-step loss
Action Sampling#
actions = policy.sample_actions(
    images,
    img_masks,
    lang_tokens,
    lang_masks,
    state
)
# Output: [B, T, action_dim], denoised predicted actions
Internally runs an Euler integrator over diffusion steps to progressively refine the action trajectory.
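Schematically, that integration starts from Gaussian noise and repeatedly applies the predicted velocity with a fixed step size. The sketch below uses illustrative names rather than the real sample_actions internals.
import torch

def euler_sample(velocity_model, condition, horizon: int, action_dim: int, num_steps: int = 10):
    """Integrate d(action)/dt = v(action, t) from t=0 (noise) to t=1 (denoised actions)."""
    actions = torch.randn(condition.shape[0], horizon, action_dim)  # start from pure noise
    dt = 1.0 / num_steps
    for i in range(num_steps):
        t = torch.full((actions.shape[0], 1, 1), i * dt)
        velocity = velocity_model(actions, t, condition)
        actions = actions + dt * velocity  # one Euler step toward the data distribution
    return actions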
ACTActionHead#
The ACTActionHead is designed specifically for the ACT framework. It converts encoded features from the ACT backbone into executable robot actions through transformer-based decoding and optional VAE encoding.
It processes the input in several steps:
Latent Encoding: Optional VAE encoder for latent space modeling during training
Transformer Decoding: Uses a transformer decoder to generate action sequences
Action Prediction: Final linear projection to output robot actions
Positional Embedding: Learned positional embeddings for action sequence generation
Key Features#
VAE Integration: Optional variational autoencoder for latent space modeling
Transformer Decoder: Multi-layer transformer decoder for action sequence generation
Action Chunking: Generates action sequences of configurable length
Flexible Conditioning: Supports various input modalities (state, image, language)
Temporal Ensembling: Optional temporal ensemble for improved action consistency
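When temporal ensembling is enabled, the action executed at each timestep is a weighted average of all previously predicted chunks that cover that timestep, typically with exponentially decaying weights as in the original ACT paper. The function below is a rough sketch of that averaging; the weighting scheme is illustrative.
import torch

def ensemble_actions(overlapping_preds: torch.Tensor, coeff: float = 0.01) -> torch.Tensor:
    """overlapping_preds: (K, action_dim), predictions for the current timestep from K past chunks.
    Index 0 is the oldest prediction and receives the largest weight."""
    k = overlapping_preds.shape[0]
    weights = torch.exp(-coeff * torch.arange(k, dtype=torch.float32))
    weights = weights / weights.sum()
    return (weights[:, None] * overlapping_preds).sum(dim=0)

current_action = ensemble_actions(torch.randn(4, 8))  # combine 4 overlapping predictions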
Architecture Components#
VAE Encoder: BERT-style encoder for latent space modeling (optional)
Transformer Decoder: Multi-layer decoder with cross-attention to encoder features
Action Head: Linear projection layer for final action prediction
Positional Embeddings: Learned embeddings for action sequence positions
Usage#
Basic Initialization#
from internmanip.model.action_head.act_action_head import ACTActionHead
from internmanip.model.basemodel.act_detr.configuration_act import ACTConfig
config = ACTConfig()
config.dim_model = 256
config.latent_dim = 64
config.chunk_size = 16
config.n_decoder_layers = 4
config.n_heads = 8
config.dim_feedforward = 1024
config.dropout = 0.1
config.pre_norm = True
config.feedforward_activation = "relu"
config.use_vae = False
# Set up input/output features
from internmanip.model.types import PolicyFeature, FeatureType
input_features = {
    'observation.state': PolicyFeature(type=FeatureType.STATE, shape=(8,)),
    'observation.environment_state': PolicyFeature(type=FeatureType.ENV, shape=(10,))
}
output_features = {
    'action': PolicyFeature(type=FeatureType.ACTION, shape=(8,))
}
config.set_input_output_features(input_features, output_features)
action_head = ACTActionHead(config)
Forward Pass#
# For training with VAE
batch = {
    "observation.state": torch.randn(batch_size, 8),
    "observation.environment_state": torch.randn(batch_size, 10),
    "action": torch.randn(batch_size, 16, 8),
    "action_is_pad": torch.zeros(batch_size, 16, dtype=torch.bool)
}
# Encode latent (if using VAE)
latent_sample, mu, log_sigma_x2 = action_head.encode_latent(batch)
# Decode actions
encoder_out = torch.randn(seq_len, batch_size, config.dim_model)
actions = action_head.decode_actions(encoder_out, batch_size, device)
# Returns: (batch_size, chunk_size, action_dim) tensor
Inference#
with torch.no_grad():
    # For inference without VAE
    batch = {
        "observation.state": torch.randn(batch_size, 8),
        "observation.environment_state": torch.randn(batch_size, 10)
    }
    latent_sample, _, _ = action_head.encode_latent(batch)
    # latent_sample will be zeros when not using VAE
    encoder_out = backbone.encode_features(batch, latent_sample)
    actions = action_head.decode_actions(encoder_out, batch_size, device)
    # Returns: (batch_size, chunk_size, action_dim) tensor
Add a Custom Model#
The Policy Model serves as the top-level orchestrator that integrates VLM backbone and action head components to create end-to-end robotic manipulation policies. It manages the complete pipeline from multimodal observations to executable robot actions.
You need to implement a policy model to drive the robot.
Create a new Python file in the internmanip/model/basemodel/ directory. Here is a template for a custom policy model:
from pydantic import BaseModel
from typing import Dict, Any, Optional
import torch
import torch.nn as nn
from internmanip.dataset.transform.base import BatchFeature
from internmanip.model.basemodel.base import BasePolicyModel
class CustomPolicyConfig(BaseModel):
    """Configuration for Custom Policy Model."""
    pass

@BasePolicyModel.register("custom_policy")
class CustomPolicy(BasePolicyModel):
    """Custom Policy Model implementation."""

    def __init__(self, config: CustomPolicyConfig):
        super().__init__()
        self.config = config
        # Build your backbone and action head modules here

    def forward(self, batch: BatchFeature, train: bool = True, **kwargs) -> Dict[str, torch.Tensor]:
        # Run the backbone and action head; return a dict that includes "loss" during training
        pass

    def inference(self, batch: BatchFeature, **kwargs) -> Dict[str, torch.Tensor]:
        """Inference-specific forward pass."""
        return self.forward(batch, train=False, **kwargs)

    def calc_loss(self, batch: BatchFeature, **kwargs) -> torch.Tensor:
        """Calculate training loss."""
        outputs = self.forward(batch, train=True, **kwargs)
        return outputs["loss"]
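After saving the file, the new class can be used like any other policy. Below is a minimal usage sketch of the methods defined by the template above; here batch stands for a BatchFeature produced by your dataloader.
config = CustomPolicyConfig()
policy = CustomPolicy(config)

# Training step: forward(train=True) should return a dict containing "loss"
loss = policy.calc_loss(batch)
loss.backward()

# Deployment: inference() runs forward with train=False
actions = policy.inference(batch)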