Pipelines Interface
Last updated: Jul 03, 2026 (API docstrings are auto-generated).
A pipeline in VeRL-Omni packages everything needed to plug a particular diffusion model architecture into the training loop:
- a training-side adapter subclassing DiffusionModelBase that handles scheduler setup, model-input construction, and the per-step forward / reverse-sampling logic used by RL algorithms (e.g. FlowGRPO);
- an optional rollout-side adapter registered via VllmOmniPipelineBase that hooks into vLLM-Omni’s diffusion serving stack to expose log-probabilities.
Adapters are auto-selected by matching the pair
(DiffusionModelConfig.architecture, DiffusionModelConfig.algorithm) against the
registered (architecture, algorithm) key. The architecture is read from the
model’s model_index.json; the algorithm string is taken from the model config’s
actor_rollout_ref.model.algorithm value.
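As a rough illustration of how such a lookup key could be formed, the sketch below reads the `_class_name` field from a `model_index.json` and pairs it with an algorithm string. The helper name `registry_key` and the temporary directory are assumptions made for this example; they are not part of VeRL-Omni.

```python
import json
import tempfile
from pathlib import Path


def registry_key(model_dir: str, algorithm: str) -> tuple[str, str]:
    """Hypothetical helper: build the (architecture, algorithm) lookup key."""
    index = json.loads((Path(model_dir) / "model_index.json").read_text())
    # The pipeline class name is stored under "_class_name".
    return index["_class_name"], algorithm


# Demo with a throwaway model directory.
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "model_index.json").write_text(
        json.dumps({"_class_name": "QwenImagePipeline"})
    )
    key = registry_key(tmp, "flow_grpo")
```

The resulting pair is what the registered (architecture, algorithm) keys are compared against.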
DiffusionModelBase | Abstract base class for diffusion model training helpers.
VllmOmniPipelineBase | Registry base for vllm-omni custom diffusion pipeline classes.
QwenImage | Training adapter for the Qwen-Image diffusion model.
QwenImageMixGRPO | Training adapter for Qwen-Image with the MixGRPO algorithm.
StableDiffusion3DPO | Training adapter for SD3 Diffusion-DPO.
FlowMatchSDEDiscreteScheduler | SDE version of the FlowMatchEulerDiscreteScheduler.
Model Base
- class verl_omni.pipelines.model_base.DiffusionModelBase[source]
Abstract base class for diffusion model training helpers.
Different diffusion models have very different forward / sampling logic. Subclass this ABC and implement its abstract methods (build_scheduler, set_timesteps, prepare_model_inputs, and forward_and_sample_previous_step) to plug your model into the verl training loop.
To register, decorate your subclass with @DiffusionModelBase.register("name", algorithm="..."). The name must match the _class_name value in the pipeline’s model_index.json (which is auto-detected into DiffusionModelConfig.architecture). The algorithm must match DiffusionModelConfig.algorithm.
Example:
@DiffusionModelBase.register("QwenImagePipeline", algorithm="flow_grpo")
class QwenImage(DiffusionModelBase):
    ...
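The registration pattern can be sketched with a small stand-in class. This is a minimal illustration of a decorator-based registry keyed by (architecture, algorithm), not the actual DiffusionModelBase implementation. `ToyModelBase` and `ToyQwenImage` are hypothetical names, and the real get_class takes a DiffusionModelConfig rather than two strings.

```python
from abc import ABC


class ToyModelBase(ABC):
    """Stand-in for DiffusionModelBase; not the real class."""

    _registry: dict[tuple[str, str], type] = {}

    @classmethod
    def register(cls, architecture: str, algorithm: str):
        # Return a class decorator that records the subclass under its key.
        def decorator(subclass: type) -> type:
            cls._registry[(architecture, algorithm)] = subclass
            return subclass

        return decorator

    @classmethod
    def get_class(cls, architecture: str, algorithm: str) -> type:
        try:
            return cls._registry[(architecture, algorithm)]
        except KeyError:
            raise NotImplementedError(
                f"no adapter registered for {(architecture, algorithm)}"
            ) from None


@ToyModelBase.register("QwenImagePipeline", algorithm="flow_grpo")
class ToyQwenImage(ToyModelBase):
    pass


selected = ToyModelBase.get_class("QwenImagePipeline", "flow_grpo")
```

Keying on the pair rather than on the architecture alone lets one architecture carry several adapters, one per algorithm.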
- abstractmethod classmethod build_scheduler(model_config: DiffusionModelConfig) SchedulerMixin[source]
Build and configure the diffusion scheduler for this model. The returned scheduler should have timesteps and sigmas already set.
- Parameters:
model_config (DiffusionModelConfig) – the configuration of the diffusion model.
- abstractmethod classmethod forward_and_sample_previous_step(module: ModelMixin, scheduler: SchedulerMixin, model_config: DiffusionModelConfig, model_inputs: dict[str, Tensor], negative_model_inputs: dict[str, Tensor] | None, scheduler_inputs: TensorDict | dict[str, Tensor] | None, step: int)[source]
Forward the model and sample the previous step. Used for RL-algorithms based on reversed-sampling (FlowGRPO, DanceGRPO, etc.).
- Parameters:
module (ModelMixin) – the diffusion model to be forwarded.
scheduler (SchedulerMixin) – the scheduler used for the diffusion process.
model_config (DiffusionModelConfig) – the configuration of the diffusion model.
model_inputs (dict[str, torch.Tensor]) – the inputs to the diffusion model.
negative_model_inputs (Optional[dict[str, torch.Tensor]]) – the negative inputs for guidance.
scheduler_inputs (Optional[TensorDict | dict[str, torch.Tensor]]) – the extra inputs for the scheduler, which may contain the latents and timesteps.
step (int) – the current step in the diffusion process.
- Returns:
(log_prob, prev_sample_mean, std_dev_t, sqrt_dt)
- Return type:
tuple
- classmethod get_class(model_config: DiffusionModelConfig) type[DiffusionModelBase][source]
Return the registered subclass for (architecture, algorithm).
- abstractmethod classmethod prepare_model_inputs(module: ModelMixin, model_config: DiffusionModelConfig, latents: Tensor, timesteps: Tensor, prompt_embeds: Tensor, prompt_embeds_mask: Tensor, negative_prompt_embeds: Tensor, negative_prompt_embeds_mask: Tensor, micro_batch: TensorDict, step: int) tuple[dict, dict | None][source]
Build architecture-specific inputs for a model forward. For reverse-trajectory algorithms, latents and timesteps usually contain the full rollout trajectory and step selects the current slice. For forward-process objectives, callers may pass an already selected/noised latent and timestep directly. The caller is responsible for universal pre-processing (common tensor extraction and nested-embed unpadding) before invoking this method.
- Parameters:
module (ModelMixin) – the diffusion transformer module.
model_config (DiffusionModelConfig) – the configuration of the diffusion model.
latents (torch.Tensor) – latent tensor from the micro-batch; either a full trajectory of shape (B, T, …) or a selected/noised latent of shape (B, …).
timesteps (torch.Tensor) – timestep tensor from the micro-batch; either a full trajectory of shape (B, T) or a selected timestep of shape (B,).
prompt_embeds (torch.Tensor) – dense positive prompt embeddings, shape (B, L, D).
prompt_embeds_mask (torch.Tensor) – attention mask for prompt_embeds, shape (B, L).
negative_prompt_embeds (torch.Tensor) – dense negative prompt embeddings, shape (B, L, D).
negative_prompt_embeds_mask (torch.Tensor) – attention mask for negative_prompt_embeds.
micro_batch (TensorDict) – the full micro-batch, available for architecture-specific metadata (e.g. height, width, vae_scale_factor).
step (int) – the current denoising step index.
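The two accepted input layouts can be pictured with a small sketch. Nested Python lists stand in for tensors here, and `select_step` is a hypothetical helper. How a real adapter tells a (B, T) trajectory from an already selected (B,) value is an assumption of this example.

```python
def select_step(values, step):
    """Return the per-sample entry for `step` when given a (B, T) trajectory,
    or the values unchanged when they are already a (B,) selection."""
    if values and isinstance(values[0], (list, tuple)):
        return [row[step] for row in values]
    return list(values)


trajectory = [[1000.0, 750.0, 500.0], [1000.0, 750.0, 500.0]]  # shape (B=2, T=3)
already_selected = [612.0, 87.0]                                # shape (B=2,)

from_trajectory = select_step(trajectory, step=1)
passthrough = select_step(already_selected, step=1)
```

In the trajectory case step picks one column; in the forward-process case step is ignored because the caller has already chosen the timestep.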
- classmethod register(architecture: str, algorithm: str)[source]
Class decorator that registers a subclass for (architecture, algorithm).
- abstractmethod classmethod set_timesteps(scheduler: SchedulerMixin, model_config: DiffusionModelConfig, device: str)[source]
Set timesteps and sigmas on the scheduler and move them to device.
- Parameters:
scheduler (SchedulerMixin) – the scheduler used for the diffusion process.
model_config (DiffusionModelConfig) – the configuration of the diffusion model.
device (str) – the device to move the timesteps and sigmas to.
- class verl_omni.pipelines.model_base.VllmOmniPipelineBase[source]
Registry base for vllm-omni custom diffusion pipeline classes.
To register, decorate your custom pipeline class with @VllmOmniPipelineBase.register("name", algorithm="..."). The name must match the _class_name value in the pipeline’s model_index.json (which is auto-detected into DiffusionModelConfig.architecture). The algorithm must match DiffusionModelConfig.algorithm.
Example:
@VllmOmniPipelineBase.register("QwenImagePipeline", algorithm="flow_grpo")
class QwenImagePipelineWithLogProb(QwenImagePipeline):
    ...
- classmethod get_class(architecture: str, algorithm: str) type | None[source]
Return the registered pipeline class for (architecture, algorithm), or None.
Pipeline Helpers
Convenience wrappers that dispatch to the registered subclass for the current architecture. The Diffusers FSDP engine and the agent loop call into these helpers rather than touching the registry directly.
- verl_omni.pipelines.utils.build_scheduler(model_config: DiffusionModelConfig) SchedulerMixin[source]
Build and configure the scheduler for the diffusion model. The returned scheduler has timesteps and sigmas already set.
- Parameters:
model_config (DiffusionModelConfig) – the configuration of the diffusion model.
- verl_omni.pipelines.utils.forward_and_sample_previous_step(module: ModelMixin, scheduler: SchedulerMixin, model_config: DiffusionModelConfig, model_inputs: dict, negative_model_inputs: dict | None, scheduler_inputs: TensorDict | dict[str, Tensor] | None, step: int)[source]
Forward the model and sample the previous step. Used for RL-algorithms based on reversed-sampling (FlowGRPO, DanceGRPO, etc.).
- Parameters:
module (ModelMixin) – the diffusion model to be forwarded.
scheduler (SchedulerMixin) – the scheduler used for the diffusion process.
model_config (DiffusionModelConfig) – the configuration of the diffusion model.
model_inputs (dict[str, torch.Tensor]) – the inputs to the diffusion model.
negative_model_inputs (Optional[dict[str, torch.Tensor]]) – the negative inputs for guidance.
scheduler_inputs (Optional[TensorDict | dict[str, torch.Tensor]]) – the extra inputs for the scheduler, which may contain the latents and timesteps.
step (int) – the current step in the diffusion process.
- verl_omni.pipelines.utils.prepare_model_inputs(module: ModelMixin, model_config: DiffusionModelConfig, latents: Tensor, timesteps: Tensor, prompt_embeds: Tensor, prompt_embeds_mask: Tensor | None, negative_prompt_embeds: Tensor | None, negative_prompt_embeds_mask: Tensor | None, micro_batch: TensorDict, step: int) tuple[dict, dict | None][source]
Build architecture-specific model inputs for the forward pass. Dispatches to the registered DiffusionModelBase subclass for the current architecture.
- Parameters:
module (ModelMixin) – the diffusion transformer module.
model_config (DiffusionModelConfig) – the configuration of the diffusion model.
latents (torch.Tensor) – latent tensor from the micro-batch. This can be a full trajectory or an already selected/noised latent, depending on the algorithm.
timesteps (torch.Tensor) – timestep tensor from the micro-batch. This can be a full trajectory or an already selected timestep, depending on the algorithm.
prompt_embeds (torch.Tensor) – dense positive prompt embeddings, shape (B, L, D).
prompt_embeds_mask (torch.Tensor) – attention mask for prompt_embeds, shape (B, L).
negative_prompt_embeds (torch.Tensor) – dense negative prompt embeddings, shape (B, L, D).
negative_prompt_embeds_mask (torch.Tensor) – attention mask for negative_prompt_embeds.
micro_batch (TensorDict) – the full micro-batch, available for architecture-specific metadata (e.g. height, width, vae_scale_factor).
step (int) – the current denoising step index.
- verl_omni.pipelines.utils.set_timesteps(scheduler: SchedulerMixin, model_config: DiffusionModelConfig)[source]
Set correct timesteps and sigmas for diffusion model schedulers.
- Parameters:
scheduler (SchedulerMixin) – the scheduler used for the diffusion process.
model_config (DiffusionModelConfig) – the configuration of the diffusion model.
Schedulers
- class verl_omni.pipelines.schedulers.flow_match_sde.FlowMatchSDEDiscreteScheduler(num_train_timesteps: int = 1000, shift: float = 1.0, use_dynamic_shifting: bool = False, base_shift: float | None = 0.5, max_shift: float | None = 1.15, base_image_seq_len: int = 256, max_image_seq_len: int = 4096, invert_sigmas: bool = False, shift_terminal: float = None, use_karras_sigmas: bool = False, use_exponential_sigmas: bool = False, use_beta_sigmas: bool = False, time_shift_type: Literal['exponential', 'linear'] = 'exponential', stochastic_sampling: bool = False)[source]
SDE version of the FlowMatchEulerDiscreteScheduler. The implementation is based on the FlowGRPO paper (https://arxiv.org/abs/2505.05470) and the diffusers v0.37 branch.
- sample_previous_step(sample: Tensor, model_output: Tensor, timestep: FloatTensor | None = None, generator: Generator | None = None, per_token_timesteps: Tensor | None = None, noise_level: float = 0.7, prev_sample: Tensor | None = None, sde_type: Literal['cps', 'sde', 'dance_sde'] = 'sde', return_logprobs: bool = True, return_sqrt_dt: bool = False, include_logprob_normalizer: bool = True)[source]
Run a single SDE / CPS reverse step.
- Parameters:
sample (torch.FloatTensor) – A current instance of a sample created by the diffusion process.
model_output (torch.FloatTensor) – The direct output from learned diffusion model.
timestep (torch.FloatTensor, optional) – The current discrete timestep in the diffusion chain. When None, the internal step_index is used (sequential denoising loop).
generator (torch.Generator, optional) – A random number generator.
per_token_timesteps (torch.Tensor, optional) – The timesteps for each token in the sample. Currently not supported.
noise_level (float, optional, defaults to 0.7) – The noise level used in the SDE.
prev_sample (torch.FloatTensor, optional) – The sample from the previous timestep. If provided, it is used directly for log-probability computation instead of being sampled.
sde_type (str, optional, defaults to “sde”) – The type of SDE to use. Choose between “sde”, “cps”, and “dance_sde”.
return_logprobs (bool, optional, defaults to True) – Whether to return log probabilities of the previous sample.
return_sqrt_dt (bool, optional, defaults to False) – Whether to additionally return sqrt(-dt) as a tensor of shape (batch_size,). Used by GRPO-Guard to compute the importance-ratio normalization (see GRPOGuardLoss).
include_logprob_normalizer (bool, optional, defaults to True) – Whether to include Gaussian normalizer constants in log probabilities.
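The effect of include_logprob_normalizer can be illustrated with a plain Gaussian log-density. The sketch below is not the scheduler's code: `gaussian_log_prob` is a hypothetical helper, `std` stands for whatever per-step standard deviation the transition uses, and averaging over elements (rather than summing) is an assumption.

```python
import math


def gaussian_log_prob(prev_sample, mean, std, include_normalizer=True):
    """Mean per-element log-density of `prev_sample` under N(mean, std**2)."""
    total = 0.0
    for x, m in zip(prev_sample, mean):
        lp = -((x - m) ** 2) / (2.0 * std ** 2)
        if include_normalizer:
            # Constant term: log(std) + log(sqrt(2 * pi)).
            lp -= math.log(std) + 0.5 * math.log(2.0 * math.pi)
        total += lp
    return total / len(prev_sample)


mean = [0.0, 1.0]
sample = [0.5, 1.0]
with_norm = gaussian_log_prob(sample, mean, std=0.5)
without_norm = gaussian_log_prob(sample, mean, std=0.5, include_normalizer=False)
```

The two values differ only by a term that does not depend on the sample, so a ratio of probabilities computed with the same standard deviation is unaffected by the flag.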
- step(model_output: FloatTensor, timestep: float | FloatTensor, sample: FloatTensor, s_churn: float = 0.0, s_tmin: float = 0.0, s_tmax: float = inf, s_noise: float = 1.0, generator: Generator | None = None, per_token_timesteps: Tensor | None = None, return_dict: bool = True, noise_level: float = 0.7, prev_sample: FloatTensor | None = None, sde_type: Literal['sde', 'cps', 'dance_sde'] = 'sde', return_logprobs: bool = True, include_logprob_normalizer: bool = True) FlowMatchSDEDiscreteSchedulerOutput | tuple[source]
Predict the sample from the previous timestep by reversing the SDE. This function propagates the diffusion process from the learned model outputs (most often the predicted noise).
Modified from https://github.com/yifan123/flow_grpo/blob/main/flow_grpo/diffusers_patch/sd3_sde_with_logprob.py
- Parameters:
model_output (torch.FloatTensor) – The direct output from learned diffusion model.
timestep (float) – The current discrete timestep in the diffusion chain.
sample (torch.FloatTensor) – A current instance of a sample created by the diffusion process.
s_churn (float)
s_tmin (float)
s_tmax (float)
s_noise (float, defaults to 1.0) – Scaling factor for noise added to the sample.
generator (torch.Generator, optional) – A random number generator.
per_token_timesteps (torch.Tensor, optional) – The timesteps for each token in the sample.
return_dict (bool) – Whether to return a FlowMatchSDEDiscreteSchedulerOutput or a tuple.
noise_level (float, optional, defaults to 0.7) – The noise level used in the SDE.
prev_sample (torch.FloatTensor, optional) – The sample from the previous timestep. If not provided, it will be sampled inside the function.
sde_type (str, optional, defaults to “sde”) – The type of SDE to use. Choose between “sde”, “cps”, and “dance_sde”.
return_logprobs (bool, optional, defaults to True) – Whether to return log probabilities of the previous sample.
include_logprob_normalizer (bool, optional, defaults to True) – Whether to include Gaussian normalizer constants in log probabilities.
- class verl_omni.pipelines.schedulers.flow_match_sde.FlowMatchSDEDiscreteSchedulerOutput(prev_sample: FloatTensor, log_prob: FloatTensor | None, prev_sample_mean: FloatTensor, std_dev_t: FloatTensor)[source]
Output class for the scheduler’s step function output.
- Parameters:
prev_sample (torch.FloatTensor of shape (batch_size, sequence_length, num_channels) for images) – Computed sample (x_{t-1}) of previous timestep. prev_sample should be used as next model input in the denoising loop.
log_prob (torch.FloatTensor of shape (batch_size,), optional) – The log probability of the previous sample.
prev_sample_mean (torch.FloatTensor of shape (batch_size, sequence_length, num_channels) for images) – The mean of the computed sample of previous timestep.
std_dev_t (torch.FloatTensor of shape (batch_size, 1, 1)) – The standard deviation used to compute prev_sample.
Built-in Pipelines
Qwen-Image (FlowGRPO)
- class verl_omni.pipelines.qwen_image_flow_grpo.QwenImage[source]
Training adapter for the Qwen-Image diffusion model.
Implements the DiffusionModelBase interface for the QwenImagePipeline architecture, providing scheduler configuration, model-input construction, and the forward/sampling step used during RL training (e.g. FlowGRPO).
Registered under "QwenImagePipeline" so it is automatically selected when DiffusionModelConfig.architecture matches that name.
- classmethod build_scheduler(model_config: DiffusionModelConfig)[source]
Build and configure the SDE scheduler for the Qwen-Image model.
- Parameters:
model_config (DiffusionModelConfig) – Configuration for the diffusion model, used to determine the model path and timestep settings.
- Returns:
Scheduler with timesteps already set for the current device.
- Return type:
- classmethod forward_and_sample_previous_step(module: QwenImageTransformer2DModel, scheduler: FlowMatchSDEDiscreteScheduler, model_config: DiffusionModelConfig, model_inputs: dict[str, Tensor], negative_model_inputs: dict[str, Tensor] | None, scheduler_inputs: TensorDict | dict[str, Tensor] | None, step: int)[source]
Run the Qwen-Image transformer and sample the previous denoising step.
Used by RL algorithms (FlowGRPO) that require log-probabilities for reversed-sampling. Applies True-CFG guidance when model_config.true_cfg_scale > 1.0.
- Parameters:
module (QwenImageTransformer2DModel) – The Qwen-Image transformer module.
scheduler (FlowMatchSDEDiscreteScheduler) – Scheduler used to sample the previous step and compute log-probabilities.
model_config (DiffusionModelConfig) – Configuration providing true_cfg_scale, algo.noise_level, and algo.sde_type.
model_inputs (dict[str, torch.Tensor]) – Positive-prompt inputs for the transformer forward pass.
negative_model_inputs (Optional[dict[str, torch.Tensor]]) – Negative-prompt inputs used for True-CFG; may be None when CFG is disabled.
scheduler_inputs (Optional[TensorDict | dict[str, torch.Tensor]]) – Must contain "all_latents" and "all_timesteps" tensors.
step (int) – Current denoising step index.
- Returns:
A 4-tuple of (log_prob, prev_sample_mean, std_dev_t, sqrt_dt).
- Return type:
tuple
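The guidance switch can be sketched with the usual classifier-free guidance combination. This is an illustration under assumptions: `true_cfg_combine` is a hypothetical helper, flat lists stand in for tensors, and the actual adapter may apply further steps (for example rescaling the combined prediction) that are not shown here.

```python
def true_cfg_combine(cond_pred, uncond_pred, true_cfg_scale):
    """Blend conditional and unconditional predictions element-wise."""
    if uncond_pred is None or true_cfg_scale <= 1.0:
        # Guidance disabled: use the positive-prompt prediction as-is.
        return list(cond_pred)
    return [u + true_cfg_scale * (c - u) for c, u in zip(cond_pred, uncond_pred)]


guided = true_cfg_combine([1.0, 2.0], [0.0, 1.0], true_cfg_scale=4.0)
unguided = true_cfg_combine([1.0, 2.0], None, true_cfg_scale=4.0)
```

With guidance enabled, each step needs a second forward pass on the negative-prompt inputs, which is why negative_model_inputs may be None when it is disabled.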
- classmethod prepare_model_inputs(module: QwenImageTransformer2DModel, model_config: DiffusionModelConfig, latents: Tensor, timesteps: Tensor, prompt_embeds: Tensor, prompt_embeds_mask: Tensor, negative_prompt_embeds: Tensor, negative_prompt_embeds_mask: Tensor, micro_batch: TensorDict, step: int) tuple[dict, dict][source]
Build Qwen-Image-specific inputs for the transformer forward pass.
- Parameters:
module (QwenImageTransformer2DModel) – The Qwen-Image transformer module.
model_config (DiffusionModelConfig) – Configuration providing guidance scale and other model settings.
latents (torch.Tensor) – Full latent tensor of shape (B, T, ...).
timesteps (torch.Tensor) – Full timestep tensor of shape (B, T).
prompt_embeds (torch.Tensor) – Positive prompt embeddings of shape (B, L, D).
prompt_embeds_mask (torch.Tensor) – Attention mask for prompt_embeds of shape (B, L).
negative_prompt_embeds (torch.Tensor) – Negative prompt embeddings of shape (B, L, D).
negative_prompt_embeds_mask (torch.Tensor) – Attention mask for negative_prompt_embeds.
micro_batch (TensorDict) – Micro-batch containing metadata such as height, width, and vae_scale_factor.
step (int) – Current denoising step index used to slice latents and timesteps.
- Returns:
A pair of (model_inputs, negative_model_inputs) dicts ready to be unpacked into the transformer forward call.
- Return type:
tuple[dict, dict]
- classmethod set_timesteps(scheduler: FlowMatchSDEDiscreteScheduler, model_config: DiffusionModelConfig, device: str)[source]
Configure timesteps and sigmas on the scheduler for Qwen-Image.
- Parameters:
scheduler (FlowMatchSDEDiscreteScheduler) – The scheduler whose timesteps and sigmas will be set.
model_config (DiffusionModelConfig) – Configuration providing height, width, and number of inference steps.
device (str) – The device (e.g. "cuda") to move the timesteps to.
- class verl_omni.pipelines.qwen_image_flow_grpo.QwenImagePipelineWithLogProb(*args: Any, **kwargs: Any)[source]
Rollout pipeline for Qwen-Image that captures per-step log-probabilities.
Extends QwenImagePipeline with a custom SDE-based scheduler and additional output fields required for RL training (e.g. FlowGRPO). In addition to the final generated image, the pipeline returns all intermediate latents, their log-probabilities, and the corresponding timesteps.
Registered under "QwenImagePipeline" for vllm-omni rollout dispatch.
- diffuse(prompt_embeds, prompt_embeds_mask, negative_prompt_embeds, negative_prompt_embeds_mask, latents, img_shapes, txt_seq_lens, negative_txt_seq_lens, timesteps, do_true_cfg, guidance, true_cfg_scale, noise_level, sde_window, sde_type, generator, logprobs)[source]
Run the full SDE diffusion loop and collect per-step rollout data.
Iterates over all timesteps, optionally applying True-CFG guidance, and collects latents and log-probabilities within the SDE window.
- Parameters:
prompt_embeds (torch.Tensor) – Positive prompt embeddings.
prompt_embeds_mask (torch.Tensor) – Attention mask for prompt_embeds.
negative_prompt_embeds (torch.Tensor) – Negative prompt embeddings for CFG.
negative_prompt_embeds_mask (torch.Tensor) – Attention mask for negative_prompt_embeds.
latents (torch.Tensor) – Initial noisy latents.
img_shapes (list) – Per-sample image shapes used by the transformer.
txt_seq_lens (list[int]) – Sequence lengths for positive prompt embeddings.
negative_txt_seq_lens (list[int]) – Sequence lengths for negative prompt embeddings.
timesteps (torch.Tensor) – Scheduler timestep sequence.
do_true_cfg (bool) – Whether to apply True-CFG guidance.
guidance (torch.Tensor | None) – Guidance scale tensor, or None.
true_cfg_scale (float) – Classifier-free guidance scale.
noise_level (float) – SDE noise injection magnitude within the window.
sde_window (tuple[int, int]) – (start, end) step indices defining where SDE noise is injected and rollout data is collected.
sde_type (str) – SDE variant; one of "sde" or "cps".
generator (torch.Generator | None) – Optional random generator for reproducibility.
logprobs (bool) – Whether to compute and return per-step log-probabilities.
- Returns:
A 4-tuple of (latents, all_latents, all_log_probs, all_timesteps) where all_latents has shape (B, W+1, ...) (W = SDE-window length), all_log_probs has shape (B, W) or None when logprobs is False, and all_timesteps has shape (B, W).
- Return type:
tuple
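The relationship between the SDE window and the returned shapes can be seen in a toy loop. The sketch below tracks a single scalar "latent" without a batch dimension, uses a placeholder update and log-probability, and treats the window end as exclusive. All of these are assumptions for illustration, and `collect_window` is a hypothetical name.

```python
import random


def collect_window(num_steps, sde_window, seed=0):
    rng = random.Random(seed)
    start, end = sde_window
    latent = 0.0
    all_latents, all_log_probs, all_timesteps = [], [], []
    for i in range(num_steps):
        in_window = start <= i < end
        if in_window and not all_latents:
            all_latents.append(latent)  # latent entering the window
        # Toy update: deterministic drift, plus noise only inside the window.
        noise = rng.gauss(0.0, 1.0) if in_window else 0.0
        latent = latent - 0.1 + 0.05 * noise
        if in_window:
            all_latents.append(latent)
            all_log_probs.append(-0.5 * noise ** 2)  # placeholder log-prob
            all_timesteps.append(i)
    return latent, all_latents, all_log_probs, all_timesteps


final, lat, lps, ts = collect_window(num_steps=10, sde_window=(2, 5))
```

For a window of length W the loop stores W transitions, so it keeps W log-probabilities and timesteps but W+1 latents (the state before the first transition plus one state after each).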
- forward(req: vllm_omni.diffusion.request.OmniDiffusionRequest, prompt_token_ids: Tensor | list[int] | None = None, prompt_mask: Tensor | None = None, negative_prompt_ids: Tensor | list[int] | None = None, negative_prompt_mask: Tensor | None = None, true_cfg_scale: float = 4.0, height: int | None = None, width: int | None = None, num_inference_steps: int = 50, sigmas: list[float] | None = None, guidance_scale: float = 1.0, num_images_per_prompt: int = 1, generator: Generator | list[Generator] | None = None, latents: Tensor | None = None, prompt_embeds: Tensor | None = None, prompt_embeds_mask: Tensor | None = None, negative_prompt_embeds: Tensor | None = None, negative_prompt_embeds_mask: Tensor | None = None, output_type: str | None = 'pil', attention_kwargs: dict[str, Any] | None = None, callback_on_step_end_tensor_inputs: tuple[str, ...] = ('latents',), max_sequence_length: int = 512, noise_level: float = 0.7, sde_window_size: int | None = None, sde_window_range: tuple[int, int] = (0, 5), sde_type: Literal['sde', 'cps'] = 'sde', logprobs: bool = True) vllm_omni.diffusion.data.DiffusionOutput[source]
End-to-end image generation with rollout data collection.
Encodes the prompt, prepares latents, runs the SDE diffusion loop via diffuse(), and decodes the final latents through the VAE. Sampling parameters in req take precedence over the keyword arguments.
- Parameters:
req (OmniDiffusionRequest) – Rollout request containing prompts and OmniDiffusionSamplingParams.
prompt_token_ids (torch.Tensor | list[int], optional) – Token IDs for the positive prompt.
prompt_mask (torch.Tensor, optional) – Attention mask for prompt_token_ids.
negative_prompt_ids (torch.Tensor | list[int], optional) – Token IDs for the negative prompt used in True-CFG.
negative_prompt_mask (torch.Tensor, optional) – Attention mask for negative_prompt_ids.
true_cfg_scale (float) – Classifier-free guidance scale; CFG is disabled when <= 1.
height (int, optional) – Output image height in pixels.
width (int, optional) – Output image width in pixels.
num_inference_steps (int) – Number of denoising steps.
sigmas (list[float], optional) – Custom sigmas for the scheduler.
guidance_scale (float) – Distilled guidance scale embedded in the transformer (guidance_embeds mode).
num_images_per_prompt (int) – Number of images to generate per prompt.
generator (torch.Generator | list[torch.Generator], optional) – Random generator(s) for reproducibility.
latents (torch.Tensor, optional) – Pre-generated initial latents; sampled from a Gaussian when None.
prompt_embeds (torch.Tensor, optional) – Pre-computed positive prompt embeddings; bypasses the text encoder.
prompt_embeds_mask (torch.Tensor, optional) – Attention mask for pre-computed prompt_embeds.
negative_prompt_embeds (torch.Tensor, optional) – Pre-computed negative prompt embeddings.
negative_prompt_embeds_mask (torch.Tensor, optional) – Attention mask for negative_prompt_embeds.
output_type (str, optional) – Format of the returned image; "latent" returns raw latents, otherwise the VAE-decoded image.
attention_kwargs (dict, optional) – Extra keyword arguments forwarded to the attention layers.
callback_on_step_end_tensor_inputs (tuple[str, ...]) – Names of tensors to expose in the step-end callback.
max_sequence_length (int) – Maximum prompt embedding sequence length.
noise_level (float) – SDE noise injection magnitude within the window.
sde_window_size (int, optional) – Number of SDE steps; when None the full timestep range is used.
sde_window_range (tuple[int, int]) – (start, end) range from which the SDE window start position is randomly sampled.
sde_type (str) – SDE variant; "sde" or "cps".
logprobs (bool) – Whether to compute per-step log-probabilities.
- Returns:
Contains the decoded output image and a custom_output dict with keys "all_latents", "all_log_probs", "all_timesteps", "prompt_embeds", "prompt_embeds_mask", "negative_prompt_embeds", and "negative_prompt_embeds_mask".
- Return type:
vllm_omni.diffusion.data.DiffusionOutput
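One plausible reading of sde_window_size and sde_window_range is sketched below. The exact sampling rule (inclusive bounds, keeping the window inside the range, clipping at the last step) is an assumption of this example rather than documented behaviour, and `sample_sde_window` is a hypothetical helper.

```python
import random


def sample_sde_window(num_inference_steps, sde_window_size, sde_window_range, rng):
    """Pick the (start, end) step indices of the SDE window."""
    if sde_window_size is None:
        # No window size given: use the full timestep range.
        return 0, num_inference_steps
    low, high = sde_window_range
    # Assumption: the start is drawn so the whole window fits inside the range.
    start = rng.randint(low, max(low, high - sde_window_size))
    return start, min(start + sde_window_size, num_inference_steps)


rng = random.Random(0)
full_window = sample_sde_window(50, None, (0, 5), rng)
windows = [sample_sde_window(50, 2, (0, 5), rng) for _ in range(20)]
```

Drawing the start position at random varies which denoising steps receive SDE noise, and therefore which steps contribute rollout data, from one request to the next.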
Qwen-Image (MixGRPO)
- class verl_omni.pipelines.qwen_image_mix_grpo.QwenImageMixGRPO[source]
Training adapter for Qwen-Image with the MixGRPO algorithm.
- classmethod build_scheduler(model_config: DiffusionModelConfig)
Build and configure the SDE scheduler for the Qwen-Image model.
- Parameters:
model_config (DiffusionModelConfig) – Configuration for the diffusion model, used to determine the model path and timestep settings.
- Returns:
Scheduler with timesteps already set for the current device.
- Return type:
- classmethod forward_and_sample_previous_step(module: QwenImageTransformer2DModel, scheduler: FlowMatchSDEDiscreteScheduler, model_config: DiffusionModelConfig, model_inputs: dict[str, Tensor], negative_model_inputs: dict[str, Tensor] | None, scheduler_inputs: TensorDict | dict[str, Tensor] | None, step: int)
Run the Qwen-Image transformer and sample the previous denoising step.
Used by RL algorithms (FlowGRPO) that require log-probabilities for reversed-sampling. Applies True-CFG guidance when model_config.true_cfg_scale > 1.0.
- Parameters:
module (QwenImageTransformer2DModel) – The Qwen-Image transformer module.
scheduler (FlowMatchSDEDiscreteScheduler) – Scheduler used to sample the previous step and compute log-probabilities.
model_config (DiffusionModelConfig) – Configuration providing true_cfg_scale, algo.noise_level, and algo.sde_type.
model_inputs (dict[str, torch.Tensor]) – Positive-prompt inputs for the transformer forward pass.
negative_model_inputs (Optional[dict[str, torch.Tensor]]) – Negative-prompt inputs used for True-CFG; may be None when CFG is disabled.
scheduler_inputs (Optional[TensorDict | dict[str, torch.Tensor]]) – Must contain "all_latents" and "all_timesteps" tensors.
step (int) – Current denoising step index.
- Returns:
A 4-tuple of (log_prob, prev_sample_mean, std_dev_t, sqrt_dt).
- Return type:
tuple
- classmethod prepare_model_inputs(module: QwenImageTransformer2DModel, model_config: DiffusionModelConfig, latents: Tensor, timesteps: Tensor, prompt_embeds: Tensor, prompt_embeds_mask: Tensor, negative_prompt_embeds: Tensor, negative_prompt_embeds_mask: Tensor, micro_batch: TensorDict, step: int) tuple[dict, dict]
Build Qwen-Image-specific inputs for the transformer forward pass.
- Parameters:
module (QwenImageTransformer2DModel) – The Qwen-Image transformer module.
model_config (DiffusionModelConfig) – Configuration providing guidance scale and other model settings.
latents (torch.Tensor) – Full latent tensor of shape (B, T, ...).
timesteps (torch.Tensor) – Full timestep tensor of shape (B, T).
prompt_embeds (torch.Tensor) – Positive prompt embeddings of shape (B, L, D).
prompt_embeds_mask (torch.Tensor) – Attention mask for prompt_embeds of shape (B, L).
negative_prompt_embeds (torch.Tensor) – Negative prompt embeddings of shape (B, L, D).
negative_prompt_embeds_mask (torch.Tensor) – Attention mask for negative_prompt_embeds.
micro_batch (TensorDict) – Micro-batch containing metadata such as height, width, and vae_scale_factor.
step (int) – Current denoising step index used to slice latents and timesteps.
- Returns:
A pair of (model_inputs, negative_model_inputs) dicts ready to be unpacked into the transformer forward call.
- Return type:
tuple[dict, dict]
- classmethod set_timesteps(scheduler: FlowMatchSDEDiscreteScheduler, model_config: DiffusionModelConfig, device: str)
Configure timesteps and sigmas on the scheduler for Qwen-Image.
- Parameters:
scheduler (FlowMatchSDEDiscreteScheduler) – The scheduler whose timesteps and sigmas will be set.
model_config (DiffusionModelConfig) – Configuration providing height, width, and number of inference steps.
device (str) – The device (e.g. "cuda") to move the timesteps to.
- class verl_omni.pipelines.qwen_image_mix_grpo.QwenImageMixGRPOPipelineWithLogProb(*args: Any, **kwargs: Any)[source]
Rollout pipeline for Qwen-Image with the MixGRPO algorithm.
- forward(req: vllm_omni.diffusion.request.OmniDiffusionRequest, **kwargs: Any)[source]
End-to-end image generation with rollout data collection.
Encodes the prompt, prepares latents, runs the SDE diffusion loop via diffuse(), and decodes the final latents through the VAE. Sampling parameters in req take precedence over the keyword arguments.
- Parameters:
req (OmniDiffusionRequest) – Rollout request containing prompts and OmniDiffusionSamplingParams.
prompt_token_ids (torch.Tensor | list[int], optional) – Token IDs for the positive prompt.
prompt_mask (torch.Tensor, optional) – Attention mask for prompt_token_ids.
negative_prompt_ids (torch.Tensor | list[int], optional) – Token IDs for the negative prompt used in True-CFG.
negative_prompt_mask (torch.Tensor, optional) – Attention mask for negative_prompt_ids.
true_cfg_scale (float) – Classifier-free guidance scale; CFG is disabled when <= 1.
height (int, optional) – Output image height in pixels.
width (int, optional) – Output image width in pixels.
num_inference_steps (int) – Number of denoising steps.
sigmas (list[float], optional) – Custom sigmas for the scheduler.
guidance_scale (float) – Distilled guidance scale embedded in the transformer (guidance_embeds mode).
num_images_per_prompt (int) – Number of images to generate per prompt.
generator (torch.Generator | list[torch.Generator], optional) – Random generator(s) for reproducibility.
latents (torch.Tensor, optional) – Pre-generated initial latents; sampled from a Gaussian when None.
prompt_embeds (torch.Tensor, optional) – Pre-computed positive prompt embeddings; bypasses the text encoder.
prompt_embeds_mask (torch.Tensor, optional) – Attention mask for pre-computed prompt_embeds.
negative_prompt_embeds (torch.Tensor, optional) – Pre-computed negative prompt embeddings.
negative_prompt_embeds_mask (torch.Tensor, optional) – Attention mask for negative_prompt_embeds.
output_type (str, optional) – Format of the returned image; "latent" returns raw latents, otherwise the VAE-decoded image.
attention_kwargs (dict, optional) – Extra keyword arguments forwarded to the attention layers.
callback_on_step_end_tensor_inputs (tuple[str, ...]) – Names of tensors to expose in the step-end callback.
max_sequence_length (int) – Maximum prompt embedding sequence length.
noise_level (float) – SDE noise injection magnitude within the window.
sde_window_size (int, optional) – Number of SDE steps; when None the full timestep range is used.
sde_window_range (tuple[int, int]) – (start, end) range from which the SDE window start position is randomly sampled.
sde_type (str) – SDE variant; "sde" or "cps".
logprobs (bool) – Whether to compute per-step log-probabilities.
- Returns:
Contains the decoded output image and a custom_output dict with keys "all_latents", "all_log_probs", "all_timesteps", "prompt_embeds", "prompt_embeds_mask", "negative_prompt_embeds", and "negative_prompt_embeds_mask".
- Return type:
SD3 DPO
- class verl_omni.pipelines.sd3_dpo.StableDiffusion3DPO[source]
Training adapter for SD3 Diffusion-DPO.
This adapter is intentionally limited to SD3-specific tensor preparation and transformer forwarding. The pairwise DPO objective itself belongs in verl_omni.workers.utils.losses.
- classmethod build_scheduler(model_config: DiffusionModelConfig)[source]
Build and configure the SD3 flow-matching scheduler.
- abstractmethod classmethod forward_and_sample_previous_step(module: ModelMixin, scheduler: SchedulerMixin, model_config: DiffusionModelConfig, model_inputs: dict[str, Tensor], negative_model_inputs: dict[str, Tensor] | None, scheduler_inputs: TensorDict | dict[str, Tensor] | None, step: int)
Forward the model and sample the previous step. Used for RL-algorithms based on reversed-sampling (FlowGRPO, DanceGRPO, etc.).
- Parameters:
module (ModelMixin) – the diffusion model to be forwarded.
scheduler (SchedulerMixin) – the scheduler used for the diffusion process.
model_config (DiffusionModelConfig) – the configuration of the diffusion model.
model_inputs (dict[str, torch.Tensor]) – the inputs to the diffusion model.
negative_model_inputs (Optional[dict[str, torch.Tensor]]) – the negative inputs for guidance.
scheduler_inputs (Optional[TensorDict | dict[str, torch.Tensor]]) – the extra inputs for the scheduler, which may contain the latents and timesteps.
step (int) – the current step in the diffusion process.
- Returns:
(log_prob, prev_sample_mean, std_dev_t, sqrt_dt)
- Return type:
tuple
- classmethod prepare_model_inputs(module: ModelMixin, model_config: DiffusionModelConfig, latents: Tensor, timesteps: Tensor, prompt_embeds: Tensor, prompt_embeds_mask: Tensor, negative_prompt_embeds: Tensor, negative_prompt_embeds_mask: Tensor, micro_batch: TensorDict, step: int) tuple[dict[str, Any], dict[str, Any] | None][source]
Build SD3 transformer inputs.
For DPO-specific training, callers should normally pass already-noised latents and the sampled training timesteps:
latents: (B, C, H, W)  # already-noised latents
timesteps: (B,)
- classmethod set_timesteps(scheduler: SchedulerMixin, model_config: DiffusionModelConfig, device: str)[source]
No-op for SD3.5 DPO training.
DPO flow-matching samples timesteps from the full num_train_timesteps schedule (logit-normal over ~1000 steps). Rollout / offline data prep use separate diffusers pipelines with their own inference schedulers; they are not configured through this hook.
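Logit-normal timestep sampling can be sketched as follows: draw a normal variate, squash it through a sigmoid, and scale to the training schedule length. The function name, the `mean` and `std` defaults, and the mapping to integer indices are assumptions for illustration; the values used by the actual DPO training code are not stated here.

```python
import math
import random


def sample_logit_normal_timesteps(batch_size, num_train_timesteps=1000,
                                  mean=0.0, std=1.0, seed=0):
    """Draw one training timestep index per sample from a logit-normal density."""
    rng = random.Random(seed)
    timesteps = []
    for _ in range(batch_size):
        # Sigmoid of a normal variate lies in (0, 1).
        u = 1.0 / (1.0 + math.exp(-rng.gauss(mean, std)))
        # Scale to an index and clamp to the last valid timestep.
        timesteps.append(min(int(u * num_train_timesteps), num_train_timesteps - 1))
    return timesteps


timesteps = sample_logit_normal_timesteps(batch_size=8)
```

Because the timesteps are drawn independently per sample from the full training schedule, there is no fixed inference grid to install on the scheduler, which is consistent with set_timesteps being a no-op.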