# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Code adapted from [SeedVR]
# https://github.com/ByteDance-Seed/SeedVR/tree/main/common/distributed
"""
Distributed basic functions.
"""
import os
import torch
from torch import nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel
from torch.distributed.fsdp._common_utils import _is_fsdp_flattened


def get_global_rank() -> int:
    """
    Get the global rank, the global index of the GPU.
    """
    return int(os.environ.get("RANK", "0"))


def get_local_rank() -> int:
    """
    Get the local rank, the local index of the GPU.
    """
    return int(os.environ.get("LOCAL_RANK", "0"))


def get_world_size() -> int:
    """
    Get the world size, the total number of GPUs.
    """
    return int(os.environ.get("WORLD_SIZE", "1"))


def get_device() -> torch.device:
    """
    Get the device of the current local rank.
    """
    return torch.device("cuda", get_local_rank())
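

# Illustrative sketch (not part of the original module): under a launcher such as
# `torchrun --nproc_per_node=8 train.py`, the RANK / LOCAL_RANK / WORLD_SIZE
# environment variables are set per process, so on a single node the helpers
# above resolve to, e.g.:
#
#   get_global_rank()  # -> 0..7
#   get_local_rank()   # -> 0..7
#   get_world_size()   # -> 8
#   get_device()       # -> torch.device("cuda", local_rank)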


def barrier_if_distributed(*args, **kwargs):
    """
    Synchronize all processes if running in a distributed context.
    """
    if dist.is_initialized():
        return dist.barrier(*args, **kwargs)


def init_torch(cudnn_benchmark=True):
    """
    Common PyTorch initialization configuration.
    """
    # Allow TF32 for matmul and cuDNN kernels (faster on Ampere+ GPUs).
    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.allow_tf32 = True
    torch.backends.cudnn.benchmark = cudnn_benchmark
    # Bind this process to its local GPU and join the NCCL process group.
    torch.cuda.set_device(get_local_rank())
    dist.init_process_group(
        backend="nccl",
        rank=get_global_rank(),
        world_size=get_world_size(),
    )
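

# Minimal usage sketch (hypothetical entry point; `build_model` is a placeholder
# and not part of this module):
#
#   # launched with: torchrun --nproc_per_node=<num_gpus> main.py
#   init_torch(cudnn_benchmark=True)
#   model = build_model().to(get_device())
#   barrier_if_distributed()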


def convert_to_ddp(module: torch.nn.Module, **kwargs) -> DistributedDataParallel:
    """
    Wrap a module with DistributedDataParallel pinned to the current local rank.
    """
    return DistributedDataParallel(
        module=module,
        device_ids=[get_local_rank()],
        output_device=get_local_rank(),
        **kwargs,
    )
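

# Example sketch, assuming `model` is an nn.Module already moved to get_device():
#
#   ddp_model = convert_to_ddp(model, find_unused_parameters=False)
#   output = ddp_model(batch)  # gradients are all-reduced across ranks in backward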


def meta_param_init_fn(module: nn.Module) -> None:
    """
    Used for models initialized on the meta device.
    Materializes meta params/buffers with empty tensors.
    Numerical correctness does not matter here: FSDP will sync the
    param/buffer state from rank 0 to the other ranks.
    """
    with torch.no_grad():
        for submodule in module.modules():
            for param_name, param in submodule.named_parameters(recurse=False):
                if not _is_fsdp_flattened(param) and param.is_meta:
                    materialized_param = nn.Parameter(torch.empty_like(param, device="cpu"))
                    setattr(submodule, param_name, materialized_param)
            for buffer_name, buffer in submodule.named_buffers(recurse=False):
                if not _is_fsdp_flattened(buffer) and buffer.is_meta:
                    materialized_buffer = torch.empty_like(buffer, device="cpu")
                    setattr(submodule, buffer_name, materialized_buffer)
    torch.cuda.empty_cache()
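

# Sketch of how this init fn is typically plugged into FSDP (illustrative only;
# the wrapping policy and other FSDP arguments are omitted placeholders):
#
#   from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
#
#   fsdp_model = FSDP(
#       meta_model,                  # model constructed on the meta device
#       param_init_fn=meta_param_init_fn,
#       sync_module_states=True,     # broadcast materialized weights from rank 0
#       device_id=get_local_rank(),
#   )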


def meta_non_persistent_buffer_init_fn(module: nn.Module) -> nn.Module:
    """
    Materialize meta-device buffers that are not persistent in the state_dict.
    Handles special cases like RotaryEmbedding.freqs.
    """
    with torch.no_grad():
        for submodule in module.modules():
            if hasattr(submodule, "freqs"):
                freqs = getattr(submodule, "freqs")
                if isinstance(freqs, torch.Tensor) and freqs.is_meta:

                    def rope_params(max_seq_len, dim, theta=10000):
                        # Complex-valued rotary table of shape (max_seq_len, dim // 2).
                        assert dim % 2 == 0
                        freqs = torch.outer(
                            torch.arange(max_seq_len),
                            1.0 / torch.pow(
                                theta,
                                torch.arange(0, dim, 2).to(torch.float32).div(dim)))
                        freqs = torch.polar(torch.ones_like(freqs), freqs)
                        return freqs

                    # dim = submodule.dim  # unused: overridden by the hard-coded value below
                    # Hard-coded model dimensions; the inline values (1536 / 12)
                    # correspond to the smaller model variant.
                    dim = 5120  # 1536
                    num_heads = 40  # 12
                    d = dim // num_heads
                    # Build the rotary table by concatenating three per-axis tables
                    # along the feature dimension.
                    freqs_tensor = torch.cat([
                        rope_params(1024, d - 4 * (d // 6)),
                        rope_params(1024, 2 * (d // 6)),
                        rope_params(1024, 2 * (d // 6))
                    ], dim=1).to(dtype=torch.cfloat, device="cpu")
                    setattr(submodule, "freqs", freqs_tensor)
                    print(f"Successfully materialized freqs for {submodule.__class__.__name__}")
    assert not any(b.is_meta for n, b in module.named_buffers())
    return module
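

# Illustrative usage (names like `load_meta_model` are placeholders, not part of
# this module): after constructing a model under torch.device("meta") and loading
# real weights for the persistent tensors, a non-persistent RoPE buffer still
# lives on the meta device and must be rebuilt before the first forward pass:
#
#   model = load_meta_model(...)
#   model = meta_non_persistent_buffer_init_fn(model)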