核心代码
关键实现片段,点击标签切换
# data/fetcher.py — ccxt 分批拉取 + Gap 填充
def _fetch_all(self, timeframe, years):
    """Download ``years`` of OHLCV candles in pages and forward-fill gaps.

    Candles are requested from ``self.exchange.fetch_ohlcv`` 720 at a time,
    starting ``years * 365`` days before now and moving ``since`` one
    timeframe past the last candle received after every page.

    Args:
        timeframe: Timeframe string handed to ``fetch_ohlcv`` and to
            ``self._timeframe_to_ms``.
        years: Look-back horizon, in 365-day years.

    Returns:
        DataFrame with columns ``o, h, l, c, v`` on a gap-free, UTC
        ``DatetimeIndex`` named ``ts`` whose step equals the timeframe.
        Rows for missing candles repeat the previous candle's values.
        The frame is empty when the exchange returns no candles.
    """
    tf_ms = self._timeframe_to_ms(timeframe)
    since_ms = int((time.time() - years * 365 * 24 * 3600) * 1000)
    all_ohlcv, current_since = [], since_ms
    last_ts = None
    while True:
        try:
            batch = self.exchange.fetch_ohlcv(
                "WLD/USDT", timeframe=timeframe,
                since=current_since, limit=720,  # Kraken page-size limit
            )
        except ccxt.RateLimitExceeded:
            time.sleep(10)
            continue
        # Keep only candles newer than anything already stored. A page that
        # merely repeats known candles would otherwise loop forever and put
        # duplicate labels in the index.
        fresh = [
            candle for candle in (batch or [])
            if last_ts is None or candle[0] > last_ts
        ]
        if not fresh:
            break
        all_ohlcv.extend(fresh)
        last_ts = fresh[-1][0]
        current_since = last_ts + tf_ms

    if not all_ohlcv:
        return pd.DataFrame(
            columns=["o", "h", "l", "c", "v"],
            index=pd.DatetimeIndex([], tz="UTC", name="ts"),
        )

    df = pd.DataFrame(all_ohlcv, columns=["ts", "o", "h", "l", "c", "v"])
    # Index by candle open time; the default RangeIndex cannot be aligned
    # against a calendar grid.
    df["ts"] = pd.to_datetime(df["ts"], unit="ms", utc=True)
    df = df.set_index("ts")

    # Detect and fill gaps (forward fill) on a grid that follows the
    # requested timeframe instead of a fixed 15-minute step.
    full_idx = pd.date_range(
        df.index[0], df.index[-1],
        freq=pd.Timedelta(milliseconds=tf_ms), name="ts",
    )
    return df.reindex(full_idx).ffill()
# strategies/indicators.py — Supertrend 逐根 K 线循环状态机
def supertrend(df, atr_period=10, multiplier=3.0):
    """Derive a trend flag and buy signal from the Supertrend upper band.

    The raw upper band is ``hl2 + multiplier * ATR``. The final band is only
    allowed to tighten, unless the previous close broke above it, in which
    case it resets to the raw band. Each bar is then classified by comparing
    its close with the final band.

    Args:
        df: Frame with ``high``, ``low`` and ``close`` columns.
        atr_period: Look-back passed to ``atr``.
        multiplier: Band width as a multiple of ATR.

    Returns:
        DataFrame indexed like ``df`` with:
        ``trend`` — ``-1`` when the close is above the final upper band
        (labelled bullish in this module), ``1`` otherwise, ``0`` for the
        first bar;
        ``buy_signal`` — True on bars where ``trend`` is ``-1`` and the
        previous bar's ``trend`` was ``1``.

    NOTE(review): only the upper band drives the visible logic. The lower
    band and the Supertrend line itself were allocated but never used in
    this excerpt, so they are not computed here — confirm against the full
    implementation.
    """
    n = len(df)
    close = df["close"].to_numpy()
    hl2 = (df["high"] + df["low"]) / 2
    atr_val = atr(df, atr_period)
    # Plain arrays make every [i] below positional, whatever index df carries.
    upper = (hl2 + multiplier * atr_val).to_numpy()  # bearish-side channel
    final_upper = upper.copy()
    trend = np.zeros(n)

    for i in range(1, n):
        prev_band = final_upper[i - 1]
        # The upper band may only tighten, unless price broke through it.
        # A NaN seed (e.g. if atr has a warm-up period) is replaced too;
        # otherwise every comparison is False and NaN would propagate to
        # the end of the series.
        if (
            np.isnan(prev_band)
            or upper[i] < prev_band
            or close[i - 1] > prev_band
        ):
            final_upper[i] = upper[i]
        else:
            final_upper[i] = prev_band

        # Trend classification: -1 = bullish, 1 = bearish.
        trend[i] = -1 if close[i] > final_upper[i] else 1

    # Previous-bar bearish flag, built without wrap-around.
    prev_bearish = np.zeros(n, dtype=bool)
    prev_bearish[1:] = trend[:-1] == 1

    return pd.DataFrame({
        "trend": trend,
        "buy_signal": (trend == -1) & prev_bearish,
    }, index=df.index)
# strategies/indicators.py — 加密货币 24h 滚动 VWAP
def rolling_vwap(df, window=96):  # 15m × 96 = 24h
    """Compute a rolling volume-weighted average price with deviation bands.

    Args:
        df: Frame with ``high``, ``low``, ``close`` and ``volume`` columns.
        window: Number of bars in the rolling window.

    Returns:
        DataFrame indexed like ``df`` with ``vwap``, ``vwap_dev`` (distance
        of the close from VWAP in units of the rolling standard deviation),
        ``vwap_upper2`` and ``vwap_lower2`` (VWAP ± 2 standard deviations).
    """
    volume = df["volume"]
    tp = (df["high"] + df["low"] + df["close"]) / 3

    # Rolling volume-weighted mean: traded value over traded volume.
    traded_value = (tp * volume).rolling(window).sum()
    traded_volume = volume.rolling(window).sum()
    vwap = traded_value / traded_volume

    # Dispersion of the typical price around VWAP over the same window.
    sigma = (tp - vwap).rolling(window).std()

    # Deviation multiple (the key signal); epsilon avoids division by zero.
    deviation = (df["close"] - vwap) / (sigma + 1e-10)

    columns = {
        "vwap": vwap,
        "vwap_dev": deviation,  # < -2.0 → oversold signal
        "vwap_upper2": vwap + 2 * sigma,
        "vwap_lower2": vwap - 2 * sigma,
    }
    return pd.DataFrame(columns, index=df.index)
# backtest/engine.py — Maker/Taker 费率区分
def run(self, df, strategy_name, params):
    """Walk the bars of ``df`` and pick maker or taker fees for each fill.

    This is an abridged excerpt of the backtest loop.

    NOTE(review): ``current_trade``, ``row``, ``prev_row`` and
    ``prev_signal`` are read but never assigned in the visible code, and
    ``exit_reason``, ``fee_rate`` and ``fee`` are assigned but never
    consumed. Presumably the omitted lines maintain and use them — confirm
    against the full engine.
    """
    capital = self.capital
    for i in range(1, len(df)):
        price = df.iloc[i]["close"]
        # Exit check
        if current_trade:
            if price <= row["stop_loss"]:  # ATR stop-loss hit
                exit_reason = "atr_stop"
                fee_rate = self.taker_fee  # original note: market order, 0.40%
            elif prev_row["signal"] == -1:  # strategy-driven exit
                exit_reason = "signal"
                fee_rate = self.maker_fee  # original note: limit order, 0.16%
        # Entry: post-only limit order, hence the maker fee below
        if not current_trade and prev_signal == 1:
            size = capital * 0.20  # 20% of capital per position
            fee = size * self.maker_fee  # original note: 0.16%
# data/fetcher.py — WebSocket 指数退避重连
async def run(self):
    """Keep the WebSocket session alive, reconnecting with exponential backoff.

    Loops while ``self._running`` is true. Each pass awaits
    ``self._connect()``. When it raises ``websockets.ConnectionClosed`` or
    ``ConnectionResetError``, the coroutine logs the drop, sleeps for the
    backoff delay and tries again. Any other exception propagates to the
    caller.
    """
    self._running = True
    while self._running:
        try:
            await self._connect()
            self._reconnect_count = 0  # reset the counter on success
        except (websockets.ConnectionClosed, ConnectionResetError) as e:
            logger.warning("WS 断线: %s", e)
            # Exponential backoff: 2s → 4s → 8s → ... capped at 60s.
            # The exponent is clamped first: 2 * 2**5 already exceeds the
            # cap, and an unclamped 2 ** count raises OverflowError once the
            # count passes 1023 during a long outage.
            exponent = min(self._reconnect_count, 5)
            delay = min(2.0 * (2 ** exponent), 60.0)
            self._reconnect_count += 1
            logger.info("%.1f 秒后重连 #%d", delay, self._reconnect_count)
            await asyncio.sleep(delay)