Workers Interface

Last updated: Jul 03, 2026 (API docstrings are auto-generated).

VeRL-Omni workers wrap the Diffusers / FSDP training engine, the rollout engine (vLLM-Omni), and the optional reference policy. The single-controller trainer drives them through a unified RPC layer.

verl_omni.workers.engine_workers.TrainingWorker

TrainingWorker provides a Tinker-like API (https://thinkingmachines.ai/tinker/) as a RayWorkerGroup to a single controller.

verl_omni.workers.engine_workers.ActorRolloutRefWorker

Hybrid worker that includes actor model, rollout and optional ref model.

verl_omni.workers.engine.fsdp.diffusers_impl.DiffusersFSDPEngine

Base Diffusers engine using PyTorch FullyShardedDataParallel (FSDP).

verl_omni.workers.engine.lora_adapter_mixin.LoRAAdapterMixin

Backend-agnostic helpers for named PEFT/LoRA policy adapters.

verl_omni.workers.engine.fsdp.diffusers_impl.PPODiffusersFSDPEngine

Diffusers FSDP engine with PPO forward/backward and I/O preparation.

verl_omni.workers.config.DiffusionModelConfig

verl_omni.workers.config.DiffusionActorConfig

verl_omni.workers.config.FSDPDiffusionActorConfig

verl_omni.workers.config.DiffusionLossConfig

verl_omni.workers.config.DiffusionRolloutConfig

verl_omni.workers.config.DiffusionRolloutAlgoConfig

Algorithm configuration for the SDE-based diffusion rollout.

verl_omni.workers.config.DiffusionPipelineConfig

verl_omni.workers.config.DiffusionSamplingConfig

Engine Workers

class verl_omni.workers.engine_workers.TrainingWorker(config: TrainingWorkerConfig)[source]

TrainingWorker provides a Tinker-like API (https://thinkingmachines.ai/tinker/) as a RayWorkerGroup to a single controller. It currently exposes only coarse-grained APIs rather than the exact APIs that Tinker offers; finer-grained APIs may be added in the future.

__init__(config: TrainingWorkerConfig)[source]

Initialize the worker with environment settings and device configuration.

Parameters:

cuda_visible_devices (str, optional) – CUDA visible devices configuration. Defaults to None.

reset()[source]

Reset the model engine to its initial state. If the engine is not yet initialized, initialize it; otherwise, reload the checkpoint and reset its state.

to(device, model=True, optimizer=True, grad=True)[source]

Manual control of load/offload

train_mini_batch(data: TensorDict) TensorDict[source]

Split a batch into N mini-batches and run them for multiple epochs.

Parameters:

data

Returns:
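The mini-batch loop can be pictured with the minimal sketch below. It is not the TrainingWorker implementation: iter_mini_batches is a hypothetical helper, plain lists stand in for a TensorDict, and its arguments only loosely echo ppo_mini_batch_size, ppo_epochs, shuffle and data_loader_seed from DiffusionActorConfig.

```python
import random


def iter_mini_batches(samples, mini_batch_size, epochs, shuffle=False, seed=42):
    """Yield (epoch, mini_batch) pairs; every epoch revisits the full batch."""
    rng = random.Random(seed)
    for epoch in range(epochs):
        order = list(range(len(samples)))
        if shuffle:
            rng.shuffle(order)  # reshuffle the sample order each epoch
        for start in range(0, len(order), mini_batch_size):
            chunk = order[start:start + mini_batch_size]
            yield epoch, [samples[i] for i in chunk]


# 8 samples, mini-batches of 4, 2 epochs -> 4 optimizer-sized chunks in total
batches = list(iter_mini_batches(list(range(8)), mini_batch_size=4, epochs=2))
```

Each yielded chunk would correspond to one forward/backward pass followed by an optimizer step.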

class verl_omni.workers.engine_workers.ActorRolloutRefWorker(config: DictConfig, role: str, distillation_config: DistillationConfig | None = None, **kwargs)[source]

Hybrid worker that includes actor model, rollout and optional ref model. For standalone actor or rollout, use ActorWorker or BaseRollout respectively.

NOTE: ActorRolloutRefWorker no longer supports SPMD mode; it runs in native server mode.

__init__(config: DictConfig, role: str, distillation_config: DistillationConfig | None = None, **kwargs)[source]

Initialize the worker with environment settings and device configuration.

Parameters:

cuda_visible_devices (str, optional) – CUDA visible devices configuration. Defaults to None.

async update_weights(global_steps: int = None, mode: str = 'auto')[source]

Update weights from trainer to rollout.

  1. For sync training with colocated trainer and rollout (mode="naive"), update rollout directly from the model engine.
     - Before update_weights: rollout should be in sleep mode.
     - After update_weights: rollout should be in wake_up mode.

  2. For async training with disaggregated trainer and rollout (any non-naive mode), send weights only through the checkpoint engine.

LoRA handling: when model.lora.merge=True (peft_merge), LoRA is merged into base weights before sync. The engine returns full HF-keyed params with peft_config=None, so the rollout receives a standard weight update.

Parameters:
  • global_steps – Current global training step count, passed to rollout for logging/tracking.

  • mode – Weight update strategy. "auto" resolves from config.rollout.checkpoint_engine.backend; "naive" uses direct colocated sync; any other value delegates to checkpoint_engine.send_weights.
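The mode resolution described above can be sketched as follows. This is an illustrative reading of the docstring only: resolve_update_path and the returned labels are hypothetical, and "nccl" is a made-up backend name standing in for any non-naive value.

```python
def resolve_update_path(mode: str, backend: str) -> str:
    """Pick the weight-sync path for a given mode and configured backend."""
    # "auto" defers to the configured checkpoint-engine backend.
    resolved = backend if mode == "auto" else mode
    # "naive" means colocated trainer and rollout: sync directly from the engine.
    # Any other value is delegated to the checkpoint engine.
    return "direct_sync" if resolved == "naive" else "checkpoint_engine"


colocated = resolve_update_path("auto", backend="naive")
disaggregated = resolve_update_path("auto", backend="nccl")
```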

Diffusers FSDP Engine

DiffusersFSDPEngine is the abstract base that implements the verl.workers.engine.base.BaseEngine interface for diffusion transformer backbones (e.g. Qwen-Image), including LoRA, mixed precision, and parameter / optimizer offloading.

class verl_omni.workers.engine.fsdp.diffusers_impl.DiffusersFSDPEngine(model_config: DiffusionModelConfig, engine_config: FSDPEngineConfig, optimizer_config: FSDPOptimizerConfig, checkpoint_config: CheckpointConfig)[source]

Base Diffusers engine using PyTorch FullyShardedDataParallel (FSDP).

Supports model sharding, activation/optimizer offloading, LoRA, and sequence parallelism.

__init__(model_config: DiffusionModelConfig, engine_config: FSDPEngineConfig, optimizer_config: FSDPOptimizerConfig, checkpoint_config: CheckpointConfig)[source]

Initialize the DiffusersFSDPEngine.

Sets up distributed device meshes, LoRA, and offload policies based on config.

Parameters:

config – Configuration object with FSDP and model settings.

disable_adapter()

Temporarily disable all PEFT adapters.

eval_mode(**kwargs)[source]

Return a context manager that switches to evaluation mode with FSDP-specific handling.

Includes activation offload entry/exit.

abstractmethod forward_step(micro_batch: TensorDict, loss_function, forward_only, step)[source]

Run one diffusion step forward (and loss); implemented by algorithm-specific subclasses.

get_per_tensor_param(layered_summon=False, base_sync_done=False, adapter_name: str | None = None, **kwargs)[source]

Get a generator that yields per-tensor parameters and optional peft config.

Returns:

A generator that yields tuples of parameter names and tensors.

Optional[dict]: Optional peft config.

Return type:

Generator[tuple[str, torch.Tensor]]
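A minimal sketch of the calling pattern, assuming the method returns a pair of (generator, optional peft config) as the Returns text suggests. The function below is a stand-in that takes a plain dict, with lists in place of torch.Tensor values; it is not the engine method.

```python
def get_per_tensor_param(state, peft_config=None):
    """Return (generator of (name, tensor) pairs, optional peft config)."""
    def _iter():
        for name, tensor in state.items():
            # Yield one entry at a time so the consumer never needs
            # the whole parameter set materialized at once.
            yield name, tensor
    return _iter(), peft_config


params, peft_config = get_per_tensor_param(
    {"proj.weight": [1.0, 2.0], "proj.bias": [0.0]}
)
names = [name for name, _ in params]
```

Streaming parameters one tensor at a time keeps peak memory low during weight synchronization, which is presumably why a generator is used here.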

initialize()[source]

Build the model, optimizer, and learning rate scheduler under FSDP.

Applies device, dtype, and precision configurations, including mixed precision. Sets up checkpoint manager and FLOPs counter.

load_checkpoint(local_path: str, hdfs_path: str | None = None, del_local_after_load: int = True, **kwargs) None[source]

Load FSDP checkpoint, restoring parameters and optimizer state.

lr_scheduler_step()[source]

Advance FSDP scheduler and return updated learning rate.

optimizer_step()[source]

Clip gradients, skip update if non-finite, and step optimizer.

Returns:

Norm of gradients before clipping.

Return type:

grad_norm (float)
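The clip, check and step sequence can be illustrated with the pure-Python sketch below. It assumes global L2-norm clipping and a plain SGD update, neither of which the source confirms; clip_and_step, lr and the epsilon are hypothetical, and max_norm merely mirrors the grad_clip default of 1.0.

```python
import math


def clip_and_step(params, grads, max_norm=1.0, lr=0.1):
    """Return (new_params, grad_norm); skip the update if the norm is non-finite."""
    grad_norm = math.sqrt(sum(g * g for g in grads))  # norm before clipping
    if not math.isfinite(grad_norm):
        return list(params), grad_norm  # inf/nan gradients: leave params untouched
    scale = min(1.0, max_norm / (grad_norm + 1e-6))  # shrink only when too large
    new_params = [p - lr * g * scale for p, g in zip(params, grads)]
    return new_params, grad_norm


stepped, norm = clip_and_step([1.0, 1.0], [3.0, 4.0])
skipped, bad_norm = clip_and_step([1.0], [float("inf")])
```

The returned norm is the pre-clipping value, matching the documented return value.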

optimizer_zero_grad()[source]

Zero gradients and enforce FSDP grad-clipping logic.

save_checkpoint(local_path: str, hdfs_path: str | None = None, global_step: int = 0, max_ckpt_to_keep: int | None = None, **kwargs) None[source]

Save FSDP checkpoint, handling parameter offload as needed.

to(device: str, model: bool = True, optimizer: bool = True, grad: bool = True)[source]

Move FSDP model and/or optimizer to CPU or GPU with offload support. Note that this function executes regardless of the offload config; it serves as a manual control.

train_mode(**kwargs)[source]

Return a context manager that switches to training mode with FSDP-specific handling.

Includes parameter and optimizer offload entry/exit.
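The entry/exit behaviour of such a context manager can be sketched with contextlib. ToyEngine is hypothetical and only tracks device names as strings; the real engine moves FSDP parameters and optimizer state.

```python
from contextlib import contextmanager


class ToyEngine:
    def __init__(self):
        self.device = "cpu"    # start in the offloaded state
        self.training = False
        self.log = []

    @contextmanager
    def train_mode(self):
        # Entry: bring state onto the accelerator and enable training.
        self.device = "cuda"
        self.training = True
        self.log.append("load")
        try:
            yield self
        finally:
            # Exit: always offload again, even if the body raised.
            self.training = False
            self.device = "cpu"
            self.log.append("offload")


engine = ToyEngine()
with engine.train_mode() as active:
    device_inside = active.device
```

Putting the offload in a finally block ensures memory is released even when a training step fails.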

Diffusers PPO FSDP Engine

PPODiffusersFSDPEngine is the concrete engine registered for FlowGRPO-style training (FlowGRPO, MixGRPO, GRPO-Guard). It subclasses DiffusersFSDPEngine and adds PPO forward/backward and batch I/O helpers.

class verl_omni.workers.engine.fsdp.diffusers_impl.PPODiffusersFSDPEngine(model_config: DiffusionModelConfig, engine_config: FSDPEngineConfig, optimizer_config: FSDPOptimizerConfig, checkpoint_config: CheckpointConfig)[source]

Diffusers FSDP engine with PPO forward/backward and I/O preparation.

__init__(model_config: DiffusionModelConfig, engine_config: FSDPEngineConfig, optimizer_config: FSDPOptimizerConfig, checkpoint_config: CheckpointConfig)

Initialize the DiffusersFSDPEngine.

Sets up distributed device meshes, LoRA, and offload policies based on config.

Parameters:

config – Configuration object with FSDP and model settings.

disable_adapter()

Temporarily disable all PEFT adapters.

eval_mode(**kwargs)

Return a context manager that switches to evaluation mode with FSDP-specific handling.

Includes activation offload entry/exit.

forward_backward_batch(data: TensorDict, loss_function: Callable, forward_only: bool = False) list[TensorDict][source]

Run forward/backward over a batch; implemented by algorithm-specific subclasses.

forward_step(micro_batch: TensorDict, loss_function, forward_only, step)[source]

Run one diffusion step forward (and loss); implemented by algorithm-specific subclasses.

get_per_tensor_param(layered_summon=False, base_sync_done=False, adapter_name: str | None = None, **kwargs)

Get a generator that yields per-tensor parameters and optional peft config.

Returns:

A generator that yields tuples of parameter names and tensors.

Optional[dict]: Optional peft config.

Return type:

Generator[tuple[str, torch.Tensor]]

initialize()

Build the model, optimizer, and learning rate scheduler under FSDP.

Applies device, dtype, and precision configurations, including mixed precision. Sets up checkpoint manager and FLOPs counter.

load_checkpoint(local_path: str, hdfs_path: str | None = None, del_local_after_load: int = True, **kwargs) None

Load FSDP checkpoint, restoring parameters and optimizer state.

lr_scheduler_step()

Advance FSDP scheduler and return updated learning rate.

optimizer_step()

Clip gradients, skip update if non-finite, and step optimizer.

Returns:

Norm of gradients before clipping.

Return type:

grad_norm (float)

optimizer_zero_grad()

Zero gradients and enforce FSDP grad-clipping logic.

prepare_model_inputs(micro_batch: TensorDict, step: int)[source]

Extract and pre-process universal tensors, then delegate architecture-specific input construction to the registered DiffusionModelBase subclass.

Handles common tensor extraction and nested-embed unpadding here. Architecture-specific input dict construction is delegated to the model registry.

prepare_model_outputs(output, micro_batch: TensorDict)[source]

Post-process raw model output; implemented by algorithm-specific subclasses.

save_checkpoint(local_path: str, hdfs_path: str | None = None, global_step: int = 0, max_ckpt_to_keep: int | None = None, **kwargs) None

Save FSDP checkpoint, handling parameter offload as needed.

to(device: str, model: bool = True, optimizer: bool = True, grad: bool = True)

Move FSDP model and/or optimizer to CPU or GPU with offload support. Note that this function executes regardless of the offload config; it serves as a manual control.

train_mode(**kwargs)

Return a context manager that switches to training mode with FSDP-specific handling.

Includes parameter and optimizer offload entry/exit.

LoRA Adapter Mixin

Reusable PEFT/LoRA helpers for named policy adapters (e.g. default and old). Used by DiffusersFSDPEngine.

class verl_omni.workers.engine.lora_adapter_mixin.LoRAAdapterMixin[source]

Backend-agnostic helpers for named PEFT/LoRA policy adapters.

copy_adapter(source: str = 'default', target: str = 'old') None[source]

Copy LoRA state between named policy adapters.

disable_adapter()[source]

Temporarily disable all PEFT adapters.

ema_update_adapter(source: str = 'default', target: str = 'old', decay: float = 0.0) None[source]

EMA-update target adapter parameters from source adapter parameters.
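A sketch of the update rule, assuming the common convention target = decay * target + (1 - decay) * source. The source does not state the formula, so treat this as an assumption; ema_update and the dict-of-floats layout are hypothetical.

```python
def ema_update(target, source, decay=0.0):
    """Blend source values into target; decay=0.0 reduces to a plain copy."""
    return {
        name: decay * target[name] + (1.0 - decay) * source[name]
        for name in target
    }


old = {"lora_A": 1.0}
default = {"lora_A": 3.0}
copied = ema_update(old, default, decay=0.0)
blended = ema_update(old, default, decay=0.5)
```

Under this convention the default decay of 0.0 makes the call equivalent to copy_adapter.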

use_adapter(name: str)[source]

Temporarily select a named PEFT adapter.
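A minimal sketch of temporary adapter selection, with the previous adapter restored on exit. ToyAdapters is hypothetical and stores only the active name; the special name "reference" is mapped to "no adapter active", following the note on this method.

```python
from contextlib import contextmanager


class ToyAdapters:
    def __init__(self, names=("default", "old")):
        self.names = set(names)
        self.active = "default"  # None means all adapters are disabled

    @contextmanager
    def use_adapter(self, name):
        previous = self.active
        if name == "reference":
            self.active = None       # logical state: run the base model only
        elif name in self.names:
            self.active = name
        else:
            raise KeyError(name)
        try:
            yield
        finally:
            self.active = previous   # restore whatever was active before


adapters = ToyAdapters()
with adapters.use_adapter("old"):
    inside = adapters.active
with adapters.use_adapter("reference"):
    inside_reference = adapters.active
```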

"reference" is a logical policy state (see policy_state_adapters) that runs with all LoRA adapters disabled, not a registered PEFT adapter.

Loss Functions

verl_omni.workers.utils.losses.diffusion_loss(config: DiffusionActorConfig, model_output, data: TensorDict, dp_group=None)[source]

Compute the loss for a diffusion model.

Padding Utilities

Padding utilities for diffusion model training.

verl_omni.workers.utils.padding.embeds_padding_2_no_padding(data: TensorDict) TensorDict[source]

Convert TensorDict from prompt embeds with padding to no-padding format. For diffusion model training only.

Currently the prompt embedding mask is expected to be in [1111000…] format, meaning the valid tokens are contiguous and start from the left.

Parameters:

data – TensorDict with prompt_embeds, prompt_embeds_mask, negative_prompt_embeds, negative_prompt_embeds_mask.

Returns:

TensorDict where prompt_embeds and negative_prompt_embeds are replaced with jagged torch.nested tensors. Tensor masks are also converted to nested tensors after stripping padding; missing or non-tensor masks leave the full embedding sequence intact.
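The padding removal can be illustrated on nested Python lists. This sketch assumes the left-aligned mask format described above; strip_padding is a hypothetical helper, and the real function produces jagged torch.nested tensors rather than lists.

```python
def strip_padding(embeds, mask):
    """Drop trailing padded positions from each sequence of embeddings."""
    ragged = []
    for row, row_mask in zip(embeds, mask):
        valid = sum(row_mask)       # mask is 1...10...0, so the sum is the length
        ragged.append(row[:valid])  # keep only the leading valid tokens
    return ragged


embeds = [[[0.1], [0.2], [0.0]], [[0.3], [0.0], [0.0]]]  # (batch, seq, dim)
mask = [[1, 1, 0], [1, 0, 0]]
ragged = strip_padding(embeds, mask)
```

Because valid tokens are contiguous from the left, the mask sum alone is enough to locate the cut point.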

Worker Configs

The configs below are dataclass mirrors of the YAML / Hydra options consumed by the engine workers. They are typically built from omegaconf.DictConfig via verl.utils.config.omega_conf_to_dataclass().
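The idea of a dataclass mirror can be sketched with the standard library alone. ToyPipelineConfig borrows three field names and defaults from DiffusionPipelineConfig, while dict_to_dataclass is a hypothetical stand-in for omega_conf_to_dataclass; rejecting unknown keys is a choice made here, not documented behaviour.

```python
from dataclasses import dataclass, fields


@dataclass
class ToyPipelineConfig:
    height: int = 512
    width: int = 512
    num_inference_steps: int = 10


def dict_to_dataclass(cls, raw):
    """Build cls from a plain dict, falling back to defaults for missing keys."""
    known = {f.name for f in fields(cls)}
    unknown = set(raw) - known
    if unknown:
        raise KeyError(f"unknown config keys: {sorted(unknown)}")
    return cls(**raw)


cfg = dict_to_dataclass(ToyPipelineConfig, {"height": 1024, "num_inference_steps": 20})
```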

class verl_omni.workers.config.DiffusionModelConfig(_target_: str = '', path: str = '???', architecture: str | None = None, transformer_config: Optional[dict[str, Any]]=None, algorithm: str = '???', local_path: str | None = None, tokenizer_path: str | None = None, local_tokenizer_path: str | None = None, model_type: str = 'diffusion_model', load_tokenizer: bool = True, tokenizer: Any = None, processor: Any = None, use_shm: bool = False, trust_remote_code: bool = False, custom_chat_template: str | None = None, external_lib: str | None = None, enable_gradient_checkpointing: bool = True, attn_backend: str = '_flash_3_varlen_hub', lora_rank: int = 0, lora_alpha: int = 64, lora_init_weights: str = 'gaussian', target_modules: Any | None = 'all-linear', target_parameters: list[str] | None = None, exclude_modules: str | None = None, lora: dict[str, typing.Any]=<factory>, lora_adapter_path: str | None = None, policy_state_adapters: tuple[str, ...]=('default', ), lora_dtype: str | None = None, mtp: verl.workers.config.model.MtpConfig | None = <factory>, pipeline: verl_omni.workers.config.diffusion.rollout.DiffusionPipelineConfig = <factory>, algo: verl_omni.workers.config.diffusion.rollout.DiffusionRolloutAlgoConfig | None = <factory>, fsdp_layer_prefixes: list[str] = <factory>, config_path: str | None = None, transformer_subfolder: str = 'transformer')[source]
class verl_omni.workers.config.DiffusionActorConfig(_target_: str = '', strategy: str = '???', ppo_mini_batch_size: int = 256, ppo_micro_batch_size_per_gpu: int = '???', diffusion_loss: verl_omni.workers.config.diffusion.actor.DiffusionLossConfig = <factory>, loss_scale_factor: float | None = None, use_kl_loss: bool = False, kl_loss_coef: float = 0.001, ppo_epochs: int = 1, shuffle: bool = False, data_loader_seed: int = 42, checkpoint: verl.trainer.config.config.CheckpointConfig = <factory>, optim: verl.workers.config.optimizer.OptimizerConfig = <factory>, engine: verl.base_config.BaseConfig = <factory>, rollout_n: int = '???', model_config: verl_omni.workers.config.diffusion.model.DiffusionModelConfig = <factory>, log_prob_micro_batch_size_per_gpu: int | None = None, profiler: verl.utils.profiler.config.ProfilerConfig | None = None, global_batch_info: dict = <factory>, rollout_correction: verl.trainer.config.algorithm.RolloutCorrectionConfig = <factory>)[source]
class verl_omni.workers.config.FSDPDiffusionActorConfig(_target_: str = '', strategy: str = 'fsdp', ppo_mini_batch_size: int = 256, ppo_micro_batch_size_per_gpu: int = '???', diffusion_loss: verl_omni.workers.config.diffusion.actor.DiffusionLossConfig = <factory>, loss_scale_factor: float | None = None, use_kl_loss: bool = False, kl_loss_coef: float = 0.001, ppo_epochs: int = 1, shuffle: bool = False, data_loader_seed: int = 42, checkpoint: verl.trainer.config.config.CheckpointConfig = <factory>, optim: verl.workers.config.optimizer.OptimizerConfig = <factory>, engine: verl.base_config.BaseConfig = <factory>, rollout_n: int = '???', model_config: verl_omni.workers.config.diffusion.model.DiffusionModelConfig = <factory>, log_prob_micro_batch_size_per_gpu: int | None = None, profiler: verl.utils.profiler.config.ProfilerConfig | None = None, global_batch_info: dict = <factory>, rollout_correction: verl.trainer.config.algorithm.RolloutCorrectionConfig = <factory>, grad_clip: float = 1.0, fsdp_config: verl.workers.config.engine.FSDPEngineConfig = <factory>)[source]
class verl_omni.workers.config.DiffusionLossConfig(_target_: str = '', loss_mode: str = 'flow_grpo', clip_ratio: float = 0.0001, adv_clip_max: float = 5.0, mix_beta: float = 0.5, ref_kl_coef: float = 0.0, adaptive_weight_min: float = 1e-05, dpo_beta: float = 2000.0, kl_mask_threshold: float = 1e-05, add_kl_coefficient: bool = True)[source]
class verl_omni.workers.config.DiffusionRolloutConfig(_target_: str = '', name: str | None = '???', mode: str = 'async', nnodes: int = 0, n_gpus_per_node: int = 8, n: int = 1, seed: int | None = None, prompt_length: int = 512, dtype: str = 'bfloat16', gpu_memory_utilization: float = 0.5, enforce_eager: bool = False, cudagraph_capture_sizes: list | None = None, rollout_attn_backend: str = 'FLASH_ATTN', free_cache_engine: bool = True, data_parallel_size: int = 1, expert_parallel_size: int = 1, tensor_model_parallel_size: int = 2, pipeline_model_parallel_size: int = 1, max_num_batched_tokens: int = 8192, logprobs_mode: str | None = 'processed_logprobs', scheduling_policy: str | None = 'fcfs', val_kwargs: verl_omni.workers.config.diffusion.rollout.DiffusionSamplingConfig = <factory>, max_model_len: int | None = None, max_num_seqs: int = 1024, step_execution: bool = False, log_prob_micro_batch_size_per_gpu: int | None = None, disable_log_stats: bool = True, engine_kwargs: dict = <factory>, pipeline: verl_omni.workers.config.diffusion.rollout.DiffusionPipelineConfig = <factory>, calculate_log_probs: bool = False, rollout_adapter: str = 'default', agent: verl.workers.config.rollout.AgentLoopConfig = <factory>, multi_turn: verl.workers.config.rollout.MultiTurnConfig = <factory>, prometheus: verl.workers.config.rollout.PrometheusConfig = <factory>, checkpoint_engine: verl.workers.config.rollout.CheckpointEngineConfig = <factory>, enable_chunked_prefill: bool = True, enable_prefix_caching: bool = True, load_format: str = 'dummy', layered_summon: bool = False, skip_tokenizer_init: bool = True, quantization: str | None = None, enable_rollout_routing_replay: bool = False, enable_sleep_mode: bool = True, mtp: verl.workers.config.model.MtpConfig | None = <factory>, profiler: verl.utils.profiler.config.ProfilerConfig | None = None, algo: verl_omni.workers.config.diffusion.rollout.DiffusionRolloutAlgoConfig | None = <factory>, disaggregation: verl.workers.config.disaggregation.DisaggregationConfig = <factory>, external_lib: str | None = None)[source]

resolve_algorithm(model_config) None[source]

Update model_config.algorithm to the _stepwise variant when step_execution is enabled.

When step_execution=True and a <algorithm>_stepwise pipeline class is registered for the given architecture, model_config.algorithm is updated in-place so that the engine uses the experimental prepare_encode / step_scheduler / post_decode overrides.
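The in-place switch to the _stepwise variant can be sketched as below. The registry contents, ToyModelConfig, the architecture name and the algorithm names are all hypothetical; only the "<algorithm>_stepwise" naming rule and the in-place update come from the description above.

```python
from dataclasses import dataclass

# Hypothetical registry of (architecture, algorithm) pairs with a pipeline class.
REGISTERED = {("toy_arch", "flow_grpo"), ("toy_arch", "flow_grpo_stepwise")}


@dataclass
class ToyModelConfig:
    architecture: str
    algorithm: str


def resolve_algorithm(model_config, step_execution):
    """Switch model_config.algorithm to its _stepwise variant when available."""
    candidate = f"{model_config.algorithm}_stepwise"
    if step_execution and (model_config.architecture, candidate) in REGISTERED:
        model_config.algorithm = candidate  # mutate in place, return nothing


supported = ToyModelConfig("toy_arch", "flow_grpo")
resolve_algorithm(supported, step_execution=True)

unsupported = ToyModelConfig("other_arch", "flow_grpo")
resolve_algorithm(unsupported, step_execution=True)
```

When no stepwise pipeline is registered for the architecture, the algorithm name is left unchanged in this sketch.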

class verl_omni.workers.config.DiffusionRolloutAlgoConfig(_target_: str = '', noise_level: float = 1.0, sde_type: str = 'sde', sde_window_size: int | None = None, sde_window_range: list[int] | None = None, sample_strategy: str = 'random', iters_per_group: int = 1, sde_window_seed: int = 0)[source]

Algorithm configuration for the SDE-based diffusion rollout.

class verl_omni.workers.config.DiffusionPipelineConfig(_target_: str = '', height: int = 512, width: int = 512, num_inference_steps: int = 10, true_cfg_scale: float = 1.0, max_sequence_length: int = 512, guidance_scale: float | None = None, num_frames: int = 1)[source]
class verl_omni.workers.config.DiffusionSamplingConfig(_target_: str = '', n: int = 1, seed: int = 42, pipeline: verl_omni.workers.config.diffusion.rollout.DiffusionPipelineConfig = <factory>, algo: verl_omni.workers.config.diffusion.rollout.DiffusionRolloutAlgoConfig = <factory>)[source]