Spaces:
Running
Running
优化应用初始化和重试机制,添加健康检查接口
Browse files- app.py +28 -5
- us_stock.py +42 -12
app.py
CHANGED
|
@@ -1,4 +1,5 @@
|
|
| 1 |
import os
|
|
|
|
| 2 |
from fastapi import FastAPI
|
| 3 |
from pydantic import BaseModel
|
| 4 |
from fastapi.middleware.cors import CORSMiddleware
|
|
@@ -15,21 +16,35 @@ initialization_lock = asyncio.Lock()
|
|
| 15 |
@asynccontextmanager
|
| 16 |
async def lifespan(app: FastAPI):
|
| 17 |
# 启动时运行
|
|
|
|
| 18 |
global is_initialized
|
| 19 |
async with initialization_lock:
|
| 20 |
if not is_initialized:
|
| 21 |
-
|
|
|
|
| 22 |
is_initialized = True
|
|
|
|
| 23 |
yield
|
| 24 |
# 关闭时运行
|
|
|
|
| 25 |
# cleanup_code_here()
|
| 26 |
|
| 27 |
async def initialize_application():
|
| 28 |
# 在这里进行所有需要的初始化
|
| 29 |
-
|
| 30 |
-
|
| 31 |
-
|
| 32 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 33 |
|
| 34 |
app = FastAPI(lifespan=lifespan)
|
| 35 |
|
|
@@ -90,4 +105,12 @@ async def predict(request: PredictRequest):
|
|
| 90 |
async def root():
|
| 91 |
return {"message": "Welcome to the API. Use /api/aaa or /api/bbb for processing."}
|
| 92 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 93 |
|
|
|
|
| 1 |
import os
|
| 2 |
+
from datetime import datetime
|
| 3 |
from fastapi import FastAPI
|
| 4 |
from pydantic import BaseModel
|
| 5 |
from fastapi.middleware.cors import CORSMiddleware
|
|
|
|
| 16 |
@asynccontextmanager
|
| 17 |
async def lifespan(app: FastAPI):
|
| 18 |
# 启动时运行
|
| 19 |
+
print("===== Application Startup at", datetime.now().strftime('%Y-%m-%d %H:%M:%S'), "=====")
|
| 20 |
global is_initialized
|
| 21 |
async with initialization_lock:
|
| 22 |
if not is_initialized:
|
| 23 |
+
# 非阻塞初始化 - 在后台任务中执行
|
| 24 |
+
asyncio.create_task(initialize_application())
|
| 25 |
is_initialized = True
|
| 26 |
+
print("===== FastAPI Application Ready =====")
|
| 27 |
yield
|
| 28 |
# 关闭时运行
|
| 29 |
+
print("===== Application Shutdown =====")
|
| 30 |
# cleanup_code_here()
|
| 31 |
|
| 32 |
async def initialize_application():
|
| 33 |
# 在这里进行所有需要的初始化
|
| 34 |
+
print("===== Starting application initialization =====")
|
| 35 |
+
try:
|
| 36 |
+
from us_stock import fetch_symbols
|
| 37 |
+
print("Importing us_stock module...")
|
| 38 |
+
|
| 39 |
+
print("Calling fetch_symbols...")
|
| 40 |
+
await fetch_symbols()
|
| 41 |
+
print("fetch_symbols completed")
|
| 42 |
+
|
| 43 |
+
print("===== Application initialization completed =====")
|
| 44 |
+
except Exception as e:
|
| 45 |
+
print(f"Error during initialization: {e}")
|
| 46 |
+
print("===== Application initialization failed =====")
|
| 47 |
+
raise
|
| 48 |
|
| 49 |
app = FastAPI(lifespan=lifespan)
|
| 50 |
|
|
|
|
| 105 |
async def root():
|
| 106 |
return {"message": "Welcome to the API. Use /api/aaa or /api/bbb for processing."}
|
| 107 |
|
| 108 |
+
@app.get("/health")
|
| 109 |
+
async def health_check():
|
| 110 |
+
return {
|
| 111 |
+
"status": "healthy",
|
| 112 |
+
"initialized": is_initialized,
|
| 113 |
+
"timestamp": datetime.now().isoformat()
|
| 114 |
+
}
|
| 115 |
+
|
| 116 |
|
us_stock.py
CHANGED
|
@@ -57,27 +57,51 @@ def fetch_stock_us_spot_data_with_retries():
|
|
| 57 |
|
| 58 |
|
| 59 |
async def fetch_stock_us_spot_data_with_retries_async():
|
| 60 |
-
retry_intervals = [10, 20
|
| 61 |
retry_index = 0
|
|
|
|
| 62 |
|
| 63 |
-
|
| 64 |
try:
|
| 65 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
| 66 |
return symbols
|
|
|
|
|
|
|
| 67 |
except Exception as e:
|
| 68 |
-
print(f"Error fetching data: {e}")
|
| 69 |
-
|
|
|
|
|
|
|
| 70 |
print(f"Retrying in {wait_time} seconds...")
|
| 71 |
await asyncio.sleep(wait_time)
|
| 72 |
-
retry_index
|
|
|
|
|
|
|
|
|
|
|
|
|
| 73 |
|
| 74 |
symbols = None
|
| 75 |
|
| 76 |
async def fetch_symbols():
|
| 77 |
global symbols
|
| 78 |
-
|
| 79 |
-
|
| 80 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 81 |
|
| 82 |
|
| 83 |
# 全局变量
|
|
@@ -89,19 +113,25 @@ index_us_stock_index_NDX = None
|
|
| 89 |
def update_stock_indices():
|
| 90 |
global index_us_stock_index_INX, index_us_stock_index_DJI, index_us_stock_index_IXIC, index_us_stock_index_NDX
|
| 91 |
try:
|
|
|
|
| 92 |
index_us_stock_index_INX = ak.index_us_stock_sina(symbol=".INX")
|
| 93 |
index_us_stock_index_DJI = ak.index_us_stock_sina(symbol=".DJI")
|
| 94 |
index_us_stock_index_IXIC = ak.index_us_stock_sina(symbol=".IXIC")
|
| 95 |
index_us_stock_index_NDX = ak.index_us_stock_sina(symbol=".NDX")
|
| 96 |
-
print("Stock indices updated")
|
| 97 |
except Exception as e:
|
| 98 |
print(f"Error updating stock indices: {e}")
|
| 99 |
|
| 100 |
# 设置定时器,每隔12小时更新一次
|
| 101 |
threading.Timer(12 * 60 * 60, update_stock_indices).start()
|
| 102 |
|
| 103 |
-
#
|
| 104 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 105 |
|
| 106 |
|
| 107 |
# 创建列名转换的字典
|
|
|
|
| 57 |
|
| 58 |
|
| 59 |
async def fetch_stock_us_spot_data_with_retries_async():
|
| 60 |
+
retry_intervals = [10, 20] # 减少重试次数
|
| 61 |
retry_index = 0
|
| 62 |
+
max_retries = 2 # 最多重试2次
|
| 63 |
|
| 64 |
+
for attempt in range(max_retries + 1):
|
| 65 |
try:
|
| 66 |
+
# 添加30秒超时
|
| 67 |
+
symbols = await asyncio.wait_for(
|
| 68 |
+
asyncio.to_thread(ak.stock_us_spot_em),
|
| 69 |
+
timeout=30.0
|
| 70 |
+
)
|
| 71 |
return symbols
|
| 72 |
+
except asyncio.TimeoutError:
|
| 73 |
+
print(f"Timeout error fetching data (attempt {attempt + 1}/{max_retries + 1})")
|
| 74 |
except Exception as e:
|
| 75 |
+
print(f"Error fetching data (attempt {attempt + 1}/{max_retries + 1}): {e}")
|
| 76 |
+
|
| 77 |
+
if attempt < max_retries:
|
| 78 |
+
wait_time = retry_intervals[min(retry_index, len(retry_intervals) - 1)]
|
| 79 |
print(f"Retrying in {wait_time} seconds...")
|
| 80 |
await asyncio.sleep(wait_time)
|
| 81 |
+
retry_index += 1
|
| 82 |
+
|
| 83 |
+
# 如果所有重试都失败,返回空数据
|
| 84 |
+
print("All retries failed, returning empty data")
|
| 85 |
+
return pd.DataFrame()
|
| 86 |
|
| 87 |
symbols = None
|
| 88 |
|
| 89 |
async def fetch_symbols():
|
| 90 |
global symbols
|
| 91 |
+
try:
|
| 92 |
+
print("Starting symbols initialization...")
|
| 93 |
+
# 异步获取数据
|
| 94 |
+
symbols = await fetch_stock_us_spot_data_with_retries_async()
|
| 95 |
+
if symbols is not None and not symbols.empty:
|
| 96 |
+
print(f"Symbols initialized successfully: {len(symbols)} symbols loaded")
|
| 97 |
+
else:
|
| 98 |
+
print("Symbols initialization failed, using empty dataset")
|
| 99 |
+
symbols = pd.DataFrame()
|
| 100 |
+
except Exception as e:
|
| 101 |
+
print(f"Error in fetch_symbols: {e}")
|
| 102 |
+
symbols = pd.DataFrame()
|
| 103 |
+
finally:
|
| 104 |
+
print("Symbols initialization completed")
|
| 105 |
|
| 106 |
|
| 107 |
# 全局变量
|
|
|
|
| 113 |
def update_stock_indices():
|
| 114 |
global index_us_stock_index_INX, index_us_stock_index_DJI, index_us_stock_index_IXIC, index_us_stock_index_NDX
|
| 115 |
try:
|
| 116 |
+
print("Starting stock indices update...")
|
| 117 |
index_us_stock_index_INX = ak.index_us_stock_sina(symbol=".INX")
|
| 118 |
index_us_stock_index_DJI = ak.index_us_stock_sina(symbol=".DJI")
|
| 119 |
index_us_stock_index_IXIC = ak.index_us_stock_sina(symbol=".IXIC")
|
| 120 |
index_us_stock_index_NDX = ak.index_us_stock_sina(symbol=".NDX")
|
| 121 |
+
print("Stock indices updated successfully")
|
| 122 |
except Exception as e:
|
| 123 |
print(f"Error updating stock indices: {e}")
|
| 124 |
|
| 125 |
# 设置定时器,每隔12小时更新一次
|
| 126 |
threading.Timer(12 * 60 * 60, update_stock_indices).start()
|
| 127 |
|
| 128 |
+
# 程序开始时不立即更新,而是延迟启动
|
| 129 |
+
def start_indices_update():
|
| 130 |
+
"""延迟启动股票指数更新,避免阻塞应用启动"""
|
| 131 |
+
threading.Timer(60, update_stock_indices).start() # 60秒后开始第一次更新
|
| 132 |
+
|
| 133 |
+
# 延迟启动股票指数更新
|
| 134 |
+
start_indices_update()
|
| 135 |
|
| 136 |
|
| 137 |
# 创建列名转换的字典
|