How to Integrate a New Policy-Gradient Algorithm for Diffusion Model
Last updated: 06/02/2026.
This guide explains how to add a new PPO-like policy-gradient algorithm to VeRL-Omni’s diffusion trainer. Policy-gradient diffusion algorithms operate on the reverse denoising trajectory and train from log-probability ratios plus advantages. Examples include FlowGRPO, MixGRPO, and GRPO-Guard.
If your algorithm trains from final samples, rewards, or chosen/rejected
preferences instead of reverse-step logprobs, use
integrating_a_new_direct_preference_algorithm_for_diffusion_model.md
instead. That guide covers direct-preference algorithms such as offline DPO
and online DiffusionNFT.
The contracts described here are orthogonal to model integration: a single
policy-gradient algorithm can be extended to any number of model architectures
by pairing it with the DiffusionModelBase / VllmOmniPipelineBase adapters
described in
integrating_a_diffusion_model.md.
We use FlowGRPO
(Liu et al., 2025,
verl_omni/pipelines/qwen_image_flow_grpo/)
as the worked example throughout. It is the reference policy-gradient
algorithm in this repository and exercises every extension point.
TL;DR
A new policy-gradient algorithm needs four pieces:
An SDE step formula for the rollout — usually a new
sde_typeinFlowMatchSDEDiscreteScheduler, or a brand-new scheduler if the family changes.An advantage estimator registered with
@register_diffusion_adv_est(...).A loss function registered with
@register_diffusion_loss(...).One adapter pair per (architecture, algorithm) combination — a
DiffusionModelBasesubclass and aVllmOmniPipelineBasesubclass, both decorated with@register(architecture, algorithm="<name>").
Set algorithm.trainer_type=policy_gradient in launch scripts for
policy-gradient algorithms. The trainer entrypoint
(main_diffusion.py) then
selects PolicyGradientRayTrainer.
Mental Model
VeRL-Omni layers algorithm dispatch on top of model dispatch. At runtime:
actor_rollout_ref.model.algorithm = "flow_grpo" ← primary CLI flag
↓ (OmegaConf template) ↓ (OmegaConf template)
algorithm.adv_estimator = "flow_grpo" actor_rollout_ref.actor.diffusion_loss.loss_mode = "flow_grpo"
↓ ↓ ↓
DiffusionModelBase.get_class(arch, algo) VllmOmniPipelineBase.get_class(arch, algo)
↓ ↓
QwenImage (training adapter) QwenImagePipelineWithLogProb (rollout adapter)
loss_mode
↓
FlowGRPOLoss
All four registries (DiffusionModelBase, VllmOmniPipelineBase,
register_diffusion_adv_est, register_diffusion_loss) are wired to
actor_rollout_ref.model.algorithm via OmegaConf templates, so a single
CLI flag selects everything provided every site recognises the new
name. If your algorithm reuses an existing estimator or loss without
registering an alias, you must explicitly pin those sites back to the
existing name on the CLI; see
Reusing an existing estimator or loss
below.
Step 1 — Pick or Add an SDE Step Formula
The training and rollout sides must agree on the formula used to sample
the previous denoising step under the policy. FlowGRPO uses
FlowMatchSDEDiscreteScheduler
with sde_type="sde", which implements the standard flow-matching SDE
from the paper:
where sigma_t = sqrt(σ_t/(1-σ_t)) · noise_level.
If your algorithm reuses this family, simply call
scheduler.sample_previous_step(..., sde_type="sde", noise_level=..., ...)
from your training adapter and pass sde_type=... through to the rollout
loop. If your algorithm needs a different formula:
Preferred — add a new branch to
FlowMatchSDEDiscreteScheduler.sample_previous_stepkeyed on a newsde_typeliteral. Keep all branches numerically consistent (computepred_original_sample, thenprev_sample_mean, then optionally a Gaussian log-prob).Fallback — write a brand-new scheduler under
verl_omni/pipelines/schedulers/. This is rarely necessary; the flow-matching family covers most published PPO-like diffusion algorithms.
The scheduler must always return
(prev_sample, log_prob, prev_sample_mean, std_dev_t) in that order so
the trainer can compute the importance ratio without algorithm-specific
glue.
Step 2 — Register the Advantage Estimator
Open
verl_omni/trainer/diffusion/diffusion_algos.py
and add a member to the DiffusionAdvantageEstimator enum, then register
your function with @register_diffusion_adv_est(...):
class DiffusionAdvantageEstimator(str, Enum):
FLOW_GRPO = "flow_grpo"
# ... add new entries here
@register_diffusion_adv_est(DiffusionAdvantageEstimator.FLOW_GRPO)
def compute_flow_grpo_outcome_advantage(
sample_level_rewards: torch.Tensor,
index: np.ndarray,
norm_adv_by_std_in_grpo: bool = True,
global_std: bool = True,
config: DiffusionAlgoConfig | None = None,
**kwargs,
) -> tuple[torch.Tensor, torch.Tensor]:
"""Group-normalised outcome advantage used by FlowGRPO."""
...
return advantages, returns
The estimator receives sample_level_rewards (shape (B,)) and the
group index (the prompt UID). Return the (advantages, returns) pair
as full-batch tensors.
If your new algorithm reuses an existing estimator verbatim, just set
algorithm.adv_estimator=<existing_name> in your launch script.
If your estimator needs additional kwargs that are not already wired by
compute_advantage,
extend the if adv_estimator == DiffusionAdvantageEstimator.<NAME>: branch in
ray_diffusion_trainer.compute_advantage to forward them.
Step 3 — Register the Loss
Open
verl_omni/trainer/diffusion/diffusion_algos.py
and add the pure loss function plus the registered worker-side adapter:
@register_diffusion_loss("flow_grpo")
class FlowGRPOLoss(DiffusionLossFn):
"""Flow-GRPO clipped policy objective."""
required_model_output_keys = ("log_probs",)
required_data_keys = ("old_log_probs", "advantages")
@classmethod
def compute_loss(
cls,
*,
old_log_prob: torch.Tensor,
log_prob: torch.Tensor,
advantages: torch.Tensor,
config: DiffusionActorConfig,
) -> tuple[torch.Tensor, dict[str, Any]]:
"""Clipped-PPO objective averaged across denoising steps."""
...
return pg_loss, pg_metrics
def __call__(self, *, config, model_output, data) -> DiffusionLossResult:
pg_loss, pg_metrics = self.compute_loss(
old_log_prob=data["old_log_probs"],
log_prob=model_output["log_probs"],
advantages=data["advantages"],
config=config,
)
return DiffusionLossResult(loss=pg_loss, metrics=pg_metrics)
Finally, add the loss name to the validation list in
DiffusionLossConfig.__post_init__:
valid_modes = ["flow_grpo", "<your_new_algo>"]
Step 4 — Write the (Architecture, Algorithm) Adapter Pair
For each model architecture you want to train under the new algorithm,
add a package under
verl_omni/pipelines/<arch>_<algo>/ and register both adapters:
# verl_omni/pipelines/qwen_image_flow_grpo/diffusers_training_adapter.py
@DiffusionModelBase.register("QwenImagePipeline", algorithm="flow_grpo")
class QwenImage(DiffusionModelBase):
...
# verl_omni/pipelines/qwen_image_flow_grpo/vllm_omni_rollout_adapter.py
@VllmOmniPipelineBase.register("QwenImagePipeline", algorithm="flow_grpo")
class QwenImagePipelineWithLogProb(QwenImagePipeline):
...
The adapter contracts (the four DiffusionModelBase classmethods, the
rollout forward() shape) are documented in
integrating_a_diffusion_model.md;
nothing about them changes when you swap algorithms.
Code reuse. Algorithms in the same family typically share most adapter code. Two patterns work well:
Promote helpers. If FlowGRPO and your new algorithm share input preparation, move the common code to a shared module inside one of the packages (e.g.
verl_omni/pipelines/qwen_image_flow_grpo/common.py) and import it from both packages.Subclass the rollout. Rollout adapters are deep enough that subclassing is usually cleanest:
from verl_omni.pipelines.qwen_image_flow_grpo.vllm_omni_rollout_adapter import ( QwenImagePipelineWithLogProb, ) @VllmOmniPipelineBase.register("QwenImagePipeline", algorithm="my_algo") class QwenImageMyAlgoPipelineWithLogProb(QwenImagePipelineWithLogProb): def forward(self, req, *, sde_type="my_sde", sde_window_size=None, **kw): return super().forward(req, sde_type=sde_type, sde_window_size=sde_window_size, **kw)
Finally, add a star-import to
verl_omni/pipelines/__init__.py
so the registries learn about your package on import.
Step 5 — Wire the Config Knobs
If your algorithm exposes new rollout knobs (e.g. an sde_window_size),
add them to the DiffusionRolloutAlgoConfig block in
diffusion_rollout.yaml
and to the matching dataclass in
verl_omni/workers/config/diffusion/rollout.py.
Mirror them to the model-side block in
diffusion_model.yaml
using the ${oc.select:actor_rollout_ref.rollout.algo.<field>,<default>}
pattern so a single CLI flag toggles both contexts.
The algorithm dispatch is already wired. Setting
actor_rollout_ref.model.algorithm=<your_algo> on the CLI:
selects the
(architecture, algorithm)adapter pair (Step 4),propagates to
algorithm.adv_estimatorvia${oc.select:actor_rollout_ref.model.algorithm,flow_grpo}, andpropagates to
actor_rollout_ref.actor.diffusion_loss.loss_modevia the same pattern.
A single flag covers all four dispatch points only when every site recognises the new name — see the next subsection for the alternative.
Reusing an existing estimator or loss
If your algorithm reuses an existing estimator and/or loss (for example, MixGRPO uses FlowGRPO’s verbatim), the cascade above will propagate your new algorithm name to those sites, and the validators will reject it:
DiffusionAdvantageEstimatoris a closed enum —compute_advantagefails to look up an unknown name.DiffusionLossConfig.__post_init__checksloss_mode in valid_modesand raisesValueErrorfor anything not in the allowlist.
You have two ways out, pick whichever is cleaner for your algorithm:
Pin the cascaded fields back to the existing name. Add explicit overrides to your launch script and any documented YAML examples:
algorithm.adv_estimator=<existing_estimator> actor_rollout_ref.model.algorithm=<your_algo> actor_rollout_ref.actor.diffusion_loss.loss_mode=<existing_loss>
This is what
examples/mixgrpo_trainer/run_qwen_image_ocr_lora_mixgrpo.shdoes.Register your name as an alias in
diffusion_algos.py(decorate the existing function with both names) and add<your_algo>toDiffusionLossConfig.valid_modes. The cascade then “just works” without per-launch overrides.
Step 6 — Example Launch Script
Add a runnable example under examples/<algo>_trainer/. Copy
examples/flowgrpo_trainer/run_qwen_image_ocr_lora.sh
and update the algorithm dispatch flags:
actor_rollout_ref.model.algorithm=<your_algo> \
actor_rollout_ref.rollout.algo.sde_type=<your_sde_type> \
actor_rollout_ref.rollout.algo.noise_level=<noise_level> \
Document any algorithm-specific knobs in the example’s README.md.
Step 7 — Smoke Test
Add an end-to-end smoke test under tests/special_e2e/ modelled on
tests/special_e2e/run_flowgrpo_qwen_image.sh
and register it in
tests/gpu_smoke/run_gpu_smoke_tests.sh
as a new numbered test entry. The script must exercise the full
algorithm dispatch chain (adv estimator + loss + adapter pair + SDE
step) against a tiny-random/<ModelName> checkpoint.
Final Checklist
[ ] SDE step formula available — either an existing
sde_typeworks, or a new branch / scheduler is added underverl_omni/pipelines/schedulers/.[ ]
DiffusionAdvantageEstimator.<NAME>enum entry added and the estimator function is registered with@register_diffusion_adv_est(...).[ ] Loss function registered with
@register_diffusion_loss("<name>")and added toDiffusionLossConfig.valid_modes.[ ] One
(architecture, algorithm)adapter pair per supported model, both decorated with@register(architecture, algorithm="<name>").[ ]
verl_omni/pipelines/__init__.pystar-imports the new package.[ ] Any new
DiffusionAlgoConfigfield is mirrored in bothdiffusion_rollout.yamlanddiffusion_model.yaml.[ ] Example launch script under
examples/<algo>_trainer/.[ ] Smoke test under
tests/special_e2e/run_<algo>_<model>.shwired intotests/gpu_smoke/run_gpu_smoke_tests.sh.[ ] If the registry or adapter contract changed, update
integrating_a_diffusion_model.mdto match.