Pipelines Interface
Last updated: Jun 05, 2026 (API docstrings are auto-generated).
A pipeline in VeRL-Omni packages everything needed to plug a particular diffusion model architecture into the training loop:
- a training-side adapter subclassing DiffusionModelBase that handles scheduler setup, model-input construction, and the per-step forward / reverse-sampling logic used by RL algorithms (e.g. FlowGRPO);
- an optional rollout-side adapter registered via VllmOmniPipelineBase that hooks into vLLM-Omni’s diffusion serving stack to expose log-probabilities.
Adapters are auto-selected by matching the pair (DiffusionModelConfig.architecture, DiffusionModelConfig.algorithm) against the registered (architecture, algorithm) keys. The architecture is read from the _class_name value in the model’s model_index.json. The algorithm string is taken from the model config’s actor_rollout_ref.model.algorithm value.
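The sketch below illustrates the selection rule above. It derives the (architecture, algorithm) key from a model_index.json payload and a config. The helper name resolve_adapter_key and the nested-dict representation of actor_rollout_ref.model.algorithm are assumptions made for illustration. They are not part of the VeRL-Omni API.

```python
import json


def resolve_adapter_key(model_index_text: str, config: dict) -> tuple[str, str]:
    """Hypothetical helper (not part of verl_omni): build the registry key.

    The architecture comes from the "_class_name" entry of model_index.json;
    the algorithm comes from actor_rollout_ref.model.algorithm, modelled here
    as a nested dict (an assumption about the config layout).
    """
    architecture = json.loads(model_index_text)["_class_name"]
    algorithm = config["actor_rollout_ref"]["model"]["algorithm"]
    return architecture, algorithm


if __name__ == "__main__":
    # Trimmed-down model_index.json; real files also list the pipeline components.
    index_text = '{"_class_name": "QwenImagePipeline"}'
    config = {"actor_rollout_ref": {"model": {"algorithm": "flow_grpo"}}}
    print(resolve_adapter_key(index_text, config))  # ('QwenImagePipeline', 'flow_grpo')
```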
- DiffusionModelBase: Abstract base class for diffusion model training helpers.
- VllmOmniPipelineBase: Registry base for vllm-omni custom diffusion pipeline classes.
- FlowMatchSDEDiscreteScheduler: SDE version of the FlowMatchEulerDiscreteScheduler.
Model Base
- class verl_omni.pipelines.model_base.DiffusionModelBase
Abstract base class for diffusion model training helpers.
Different diffusion models have very different forward / sampling logic. Subclass this ABC and implement the four abstract methods (build_scheduler, set_timesteps, prepare_model_inputs, forward_and_sample_previous_step) to plug your model into the verl training loop.
To register, decorate your subclass with @DiffusionModelBase.register("name", algorithm="..."). The name must match the _class_name value in the pipeline’s model_index.json (which is auto-detected into DiffusionModelConfig.architecture). The algorithm must match DiffusionModelConfig.algorithm. Example:
    @DiffusionModelBase.register("QwenImagePipeline", algorithm="flow_grpo")
    class QwenImage(DiffusionModelBase):
        ...
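The decorator pattern above can be sketched with a class-level dictionary keyed by (architecture, algorithm). This is a minimal, self-contained stand-in. The class name RegistryBase, the KeyError on a missing key, and the single abstract method are all assumptions. Only build_scheduler is declared abstract here to keep the sketch short. The real DiffusionModelBase may store or report registry entries differently.

```python
from abc import ABC, abstractmethod
from types import SimpleNamespace


class RegistryBase(ABC):
    """Hypothetical stand-in for DiffusionModelBase's registry behaviour."""

    _registry: dict[tuple[str, str], type] = {}

    @classmethod
    def register(cls, architecture: str, algorithm: str):
        # Returns a class decorator that records the subclass under its key.
        def decorator(subclass: type) -> type:
            cls._registry[(architecture, algorithm)] = subclass
            return subclass

        return decorator

    @classmethod
    def get_class(cls, model_config) -> type:
        # model_config only needs .architecture and .algorithm attributes here.
        key = (model_config.architecture, model_config.algorithm)
        if key not in cls._registry:
            raise KeyError(f"no adapter registered for {key}")
        return cls._registry[key]

    @classmethod
    @abstractmethod
    def build_scheduler(cls, model_config): ...


@RegistryBase.register("QwenImagePipeline", algorithm="flow_grpo")
class QwenImageSketch(RegistryBase):
    @classmethod
    def build_scheduler(cls, model_config):
        return f"scheduler for {model_config.architecture}"


if __name__ == "__main__":
    cfg = SimpleNamespace(architecture="QwenImagePipeline", algorithm="flow_grpo")
    adapter = RegistryBase.get_class(cfg)
    print(adapter.build_scheduler(cfg))  # scheduler for QwenImagePipeline
```

Keying on the pair rather than on the architecture alone lets one architecture carry different adapters for different algorithms.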
- abstractmethod classmethod build_scheduler(model_config: DiffusionModelConfig) -> SchedulerMixin
Build and configure the diffusion scheduler for this model. The returned scheduler should have timesteps and sigmas already set.
- Parameters:
model_config (DiffusionModelConfig) – the configuration of the diffusion model.
- abstractmethod classmethod forward_and_sample_previous_step(module: ModelMixin, scheduler: SchedulerMixin, model_config: DiffusionModelConfig, model_inputs: dict[str, Tensor], negative_model_inputs: dict[str, Tensor] | None, scheduler_inputs: TensorDict | dict[str, Tensor] | None, step: int)
Forward the model and sample the previous step. Used by RL algorithms based on reverse sampling (FlowGRPO, DanceGRPO, etc.).
- Parameters:
module (ModelMixin) – the diffusion model to be forwarded.
scheduler (SchedulerMixin) – the scheduler used for the diffusion process.
model_config (DiffusionModelConfig) – the configuration of the diffusion model.
model_inputs (dict[str, torch.Tensor]) – the inputs to the diffusion model.
negative_model_inputs (Optional[dict[str, torch.Tensor]]) – the negative inputs for guidance.
scheduler_inputs (Optional[TensorDict | dict[str, torch.Tensor]]) – the extra inputs for the scheduler, which may contain the latents and timesteps.
step (int) – the current step in the diffusion process.
- Returns:
(log_prob, prev_sample_mean, std_dev_t, sqrt_dt)
- Return type:
tuple
- classmethod get_class(model_config: DiffusionModelConfig) -> type[DiffusionModelBase]
Return the registered subclass for
(architecture, algorithm).
- abstractmethod classmethod prepare_model_inputs(module: ModelMixin, model_config: DiffusionModelConfig, latents: Tensor, timesteps: Tensor, prompt_embeds: Tensor, prompt_embeds_mask: Tensor, negative_prompt_embeds: Tensor, negative_prompt_embeds_mask: Tensor, micro_batch: TensorDict, step: int) -> tuple[dict, dict | None]
Build architecture-specific inputs for a model forward pass. For reverse-trajectory algorithms, latents and timesteps usually contain the full rollout trajectory, and step selects the current slice. For forward-process objectives, callers may pass an already selected/noised latent and timestep directly. The caller is responsible for universal pre-processing (common tensor extraction and nested-embed unpadding) before invoking this method.
- Parameters:
module (ModelMixin) – the diffusion transformer module.
model_config (DiffusionModelConfig) – the configuration of the diffusion model.
latents (torch.Tensor) – latent tensor from the micro-batch; either a full trajectory of shape (B, T, …) or a selected/noised latent of shape (B, …).
timesteps (torch.Tensor) – timestep tensor from the micro-batch; either a full trajectory of shape (B, T) or a selected timestep of shape (B,).
prompt_embeds (torch.Tensor) – dense positive prompt embeddings, shape (B, L, D).
prompt_embeds_mask (torch.Tensor) – attention mask for prompt_embeds, shape (B, L).
negative_prompt_embeds (torch.Tensor) – dense negative prompt embeddings, shape (B, L, D).
negative_prompt_embeds_mask (torch.Tensor) – attention mask for negative_prompt_embeds.
micro_batch (TensorDict) – the full micro-batch, available for architecture-specific metadata (e.g. height, width, vae_scale_factor).
step (int) – the current denoising step index.
- classmethod register(architecture: str, algorithm: str)
Class decorator that registers a subclass for
(architecture, algorithm).
- abstractmethod classmethod set_timesteps(scheduler: SchedulerMixin, model_config: DiffusionModelConfig, device: str)
Set timesteps and sigmas on the scheduler and move them to device.
- Parameters:
scheduler (SchedulerMixin) – the scheduler used for the diffusion process.
model_config (DiffusionModelConfig) – the configuration of the diffusion model.
device (str) – the device to move the timesteps and sigmas to.
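The sketch below gives one concrete picture of what set_timesteps produces. It builds evenly spaced sigmas and applies the static shift used by diffusers’ flow-matching Euler scheduler, sigma' = shift * sigma / (1 + (shift - 1) * sigma). That shift corresponds to the shift constructor argument of FlowMatchSDEDiscreteScheduler. Several details are simplifying assumptions: the helper name shifted_schedule, the even spacing, and the timestep = sigma * num_train_timesteps mapping. A real implementation would work on tensors and move them to device.

```python
def shifted_schedule(num_inference_steps: int, shift: float = 1.0,
                     num_train_timesteps: int = 1000) -> tuple[list[float], list[float]]:
    """Hypothetical helper: return (sigmas, timesteps) for a flow-matching sampler."""
    # Evenly spaced sigmas from 1.0 down to 1 / num_inference_steps.
    base = [1.0 - i / num_inference_steps for i in range(num_inference_steps)]
    # Static shift: shift > 1 pushes sigmas toward 1, spending more steps at high noise.
    sigmas = [shift * s / (1.0 + (shift - 1.0) * s) for s in base]
    timesteps = [s * num_train_timesteps for s in sigmas]
    # Terminal sigma of 0 so that step i can always read sigmas[i + 1].
    return sigmas + [0.0], timesteps


if __name__ == "__main__":
    sigmas, timesteps = shifted_schedule(4, shift=3.0)
    print(sigmas)     # [1.0, 0.9, 0.75, 0.5, 0.0]
    print(timesteps)  # [1000.0, 900.0, 750.0, 500.0]
```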
- class verl_omni.pipelines.model_base.VllmOmniPipelineBase
Registry base for vllm-omni custom diffusion pipeline classes.
To register, decorate your custom pipeline class with @VllmOmniPipelineBase.register("name", algorithm="..."). The name must match the _class_name value in the pipeline’s model_index.json (which is auto-detected into DiffusionModelConfig.architecture). The algorithm must match DiffusionModelConfig.algorithm. Example:
    @VllmOmniPipelineBase.register("QwenImagePipeline", algorithm="flow_grpo")
    class QwenImagePipelineWithLogProb(QwenImagePipeline):
        ...
- classmethod get_class(architecture: str, algorithm: str) -> type | None
Return the registered pipeline class for (architecture, algorithm), or None.
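The rollout-side adapter is optional, and get_class can return None, so a caller needs some fallback. The sketch below assumes that fallback is the stock pipeline without log-probabilities. StockPipeline, register_pipeline and select_rollout_pipeline are hypothetical names, and the real selection logic in VeRL-Omni may differ.

```python
_PIPELINES: dict[tuple[str, str], type] = {}


class StockPipeline:
    """Hypothetical placeholder for an unmodified serving pipeline."""


def register_pipeline(architecture: str, algorithm: str):
    # Mirrors the decorator shape of VllmOmniPipelineBase.register.
    def decorator(cls: type) -> type:
        _PIPELINES[(architecture, algorithm)] = cls
        return cls

    return decorator


def select_rollout_pipeline(architecture: str, algorithm: str,
                            default: type = StockPipeline) -> type:
    # A None lookup (no custom class registered) falls back to `default`.
    custom = _PIPELINES.get((architecture, algorithm))
    return custom if custom is not None else default


@register_pipeline("QwenImagePipeline", algorithm="flow_grpo")
class QwenImagePipelineWithLogProbSketch(StockPipeline):
    """Would add log-probability outputs on top of the stock pipeline."""


if __name__ == "__main__":
    print(select_rollout_pipeline("QwenImagePipeline", "flow_grpo").__name__)
    print(select_rollout_pipeline("QwenImagePipeline", "other_algo").__name__)
```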
Pipeline Helpers
Convenience wrappers that dispatch to the registered subclass for the current (architecture, algorithm) pair. The Diffusers FSDP engine and the agent loop call into these helpers rather than touching the registry directly.
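The dispatch pattern can be sketched as thin module-level wrappers that look up the adapter and forward the call. Everything in the sketch is hypothetical: ToyAdapter, the wrapper names, the list-of-lists latents, and the keyword-only signatures. These stand in for the real tensors and for the full prepare_model_inputs / forward_and_sample_previous_step signatures documented in Model Base. The loop shows the reverse-trajectory case, where step selects one slice of each stored trajectory.

```python
from types import SimpleNamespace


class ToyAdapter:
    """Hypothetical adapter; real ones subclass DiffusionModelBase."""

    @classmethod
    def prepare_model_inputs(cls, latents, timesteps, step):
        # Reverse-trajectory case: select the current slice of each trajectory.
        model_inputs = {
            "hidden_states": [traj[step] for traj in latents],
            "timestep": [ts[step] for ts in timesteps],
        }
        return model_inputs, None  # no negative (guidance) inputs in this toy

    @classmethod
    def forward_and_sample_previous_step(cls, model_inputs, negative_model_inputs, step):
        # Toy "log-probability": minus the timestep, one value per sample.
        log_prob = [-t for t in model_inputs["timestep"]]
        return log_prob, None, None, None  # (log_prob, prev_sample_mean, std_dev_t, sqrt_dt)


_REGISTRY = {("ToyPipeline", "flow_grpo"): ToyAdapter}


def _adapter(model_config):
    return _REGISTRY[(model_config.architecture, model_config.algorithm)]


def prepare_model_inputs(model_config, **kwargs):
    return _adapter(model_config).prepare_model_inputs(**kwargs)


def forward_and_sample_previous_step(model_config, **kwargs):
    return _adapter(model_config).forward_and_sample_previous_step(**kwargs)


def collect_log_probs(model_config, latents, timesteps):
    """Walk the stored trajectory and gather per-step log-probabilities."""
    per_step = []
    for step in range(len(timesteps[0])):
        inputs, negative = prepare_model_inputs(
            model_config, latents=latents, timesteps=timesteps, step=step)
        log_prob, _, _, _ = forward_and_sample_previous_step(
            model_config, model_inputs=inputs, negative_model_inputs=negative, step=step)
        per_step.append(log_prob)
    return per_step


if __name__ == "__main__":
    cfg = SimpleNamespace(architecture="ToyPipeline", algorithm="flow_grpo")
    latents = [[[0.1], [0.2]], [[0.3], [0.4]]]  # B=2 trajectories, T=2 steps
    timesteps = [[1.0, 0.5], [1.0, 0.5]]
    print(collect_log_probs(cfg, latents, timesteps))  # [[-1.0, -1.0], [-0.5, -0.5]]
```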
Schedulers
- class verl_omni.pipelines.schedulers.flow_match_sde.FlowMatchSDEDiscreteScheduler(num_train_timesteps: int = 1000, shift: float = 1.0, use_dynamic_shifting: bool = False, base_shift: float | None = 0.5, max_shift: float | None = 1.15, base_image_seq_len: int = 256, max_image_seq_len: int = 4096, invert_sigmas: bool = False, shift_terminal: float = None, use_karras_sigmas: bool = False, use_exponential_sigmas: bool = False, use_beta_sigmas: bool = False, time_shift_type: Literal['exponential', 'linear'] = 'exponential', stochastic_sampling: bool = False)
SDE version of the FlowMatchEulerDiscreteScheduler. The implementation is based on the FlowGRPO paper (https://arxiv.org/abs/2505.05470) and the diffusers v0.37 branch.
- sample_previous_step(sample: Tensor, model_output: Tensor, timestep: FloatTensor | None = None, generator: Generator | None = None, per_token_timesteps: Tensor | None = None, noise_level: float = 0.7, prev_sample: Tensor | None = None, sde_type: Literal['cps', 'sde', 'dance_sde'] = 'sde', return_logprobs: bool = True, return_sqrt_dt: bool = False)
Run a single SDE / CPS reverse step.
- Parameters:
sample (torch.FloatTensor) – A current instance of a sample created by the diffusion process.
model_output (torch.FloatTensor) – The direct output from the learned diffusion model.
timestep (torch.FloatTensor, optional) – The current discrete timestep in the diffusion chain. When None, the internal step_index is used (sequential denoising loop).
generator (torch.Generator, optional) – A random number generator.
per_token_timesteps (torch.Tensor, optional) – The timesteps for each token in the sample. Currently not supported.
noise_level (float, optional, defaults to 0.7) – The noise level used in the SDE.
prev_sample (torch.FloatTensor, optional) – The sample from the previous timestep. If provided, it is used directly for log-probability computation instead of being sampled.
sde_type (str, optional, defaults to “sde”) – The type of SDE to use. Choose between “sde”, “cps”, and “dance_sde”.
return_logprobs (bool, optional, defaults to True) – Whether to return log probabilities of the previous sample.
return_sqrt_dt (bool, optional, defaults to False) – Whether to additionally return sqrt(-dt) as a tensor of shape (batch_size,). Used by GRPO-Guard to compute the importance-ratio normalization (see GRPOGuardLoss).
- step(model_output: FloatTensor, timestep: float | FloatTensor, sample: FloatTensor, s_churn: float = 0.0, s_tmin: float = 0.0, s_tmax: float = inf, s_noise: float = 1.0, generator: Generator | None = None, per_token_timesteps: Tensor | None = None, return_dict: bool = True, noise_level: float = 0.7, prev_sample: FloatTensor | None = None, sde_type: Literal['sde', 'cps', 'dance_sde'] = 'sde', return_logprobs: bool = True) -> FlowMatchSDEDiscreteSchedulerOutput | tuple
Predict the sample at the previous timestep by reversing the SDE. This function propagates the diffusion process from the learned model output (for flow-matching models, the predicted velocity).
Modified from https://github.com/yifan123/flow_grpo/blob/main/flow_grpo/diffusers_patch/sd3_sde_with_logprob.py
- Parameters:
model_output (torch.FloatTensor) – The direct output from the learned diffusion model.
timestep (float) – The current discrete timestep in the diffusion chain.
sample (torch.FloatTensor) – A current instance of a sample created by the diffusion process.
s_churn (float)
s_tmin (float)
s_tmax (float)
s_noise (float, defaults to 1.0) – Scaling factor for noise added to the sample.
generator (torch.Generator, optional) – A random number generator.
per_token_timesteps (torch.Tensor, optional) – The timesteps for each token in the sample.
return_dict (bool) – Whether to return a FlowMatchSDEDiscreteSchedulerOutput or a plain tuple.
noise_level (float, optional, defaults to 0.7) – The noise level used in the SDE.
prev_sample (torch.FloatTensor, optional) – The sample from the previous timestep. If not provided, it will be sampled inside the function.
sde_type (str, optional, defaults to “sde”) – The type of SDE to use. Choose between “sde”, “cps”, and “dance_sde”.
return_logprobs (bool, optional, defaults to True) – Whether to return log probabilities of the previous sample.
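The sketch below shows the arithmetic of the "sde" branch for one sample in plain Python. It follows the FlowGRPO reference code linked above (sd3_sde_with_logprob.py). The update works as follows:

- dt = sigma_prev - sigma is negative while denoising.
- std_dev_t = noise_level * sqrt(sigma / (1 - sigma)).
- The mean is an Euler step on the velocity plus an SDE drift correction.
- The new sample adds Gaussian noise with standard deviation std_dev_t * sqrt(-dt). This is the sqrt(-dt) that return_sqrt_dt exposes.

This is a sketch under several assumptions. It treats model_output as the flow-matching velocity, works on lists instead of tensors, and omits the cps and dance_sde branches. It may also differ from FlowMatchSDEDiscreteScheduler in other details.

```python
import math
import random


def sde_reverse_step(sample, model_output, sigma, sigma_prev, sigma_max,
                     noise_level=0.7, prev_sample=None, rng=None):
    """Sketch of one 'sde'-type reverse step for a single sample (list of floats).

    Returns (prev_sample, log_prob, prev_sample_mean, std_dev_t, sqrt_dt).
    Assumes model_output is the predicted velocity and sigma_prev < sigma.
    """
    dt = sigma_prev - sigma  # negative while denoising
    # At sigma == 1 the ratio sigma / (1 - sigma) blows up, so a caller-supplied
    # sigma_max (< 1) is substituted in the denominator.
    denom = 1.0 - (sigma_max if sigma == 1.0 else sigma)
    std_dev_t = math.sqrt(sigma / denom) * noise_level
    prev_sample_mean = [
        x * (1 + std_dev_t**2 / (2 * sigma) * dt)
        + v * (1 + std_dev_t**2 * (1 - sigma) / (2 * sigma)) * dt
        for x, v in zip(sample, model_output)
    ]
    sqrt_dt = math.sqrt(-dt)
    scale = std_dev_t * sqrt_dt
    if prev_sample is None:
        rng = rng or random.Random(0)
        prev_sample = [m + scale * rng.gauss(0.0, 1.0) for m in prev_sample_mean]
    # Gaussian log-density per element, averaged over elements so that each
    # sample gets one scalar (cf. log_prob of shape (batch_size,)).
    per_element = [
        -((p - m) ** 2) / (2 * scale**2) - math.log(scale) - 0.5 * math.log(2 * math.pi)
        for p, m in zip(prev_sample, prev_sample_mean)
    ]
    log_prob = sum(per_element) / len(per_element)
    return prev_sample, log_prob, prev_sample_mean, std_dev_t, sqrt_dt


if __name__ == "__main__":
    x, v = [0.5, -0.5], [1.0, 1.0]
    _, _, mean, std, sqrt_dt = sde_reverse_step(x, v, sigma=0.8, sigma_prev=0.6, sigma_max=0.95)
    # Re-scoring the mean itself (passed as prev_sample) gives the density peak.
    _, lp, _, _, _ = sde_reverse_step(x, v, 0.8, 0.6, 0.95, prev_sample=mean)
    print(abs(lp - (-math.log(std * sqrt_dt) - 0.5 * math.log(2 * math.pi))) < 1e-12)  # True
```

As noise_level approaches 0, std_dev_t vanishes and the mean reduces to the deterministic Euler update sample + model_output * dt. This is why noise_level controls how far the SDE sampler departs from the ODE sampler.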
- class verl_omni.pipelines.schedulers.flow_match_sde.FlowMatchSDEDiscreteSchedulerOutput(prev_sample: FloatTensor, log_prob: FloatTensor | None, prev_sample_mean: FloatTensor, std_dev_t: FloatTensor)
Output class for the scheduler’s step function.
- Parameters:
prev_sample (torch.FloatTensor of shape (batch_size, sequence_length, num_channels) for images) – Computed sample (x_{t-1}) of the previous timestep. prev_sample should be used as the next model input in the denoising loop.
log_prob (torch.FloatTensor of shape (batch_size,), optional) – The log probability of the previous sample.
prev_sample_mean (torch.FloatTensor of shape (batch_size, sequence_length, num_channels) for images) – The mean of the computed sample of the previous timestep.
std_dev_t (torch.FloatTensor of shape (batch_size, 1, 1)) – The standard deviation used to compute prev_sample.
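A per-sample log_prob of shape (batch_size,) is what a policy-gradient loss consumes. The sketch below assumes a PPO-style clipped objective on the ratio exp(new_log_prob - old_log_prob), as used in FlowGRPO. The function name and the clip_range value in the usage line are illustrative. The losses VeRL-Omni actually implements may differ, for example GRPOGuardLoss, which also normalizes the ratio using sqrt(-dt).

```python
import math


def clipped_policy_loss(new_log_probs, old_log_probs, advantages, clip_range):
    """PPO-style clipped surrogate, averaged over the batch (sketch)."""
    losses = []
    for new, old, adv in zip(new_log_probs, old_log_probs, advantages):
        ratio = math.exp(new - old)  # importance ratio for one sample
        clipped = min(max(ratio, 1.0 - clip_range), 1.0 + clip_range)
        # Pessimistic choice: keep the larger of the two loss terms.
        losses.append(max(-adv * ratio, -adv * clipped))
    return sum(losses) / len(losses)


if __name__ == "__main__":
    # The ratio e^0.5 (about 1.65) is clipped to 1.2, so the loss is about -1.2.
    print(clipped_policy_loss([0.5], [0.0], [1.0], clip_range=0.2))
```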