triangle-validator/validate.py
褚宏光 59622a6ef7 feat: 增加命令行参数以支持自定义枢轴点窗口、收敛比和最小收敛比例
docs: 更新 README 示例以包含比亚迪的日、周、月 K 线图
fix: 修复趋势线绘制逻辑,支持周K/月K聚合后的日期匹配
docs: 添加 K 线形态参数调整建议文档
2026-03-04 17:39:27 +08:00

490 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
triangle-validator / validate.py
收敛三角形形态可视化验证脚本HTTP 版)
流程:
1. 调 gpt_server POST /api/gptAgent/runFormula 执行公式 → 拿到 _id
2. 调 timeSeries GET /api/timeSeries/requestIndexDetail?id=_id → 拿到完整 detail含 klines
3. 调 chart.draw_triangle_chart() 绘图
用法:
python validate.py <股票名或代码> [频率] [--save] [--no-show] [--date 日期] [--window 窗口]
示例:
python validate.py 中远海控
python validate.py 中远海控 周 --save
python validate.py SH600519 月 --no-show --save
python validate.py 贵州茅台 日 --date 20260120 --save
python validate.py SH688777 日 --window 1Y --save
python validate.py SH688777 日 --window ALL --save
"""
import sys
import json
import argparse
from pathlib import Path
import requests
from config import GPT_SERVER_URL, DATA_SERVER_URL, KLINE_API_URL, SEARCH_API_URL, AUTH_TOKEN
from datetime import datetime, timedelta
_THIS_DIR = Path(__file__).parent.resolve()
# ── 时间窗口计算(与前端一致)───────────────────────────────────────────────
def get_window_start_date(window: str) -> str:
"""
根据时间窗口字符串计算相对于今天的开始日期(与前端 getWindowStartDate 一致)
Args:
window: 时间窗口,如 '1Y', '3Y', '5Y', 'ALL'
Returns:
开始日期,格式为 YYYYMMDD'20240130'
"""
if window == 'ALL':
return ''
today = datetime.now()
# 提取数字和单位
import re
match = re.match(r'^(\d+)([YMD])$', window, re.IGNORECASE)
if not match:
print(f'[WARN] 无效的时间窗口格式: {window}使用默认3年')
window = '3Y'
match = re.match(r'^(\d+)([YMD])$', window, re.IGNORECASE)
amount = int(match.group(1))
unit = match.group(2).upper()
if unit == 'Y': # 年
start_date = today - timedelta(days=amount * 365)
elif unit == 'M': # 月
start_date = today - timedelta(days=amount * 30)
elif unit == 'D': # 日
start_date = today - timedelta(days=amount)
else:
start_date = today
return start_date.strftime('%Y%m%d')
# ── K 线数据聚合(与前端 filterAndProcessKLineData 一致)───────────────────────
def aggregate_klines(klines: dict, freq: str) -> dict:
"""
将日K数据聚合成周K或月K与前端 filterAndProcessKLineData 一致)
Args:
klines: 日K数据 { dates: [...], open: [...], high: [...], low: [...], close: [...] }
freq: 频率 '' / '' / ''
Returns:
聚合后的 klines
"""
if freq == '':
return klines
from datetime import datetime
dates = klines['dates']
open_prices = klines['open']
high_prices = klines['high']
low_prices = klines['low']
close_prices = klines['close']
# 构建日数据列表
daily_data = []
for i in range(len(dates)):
daily_data.append({
'date': dates[i],
'open': open_prices[i],
'high': high_prices[i],
'low': low_prices[i],
'close': close_prices[i],
})
# 按周/月分组
grouped_data = {}
for d in daily_data:
date_str = str(d['date']) # YYYYMMDD
year = int(date_str[:4])
month = int(date_str[4:6])
day = int(date_str[6:8])
date_obj = datetime(year, month, day)
if freq == '':
# 计算该日期所在周的周一
day_of_week = date_obj.weekday() # 0 是周一
from datetime import timedelta
monday = date_obj - timedelta(days=day_of_week)
key = monday.strftime('%Y%m%d')
else: # 月
key = date_str[:6] # YYYYMM
if key not in grouped_data:
grouped_data[key] = []
grouped_data[key].append(d)
# 聚合每组数据
result_dates = []
result_open = []
result_high = []
result_low = []
result_close = []
for key in sorted(grouped_data.keys()):
group = grouped_data[key]
# 按日期排序
group.sort(key=lambda x: x['date'])
result_dates.append(group[-1]['date']) # 取最后一天作为标签
result_open.append(group[0]['open']) # 开盘取第一天
result_high.append(max(d['high'] for d in group)) # 最高取最高
result_low.append(min(d['low'] for d in group)) # 最低取最低
result_close.append(group[-1]['close']) # 收盘取最后一天
print(f'[INFO] K 线聚合: {len(dates)} 条日K → {len(result_dates)}{freq}K')
return {
'dates': result_dates,
'open': result_open,
'high': result_high,
'low': result_low,
'close': result_close,
'volume': [0] * len(result_dates),
}
# ── CLI 解析 ──────────────────────────────────────────────────────────────────
def parse_args():
parser = argparse.ArgumentParser(
description='收敛三角形形态可视化验证',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__,
)
parser.add_argument('ticker', help='股票名称或代码,如:中远海控 / SH601919')
parser.add_argument(
'freq', nargs='?', default='',
choices=['', '', ''],
help='K 线频率(默认:日)',
)
parser.add_argument('--save', action='store_true', help='保存图片到 output/ 目录')
parser.add_argument('--no-show', action='store_true', help='不弹出图形窗口')
parser.add_argument('--date', type=int, default=None, metavar='YYYYMMDD',
help='截止日期,如 20260120')
parser.add_argument('--mode', type=int, default=0, choices=[0, 1, 2, 3],
help='强度模式0=等权 1=激进 2=保守 3=放量默认0')
parser.add_argument('--window', type=str, default='3Y',
choices=['1Y', '3Y', '5Y', 'ALL'],
help='K线时间窗口1Y=1年 3Y=3年 5Y=5年 ALL=全部默认3Y与前端一致')
parser.add_argument('--pivot-k', type=int, default=-1, metavar='N',
help='枢轴点窗口默认日K=15, 周K=5, 月K=3')
parser.add_argument('--shrink-ratio', type=float, default=-1.0, metavar='R',
help='收敛比三角形开口缩小程度默认0.8')
parser.add_argument('--min-convergence', type=float, default=-1.0, metavar='C',
help='最小收敛比例默认日K=0.45, 周K=0.45, 月K=0.50')
return parser.parse_args()
# ── Step 1执行公式拿 _id ──────────────────────────────────────────────────
def run_formula(ticker: str, freq: str, mode: int, date,
pivot_k: int = -1, shrink_ratio: float = -1.0,
min_convergence: float = -1.0) -> str:
"""
调 innerServer/runOneFormula 接口执行收敛三角形详情公式。
Returns:
index_info._id (str) — 直接从 response.output 取
"""
import uuid
# 构建参数列表
params = [ticker, freq, 'True', str(mode)]
# target_date
if date:
params.append(str(date))
else:
params.append('None')
# window, min_convergence, breakout_threshold, volume_multiplier 使用默认值
params.extend(['-1', '-1', '-1', '-1'])
# pivot_k, shrink_ratio
params.append(str(pivot_k))
params.append(str(shrink_ratio))
# 如果有自定义 min_convergence需要调整参数位置
# 参数顺序: ticker, freq, include_klines, mode, target_date,
# window, min_convergence, breakout_threshold, volume_multiplier,
# pivot_k, shrink_ratio, display_days, custom_weights, fast
if min_convergence > 0:
params[6] = str(min_convergence) # min_convergence 在索引 6
formula = f'结果=收敛三角形详情({",".join(params)})'
url = f'{GPT_SERVER_URL}/innerServer/runOneFormula'
payload = {
'formula_str': formula,
'begin_date': 0,
'user_name': 'mcp',
'tool_name': 'triangle_validate',
'session_id': f'tv-{uuid.uuid4().hex[:8]}',
}
print(f'[validate] 执行公式: {formula}')
resp = requests.post(url, json=payload, timeout=120)
resp.raise_for_status()
body = resp.json()
_id = (body.get('response', {}) or {}).get('output')
if not _id:
_id = (
(body.get('response', {}) or {})
.get('data', {})
.get('index_info', {})
.get('_id')
)
if not _id:
raise RuntimeError(
f'公式执行失败,未返回 _id\n响应: {json.dumps(body, ensure_ascii=False)[:500]}'
)
print(f'[validate] 公式执行完成_id = {_id}')
return str(_id)
# ── Step 2读取完整 detail含 klines───────────────────────────────────────
def fetch_detail(index_id: str) -> dict:
"""
调 requestIndexDetail 接口读取 detail含 chart_data.klines
Returns:
detail dict包含 strength / chart_data / klines 等所有字段
"""
url = f'{DATA_SERVER_URL}/api/timeSeries/requestIndexDetail'
headers = {'Authorization': f'Bearer {AUTH_TOKEN}'}
params = {'id': index_id}
print(f'[validate] 读取详情: {url}?id={index_id}')
resp = requests.get(url, headers=headers, params=params, timeout=30)
resp.raise_for_status()
body = resp.json()
if body.get('code') != 0:
raise RuntimeError(f'requestIndexDetail 返回错误: {body}')
return body['data']['data_info']['detail']
# ── Step 2.4:搜索资产获取 ticker与前端一致───────────────────────────────────
def search_asset(keyword: str) -> dict:
"""
调用与前端相同的资产搜索 API。
Args:
keyword: 搜索关键词,如 "中控技术""SH688777""英伟达"
Returns:
dict: { code, name, ticker, type } 或 None
"""
url = f'{SEARCH_API_URL}/smartstock/assets/search'
params = {'q': keyword}
print(f'[validate] 搜索资产: {url}?q={keyword}')
resp = requests.get(url, params=params, timeout=10)
resp.raise_for_status()
body = resp.json()
if body.get('code') != 0:
raise RuntimeError(f'搜索返回错误: {body}')
items = body.get('data', {}).get('items', [])
if items:
result = items[0]
print(f'[validate] 搜索结果: {result}')
return result
else:
print(f'[validate] 搜索无结果')
return None
# ── Step 2.5:单独获取 K 线数据(与前端一致)───────────────────────────────────
def fetch_kline_data(ticker: str, begin_date: int | str = None) -> dict:
"""
调用与前端相同的 K 线数据 API。
Args:
ticker: 股票代码,如 "SH601919"
begin_date: 开始日期 YYYYMMDD 格式,如 20250101
Returns:
dict: { labels: [...], open: [...], high: [...], low: [...], close: [...] }
"""
url = f'{KLINE_API_URL}/dataSupport/getKLineDataByTicker'
headers = {'Authorization': f'Bearer {AUTH_TOKEN}'}
params = {'ticker': ticker}
if begin_date:
params['begin_date'] = begin_date
print(f'[validate] 获取K线数据: {url}?ticker={ticker}&begin_date={begin_date}')
resp = requests.get(url, headers=headers, params=params, timeout=30)
resp.raise_for_status()
body = resp.json()
if body.get('code') != 0:
raise RuntimeError(f'getKLineDataByTicker 返回错误: {body}')
# 数据嵌套在 body['data'] 中
data = body.get('data', {})
print(f'[validate] K线数据获取成功{len(data.get("labels", []))}')
return data
# ── Step 3打印摘要 ──────────────────────────────────────────────────────────
def print_summary(detail: dict, ticker: str, freq: str):
cd = detail.get('chart_data', {})
strength = detail.get('strength', 0)
direction = detail.get('direction', 'none')
is_valid = detail.get('is_valid', False)
bp_up = detail.get('breakout_price_up', '-')
bp_down = detail.get('breakout_price_down', '-')
klines = cd.get('klines', {})
print(f'\n{"=" * 52}')
print(f' {ticker} {freq}K 收敛三角形')
print(f'{"=" * 52}')
print(f' 有效形态 : {is_valid}')
print(f' 综合强度 : {strength:.4f}')
print(f' 突破方向 : {direction}')
print(f' 上突破价 : {bp_up}')
print(f' 下突破价 : {bp_down}')
print(f' 上沿触碰 : {detail.get("touches_upper", "?")}')
print(f' 下沿触碰 : {detail.get("touches_lower", "?")}')
print(f' 收敛比例 : {detail.get("width_ratio", 0):.4f}')
print(f' 窗口日期 : {cd.get("window_start_date", "?")} ~ {cd.get("window_end_date", "?")}')
print(f' K 线根数 : {len(klines.get("dates", []))}')
print(f'{"=" * 52}\n')
# ── 主函数 ────────────────────────────────────────────────────────────────────
def run(args):
# 0. 搜索资产获取正确的 ticker与前端一致
search_result = search_asset(args.ticker)
if search_result:
kline_ticker = search_result.get('ticker') # 用于 K 线 API
asset_name = search_result.get('name') # 资产名称
print(f'[validate] 资产: {asset_name}, Ticker: {kline_ticker}')
else:
# 搜索失败,使用原始输入
kline_ticker = args.ticker
print(f'[WARN] 搜索失败,使用原始输入: {kline_ticker}')
# 1. 执行公式
try:
index_id = run_formula(
args.ticker, args.freq, args.mode, args.date,
pivot_k=args.pivot_k,
shrink_ratio=args.shrink_ratio,
min_convergence=args.min_convergence,
)
except Exception as e:
print(f'[ERROR] 公式执行失败: {e}')
sys.exit(1)
# 2. 读取结果
try:
detail = fetch_detail(index_id)
except Exception as e:
print(f'[ERROR] 读取详情失败: {e}')
sys.exit(1)
# 3. 摘要
print_summary(detail, args.ticker, args.freq)
cd = detail.get('chart_data', {})
# 4. 获取 K 线数据(使用搜索到的 ticker
# 根据 window 参数计算开始日期(与前端一致)
begin_date = get_window_start_date(args.window)
print(f'[validate] 时间窗口: {args.window}, 开始日期: {begin_date or "(全部)"}')
# 获取 K 线数据
klines = None
try:
kline_data = fetch_kline_data(kline_ticker, begin_date=begin_date)
# 🔧 过滤掉 None 值(非交易日),与前端 filterAndProcessKLineData 一致
raw_labels = kline_data.get('labels', [])
raw_open = kline_data.get('open', [])
raw_high = kline_data.get('high', [])
raw_low = kline_data.get('low', [])
raw_close = kline_data.get('close', [])
valid_indices = [
i for i in range(len(raw_labels))
if raw_open[i] is not None
and raw_high[i] is not None
and raw_low[i] is not None
and raw_close[i] is not None
]
if valid_indices:
# 转换为过滤后的 klines 格式
klines = {
'dates': [raw_labels[i] for i in valid_indices],
'open': [raw_open[i] for i in valid_indices],
'high': [raw_high[i] for i in valid_indices],
'low': [raw_low[i] for i in valid_indices],
'close': [raw_close[i] for i in valid_indices],
'volume': [0] * len(valid_indices), # 暂时用0填充
}
print(f'[INFO] K 线数据已获取,原始 {len(raw_labels)} 条,有效 {len(klines["dates"])}')
else:
print(f'[WARN] K 线 API 返回空数据')
except Exception as e:
print(f'[WARN] 获取 K 线数据失败: {e}')
# 如果 K 线 API 失败或返回空数据,使用公式返回的 klines
if not klines or not klines.get('dates'):
print('[INFO] 使用公式返回的 K 线数据')
klines = cd.get('klines')
if not klines or not klines.get('dates'):
print('[ERROR] 没有 K 线数据,跳过绘图。')
return
# 根据频率聚合 K 线数据(与前端 filterAndProcessKLineData 一致)
klines = aggregate_klines(klines, args.freq)
cd['klines'] = klines
# 5. 绘图
from chart import draw_triangle_chart
safe_ticker = args.ticker.replace('/', '_').replace('\\', '_')
output_path = None
if args.save:
output_dir = _THIS_DIR / 'output'
fname = f'{safe_ticker}_{args.freq}K_{cd.get("target_date", "")}.png'
output_path = str(output_dir / fname)
try:
draw_triangle_chart(
detail=detail,
output_path=output_path,
show=not args.no_show,
)
except Exception as e:
print(f'[ERROR] 绘图失败: {e}')
import traceback
traceback.print_exc()
sys.exit(1)
# ── 入口 ──────────────────────────────────────────────────────────────────────
if __name__ == '__main__':
run(parse_args())