# Streamlitstock / app.py
# Author: Method314
# Last commit: "Update app.py" (8d93a7c, verified)
import os
import warnings
from datetime import datetime, timedelta

import matplotlib.pyplot as plt
import numpy as np
import openai
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import requests
import shap
import streamlit as st
import ta
import yfinance as yf
from catboost import CatBoostRegressor
from plotly.subplots import make_subplots
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
warnings.filterwarnings('ignore')

# Credentials are read from the environment so that no secret lives in source
# control.  NOTE: the keys that were previously committed in this file are
# exposed and should be revoked/rotated with the respective providers.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
openai.api_key = OPENAI_API_KEY

# Alpha Vantage API key (sent as the `apikey` query parameter).
ALPHA_VANTAGE_API_KEY = os.environ.get("ALPHA_VANTAGE_API_KEY", "")

# GPT Assistant ID; overridable through the environment, defaulting to the
# value the app has always used.
ASSISTANT_ID = os.environ.get("OPENAI_ASSISTANT_ID", "asst_Fl3rRrRijb8FJDpqjBexfUBp")
# Custom CSS: the stylesheet is kept in a named constant and injected once
# when the script runs.
_CUSTOM_CSS = """
<style>
.reportview-container {
background: linear-gradient(to bottom right, #10161e, #1f2937);
}
.main .block-container {
padding-top: 2rem;
padding-bottom: 2rem;
}
h1, h2, h3 {
color: #3db892;
}
.stButton > button {
color: white;
background-color: #3db892;
border-radius: 5px;
border: none;
padding: 0.5rem 1rem;
font-weight: bold;
transition: all 0.3s ease 0s;
}
.stButton > button:hover {
background-color: #2c8d6f;
}
.stTextInput > div > div > input,
.stDateInput > div > div > input {
background-color: #1f2937;
color: white;
border: 1px solid #3db892;
}
.stPlotlyChart {
background-color: #1f2937;
border-radius: 5px;
padding: 10px;
}
.css-1d391kg {
background-color: #1f2937;
}
.stDataFrame {
background-color: #1f2937;
}
.stTable {
background-color: #1f2937;
}
.css-1s0xp3b {
background-color: #1f2937;
border: 1px solid #3db892;
border-radius: 5px;
}
</style>
"""
st.markdown(_CUSTOM_CSS, unsafe_allow_html=True)
def get_financial_data(ticker, end_date):
    """Download the three fundamental statements for *ticker* from Alpha Vantage.

    Args:
        ticker: Symbol sent as the ``symbol`` query parameter.
        end_date: ``datetime.date`` (a ``datetime`` is also accepted); reports
            whose ``fiscalDateEnding`` is later than this are discarded.

    Returns:
        dict mapping ``'INCOME_STATEMENT'``, ``'BALANCE_SHEET'`` and
        ``'CASH_FLOW'`` to the decoded JSON payload, with the
        ``quarterlyReports`` / ``annualReports`` lists filtered by date.

    Raises:
        RuntimeError: on a non-200 response, or when the payload carries only
            a service message instead of report lists.
        requests.RequestException: on network failure or timeout.
    """
    base_url = "https://www.alphavantage.co/query"
    functions = ['INCOME_STATEMENT', 'BALANCE_SHEET', 'CASH_FLOW']
    report_keys = ('quarterlyReports', 'annualReports')
    # NOTE(review): these are the keys the service presumably uses for
    # rate-limit / invalid-symbol answers delivered with HTTP 200 - confirm
    # against the provider documentation.
    message_keys = ('Error Message', 'Note', 'Information')

    # A datetime cannot be compared with the date parsed from each report.
    if isinstance(end_date, datetime):
        end_date = end_date.date()

    def ends_on_or_before_cutoff(report):
        fiscal_end = datetime.strptime(report['fiscalDateEnding'], '%Y-%m-%d').date()
        return fiscal_end <= end_date

    data = {}
    for function in functions:
        params = {
            "function": function,
            "symbol": ticker,
            "apikey": ALPHA_VANTAGE_API_KEY
        }
        # A timeout keeps the Streamlit run from hanging on a stalled socket.
        response = requests.get(base_url, params=params, timeout=30)
        if response.status_code != 200:
            raise RuntimeError(f"Failed to fetch {function} data: {response.status_code}")
        content = response.json()

        # Only treat a message as fatal when no report list came with it.
        if not any(key in content for key in report_keys):
            for message_key in message_keys:
                if message_key in content:
                    raise RuntimeError(f"Failed to fetch {function} data: {content[message_key]}")

        for report_key in report_keys:
            if report_key in content:
                content[report_key] = [
                    report for report in content[report_key]
                    if ends_on_or_before_cutoff(report)
                ]
        data[function] = content
    return data
def get_earnings_dates(ticker):
    """Map each fiscal quarter end to the date its earnings were reported.

    Args:
        ticker: Symbol sent as the ``symbol`` query parameter.

    Returns:
        dict of ``fiscalDateEnding`` -> ``reportedDate`` (both strings as
        delivered by the API) built from the ``quarterlyEarnings`` list; empty
        when that list is absent.

    Raises:
        requests.RequestException: on network failure, timeout or HTTP error.
    """
    # Passing `params` lets requests URL-encode the ticker instead of splicing
    # raw user input into the query string.
    response = requests.get(
        "https://www.alphavantage.co/query",
        params={
            "function": "EARNINGS",
            "symbol": ticker,
            "apikey": ALPHA_VANTAGE_API_KEY,
        },
        timeout=30,
    )
    response.raise_for_status()
    data = response.json()
    # Records lacking either date cannot be used for the mapping; skip them
    # rather than failing the whole analysis with a KeyError.
    return {
        report['fiscalDateEnding']: report['reportedDate']
        for report in data.get('quarterlyEarnings', [])
        if report.get('fiscalDateEnding') and report.get('reportedDate')
    }
def get_earnings_data(ticker):
    """Fetch quarterly earnings for *ticker* as a DataFrame indexed by report date.

    Args:
        ticker: Symbol sent as the ``symbol`` query parameter.

    Returns:
        DataFrame indexed by ``reportedDate`` (datetime) with a datetime
        ``fiscalDateEnding`` column and numeric ``reportedEPS``,
        ``estimatedEPS``, ``surprise`` and ``surprisePercentage`` columns.
        The frame has these columns but no rows when the API returns no
        ``quarterlyEarnings``.

    Raises:
        requests.RequestException: on network failure, timeout or HTTP error.
    """
    date_columns = ['fiscalDateEnding', 'reportedDate']
    numeric_columns = ['reportedEPS', 'estimatedEPS', 'surprise', 'surprisePercentage']

    response = requests.get(
        "https://www.alphavantage.co/query",
        params={
            "function": "EARNINGS",
            "symbol": ticker,
            "apikey": ALPHA_VANTAGE_API_KEY,
        },
        timeout=30,
    )
    response.raise_for_status()
    quarterly_earnings = response.json().get('quarterlyEarnings', [])

    df = pd.DataFrame(quarterly_earnings)
    # An empty or partial payload would otherwise raise KeyError below; make
    # sure every column this function promises exists.
    for col in date_columns + numeric_columns:
        if col not in df.columns:
            df[col] = np.nan

    for col in date_columns:
        df[col] = pd.to_datetime(df[col], errors='coerce')
    df = df.set_index('reportedDate')
    for col in numeric_columns:
        df[col] = pd.to_numeric(df[col], errors='coerce')

    # Rows without a report date cannot be joined, and duplicate report dates
    # would duplicate rows in a later index join.
    df = df[df.index.notna()]
    df = df[~df.index.duplicated(keep='first')]
    return df
def process_financial_data(data, earnings_dates, earnings_data):
    """Combine the quarterly statements into one numeric frame keyed by release date.

    Args:
        data: dict of statement name -> payload; only the ``quarterlyReports``
            list of each payload is read.
        earnings_dates: dict of fiscal quarter end -> release date; a quarter
            missing from it is keyed by its fiscal end date instead.
        earnings_data: DataFrame left-joined onto the result by index.

    Returns:
        DataFrame sorted by date whose statement columns are named
        ``<statement>_<field>``; every column is passed through
        ``pd.to_numeric(errors='coerce')``.

    NOTE(review): the coercion loop also touches any datetime column that
    ``earnings_data`` carries - confirm that turning dates into numbers is
    intended before relying on those columns.
    """
    rows_by_release = {}
    for statement_name, payload in data.items():
        for quarter in payload.get('quarterlyReports', []) if 'quarterlyReports' in payload else []:
            fiscal_end = quarter['fiscalDateEnding']
            released_on = earnings_dates.get(fiscal_end, fiscal_end)
            row = rows_by_release.setdefault(released_on, {})
            for field, value in quarter.items():
                row[f"{statement_name}_{field}"] = value

    combined = pd.DataFrame.from_dict(rows_by_release, orient='index')
    combined.index = pd.to_datetime(combined.index)
    combined = combined.sort_index().join(earnings_data, how='left')

    for column_name in list(combined.columns):
        combined[column_name] = pd.to_numeric(combined[column_name], errors='coerce')
    return combined
def get_stock_data(ticker, start_date, end_date):
    """Download price history and attach technical indicators.

    Args:
        ticker: Symbol passed to ``yf.download``.
        start_date: Start of the download window.
        end_date: End of the download window.

    Returns:
        The downloaded DataFrame with added columns: ``Price_Pct_Change``,
        ``RSI``, ``WILLR``, ``BB_upper``/``BB_middle``/``BB_lower``, ``OBV``,
        ``ATR``, ``MACD``, ``ADX``, ``CCI`` and a ``<indicator>_ROC`` column
        for each indicator.

    Raises:
        ValueError: when the download returns no rows.
    """
    df = yf.download(ticker, start=start_date, end=end_date)
    if df is None or df.empty:
        raise ValueError(f"No price data returned for {ticker} between {start_date} and {end_date}.")

    # Some yfinance versions return (field, ticker) MultiIndex columns even for
    # a single symbol; the indicator classes need plain Series per field.
    if isinstance(df.columns, pd.MultiIndex):
        df.columns = df.columns.get_level_values(0)

    close, high, low, volume = df['Close'], df['High'], df['Low'], df['Volume']

    df['Price_Pct_Change'] = close.pct_change()
    df['RSI'] = ta.momentum.RSIIndicator(close).rsi()
    df['WILLR'] = ta.momentum.WilliamsRIndicator(high, low, close).williams_r()
    bb = ta.volatility.BollingerBands(close)
    df['BB_upper'] = bb.bollinger_hband()
    df['BB_middle'] = bb.bollinger_mavg()
    df['BB_lower'] = bb.bollinger_lband()
    df['OBV'] = ta.volume.OnBalanceVolumeIndicator(close, volume).on_balance_volume()
    df['ATR'] = ta.volatility.AverageTrueRange(high, low, close).average_true_range()
    df['MACD'] = ta.trend.MACD(close).macd()
    df['ADX'] = ta.trend.ADXIndicator(high, low, close).adx()
    df['CCI'] = ta.trend.CCIIndicator(high, low, close).cci()

    indicator_columns = ['RSI', 'WILLR', 'BB_upper', 'BB_middle', 'BB_lower', 'OBV', 'ATR', 'MACD', 'ADX', 'CCI']
    for column in indicator_columns:
        # A previous value of exactly 0 makes pct_change return +/-inf; report
        # that as missing instead of an infinite rate of change.
        df[f'{column}_ROC'] = df[column].pct_change().replace([np.inf, -np.inf], np.nan)
    return df
def add_financial_ratios(X):
    """Add balance-sheet / income-statement ratio columns to *X* in place.

    Each ratio is ``numerator / denominator`` and is NaN wherever the
    denominator is 0.  When either source column is missing from *X* the
    whole ratio column is NaN instead of raising ``KeyError``.

    NOTE(review): ``PE_Ratio`` is computed as equity / net income and
    ``PB_Ratio`` as assets / equity - neither uses a share price, so the
    names do not match the conventional definitions.  The names are kept
    because they are feature names other code may rely on.

    Args:
        X: DataFrame of features; it is modified in place.

    Returns:
        The same DataFrame object, with ``PE_Ratio``, ``PB_Ratio``,
        ``Debt_to_Equity``, ``ROE`` and ``ROA`` columns added.
    """
    equity = 'BALANCE_SHEET_totalShareholderEquity'
    net_income = 'INCOME_STATEMENT_netIncome'
    assets = 'BALANCE_SHEET_totalAssets'
    liabilities = 'BALANCE_SHEET_totalLiabilities'

    def safe_divide(numerator, denominator):
        if numerator not in X.columns or denominator not in X.columns:
            return np.full(len(X), np.nan)
        a = X[numerator]
        b = X[denominator]
        # The quotient is evaluated for every row before np.where picks the
        # valid ones, so silence the divide-by-zero noise it may produce.
        with np.errstate(divide='ignore', invalid='ignore'):
            return np.where(b != 0, a / b, np.nan)

    ratio_definitions = (
        ('PE_Ratio', equity, net_income),
        ('PB_Ratio', assets, equity),
        ('Debt_to_Equity', liabilities, equity),
        ('ROE', net_income, equity),
        ('ROA', net_income, assets),
    )
    for ratio_name, numerator, denominator in ratio_definitions:
        X[ratio_name] = safe_divide(numerator, denominator)
    return X
def prepare_data(quarterly_df, stock_df, end_date):
    """Align fundamentals with daily closes and return standardised features/target.

    Both inputs are re-keyed by calendar date, cut off at *end_date*,
    reindexed to every day from the earliest available date and
    forward-filled.  Days without a close (before the first price) are
    dropped.  Remaining feature gaps are filled with the column mean, EPS
    surprise features and financial ratios are added, and features and
    target are each standardised.

    The caller's DataFrames are not modified.

    Args:
        quarterly_df: Fundamentals indexed by date; must contain
            ``reportedEPS``, ``estimatedEPS`` and ``surprisePercentage``.
        stock_df: Price history indexed by date with a ``Close`` column.
        end_date: Last date to include (``date``, ``datetime`` or anything
            ``pd.Timestamp`` accepts).

    Returns:
        ``(X_scaled, y_scaled, dates, scaler_X, scaler_y)`` - the scaled
        feature frame, the scaled close series, their shared index, and the
        two fitted ``StandardScaler`` objects.

    Raises:
        ValueError: when no usable rows remain.
    """
    no_overlap_message = "No overlapping data between stock prices and financial statements."

    # Normalise so the comparisons against date-valued indexes below work for
    # datetime input as well.
    end_date = pd.Timestamp(end_date).date()

    # Work on copies: assigning a new index to the arguments themselves would
    # silently change the caller's frames.
    quarterly = quarterly_df.copy()
    quarterly.index = pd.to_datetime(quarterly.index).date
    quarterly = quarterly[quarterly.index <= end_date]

    close = stock_df['Close'].copy()
    close.index = pd.to_datetime(close.index).date
    close = close[close.index <= end_date]

    # min() of an empty index is NaN, which would fail obscurely further down.
    if quarterly.empty or close.empty:
        raise ValueError(no_overlap_message)

    start_date = min(quarterly.index.min(), close.index.min())
    all_dates = pd.date_range(start=start_date, end=end_date, freq='D').date
    quarterly_daily = quarterly.reindex(all_dates).ffill()
    close_daily = close.reindex(all_dates).ffill()

    merged_df = pd.concat([close_daily, quarterly_daily], axis=1)
    merged_df = merged_df.dropna(subset=['Close'])
    if merged_df.empty:
        raise ValueError(no_overlap_message)

    X = merged_df.drop('Close', axis=1)
    y = merged_df['Close']
    X = X.fillna(X.mean())
    X['EPS_Surprise'] = X['reportedEPS'] - X['estimatedEPS']
    X['EPS_Surprise_Percentage'] = X['surprisePercentage']
    X = add_financial_ratios(X)

    scaler_X = StandardScaler()
    scaler_y = StandardScaler()
    X_scaled = pd.DataFrame(scaler_X.fit_transform(X), columns=X.columns, index=X.index)
    y_scaled = pd.Series(scaler_y.fit_transform(y.values.reshape(-1, 1)).flatten(), index=y.index)
    return X_scaled, y_scaled, merged_df.index, scaler_X, scaler_y
def train_catboost_model(X_train, X_test, y_train, y_test):
    """Fit a CatBoost regressor on the training split and return it.

    The test split is passed as the evaluation set and training stops early
    after 50 rounds without improvement on it.
    """
    hyperparameters = {
        'iterations': 1000,
        'learning_rate': 0.1,
        'depth': 6,
        'loss_function': 'RMSE',
        'random_state': 42,
        'verbose': 100,
    }
    regressor = CatBoostRegressor(**hyperparameters)
    regressor.fit(
        X_train,
        y_train,
        eval_set=(X_test, y_test),
        early_stopping_rounds=50,
    )
    return regressor
def evaluate_model(model, X_test, y_test, scaler_y):
    """Return the R-squared of *model* on the test split.

    Predictions and targets are both mapped back through
    ``scaler_y.inverse_transform`` before scoring.

    Args:
        model: Fitted estimator exposing ``predict``.
        X_test: Scaled test features.
        y_test: Scaled test target (pandas Series).
        scaler_y: Fitted scaler used to undo the target scaling.

    Returns:
        The ``r2_score`` of the unscaled predictions against the unscaled
        targets.
    """
    def unscale(values):
        return scaler_y.inverse_transform(values.reshape(-1, 1)).flatten()

    y_pred = unscale(model.predict(X_test))
    y_test_unscaled = unscale(y_test.values)
    # The mean squared error that used to be computed here was never used.
    return r2_score(y_test_unscaled, y_pred)
def conformal_prediction(model, X_train, y_train, X_test, scaler_y, alpha=0.1):
    """Predict unscaled values for *X_test* with a relative-error band.

    The band half-width is the ``(1 - alpha)`` percentile of the model's
    relative absolute error on the training data, applied to the magnitude of
    each prediction.

    NOTE(review): the errors are measured on the same data the model was
    trained on rather than on a held-out calibration set, so the band is
    likely narrower than the nominal ``1 - alpha`` coverage suggests.

    Args:
        model: Estimator with ``predict`` and ``fit``.  It is only (re)fitted
            when it does not report itself as fitted through a callable
            ``is_fitted()``; an already trained model is used as is so the
            predictions come from the same model the caller evaluated.
        X_train: Scaled training features.
        y_train: Scaled training target.
        X_test: Scaled features to predict.
        scaler_y: Fitted scaler used to undo the target scaling.
        alpha: Tail mass left outside the band (default 0.1).

    Returns:
        ``(predictions, lower_bound, upper_bound)`` as 1-D arrays in unscaled
        units, with ``lower_bound <= upper_bound`` element-wise.
    """
    def unscale(values):
        return scaler_y.inverse_transform(np.asarray(values).reshape(-1, 1)).flatten()

    is_fitted = getattr(model, "is_fitted", None)
    if not (callable(is_fitted) and is_fitted()):
        model.fit(X_train, y_train)

    y_pred_train_unscaled = unscale(model.predict(X_train))
    y_train_unscaled = unscale(y_train)

    # Relative error is undefined where the prediction is exactly 0; leave
    # those points out instead of letting inf/NaN poison the percentile.
    magnitude = np.abs(y_pred_train_unscaled)
    usable = magnitude > 0
    relative_errors = np.abs(y_train_unscaled[usable] - y_pred_train_unscaled[usable]) / magnitude[usable]
    if relative_errors.size:
        error_threshold = np.percentile(relative_errors, (1 - alpha) * 100)
    else:
        error_threshold = 0.0

    y_pred_test_unscaled = unscale(model.predict(X_test))
    # Using the magnitude keeps lower <= upper even for negative predictions.
    margin = np.abs(y_pred_test_unscaled) * error_threshold
    lower_bound_unscaled = y_pred_test_unscaled - margin
    upper_bound_unscaled = y_pred_test_unscaled + margin
    return y_pred_test_unscaled, lower_bound_unscaled, upper_bound_unscaled
def plot_results(dates, y, fair_values, lower_bound, upper_bound, scaler_y):
    """Build the two-row figure: prices with fair-value band above, percent error below.

    *y* is the scaled target and is mapped back through
    ``scaler_y.inverse_transform``; the other series are plotted as given.
    """
    actual_prices = scaler_y.inverse_transform(y.values.reshape(-1, 1)).flatten()
    percent_error = ((fair_values - actual_prices) / actual_prices) * 100

    figure = make_subplots(
        rows=2,
        cols=1,
        shared_xaxes=True,
        vertical_spacing=0.02,
        row_heights=[0.7, 0.3],
    )

    # Top panel; the lower bound is drawn last so that 'tonexty' fills up to
    # the upper bound trace added just before it.
    price_panel = [
        go.Scatter(x=dates, y=actual_prices, mode='lines', name='Actual Price', line=dict(color='blue')),
        go.Scatter(x=dates, y=fair_values, mode='lines', name='Fair Value', line=dict(color='red')),
        go.Scatter(x=dates, y=upper_bound, mode='lines', name='Upper Bound', line=dict(color='gray', width=0)),
        go.Scatter(x=dates, y=lower_bound, mode='lines', name='Lower Bound', line=dict(color='gray', width=0), fill='tonexty'),
    ]
    for trace in price_panel:
        figure.add_trace(trace, row=1, col=1)

    # Bottom panel.
    figure.add_trace(
        go.Scatter(x=dates, y=percent_error, mode='lines', name='Percent Error', line=dict(color='purple')),
        row=2,
        col=1,
    )

    figure.update_layout(height=800, title_text="Stock Price, Fair Value, and Percent Error")
    figure.update_xaxes(title_text="Date", row=2, col=1)
    figure.update_yaxes(title_text="Price", row=1, col=1)
    figure.update_yaxes(title_text="Percent Error", row=2, col=1)
    return figure
def get_monthly_seasonality(ticker, start_date, end_date):
    """Summarise month-over-month returns by calendar month.

    Args:
        ticker: Symbol passed to ``yf.download``.
        start_date: Start of the download window.
        end_date: End of the download window.

    Returns:
        DataFrame indexed by ``Month`` (always 1..12) with columns
        ``Mean Change%``, ``Median Change%``, ``Count`` and
        ``Positive Periods`` (fraction of observed months with a positive
        return).  Months without any observation have ``Count`` 0 and NaN in
        the other columns.

    Raises:
        ValueError: when the download returns no rows.
    """
    data = yf.download(ticker, start=start_date, end=end_date)
    if data is None or data.empty:
        raise ValueError(f"No price data returned for {ticker} between {start_date} and {end_date}.")

    # Some yfinance versions return (field, ticker) MultiIndex columns.
    if isinstance(data.columns, pd.MultiIndex):
        data.columns = data.columns.get_level_values(0)
    # 'Adj Close' is absent when the download is already adjusted; 'Close'
    # is the fallback in that case.
    price_column = 'Adj Close' if 'Adj Close' in data.columns else 'Close'
    prices = data[price_column]

    # Last price of every (year, month); grouping on the index parts avoids
    # the resample frequency alias, whose spelling differs across pandas
    # versions.
    month_end_prices = prices.groupby([prices.index.year, prices.index.month]).last()
    # The first month has no predecessor; dropping its NaN return keeps it
    # from being counted as a non-positive period.
    monthly_returns = month_end_prices.pct_change().dropna()

    calendar_month = monthly_returns.index.get_level_values(1).to_numpy()
    by_month = monthly_returns.groupby(calendar_month)
    seasonality = pd.DataFrame({
        'Mean Change%': by_month.mean(),
        'Median Change%': by_month.median(),
        'Count': by_month.count(),
        'Positive Periods': by_month.agg(lambda returns: (returns > 0).mean()),
    })

    # Always expose all twelve months so label-based lookups by month number
    # cannot fail on short date ranges.
    seasonality = seasonality.reindex(pd.RangeIndex(1, 13, name='Month'))
    seasonality['Count'] = seasonality['Count'].fillna(0).astype(int)
    return seasonality
def plot_monthly_seasonality(seasonality, ticker, start_date, end_date):
    """Plot win rate (bars) and mean return (line) per calendar month.

    Args:
        seasonality: DataFrame indexed by month number with
            ``Positive Periods`` and ``Mean Change%`` columns (fractions).
            Months missing from the index are shown as gaps.
        ticker: Symbol used in the title.
        start_date: Start of the analysed range, used in the title.
        end_date: End of the analysed range, used in the title.

    Returns:
        A ``plotly.graph_objects.Figure``.
    """
    months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']

    # Align to the twelve x-axis labels so a short history cannot raise
    # KeyError or shift values onto the wrong month.
    seasonality = seasonality.reindex(range(1, 13))
    win_rate = seasonality['Positive Periods']
    win_rate_pct = win_rate * 100
    mean_change_pct = seasonality['Mean Change%'] * 100

    fig = go.Figure()
    fig.add_trace(go.Bar(
        x=months,
        y=win_rate_pct,
        name='Positive Periods',
        marker_color=['green' if rate > 0.5 else 'red' for rate in win_rate],
        text=[f"{win:.1f}%<br>{mean:.2f}%" for win, mean in zip(win_rate_pct, mean_change_pct)],
        textposition='auto'
    ))
    fig.add_trace(go.Scatter(
        x=months,
        y=mean_change_pct,
        name='Mean Change%',
        mode='lines+markers',
        line=dict(color='yellow', width=2)
    ))
    fig.update_layout(
        title=f'Monthly Seasonality for {ticker}<br>{start_date} to {end_date}',
        xaxis_title='Month',
        yaxis_title='Percentage',
        template='plotly_dark',
        showlegend=True,
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
        height=600,
        margin=dict(l=50, r=50, t=100, b=50)
    )
    fig.add_hline(y=50, line_dash="dash", line_color="gray")
    fig.add_hline(y=0, line_dash="dash", line_color="gray")

    # A fixed [0, 100] range cuts off months whose mean return is negative;
    # widen the axis (with 10% headroom) only when the data requires it.
    lowest_mean = mean_change_pct.min()
    highest_mean = mean_change_pct.max()
    y_min = min(0.0, lowest_mean * 1.1) if pd.notna(lowest_mean) else 0.0
    y_max = max(100.0, highest_mean * 1.1) if pd.notna(highest_mean) else 100.0
    fig.update_yaxes(ticksuffix="%", range=[y_min, y_max])
    return fig
def prepare_financial_data_for_gpt(financial_data):
    """Render the annual statements as plain text for the language-model prompt.

    For each of ``INCOME_STATEMENT``, ``BALANCE_SHEET`` and ``CASH_FLOW`` a
    header line is written, followed by the first five entries of that
    statement's ``annualReports`` list as ``key: value`` lines, with a blank
    line after each report.  A statement that is missing, ``None`` or has no
    annual reports contributes only its header.

    Args:
        financial_data: dict of statement name -> payload dict (may be
            ``None``).

    Returns:
        The three rendered sections separated by a blank line.
    """
    def format_financial_data(data, report_type):
        lines = [f"{report_type} (Last 5 Years):"]
        statement = data.get(report_type) or {}
        for report in (statement.get('annualReports') or [])[:5]:
            lines.append(f"Fiscal Date Ending: {report.get('fiscalDateEnding', 'N/A')}")
            lines.extend(
                f"{key}: {value}"
                for key, value in report.items()
                if key != 'fiscalDateEnding'
            )
            # Empty entry -> blank separator line after each report.
            lines.append("")
        # join + trailing newline reproduces "every line ends with \n"
        # without repeated string concatenation.
        return "\n".join(lines) + "\n"

    financial_data = financial_data or {}
    income_statement = format_financial_data(financial_data, 'INCOME_STATEMENT')
    balance_sheet = format_financial_data(financial_data, 'BALANCE_SHEET')
    cash_flow = format_financial_data(financial_data, 'CASH_FLOW')
    return f"{income_statement}\n{balance_sheet}\n{cash_flow}"
def get_gpt_analysis(ticker, financial_data):
    """Ask the chat model for a short analysis of the annual statements.

    Works with both generations of the ``openai`` package: when the package
    exposes the ``OpenAI`` client class that client is used, otherwise the
    legacy ``openai.ChatCompletion`` interface is called.

    Args:
        ticker: Symbol named in the prompt.
        financial_data: Statement payloads, rendered with
            ``prepare_financial_data_for_gpt``.

    Returns:
        The model's answer with surrounding whitespace removed, or a fixed
        failure message when the request raises (the error is also shown via
        ``st.error``).
    """
    formatted_data = prepare_financial_data_for_gpt(financial_data)
    prompt = f"Analyze the following financial data for {ticker} and provide insights:\n\n{formatted_data}"
    request_options = dict(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "You are a financial analyst."},
            {"role": "user", "content": prompt}
        ],
        max_tokens=500,
        n=1,
        stop=None,
        temperature=0.5,
    )
    try:
        if hasattr(openai, "OpenAI"):
            # openai>=1.0: ChatCompletion no longer exists; responses are
            # objects with attribute access.
            client = openai.OpenAI(api_key=openai.api_key)
            response = client.chat.completions.create(**request_options)
            content = response.choices[0].message.content
        else:
            response = openai.ChatCompletion.create(**request_options)
            content = response.choices[0].message['content']
        return (content or "").strip()
    except Exception as e:
        # Boundary handler: the analysis is optional, so report and continue.
        st.error(f"OpenAI API error: {e}")
        return "GPT Assistant analysis failed. Please check the API integration."
def plot_interactive_logarithmic_stock_chart(ticker, start_date, end_date):
    """Plot closes on a log axis with a fitted exponential trend and bands.

    A straight line is fitted to ``log(Close)`` against the number of calendar
    days since the first row.  The trend, an inner band (x2 and /2) and an
    outer band (x4 and /4) are drawn from the first row to ten years
    (3650 days) past the last row.

    Args:
        ticker: Symbol passed to ``yf.Ticker``.
        start_date: Start of the history window.
        end_date: End of the history window.

    Returns:
        A ``plotly.graph_objects.Figure`` with a range slider and range
        selector buttons.

    Raises:
        ValueError: when fewer than two rows with a positive close exist.
    """
    stock = yf.Ticker(ticker)
    data = stock.history(start=start_date, end=end_date)
    # log() needs strictly positive prices; the comparison also drops NaN.
    data = data[data['Close'] > 0]
    if len(data) < 2:
        raise ValueError(f"Not enough price history for {ticker} to fit a trend line.")

    x = (data.index - data.index[0]).days
    y = np.log(data['Close'])
    slope, intercept = np.polyfit(x, y, 1)

    future_days = 365 * 10
    # x counts calendar days, so the horizon must be measured from the last
    # day offset; the number of rows is smaller (no weekends/holidays) and
    # would end the extension years early.
    all_days = np.arange(int(x[-1]) + future_days + 1)
    log_trend = np.exp(intercept + slope * all_days)
    inner_upper_band = log_trend * 2
    inner_lower_band = log_trend / 2
    outer_upper_band = log_trend * 4
    outer_lower_band = log_trend / 4
    extended_dates = pd.date_range(start=data.index[0], periods=len(all_days), freq='D')

    fig = go.Figure()
    fig.add_trace(go.Scatter(x=data.index, y=data['Close'], mode='lines', name='Close Price', line=dict(color='blue')))
    fig.add_trace(go.Scatter(x=extended_dates, y=log_trend, mode='lines', name='Log Trend', line=dict(color='red')))
    fig.add_trace(go.Scatter(x=extended_dates, y=inner_upper_band, mode='lines', name='Inner Upper Band', line=dict(color='green')))
    fig.add_trace(go.Scatter(x=extended_dates, y=inner_lower_band, mode='lines', name='Inner Lower Band', line=dict(color='green')))
    fig.add_trace(go.Scatter(x=extended_dates, y=outer_upper_band, mode='lines', name='Outer Upper Band', line=dict(color='orange')))
    fig.add_trace(go.Scatter(x=extended_dates, y=outer_lower_band, mode='lines', name='Outer Lower Band', line=dict(color='orange')))
    fig.update_layout(
        title=f'{ticker} Stock Price (Logarithmic Scale) with Extended Trend Lines and Outer Bands',
        xaxis_title='Date',
        yaxis_title='Price (Log Scale)',
        yaxis_type="log",
        legend=dict(x=0.01, y=0.99, bgcolor='rgba(255, 255, 255, 0.8)'),
        hovermode='x unified',
        height=800
    )
    fig.update_xaxes(
        rangeslider_visible=True,
        rangeselector=dict(
            buttons=list([
                dict(count=1, label="1m", step="month", stepmode="backward"),
                dict(count=6, label="6m", step="month", stepmode="backward"),
                dict(count=1, label="YTD", step="year", stepmode="todate"),
                dict(count=1, label="1y", step="year", stepmode="backward"),
                dict(step="all")
            ])
        )
    )
    return fig
def _build_fair_price_html(latest_close, latest_fair_value, latest_lower_bound,
                           latest_upper_bound, r2, feature_importance_df):
    """Render the fair-price summary (prices, R-squared, top-10 features) as HTML."""
    top_features = feature_importance_df.head(10).to_string(index=False)
    return (
        "\n"
        '<h2 style="margin-bottom: 15px;">Fair Price Analysis</h2>\n'
        f"<p><strong>Current Price:</strong> ${latest_close:.2f}</p>\n"
        f"<p><strong>Estimated Fair Value:</strong> ${latest_fair_value:.2f}</p>\n"
        f"<p><strong>Price Prediction Range:</strong> ${latest_lower_bound:.2f} to ${latest_upper_bound:.2f}</p>\n"
        f"<p><strong>R-squared Score:</strong> {r2:.4f}</p>\n"
        '<h3 style="margin-top: 20px;">Top 10 most important features for fair value prediction:</h3>\n'
        f"<pre>{top_features}</pre>\n"
    )


def _build_seasonality_html(seasonality, start_date, end_date):
    """Render the current-month / next-month seasonality summary as HTML."""
    today = datetime.now()
    current_month = today.month
    next_month = (current_month % 12) + 1
    # Derive the label from the month number that is actually looked up;
    # adding 31 days to today skips a month late in the month.
    next_month_name = datetime(2000, next_month, 1).strftime('%B')

    # Guarantee a row for every month so the lookups cannot raise KeyError.
    by_month = seasonality.reindex(range(1, 13))
    current_month_return = by_month.loc[current_month, 'Mean Change%'] * 100
    next_month_return = by_month.loc[next_month, 'Mean Change%'] * 100
    current_month_win_rate = by_month.loc[current_month, 'Positive Periods'] * 100
    next_month_win_rate = by_month.loc[next_month, 'Positive Periods'] * 100
    return (
        "\n"
        f'<h2 style="margin-bottom: 15px;">Seasonality Analysis ({start_date} to {end_date})</h2>\n'
        f"<h3>Current month ({today.strftime('%B')}):</h3>\n"
        f"<p>Average return: {current_month_return:.2f}%</p>\n"
        f"<p>Probability of positive return: {current_month_win_rate:.1f}%</p>\n"
        f"<h3>Next month ({next_month_name}):</h3>\n"
        f"<p>Average return: {next_month_return:.2f}%</p>\n"
        f"<p>Probability of positive return: {next_month_win_rate:.1f}%</p>\n"
    )


def analyze_stock(ticker, start_date, end_date, use_ai_assistant):
    """Run the full analysis pipeline for one ticker.

    Fetches fundamentals, earnings and prices, trains the fair-value model,
    and builds the charts and HTML summaries shown by the UI.

    Args:
        ticker: Stock symbol.
        start_date: Start of the analysed price range.
        end_date: End of the analysed range; also the cutoff for fundamentals.
        use_ai_assistant: When true, request the language-model analysis.

    Returns:
        dict with keys ``fair_price_html``, ``fig``, ``shap_fig``,
        ``seasonality_fig``, ``seasonality_html``, ``gpt_analysis``,
        ``log_chart``, ``feature_importance_df`` and ``percentage_change``;
        or ``None`` when no financial data is available or any step raises
        (the problem is reported through ``st.error``).
    """
    try:
        financial_data = get_financial_data(ticker, end_date)
        earnings_dates = get_earnings_dates(ticker)
        earnings_data = get_earnings_data(ticker)
        quarterly_df = process_financial_data(financial_data, earnings_dates, earnings_data)
        stock_df = get_stock_data(ticker, start_date, end_date)
        if quarterly_df.empty:
            st.error("No financial data available for processing.")
            return None

        X_scaled, y_scaled, dates, scaler_X, scaler_y = prepare_data(quarterly_df, stock_df, end_date)
        X_train, X_test, y_train, y_test = train_test_split(X_scaled, y_scaled, test_size=0.2, random_state=42)
        model = train_catboost_model(X_train, X_test, y_train, y_test)
        r2 = evaluate_model(model, X_test, y_test, scaler_y)
        if r2 < 0.5:
            st.warning("Model performance is poor. Results may not be reliable.")

        fair_values, lower_bound, upper_bound = conformal_prediction(model, X_train, y_train, X_scaled, scaler_y)
        fig = plot_results(dates, y_scaled, fair_values, lower_bound, upper_bound, scaler_y)

        feature_importance_df = pd.DataFrame({
            'feature': X_scaled.columns,
            'importance': model.feature_importances_,
        }).sort_values('importance', ascending=False)

        explainer = shap.TreeExplainer(model)
        shap_values = explainer.shap_values(X_scaled)
        # The figure is created first so the summary plot draws into it.
        shap_fig, _ = plt.subplots(figsize=(10, 6))
        shap.summary_plot(shap_values, X_scaled, plot_type="bar", show=False)
        plt.title("SHAP Feature Importance")
        plt.tight_layout()

        seasonality = get_monthly_seasonality(ticker, start_date, end_date)
        seasonality_fig = plot_monthly_seasonality(seasonality, ticker, start_date, end_date)
        log_chart = plot_interactive_logarithmic_stock_chart(ticker, start_date, end_date)
        gpt_analysis = get_gpt_analysis(ticker, financial_data) if use_ai_assistant else "AI assistant analysis not requested."

        latest_close = stock_df['Close'].iloc[-1]
        latest_fair_value = fair_values[-1]
        latest_lower_bound = lower_bound[-1]
        latest_upper_bound = upper_bound[-1]
        percentage_change = ((latest_fair_value - latest_close) / latest_close) * 100

        fair_price_html = _build_fair_price_html(
            latest_close,
            latest_fair_value,
            latest_lower_bound,
            latest_upper_bound,
            r2,
            feature_importance_df,
        )
        seasonality_html = _build_seasonality_html(seasonality, start_date, end_date)

        return {
            'fair_price_html': fair_price_html,
            'fig': fig,
            'shap_fig': shap_fig,
            'seasonality_fig': seasonality_fig,
            'seasonality_html': seasonality_html,
            'gpt_analysis': gpt_analysis,
            'log_chart': log_chart,
            'feature_importance_df': feature_importance_df.head(10),
            'percentage_change': percentage_change
        }
    except Exception as e:
        # Top-level boundary for the UI: show the failure instead of crashing.
        st.error(f"An error occurred: {str(e)}")
        return None
def main():
    """Render the Streamlit page: inputs, the analyze button, and the results."""
    st.title("Advanced Stock Analysis App")
    st.markdown("Enter a stock ticker and date range to perform comprehensive stock analysis.")

    col1, col2, col3, col4 = st.columns([2, 2, 2, 1])
    with col1:
        ticker = st.text_input("Stock Ticker", value="MSFT")
    with col2:
        start_date = st.date_input("Start Date", value=datetime(2015, 1, 1))
    with col3:
        end_date = st.date_input("End Date", value=datetime.now())
    with col4:
        use_ai_assistant = st.checkbox("Use AI Assistant")

    if not st.button("Analyze Stock", key="analyze_button"):
        return

    # Validate before spending API calls on a request that cannot succeed.
    ticker = ticker.strip().upper()
    if not ticker:
        st.error("Please enter a stock ticker.")
        return
    if start_date >= end_date:
        st.error("Start date must be before end date.")
        return

    with st.spinner('Analyzing stock data...'):
        results = analyze_stock(ticker, start_date, end_date, use_ai_assistant)
    if not results:
        # analyze_stock has already reported the problem.
        return

    st.header("Fair Price Analysis")
    st.markdown(results['fair_price_html'], unsafe_allow_html=True)
    st.subheader("Fair Price Prediction")
    st.plotly_chart(results['fig'], use_container_width=True)

    col1, col2 = st.columns(2)
    with col1:
        st.subheader("SHAP Feature Importance")
        st.pyplot(results['shap_fig'])
    with col2:
        st.subheader("Top 10 Important Features")
        st.dataframe(results['feature_importance_df'], height=400)

    st.subheader("Monthly Seasonality")
    st.plotly_chart(results['seasonality_fig'], use_container_width=True)
    st.markdown(results['seasonality_html'], unsafe_allow_html=True)

    if results['gpt_analysis'] != "AI assistant analysis not requested.":
        st.subheader("AI Assistant Analysis")
        st.text_area("Analysis", value=results['gpt_analysis'], height=300)

    st.subheader("Logarithmic Stock Chart")
    st.plotly_chart(results['log_chart'], use_container_width=True)


if __name__ == "__main__":
    main()