|
|
from huggingface_hub import HfFileSystem |
|
|
import pandas as pd |
|
|
from utils import logger |
|
|
from datetime import datetime, timedelta |
|
|
import threading |
|
|
import traceback |
|
|
import json |
|
|
import re |
|
|
from typing import List, Tuple, Optional |
|
|
|
|
|
|
|
|
fs = HfFileSystem() |
|
|
|
|
|
IMPORTANT_MODELS = [ |
|
|
"auto", |
|
|
"bert", |
|
|
"gpt2", |
|
|
"t5", |
|
|
"modernbert", |
|
|
"vit", |
|
|
"clip", |
|
|
"detr", |
|
|
"table_transformer", |
|
|
"got_ocr2", |
|
|
"whisper", |
|
|
"wav2vec2", |
|
|
"qwen2_audio", |
|
|
"speech_t5", |
|
|
"csm", |
|
|
"llama", |
|
|
"gemma3", |
|
|
"qwen2", |
|
|
"mistral3", |
|
|
"qwen2_5_vl", |
|
|
"llava", |
|
|
"smolvlm", |
|
|
"internvl", |
|
|
"gemma3n", |
|
|
"qwen2_5_omni", |
|
|
|
|
|
"qwen2_5_omni", |
|
|
] |
|
|
|
|
|
KEYS_TO_KEEP = [ |
|
|
"success_amd", |
|
|
"success_nvidia", |
|
|
"skipped_amd", |
|
|
"skipped_nvidia", |
|
|
"failed_multi_no_amd", |
|
|
"failed_multi_no_nvidia", |
|
|
"failed_single_no_amd", |
|
|
"failed_single_no_nvidia", |
|
|
"failures_amd", |
|
|
"failures_nvidia", |
|
|
"job_link_amd", |
|
|
"job_link_nvidia", |
|
|
] |
|
|
|
|
|
|
|
|
def log_dataframe_link(link: str) -> str: |
|
|
""" |
|
|
Adds the link to the dataset in the logs, modifies it to get a clockable link and then returns the date of the |
|
|
report. |
|
|
""" |
|
|
if link.startswith("sample_"): |
|
|
return "9999-99-99" |
|
|
logger.info(f"Reading df located at {link}") |
|
|
|
|
|
if link.startswith("hf://"): |
|
|
link = "https://huggingface.co/" + link.removeprefix("hf://") |
|
|
|
|
|
pattern = r'transformers_daily_ci(.*?)/(\d{4}-\d{2}-\d{2})' |
|
|
match = re.search(pattern, link) |
|
|
|
|
|
if not match: |
|
|
logger.error("Could not find transformers_daily_ci and.or date in the link") |
|
|
return "9999-99-99" |
|
|
|
|
|
path_between = match.group(1) |
|
|
link = link.replace("transformers_daily_ci" + path_between, "transformers_daily_ci/blob/main") |
|
|
logger.info(f"Link to data source: {link}") |
|
|
|
|
|
return match.group(2) |
|
|
|
|
|
def infer_latest_update_msg(date_df_amd: str, date_df_nvidia: str) -> str: |
|
|
|
|
|
if date_df_amd.startswith("9999") and date_df_nvidia.startswith("9999"): |
|
|
return "could not find last update time" |
|
|
|
|
|
if date_df_amd != date_df_nvidia: |
|
|
logger.warning(f"Different dates found: {date_df_amd} (AMD) vs {date_df_nvidia} (NVIDIA)") |
|
|
|
|
|
try: |
|
|
latest_date = max(date_df_amd, date_df_nvidia) |
|
|
yyyy, mm, dd = latest_date.split("-") |
|
|
return f"last updated {mm}/{dd}/{yyyy}" |
|
|
except Exception as e: |
|
|
logger.error(f"When trying to infer latest date, got error {e}") |
|
|
return "could not find last update time" |
|
|
|
|
|
def read_one_dataframe(json_path: str, device_label: str) -> tuple[pd.DataFrame, str]: |
|
|
df_upload_date = log_dataframe_link(json_path) |
|
|
df = pd.read_json(json_path, orient="index") |
|
|
df.index.name = "model_name" |
|
|
df[f"failed_multi_no_{device_label}"] = df["failures"].apply(lambda x: len(x["multi"]) if "multi" in x else 0) |
|
|
df[f"failed_single_no_{device_label}"] = df["failures"].apply(lambda x: len(x["single"]) if "single" in x else 0) |
|
|
return df, df_upload_date |
|
|
|
|
|
def get_available_dates() -> List[str]: |
|
|
"""Get list of available dates from both AMD and NVIDIA datasets.""" |
|
|
try: |
|
|
|
|
|
amd_src = "hf://datasets/optimum-amd/transformers_daily_ci/**/runs/**/ci_results_run_models_gpu/model_results.json" |
|
|
files_amd = sorted(fs.glob(amd_src, refresh=True), reverse=True) |
|
|
logger.info(f"Found {len(files_amd)} AMD files") |
|
|
|
|
|
|
|
|
nvidia_src = "hf://datasets/hf-internal-testing/transformers_daily_ci/*/ci_results_run_models_gpu/model_results.json" |
|
|
files_nvidia = sorted(fs.glob(nvidia_src, refresh=True), reverse=True) |
|
|
logger.info(f"Found {len(files_nvidia)} NVIDIA files") |
|
|
|
|
|
|
|
|
amd_dates = set() |
|
|
for file_path in files_amd: |
|
|
|
|
|
pattern = r'transformers_daily_ci/(\d{4}-\d{2}-\d{2})/runs/[^/]+/ci_results_run_models_gpu/model_results\.json' |
|
|
match = re.search(pattern, file_path) |
|
|
if match: |
|
|
amd_dates.add(match.group(1)) |
|
|
else: |
|
|
|
|
|
logger.debug(f"AMD file path didn't match pattern: {file_path}") |
|
|
|
|
|
|
|
|
if files_amd: |
|
|
logger.info(f"Example AMD file paths: {files_amd[:3]}") |
|
|
|
|
|
nvidia_dates = set() |
|
|
for file_path in files_nvidia: |
|
|
|
|
|
pattern = r'transformers_daily_ci/(\d{4}-\d{2}-\d{2})/ci_results_run_models_gpu/model_results\.json' |
|
|
match = re.search(pattern, file_path) |
|
|
if match: |
|
|
nvidia_dates.add(match.group(1)) |
|
|
|
|
|
logger.info(f"AMD dates: {sorted(amd_dates, reverse=True)[:5]}...") |
|
|
logger.info(f"NVIDIA dates: {sorted(nvidia_dates, reverse=True)[:5]}...") |
|
|
|
|
|
|
|
|
common_dates = sorted(amd_dates.intersection(nvidia_dates), reverse=True) |
|
|
logger.info(f"Common dates: {len(common_dates)} dates where both AMD and NVIDIA have data") |
|
|
|
|
|
if common_dates: |
|
|
return common_dates[:30] |
|
|
else: |
|
|
|
|
|
logger.warning("No real dates available, generating fake dates for demo purposes") |
|
|
fake_dates = [] |
|
|
today = datetime.now() |
|
|
for i in range(7): |
|
|
date = today - timedelta(days=i) |
|
|
fake_dates.append(date.strftime("%Y-%m-%d")) |
|
|
return fake_dates |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error getting available dates: {e}") |
|
|
|
|
|
logger.info("Generating fake dates due to error") |
|
|
fake_dates = [] |
|
|
today = datetime.now() |
|
|
for i in range(7): |
|
|
date = today - timedelta(days=i) |
|
|
fake_dates.append(date.strftime("%Y-%m-%d")) |
|
|
return fake_dates |
|
|
|
|
|
|
|
|
def get_data_for_date(target_date: str) -> tuple[pd.DataFrame, str]: |
|
|
"""Get data for a specific date.""" |
|
|
try: |
|
|
|
|
|
|
|
|
amd_src = f"hf://datasets/optimum-amd/transformers_daily_ci/{target_date}/runs/*/ci_results_run_models_gpu/model_results.json" |
|
|
amd_files = fs.glob(amd_src, refresh=True) |
|
|
|
|
|
if not amd_files: |
|
|
raise FileNotFoundError(f"No AMD data found for date {target_date}") |
|
|
|
|
|
|
|
|
amd_file = amd_files[0] |
|
|
|
|
|
if not amd_file.startswith("hf://"): |
|
|
amd_file = f"hf://{amd_file}" |
|
|
|
|
|
|
|
|
nvidia_src = f"hf://datasets/hf-internal-testing/transformers_daily_ci/{target_date}/ci_results_run_models_gpu/model_results.json" |
|
|
|
|
|
|
|
|
df_amd = pd.DataFrame() |
|
|
df_nvidia = pd.DataFrame() |
|
|
|
|
|
try: |
|
|
df_amd, _ = read_one_dataframe(amd_file, "amd") |
|
|
logger.info(f"Successfully loaded AMD data for {target_date}") |
|
|
except Exception as e: |
|
|
logger.warning(f"Failed to load AMD data for {target_date}: {e}") |
|
|
|
|
|
try: |
|
|
df_nvidia, _ = read_one_dataframe(nvidia_src, "nvidia") |
|
|
logger.info(f"Successfully loaded NVIDIA data for {target_date}") |
|
|
except Exception as e: |
|
|
logger.warning(f"Failed to load NVIDIA data for {target_date}: {e}") |
|
|
|
|
|
|
|
|
if df_amd.empty and df_nvidia.empty: |
|
|
logger.warning(f"No data available for either platform on {target_date}") |
|
|
return pd.DataFrame(), target_date |
|
|
|
|
|
|
|
|
if not df_amd.empty and not df_nvidia.empty: |
|
|
joined = df_amd.join(df_nvidia, rsuffix="_nvidia", lsuffix="_amd", how="outer") |
|
|
elif not df_amd.empty: |
|
|
joined = df_amd.copy() |
|
|
else: |
|
|
joined = df_nvidia.copy() |
|
|
|
|
|
joined = joined[KEYS_TO_KEEP] |
|
|
joined.index = joined.index.str.replace("^models_", "", regex=True) |
|
|
|
|
|
|
|
|
important_models_lower = [model.lower() for model in IMPORTANT_MODELS] |
|
|
filtered_joined = joined[joined.index.str.lower().isin(important_models_lower)] |
|
|
|
|
|
return filtered_joined, target_date |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error getting data for date {target_date}: {e}") |
|
|
|
|
|
return pd.DataFrame(), target_date |
|
|
|
|
|
|
|
|
def get_historical_data(start_date: str, end_date: str, sample_data = False) -> pd.DataFrame: |
|
|
"""Get historical data for a date range.""" |
|
|
if sample_data: |
|
|
return get_fake_historical_data(start_date, end_date) |
|
|
try: |
|
|
start_dt = datetime.strptime(start_date, "%Y-%m-%d") |
|
|
end_dt = datetime.strptime(end_date, "%Y-%m-%d") |
|
|
|
|
|
historical_data = [] |
|
|
current_dt = start_dt |
|
|
|
|
|
while current_dt <= end_dt: |
|
|
date_str = current_dt.strftime("%Y-%m-%d") |
|
|
try: |
|
|
df, _ = get_data_for_date(date_str) |
|
|
|
|
|
if not df.empty: |
|
|
df['date'] = date_str |
|
|
historical_data.append(df) |
|
|
logger.info(f"Loaded data for {date_str}") |
|
|
else: |
|
|
logger.warning(f"No data available for {date_str}") |
|
|
except Exception as e: |
|
|
logger.warning(f"Could not load data for {date_str}: {e}") |
|
|
|
|
|
current_dt += timedelta(days=1) |
|
|
|
|
|
|
|
|
combined_df = pd.concat(historical_data, ignore_index=False) |
|
|
return combined_df |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error getting historical data: {e}") |
|
|
|
|
|
logger.info("Falling back to fake historical data due to error") |
|
|
return get_fake_historical_data(start_date, end_date) |
|
|
|
|
|
|
|
|
def get_distant_data() -> tuple[pd.DataFrame, str]: |
|
|
|
|
|
amd_src = "hf://datasets/optimum-amd/transformers_daily_ci/**/runs/**/ci_results_run_models_gpu/model_results.json" |
|
|
files_amd = sorted(fs.glob(amd_src, refresh=True), reverse=True) |
|
|
df_amd, date_df_amd = read_one_dataframe(f"hf://{files_amd[0]}", "amd") |
|
|
|
|
|
|
|
|
nvidia_src = "hf://datasets/hf-internal-testing/transformers_daily_ci/*/ci_results_run_models_gpu/model_results.json" |
|
|
files_nvidia = sorted(fs.glob(nvidia_src, refresh=True), reverse=True) |
|
|
|
|
|
nvidia_path = files_nvidia[0].lstrip('datasets/hf-internal-testing/transformers_daily_ci/') |
|
|
nvidia_path = "https://huggingface.co/datasets/hf-internal-testing/transformers_daily_ci/raw/main/" + nvidia_path |
|
|
df_nvidia, date_df_nvidia = read_one_dataframe(nvidia_path, "nvidia") |
|
|
|
|
|
latest_update_msg = infer_latest_update_msg(date_df_amd, date_df_nvidia) |
|
|
|
|
|
joined = df_amd.join(df_nvidia, rsuffix="_nvidia", lsuffix="_amd", how="outer") |
|
|
joined = joined[KEYS_TO_KEEP] |
|
|
joined.index = joined.index.str.replace("^models_", "", regex=True) |
|
|
|
|
|
important_models_lower = [model.lower() for model in IMPORTANT_MODELS] |
|
|
filtered_joined = joined[joined.index.str.lower().isin(important_models_lower)] |
|
|
|
|
|
for model in IMPORTANT_MODELS: |
|
|
if model not in filtered_joined.index: |
|
|
print(f"[WARNING] Model {model} was missing from index.") |
|
|
return filtered_joined, latest_update_msg |
|
|
|
|
|
|
|
|
def get_sample_data() -> tuple[pd.DataFrame, str]: |
|
|
|
|
|
df_amd, _ = read_one_dataframe("sample_amd.json", "amd") |
|
|
df_nvidia, _ = read_one_dataframe("sample_nvidia.json", "nvidia") |
|
|
|
|
|
joined = df_amd.join(df_nvidia, rsuffix="_nvidia", lsuffix="_amd", how="outer") |
|
|
joined = joined[KEYS_TO_KEEP] |
|
|
joined.index = joined.index.str.replace("^models_", "", regex=True) |
|
|
|
|
|
important_models_lower = [model.lower() for model in IMPORTANT_MODELS] |
|
|
filtered_joined = joined[joined.index.str.lower().isin(important_models_lower)] |
|
|
|
|
|
filtered_joined.index = "sample_" + filtered_joined.index |
|
|
return filtered_joined, "sample data was loaded" |
|
|
|
|
|
|
|
|
def get_fake_historical_data(start_date: str, end_date: str) -> pd.DataFrame: |
|
|
"""Generate fake historical data for a date range when real data loading fails.""" |
|
|
try: |
|
|
start_dt = datetime.strptime(start_date, "%Y-%m-%d") |
|
|
end_dt = datetime.strptime(end_date, "%Y-%m-%d") |
|
|
|
|
|
|
|
|
historical_data = [] |
|
|
current_dt = start_dt |
|
|
|
|
|
|
|
|
sample_df, _ = get_sample_data() |
|
|
|
|
|
while current_dt <= end_dt: |
|
|
date_str = current_dt.strftime("%Y-%m-%d") |
|
|
|
|
|
|
|
|
date_df = sample_df.copy() |
|
|
date_df['date'] = date_str |
|
|
|
|
|
|
|
|
import random |
|
|
for idx in date_df.index: |
|
|
|
|
|
for col in ['success_amd', 'success_nvidia', 'skipped_amd', 'skipped_nvidia']: |
|
|
if col in date_df.columns: |
|
|
original_val = date_df.loc[idx, col] |
|
|
if pd.notna(original_val) and original_val > 0: |
|
|
variation = random.uniform(0.8, 1.2) |
|
|
date_df.loc[idx, col] = max(0, int(original_val * variation)) |
|
|
|
|
|
|
|
|
for col in ['failed_multi_no_amd', 'failed_multi_no_nvidia', 'failed_single_no_amd', 'failed_single_no_nvidia']: |
|
|
if col in date_df.columns: |
|
|
original_val = date_df.loc[idx, col] |
|
|
if pd.notna(original_val): |
|
|
|
|
|
variation = random.uniform(0.5, 2.0) |
|
|
date_df.loc[idx, col] = max(0, int(original_val * variation)) |
|
|
|
|
|
historical_data.append(date_df) |
|
|
current_dt += timedelta(days=1) |
|
|
|
|
|
if not historical_data: |
|
|
logger.warning("No fake historical data generated") |
|
|
return pd.DataFrame() |
|
|
|
|
|
|
|
|
combined_df = pd.concat(historical_data, ignore_index=False) |
|
|
logger.info(f"Generated fake historical data: {len(combined_df)} records from {start_date} to {end_date}") |
|
|
return combined_df |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error generating fake historical data: {e}") |
|
|
return pd.DataFrame() |
|
|
|
|
|
def safe_extract(row: pd.DataFrame, key: str) -> int: |
|
|
return int(row.get(key, 0)) if pd.notna(row.get(key, 0)) else 0 |
|
|
|
|
|
|
|
|
def find_failure_first_seen(historical_df: pd.DataFrame, model_name: str, test_name: str, device: str, gpu_type: str) -> Optional[str]: |
|
|
""" |
|
|
Find the first date when a specific test failure appeared in historical data. |
|
|
""" |
|
|
if historical_df.empty: |
|
|
return None |
|
|
|
|
|
try: |
|
|
|
|
|
model_name_lower = model_name.lower() |
|
|
|
|
|
|
|
|
model_data = historical_df[historical_df.index == model_name_lower].copy() |
|
|
|
|
|
if model_data.empty: |
|
|
return None |
|
|
|
|
|
|
|
|
model_data = model_data.sort_values('date') |
|
|
|
|
|
|
|
|
for idx, row in model_data.iterrows(): |
|
|
failures = row.get(f'failures_{device}', None) |
|
|
|
|
|
if failures is None or pd.isna(failures): |
|
|
continue |
|
|
|
|
|
|
|
|
if isinstance(failures, str): |
|
|
try: |
|
|
import json |
|
|
failures = json.loads(failures) |
|
|
except: |
|
|
continue |
|
|
|
|
|
|
|
|
if gpu_type in failures: |
|
|
for test in failures[gpu_type]: |
|
|
test_line = test.get('line', '') |
|
|
if test_line == test_name: |
|
|
|
|
|
return row.get('date', None) |
|
|
|
|
|
return None |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error finding first seen date for {test_name}: {e}") |
|
|
return None |
|
|
|
|
|
|
|
|
def find_new_regressions(current_df: pd.DataFrame, historical_df: pd.DataFrame) -> list[dict]: |
|
|
""" |
|
|
Compare CURRENT failures against PREVIOUS day's failures to find NEW regressions. |
|
|
|
|
|
A regression is a test that: |
|
|
- Is failing in the CURRENT/LATEST run (current_df) |
|
|
- Was NOT failing in the PREVIOUS run (yesterday in historical_df) |
|
|
""" |
|
|
if current_df.empty or historical_df.empty: |
|
|
return [] |
|
|
|
|
|
new_regressions = [] |
|
|
|
|
|
|
|
|
available_dates = sorted(historical_df['date'].unique(), reverse=True) |
|
|
if len(available_dates) < 1: |
|
|
|
|
|
return [] |
|
|
|
|
|
yesterday_date = available_dates[0] |
|
|
yesterday_data = historical_df[historical_df['date'] == yesterday_date] |
|
|
|
|
|
|
|
|
for model_name in current_df.index: |
|
|
model_name_lower = model_name.lower() |
|
|
|
|
|
|
|
|
current_row = current_df.loc[model_name] |
|
|
|
|
|
|
|
|
yesterday_row = yesterday_data[yesterday_data.index == model_name_lower] |
|
|
yesterday_failures_amd = {} |
|
|
yesterday_failures_nvidia = {} |
|
|
|
|
|
if not yesterday_row.empty: |
|
|
yesterday_row = yesterday_row.iloc[0] |
|
|
yesterday_failures_amd = yesterday_row.get('failures_amd', {}) |
|
|
yesterday_failures_nvidia = yesterday_row.get('failures_nvidia', {}) |
|
|
|
|
|
|
|
|
if isinstance(yesterday_failures_amd, str): |
|
|
try: |
|
|
yesterday_failures_amd = json.loads(yesterday_failures_amd) |
|
|
except: |
|
|
yesterday_failures_amd = {} |
|
|
if isinstance(yesterday_failures_nvidia, str): |
|
|
try: |
|
|
yesterday_failures_nvidia = json.loads(yesterday_failures_nvidia) |
|
|
except: |
|
|
yesterday_failures_nvidia = {} |
|
|
|
|
|
|
|
|
current_failures_amd = current_row.get('failures_amd', {}) |
|
|
current_failures_nvidia = current_row.get('failures_nvidia', {}) |
|
|
|
|
|
|
|
|
if isinstance(current_failures_amd, str): |
|
|
try: |
|
|
current_failures_amd = json.loads(current_failures_amd) |
|
|
except: |
|
|
current_failures_amd = {} |
|
|
if isinstance(current_failures_nvidia, str): |
|
|
try: |
|
|
current_failures_nvidia = json.loads(current_failures_nvidia) |
|
|
except: |
|
|
current_failures_nvidia = {} |
|
|
|
|
|
|
|
|
for gpu_type in ['single', 'multi']: |
|
|
current_tests = current_failures_amd.get(gpu_type, []) |
|
|
yesterday_tests = yesterday_failures_amd.get(gpu_type, []) |
|
|
|
|
|
|
|
|
current_test_names = {test.get('line', '') for test in current_tests} |
|
|
yesterday_test_names = {test.get('line', '') for test in yesterday_tests} |
|
|
|
|
|
|
|
|
new_tests = current_test_names - yesterday_test_names |
|
|
for test_name in new_tests: |
|
|
if test_name: |
|
|
new_regressions.append({ |
|
|
'model': model_name, |
|
|
'test': test_name.split('::')[-1], |
|
|
'test_full': test_name, |
|
|
'device': 'amd', |
|
|
'gpu_type': gpu_type |
|
|
}) |
|
|
|
|
|
|
|
|
for gpu_type in ['single', 'multi']: |
|
|
current_tests = current_failures_nvidia.get(gpu_type, []) |
|
|
yesterday_tests = yesterday_failures_nvidia.get(gpu_type, []) |
|
|
|
|
|
|
|
|
current_test_names = {test.get('line', '') for test in current_tests} |
|
|
yesterday_test_names = {test.get('line', '') for test in yesterday_tests} |
|
|
|
|
|
|
|
|
new_tests = current_test_names - yesterday_test_names |
|
|
for test_name in new_tests: |
|
|
if test_name: |
|
|
new_regressions.append({ |
|
|
'model': model_name, |
|
|
'test': test_name.split('::')[-1], |
|
|
'test_full': test_name, |
|
|
'device': 'nvidia', |
|
|
'gpu_type': gpu_type |
|
|
}) |
|
|
|
|
|
return new_regressions |
|
|
|
|
|
|
|
|
def extract_model_data(row: pd.Series) -> tuple[dict[str, int], dict[str, int], int, int, int, int]: |
|
|
"""Extract and process model data from DataFrame row.""" |
|
|
|
|
|
success_nvidia = safe_extract(row, "success_nvidia") |
|
|
success_amd = safe_extract(row, "success_amd") |
|
|
|
|
|
skipped_nvidia = safe_extract(row, "skipped_nvidia") |
|
|
skipped_amd = safe_extract(row, "skipped_amd") |
|
|
|
|
|
failed_multi_amd = safe_extract(row, 'failed_multi_no_amd') |
|
|
failed_multi_nvidia = safe_extract(row, 'failed_multi_no_nvidia') |
|
|
failed_single_amd = safe_extract(row, 'failed_single_no_amd') |
|
|
failed_single_nvidia = safe_extract(row, 'failed_single_no_nvidia') |
|
|
|
|
|
total_failed_amd = failed_multi_amd + failed_single_amd |
|
|
total_failed_nvidia = failed_multi_nvidia + failed_single_nvidia |
|
|
|
|
|
amd_stats = { |
|
|
'passed': success_amd, |
|
|
'failed': total_failed_amd, |
|
|
'skipped': skipped_amd, |
|
|
'error': 0 |
|
|
} |
|
|
nvidia_stats = { |
|
|
'passed': success_nvidia, |
|
|
'failed': total_failed_nvidia, |
|
|
'skipped': skipped_nvidia, |
|
|
'error': 0 |
|
|
} |
|
|
return amd_stats, nvidia_stats, failed_multi_amd, failed_single_amd, failed_multi_nvidia, failed_single_nvidia |
|
|
|
|
|
|
|
|
|
|
|
class CIResults: |
|
|
|
|
|
def __init__(self): |
|
|
self.df = pd.DataFrame() |
|
|
self.available_models = [] |
|
|
self.latest_update_msg = "" |
|
|
self.available_dates = [] |
|
|
self.historical_df = pd.DataFrame() |
|
|
self.all_historical_data = pd.DataFrame() |
|
|
self.sample_data = False |
|
|
|
|
|
def load_data(self) -> None: |
|
|
"""Load data from the data source.""" |
|
|
|
|
|
try: |
|
|
logger.info("Loading distant data...") |
|
|
new_df, latest_update_msg = get_distant_data() |
|
|
self.latest_update_msg = latest_update_msg |
|
|
self.available_dates = get_available_dates() |
|
|
logger.info(f"Available dates: {len(self.available_dates)} dates") |
|
|
if self.available_dates: |
|
|
logger.info(f"Date range: {self.available_dates[-1]} to {self.available_dates[0]}") |
|
|
else: |
|
|
logger.warning("No available dates found") |
|
|
self.available_dates = [] |
|
|
except Exception as e: |
|
|
error_msg = [ |
|
|
"Loading data failed:", |
|
|
"-" * 120, |
|
|
traceback.format_exc(), |
|
|
"-" * 120, |
|
|
"Falling back on sample data." |
|
|
] |
|
|
logger.error("\n".join(error_msg)) |
|
|
self.sample_data = True |
|
|
new_df, latest_update_msg = get_sample_data() |
|
|
self.latest_update_msg = latest_update_msg |
|
|
self.available_dates = None |
|
|
|
|
|
|
|
|
self.df = new_df |
|
|
self.available_models = new_df.index.tolist() |
|
|
|
|
|
|
|
|
self.load_all_historical_data() |
|
|
|
|
|
|
|
|
logger.info(f"Data loaded successfully: {len(self.available_models)} models") |
|
|
logger.info(f"Models: {self.available_models[:5]}{'...' if len(self.available_models) > 5 else ''}") |
|
|
logger.info(f"Latest update message: {self.latest_update_msg}") |
|
|
|
|
|
msg = {} |
|
|
for model in self.available_models[:3]: |
|
|
msg[model] = {} |
|
|
for col in self.df.columns: |
|
|
value = self.df.loc[model, col] |
|
|
if not isinstance(value, int): |
|
|
value = str(value) |
|
|
if len(value) > 10: |
|
|
value = value[:10] + "..." |
|
|
msg[model][col] = value |
|
|
logger.info(json.dumps(msg, indent=4)) |
|
|
|
|
|
def load_all_historical_data(self) -> None: |
|
|
"""Load all available historical data at startup.""" |
|
|
try: |
|
|
if not self.available_dates: |
|
|
|
|
|
fake_dates = [] |
|
|
today = datetime.now() |
|
|
for i in range(7): |
|
|
date = today - timedelta(days=i) |
|
|
fake_dates.append(date.strftime("%Y-%m-%d")) |
|
|
self.available_dates = fake_dates |
|
|
logger.info(f"No available dates found, generated {len(self.available_dates)} sample dates.") |
|
|
|
|
|
logger.info(f"Loading all historical data for {len(self.available_dates)} dates...") |
|
|
start_date = self.available_dates[-1] |
|
|
end_date = self.available_dates[0] |
|
|
|
|
|
self.all_historical_data = get_historical_data(start_date, end_date, self.sample_data) |
|
|
logger.info(f"All historical data loaded: {len(self.all_historical_data)} records") |
|
|
except Exception as e: |
|
|
logger.error(f"Error loading all historical data: {e}") |
|
|
self.all_historical_data = pd.DataFrame() |
|
|
|
|
|
def load_historical_data(self, start_date: str, end_date: str) -> None: |
|
|
"""Load historical data for a date range from pre-loaded data.""" |
|
|
try: |
|
|
logger.info(f"Filtering historical data from {start_date} to {end_date}") |
|
|
|
|
|
if self.all_historical_data.empty: |
|
|
logger.warning("No pre-loaded historical data available") |
|
|
self.historical_df = pd.DataFrame() |
|
|
return |
|
|
|
|
|
|
|
|
start_dt = datetime.strptime(start_date, "%Y-%m-%d") |
|
|
end_dt = datetime.strptime(end_date, "%Y-%m-%d") |
|
|
|
|
|
|
|
|
filtered_data = [] |
|
|
for date_str in self.all_historical_data['date'].unique(): |
|
|
date_dt = datetime.strptime(date_str, "%Y-%m-%d") |
|
|
if start_dt <= date_dt <= end_dt: |
|
|
date_data = self.all_historical_data[self.all_historical_data['date'] == date_str] |
|
|
filtered_data.append(date_data) |
|
|
|
|
|
if filtered_data: |
|
|
self.historical_df = pd.concat(filtered_data, ignore_index=False) |
|
|
logger.info(f"Historical data filtered: {len(self.historical_df)} records for {start_date} to {end_date}") |
|
|
else: |
|
|
self.historical_df = pd.DataFrame() |
|
|
logger.warning(f"No historical data found for date range {start_date} to {end_date}") |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error filtering historical data: {e}") |
|
|
self.historical_df = pd.DataFrame() |
|
|
|
|
|
def schedule_data_reload(self): |
|
|
"""Schedule the next data reload.""" |
|
|
def reload_data(): |
|
|
self.load_data() |
|
|
|
|
|
timer = threading.Timer(900.0, reload_data) |
|
|
timer.daemon = True |
|
|
timer.start() |
|
|
logger.info("Next data reload scheduled in 15 minutes") |
|
|
|
|
|
|
|
|
timer = threading.Timer(900.0, reload_data) |
|
|
timer.daemon = True |
|
|
timer.start() |
|
|
logger.info("Data auto-reload scheduled every 15 minutes") |
|
|
|