Pipelines Interface

Last updated: Jun 05, 2026 (API docstrings are auto-generated).

A pipeline in VeRL-Omni packages everything needed to plug a particular diffusion model architecture into the training loop:

  • a training-side adapter subclassing DiffusionModelBase that handles scheduler setup, model-input construction, and the per-step forward / reverse-sampling logic used by RL algorithms (e.g. FlowGRPO);

  • an optional rollout-side adapter registered via VllmOmniPipelineBase that hooks into vLLM-Omni’s diffusion serving stack to expose log-probabilities.

Adapters are auto-selected by matching the pair (DiffusionModelConfig.architecture, DiffusionModelConfig.algorithm) against the registered (architecture, algorithm) keys. The architecture is read from the _class_name field of the model’s model_index.json; the algorithm string is taken from the actor_rollout_ref.model.algorithm config value.
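The key construction described above can be sketched as follows. This is a minimal illustration, not the library's code: MODEL_INDEX_TEXT, REGISTERED_KEYS, and adapter_key are hypothetical names, and the JSON string stands in for a real model_index.json file.

```python
import json

# Hypothetical stand-in for the text of a pipeline's model_index.json.
MODEL_INDEX_TEXT = '{"_class_name": "QwenImagePipeline"}'

# Hypothetical set of keys that adapters were registered under.
REGISTERED_KEYS = {("QwenImagePipeline", "flow_grpo")}


def adapter_key(model_index_text: str, algorithm: str) -> tuple[str, str]:
    """Build the (architecture, algorithm) lookup key."""
    # The architecture is the _class_name field of model_index.json.
    architecture = json.loads(model_index_text)["_class_name"]
    return (architecture, algorithm)


key = adapter_key(MODEL_INDEX_TEXT, "flow_grpo")
print(key, key in REGISTERED_KEYS)
```

An adapter is only found when both halves of the key match, so the same architecture can carry different adapters for different algorithms.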

verl_omni.pipelines.model_base.DiffusionModelBase

Abstract base class for diffusion model training helpers.

verl_omni.pipelines.model_base.VllmOmniPipelineBase

Registry base for vllm-omni custom diffusion pipeline classes.

verl_omni.pipelines.schedulers.flow_match_sde.FlowMatchSDEDiscreteScheduler

SDE version of the FlowMatchEulerDiscreteScheduler.

Model Base

class verl_omni.pipelines.model_base.DiffusionModelBase

Abstract base class for diffusion model training helpers.

Different diffusion models have very different forward / sampling logic. Subclass this ABC and implement its abstract methods (build_scheduler, set_timesteps, prepare_model_inputs, and forward_and_sample_previous_step) to plug your model into the verl training loop.

To register, decorate your subclass with @DiffusionModelBase.register("name", algorithm="..."). The name must match the _class_name value in the pipeline’s model_index.json (which is auto-detected into DiffusionModelConfig.architecture). The algorithm must match DiffusionModelConfig.algorithm.

Example:

@DiffusionModelBase.register("QwenImagePipeline", algorithm="flow_grpo")
class QwenImage(DiffusionModelBase):
    ...

abstractmethod classmethod build_scheduler(model_config: DiffusionModelConfig) -> SchedulerMixin

Build and configure the diffusion scheduler for this model. The returned scheduler should have timesteps and sigmas already set.

Parameters:

model_config (DiffusionModelConfig) – the configuration of the diffusion model.

abstractmethod classmethod forward_and_sample_previous_step(module: ModelMixin, scheduler: SchedulerMixin, model_config: DiffusionModelConfig, model_inputs: dict[str, Tensor], negative_model_inputs: dict[str, Tensor] | None, scheduler_inputs: TensorDict | dict[str, Tensor] | None, step: int)

Run the model forward and sample the previous step. Used by RL algorithms based on reverse sampling (FlowGRPO, DanceGRPO, etc.).

Parameters:
  • module (ModelMixin) – the diffusion model to be forwarded.

  • scheduler (SchedulerMixin) – the scheduler used for the diffusion process.

  • model_config (DiffusionModelConfig) – the configuration of the diffusion model.

  • model_inputs (dict[str, torch.Tensor]) – the inputs to the diffusion model.

  • negative_model_inputs (Optional[dict[str, torch.Tensor]]) – the negative inputs for guidance.

  • scheduler_inputs (Optional[TensorDict | dict[str, torch.Tensor]]) – the extra inputs for the scheduler, which may contain the latents and timesteps.

  • step (int) – the current step in the diffusion process.

Returns:

(log_prob, prev_sample_mean, std_dev_t, sqrt_dt)

Return type:

tuple

classmethod get_class(model_config: DiffusionModelConfig) -> type[DiffusionModelBase]

Return the registered subclass for (architecture, algorithm).

abstractmethod classmethod prepare_model_inputs(module: ModelMixin, model_config: DiffusionModelConfig, latents: Tensor, timesteps: Tensor, prompt_embeds: Tensor, prompt_embeds_mask: Tensor, negative_prompt_embeds: Tensor, negative_prompt_embeds_mask: Tensor, micro_batch: TensorDict, step: int) -> tuple[dict, dict | None]

Build architecture-specific inputs for a model forward. For reverse-trajectory algorithms, latents and timesteps usually contain the full rollout trajectory and step selects the current slice. For forward-process objectives, callers may pass an already selected/noised latent and timestep directly. The caller is responsible for universal pre-processing (common tensor extraction and nested-embed unpadding) before invoking this method.

Parameters:
  • module (ModelMixin) – the diffusion transformer module.

  • model_config (DiffusionModelConfig) – the configuration of the diffusion model.

  • latents (torch.Tensor) – latent tensor from the micro-batch; either a full trajectory of shape (B, T, …) or a selected/noised latent of shape (B, …).

  • timesteps (torch.Tensor) – timestep tensor from the micro-batch; either a full trajectory of shape (B, T) or a selected timestep of shape (B,).

  • prompt_embeds (torch.Tensor) – dense positive prompt embeddings, shape (B, L, D).

  • prompt_embeds_mask (torch.Tensor) – attention mask for prompt_embeds, shape (B, L).

  • negative_prompt_embeds (torch.Tensor) – dense negative prompt embeddings, shape (B, L, D).

  • negative_prompt_embeds_mask (torch.Tensor) – attention mask for negative_prompt_embeds.

  • micro_batch (TensorDict) – the full micro-batch, available for architecture-specific metadata (e.g. height, width, vae_scale_factor).

  • step (int) – the current denoising step index.
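The two input layouts accepted for latents and timesteps can be illustrated with a small sketch. It uses nested Python lists in place of torch tensors, and select_step is a hypothetical helper, not part of the API; a real subclass would index tensors instead.

```python
def select_step(latents, timesteps, step):
    """Return the per-sample latents and timesteps for one denoising step.

    Nested lists stand in for tensors: a full trajectory has timesteps of
    shape (B, T), an already selected input has timesteps of shape (B,).
    """
    has_trajectory = isinstance(timesteps[0], (list, tuple))
    if has_trajectory:
        # Reverse-trajectory case: pick slice `step` along the T axis.
        return [traj[step] for traj in latents], [ts[step] for ts in timesteps]
    # Forward-process case: inputs are already selected, pass them through.
    return latents, timesteps


# B=2 samples, T=3 steps, one latent value per step (toy data).
traj_latents = [[[0.0], [0.1], [0.2]], [[1.0], [1.1], [1.2]]]
traj_timesteps = [[1000, 500, 0], [1000, 500, 0]]
step_latents, step_timesteps = select_step(traj_latents, traj_timesteps, step=1)
```

In the trajectory case the result has the same per-sample shape as an already selected input, so downstream input construction can treat both cases uniformly.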

classmethod register(architecture: str, algorithm: str)

Class decorator that registers a subclass for (architecture, algorithm).
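A decorator-based registry of this kind can be sketched in a few lines. The classes below (ToyConfig, ToyModelBase, ToyQwenImage) are hypothetical stand-ins written for illustration; the actual DiffusionModelBase implementation may differ, for example in how it reports a missing key.

```python
from dataclasses import dataclass


@dataclass
class ToyConfig:
    """Hypothetical stand-in for DiffusionModelConfig."""
    architecture: str
    algorithm: str


class ToyModelBase:
    """Toy registry base keyed by (architecture, algorithm)."""
    _registry = {}

    @classmethod
    def register(cls, architecture, algorithm):
        def decorator(subclass):
            # Store the class under its lookup key and return it unchanged.
            cls._registry[(architecture, algorithm)] = subclass
            return subclass
        return decorator

    @classmethod
    def get_class(cls, config):
        key = (config.architecture, config.algorithm)
        if key not in cls._registry:
            raise KeyError(f"no adapter registered for {key}")
        return cls._registry[key]


@ToyModelBase.register("QwenImagePipeline", algorithm="flow_grpo")
class ToyQwenImage(ToyModelBase):
    pass


adapter_cls = ToyModelBase.get_class(ToyConfig("QwenImagePipeline", "flow_grpo"))
```

Because the decorator returns the class unchanged, registration has no effect on how the subclass behaves; it only makes the class discoverable by key.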

abstractmethod classmethod set_timesteps(scheduler: SchedulerMixin, model_config: DiffusionModelConfig, device: str)

Set timesteps and sigmas on the scheduler and move them to device.

Parameters:
  • scheduler (SchedulerMixin) – the scheduler used for the diffusion process.

  • model_config (DiffusionModelConfig) – the configuration of the diffusion model.

  • device (str) – the device to move the timesteps and sigmas to.

class verl_omni.pipelines.model_base.VllmOmniPipelineBase

Registry base for vllm-omni custom diffusion pipeline classes.

To register, decorate your custom pipeline class with @VllmOmniPipelineBase.register("name", algorithm="..."). The name must match the _class_name value in the pipeline’s model_index.json (which is auto-detected into DiffusionModelConfig.architecture). The algorithm must match DiffusionModelConfig.algorithm.

Example:

@VllmOmniPipelineBase.register("QwenImagePipeline", algorithm="flow_grpo")
class QwenImagePipelineWithLogProb(QwenImagePipeline):
    ...

classmethod get_class(architecture: str, algorithm: str) -> type | None

Return the registered pipeline class for (architecture, algorithm), or None.

classmethod get_pipeline_path(architecture: str, algorithm: str) -> str | None

Return the fully-qualified dotted import path for (architecture, algorithm), or None.
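A fully-qualified dotted import path is typically the class's module name joined with its qualified name, and it can be resolved back to the class with importlib. The sketch below shows that round trip on a standard-library class; dotted_path and import_from_path are hypothetical helpers, and how get_pipeline_path builds its string internally is an assumption.

```python
import importlib
from fractions import Fraction


def dotted_path(cls) -> str:
    """Build 'package.module.ClassName' for a class."""
    return f"{cls.__module__}.{cls.__qualname__}"


def import_from_path(path: str):
    """Resolve a dotted path back to the object it names."""
    module_name, _, attr = path.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


path = dotted_path(Fraction)
resolved = import_from_path(path)
```

A string path is useful when the class has to be handed to another process, such as a serving stack, that imports it on its own side.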

classmethod register(architecture: str, algorithm: str)

Class decorator that registers a pipeline for (architecture, algorithm).

Pipeline Helpers

Convenience wrappers that dispatch to the registered subclass for the current architecture. The Diffusers FSDP engine and the agent loop call into these helpers rather than touching the registry directly.

Schedulers

class verl_omni.pipelines.schedulers.flow_match_sde.FlowMatchSDEDiscreteScheduler(num_train_timesteps: int = 1000, shift: float = 1.0, use_dynamic_shifting: bool = False, base_shift: float | None = 0.5, max_shift: float | None = 1.15, base_image_seq_len: int = 256, max_image_seq_len: int = 4096, invert_sigmas: bool = False, shift_terminal: float = None, use_karras_sigmas: bool = False, use_exponential_sigmas: bool = False, use_beta_sigmas: bool = False, time_shift_type: Literal['exponential', 'linear'] = 'exponential', stochastic_sampling: bool = False)

SDE version of the FlowMatchEulerDiscreteScheduler. The implementation is based on the FlowGRPO paper (https://arxiv.org/abs/2505.05470) and the diffusers v0.37 branch.

sample_previous_step(sample: Tensor, model_output: Tensor, timestep: FloatTensor | None = None, generator: Generator | None = None, per_token_timesteps: Tensor | None = None, noise_level: float = 0.7, prev_sample: Tensor | None = None, sde_type: Literal['cps', 'sde', 'dance_sde'] = 'sde', return_logprobs: bool = True, return_sqrt_dt: bool = False)

Run a single SDE / CPS reverse step.

Parameters:
  • sample (torch.FloatTensor) – A current instance of a sample created by the diffusion process.

  • model_output (torch.FloatTensor) – The direct output from the learned diffusion model.

  • timestep (torch.FloatTensor, optional) – The current discrete timestep in the diffusion chain. When None, the internal step_index is used (sequential denoising loop).

  • generator (torch.Generator, optional) – A random number generator.

  • per_token_timesteps (torch.Tensor, optional) – The timesteps for each token in the sample. Currently not supported.

  • noise_level (float, optional, defaults to 0.7) – The noise level used in the SDE.

  • prev_sample (torch.FloatTensor, optional) – The sample from the previous timestep. If provided, it is used directly for log-probability computation instead of being sampled.

  • sde_type (str, optional, defaults to “sde”) – The type of SDE to use. Choose between “sde”, “cps”, and “dance_sde”.

  • return_logprobs (bool, optional, defaults to True) – Whether to return log probabilities of the previous sample.

  • return_sqrt_dt (bool, optional, defaults to False) – Whether to additionally return sqrt(-dt) as a tensor of shape (batch_size,). Used by GRPO-Guard to compute the importance-ratio normalization (see GRPOGuardLoss).
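To make the roles of prev_sample, prev_sample_mean, std_dev_t, and sqrt(-dt) concrete, the sketch below scores a given prev_sample under a Gaussian transition. The Gaussian form, the scale std_dev_t * sqrt(-dt), and the mean reduction over elements are assumptions modeled on the FlowGRPO reference code, not a statement of what this scheduler computes; gaussian_log_prob is a hypothetical helper operating on flat Python lists rather than tensors.

```python
import math


def gaussian_log_prob(prev_sample, prev_sample_mean, std_dev_t, sqrt_dt):
    """Mean per-element log-density of prev_sample under
    N(prev_sample_mean, (std_dev_t * sqrt_dt)**2).

    Assumption: sqrt_dt holds sqrt(-dt), a positive number.
    """
    scale = std_dev_t * sqrt_dt
    total = 0.0
    for x, mu in zip(prev_sample, prev_sample_mean):
        total += (
            -((x - mu) ** 2) / (2 * scale**2)
            - math.log(scale)
            - 0.5 * math.log(2 * math.pi)
        )
    return total / len(prev_sample)


# Scoring a stored sample that equals the mean, with unit scale.
at_mean = gaussian_log_prob([0.3, -0.2], [0.3, -0.2], std_dev_t=2.0, sqrt_dt=0.5)
off_mean = gaussian_log_prob([0.8, -0.2], [0.3, -0.2], std_dev_t=2.0, sqrt_dt=0.5)
```

Passing a stored prev_sample, as the parameter description allows, makes the result deterministic: no new noise is drawn, and the function only evaluates how likely the stored sample is under the current model output.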

step(model_output: FloatTensor, timestep: float | FloatTensor, sample: FloatTensor, s_churn: float = 0.0, s_tmin: float = 0.0, s_tmax: float = inf, s_noise: float = 1.0, generator: Generator | None = None, per_token_timesteps: Tensor | None = None, return_dict: bool = True, noise_level: float = 0.7, prev_sample: FloatTensor | None = None, sde_type: Literal['sde', 'cps', 'dance_sde'] = 'sde', return_logprobs: bool = True) -> FlowMatchSDEDiscreteSchedulerOutput | tuple

Predict the sample from the previous timestep by reversing the SDE. This function propagates the diffusion process from the learned model outputs (most often the predicted noise).

Modified from https://github.com/yifan123/flow_grpo/blob/main/flow_grpo/diffusers_patch/sd3_sde_with_logprob.py

Parameters:
  • model_output (torch.FloatTensor) – The direct output from the learned diffusion model.

  • timestep (float or torch.FloatTensor) – The current discrete timestep in the diffusion chain.

  • sample (torch.FloatTensor) – A current instance of a sample created by the diffusion process.

  • s_churn (float)

  • s_tmin (float)

  • s_tmax (float)

  • s_noise (float, defaults to 1.0) – Scaling factor for noise added to the sample.

  • generator (torch.Generator, optional) – A random number generator.

  • per_token_timesteps (torch.Tensor, optional) – The timesteps for each token in the sample.

  • return_dict (bool) – Whether to return a FlowMatchSDEDiscreteSchedulerOutput or a plain tuple.

  • noise_level (float, optional, defaults to 0.7) – The noise level used in the SDE.

  • prev_sample (torch.FloatTensor, optional) – The sample from the previous timestep. If not provided, it will be sampled inside the function.

  • sde_type (str, optional, defaults to “sde”) – The type of SDE to use. Choose between “sde”, “cps”, and “dance_sde”.

  • return_logprobs (bool, optional, defaults to True) – Whether to return log probabilities of the previous sample.

class verl_omni.pipelines.schedulers.flow_match_sde.FlowMatchSDEDiscreteSchedulerOutput(prev_sample: FloatTensor, log_prob: FloatTensor | None, prev_sample_mean: FloatTensor, std_dev_t: FloatTensor)

Output class for the scheduler’s step function.

Parameters:
  • prev_sample (torch.FloatTensor of shape (batch_size, sequence_length, num_channels) for images) – Computed sample (x_{t-1}) of previous timestep. prev_sample should be used as next model input in the denoising loop.

  • log_prob (torch.FloatTensor of shape (batch_size,), optional) – The log probability of the previous sample.

  • prev_sample_mean (torch.FloatTensor of shape (batch_size, sequence_length, num_channels) for images) – The mean of the computed sample of previous timestep.

  • std_dev_t (torch.FloatTensor of shape (batch_size, 1, 1)) – The standard deviation used to compute prev_sample.

Built-in Pipelines

Qwen-Image (FlowGRPO)

Qwen-Image (MixGRPO)

SD3 DPO