# Copyright 2020-2025 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import gc
import os
import tempfile
import unittest
import warnings

import numpy as np
import pytest
import torch
from accelerate.utils.memory import release_memory
from datasets import Dataset, Features, Image, Value, load_dataset
from parameterized import parameterized
from transformers import (
    AutoModelForCausalLM,
    AutoModelForImageTextToText,
    AutoProcessor,
    AutoTokenizer,
    BitsAndBytesConfig,
)
from transformers.testing_utils import (
    backend_empty_cache,
    require_bitsandbytes,
    require_flash_attn,
    require_liger_kernel,
    require_peft,
    require_torch_accelerator,
    torch_device,
)
from transformers.utils import is_peft_available

from trl import GRPOConfig, GRPOTrainer
from trl.trainer.utils import get_kbit_device_map

from ..testing_utils import require_vllm
from .testing_constants import MODELS_TO_TEST

if is_peft_available():
    from peft import LoraConfig, PeftModel

@pytest.mark.slow
@require_torch_accelerator
class GRPOTrainerSlowTester(unittest.TestCase):
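    """Slow GRPO integration tests: they load real checkpoints and run training, so they need accelerator hardware."""
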
    def setUp(self):
        self.train_dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train")
        self.eval_dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="test")
        self.max_length = 128

    def tearDown(self):
        gc.collect()
        backend_empty_cache(torch_device)
        gc.collect()

    @parameterized.expand(MODELS_TO_TEST)
    @require_liger_kernel
    def test_training_with_liger_grpo_loss(self, model_name):
        with tempfile.TemporaryDirectory() as tmp_dir:
            training_args = GRPOConfig(
                output_dir=tmp_dir,
                per_device_train_batch_size=3,
                num_generations=3,
                use_liger_loss=True,
                max_completion_length=self.max_length,
                report_to="none",
                logging_strategy="no",
            )

            model = AutoModelForCausalLM.from_pretrained(model_name)
            tokenizer = AutoTokenizer.from_pretrained(model_name)
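            # Tiny test checkpoints may ship without a pad token; falling back to
            # EOS keeps padding and batched generation working.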
            tokenizer.pad_token = tokenizer.eos_token if tokenizer.pad_token is None else tokenizer.pad_token

            trainer = GRPOTrainer(
                model=model,
                reward_funcs="trl-internal-testing/tiny-Qwen2ForSequenceClassification-2.5",
                args=training_args,
                train_dataset=self.train_dataset,
                eval_dataset=self.eval_dataset,
                processing_class=tokenizer,
            )

            from liger_kernel.chunked_loss import LigerFusedLinearGRPOLoss

            assert isinstance(trainer.liger_grpo_loss, LigerFusedLinearGRPOLoss)

            previous_trainable_params = {n: param.clone() for n, param in model.named_parameters()}

            trainer.train()

            for n, param in previous_trainable_params.items():
                new_param = model.get_parameter(n)
                self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.")

        release_memory(model, trainer)

    @parameterized.expand(MODELS_TO_TEST)
    @require_liger_kernel
    @require_peft
    def test_training_with_liger_grpo_loss_and_peft(self, model_name):
        from peft import LoraConfig, TaskType

        with tempfile.TemporaryDirectory() as tmp_dir:
            training_args = GRPOConfig(
                output_dir=tmp_dir,
                per_device_train_batch_size=3,
                num_generations=3,
                use_liger_loss=True,
                max_completion_length=self.max_length,
                report_to="none",
                logging_strategy="no",
            )

            model = AutoModelForCausalLM.from_pretrained(model_name)
            tokenizer = AutoTokenizer.from_pretrained(model_name)
            tokenizer.pad_token = tokenizer.eos_token if tokenizer.pad_token is None else tokenizer.pad_token

            # Configure PEFT with LoRA
            peft_config = LoraConfig(
                task_type=TaskType.CAUSAL_LM,
                inference_mode=False,
                r=8,
                lora_alpha=32,
                lora_dropout=0.1,
                target_modules=["q_proj", "v_proj"],
            )

            trainer = GRPOTrainer(
                model=model,
                reward_funcs="trl-internal-testing/tiny-Qwen2ForSequenceClassification-2.5",
                args=training_args,
                train_dataset=self.train_dataset,
                eval_dataset=self.eval_dataset,
                processing_class=tokenizer,
                peft_config=peft_config,
            )

            from liger_kernel.chunked_loss import LigerFusedLinearGRPOLoss

            assert isinstance(trainer.liger_grpo_loss, LigerFusedLinearGRPOLoss)

            # Verify PEFT adapter is properly initialized
            from peft import PeftModel

            self.assertTrue(isinstance(trainer.model, PeftModel), "Model should be wrapped with PEFT")

            # Store adapter weights before training
            previous_trainable_params = {
                n: param.clone() for n, param in trainer.model.named_parameters() if param.requires_grad
            }
            self.assertTrue(len(previous_trainable_params) > 0, "No trainable parameters found in PEFT model")

            trainer.train()

            # Verify adapter weights have changed after training
            for n, param in previous_trainable_params.items():
                new_param = trainer.model.get_parameter(n)
                self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.")

        release_memory(model, trainer)

    @parameterized.expand(MODELS_TO_TEST)
    def test_training_with_transformers_paged(self, model_name):
        """Test that training works with the transformers paged implementation (requires GPU)."""
        with tempfile.TemporaryDirectory() as tmp_dir:
            training_args = GRPOConfig(
                output_dir=tmp_dir,
                learning_rate=0.1,  # increase the learning rate to speed up the test
                per_device_train_batch_size=3,  # reduce the batch size to reduce memory usage
                num_generations=3,  # reduce the number of generations to reduce memory usage
                max_completion_length=8,  # reduce the completion length to reduce memory usage
                use_transformers_paged=True,  # enable the transformers paged implementation
                report_to="none",
                logging_strategy="no",
            )
            model = AutoModelForCausalLM.from_pretrained(model_name)
            trainer = GRPOTrainer(
                model=model,
                reward_funcs="trl-internal-testing/tiny-Qwen2ForSequenceClassification-2.5",
                args=training_args,
                train_dataset=self.train_dataset,
            )

            previous_trainable_params = {n: param.clone() for n, param in model.named_parameters()}

            trainer.train()

            self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"])

            # Check that the params have changed
            for n, param in previous_trainable_params.items():
                new_param = model.get_parameter(n)
                self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.")

        release_memory(model, trainer)

    @parameterized.expand([("HuggingFaceTB/SmolVLM-Instruct",)])
    @require_flash_attn
    @require_bitsandbytes
    @require_peft
    def test_vlm_training(self, model_name):
| """ | |
| Test VLM training with aggressive memory optimization. | |
| This test uses multiple memory reduction techniques: | |
| - 4-bit quantization with double quantization | |
| - LoRA with very low rank (r=4) | |
| - Minimal batch size (1) with gradient accumulation | |
| - Small images (64x64 instead of 224x224) | |
| - Short sequences (max_completion_length=8) | |
| - Only 4 training samples | |
| - Only 1 training step | |
| - Gradient checkpointing and bfloat16 | |
| """ | |
        # Create processor once outside the data generator
        processor = AutoProcessor.from_pretrained(model_name, use_fast=True, padding_side="left")

        conversation = [
            {
                "role": "user",
                "content": [
                    {"type": "image"},
                    {"type": "text", "text": "What is in the image?"},
                ],
            },
        ]
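        # Render the chat template to a plain-text prompt; add_generation_prompt=True
        # appends the assistant turn header so generation starts in the right place.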
        prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)

        def data_gen(num_samples):
            for _ in range(num_samples):
                yield {
                    "prompt": prompt,
                    "image": np.random.uniform(low=0.0, high=255.0, size=(64, 64, 3)).astype(
                        np.uint8
                    ),  # Much smaller images
                }
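        # Build a tiny 4-sample dataset; declaring `Features` up front tells
        # `datasets` to treat the uint8 arrays as an image column rather than raw arrays.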
        dataset = Dataset.from_generator(
            data_gen, gen_kwargs={"num_samples": 4}, features=Features(image=Image(), prompt=Value(dtype="string"))
        )

        # reduce memory requirements as much as possible
        quantization_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype="bfloat16",
            bnb_4bit_quant_type="nf4",
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_storage="bfloat16",
        )
        model = AutoModelForImageTextToText.from_pretrained(
            model_name,
            attn_implementation="flash_attention_2",
            torch_dtype="bfloat16",
            device_map=get_kbit_device_map(),
            quantization_config=quantization_config,
        )
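
        # The reward is a downward parabola in completion length: it peaks at 100
        # for completions of exactly 25 characters and falls off quadratically.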
        def reward_func(prompts, completions, **kwargs):
            # simple nonsensical reward
            return [-((len(c) - 25) ** 2) + 100 for c in completions]
        with tempfile.TemporaryDirectory() as tmp_dir:
            training_args = GRPOConfig(
                output_dir=tmp_dir,
                learning_rate=0.1,
                per_device_train_batch_size=1,  # Minimal batch size
                gradient_accumulation_steps=2,  # Maintain effective batch size
                num_generations=2,
                max_completion_length=8,  # Much shorter completions
                max_prompt_length=None,  # Don't limit prompt length for VLM
                bf16=True,  # Use bfloat16 precision
                max_steps=1,  # Only do 1 training step to save time and memory
                report_to="none",
                logging_strategy="no",
            )

            lora_config = LoraConfig(
                task_type="CAUSAL_LM",
                r=4,  # Much lower rank for minimal memory
                lora_alpha=8,  # Reduced alpha proportionally
                lora_dropout=0.1,
                target_modules=["q_proj", "v_proj"],  # Minimal target modules
                # For VLM models, we typically want to freeze the vision encoder
                # and only adapt the language model parameters
                modules_to_save=None,
            )

            try:
                trainer = GRPOTrainer(
                    model=model,
                    processing_class=processor,
                    reward_funcs=[reward_func],
                    args=training_args,
                    train_dataset=dataset,
                    peft_config=lora_config,
                )

                self.assertIsInstance(trainer.model, PeftModel)

                previous_trainable_params = {n: param.clone() for n, param in trainer.model.named_parameters()}

                trainer.train()

                self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"])

                # Check that LoRA parameters have changed
                # For VLM models, we're more permissive about which parameters can change
                lora_params_changed = False
                for n, param in previous_trainable_params.items():
                    new_param = trainer.model.get_parameter(n)
                    if "lora" in n.lower():  # LoRA parameters should change
                        if not torch.equal(param, new_param):
                            lora_params_changed = True

                # At least some LoRA parameters should have changed during training
                self.assertTrue(lora_params_changed, "No LoRA parameters were updated during training.")

            except torch.OutOfMemoryError as e:
                self.skipTest(f"Skipping VLM training test due to insufficient GPU memory: {e}")
            except Exception as e:
                # Check for other memory-related errors
                if any(keyword in str(e).lower() for keyword in ["memory", "cuda", "out of memory", "insufficient"]):
                    self.skipTest(f"Skipping VLM training test due to hardware constraints: {e}")
                else:
                    raise

        release_memory(model, trainer)

    @require_peft
    @require_vllm
    def test_vlm_processor_vllm_colocate_mode(self):
| """ | |
| Test that VLM processors work with vLLM in colocate mode. | |
| This test uses multiple memory optimization techniques to ensure it runs on limited hardware: | |
| - LoRA (Low-Rank Adaptation) with minimal rank (r=4) | |
| - 4-bit quantization with BitsAndBytesConfig | |
| - Gradient checkpointing | |
| - bfloat16 precision | |
| - Minimal batch sizes and sequence lengths | |
| - Very low GPU memory utilization (5%) | |
| """ | |
        dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train")

        with tempfile.TemporaryDirectory() as tmp_dir:
            config = GRPOConfig(
                output_dir=tmp_dir,
                per_device_train_batch_size=1,  # Minimal batch size
                gradient_accumulation_steps=2,  # Make effective batch size 2, divisible by num_generations
                num_generations=2,
                max_completion_length=4,  # Very short completions to reduce memory
                max_prompt_length=32,  # Very short prompts to reduce memory
                use_vllm=True,  # Enable vLLM
                vllm_mode="colocate",  # Use colocate mode to avoid server dependency
                vllm_gpu_memory_utilization=0.05,  # Use minimal GPU memory (5%)
                gradient_checkpointing=True,  # Enable gradient checkpointing to save memory
                bf16=True,  # Use bfloat16 to reduce memory
                report_to="none",
                logging_strategy="no",
            )

            # Create a VLM processor
            processor = AutoProcessor.from_pretrained(
                "HuggingFaceTB/SmolVLM-Instruct", use_fast=True, padding_side="left"
            )

            # Verify processor has both required attributes for VLM detection
            self.assertTrue(hasattr(processor, "tokenizer"))
            self.assertTrue(hasattr(processor, "image_processor"))

            def dummy_reward_func(completions, **kwargs):
                return [1.0] * len(completions)

            # Use LoRA configuration for memory efficiency
            lora_config = LoraConfig(
                r=4,  # Very low rank for minimal memory
                lora_alpha=8,
                target_modules=["q_proj", "v_proj"],  # Minimal target modules
                lora_dropout=0.1,
                bias="none",
                task_type="CAUSAL_LM",
            )

            # Use 4-bit quantization for further memory reduction
            quantization_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_compute_dtype=torch.bfloat16,
                bnb_4bit_quant_type="nf4",
                bnb_4bit_use_double_quant=True,
            )
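
            # vLLM colocate mode initializes torch.distributed, which expects the
            # environment variables torchrun normally provides; set minimal
            # single-process values here and restore the originals in `finally`.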
            original_env = {}
            required_env_vars = {
                "RANK": "0",
                "LOCAL_RANK": "0",
                "WORLD_SIZE": "1",
                "LOCAL_WORLD_SIZE": "1",
                "MASTER_ADDR": "localhost",
                "MASTER_PORT": "12355",
            }
            for key, value in required_env_vars.items():
                original_env[key] = os.environ.get(key)
                os.environ[key] = value
            try:
                # Test VLM processor with vLLM colocate mode
                with warnings.catch_warnings(record=True) as w:
                    warnings.simplefilter("always")
                    try:
                        # Load model with quantization for memory efficiency
                        model = AutoModelForCausalLM.from_pretrained(
                            "trl-internal-testing/tiny-Qwen2ForCausalLM-2.5",
                            quantization_config=quantization_config,
                            torch_dtype=torch.bfloat16,
                        )

                        trainer = GRPOTrainer(
                            model=model,
                            reward_funcs=dummy_reward_func,
                            args=config,
                            train_dataset=dataset,
                            processing_class=processor,  # VLM processor
                            peft_config=lora_config,  # Use LoRA for memory efficiency
                        )

                        # Should detect VLM processor correctly and allow vLLM
                        self.assertTrue(trainer.use_vllm, "vLLM should be enabled for VLM processors in colocate mode")
                        self.assertEqual(trainer.vllm_mode, "colocate", "Should use colocate mode")

                        # Check if signature columns were set properly
                        if trainer._signature_columns is not None:
                            # Should include 'image' in signature columns for VLM processors
                            self.assertIn(
                                "image",
                                trainer._signature_columns,
                                "Should include 'image' in signature columns for VLM",
                            )

                        # Should not emit any warnings about VLM incompatibility
                        incompatibility_warnings = [
                            str(w_item.message)
                            for w_item in w
                            if "does not support VLMs" in str(w_item.message)
                            or "not compatible" in str(w_item.message).lower()
                        ]
                        self.assertEqual(
                            len(incompatibility_warnings),
                            0,
                            f"Should not emit VLM incompatibility warnings, but got: {incompatibility_warnings}",
                        )

                        # Test passes if we get this far without exceptions

                    except Exception as e:
                        # If vLLM fails to initialize due to hardware constraints or other issues, that's expected
                        if any(
                            keyword in str(e).lower()
                            for keyword in [
                                "outofmemoryerror",
                                "cuda",
                                "memory",
                                "insufficient",
                                "no such device",
                                "free memory",
                                "gpu memory utilization",
                                "decrease gpu memory",
                            ]
                        ):
                            self.skipTest(f"Skipping vLLM colocate test due to hardware constraints: {e}")
                        elif "KeyError" in str(e) and "RANK" in str(e):
                            self.skipTest(f"Skipping vLLM colocate test due to environment setup issues: {e}")
                        elif "ValueError" in str(e) and "memory" in str(e).lower():
                            self.skipTest(f"Skipping vLLM colocate test due to memory constraints: {e}")
                        else:
                            raise
            finally:
                # Restore original environment variables
                for key, original_value in original_env.items():
                    if original_value is None:
                        os.environ.pop(key, None)
                    else:
                        os.environ[key] = original_value

        release_memory(model, trainer)

    @require_vllm
    def test_training_vllm(self):
        """Test that training works with vLLM for generation."""
        dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train")

        with tempfile.TemporaryDirectory() as tmp_dir:
            training_args = GRPOConfig(
                output_dir=tmp_dir,
                learning_rate=0.1,  # increase the learning rate to speed up the test
                per_device_train_batch_size=3,  # reduce the batch size to reduce memory usage
                num_generations=3,  # reduce the number of generations to reduce memory usage
                max_completion_length=8,  # reduce the completion length to reduce memory usage
                report_to="none",
                logging_strategy="no",
                use_vllm=True,
            )
            try:
                trainer = GRPOTrainer(
                    model="Qwen/Qwen2.5-0.5B-Instruct",  # tiny models are too small for vLLM
                    reward_funcs="trl-internal-testing/tiny-Qwen2ForSequenceClassification-2.5",
                    args=training_args,
                    train_dataset=dataset,
                )

                previous_trainable_params = {n: param.clone() for n, param in trainer.model.named_parameters()}

                trainer.train()

                self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"])

                # Check that the params have changed
                for n, param in previous_trainable_params.items():
                    new_param = trainer.model.get_parameter(n)
                    self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.")
            except Exception as e:
                # If vLLM fails to initialize due to hardware constraints or other issues, that's expected
                if any(
                    keyword in str(e).lower()
                    for keyword in [
                        "outofmemoryerror",
                        "cuda",
                        "memory",
                        "insufficient",
                        "no such device",
                        "free memory",
                        "gpu memory utilization",
                        "decrease gpu memory",
                    ]
                ):
                    self.skipTest(f"Skipping vLLM training test due to hardware constraints: {e}")
                elif "KeyError" in str(e) and "RANK" in str(e):
                    self.skipTest(f"Skipping vLLM training test due to environment setup issues: {e}")
                elif "ValueError" in str(e) and "memory" in str(e).lower():
                    self.skipTest(f"Skipping vLLM training test due to memory constraints: {e}")
                else:
                    raise

        release_memory(trainer.model, trainer)