from huggingface_hub import HfFileSystem
import pandas as pd
from utils import logger
from datetime import datetime, timedelta
import threading
import traceback
import json
import re
import random
from typing import List, Optional
# NOTE: if caching is an issue, try adding `use_listings_cache=False`
fs = HfFileSystem()
IMPORTANT_MODELS = [
"auto",
"bert", # old but dominant (encoder only)
"gpt2", # old (decoder)
"t5", # old (encoder-decoder)
"modernbert", # (encoder only)
"vit", # old (vision) - fixed comma
"clip", # old but dominant (vision)
"detr", # objection detection, segmentation (vision)
"table_transformer", # objection detection (visioin) - maybe just detr?
"got_ocr2", # ocr (vision)
"whisper", # old but dominant (audio)
"wav2vec2", # old (audio)
"qwen2_audio", # (audio)
"speech_t5", # (audio)
"csm", # (audio)
"llama", # new and dominant (meta)
"gemma3", # new (google)
"qwen2", # new (Alibaba)
"mistral3", # new (Mistral) - added missing comma
"qwen2_5_vl", # new (vision)
"llava", # many models from it (vision)
"smolvlm", # new (video)
"internvl", # new (video)
"gemma3n", # new (omnimodal models)
"qwen2_5_omni", # new (omnimodal models)
# "gpt_oss", # new (quite used)
"qwen2_5_omni", # new (omnimodal models)
]
KEYS_TO_KEEP = [
"success_amd",
"success_nvidia",
"skipped_amd",
"skipped_nvidia",
"failed_multi_no_amd",
"failed_multi_no_nvidia",
"failed_single_no_amd",
"failed_single_no_nvidia",
"failures_amd",
"failures_nvidia",
"job_link_amd",
"job_link_nvidia",
]
def log_dataframe_link(link: str) -> str:
"""
    Logs the link to the data source, rewrites it into a clickable URL, and returns the date of the report.
"""
if link.startswith("sample_"):
return "9999-99-99"
logger.info(f"Reading df located at {link}")
    # Make sure the link starts with an https address
if link.startswith("hf://"):
link = "https://huggingface.co/" + link.removeprefix("hf://")
# Pattern to match transformers_daily_ci followed by any path, then a date (YYYY-MM-DD format)
pattern = r'transformers_daily_ci(.*?)/(\d{4}-\d{2}-\d{2})'
match = re.search(pattern, link)
# Failure case:
if not match:
logger.error("Could not find transformers_daily_ci and.or date in the link")
return "9999-99-99"
# Replace the path between with blob/main
path_between = match.group(1)
link = link.replace("transformers_daily_ci" + path_between, "transformers_daily_ci/blob/main")
logger.info(f"Link to data source: {link}")
# Return the date
return match.group(2)
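# Illustrative example (hypothetical date and run id): a link like
#   hf://datasets/optimum-amd/transformers_daily_ci/2024-05-01/runs/12345/ci_results_run_models_gpu/model_results.json
# is logged as
#   https://huggingface.co/datasets/optimum-amd/transformers_daily_ci/blob/main/2024-05-01/runs/12345/ci_results_run_models_gpu/model_results.json
# and log_dataframe_link returns "2024-05-01".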
def infer_latest_update_msg(date_df_amd: str, date_df_nvidia: str) -> str:
    # Early return if both dates are invalid
    if date_df_amd.startswith("9999") and date_df_nvidia.startswith("9999"):
        return "could not find last update time"
    # Warn if dates are not the same
    if date_df_amd != date_df_nvidia:
        logger.warning(f"Different dates found: {date_df_amd} (AMD) vs {date_df_nvidia} (NVIDIA)")
    # Take the latest valid date and format it (skip the "9999-99-99" placeholder so it cannot win the max)
    try:
        latest_date = max(d for d in (date_df_amd, date_df_nvidia) if not d.startswith("9999"))
        yyyy, mm, dd = latest_date.split("-")
        return f"last updated {mm}/{dd}/{yyyy}"
except Exception as e:
logger.error(f"When trying to infer latest date, got error {e}")
return "could not find last update time"
def read_one_dataframe(json_path: str, device_label: str) -> tuple[pd.DataFrame, str]:
    """Read one model_results.json report into a DataFrame and add per-device failure counts."""
df_upload_date = log_dataframe_link(json_path)
df = pd.read_json(json_path, orient="index")
df.index.name = "model_name"
df[f"failed_multi_no_{device_label}"] = df["failures"].apply(lambda x: len(x["multi"]) if "multi" in x else 0)
df[f"failed_single_no_{device_label}"] = df["failures"].apply(lambda x: len(x["single"]) if "single" in x else 0)
return df, df_upload_date
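# Assumed shape of model_results.json (inferred from how the columns are used below, not from a spec):
#   {
#     "models_bert": {
#       "success": 120, "skipped": 4, "job_link": {...},
#       "failures": {"single": [{"line": "tests/...::test_x", ...}], "multi": [...]}
#     },
#     ...
#   }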
def get_available_dates() -> List[str]:
"""Get list of available dates from both AMD and NVIDIA datasets."""
try:
# Get AMD dates - the path structure is: YYYY-MM-DD/runs/{run_id}/ci_results_run_models_gpu/model_results.json
amd_src = "hf://datasets/optimum-amd/transformers_daily_ci/**/runs/**/ci_results_run_models_gpu/model_results.json"
files_amd = sorted(fs.glob(amd_src, refresh=True), reverse=True)
logger.info(f"Found {len(files_amd)} AMD files")
# Get NVIDIA dates - structure is: YYYY-MM-DD/ci_results_run_models_gpu/model_results.json
nvidia_src = "hf://datasets/hf-internal-testing/transformers_daily_ci/*/ci_results_run_models_gpu/model_results.json"
files_nvidia = sorted(fs.glob(nvidia_src, refresh=True), reverse=True)
logger.info(f"Found {len(files_nvidia)} NVIDIA files")
# Extract dates from file paths
amd_dates = set()
for file_path in files_amd:
# Pattern to match the date in the AMD path: YYYY-MM-DD/runs/{run_id}/ci_results_run_models_gpu/model_results.json
pattern = r'transformers_daily_ci/(\d{4}-\d{2}-\d{2})/runs/[^/]+/ci_results_run_models_gpu/model_results\.json'
match = re.search(pattern, file_path)
if match:
amd_dates.add(match.group(1))
else:
# Log unmatched paths for debugging
logger.debug(f"AMD file path didn't match pattern: {file_path}")
# Log a few example AMD file paths for debugging
if files_amd:
logger.info(f"Example AMD file paths: {files_amd[:3]}")
nvidia_dates = set()
for file_path in files_nvidia:
# Pattern to match the date in the NVIDIA path: YYYY-MM-DD/ci_results_run_models_gpu/model_results.json
pattern = r'transformers_daily_ci/(\d{4}-\d{2}-\d{2})/ci_results_run_models_gpu/model_results\.json'
match = re.search(pattern, file_path)
if match:
nvidia_dates.add(match.group(1))
logger.info(f"AMD dates: {sorted(amd_dates, reverse=True)[:5]}...") # Show first 5
logger.info(f"NVIDIA dates: {sorted(nvidia_dates, reverse=True)[:5]}...") # Show first 5
# Return intersection of both datasets (dates where both have data)
common_dates = sorted(amd_dates.intersection(nvidia_dates), reverse=True)
logger.info(f"Common dates: {len(common_dates)} dates where both AMD and NVIDIA have data")
if common_dates:
return common_dates[:30] # Limit to last 30 days for performance
else:
# If no real dates available, generate fake dates for the last 7 days
logger.warning("No real dates available, generating fake dates for demo purposes")
fake_dates = []
today = datetime.now()
for i in range(7):
date = today - timedelta(days=i)
fake_dates.append(date.strftime("%Y-%m-%d"))
return fake_dates
except Exception as e:
logger.error(f"Error getting available dates: {e}")
# Generate fake dates when there's an error
logger.info("Generating fake dates due to error")
fake_dates = []
today = datetime.now()
for i in range(7):
date = today - timedelta(days=i)
fake_dates.append(date.strftime("%Y-%m-%d"))
return fake_dates
def get_data_for_date(target_date: str) -> tuple[pd.DataFrame, str]:
"""Get data for a specific date."""
try:
# For AMD, we need to find the specific run file for the date
# AMD structure: YYYY-MM-DD/runs/{run_id}/ci_results_run_models_gpu/model_results.json
amd_src = f"hf://datasets/optimum-amd/transformers_daily_ci/{target_date}/runs/*/ci_results_run_models_gpu/model_results.json"
amd_files = fs.glob(amd_src, refresh=True)
if not amd_files:
raise FileNotFoundError(f"No AMD data found for date {target_date}")
# Use the first (most recent) run for the date
amd_file = amd_files[0]
# Ensure the AMD file path has the hf:// prefix
if not amd_file.startswith("hf://"):
amd_file = f"hf://{amd_file}"
# NVIDIA structure: YYYY-MM-DD/ci_results_run_models_gpu/model_results.json
nvidia_src = f"hf://datasets/hf-internal-testing/transformers_daily_ci/{target_date}/ci_results_run_models_gpu/model_results.json"
# Read dataframes - try each platform independently
df_amd = pd.DataFrame()
df_nvidia = pd.DataFrame()
try:
df_amd, _ = read_one_dataframe(amd_file, "amd")
logger.info(f"Successfully loaded AMD data for {target_date}")
except Exception as e:
logger.warning(f"Failed to load AMD data for {target_date}: {e}")
try:
df_nvidia, _ = read_one_dataframe(nvidia_src, "nvidia")
logger.info(f"Successfully loaded NVIDIA data for {target_date}")
except Exception as e:
logger.warning(f"Failed to load NVIDIA data for {target_date}: {e}")
# If both failed, return empty dataframe
if df_amd.empty and df_nvidia.empty:
logger.warning(f"No data available for either platform on {target_date}")
return pd.DataFrame(), target_date
# Join both dataframes (outer join to include data from either platform)
if not df_amd.empty and not df_nvidia.empty:
joined = df_amd.join(df_nvidia, rsuffix="_nvidia", lsuffix="_amd", how="outer")
elif not df_amd.empty:
joined = df_amd.copy()
else:
joined = df_nvidia.copy()
joined = joined[KEYS_TO_KEEP]
joined.index = joined.index.str.replace("^models_", "", regex=True)
# Filter out all but important models
important_models_lower = [model.lower() for model in IMPORTANT_MODELS]
filtered_joined = joined[joined.index.str.lower().isin(important_models_lower)]
return filtered_joined, target_date
except Exception as e:
logger.error(f"Error getting data for date {target_date}: {e}")
# Return empty dataframe instead of sample data for historical functionality
return pd.DataFrame(), target_date
def get_historical_data(start_date: str, end_date: str, sample_data: bool = False) -> pd.DataFrame:
"""Get historical data for a date range."""
if sample_data:
return get_fake_historical_data(start_date, end_date)
try:
start_dt = datetime.strptime(start_date, "%Y-%m-%d")
end_dt = datetime.strptime(end_date, "%Y-%m-%d")
historical_data = []
current_dt = start_dt
while current_dt <= end_dt:
date_str = current_dt.strftime("%Y-%m-%d")
try:
df, _ = get_data_for_date(date_str)
# Only add non-empty dataframes
if not df.empty:
df['date'] = date_str
historical_data.append(df)
logger.info(f"Loaded data for {date_str}")
else:
logger.warning(f"No data available for {date_str}")
except Exception as e:
logger.warning(f"Could not load data for {date_str}: {e}")
current_dt += timedelta(days=1)
# Combine all dataframes
combined_df = pd.concat(historical_data, ignore_index=False)
return combined_df
except Exception as e:
logger.error(f"Error getting historical data: {e}")
# Fall back to fake data when there's an error
logger.info("Falling back to fake historical data due to error")
return get_fake_historical_data(start_date, end_date)
def get_distant_data() -> tuple[pd.DataFrame, str]:
    """Load the latest AMD and NVIDIA CI reports from the Hub and join them into one DataFrame."""
# Retrieve AMD dataframe
amd_src = "hf://datasets/optimum-amd/transformers_daily_ci/**/runs/**/ci_results_run_models_gpu/model_results.json"
files_amd = sorted(fs.glob(amd_src, refresh=True), reverse=True)
df_amd, date_df_amd = read_one_dataframe(f"hf://{files_amd[0]}", "amd")
    # Retrieve NVIDIA dataframe, whose resolved path should look like:
    # https://huggingface.co/datasets/hf-internal-testing/transformers_daily_ci/raw/main/YYYY-MM-DD/ci_results_run_models_gpu/model_results.json
nvidia_src = "hf://datasets/hf-internal-testing/transformers_daily_ci/*/ci_results_run_models_gpu/model_results.json"
files_nvidia = sorted(fs.glob(nvidia_src, refresh=True), reverse=True)
    # Strip the dataset repo prefix to keep only the path inside the repo
    nvidia_path = files_nvidia[0].removeprefix('datasets/hf-internal-testing/transformers_daily_ci/')
nvidia_path = "https://huggingface.co/datasets/hf-internal-testing/transformers_daily_ci/raw/main/" + nvidia_path
df_nvidia, date_df_nvidia = read_one_dataframe(nvidia_path, "nvidia")
# Infer and format the latest df date
latest_update_msg = infer_latest_update_msg(date_df_amd, date_df_nvidia)
# Join both dataframes
joined = df_amd.join(df_nvidia, rsuffix="_nvidia", lsuffix="_amd", how="outer")
joined = joined[KEYS_TO_KEEP]
joined.index = joined.index.str.replace("^models_", "", regex=True)
    # Filter out all but important models
important_models_lower = [model.lower() for model in IMPORTANT_MODELS]
filtered_joined = joined[joined.index.str.lower().isin(important_models_lower)]
    # Warn for each missing important model
    for model in IMPORTANT_MODELS:
        if model not in filtered_joined.index:
            logger.warning(f"Model {model} was missing from index.")
return filtered_joined, latest_update_msg
def get_sample_data() -> tuple[pd.DataFrame, str]:
    """Load the local sample reports, used as a fallback when remote data cannot be reached."""
# Retrieve sample dataframes
df_amd, _ = read_one_dataframe("sample_amd.json", "amd")
df_nvidia, _ = read_one_dataframe("sample_nvidia.json", "nvidia")
# Join both dataframes
joined = df_amd.join(df_nvidia, rsuffix="_nvidia", lsuffix="_amd", how="outer")
joined = joined[KEYS_TO_KEEP]
joined.index = joined.index.str.replace("^models_", "", regex=True)
    # Filter out all but important models
important_models_lower = [model.lower() for model in IMPORTANT_MODELS]
filtered_joined = joined[joined.index.str.lower().isin(important_models_lower)]
# Prefix all model names with "sample_"
filtered_joined.index = "sample_" + filtered_joined.index
return filtered_joined, "sample data was loaded"
def get_fake_historical_data(start_date: str, end_date: str) -> pd.DataFrame:
"""Generate fake historical data for a date range when real data loading fails."""
try:
start_dt = datetime.strptime(start_date, "%Y-%m-%d")
end_dt = datetime.strptime(end_date, "%Y-%m-%d")
# Generate fake data for each date in the range
historical_data = []
current_dt = start_dt
# Get base sample data to use as template
sample_df, _ = get_sample_data()
while current_dt <= end_dt:
date_str = current_dt.strftime("%Y-%m-%d")
# Create a copy of sample data for this date with some random variations
date_df = sample_df.copy()
date_df['date'] = date_str
            # Add some random variation to make it look more realistic
for idx in date_df.index:
# Vary the success/failure counts slightly (±20%)
for col in ['success_amd', 'success_nvidia', 'skipped_amd', 'skipped_nvidia']:
if col in date_df.columns:
original_val = date_df.loc[idx, col]
if pd.notna(original_val) and original_val > 0:
variation = random.uniform(0.8, 1.2)
date_df.loc[idx, col] = max(0, int(original_val * variation))
# Vary failure counts more dramatically to show trends
for col in ['failed_multi_no_amd', 'failed_multi_no_nvidia', 'failed_single_no_amd', 'failed_single_no_nvidia']:
if col in date_df.columns:
original_val = date_df.loc[idx, col]
if pd.notna(original_val):
# Sometimes have more failures, sometimes fewer
variation = random.uniform(0.5, 2.0)
date_df.loc[idx, col] = max(0, int(original_val * variation))
historical_data.append(date_df)
current_dt += timedelta(days=1)
if not historical_data:
logger.warning("No fake historical data generated")
return pd.DataFrame()
# Combine all dataframes
combined_df = pd.concat(historical_data, ignore_index=False)
logger.info(f"Generated fake historical data: {len(combined_df)} records from {start_date} to {end_date}")
return combined_df
except Exception as e:
logger.error(f"Error generating fake historical data: {e}")
return pd.DataFrame()
def safe_extract(row: pd.Series, key: str) -> int:
    """Return the value at `key` as an int, falling back to 0 when it is missing or NaN."""
    return int(row.get(key, 0)) if pd.notna(row.get(key, 0)) else 0
def find_failure_first_seen(historical_df: pd.DataFrame, model_name: str, test_name: str, device: str, gpu_type: str) -> Optional[str]:
"""
Find the first date when a specific test failure appeared in historical data.
"""
if historical_df.empty:
return None
try:
# Normalize model name to match DataFrame index
model_name_lower = model_name.lower()
# Filter historical data for this model
model_data = historical_df[historical_df.index == model_name_lower].copy()
if model_data.empty:
return None
# Sort by date (oldest first)
model_data = model_data.sort_values('date')
# Check each date for this failure
for idx, row in model_data.iterrows():
failures = row.get(f'failures_{device}', None)
if failures is None or pd.isna(failures):
continue
# Handle case where failures might be a string (JSON)
            if isinstance(failures, str):
                try:
                    failures = json.loads(failures)
                except json.JSONDecodeError:
                    continue
# Check if this test appears in the failures for this gpu_type
if gpu_type in failures:
for test in failures[gpu_type]:
test_line = test.get('line', '')
if test_line == test_name:
# Found the first occurrence
return row.get('date', None)
return None
except Exception as e:
logger.error(f"Error finding first seen date for {test_name}: {e}")
return None
def find_new_regressions(current_df: pd.DataFrame, historical_df: pd.DataFrame) -> list[dict]:
"""
Compare CURRENT failures against PREVIOUS day's failures to find NEW regressions.
A regression is a test that:
- Is failing in the CURRENT/LATEST run (current_df)
- Was NOT failing in the PREVIOUS run (yesterday in historical_df)
"""
if current_df.empty or historical_df.empty:
return []
new_regressions = []
# Get the most recent date from historical data (this is "yesterday")
available_dates = sorted(historical_df['date'].unique(), reverse=True)
if len(available_dates) < 1:
# No history to compare against
return []
yesterday_date = available_dates[0]
yesterday_data = historical_df[historical_df['date'] == yesterday_date]
# For each model in current data, compare against yesterday
for model_name in current_df.index:
model_name_lower = model_name.lower()
# Get CURRENT failures from current_df
current_row = current_df.loc[model_name]
# Get YESTERDAY's failures from historical_df
yesterday_row = yesterday_data[yesterday_data.index == model_name_lower]
yesterday_failures_amd = {}
yesterday_failures_nvidia = {}
if not yesterday_row.empty:
yesterday_row = yesterday_row.iloc[0]
yesterday_failures_amd = yesterday_row.get('failures_amd', {})
yesterday_failures_nvidia = yesterday_row.get('failures_nvidia', {})
# Handle string/dict conversion
            if isinstance(yesterday_failures_amd, str):
                try:
                    yesterday_failures_amd = json.loads(yesterday_failures_amd)
                except json.JSONDecodeError:
                    yesterday_failures_amd = {}
            if isinstance(yesterday_failures_nvidia, str):
                try:
                    yesterday_failures_nvidia = json.loads(yesterday_failures_nvidia)
                except json.JSONDecodeError:
                    yesterday_failures_nvidia = {}
# Get CURRENT failures
current_failures_amd = current_row.get('failures_amd', {})
current_failures_nvidia = current_row.get('failures_nvidia', {})
# Handle string/dict conversion
        if isinstance(current_failures_amd, str):
            try:
                current_failures_amd = json.loads(current_failures_amd)
            except json.JSONDecodeError:
                current_failures_amd = {}
        if isinstance(current_failures_nvidia, str):
            try:
                current_failures_nvidia = json.loads(current_failures_nvidia)
            except json.JSONDecodeError:
                current_failures_nvidia = {}
        # Check both platforms - find tests failing NOW but NOT yesterday
        for device, current_failures, yesterday_failures in [
            ('amd', current_failures_amd, yesterday_failures_amd),
            ('nvidia', current_failures_nvidia, yesterday_failures_nvidia),
        ]:
            for gpu_type in ['single', 'multi']:
                current_tests = current_failures.get(gpu_type, [])
                yesterday_tests = yesterday_failures.get(gpu_type, [])
                # Get test names
                current_test_names = {test.get('line', '') for test in current_tests}
                yesterday_test_names = {test.get('line', '') for test in yesterday_tests}
                # Find NEW failures: failing NOW but NOT yesterday
                new_tests = current_test_names - yesterday_test_names
                for test_name in new_tests:
                    if test_name:  # Skip empty names
                        new_regressions.append({
                            'model': model_name,
                            'test': test_name.split('::')[-1],  # Short name
                            'test_full': test_name,  # Full name
                            'device': device,
                            'gpu_type': gpu_type
                        })
return new_regressions
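# Each record appended above has the following form (values are illustrative):
#   {'model': 'bert', 'test': 'test_forward', 'test_full': 'tests/models/bert/...::test_forward',
#    'device': 'amd', 'gpu_type': 'single'}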
def extract_model_data(row: pd.Series) -> tuple[dict[str, int], dict[str, int], int, int, int, int]:
"""Extract and process model data from DataFrame row."""
# Handle missing values and get counts directly from dataframe
success_nvidia = safe_extract(row, "success_nvidia")
success_amd = safe_extract(row, "success_amd")
skipped_nvidia = safe_extract(row, "skipped_nvidia")
skipped_amd = safe_extract(row, "skipped_amd")
failed_multi_amd = safe_extract(row, 'failed_multi_no_amd')
failed_multi_nvidia = safe_extract(row, 'failed_multi_no_nvidia')
failed_single_amd = safe_extract(row, 'failed_single_no_amd')
failed_single_nvidia = safe_extract(row, 'failed_single_no_nvidia')
# Calculate total failures
total_failed_amd = failed_multi_amd + failed_single_amd
total_failed_nvidia = failed_multi_nvidia + failed_single_nvidia
# Create stats dictionaries directly from dataframe values
amd_stats = {
'passed': success_amd,
'failed': total_failed_amd,
'skipped': skipped_amd,
'error': 0 # Not available in this dataset
}
nvidia_stats = {
'passed': success_nvidia,
'failed': total_failed_nvidia,
'skipped': skipped_nvidia,
'error': 0 # Not available in this dataset
}
return amd_stats, nvidia_stats, failed_multi_amd, failed_single_amd, failed_multi_nvidia, failed_single_nvidia
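# Illustrative example (made-up numbers): a row with success_amd=10, skipped_amd=3,
# failed_single_no_amd=2 and failed_multi_no_amd=1 yields
#   amd_stats == {'passed': 10, 'failed': 3, 'skipped': 3, 'error': 0}
# plus the raw per-configuration counts (failed_multi_amd=1, failed_single_amd=2);
# the NVIDIA columns are handled the same way.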
class CIResults:
def __init__(self):
self.df = pd.DataFrame()
self.available_models = []
self.latest_update_msg = ""
self.available_dates = []
self.historical_df = pd.DataFrame()
self.all_historical_data = pd.DataFrame() # Store all historical data at startup
self.sample_data = False
def load_data(self) -> None:
"""Load data from the data source."""
# Try loading the distant data, and fall back on sample data for local tinkering
try:
logger.info("Loading distant data...")
new_df, latest_update_msg = get_distant_data()
self.latest_update_msg = latest_update_msg
self.available_dates = get_available_dates()
logger.info(f"Available dates: {len(self.available_dates)} dates")
if self.available_dates:
logger.info(f"Date range: {self.available_dates[-1]} to {self.available_dates[0]}")
else:
logger.warning("No available dates found")
self.available_dates = []
except Exception as e:
error_msg = [
"Loading data failed:",
"-" * 120,
traceback.format_exc(),
"-" * 120,
"Falling back on sample data."
]
logger.error("\n".join(error_msg))
self.sample_data = True
new_df, latest_update_msg = get_sample_data()
self.latest_update_msg = latest_update_msg
            self.available_dates = []
# Update attributes
self.df = new_df
self.available_models = new_df.index.tolist()
# Load all historical data at startup
self.load_all_historical_data()
# Log and return distant load status
logger.info(f"Data loaded successfully: {len(self.available_models)} models")
logger.info(f"Models: {self.available_models[:5]}{'...' if len(self.available_models) > 5 else ''}")
logger.info(f"Latest update message: {self.latest_update_msg}")
# Log a preview of the df
msg = {}
for model in self.available_models[:3]:
msg[model] = {}
for col in self.df.columns:
value = self.df.loc[model, col]
if not isinstance(value, int):
value = str(value)
if len(value) > 10:
value = value[:10] + "..."
msg[model][col] = value
logger.info(json.dumps(msg, indent=4))
def load_all_historical_data(self) -> None:
"""Load all available historical data at startup."""
try:
if not self.available_dates:
# Generate fake dates when no real dates are available
fake_dates = []
today = datetime.now()
for i in range(7):
date = today - timedelta(days=i)
fake_dates.append(date.strftime("%Y-%m-%d"))
self.available_dates = fake_dates
logger.info(f"No available dates found, generated {len(self.available_dates)} sample dates.")
logger.info(f"Loading all historical data for {len(self.available_dates)} dates...")
start_date = self.available_dates[-1] # Oldest date
end_date = self.available_dates[0] # Newest date
self.all_historical_data = get_historical_data(start_date, end_date, self.sample_data)
logger.info(f"All historical data loaded: {len(self.all_historical_data)} records")
except Exception as e:
logger.error(f"Error loading all historical data: {e}")
self.all_historical_data = pd.DataFrame()
def load_historical_data(self, start_date: str, end_date: str) -> None:
"""Load historical data for a date range from pre-loaded data."""
try:
logger.info(f"Filtering historical data from {start_date} to {end_date}")
if self.all_historical_data.empty:
logger.warning("No pre-loaded historical data available")
self.historical_df = pd.DataFrame()
return
# Filter the pre-loaded data by date range
start_dt = datetime.strptime(start_date, "%Y-%m-%d")
end_dt = datetime.strptime(end_date, "%Y-%m-%d")
# Filter data within the date range
filtered_data = []
for date_str in self.all_historical_data['date'].unique():
date_dt = datetime.strptime(date_str, "%Y-%m-%d")
if start_dt <= date_dt <= end_dt:
date_data = self.all_historical_data[self.all_historical_data['date'] == date_str]
filtered_data.append(date_data)
if filtered_data:
self.historical_df = pd.concat(filtered_data, ignore_index=False)
logger.info(f"Historical data filtered: {len(self.historical_df)} records for {start_date} to {end_date}")
else:
self.historical_df = pd.DataFrame()
logger.warning(f"No historical data found for date range {start_date} to {end_date}")
except Exception as e:
logger.error(f"Error filtering historical data: {e}")
self.historical_df = pd.DataFrame()
def schedule_data_reload(self):
"""Schedule the next data reload."""
def reload_data():
self.load_data()
# Schedule the next reload in 15 minutes (900 seconds)
timer = threading.Timer(900.0, reload_data)
timer.daemon = True # Dies when main thread dies
timer.start()
logger.info("Next data reload scheduled in 15 minutes")
# Start the first reload timer
timer = threading.Timer(900.0, reload_data)
timer.daemon = True
timer.start()
logger.info("Data auto-reload scheduled every 15 minutes")