🤖 AITRADE · DevOps · 生命周期引擎

AiTradeSystem — 全天候量化生命周期

从"回测死代码"进化为"生命体":AI 从单纯程序员转变为 DevOps 运维专家兼首席风控官。 盘前剧本预测 · 盘中实时信号共振 · 盘后智能复盘,结合美东时间精准踩点美股开盘节奏, 通过 Telegram / Email / Webhook 毫秒级全渠道预警触达。

--:--:--
EST 当前时间
--
市场状态
7
修复缺陷数
3
预警渠道
6
策略(MSFT+SLNO)
08:00 – 09:20 EST
📋 盘前备战
  • 自动拉取最新日线/小时线,补齐昨日缺口
  • 计算盘前 Gap%,抓取 FMP 新闻标题
  • 基于最新 ATR 生成动态止盈止损线
  • 生成 MSFT & SLNO 今日交易剧本
  • 发送 HTML 富文本邮件报告 + Telegram 摘要
09:30 – 16:00 EST
⚡ 盘中实战
  • 每 60s 扫描最新 K 线,运行全部策略
  • ≥2 策略同方向共振 → 立即触发预警
  • SLNO RVOL>3x 且跌幅>5% → 异常放量预警
  • 30 分钟同方向去重,防止重复推送
  • 信号 → Telegram 即时推送 + Webhook 日志
16:15 EST
📈 盘后复盘
  • 结算当日 PnL,更新 JSON 数据库
  • 运行全策略回测,计算 Sharpe/MDD/胜率
  • 生成 MSFT & SLNO 每日复盘表格
  • 发送 HTML 邮件复盘 + Telegram 摘要
  • 次日数据缓存清理,等待下一个交易日

🔍 代码深度审查 — 7 大缺陷修复

CRITICALBUG-1 [MSFT] volatility() 覆写 High/Low 原始列 → 严重前视偏差

原代码直接执行 df['High'] = df['High'].rolling(20).max(),覆写了原始 OHLC 数据。后续所有用到 High/Low 的计算(ATR、止损线、K 线图)都使用了"未来已知的"滚动极值,而非真实市价。

- df['High'] = df['High'].rolling(period).max() # 覆写原始列!
+ df['Roll_High'] = rolling_high(df['High'], period) # 独立列,shift(1)内置
CRITICALBUG-2 [MSFT] run_backtest() 胜率计算完全错误

原代码 t[1] > 0 判断的是卖出价格是否大于零(恒成立),而非是否盈利。导致胜率永远接近 100%,完全失去参考意义。

- wins = len([t for t in trades if t[0]=='SELL' and t[1]>0]) # t[1]=卖价,恒>0
+ pnl_pcts = [t.pnl_pct for t in trades]; wins = pnl_pcts[pnl_pcts>0] # 正确比较PnL
MEDIUMBUG-3 [SLNO] vwap_momentum 循环赋值在 pandas 2.x 静默失败

df.iloc[i, df.columns.get_loc('Hold')] = lookback 在 pandas 2.x 中因 chained indexing 静默失败,Hold 列永远为 0,持仓逻辑完全失效。

- for i in range(len(df)): df.iloc[i, ...] = lookback # 静默失败
+ anchor_vwap = df['VWAP'].where(anchor).ffill() # 完全向量化
MEDIUMBUG-4 [SLNO] VWAP 跨日累计失真

原代码对整个 period 数据累计计算 VWAP,随时间推移 VWAP 趋近于所有历史均价,失去当日参考意义。修复为每日分组独立重置。

- df['VWAP'] = (tp * vol).cumsum() / vol.cumsum() # 全周期累计
+ vwap = daily_vwap(df) # 按 df.index.normalize() 每日重置
MEDIUMBUG-5 信号基于当根 K 线成交(前视偏差)

所有策略生成信号后在同 bar 的 close 价格成交,相当于提前知道收盘价再做决策。修复:全部信号通过 .shift(1) 确认,在下一根 K 线开盘入场。

- df.loc[df['EMA_Fast'] > df['EMA_Slow'], 'Signal'] = 1 # 当根K线
+ prev_fast = df['EMA_Fast'].shift(1); df.loc[prev_fast > prev_slow, 'Signal'] = 1
LOWBUG-6 equity_curve 长度与 df 不匹配 + BUG-7 无重试机制

原版 equity_curve 只在部分条件下 append,长度不与 df 对齐,导致 Sharpe/MDD 基于错误的收益序列计算。同时 yfinance 调用无重试,网络断线直接 crash。

+ equity[i] = capital + position * price # 每根K线统一记录
+ @with_retry # 指数退避重试3次

💻 核心代码实现

# lifecycle/scheduler.py — 状态机驱动的生命周期引擎

class MarketState(Enum):
    IDLE = auto()
    PRE_MARKET = auto()    # 08:00-09:20 EST
    TRADING = auto()       # 09:30-16:00 EST
    POST_MARKET = auto()   # 16:15 EST

class TradingLifecycleEngine:
    async def run(self):
        premarket_done = False
        while True:
            state = self._get_state()   # 基于 EST 时间判断

            if state == MarketState.PRE_MARKET and not premarket_done:
                await self._run_premarket()   # 拉数据→剧本→发邮件
                premarket_done = True

            elif state == MarketState.TRADING:
                await self._run_intraday_scan()  # 每60s扫描→共振→预警
                await asyncio.sleep(60)
                continue

            elif state == MarketState.POST_MARKET and not postmarket_done:
                await self._run_postmarket()     # 结算→复盘→发报告
                postmarket_done = True

            await asyncio.sleep(30)

    def _compute_consensus(signals: dict) -> Optional[str]:
        # 多策略共振:≥2 个策略同方向 → 触发
        buys = sum(1 for v in signals.values() if v == 1)
        if buys >= 2: return "BUY"
        sells = sum(1 for v in signals.values() if v == -1)
        if sells >= 2: return "SELL"
        return None
# alerts/notification_engine.py — NotificationEngine 核心

class NotificationEngine:
    async def send(self, alert: Alert):
        # 优先级路由:CRITICAL/HIGH → Telegram+Webhook
        #             NORMAL → Email+Webhook
        tasks = []
        if alert.priority <= Priority.HIGH:
            tasks.append(_send_telegram(alert))   # 毫秒级触达手机
        if alert.priority == Priority.NORMAL:
            tasks.append(_send_email(alert))       # HTML富文本报告
        if "webhook" in alert.channels:
            tasks.append(_send_webhook(alert))     # 接入本地大屏
        await asyncio.gather(*tasks, return_exceptions=True)

    # 快捷方法
    async def signal_alert(self, symbol, signal, price, reason, strategy):
        await self.send(Alert(
            title=f"{'🟢买入' if signal=='BUY' else '🔴卖出'} {symbol}",
            body=f"策略:{strategy}\n信号:{signal}\n价格:${price:.2f}\n理由:{reason}",
            priority=Priority.HIGH,
            channels=["telegram", "webhook"],
        ))

    async def premarket_report(self, symbol, text, html):
        await self.send(Alert(
            title=f"📋 {symbol} 盘前交易剧本",
            body=text, html_body=html,
            priority=Priority.NORMAL,        # → Email 富文本报告
            channels=["email", "telegram", "webhook"],
        ))
# strategy/indicators.py — 修复 BUG-1 (rolling_high) & BUG-4 (daily_vwap)

def rolling_high(series, period):
    """修复 BUG-1: 不覆写原始列,内置 shift(1) 防前视"""
    return series.rolling(period).max().shift(1)   # shift已内置!

def daily_vwap(df):
    """修复 BUG-4: 每日重置 VWAP(按 index.normalize() 分组)"""
    typical = (df["High"] + df["Low"] + df["Close"]) / 3
    dates = df.index.normalize()
    vwap_vals = pd.Series(index=df.index, dtype=float)
    for date, grp_idx in df.groupby(dates).groups.items():
        tp = typical.loc[grp_idx]
        vol = df["Volume"].loc[grp_idx]
        # 每个交易日独立累计,互不影响
        vwap_vals.loc[grp_idx] = (tp * vol).cumsum() / (vol.cumsum() + 1e-10)
    return vwap_vals

# 修复 BUG-5: 策略信号全部基于 shift(1)
def strategy_trend(df, fast=12, slow=26):
    prev_fast = ema(df["Close"], fast).shift(1)   # 前一根已确认
    prev_slow = ema(df["Close"], slow).shift(1)
    df["Signal"] = 0
    df.loc[prev_fast > prev_slow, "Signal"] = 1
    df.loc[prev_fast < prev_slow, "Signal"] = -1
    return df
# backtest/engine.py — 修复 BUG-2 (胜率) & BUG-6 (equity_curve)

class BacktestEngine:
    def run(self, df, symbol, strategy_name, params=None, ...):
        equity = np.empty(len(df))   # 修复 BUG-6: 与 df 等长

        for i in range(len(df)):
            ...
            # 每根 K 线统一记录权益(含未实现 PnL)
            equity[i] = capital + position * price if position > 0 else capital

    def _compute_metrics(self, result):
        pnl_pcts = np.array([t.pnl_pct for t in trades])
        wins = pnl_pcts[pnl_pcts > 0]    # 修复 BUG-2: 按 PnL% 判断
        losses = pnl_pcts[pnl_pcts <= 0]

        result.win_rate = len(wins) / len(trades) * 100
        result.profit_factor = wins.sum() / abs(losses.sum())  # 真实盈亏比

        # Sharpe / Sortino / Calmar(均基于正确 equity_curve)
        rets = pd.Series(equity).pct_change().dropna()
        result.sharpe = rets.mean() / rets.std() * np.sqrt(252)
# .env 配置说明(复制 .env.example 填入真实值)

# ── Telegram Bot ────────────────────────────────
# 1. 在 @BotFather 创建 Bot → 获取 TOKEN
# 2. 给 Bot 发消息 → 访问 /getUpdates → 获取 chat_id
TELEGRAM_TOKEN=7123456789:AAF_your_token
TELEGRAM_CHAT_ID=123456789

# ── Gmail App Password(不是登录密码!)────────
# Gmail → 账号 → 安全 → 两步验证 → 应用专用密码(16位)
EMAIL_SENDER=you@gmail.com
EMAIL_PASSWORD=xxxx xxxx xxxx xxxx
EMAIL_RECIPIENT=target@example.com

# ── Webhook(接入本地 Open WebUI / 仪表盘)──────
WEBHOOK_URL=http://localhost:8080/webhook/ait
WEBHOOK_SECRET=your_secret_here

# ── FMP API(免费250次/天,新闻+数据)──────────
FMP_API_KEY=your_fmp_key

# ── 风控参数 ─────────────────────────────────────
CAPITAL_USDC=100000

# 测试: python main.py --test-alerts
# 离线回测: python main.py --backtest
# 全天候守护: python main.py
📱
Telegram Bot
交易信号触发、止损预警、SLNO异常放量等实时事件,第一时间推送到手机。支持 HTML 格式,优先级 HIGH/CRITICAL 自动发送,LOW 静默通知。
毫秒级触达 指数退避重试
📧
Email (HTML)
盘前剧本预测报告和盘后复盘摘要通过 HTML 富文本邮件发送,支持概率进度条、策略对比表格、动态止盈止损位可视化排版。
Gmail SMTP TLS App Password
🔗
Webhook POST
标准 JSON Payload 推送到本地 Open WebUI、个人数据大屏或其他系统。包含完整信号元数据:标的、方向、价格、策略名称、时间戳。
HMAC 签名验证 可接入仪表盘

🔴 实时预警流演示

模拟 · 仅供展示

🚀 快速部署

1

安装依赖

pip install yfinance pandas numpy pytz python-dotenv requests python-telegram-bot APScheduler pyarrow

2

配置环境变量

复制 /root/ait/.env.example.env,填入 Telegram Token、Gmail App Password、Webhook URL。

3

测试预警渠道

python main.py --test-alerts,验证 Telegram 消息是否到达手机,邮件是否收到。

4

运行离线回测验证

python main.py --backtest,对比原版与修复版指标差异,确认 BUG 修复有效。

5

启动全天候守护进程

python main.py 或通过 pm2 start main.py --interpreter python3 守护运行,系统将自动按美东时间流转盘前/盘中/盘后三阶段。

asyncio EST 时区精准 Telegram Bot Gmail SMTP Webhook ATR 动态止损 多策略共振 Parquet 缓存 指数退避重试 7 BUG修复