|
|
import os, io, math, json, traceback, warnings |
|
|
warnings.filterwarnings("ignore") |
|
|
|
|
|
from typing import List, Tuple, Dict, Optional |
|
|
|
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
import matplotlib.pyplot as plt |
|
|
from PIL import Image |
|
|
import gradio as gr |
|
|
import requests |
|
|
import yfinance as yf |
|
|
|
|
|
from sentence_transformers import SentenceTransformer, util as st_util |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
DATA_DIR = "data" |
|
|
os.makedirs(DATA_DIR, exist_ok=True) |
|
|
|
|
|
DEFAULT_LOOKBACK_YEARS = 5 |
|
|
MAX_TICKERS = 30 |
|
|
MARKET_TICKER = "VOO" |
|
|
BILLS_TICKER = "BILLS" |
|
|
|
|
|
EMBED_MODEL_NAME = "BAAI/bge-base-en-v1.5" |
|
|
|
|
|
POS_COLS = ["ticker", "amount_usd", "weight_exposure", "beta"] |
|
|
SUG_COLS = ["ticker", "weight_%", "amount_$"] |
|
|
EFF_COLS = ["asset", "weight_%", "amount_$"] |
|
|
|
|
|
N_SYNTH = 1000 |
|
|
MMR_K = 40 |
|
|
MMR_LAMBDA = 0.65 |
|
|
|
|
|
DEBUG = True |
|
|
|
|
|
|
|
|
FRED_MAP = [ |
|
|
(1, "DGS1"), |
|
|
(2, "DGS2"), |
|
|
(3, "DGS3"), |
|
|
(5, "DGS5"), |
|
|
(7, "DGS7"), |
|
|
(10, "DGS10"), |
|
|
(20, "DGS20"), |
|
|
(30, "DGS30"), |
|
|
(100,"DGS30"), |
|
|
] |
|
|
|
|
|
def fred_series_for_horizon(years: float) -> str: |
|
|
y = max(1.0, min(100.0, float(years))) |
|
|
for cutoff, code in FRED_MAP: |
|
|
if y <= cutoff: |
|
|
return code |
|
|
return "DGS30" |
|
|
|
|
|
def fetch_fred_yield_annual(code: str) -> float: |
|
|
url = f"https://fred.stlouisfed.org/graph/fredgraph.csv?id={code}" |
|
|
try: |
|
|
r = requests.get(url, timeout=10) |
|
|
r.raise_for_status() |
|
|
df = pd.read_csv(io.StringIO(r.text)) |
|
|
s = pd.to_numeric(df.iloc[:, 1], errors="coerce").dropna() |
|
|
return float(s.iloc[-1] / 100.0) if len(s) else 0.03 |
|
|
except Exception: |
|
|
return 0.03 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _to_cols_close(df: pd.DataFrame, tickers: List[str]) -> pd.DataFrame: |
|
|
""" |
|
|
Coerce yfinance download to single-level columns of closes/adj closes. |
|
|
Handles Series, single-level, and MultiIndex frames safely. |
|
|
""" |
|
|
if df is None or df.empty: |
|
|
return pd.DataFrame() |
|
|
|
|
|
|
|
|
if isinstance(df, pd.Series): |
|
|
df = df.to_frame("Close") |
|
|
|
|
|
|
|
|
if isinstance(df.columns, pd.MultiIndex): |
|
|
fields = df.columns.get_level_values(1).unique().tolist() |
|
|
field = "Adj Close" if "Adj Close" in fields else ("Close" if "Close" in fields else fields[0]) |
|
|
out = {} |
|
|
for t in dict.fromkeys(tickers): |
|
|
col = (t, field) |
|
|
if col in df.columns: |
|
|
out[t] = pd.to_numeric(df[col], errors="coerce") |
|
|
return pd.DataFrame(out) |
|
|
|
|
|
|
|
|
if "Adj Close" in df.columns: |
|
|
col = pd.to_numeric(df["Adj Close"], errors="coerce") |
|
|
col.name = tickers[0] if tickers else "SINGLE" |
|
|
return col.to_frame() |
|
|
if "Close" in df.columns: |
|
|
col = pd.to_numeric(df["Close"], errors="coerce") |
|
|
col.name = tickers[0] if tickers else "SINGLE" |
|
|
return col.to_frame() |
|
|
|
|
|
|
|
|
num_cols = [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])] |
|
|
if num_cols: |
|
|
col = pd.to_numeric(df[num_cols[0]], errors="coerce") |
|
|
col.name = tickers[0] if tickers else "SINGLE" |
|
|
return col.to_frame() |
|
|
|
|
|
return pd.DataFrame() |
|
|
|
|
|
def fetch_prices_monthly(tickers: List[str], years: int) -> pd.DataFrame: |
|
|
tickers = [t for t in dict.fromkeys(tickers) if t] |
|
|
if not tickers: |
|
|
return pd.DataFrame() |
|
|
|
|
|
start = (pd.Timestamp.today(tz="UTC") - pd.DateOffset(years=int(years), days=7)).date() |
|
|
end = pd.Timestamp.today(tz="UTC").date() |
|
|
|
|
|
df_raw = yf.download( |
|
|
tickers, start=start, end=end, |
|
|
interval="1mo", auto_adjust=True, progress=False, group_by="ticker", |
|
|
threads=True, |
|
|
) |
|
|
df = _to_cols_close(df_raw, tickers) |
|
|
if df.empty: |
|
|
return df |
|
|
df = df.dropna(how="all").fillna(method="ffill") |
|
|
|
|
|
keep = [t for t in tickers if t in df.columns] |
|
|
if not keep and df.shape[1] == 1: |
|
|
|
|
|
df.columns = [tickers[0]] |
|
|
keep = [tickers[0]] |
|
|
return df[keep] if keep else pd.DataFrame() |
|
|
|
|
|
def monthly_returns(prices: pd.DataFrame) -> pd.DataFrame: |
|
|
if prices is None or prices.empty: |
|
|
return pd.DataFrame() |
|
|
return prices.pct_change().dropna(how="all") |
|
|
|
|
|
def validate_tickers(symbols: List[str], years: int) -> List[str]: |
|
|
"""Return subset of symbols that have monthly data.""" |
|
|
symbols = [s.strip().upper() for s in symbols if s and isinstance(s, str)] |
|
|
if not symbols: |
|
|
return [] |
|
|
base = [s for s in symbols if s != MARKET_TICKER] |
|
|
px = fetch_prices_monthly(base + [MARKET_TICKER], years) |
|
|
if px.empty: |
|
|
return [s for s in symbols if s == MARKET_TICKER] |
|
|
ok = [s for s in symbols if s in px.columns] |
|
|
return ok |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def annualize_mean(m): return np.asarray(m, dtype=float) * 12.0 |
|
|
def annualize_sigma(s): return np.asarray(s, dtype=float) * math.sqrt(12.0) |
|
|
|
|
|
def get_aligned_monthly_returns(symbols: List[str], years: int) -> pd.DataFrame: |
|
|
uniq = [c for c in dict.fromkeys(symbols)] |
|
|
if MARKET_TICKER not in uniq: |
|
|
uniq.append(MARKET_TICKER) |
|
|
px = fetch_prices_monthly(uniq, years) |
|
|
rets = monthly_returns(px) |
|
|
if rets.empty: |
|
|
return pd.DataFrame() |
|
|
cols = [c for c in uniq if c in rets.columns] |
|
|
R = rets[cols].dropna(how="any") |
|
|
return R.loc[:, ~R.columns.duplicated()] |
|
|
|
|
|
def estimate_all_moments_aligned(symbols: List[str], years: int, rf_ann: float): |
|
|
R = get_aligned_monthly_returns(symbols + [MARKET_TICKER], years) |
|
|
if R.empty or MARKET_TICKER not in R.columns or R.shape[0] < 3: |
|
|
raise ValueError("Not enough aligned data to estimate moments.") |
|
|
rf_m = rf_ann / 12.0 |
|
|
|
|
|
m = R[MARKET_TICKER] |
|
|
if isinstance(m, pd.DataFrame): |
|
|
m = m.iloc[:, 0].squeeze() |
|
|
|
|
|
mu_m_ann = float(annualize_mean(m.mean())) |
|
|
sigma_m_ann = float(annualize_sigma(m.std(ddof=1))) |
|
|
erp_ann = float(mu_m_ann - rf_ann) |
|
|
|
|
|
ex_m = m - rf_m |
|
|
var_m = float(np.var(ex_m.values, ddof=1)) |
|
|
var_m = max(var_m, 1e-9) |
|
|
|
|
|
betas: Dict[str, float] = {} |
|
|
for s in [c for c in R.columns if c != MARKET_TICKER]: |
|
|
ex_s = R[s] - rf_m |
|
|
cov_sm = float(np.cov(ex_s.values, ex_m.values, ddof=1)[0, 1]) |
|
|
betas[s] = cov_sm / var_m |
|
|
betas[MARKET_TICKER] = 1.0 |
|
|
|
|
|
asset_cols = [c for c in R.columns if c != MARKET_TICKER] |
|
|
cov_m = np.cov(R[asset_cols].values.T, ddof=1) if asset_cols else np.zeros((0, 0)) |
|
|
covA = pd.DataFrame(cov_m * 12.0, index=asset_cols, columns=asset_cols) |
|
|
|
|
|
return {"betas": betas, "cov_ann": covA, "erp_ann": erp_ann, "sigma_m_ann": sigma_m_ann} |
|
|
|
|
|
def capm_er(beta: float, rf_ann: float, erp_ann: float) -> float: |
|
|
return float(rf_ann + beta * erp_ann) |
|
|
|
|
|
def portfolio_stats(weights: Dict[str, float], |
|
|
cov_ann: pd.DataFrame, |
|
|
betas: Dict[str, float], |
|
|
rf_ann: float, |
|
|
erp_ann: float) -> Tuple[float, float, float]: |
|
|
tickers = list(weights.keys()) |
|
|
if not tickers: |
|
|
return 0.0, rf_ann, 0.0 |
|
|
w = np.array([weights[t] for t in tickers], dtype=float) |
|
|
gross = float(np.sum(np.abs(w))) |
|
|
if gross <= 1e-12: |
|
|
return 0.0, rf_ann, 0.0 |
|
|
w_expo = w / gross |
|
|
beta_p = float(np.dot([betas.get(t, 0.0) for t in tickers], w_expo)) |
|
|
er_capm = capm_er(beta_p, rf_ann, erp_ann) |
|
|
cov = cov_ann.reindex(index=tickers, columns=tickers).fillna(0.0).to_numpy() |
|
|
sigma_p = math.sqrt(max(float(w_expo.T @ cov @ w_expo), 0.0)) |
|
|
return beta_p, er_capm, sigma_p |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def efficient_same_sigma(sigma_target: float, rf_ann: float, erp_ann: float, sigma_mkt: float): |
|
|
if sigma_mkt <= 1e-12: |
|
|
return 0.0, 1.0, rf_ann |
|
|
a = sigma_target / sigma_mkt |
|
|
return a, 1.0 - a, rf_ann + a * erp_ann |
|
|
|
|
|
def efficient_same_return(mu_target: float, rf_ann: float, erp_ann: float, sigma_mkt: float): |
|
|
if abs(erp_ann) <= 1e-12: |
|
|
return 0.0, 1.0, 0.0 |
|
|
a = (mu_target - rf_ann) / erp_ann |
|
|
return a, 1.0 - a, abs(a) * sigma_mkt |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _pct_arr(x): |
|
|
return np.asarray(x, dtype=float) * 100.0 |
|
|
|
|
|
def plot_cml(rf_ann, erp_ann, sigma_mkt, |
|
|
pt_sigma_hist, pt_mu_capm, |
|
|
same_sigma_sigma, same_sigma_mu, |
|
|
same_mu_sigma, same_mu_mu) -> Image.Image: |
|
|
fig = plt.figure(figsize=(6.6, 4.4), dpi=130) |
|
|
|
|
|
xmax = max(0.3, sigma_mkt * 2.0, pt_sigma_hist * 1.4, same_mu_sigma * 1.4, same_sigma_sigma * 1.4) |
|
|
xs = np.linspace(0, xmax, 160) |
|
|
slope = erp_ann / max(sigma_mkt, 1e-12) |
|
|
cml = rf_ann + slope * xs |
|
|
|
|
|
plt.plot(_pct_arr(xs), _pct_arr(cml), label="CML via VOO", linewidth=1.8) |
|
|
plt.scatter([0.0], [_pct_arr(rf_ann)], label="Risk-free", zorder=5) |
|
|
plt.scatter([_pct_arr(sigma_mkt)], [_pct_arr(rf_ann + erp_ann)], label="Market (VOO)", zorder=5) |
|
|
plt.scatter([_pct_arr(pt_sigma_hist)], [_pct_arr(pt_mu_capm)], label="Your portfolio (CAPM)", zorder=6) |
|
|
plt.scatter([_pct_arr(same_sigma_sigma)], [_pct_arr(same_sigma_mu)], label="Efficient: same σ", zorder=5) |
|
|
plt.scatter([_pct_arr(same_mu_sigma)], [_pct_arr(same_mu_mu)], label="Efficient: same μ", zorder=5) |
|
|
|
|
|
|
|
|
plt.plot([_pct_arr(pt_sigma_hist), _pct_arr(same_sigma_sigma)], |
|
|
[_pct_arr(pt_mu_capm), _pct_arr(same_sigma_mu)], |
|
|
ls="--", lw=1.1, alpha=0.7, color="gray") |
|
|
plt.plot([_pct_arr(pt_sigma_hist), _pct_arr(same_mu_sigma)], |
|
|
[_pct_arr(pt_mu_capm), _pct_arr(same_mu_mu)], |
|
|
ls="--", lw=1.1, alpha=0.7, color="gray") |
|
|
|
|
|
plt.xlabel("σ (annual, %)") |
|
|
plt.ylabel("E[return] (annual, %)") |
|
|
plt.legend(loc="best", fontsize=8) |
|
|
plt.tight_layout() |
|
|
|
|
|
buf = io.BytesIO() |
|
|
plt.savefig(buf, format="png") |
|
|
plt.close(fig) |
|
|
buf.seek(0) |
|
|
return Image.open(buf) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def dirichlet_signed(k, rng): |
|
|
signs = rng.choice([-1.0, 1.0], size=k, p=[0.25, 0.75]) |
|
|
raw = rng.dirichlet(np.ones(k)) |
|
|
gross = 1.0 + float(rng.gamma(2.0, 0.5)) |
|
|
return gross * signs * raw |
|
|
|
|
|
def build_synth_dataset(universe: List[str], |
|
|
cov_ann: pd.DataFrame, |
|
|
betas: Dict[str, float], |
|
|
rf_ann: float, erp_ann: float, |
|
|
n_rows: int = N_SYNTH, |
|
|
seed: int = 123) -> pd.DataFrame: |
|
|
rng = np.random.default_rng(seed) |
|
|
U = [u for u in universe if u != MARKET_TICKER] + [MARKET_TICKER] |
|
|
rows = [] |
|
|
if not U: |
|
|
return pd.DataFrame() |
|
|
for i in range(n_rows): |
|
|
k = int(rng.integers(low=max(1, min(2, len(U))), high=min(8, len(U)) + 1)) |
|
|
picks = list(rng.choice(U, size=k, replace=False)) |
|
|
w = dirichlet_signed(k, rng) |
|
|
gross = float(np.sum(np.abs(w))) |
|
|
if gross <= 1e-12: |
|
|
continue |
|
|
w_expo = w / gross |
|
|
weights = {picks[j]: float(w_expo[j]) for j in range(k)} |
|
|
beta_i, er_capm_i, sigma_i = portfolio_stats(weights, cov_ann, betas, rf_ann, erp_ann) |
|
|
rows.append({ |
|
|
"id": int(i), |
|
|
"tickers": ",".join(picks), |
|
|
"weights": ",".join(f"{x:.6f}" for x in w_expo), |
|
|
"beta": float(beta_i), |
|
|
"er_capm": float(er_capm_i), |
|
|
"sigma": float(sigma_i), |
|
|
}) |
|
|
return pd.DataFrame(rows) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_embedder = None |
|
|
def get_embedder(): |
|
|
global _embedder |
|
|
if _embedder is None: |
|
|
_embedder = SentenceTransformer(EMBED_MODEL_NAME) |
|
|
return _embedder |
|
|
|
|
|
def row_to_sentence(row: pd.Series) -> str: |
|
|
try: |
|
|
ts = row["tickers"].split(",") |
|
|
ws = [float(x) for x in row["weights"].split(",")] |
|
|
pairs = ", ".join([f"{ts[i]} {ws[i]:+.2f}" for i in range(min(len(ts), len(ws)))]) |
|
|
except Exception: |
|
|
pairs = "" |
|
|
return (f"portfolio with sigma {row['sigma']:.4f}, " |
|
|
f"capm_return {row['er_capm']:.4f}, " |
|
|
f"beta {row['beta']:.3f}, " |
|
|
f"exposures {pairs}") |
|
|
|
|
|
def mmr_select(query_emb, cand_embs, k: int = 3, lambda_param: float = MMR_LAMBDA) -> List[int]: |
|
|
if cand_embs.shape[0] <= k: |
|
|
return list(range(cand_embs.shape[0])) |
|
|
sim_to_query = st_util.cos_sim(query_emb, cand_embs).cpu().numpy().reshape(-1) |
|
|
chosen = [] |
|
|
candidate_indices = list(range(cand_embs.shape[0])) |
|
|
first = int(np.argmax(sim_to_query)) |
|
|
chosen.append(first) |
|
|
candidate_indices.remove(first) |
|
|
while len(chosen) < k and candidate_indices: |
|
|
max_score = -1e9 |
|
|
max_idx = candidate_indices[0] |
|
|
|
|
|
chosen_stack = cand_embs[chosen] |
|
|
for idx in candidate_indices: |
|
|
sim_q = sim_to_query[idx] |
|
|
sim_d = float(st_util.cos_sim(cand_embs[idx], chosen_stack).max().cpu().numpy()) |
|
|
mmr_score = lambda_param * sim_q - (1.0 - lambda_param) * sim_d |
|
|
if mmr_score > max_score: |
|
|
max_score = mmr_score |
|
|
max_idx = idx |
|
|
chosen.append(max_idx) |
|
|
candidate_indices.remove(max_idx) |
|
|
return chosen |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def yahoo_search(query: str): |
|
|
if not query or len(query.strip()) == 0: |
|
|
return [] |
|
|
url = "https://query1.finance.yahoo.com/v1/finance/search" |
|
|
params = {"q": query.strip(), "quotesCount": 10, "newsCount": 0} |
|
|
headers = {"User-Agent": "Mozilla/5.0"} |
|
|
try: |
|
|
r = requests.get(url, params=params, headers=headers, timeout=10) |
|
|
r.raise_for_status() |
|
|
data = r.json() |
|
|
out = [] |
|
|
for q in data.get("quotes", []): |
|
|
sym = q.get("symbol") |
|
|
name = q.get("shortname") or q.get("longname") or "" |
|
|
exch = q.get("exchDisp") or "" |
|
|
if sym and sym.isascii(): |
|
|
out.append(f"{sym} | {name} | {exch}") |
|
|
if not out: |
|
|
out = [f"{query.strip().upper()} | typed symbol | n/a"] |
|
|
return out[:10] |
|
|
except Exception: |
|
|
return [f"{query.strip().upper()} | typed symbol | n/a"] |
|
|
|
|
|
_last_matches = [] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def fmt_pct(x: float) -> str: |
|
|
try: |
|
|
return f"{float(x)*100:.2f}%" |
|
|
except Exception: |
|
|
return "n/a" |
|
|
|
|
|
def fmt_money(x: float) -> str: |
|
|
try: |
|
|
return f"${float(x):,.0f}" |
|
|
except Exception: |
|
|
return "n/a" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
HORIZON_YEARS = 5.0 |
|
|
RF_CODE = fred_series_for_horizon(HORIZON_YEARS) |
|
|
RF_ANN = fetch_fred_yield_annual(RF_CODE) |
|
|
|
|
|
def do_search(query): |
|
|
global _last_matches |
|
|
_last_matches = yahoo_search(query) |
|
|
note = "Select a symbol from Matches, then click Add." |
|
|
return note, gr.update(choices=_last_matches, value=None) |
|
|
|
|
|
def add_symbol(selection: str, table: pd.DataFrame): |
|
|
if selection and " | " in selection: |
|
|
symbol = selection.split(" | ")[0].strip().upper() |
|
|
elif isinstance(selection, str) and selection.strip(): |
|
|
symbol = selection.strip().upper() |
|
|
else: |
|
|
return table, "Pick a row from Matches first." |
|
|
|
|
|
current = [] |
|
|
if isinstance(table, pd.DataFrame) and len(table) > 0 and "ticker" in table.columns: |
|
|
current = [str(x).upper() for x in table["ticker"].tolist() if str(x) != "nan"] |
|
|
|
|
|
tickers = current if symbol in current else current + [symbol] |
|
|
tickers = [t for t in tickers if t] |
|
|
|
|
|
val = validate_tickers(tickers, years=DEFAULT_LOOKBACK_YEARS) |
|
|
tickers = [t for t in tickers if t in val] |
|
|
|
|
|
amt_map = {} |
|
|
if isinstance(table, pd.DataFrame) and len(table) > 0: |
|
|
for _, r in table.iterrows(): |
|
|
t = str(r.get("ticker", "")).upper() |
|
|
if t in tickers: |
|
|
amt_map[t] = float(pd.to_numeric(r.get("amount_usd", 0.0), errors="coerce") or 0.0) |
|
|
|
|
|
new_table = pd.DataFrame({"ticker": tickers, "amount_usd": [amt_map.get(t, 0.0) for t in tickers]}) |
|
|
msg = f"Added {symbol}" if symbol in tickers else f"{symbol} not valid or no data" |
|
|
if len(new_table) > MAX_TICKERS: |
|
|
new_table = new_table.iloc[:MAX_TICKERS] |
|
|
msg = f"Reached max of {MAX_TICKERS}" |
|
|
return new_table, msg |
|
|
|
|
|
def lock_ticker_column(tb: pd.DataFrame): |
|
|
if tb is None or len(tb) == 0: |
|
|
return pd.DataFrame(columns=["ticker", "amount_usd"]) |
|
|
tickers = [str(x).upper() for x in tb["ticker"].tolist()] |
|
|
amounts = pd.to_numeric(tb["amount_usd"], errors="coerce").fillna(0.0).tolist() |
|
|
val = validate_tickers(tickers, years=DEFAULT_LOOKBACK_YEARS) |
|
|
tickers = [t for t in tickers if t in val] |
|
|
amounts = amounts[:len(tickers)] + [0.0] * max(0, len(tickers) - len(amounts)) |
|
|
return pd.DataFrame({"ticker": tickers, "amount_usd": amounts}) |
|
|
|
|
|
def set_horizon(years: float): |
|
|
y = max(1.0, min(100.0, float(years))) |
|
|
code = fred_series_for_horizon(y) |
|
|
rf = fetch_fred_yield_annual(code) |
|
|
global HORIZON_YEARS, RF_CODE, RF_ANN |
|
|
HORIZON_YEARS = y |
|
|
RF_CODE = code |
|
|
RF_ANN = rf |
|
|
return f"Risk-free series {code}. Latest annual rate {rf:.2%}. Computations will use this." |
|
|
|
|
|
def _table_from_weights(weights: Dict[str, float], gross_amt: float) -> pd.DataFrame: |
|
|
items = [] |
|
|
for t, w in weights.items(): |
|
|
pct = float(w) |
|
|
amt = float(w) * gross_amt |
|
|
items.append({"ticker": t, "weight_%": round(pct * 100.0, 2), "amount_$": round(amt, 2)}) |
|
|
df = pd.DataFrame(items, columns=SUG_COLS) |
|
|
if df.empty: |
|
|
return pd.DataFrame(columns=SUG_COLS) |
|
|
df["absw"] = df["weight_%"].abs() |
|
|
df = df.sort_values("absw", ascending=False).drop(columns=["absw"]) |
|
|
return df |
|
|
|
|
|
def _weights_dict_from_row(r: pd.Series) -> Dict[str, float]: |
|
|
ts = [t.strip().upper() for t in str(r.get("tickers","")).split(",") if t] |
|
|
ws = [] |
|
|
for x in str(r.get("weights","")).split(","): |
|
|
try: |
|
|
ws.append(float(x)) |
|
|
except Exception: |
|
|
ws.append(0.0) |
|
|
wmap = {} |
|
|
for i in range(min(len(ts), len(ws))): |
|
|
wmap[ts[i]] = ws[i] |
|
|
gross = sum(abs(v) for v in wmap.values()) |
|
|
if gross <= 1e-12: |
|
|
return {} |
|
|
return {k: v / gross for k, v in wmap.items()} |
|
|
|
|
|
def compute(lookback_years: int, |
|
|
table: Optional[pd.DataFrame], |
|
|
risk_bucket: str, |
|
|
horizon_years: float): |
|
|
|
|
|
try: |
|
|
|
|
|
if table is None or len(table) == 0: |
|
|
empty = pd.DataFrame(columns=POS_COLS) |
|
|
emptyS = pd.DataFrame(columns=SUG_COLS) |
|
|
emptyE = pd.DataFrame(columns=EFF_COLS) |
|
|
return (None, "Add at least one ticker", "", empty, |
|
|
emptyS, emptyS, emptyS, emptyE, emptyE, "[]", 1, "No suggestions yet.") |
|
|
|
|
|
df = table.copy().dropna(how="all") |
|
|
if df.empty or "ticker" not in df.columns or "amount_usd" not in df.columns: |
|
|
empty = pd.DataFrame(columns=POS_COLS) |
|
|
emptyS = pd.DataFrame(columns=SUG_COLS) |
|
|
emptyE = pd.DataFrame(columns=EFF_COLS) |
|
|
return (None, "Positions table is empty or malformed.", "", empty, |
|
|
emptyS, emptyS, emptyS, emptyE, emptyE, "[]", 1, "No suggestions yet.") |
|
|
|
|
|
df["ticker"] = df["ticker"].astype(str).str.upper().str.strip() |
|
|
df["amount_usd"] = pd.to_numeric(df["amount_usd"], errors="coerce").fillna(0.0) |
|
|
|
|
|
symbols = [t for t in df["ticker"].tolist() if t] |
|
|
symbols = validate_tickers(symbols, lookback_years) |
|
|
if len(symbols) == 0: |
|
|
empty = pd.DataFrame(columns=POS_COLS) |
|
|
emptyS = pd.DataFrame(columns=SUG_COLS) |
|
|
emptyE = pd.DataFrame(columns=EFF_COLS) |
|
|
return (None, "Could not validate any tickers", "Universe invalid", |
|
|
empty, emptyS, emptyS, emptyS, emptyE, emptyE, "[]", 1, "No suggestions.") |
|
|
|
|
|
|
|
|
universe = sorted(set([s for s in symbols if s != MARKET_TICKER] + [MARKET_TICKER])) |
|
|
df = df[df["ticker"].isin(symbols)].copy() |
|
|
amounts = {r["ticker"]: float(r["amount_usd"]) for _, r in df.iterrows()} |
|
|
gross_amt = sum(abs(v) for v in amounts.values()) |
|
|
if gross_amt <= 1e-9: |
|
|
empty = pd.DataFrame(columns=POS_COLS) |
|
|
emptyS = pd.DataFrame(columns=SUG_COLS) |
|
|
emptyE = pd.DataFrame(columns=EFF_COLS) |
|
|
return (None, "All amounts are zero", "Universe ok", |
|
|
empty, emptyS, emptyS, emptyS, emptyE, emptyE, "[]", 1, "No suggestions.") |
|
|
|
|
|
weights = {k: v / gross_amt for k, v in amounts.items()} |
|
|
|
|
|
|
|
|
rf_code = fred_series_for_horizon(horizon_years) |
|
|
rf_ann = fetch_fred_yield_annual(rf_code) |
|
|
moms = estimate_all_moments_aligned(universe, lookback_years, rf_ann) |
|
|
betas, covA, erp_ann, sigma_mkt = moms["betas"], moms["cov_ann"], moms["erp_ann"], moms["sigma_m_ann"] |
|
|
|
|
|
|
|
|
beta_p, er_capm_p, sigma_p = portfolio_stats(weights, covA, betas, rf_ann, erp_ann) |
|
|
|
|
|
|
|
|
a_sigma, b_sigma, mu_eff_sigma = efficient_same_sigma(sigma_p, rf_ann, erp_ann, sigma_mkt) |
|
|
a_mu, b_mu, sigma_eff_mu = efficient_same_return(er_capm_p, rf_ann, erp_ann, sigma_mkt) |
|
|
|
|
|
eff_same_sigma_tbl = _table_from_weights({MARKET_TICKER: a_sigma, BILLS_TICKER: b_sigma}, gross_amt) |
|
|
eff_same_mu_tbl = _table_from_weights({MARKET_TICKER: a_mu, BILLS_TICKER: b_mu}, gross_amt) |
|
|
|
|
|
|
|
|
synth = build_synth_dataset(universe, covA, betas, rf_ann, erp_ann, n_rows=N_SYNTH, seed=777) |
|
|
if synth.empty: |
|
|
|
|
|
fallback = [] |
|
|
for a in [0.2, 0.5, 0.8]: |
|
|
w = {MARKET_TICKER: a, BILLS_TICKER: 1-a} |
|
|
beta_i, er_capm_i, sigma_i = portfolio_stats(w, pd.DataFrame(), {MARKET_TICKER:1.0}, rf_ann, erp_ann) |
|
|
fallback.append({"tickers": ",".join(w.keys()), |
|
|
"weights": ",".join(f"{v:.6f}" for v in w.values()), |
|
|
"beta": beta_i, "er_capm": er_capm_i, "sigma": sigma_i}) |
|
|
synth = pd.DataFrame(fallback) |
|
|
|
|
|
|
|
|
median_sigma = float(synth["sigma"].median()) |
|
|
low_max = max(float(synth["sigma"].min()), median_sigma - 0.05) |
|
|
high_min = median_sigma + 0.05 |
|
|
|
|
|
if risk_bucket == "Low": |
|
|
cand_df = synth[synth["sigma"] <= low_max].copy() |
|
|
elif risk_bucket == "High": |
|
|
cand_df = synth[synth["sigma"] >= high_min].copy() |
|
|
else: |
|
|
cand_df = synth[(synth["sigma"] > low_max) & (synth["sigma"] < high_min)].copy() |
|
|
if len(cand_df) == 0: |
|
|
cand_df = synth.copy() |
|
|
|
|
|
|
|
|
embed = get_embedder() |
|
|
cand_sentences = cand_df.apply(row_to_sentence, axis=1).tolist() |
|
|
cur_pairs = ", ".join([f"{k}:{v:+.2f}" for k, v in sorted(weights.items())]) |
|
|
q_sentence = f"user portfolio ({risk_bucket} risk); capm_target {er_capm_p:.4f}; sigma_hist {sigma_p:.4f}; exposures {cur_pairs}" |
|
|
|
|
|
cand_embs = embed.encode(cand_sentences, convert_to_tensor=True, normalize_embeddings=True, batch_size=64, show_progress_bar=False) |
|
|
q_emb = embed.encode([q_sentence], convert_to_tensor=True, normalize_embeddings=True)[0] |
|
|
|
|
|
sims = st_util.cos_sim(q_emb, cand_embs)[0] |
|
|
top_idx = sims.topk(k=min(MMR_K, len(cand_df))).indices.cpu().numpy().tolist() |
|
|
shortlist_embs = cand_embs[top_idx] |
|
|
mmr_local = mmr_select(q_emb, shortlist_embs, k=3, lambda_param=MMR_LAMBDA) |
|
|
chosen = [top_idx[i] for i in mmr_local] |
|
|
recs = cand_df.iloc[chosen].reset_index(drop=True) |
|
|
|
|
|
|
|
|
sugg_tables = [] |
|
|
sugg_meta = [] |
|
|
for _, r in recs.iterrows(): |
|
|
wmap = _weights_dict_from_row(r) |
|
|
sugg_tables.append(_table_from_weights(wmap, gross_amt)) |
|
|
sugg_meta.append({"er_capm": float(r["er_capm"]), "sigma": float(r["sigma"]), "beta": float(r["beta"])}) |
|
|
|
|
|
|
|
|
img = plot_cml( |
|
|
rf_ann, erp_ann, sigma_mkt, |
|
|
sigma_p, er_capm_p, |
|
|
same_sigma_sigma=sigma_p, same_sigma_mu=mu_eff_sigma, |
|
|
same_mu_sigma=sigma_eff_mu, same_mu_mu=er_capm_p |
|
|
) |
|
|
|
|
|
|
|
|
rows = [] |
|
|
for t in universe: |
|
|
if t == MARKET_TICKER: |
|
|
continue |
|
|
rows.append({ |
|
|
"ticker": t, |
|
|
"amount_usd": round(amounts.get(t, 0.0), 2), |
|
|
"weight_exposure": round(weights.get(t, 0.0), 6), |
|
|
"beta": round(betas.get(t, np.nan), 4) if t != MARKET_TICKER else 1.0 |
|
|
}) |
|
|
pos_table = pd.DataFrame(rows, columns=POS_COLS) |
|
|
|
|
|
|
|
|
info_lines = [] |
|
|
info_lines.append("### Inputs") |
|
|
info_lines.append(f"- Lookback years **{int(lookback_years)}**") |
|
|
info_lines.append(f"- Horizon years **{int(round(horizon_years))}**") |
|
|
info_lines.append(f"- Risk-free **{fmt_pct(rf_ann)}** from **{rf_code}**") |
|
|
info_lines.append(f"- Market ERP **{fmt_pct(erp_ann)}**") |
|
|
info_lines.append(f"- Market σ **{fmt_pct(sigma_mkt)}**") |
|
|
info_lines.append("") |
|
|
info_lines.append("### Your portfolio (plotted as CAPM return, historical σ)") |
|
|
info_lines.append(f"- Beta **{beta_p:.2f}**") |
|
|
info_lines.append(f"- σ (historical) **{fmt_pct(sigma_p)}**") |
|
|
info_lines.append(f"- E[return] (CAPM / SML) **{fmt_pct(er_capm_p)}**") |
|
|
info_lines.append("") |
|
|
info_lines.append("### Efficient alternatives on CML") |
|
|
info_lines.append(f"- Same σ → Market **{a_sigma:.2f}**, Bills **{b_sigma:.2f}**, Return **{fmt_pct(mu_eff_sigma)}**") |
|
|
info_lines.append(f"- Same μ → Market **{a_mu:.2f}**, Bills **{b_mu:.2f}**, σ **{fmt_pct(sigma_eff_mu)}**") |
|
|
info_lines.append("") |
|
|
info_lines.append(f"### Dataset-based suggestions (risk: **{risk_bucket}**)") |
|
|
info_lines.append("Use the selector to flip between **Pick #1 / #2 / #3**. Table shows % exposure and $ amounts.") |
|
|
|
|
|
|
|
|
while len(sugg_tables) < 3: |
|
|
sugg_tables.append(pd.DataFrame(columns=SUG_COLS)) |
|
|
|
|
|
pick_idx_default = 1 |
|
|
pick_msg_default = (f"Pick #1 — E[μ] {fmt_pct(sugg_meta[0]['er_capm'])}, " |
|
|
f"σ {fmt_pct(sugg_meta[0]['sigma'])}, β {sugg_meta[0]['beta']:.2f}") if sugg_meta else "No suggestion." |
|
|
|
|
|
return (img, |
|
|
"\n".join(info_lines), |
|
|
f"Universe set to {', '.join(universe)}", |
|
|
pos_table, |
|
|
sugg_tables[0], sugg_tables[1], sugg_tables[2], |
|
|
eff_same_sigma_tbl, eff_same_mu_tbl, |
|
|
json.dumps(sugg_meta), pick_idx_default, pick_msg_default) |
|
|
|
|
|
except Exception as e: |
|
|
empty = pd.DataFrame(columns=POS_COLS) |
|
|
emptyS = pd.DataFrame(columns=SUG_COLS) |
|
|
emptyE = pd.DataFrame(columns=EFF_COLS) |
|
|
msg = f"⚠️ Compute failed: {e}" |
|
|
if DEBUG: |
|
|
msg += "\n\n```\n" + traceback.format_exc() + "\n```" |
|
|
return (None, msg, "Error", empty, emptyS, emptyS, emptyS, emptyE, emptyE, "[]", 1, "No suggestions.") |
|
|
|
|
|
def on_pick_change(idx: int, meta_json: str): |
|
|
try: |
|
|
data = json.loads(meta_json) |
|
|
except Exception: |
|
|
data = [] |
|
|
if not data: |
|
|
return "No suggestion." |
|
|
i = int(idx) - 1 |
|
|
i = max(0, min(i, len(data)-1)) |
|
|
s = data[i] |
|
|
return f"Pick #{i+1} — E[μ] {fmt_pct(s['er_capm'])}, σ {fmt_pct(s['sigma'])}, β {s['beta']:.2f}" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
with gr.Blocks(title="Efficient Portfolio Advisor", css="#small-note {font-size: 12px; color:#666;}") as demo: |
|
|
|
|
|
gr.Markdown("## Efficient Portfolio Advisor\n" |
|
|
"Search symbols, enter **$ amounts**, set your **horizon**. " |
|
|
"The plot shows your **CAPM expected return** vs **historical σ**, alongside the **CML**. " |
|
|
"Recommendations are generated from a **synthetic dataset (1000 portfolios)** and ranked with **local embeddings (BGE-base)** for relevance + diversity.") |
|
|
|
|
|
with gr.Tab("Build Portfolio"): |
|
|
with gr.Row(): |
|
|
with gr.Column(scale=1): |
|
|
q = gr.Textbox(label="Search symbol") |
|
|
search_note = gr.Markdown(elem_id="small-note") |
|
|
matches = gr.Dropdown(choices=[], label="Matches", value=None) |
|
|
search_btn = gr.Button("Search") |
|
|
add_btn = gr.Button("Add selected to portfolio") |
|
|
|
|
|
gr.Markdown("### Positions (enter dollars; negatives allowed for shorts)") |
|
|
table = gr.Dataframe( |
|
|
headers=["ticker", "amount_usd"], |
|
|
datatype=["str", "number"], |
|
|
row_count=0, |
|
|
col_count=(2, "fixed"), |
|
|
wrap=True |
|
|
) |
|
|
|
|
|
with gr.Column(scale=1): |
|
|
horizon = gr.Slider(1, 30, value=DEFAULT_LOOKBACK_YEARS, step=1, label="Investment horizon (years)") |
|
|
lookback = gr.Slider(1, 10, value=DEFAULT_LOOKBACK_YEARS, step=1, label="Lookback (years) for β and σ") |
|
|
risk_bucket = gr.Radio(["Low", "Medium", "High"], value="Medium", label="Recommendation risk level") |
|
|
run_btn = gr.Button("Compute") |
|
|
|
|
|
rf_msg = gr.Textbox(label="Risk-free source / status", interactive=False) |
|
|
search_btn.click(fn=do_search, inputs=q, outputs=[search_note, matches]) |
|
|
add_btn.click(fn=add_symbol, inputs=[matches, table], outputs=[table, search_note]) |
|
|
table.change(fn=lock_ticker_column, inputs=table, outputs=table) |
|
|
horizon.change(fn=set_horizon, inputs=horizon, outputs=[rf_msg]) |
|
|
|
|
|
with gr.Tab("Results"): |
|
|
with gr.Row(): |
|
|
with gr.Column(scale=1): |
|
|
plot = gr.Image(label="Capital Market Line", type="pil") |
|
|
summary = gr.Markdown(label="Summary") |
|
|
universe_msg = gr.Textbox(label="Universe status", interactive=False) |
|
|
|
|
|
with gr.Column(scale=1): |
|
|
positions = gr.Dataframe( |
|
|
label="Computed positions", |
|
|
headers=POS_COLS, |
|
|
datatype=["str", "number", "number", "number"], |
|
|
col_count=(len(POS_COLS), "fixed"), |
|
|
interactive=False |
|
|
) |
|
|
|
|
|
gr.Markdown("### Recommendations (always from embeddings)") |
|
|
with gr.Row(): |
|
|
sugg1 = gr.Dataframe(label="Pick #1", interactive=False) |
|
|
sugg2 = gr.Dataframe(label="Pick #2", interactive=False) |
|
|
sugg3 = gr.Dataframe(label="Pick #3", interactive=False) |
|
|
|
|
|
with gr.Row(): |
|
|
pick_idx = gr.Slider(1, 3, value=1, step=1, label="Carousel: show Pick #") |
|
|
pick_meta = gr.Textbox(value="[]", visible=False) |
|
|
pick_msg = gr.Markdown("") |
|
|
|
|
|
gr.Markdown("### Efficient alternatives on the CML") |
|
|
eff_same_sigma_tbl = gr.Dataframe(label="Efficient: Same σ", interactive=False) |
|
|
eff_same_mu_tbl = gr.Dataframe(label="Efficient: Same μ", interactive=False) |
|
|
|
|
|
run_btn.click( |
|
|
fn=compute, |
|
|
inputs=[lookback, table, risk_bucket, horizon], |
|
|
outputs=[ |
|
|
plot, summary, universe_msg, positions, |
|
|
sugg1, sugg2, sugg3, |
|
|
eff_same_sigma_tbl, eff_same_mu_tbl, |
|
|
pick_meta, pick_idx, pick_msg |
|
|
] |
|
|
) |
|
|
pick_idx.change(fn=on_pick_change, inputs=[pick_idx, pick_meta], outputs=pick_msg) |
|
|
|
|
|
with gr.Tab("About"): |
|
|
gr.Markdown( |
|
|
"### Modality & Model\n" |
|
|
"- **Modality**: Text (portfolio → text descriptions) powering **embeddings**\n" |
|
|
"- **Embedding model**: `BAAI/bge-base-en-v1.5` (local, downloaded once; no API)\n\n" |
|
|
"### Use case\n" |
|
|
"Given a portfolio, we build a synthetic dataset of 1,000 alternative mixes **using the same tickers**, " |
|
|
"compute each mix’s **CAPM return, σ, and β**, and rank candidates with embeddings to return **3 diverse, relevant suggestions** " |
|
|
"for **Low / Medium / High** risk.\n\n" |
|
|
"### Theory links\n" |
|
|
"- Portfolio expected return in the plot uses **CAPM (SML)**, while σ is historical.\n" |
|
|
"- The **CML** and the two **efficient alternatives** (same σ, same μ) use a mix of **Market (VOO)** and **Bills**." |
|
|
) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
demo.launch() |
|
|
|