从"回测死代码"进化为"生命体":AI 从单纯程序员转变为 DevOps 运维专家兼首席风控官。 盘前剧本预测 · 盘中实时信号共振 · 盘后智能复盘,结合美东时间精准踩点美股开盘节奏, 通过 Telegram / Email / Webhook 毫秒级全渠道预警触达。
原代码直接执行 df['High'] = df['High'].rolling(20).max(),覆写了原始 OHLC 数据。后续所有用到 High/Low 的计算(ATR、止损线、K 线图)都使用了"未来已知的"滚动极值,而非真实市价。
原代码 t[1] > 0 判断的是卖出价格是否大于零(恒成立),而非是否盈利。导致胜率永远接近 100%,完全失去参考意义。
df.iloc[i, df.columns.get_loc('Hold')] = lookback 在 pandas 2.x 中因 chained indexing 静默失败,Hold 列永远为 0,持仓逻辑完全失效。
原代码对整个 period 数据累计计算 VWAP,随时间推移 VWAP 趋近于所有历史均价,失去当日参考意义。修复为每日分组独立重置。
所有策略生成信号后在同 bar 的 close 价格成交,相当于提前知道收盘价再做决策。修复:全部信号通过 .shift(1) 确认,在下一根 K 线开盘入场。
原版 equity_curve 只在部分条件下 append,长度不与 df 对齐,导致 Sharpe/MDD 基于错误的收益序列计算。同时 yfinance 调用无重试,网络断线直接 crash。
# lifecycle/scheduler.py — 状态机驱动的生命周期引擎 class MarketState(Enum): IDLE = auto() PRE_MARKET = auto() # 08:00-09:20 EST TRADING = auto() # 09:30-16:00 EST POST_MARKET = auto() # 16:15 EST class TradingLifecycleEngine: async def run(self): premarket_done = False while True: state = self._get_state() # 基于 EST 时间判断 if state == MarketState.PRE_MARKET and not premarket_done: await self._run_premarket() # 拉数据→剧本→发邮件 premarket_done = True elif state == MarketState.TRADING: await self._run_intraday_scan() # 每60s扫描→共振→预警 await asyncio.sleep(60) continue elif state == MarketState.POST_MARKET and not postmarket_done: await self._run_postmarket() # 结算→复盘→发报告 postmarket_done = True await asyncio.sleep(30) def _compute_consensus(signals: dict) -> Optional[str]: # 多策略共振:≥2 个策略同方向 → 触发 buys = sum(1 for v in signals.values() if v == 1) if buys >= 2: return "BUY" sells = sum(1 for v in signals.values() if v == -1) if sells >= 2: return "SELL" return None
# alerts/notification_engine.py — NotificationEngine 核心 class NotificationEngine: async def send(self, alert: Alert): # 优先级路由:CRITICAL/HIGH → Telegram+Webhook # NORMAL → Email+Webhook tasks = [] if alert.priority <= Priority.HIGH: tasks.append(_send_telegram(alert)) # 毫秒级触达手机 if alert.priority == Priority.NORMAL: tasks.append(_send_email(alert)) # HTML富文本报告 if "webhook" in alert.channels: tasks.append(_send_webhook(alert)) # 接入本地大屏 await asyncio.gather(*tasks, return_exceptions=True) # 快捷方法 async def signal_alert(self, symbol, signal, price, reason, strategy): await self.send(Alert( title=f"{'🟢买入' if signal=='BUY' else '🔴卖出'} {symbol}", body=f"策略:{strategy}\n信号:{signal}\n价格:${price:.2f}\n理由:{reason}", priority=Priority.HIGH, channels=["telegram", "webhook"], )) async def premarket_report(self, symbol, text, html): await self.send(Alert( title=f"📋 {symbol} 盘前交易剧本", body=text, html_body=html, priority=Priority.NORMAL, # → Email 富文本报告 channels=["email", "telegram", "webhook"], ))
# strategy/indicators.py — 修复 BUG-1 (rolling_high) & BUG-4 (daily_vwap) def rolling_high(series, period): """修复 BUG-1: 不覆写原始列,内置 shift(1) 防前视""" return series.rolling(period).max().shift(1) # shift已内置! def daily_vwap(df): """修复 BUG-4: 每日重置 VWAP(按 index.normalize() 分组)""" typical = (df["High"] + df["Low"] + df["Close"]) / 3 dates = df.index.normalize() vwap_vals = pd.Series(index=df.index, dtype=float) for date, grp_idx in df.groupby(dates).groups.items(): tp = typical.loc[grp_idx] vol = df["Volume"].loc[grp_idx] # 每个交易日独立累计,互不影响 vwap_vals.loc[grp_idx] = (tp * vol).cumsum() / (vol.cumsum() + 1e-10) return vwap_vals # 修复 BUG-5: 策略信号全部基于 shift(1) def strategy_trend(df, fast=12, slow=26): prev_fast = ema(df["Close"], fast).shift(1) # 前一根已确认 prev_slow = ema(df["Close"], slow).shift(1) df["Signal"] = 0 df.loc[prev_fast > prev_slow, "Signal"] = 1 df.loc[prev_fast < prev_slow, "Signal"] = -1 return df
# backtest/engine.py — 修复 BUG-2 (胜率) & BUG-6 (equity_curve) class BacktestEngine: def run(self, df, symbol, strategy_name, params=None, ...): equity = np.empty(len(df)) # 修复 BUG-6: 与 df 等长 for i in range(len(df)): ... # 每根 K 线统一记录权益(含未实现 PnL) equity[i] = capital + position * price if position > 0 else capital def _compute_metrics(self, result): pnl_pcts = np.array([t.pnl_pct for t in trades]) wins = pnl_pcts[pnl_pcts > 0] # 修复 BUG-2: 按 PnL% 判断 losses = pnl_pcts[pnl_pcts <= 0] result.win_rate = len(wins) / len(trades) * 100 result.profit_factor = wins.sum() / abs(losses.sum()) # 真实盈亏比 # Sharpe / Sortino / Calmar(均基于正确 equity_curve) rets = pd.Series(equity).pct_change().dropna() result.sharpe = rets.mean() / rets.std() * np.sqrt(252)
# .env 配置说明(复制 .env.example 填入真实值) # ── Telegram Bot ──────────────────────────────── # 1. 在 @BotFather 创建 Bot → 获取 TOKEN # 2. 给 Bot 发消息 → 访问 /getUpdates → 获取 chat_id TELEGRAM_TOKEN=7123456789:AAF_your_token TELEGRAM_CHAT_ID=123456789 # ── Gmail App Password(不是登录密码!)──────── # Gmail → 账号 → 安全 → 两步验证 → 应用专用密码(16位) EMAIL_SENDER=you@gmail.com EMAIL_PASSWORD=xxxx xxxx xxxx xxxx EMAIL_RECIPIENT=target@example.com # ── Webhook(接入本地 Open WebUI / 仪表盘)────── WEBHOOK_URL=http://localhost:8080/webhook/ait WEBHOOK_SECRET=your_secret_here # ── FMP API(免费250次/天,新闻+数据)────────── FMP_API_KEY=your_fmp_key # ── 风控参数 ───────────────────────────────────── CAPITAL_USDC=100000 # 测试: python main.py --test-alerts # 离线回测: python main.py --backtest # 全天候守护: python main.py
pip install yfinance pandas numpy pytz python-dotenv requests python-telegram-bot APScheduler pyarrow
复制 /root/ait/.env.example 为 .env,填入 Telegram Token、Gmail App Password、Webhook URL。
python main.py --test-alerts,验证 Telegram 消息是否到达手机,邮件是否收到。
python main.py --backtest,对比原版与修复版指标差异,确认 BUG 修复有效。
python main.py 或通过 pm2 start main.py --interpreter python3 守护运行,系统将自动按美东时间流转盘前/盘中/盘后三阶段。