Pipelines Interface

Last updated: Jul 03, 2026 (API docstrings are auto-generated).

A pipeline in VeRL-Omni packages everything needed to plug a particular diffusion model architecture into the training loop:

  • a training-side adapter subclassing DiffusionModelBase that handles scheduler setup, model-input construction, and the per-step forward / reverse-sampling logic used by RL algorithms (e.g. FlowGRPO);

  • an optional rollout-side adapter registered via VllmOmniPipelineBase that hooks into vLLM-Omni’s diffusion serving stack to expose log-probabilities.

Adapters are auto-selected by matching the pair (DiffusionModelConfig.architecture, DiffusionModelConfig.algorithm) against the registered (architecture, algorithm) key. The architecture is read from the model’s model_index.json; the algorithm string is taken from the model config’s actor_rollout_ref.model.algorithm value.
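The lookup described above can be sketched as follows. This is an illustration under stated assumptions, not the library's code: the registry contents, the `select_adapter` helper, and the "mix_grpo" algorithm string are hypothetical, and the model_index.json payload is made up. It relies only on the `_class_name` field of model_index.json supplying the architecture name.

```python
import json

# Hypothetical registry contents: (architecture, algorithm) -> adapter name.
# "mix_grpo" is an assumed algorithm string, used here only for illustration.
REGISTERED = {
    ("QwenImagePipeline", "flow_grpo"): "QwenImage",
    ("QwenImagePipeline", "mix_grpo"): "QwenImageMixGRPO",
}


def select_adapter(model_index_text, algorithm):
    """Return the adapter name for a model_index.json payload, or None."""
    architecture = json.loads(model_index_text)["_class_name"]
    return REGISTERED.get((architecture, algorithm))


# Made-up model_index.json content.
model_index = '{"_class_name": "QwenImagePipeline"}'
print(select_adapter(model_index, "flow_grpo"))  # prints: QwenImage
```

Keying on the pair rather than on the architecture alone is what lets one model family carry several adapters, one per algorithm.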

verl_omni.pipelines.model_base.DiffusionModelBase

Abstract base class for diffusion model training helpers.

verl_omni.pipelines.model_base.VllmOmniPipelineBase

Registry base for vllm-omni custom diffusion pipeline classes.

verl_omni.pipelines.qwen_image_flow_grpo.QwenImage

Training adapter for the Qwen-Image diffusion model.

verl_omni.pipelines.qwen_image_mix_grpo.QwenImageMixGRPO

Training adapter for Qwen-Image with the MixGRPO algorithm.

verl_omni.pipelines.sd3_dpo.StableDiffusion3DPO

Training adapter for SD3 Diffusion-DPO.

verl_omni.pipelines.schedulers.flow_match_sde.FlowMatchSDEDiscreteScheduler

SDE version of the FlowMatchEulerDiscreteScheduler.

Model Base

class verl_omni.pipelines.model_base.DiffusionModelBase

Abstract base class for diffusion model training helpers.

Different diffusion models have very different forward / sampling logic. Subclass this ABC and implement its four abstract methods (build_scheduler, set_timesteps, prepare_model_inputs, and forward_and_sample_previous_step) to plug your model into the verl training loop.

To register, decorate your subclass with @DiffusionModelBase.register("name", algorithm="..."). The name must match the _class_name value in the pipeline’s model_index.json (which is auto-detected into DiffusionModelConfig.architecture). The algorithm must match DiffusionModelConfig.algorithm.

Example:

@DiffusionModelBase.register("QwenImagePipeline", algorithm="flow_grpo")
class QwenImage(DiffusionModelBase):
    ...

abstractmethod classmethod build_scheduler(model_config: DiffusionModelConfig) → SchedulerMixin

Build and configure the diffusion scheduler for this model. The returned scheduler should have timesteps and sigmas already set.

Parameters:

model_config (DiffusionModelConfig) – the configuration of the diffusion model.

abstractmethod classmethod forward_and_sample_previous_step(module: ModelMixin, scheduler: SchedulerMixin, model_config: DiffusionModelConfig, model_inputs: dict[str, Tensor], negative_model_inputs: dict[str, Tensor] | None, scheduler_inputs: TensorDict | dict[str, Tensor] | None, step: int)

Forward the model and sample the previous step. Used by RL algorithms based on reverse sampling (FlowGRPO, DanceGRPO, etc.).

Parameters:
  • module (ModelMixin) – the diffusion model to be forwarded.

  • scheduler (SchedulerMixin) – the scheduler used for the diffusion process.

  • model_config (DiffusionModelConfig) – the configuration of the diffusion model.

  • model_inputs (dict[str, torch.Tensor]) – the inputs to the diffusion model.

  • negative_model_inputs (Optional[dict[str, torch.Tensor]]) – the negative inputs for guidance.

  • scheduler_inputs (Optional[TensorDict | dict[str, torch.Tensor]]) – the extra inputs for the scheduler, which may contain the latents and timesteps.

  • step (int) – the current step in the diffusion process.

Returns:

(log_prob, prev_sample_mean, std_dev_t, sqrt_dt)

Return type:

tuple

classmethod get_class(model_config: DiffusionModelConfig) → type[DiffusionModelBase]

Return the registered subclass for (architecture, algorithm).

abstractmethod classmethod prepare_model_inputs(module: ModelMixin, model_config: DiffusionModelConfig, latents: Tensor, timesteps: Tensor, prompt_embeds: Tensor, prompt_embeds_mask: Tensor, negative_prompt_embeds: Tensor, negative_prompt_embeds_mask: Tensor, micro_batch: TensorDict, step: int) → tuple[dict, dict | None]

Build architecture-specific inputs for a model forward. For reverse-trajectory algorithms, latents and timesteps usually contain the full rollout trajectory and step selects the current slice. For forward-process objectives, callers may pass an already selected/noised latent and timestep directly. The caller is responsible for universal pre-processing (common tensor extraction and nested-embed unpadding) before invoking this method.

Parameters:
  • module (ModelMixin) – the diffusion transformer module.

  • model_config (DiffusionModelConfig) – the configuration of the diffusion model.

  • latents (torch.Tensor) – latent tensor from the micro-batch; either a full trajectory of shape (B, T, …) or a selected/noised latent of shape (B, …).

  • timesteps (torch.Tensor) – timestep tensor from the micro-batch; either a full trajectory of shape (B, T) or a selected timestep of shape (B,).

  • prompt_embeds (torch.Tensor) – dense positive prompt embeddings, shape (B, L, D).

  • prompt_embeds_mask (torch.Tensor) – attention mask for prompt_embeds, shape (B, L).

  • negative_prompt_embeds (torch.Tensor) – dense negative prompt embeddings, shape (B, L, D).

  • negative_prompt_embeds_mask (torch.Tensor) – attention mask for negative_prompt_embeds.

  • micro_batch (TensorDict) – the full micro-batch, available for architecture-specific metadata (e.g. height, width, vae_scale_factor).

  • step (int) – the current denoising step index.
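The two calling conventions for latents and timesteps can be illustrated with plain nested lists standing in for tensors. This is a sketch only: `select_step` is a hypothetical helper, and detecting the convention from the nesting of `timesteps` is an assumption made for the example, not necessarily how the adapters decide.

```python
def select_step(latents, timesteps, step):
    """Return the (latents, timesteps) pair to feed the model at `step`.

    A full trajectory is laid out as [batch][time]; picking `step` from every
    row corresponds to `latents[:, step]` on a tensor of shape (B, T, ...).
    """
    if isinstance(timesteps[0], list):
        # Reverse-trajectory case: slice the current step out of (B, T).
        return [row[step] for row in latents], [row[step] for row in timesteps]
    # Forward-process case: the caller already selected one step per sample.
    return latents, timesteps


# B = 2 samples, T = 3 steps, one scalar "latent" per step for brevity.
trajectory_latents = [[0.9, 0.5, 0.1], [0.8, 0.4, 0.0]]
trajectory_timesteps = [[1000, 500, 0], [1000, 500, 0]]

step_latents, step_timesteps = select_step(
    trajectory_latents, trajectory_timesteps, step=1
)
print(step_latents, step_timesteps)  # [0.5, 0.4] [500, 500]
```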

classmethod register(architecture: str, algorithm: str)

Class decorator that registers a subclass for (architecture, algorithm).

abstractmethod classmethod set_timesteps(scheduler: SchedulerMixin, model_config: DiffusionModelConfig, device: str)

Set timesteps and sigmas on the scheduler and move them to device.

Parameters:
  • scheduler (SchedulerMixin) – the scheduler used for the diffusion process.

  • model_config (DiffusionModelConfig) – the configuration of the diffusion model.

  • device (str) – the device to move the timesteps and sigmas to.
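Because the interface consists of classmethods, an adapter may never be instantiated, and Python's ABC machinery only rejects incomplete classes at instantiation time. The sketch below uses a local stand-in (`AdapterInterface`, a hypothetical name that mirrors the abstract classmethods listed above, not the real DiffusionModelBase) to show one way to check which methods a subclass still has to implement.

```python
from abc import ABC, abstractmethod


class AdapterInterface(ABC):
    """Local stand-in mirroring the documented abstract classmethods."""

    @classmethod
    @abstractmethod
    def build_scheduler(cls, model_config): ...

    @classmethod
    @abstractmethod
    def set_timesteps(cls, scheduler, model_config, device): ...

    @classmethod
    @abstractmethod
    def prepare_model_inputs(cls, module, model_config, latents, timesteps,
                             prompt_embeds, prompt_embeds_mask,
                             negative_prompt_embeds, negative_prompt_embeds_mask,
                             micro_batch, step): ...

    @classmethod
    @abstractmethod
    def forward_and_sample_previous_step(cls, module, scheduler, model_config,
                                         model_inputs, negative_model_inputs,
                                         scheduler_inputs, step): ...


class IncompleteAdapter(AdapterInterface):
    """Implements only one method, so it is still abstract."""

    @classmethod
    def build_scheduler(cls, model_config):
        # Toy scheduler: a dict instead of a real SchedulerMixin.
        return {"num_steps": model_config["num_inference_steps"]}


# Names that remain abstract on the subclass.
missing = sorted(IncompleteAdapter.__abstractmethods__)
print(missing)
```

Calling `IncompleteAdapter.build_scheduler(...)` would still work here, which is why inspecting `__abstractmethods__` can be a useful sanity check when writing a new adapter.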

class verl_omni.pipelines.model_base.VllmOmniPipelineBase

Registry base for vllm-omni custom diffusion pipeline classes.

To register, decorate your custom pipeline class with @VllmOmniPipelineBase.register("name", algorithm="..."). The name must match the _class_name value in the pipeline’s model_index.json (which is auto-detected into DiffusionModelConfig.architecture). The algorithm must match DiffusionModelConfig.algorithm.

Example:

@VllmOmniPipelineBase.register("QwenImagePipeline", algorithm="flow_grpo")
class QwenImagePipelineWithLogProb(QwenImagePipeline):
    ...

classmethod get_class(architecture: str, algorithm: str) → type | None

Return the registered pipeline class for (architecture, algorithm), or None.

classmethod get_pipeline_path(architecture: str, algorithm: str) → str | None

Return the fully-qualified dotted import path for (architecture, algorithm), or None.

classmethod register(architecture: str, algorithm: str)

Class decorator that registers a pipeline for (architecture, algorithm).
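The three classmethods above fit together roughly as in the following sketch. All names here (`_PIPELINES`, the module-level functions, `ToyPipelineWithLogProb`) are hypothetical stand-ins, and building the dotted path from `__module__` and `__qualname__` is an assumption about how such a path could be derived, not a statement about the actual implementation.

```python
from typing import Optional

# Hypothetical registry: (architecture, algorithm) -> pipeline class.
_PIPELINES: dict = {}


def register(architecture: str, algorithm: str):
    """Class decorator storing the class under (architecture, algorithm)."""
    def decorator(cls):
        _PIPELINES[(architecture, algorithm)] = cls
        return cls
    return decorator


def get_class(architecture: str, algorithm: str) -> Optional[type]:
    """Return the registered class, or None when nothing matches."""
    return _PIPELINES.get((architecture, algorithm))


def get_pipeline_path(architecture: str, algorithm: str) -> Optional[str]:
    """Return 'package.module.ClassName' for the registered class, or None."""
    cls = get_class(architecture, algorithm)
    if cls is None:
        return None
    return f"{cls.__module__}.{cls.__qualname__}"


@register("QwenImagePipeline", algorithm="flow_grpo")
class ToyPipelineWithLogProb:
    """Stand-in for a vllm-omni pipeline subclass."""


path = get_pipeline_path("QwenImagePipeline", "flow_grpo")
print(path)
```

Returning None instead of raising fits the rollout-side adapter being optional: a caller can fall back to the stock pipeline when no custom one is registered.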

Pipeline Helpers

Convenience wrappers that dispatch to the subclass registered for the current (architecture, algorithm) pair. The Diffusers FSDP engine and the agent loop call into these helpers rather than touching the registry directly.

verl_omni.pipelines.utils.build_scheduler(model_config: DiffusionModelConfig) → SchedulerMixin

Build and configure the scheduler for the diffusion model. The returned scheduler has timesteps and sigmas already set.

Parameters:

model_config (DiffusionModelConfig) – the configuration of the diffusion model.

verl_omni.pipelines.utils.forward_and_sample_previous_step(module: ModelMixin, scheduler: SchedulerMixin, model_config: DiffusionModelConfig, model_inputs: dict, negative_model_inputs: dict | None, scheduler_inputs: TensorDict | dict[str, Tensor] | None, step: int)

Forward the model and sample the previous step. Typically used by RL algorithms based on reverse sampling, such as FlowGRPO and DanceGRPO.

Parameters:
  • module (ModelMixin) – the diffusion model to be forwarded.

  • scheduler (SchedulerMixin) – the scheduler used for the diffusion process.

  • model_config (DiffusionModelConfig) – the configuration of the diffusion model.

  • model_inputs (dict[str, torch.Tensor]) – the inputs to the diffusion model.

  • negative_model_inputs (Optional[dict[str, torch.Tensor]]) – the negative inputs for guidance.

  • scheduler_inputs (Optional[TensorDict | dict[str, torch.Tensor]]) – the extra inputs for the scheduler, which may contain the latents and timesteps.

  • step (int) – the current step in the diffusion process.

verl_omni.pipelines.utils.prepare_model_inputs(module: ModelMixin, model_config: DiffusionModelConfig, latents: Tensor, timesteps: Tensor, prompt_embeds: Tensor, prompt_embeds_mask: Tensor | None, negative_prompt_embeds: Tensor | None, negative_prompt_embeds_mask: Tensor | None, micro_batch: TensorDict, step: int) → tuple[dict, dict | None]

Build architecture-specific model inputs for the forward pass. Dispatches to the registered DiffusionModelBase subclass for the current architecture.

Parameters:
  • module (ModelMixin) – the diffusion transformer module.

  • model_config (DiffusionModelConfig) – the configuration of the diffusion model.

  • latents (torch.Tensor) – latent tensor from the micro-batch. This can be a full trajectory or an already selected/noised latent, depending on the algorithm.

  • timesteps (torch.Tensor) – timestep tensor from the micro-batch. This can be a full trajectory or an already selected timestep, depending on the algorithm.

  • prompt_embeds (torch.Tensor) – dense positive prompt embeddings, shape (B, L, D).

  • prompt_embeds_mask (torch.Tensor) – attention mask for prompt_embeds, shape (B, L).

  • negative_prompt_embeds (torch.Tensor) – dense negative prompt embeddings, shape (B, L, D).

  • negative_prompt_embeds_mask (torch.Tensor) – attention mask for negative_prompt_embeds.

  • micro_batch (TensorDict) – the full micro-batch, available for architecture-specific metadata (e.g. height, width, vae_scale_factor).

  • step (int) – the current denoising step index.

verl_omni.pipelines.utils.set_timesteps(scheduler: SchedulerMixin, model_config: DiffusionModelConfig)

Set correct timesteps and sigmas for diffusion model schedulers.

Parameters:
  • scheduler (SchedulerMixin) – the scheduler used for the diffusion process.

  • model_config (DiffusionModelConfig) – the configuration of the diffusion model.

Schedulers

class verl_omni.pipelines.schedulers.flow_match_sde.FlowMatchSDEDiscreteScheduler(num_train_timesteps: int = 1000, shift: float = 1.0, use_dynamic_shifting: bool = False, base_shift: float | None = 0.5, max_shift: float | None = 1.15, base_image_seq_len: int = 256, max_image_seq_len: int = 4096, invert_sigmas: bool = False, shift_terminal: float = None, use_karras_sigmas: bool = False, use_exponential_sigmas: bool = False, use_beta_sigmas: bool = False, time_shift_type: Literal['exponential', 'linear'] = 'exponential', stochastic_sampling: bool = False)

SDE version of the FlowMatchEulerDiscreteScheduler. The implementation is based on the FlowGRPO paper (https://arxiv.org/abs/2505.05470) and the diffusers v0.37 branch.

sample_previous_step(sample: Tensor, model_output: Tensor, timestep: FloatTensor | None = None, generator: Generator | None = None, per_token_timesteps: Tensor | None = None, noise_level: float = 0.7, prev_sample: Tensor | None = None, sde_type: Literal['cps', 'sde', 'dance_sde'] = 'sde', return_logprobs: bool = True, return_sqrt_dt: bool = False, include_logprob_normalizer: bool = True)

Run a single SDE / CPS reverse step.

Parameters:
  • sample (torch.FloatTensor) – A current instance of a sample created by the diffusion process.

  • model_output (torch.FloatTensor) – The direct output from the learned diffusion model.

  • timestep (torch.FloatTensor, optional) – The current discrete timestep in the diffusion chain. When None, the internal step_index is used (sequential denoising loop).

  • generator (torch.Generator, optional) – A random number generator.

  • per_token_timesteps (torch.Tensor, optional) – The timesteps for each token in the sample. Currently not supported.

  • noise_level (float, optional, defaults to 0.7) – The noise level used in the SDE.

  • prev_sample (torch.FloatTensor, optional) – The sample from the previous timestep. If provided, it is used directly for log-probability computation instead of being sampled.

  • sde_type (str, optional, defaults to “sde”) – The type of SDE to use. Choose between “sde”, “cps”, and “dance_sde”.

  • return_logprobs (bool, optional, defaults to True) – Whether to return log probabilities of the previous sample.

  • return_sqrt_dt (bool, optional, defaults to False) – Whether to additionally return sqrt(-dt) as a tensor of shape (batch_size,). Used by GRPO-Guard to compute the importance-ratio normalization (see GRPOGuardLoss).

  • include_logprob_normalizer (bool, optional, defaults to True) – Whether to include Gaussian normalizer constants in log probabilities.
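To make the roles of noise_level, prev_sample, sqrt(-dt), and include_logprob_normalizer concrete, here is a scalar sketch of one "sde"-type reverse step. It mirrors the update published with the FlowGRPO reference code that the docstring cites; whether this scheduler uses exactly the same expressions is an assumption, and `sde_reverse_step` is a hypothetical name. Plain lists of floats stand in for tensors, and sigma is assumed to lie strictly between 0 and 1.

```python
import math


def sde_reverse_step(sample, model_output, sigma, sigma_next,
                     noise_level=0.7, noise=None, include_normalizer=True):
    """One 'sde'-type reverse step on a flat list of floats.

    Returns (prev_sample, log_prob, prev_sample_mean, std_dev_t, sqrt_dt).
    """
    dt = sigma_next - sigma              # negative while denoising
    sqrt_dt = math.sqrt(-dt)
    std_dev_t = math.sqrt(sigma / (1.0 - sigma)) * noise_level

    # Mean of the Gaussian transition (FlowGRPO reference form, assumed here).
    mean = [
        x * (1.0 + std_dev_t**2 / (2.0 * sigma) * dt)
        + v * (1.0 + std_dev_t**2 * (1.0 - sigma) / (2.0 * sigma)) * dt
        for x, v in zip(sample, model_output)
    ]

    if noise is None:
        noise = [0.0] * len(sample)      # deterministic fallback for the demo
    scale = std_dev_t * sqrt_dt          # standard deviation of the transition
    prev = [m + scale * e for m, e in zip(mean, noise)]

    # Gaussian log-density of prev under N(mean, scale^2), element by element.
    per_elem = [-((p - m) ** 2) / (2.0 * scale**2) for p, m in zip(prev, mean)]
    if include_normalizer:
        const = -math.log(scale) - 0.5 * math.log(2.0 * math.pi)
        per_elem = [lp + const for lp in per_elem]
    log_prob = sum(per_elem) / len(per_elem)   # mean over non-batch dims
    return prev, log_prob, mean, std_dev_t, sqrt_dt


prev, log_prob, mean, std_dev_t, sqrt_dt = sde_reverse_step(
    sample=[0.5, -0.2], model_output=[0.1, 0.3],
    sigma=0.8, sigma_next=0.7, noise=[1.0, -1.0],
)
print(log_prob, std_dev_t, sqrt_dt)
```

The normalizer terms do not depend on the sample, so they cancel in a ratio of two log-probabilities computed at the same step and noise level; dropping them changes the reported log-probability values but not such ratios.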

step(model_output: FloatTensor, timestep: float | FloatTensor, sample: FloatTensor, s_churn: float = 0.0, s_tmin: float = 0.0, s_tmax: float = inf, s_noise: float = 1.0, generator: Generator | None = None, per_token_timesteps: Tensor | None = None, return_dict: bool = True, noise_level: float = 0.7, prev_sample: FloatTensor | None = None, sde_type: Literal['sde', 'cps', 'dance_sde'] = 'sde', return_logprobs: bool = True, include_logprob_normalizer: bool = True) → FlowMatchSDEDiscreteSchedulerOutput | tuple

Predict the sample from the previous timestep by reversing the SDE. This function propagates the diffusion process from the learned model outputs (most often the predicted noise).

Modified from https://github.com/yifan123/flow_grpo/blob/main/flow_grpo/diffusers_patch/sd3_sde_with_logprob.py

Parameters:
  • model_output (torch.FloatTensor) – The direct output from the learned diffusion model.

  • timestep (float) – The current discrete timestep in the diffusion chain.

  • sample (torch.FloatTensor) – A current instance of a sample created by the diffusion process.

  • s_churn (float)

  • s_tmin (float)

  • s_tmax (float)

  • s_noise (float, defaults to 1.0) – Scaling factor for noise added to the sample.

  • generator (torch.Generator, optional) – A random number generator.

  • per_token_timesteps (torch.Tensor, optional) – The timesteps for each token in the sample.

  • return_dict (bool) – Whether to return a FlowMatchSDEDiscreteSchedulerOutput or a plain tuple.

  • noise_level (float, optional, defaults to 0.7) – The noise level used in the SDE.

  • prev_sample (torch.FloatTensor, optional) – The sample from the previous timestep. If not provided, it will be sampled inside the function.

  • sde_type (str, optional, defaults to “sde”) – The type of SDE to use. Choose between “sde”, “cps”, and “dance_sde”.

  • return_logprobs (bool, optional, defaults to True) – Whether to return log probabilities of the previous sample.

  • include_logprob_normalizer (bool, optional, defaults to True) – Whether to include Gaussian normalizer constants in log probabilities.

class verl_omni.pipelines.schedulers.flow_match_sde.FlowMatchSDEDiscreteSchedulerOutput(prev_sample: FloatTensor, log_prob: FloatTensor | None, prev_sample_mean: FloatTensor, std_dev_t: FloatTensor)

Output class for the scheduler’s step function output.

Parameters:
  • prev_sample (torch.FloatTensor of shape (batch_size, sequence_length, num_channels) for images) – Computed sample (x_{t-1}) of previous timestep. prev_sample should be used as next model input in the denoising loop.

  • log_prob (torch.FloatTensor of shape (batch_size,), optional) – The log probability of the previous sample.

  • prev_sample_mean (torch.FloatTensor of shape (batch_size, sequence_length, num_channels) for images) – The mean of the computed sample of previous timestep.

  • std_dev_t (torch.FloatTensor of shape (batch_size, 1, 1)) – The standard deviation used to compute prev_sample.

Built-in Pipelines

Qwen-Image (FlowGRPO)

class verl_omni.pipelines.qwen_image_flow_grpo.QwenImage

Training adapter for the Qwen-Image diffusion model.

Implements the DiffusionModelBase interface for the QwenImagePipeline architecture, providing scheduler configuration, model-input construction, and the forward/sampling step used during RL training (e.g. FlowGRPO).

Registered under "QwenImagePipeline" so it is automatically selected when DiffusionModelConfig.architecture matches that name.

classmethod build_scheduler(model_config: DiffusionModelConfig)

Build and configure the SDE scheduler for the Qwen-Image model.

Parameters:

model_config (DiffusionModelConfig) – Configuration for the diffusion model, used to determine the model path and timestep settings.

Returns:

Scheduler with timesteps already set for the current device.

Return type:

FlowMatchSDEDiscreteScheduler

classmethod forward_and_sample_previous_step(module: QwenImageTransformer2DModel, scheduler: FlowMatchSDEDiscreteScheduler, model_config: DiffusionModelConfig, model_inputs: dict[str, Tensor], negative_model_inputs: dict[str, Tensor] | None, scheduler_inputs: TensorDict | dict[str, Tensor] | None, step: int)

Run the Qwen-Image transformer and sample the previous denoising step.

Used by RL algorithms (FlowGRPO) that require log-probabilities for reversed-sampling. Applies True-CFG guidance when model_config.true_cfg_scale > 1.0.

Parameters:
  • module (QwenImageTransformer2DModel) – The Qwen-Image transformer module.

  • scheduler (FlowMatchSDEDiscreteScheduler) – Scheduler used to sample the previous step and compute log-probabilities.

  • model_config (DiffusionModelConfig) – Configuration providing true_cfg_scale, algo.noise_level, and algo.sde_type.

  • model_inputs (dict[str, torch.Tensor]) – Positive-prompt inputs for the transformer forward pass.

  • negative_model_inputs (Optional[dict[str, torch.Tensor]]) – Negative-prompt inputs used for True-CFG; may be None when CFG is disabled.

  • scheduler_inputs (Optional[TensorDict | dict[str, torch.Tensor]]) – Must contain "all_latents" and "all_timesteps" tensors.

  • step (int) – Current denoising step index.

Returns:

A 4-tuple of (log_prob, prev_sample_mean, std_dev_t, sqrt_dt).

Return type:

tuple
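The True-CFG combination mentioned above can be sketched on a single prediction vector. The linear combination is the standard classifier-free guidance formula; the norm rescaling follows what the diffusers QwenImagePipeline does, and whether this adapter applies the same rescaling is an assumption. `true_cfg_combine` is a hypothetical helper, and lists of floats stand in for tensors.

```python
import math


def true_cfg_combine(cond, uncond, true_cfg_scale, rescale=True):
    """Blend positive and negative predictions with True-CFG.

    Guidance is skipped when the scale is <= 1.0 or no negative
    prediction is available.
    """
    if true_cfg_scale <= 1.0 or uncond is None:
        return list(cond)
    combined = [u + true_cfg_scale * (c - u) for c, u in zip(cond, uncond)]
    if not rescale:
        return combined
    # Rescale so the guided prediction keeps the norm of the positive one
    # (assumes the combined vector is non-zero).
    cond_norm = math.sqrt(sum(c * c for c in cond))
    comb_norm = math.sqrt(sum(x * x for x in combined))
    return [x * cond_norm / comb_norm for x in combined]


guided = true_cfg_combine(cond=[3.0, 4.0], uncond=[1.0, 0.0], true_cfg_scale=4.0)
print(guided)
```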

classmethod prepare_model_inputs(module: QwenImageTransformer2DModel, model_config: DiffusionModelConfig, latents: Tensor, timesteps: Tensor, prompt_embeds: Tensor, prompt_embeds_mask: Tensor, negative_prompt_embeds: Tensor, negative_prompt_embeds_mask: Tensor, micro_batch: TensorDict, step: int) → tuple[dict, dict]

Build Qwen-Image-specific inputs for the transformer forward pass.

Parameters:
  • module (QwenImageTransformer2DModel) – The Qwen-Image transformer module.

  • model_config (DiffusionModelConfig) – Configuration providing guidance scale and other model settings.

  • latents (torch.Tensor) – Full latent tensor of shape (B, T, ...).

  • timesteps (torch.Tensor) – Full timestep tensor of shape (B, T).

  • prompt_embeds (torch.Tensor) – Positive prompt embeddings of shape (B, L, D).

  • prompt_embeds_mask (torch.Tensor) – Attention mask for prompt_embeds of shape (B, L).

  • negative_prompt_embeds (torch.Tensor) – Negative prompt embeddings of shape (B, L, D).

  • negative_prompt_embeds_mask (torch.Tensor) – Attention mask for negative_prompt_embeds.

  • micro_batch (TensorDict) – Micro-batch containing metadata such as height, width, and vae_scale_factor.

  • step (int) – Current denoising step index used to slice latents and timesteps.

Returns:

A pair of (model_inputs, negative_model_inputs) dicts ready to be unpacked into the transformer forward call.

Return type:

tuple[dict, dict]

classmethod set_timesteps(scheduler: FlowMatchSDEDiscreteScheduler, model_config: DiffusionModelConfig, device: str)

Configure timesteps and sigmas on the scheduler for Qwen-Image.

Parameters:
  • scheduler (FlowMatchSDEDiscreteScheduler) – The scheduler whose timesteps and sigmas will be set.

  • model_config (DiffusionModelConfig) – Configuration providing height, width, and number of inference steps.

  • device (str) – The device (e.g. "cuda") to move the timesteps to.
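One plausible reason the configuration's height and width matter here is resolution-dependent sigma shifting, as in diffusers flow-matching pipelines. The sketch below reproduces that scheme with the scheduler defaults listed in the constructor signature (base_shift 0.5, max_shift 1.15, base_image_seq_len 256, max_image_seq_len 4096, exponential time shift). That this adapter computes its schedule this way, the `vae_scale_factor=8` default, and the 2x2 latent packing are all assumptions; the function names are hypothetical.

```python
import math


def calculate_mu(image_seq_len, base_seq_len=256, max_seq_len=4096,
                 base_shift=0.5, max_shift=1.15):
    """Linearly interpolate the shift parameter from the image token count."""
    slope = (max_shift - base_shift) / (max_seq_len - base_seq_len)
    intercept = base_shift - slope * base_seq_len
    return image_seq_len * slope + intercept


def exponential_time_shift(mu, sigma):
    """Exponential time shift: exp(mu) / (exp(mu) + (1 / sigma - 1))."""
    return math.exp(mu) / (math.exp(mu) + (1.0 / sigma - 1.0))


def shifted_sigmas(height, width, num_inference_steps, vae_scale_factor=8):
    """Sigmas from 1.0 down to 1/num_inference_steps, shifted by resolution."""
    # Assumed token count: latent grid packed into 2x2 patches.
    seq_len = (height // vae_scale_factor // 2) * (width // vae_scale_factor // 2)
    mu = calculate_mu(seq_len)
    last = 1.0 / num_inference_steps
    raw = [
        1.0 + i * (last - 1.0) / (num_inference_steps - 1)
        for i in range(num_inference_steps)
    ]
    return [exponential_time_shift(mu, s) for s in raw]


sigmas = shifted_sigmas(height=512, width=512, num_inference_steps=10)
print([round(s, 3) for s in sigmas])
```

Larger images yield a larger mu, which pushes the sigmas toward 1.0 so that more of the step budget is spent at high noise levels.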

class verl_omni.pipelines.qwen_image_flow_grpo.QwenImagePipelineWithLogProb(*args: Any, **kwargs: Any)

Rollout pipeline for Qwen-Image that captures per-step log-probabilities.

Extends QwenImagePipeline with a custom SDE-based scheduler and additional output fields required for RL training (e.g. FlowGRPO). In addition to the final generated image the pipeline returns all intermediate latents, their log-probabilities, and the corresponding timesteps.

Registered under "QwenImagePipeline" for vllm-omni rollout dispatch.

diffuse(prompt_embeds, prompt_embeds_mask, negative_prompt_embeds, negative_prompt_embeds_mask, latents, img_shapes, txt_seq_lens, negative_txt_seq_lens, timesteps, do_true_cfg, guidance, true_cfg_scale, noise_level, sde_window, sde_type, generator, logprobs)

Run the full SDE diffusion loop and collect per-step rollout data.

Iterates over all timesteps, optionally applying True-CFG guidance, and collects latents and log-probabilities within the SDE window.

Parameters:
  • prompt_embeds (torch.Tensor) – Positive prompt embeddings.

  • prompt_embeds_mask (torch.Tensor) – Attention mask for prompt_embeds.

  • negative_prompt_embeds (torch.Tensor) – Negative prompt embeddings for CFG.

  • negative_prompt_embeds_mask (torch.Tensor) – Attention mask for negative_prompt_embeds.

  • latents (torch.Tensor) – Initial noisy latents.

  • img_shapes (list) – Per-sample image shapes used by the transformer.

  • txt_seq_lens (list[int]) – Sequence lengths for positive prompt embeddings.

  • negative_txt_seq_lens (list[int]) – Sequence lengths for negative prompt embeddings.

  • timesteps (torch.Tensor) – Scheduler timestep sequence.

  • do_true_cfg (bool) – Whether to apply True-CFG guidance.

  • guidance (torch.Tensor | None) – Guidance scale tensor, or None.

  • true_cfg_scale (float) – Classifier-free guidance scale.

  • noise_level (float) – SDE noise injection magnitude within the window.

  • sde_window (tuple[int, int]) – (start, end) step indices defining where SDE noise is injected and rollout data is collected.

  • sde_type (str) – SDE variant; one of "sde" or "cps".

  • generator (torch.Generator | None) – Optional random generator for reproducibility.

  • logprobs (bool) – Whether to compute and return per-step log-probabilities.

Returns:

A 4-tuple of (latents, all_latents, all_log_probs, all_timesteps) where all_latents has shape (B, W+1, ...) (W = SDE-window length), all_log_probs has shape (B, W) or None when logprobs is False, and all_timesteps has shape (B, W).

Return type:

tuple
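The W+1 versus W lengths in the return value come from recording the latent that enters the SDE window plus one latent per step inside it. The toy loop below shows that bookkeeping for a single sample. It is a sketch, not the pipeline's code: `collect_rollout` and `toy_step` are hypothetical, the window end is treated as exclusive, and step indices are recorded in place of scheduler timestep values.

```python
def collect_rollout(initial_latent, num_steps, sde_window, step_fn):
    """Run num_steps denoising steps and record data inside the SDE window."""
    start, end = sde_window            # end treated as exclusive (assumption)
    latent = initial_latent
    all_latents, all_log_probs, all_timesteps = [], [], []
    for i in range(num_steps):
        in_window = start <= i < end
        if in_window and i == start:
            all_latents.append(latent)  # latent entering the window
        latent, log_prob = step_fn(latent, i, in_window)
        if in_window:
            all_latents.append(latent)
            all_log_probs.append(log_prob)
            all_timesteps.append(i)
    return latent, all_latents, all_log_probs, all_timesteps


def toy_step(latent, step_index, stochastic):
    """Pretend denoising step: halve the latent, report a dummy log-prob."""
    return latent * 0.5, (-0.5 if stochastic else None)


final, all_latents, all_log_probs, all_timesteps = collect_rollout(
    initial_latent=8.0, num_steps=5, sde_window=(1, 4), step_fn=toy_step
)
print(len(all_latents), len(all_log_probs), len(all_timesteps))  # 4 3 3
```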

forward(req: vllm_omni.diffusion.request.OmniDiffusionRequest, prompt_token_ids: Tensor | list[int] | None = None, prompt_mask: Tensor | None = None, negative_prompt_ids: Tensor | list[int] | None = None, negative_prompt_mask: Tensor | None = None, true_cfg_scale: float = 4.0, height: int | None = None, width: int | None = None, num_inference_steps: int = 50, sigmas: list[float] | None = None, guidance_scale: float = 1.0, num_images_per_prompt: int = 1, generator: Generator | list[Generator] | None = None, latents: Tensor | None = None, prompt_embeds: Tensor | None = None, prompt_embeds_mask: Tensor | None = None, negative_prompt_embeds: Tensor | None = None, negative_prompt_embeds_mask: Tensor | None = None, output_type: str | None = 'pil', attention_kwargs: dict[str, Any] | None = None, callback_on_step_end_tensor_inputs: tuple[str, ...] = ('latents',), max_sequence_length: int = 512, noise_level: float = 0.7, sde_window_size: int | None = None, sde_window_range: tuple[int, int] = (0, 5), sde_type: Literal['sde', 'cps'] = 'sde', logprobs: bool = True) → vllm_omni.diffusion.data.DiffusionOutput

End-to-end image generation with rollout data collection.

Encodes the prompt, prepares latents, runs the SDE diffusion loop via diffuse(), and decodes the final latents through the VAE. Sampling parameters in req take precedence over the keyword arguments.

Parameters:
  • req (OmniDiffusionRequest) – Rollout request containing prompts and OmniDiffusionSamplingParams.

  • prompt_token_ids (torch.Tensor | list[int], optional) – Token IDs for the positive prompt.

  • prompt_mask (torch.Tensor, optional) – Attention mask for prompt_token_ids.

  • negative_prompt_ids (torch.Tensor | list[int], optional) – Token IDs for the negative prompt used in True-CFG.

  • negative_prompt_mask (torch.Tensor, optional) – Attention mask for negative_prompt_ids.

  • true_cfg_scale (float) – Classifier-free guidance scale; CFG is disabled when <= 1.

  • height (int, optional) – Output image height in pixels.

  • width (int, optional) – Output image width in pixels.

  • num_inference_steps (int) – Number of denoising steps.

  • sigmas (list[float], optional) – Custom sigmas for the scheduler.

  • guidance_scale (float) – Distilled guidance scale embedded in the transformer (guidance_embeds mode).

  • num_images_per_prompt (int) – Number of images to generate per prompt.

  • generator (torch.Generator | list[torch.Generator], optional) – Random generator(s) for reproducibility.

  • latents (torch.Tensor, optional) – Pre-generated initial latents; sampled from a Gaussian when None.

  • prompt_embeds (torch.Tensor, optional) – Pre-computed positive prompt embeddings; bypasses the text encoder.

  • prompt_embeds_mask (torch.Tensor, optional) – Attention mask for pre-computed prompt_embeds.

  • negative_prompt_embeds (torch.Tensor, optional) – Pre-computed negative prompt embeddings.

  • negative_prompt_embeds_mask (torch.Tensor, optional) – Attention mask for negative_prompt_embeds.

  • output_type (str, optional) – Format of the returned image; "latent" returns raw latents, otherwise the VAE-decoded image.

  • attention_kwargs (dict, optional) – Extra keyword arguments forwarded to the attention layers.

  • callback_on_step_end_tensor_inputs (tuple[str, ...]) – Names of tensors to expose in the step-end callback.

  • max_sequence_length (int) – Maximum prompt embedding sequence length.

  • noise_level (float) – SDE noise injection magnitude within the window.

  • sde_window_size (int, optional) – Number of SDE steps; when None the full timestep range is used.

  • sde_window_range (tuple[int, int]) – (start, end) range from which the SDE window start position is randomly sampled.

  • sde_type (str) – SDE variant; "sde" or "cps".

  • logprobs (bool) – Whether to compute per-step log-probabilities.

Returns:

Contains the decoded output image and a custom_output dict with keys "all_latents", "all_log_probs", "all_timesteps", "prompt_embeds", "prompt_embeds_mask", "negative_prompt_embeds", and "negative_prompt_embeds_mask".

Return type:

DiffusionOutput
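How sde_window_size and sde_window_range might turn into the (start, end) window passed to diffuse() can be sketched as follows. The exact sampling rule is an assumption: here the start is drawn uniformly so that the whole window stays inside the range, and `choose_sde_window` is a hypothetical helper.

```python
import random


def choose_sde_window(num_inference_steps, sde_window_size, sde_window_range, rng):
    """Pick the (start, end) step indices where SDE noise is injected."""
    if sde_window_size is None:
        # No window size given: use the full timestep range.
        return (0, num_inference_steps)
    low, high = sde_window_range
    # randint is inclusive on both ends; the window must fit inside the range.
    start = rng.randint(low, high - sde_window_size)
    return (start, start + sde_window_size)


rng = random.Random(0)
window = choose_sde_window(
    num_inference_steps=50, sde_window_size=2, sde_window_range=(0, 5), rng=rng
)
print(window)
```

Randomizing the window position spreads the stochastic steps over different parts of the early trajectory across rollouts while keeping the number of SDE steps per rollout fixed.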

Qwen-Image (MixGRPO)

class verl_omni.pipelines.qwen_image_mix_grpo.QwenImageMixGRPO

Training adapter for Qwen-Image with the MixGRPO algorithm.

classmethod build_scheduler(model_config: DiffusionModelConfig)

Build and configure the SDE scheduler for the Qwen-Image model.

Parameters:

model_config (DiffusionModelConfig) – Configuration for the diffusion model, used to determine the model path and timestep settings.

Returns:

Scheduler with timesteps already set for the current device.

Return type:

FlowMatchSDEDiscreteScheduler

classmethod forward_and_sample_previous_step(module: QwenImageTransformer2DModel, scheduler: FlowMatchSDEDiscreteScheduler, model_config: DiffusionModelConfig, model_inputs: dict[str, Tensor], negative_model_inputs: dict[str, Tensor] | None, scheduler_inputs: TensorDict | dict[str, Tensor] | None, step: int)

Run the Qwen-Image transformer and sample the previous denoising step.

Used by RL algorithms (FlowGRPO) that require log-probabilities for reversed-sampling. Applies True-CFG guidance when model_config.true_cfg_scale > 1.0.

Parameters:
  • module (QwenImageTransformer2DModel) – The Qwen-Image transformer module.

  • scheduler (FlowMatchSDEDiscreteScheduler) – Scheduler used to sample the previous step and compute log-probabilities.

  • model_config (DiffusionModelConfig) – Configuration providing true_cfg_scale, algo.noise_level, and algo.sde_type.

  • model_inputs (dict[str, torch.Tensor]) – Positive-prompt inputs for the transformer forward pass.

  • negative_model_inputs (Optional[dict[str, torch.Tensor]]) – Negative-prompt inputs used for True-CFG; may be None when CFG is disabled.

  • scheduler_inputs (Optional[TensorDict | dict[str, torch.Tensor]]) – Must contain "all_latents" and "all_timesteps" tensors.

  • step (int) – Current denoising step index.

Returns:

A 4-tuple of (log_prob, prev_sample_mean, std_dev_t, sqrt_dt).

Return type:

tuple

classmethod prepare_model_inputs(module: QwenImageTransformer2DModel, model_config: DiffusionModelConfig, latents: Tensor, timesteps: Tensor, prompt_embeds: Tensor, prompt_embeds_mask: Tensor, negative_prompt_embeds: Tensor, negative_prompt_embeds_mask: Tensor, micro_batch: TensorDict, step: int) → tuple[dict, dict]

Build Qwen-Image-specific inputs for the transformer forward pass.

Parameters:
  • module (QwenImageTransformer2DModel) – The Qwen-Image transformer module.

  • model_config (DiffusionModelConfig) – Configuration providing guidance scale and other model settings.

  • latents (torch.Tensor) – Full latent tensor of shape (B, T, ...).

  • timesteps (torch.Tensor) – Full timestep tensor of shape (B, T).

  • prompt_embeds (torch.Tensor) – Positive prompt embeddings of shape (B, L, D).

  • prompt_embeds_mask (torch.Tensor) – Attention mask for prompt_embeds of shape (B, L).

  • negative_prompt_embeds (torch.Tensor) – Negative prompt embeddings of shape (B, L, D).

  • negative_prompt_embeds_mask (torch.Tensor) – Attention mask for negative_prompt_embeds.

  • micro_batch (TensorDict) – Micro-batch containing metadata such as height, width, and vae_scale_factor.

  • step (int) – Current denoising step index used to slice latents and timesteps.

Returns:

A pair of (model_inputs, negative_model_inputs) dicts ready to be unpacked into the transformer forward call.

Return type:

tuple[dict, dict]

classmethod set_timesteps(scheduler: FlowMatchSDEDiscreteScheduler, model_config: DiffusionModelConfig, device: str)

Configure timesteps and sigmas on the scheduler for Qwen-Image.

Parameters:
  • scheduler (FlowMatchSDEDiscreteScheduler) – The scheduler whose timesteps and sigmas will be set.

  • model_config (DiffusionModelConfig) – Configuration providing height, width, and number of inference steps.

  • device (str) – The device (e.g. "cuda") to move the timesteps to.

class verl_omni.pipelines.qwen_image_mix_grpo.QwenImageMixGRPOPipelineWithLogProb(*args: Any, **kwargs: Any)

Rollout pipeline for Qwen-Image with the MixGRPO algorithm.

forward(req: vllm_omni.diffusion.request.OmniDiffusionRequest, **kwargs: Any)

End-to-end image generation with rollout data collection.

Encodes the prompt, prepares latents, runs the SDE diffusion loop via diffuse(), and decodes the final latents through the VAE. Sampling parameters in req take precedence over the keyword arguments.

Parameters:
  • req (OmniDiffusionRequest) – Rollout request containing prompts and OmniDiffusionSamplingParams.

  • prompt_token_ids (torch.Tensor | list[int], optional) – Token IDs for the positive prompt.

  • prompt_mask (torch.Tensor, optional) – Attention mask for prompt_token_ids.

  • negative_prompt_ids (torch.Tensor | list[int], optional) – Token IDs for the negative prompt used in True-CFG.

  • negative_prompt_mask (torch.Tensor, optional) – Attention mask for negative_prompt_ids.

  • true_cfg_scale (float) – Classifier-free guidance scale; CFG is disabled when <= 1.

  • height (int, optional) – Output image height in pixels.

  • width (int, optional) – Output image width in pixels.

  • num_inference_steps (int) – Number of denoising steps.

  • sigmas (list[float], optional) – Custom sigmas for the scheduler.

  • guidance_scale (float) – Distilled guidance scale embedded in the transformer (guidance_embeds mode).

  • num_images_per_prompt (int) – Number of images to generate per prompt.

  • generator (torch.Generator | list[torch.Generator], optional) – Random generator(s) for reproducibility.

  • latents (torch.Tensor, optional) – Pre-generated initial latents; sampled from a Gaussian when None.

  • prompt_embeds (torch.Tensor, optional) – Pre-computed positive prompt embeddings; bypasses the text encoder.

  • prompt_embeds_mask (torch.Tensor, optional) – Attention mask for pre-computed prompt_embeds.

  • negative_prompt_embeds (torch.Tensor, optional) – Pre-computed negative prompt embeddings.

  • negative_prompt_embeds_mask (torch.Tensor, optional) – Attention mask for negative_prompt_embeds.

  • output_type (str, optional) – Format of the returned image; "latent" returns raw latents, otherwise the VAE-decoded image.

  • attention_kwargs (dict, optional) – Extra keyword arguments forwarded to the attention layers.

  • callback_on_step_end_tensor_inputs (tuple[str, ...]) – Names of tensors to expose in the step-end callback.

  • max_sequence_length (int) – Maximum prompt embedding sequence length.

  • noise_level (float) – SDE noise injection magnitude within the window.

  • sde_window_size (int, optional) – Number of SDE steps; when None the full timestep range is used.

  • sde_window_range (tuple[int, int]) – (start, end) range from which the SDE window start position is randomly sampled.

  • sde_type (str) – SDE variant; "sde" or "cps".

  • logprobs (bool) – Whether to compute per-step log-probabilities.

Returns:

Contains the decoded output image and a custom_output dict with keys "all_latents", "all_log_probs", "all_timesteps", "prompt_embeds", "prompt_embeds_mask", "negative_prompt_embeds", and "negative_prompt_embeds_mask".

Return type:

DiffusionOutput
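
How sde_window_size and sde_window_range interact can be sketched as follows. This is an illustration under stated assumptions, not the pipeline's actual code: the helper name choose_sde_window is hypothetical, the window is assumed to be contiguous, and the whole window is assumed to stay inside sde_window_range (the real implementation may constrain only the start position).

```python
import random


def choose_sde_window(num_steps, sde_window_size, sde_window_range, rng):
    """Return (start, end) of the half-open step interval that uses SDE sampling."""
    if sde_window_size is None:
        # No window configured: every denoising step is an SDE step.
        return 0, num_steps
    range_start, range_end = sde_window_range
    # Assumption: the window must fit entirely inside the configured range.
    latest_start = min(range_end, num_steps) - sde_window_size
    if latest_start < range_start:
        raise ValueError("sde_window_size does not fit inside sde_window_range")
    start = rng.randint(range_start, latest_start)  # inclusive on both ends
    return start, start + sde_window_size


rng = random.Random(0)
full_window = choose_sde_window(10, None, (0, 5), rng)
start, end = choose_sde_window(10, 2, (0, 5), rng)
```

Steps outside the returned interval would then be sampled deterministically, with noise_level applied only to steps inside it.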

SD3 DPO

class verl_omni.pipelines.sd3_dpo.StableDiffusion3DPO[source]

Training adapter for SD3 Diffusion-DPO.

This adapter is intentionally limited to SD3-specific tensor preparation and transformer forwarding. The pairwise DPO objective itself belongs in verl_omni.workers.utils.losses.

classmethod build_scheduler(model_config: DiffusionModelConfig)[source]

Build and configure the SD3 flow-matching scheduler.

abstractmethod classmethod forward_and_sample_previous_step(module: ModelMixin, scheduler: SchedulerMixin, model_config: DiffusionModelConfig, model_inputs: dict[str, Tensor], negative_model_inputs: dict[str, Tensor] | None, scheduler_inputs: TensorDict | dict[str, Tensor] | None, step: int)

Forward the model and sample the previous step. Used by RL algorithms based on reverse sampling (FlowGRPO, DanceGRPO, etc.).

Parameters:
  • module (ModelMixin) – The diffusion model to run the forward pass on.

  • scheduler (SchedulerMixin) – The scheduler used for the diffusion process.

  • model_config (DiffusionModelConfig) – The configuration of the diffusion model.

  • model_inputs (dict[str, torch.Tensor]) – The inputs to the diffusion model.

  • negative_model_inputs (Optional[dict[str, torch.Tensor]]) – The negative inputs used for guidance.

  • scheduler_inputs (Optional[TensorDict | dict[str, torch.Tensor]]) – Extra inputs for the scheduler, which may contain the latents and timesteps.

  • step (int) – The current step in the diffusion process.

Returns:

(log_prob, prev_sample_mean, std_dev_t, sqrt_dt)

Return type:

tuple
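
The names in the returned tuple suggest a Gaussian transition between consecutive latents. The sketch below shows how a log-probability could be derived from the other three values, assuming the previous sample is distributed as N(prev_sample_mean, (std_dev_t · sqrt_dt)²). That distribution, the averaging over elements, and the helper name gaussian_log_prob are assumptions made for illustration; flat lists of floats stand in for tensors.

```python
import math


def gaussian_log_prob(prev_sample, prev_sample_mean, std_dev_t, sqrt_dt):
    """Mean per-element log-density of prev_sample under the assumed Gaussian."""
    scale = std_dev_t * sqrt_dt  # assumed standard deviation of the transition
    if scale <= 0:
        raise ValueError("std_dev_t * sqrt_dt must be positive")
    total = 0.0
    for x, mu in zip(prev_sample, prev_sample_mean):
        total += (
            -((x - mu) ** 2) / (2 * scale**2)
            - math.log(scale)
            - 0.5 * math.log(2 * math.pi)
        )
    return total / len(prev_sample)


# A sample that coincides with the mean gets the highest possible density.
at_mean = gaussian_log_prob([0.0, 0.0], [0.0, 0.0], std_dev_t=1.0, sqrt_dt=1.0)
off_mean = gaussian_log_prob([1.0, 1.0], [0.0, 0.0], std_dev_t=1.0, sqrt_dt=1.0)
```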

classmethod prepare_model_inputs(module: ModelMixin, model_config: DiffusionModelConfig, latents: Tensor, timesteps: Tensor, prompt_embeds: Tensor, prompt_embeds_mask: Tensor, negative_prompt_embeds: Tensor, negative_prompt_embeds_mask: Tensor, micro_batch: TensorDict, step: int) tuple[dict[str, Any], dict[str, Any] | None][source]

Build SD3 transformer inputs.

For DPO-specific training, callers should normally pass already-noised latents and the sampled training timesteps:

  • latents – Already-noised latents of shape (B, C, H, W).

  • timesteps – Sampled training timesteps of shape (B,).

classmethod set_timesteps(scheduler: SchedulerMixin, model_config: DiffusionModelConfig, device: str)[source]

No-op for SD3.5 DPO training.

DPO flow-matching samples timesteps from the full num_train_timesteps schedule (logit-normal over ~1000 steps). Rollout / offline data prep use separate diffusers pipelines with their own inference schedulers; they are not configured through this hook.
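
Logit-normal timestep sampling can be sketched as below. This is a minimal illustration rather than the code used by the DPO loss: the helper name sample_logit_normal_timestep, the mean and std defaults, and the mapping from the sampled value to an integer index are all assumptions.

```python
import math
import random


def sample_logit_normal_timestep(num_train_timesteps=1000, mean=0.0, std=1.0, rng=None):
    """Draw one training timestep index from a logit-normal distribution."""
    rng = rng if rng is not None else random.Random()
    u = rng.gauss(mean, std)               # normal sample in logit space
    sigma = 1.0 / (1.0 + math.exp(-u))     # sigmoid maps it into (0, 1)
    # Assumed mapping: scale to the schedule length and clamp to a valid index.
    return min(int(sigma * num_train_timesteps), num_train_timesteps - 1)


rng = random.Random(0)
indices = [sample_logit_normal_timestep(rng=rng) for _ in range(1000)]
```

With mean 0 the samples concentrate around the middle of the schedule and thin out towards both ends, which is why no fixed inference-style timestep grid needs to be set on the scheduler for training.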