# Copyright 2026 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Preprocess the OCR dataset to parquet format (for Qwen-Image training).
You can obtain the raw dataset from https://github.com/yifan123/flow_grpo/tree/main/dataset/ocr
"""

import argparse
import os

import datasets
from verl.utils.hdfs_io import copy, makedirs


def extract_solution(solution_str):
    # The solution is stored in the format: 'The image displays "xxx".'
    return solution_str.split('"')[1]


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--hdfs_dir", default=None)
    parser.add_argument("--input_dir", default="~/data/ocr/", help="Path to the raw OCR dataset directory.")
    parser.add_argument(
        "--output_dir", default="~/data/ocr/qwen_image", help="Directory to save the preprocessed parquet files."
    )

    args = parser.parse_args()
    local_dataset_path = os.path.expanduser(args.input_dir)

    data_source = "flow_grpo/ocr"

    if local_dataset_path is not None:
        dataset = datasets.load_dataset(local_dataset_path)
    else:
        raise NotImplementedError(
            "It is not existed in huggingface hub. "
            "Please get dataset from https://github.com/yifan123/flow_grpo/tree/main/dataset/ocr"
        )

    train_dataset = dataset["train"]
    test_dataset = dataset["test"]

    system_prompt = (
        "Describe the image by detailing the color, shape, size, "
        "texture, quantity, text, spatial relationships of the objects and background:"
    )
    negative_user_prompt = " "

    def make_map_fn(split):
        def process_fn(example, idx):
            text = example.pop("text")
            solution = extract_solution(text)
            data = {
                "data_source": data_source,
                "prompt": [
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": text},
                ],
                "negative_prompt": [
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": negative_user_prompt},
                ],
                "ability": "ocr",
                "reward_model": {"style": "model", "ground_truth": solution},
                "extra_info": {"split": split, "index": idx},
            }
            return data

        return process_fn

    train_dataset = train_dataset.map(function=make_map_fn("train"), with_indices=True)
    test_dataset = test_dataset.map(function=make_map_fn("test"), with_indices=True)

    hdfs_dir = args.hdfs_dir
    local_save_dir = args.output_dir

    local_save_dir = os.path.expanduser(local_save_dir)
    train_dataset.to_parquet(os.path.join(local_save_dir, "train.parquet"))
    test_dataset.to_parquet(os.path.join(local_save_dir, "test.parquet"))

    if hdfs_dir is not None:
        makedirs(hdfs_dir)
        copy(src=local_save_dir, dst=hdfs_dir)
