triangle-validator/validate.py
褚宏光 604e88df4c fix: 添加日K/周K/月K聚合功能
- 新增 aggregate_klines() 函数,与前端 filterAndProcessKLineData 一致
- 周K:将同一周的日K聚合成一根K线
- 月K:将同一月的日K聚合成一根K线

测试结果:
- 日K:724 条
- 周K:154 条
- 月K:37 条

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-04 15:14:29 +08:00

457 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
triangle-validator / validate.py
收敛三角形形态可视化验证脚本HTTP 版)
流程:
1. 调 gpt_server POST /api/gptAgent/runFormula 执行公式 → 拿到 _id
2. 调 timeSeries GET /api/timeSeries/requestIndexDetail?id=_id → 拿到完整 detail含 klines
3. 调 chart.draw_triangle_chart() 绘图
用法:
python validate.py <股票名或代码> [频率] [--save] [--no-show] [--date 日期] [--window 窗口]
示例:
python validate.py 中远海控
python validate.py 中远海控 周 --save
python validate.py SH600519 月 --no-show --save
python validate.py 贵州茅台 日 --date 20260120 --save
python validate.py SH688777 日 --window 1Y --save
python validate.py SH688777 日 --window ALL --save
"""
import sys
import json
import argparse
from pathlib import Path
import requests
from config import GPT_SERVER_URL, DATA_SERVER_URL, KLINE_API_URL, SEARCH_API_URL, AUTH_TOKEN
from datetime import datetime, timedelta
_THIS_DIR = Path(__file__).parent.resolve()
# ── 时间窗口计算(与前端一致)───────────────────────────────────────────────
def get_window_start_date(window: str) -> str:
"""
根据时间窗口字符串计算相对于今天的开始日期(与前端 getWindowStartDate 一致)
Args:
window: 时间窗口,如 '1Y', '3Y', '5Y', 'ALL'
Returns:
开始日期,格式为 YYYYMMDD'20240130'
"""
if window == 'ALL':
return ''
today = datetime.now()
# 提取数字和单位
import re
match = re.match(r'^(\d+)([YMD])$', window, re.IGNORECASE)
if not match:
print(f'[WARN] 无效的时间窗口格式: {window}使用默认3年')
window = '3Y'
match = re.match(r'^(\d+)([YMD])$', window, re.IGNORECASE)
amount = int(match.group(1))
unit = match.group(2).upper()
if unit == 'Y': # 年
start_date = today - timedelta(days=amount * 365)
elif unit == 'M': # 月
start_date = today - timedelta(days=amount * 30)
elif unit == 'D': # 日
start_date = today - timedelta(days=amount)
else:
start_date = today
return start_date.strftime('%Y%m%d')
# ── K 线数据聚合(与前端 filterAndProcessKLineData 一致)───────────────────────
def aggregate_klines(klines: dict, freq: str) -> dict:
"""
将日K数据聚合成周K或月K与前端 filterAndProcessKLineData 一致)
Args:
klines: 日K数据 { dates: [...], open: [...], high: [...], low: [...], close: [...] }
freq: 频率 '' / '' / ''
Returns:
聚合后的 klines
"""
if freq == '':
return klines
from datetime import datetime
dates = klines['dates']
open_prices = klines['open']
high_prices = klines['high']
low_prices = klines['low']
close_prices = klines['close']
# 构建日数据列表
daily_data = []
for i in range(len(dates)):
daily_data.append({
'date': dates[i],
'open': open_prices[i],
'high': high_prices[i],
'low': low_prices[i],
'close': close_prices[i],
})
# 按周/月分组
grouped_data = {}
for d in daily_data:
date_str = str(d['date']) # YYYYMMDD
year = int(date_str[:4])
month = int(date_str[4:6])
day = int(date_str[6:8])
date_obj = datetime(year, month, day)
if freq == '':
# 计算该日期所在周的周一
day_of_week = date_obj.weekday() # 0 是周一
from datetime import timedelta
monday = date_obj - timedelta(days=day_of_week)
key = monday.strftime('%Y%m%d')
else: # 月
key = date_str[:6] # YYYYMM
if key not in grouped_data:
grouped_data[key] = []
grouped_data[key].append(d)
# 聚合每组数据
result_dates = []
result_open = []
result_high = []
result_low = []
result_close = []
for key in sorted(grouped_data.keys()):
group = grouped_data[key]
# 按日期排序
group.sort(key=lambda x: x['date'])
result_dates.append(group[-1]['date']) # 取最后一天作为标签
result_open.append(group[0]['open']) # 开盘取第一天
result_high.append(max(d['high'] for d in group)) # 最高取最高
result_low.append(min(d['low'] for d in group)) # 最低取最低
result_close.append(group[-1]['close']) # 收盘取最后一天
print(f'[INFO] K 线聚合: {len(dates)} 条日K → {len(result_dates)}{freq}K')
return {
'dates': result_dates,
'open': result_open,
'high': result_high,
'low': result_low,
'close': result_close,
'volume': [0] * len(result_dates),
}
# ── CLI 解析 ──────────────────────────────────────────────────────────────────
def parse_args():
parser = argparse.ArgumentParser(
description='收敛三角形形态可视化验证',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__,
)
parser.add_argument('ticker', help='股票名称或代码,如:中远海控 / SH601919')
parser.add_argument(
'freq', nargs='?', default='',
choices=['', '', ''],
help='K 线频率(默认:日)',
)
parser.add_argument('--save', action='store_true', help='保存图片到 output/ 目录')
parser.add_argument('--no-show', action='store_true', help='不弹出图形窗口')
parser.add_argument('--date', type=int, default=None, metavar='YYYYMMDD',
help='截止日期,如 20260120')
parser.add_argument('--mode', type=int, default=0, choices=[0, 1, 2, 3],
help='强度模式0=等权 1=激进 2=保守 3=放量默认0')
parser.add_argument('--window', type=str, default='3Y',
choices=['1Y', '3Y', '5Y', 'ALL'],
help='K线时间窗口1Y=1年 3Y=3年 5Y=5年 ALL=全部默认3Y与前端一致')
return parser.parse_args()
# ── Step 1执行公式拿 _id ──────────────────────────────────────────────────
def run_formula(ticker: str, freq: str, mode: int, date) -> str:
"""
调 innerServer/runOneFormula 接口执行收敛三角形详情公式。
Returns:
index_info._id (str) — 直接从 response.output 取
"""
import uuid
if date:
formula = f'结果=收敛三角形详情({ticker},{freq},True,{mode},{date})'
else:
formula = f'结果=收敛三角形详情({ticker},{freq},True,{mode})'
url = f'{GPT_SERVER_URL}/innerServer/runOneFormula'
payload = {
'formula_str': formula,
'begin_date': 0,
'user_name': 'mcp',
'tool_name': 'triangle_validate',
'session_id': f'tv-{uuid.uuid4().hex[:8]}',
}
print(f'[validate] 执行公式: {formula}')
resp = requests.post(url, json=payload, timeout=120)
resp.raise_for_status()
body = resp.json()
_id = (body.get('response', {}) or {}).get('output')
if not _id:
_id = (
(body.get('response', {}) or {})
.get('data', {})
.get('index_info', {})
.get('_id')
)
if not _id:
raise RuntimeError(
f'公式执行失败,未返回 _id\n响应: {json.dumps(body, ensure_ascii=False)[:500]}'
)
print(f'[validate] 公式执行完成_id = {_id}')
return str(_id)
# ── Step 2读取完整 detail含 klines───────────────────────────────────────
def fetch_detail(index_id: str) -> dict:
"""
调 requestIndexDetail 接口读取 detail含 chart_data.klines
Returns:
detail dict包含 strength / chart_data / klines 等所有字段
"""
url = f'{DATA_SERVER_URL}/api/timeSeries/requestIndexDetail'
headers = {'Authorization': f'Bearer {AUTH_TOKEN}'}
params = {'id': index_id}
print(f'[validate] 读取详情: {url}?id={index_id}')
resp = requests.get(url, headers=headers, params=params, timeout=30)
resp.raise_for_status()
body = resp.json()
if body.get('code') != 0:
raise RuntimeError(f'requestIndexDetail 返回错误: {body}')
return body['data']['data_info']['detail']
# ── Step 2.4:搜索资产获取 ticker与前端一致───────────────────────────────────
def search_asset(keyword: str) -> dict:
"""
调用与前端相同的资产搜索 API。
Args:
keyword: 搜索关键词,如 "中控技术""SH688777""英伟达"
Returns:
dict: { code, name, ticker, type } 或 None
"""
url = f'{SEARCH_API_URL}/smartstock/assets/search'
params = {'q': keyword}
print(f'[validate] 搜索资产: {url}?q={keyword}')
resp = requests.get(url, params=params, timeout=10)
resp.raise_for_status()
body = resp.json()
if body.get('code') != 0:
raise RuntimeError(f'搜索返回错误: {body}')
items = body.get('data', {}).get('items', [])
if items:
result = items[0]
print(f'[validate] 搜索结果: {result}')
return result
else:
print(f'[validate] 搜索无结果')
return None
# ── Step 2.5:单独获取 K 线数据(与前端一致)───────────────────────────────────
def fetch_kline_data(ticker: str, begin_date: int | str = None) -> dict:
"""
调用与前端相同的 K 线数据 API。
Args:
ticker: 股票代码,如 "SH601919"
begin_date: 开始日期 YYYYMMDD 格式,如 20250101
Returns:
dict: { labels: [...], open: [...], high: [...], low: [...], close: [...] }
"""
url = f'{KLINE_API_URL}/dataSupport/getKLineDataByTicker'
headers = {'Authorization': f'Bearer {AUTH_TOKEN}'}
params = {'ticker': ticker}
if begin_date:
params['begin_date'] = begin_date
print(f'[validate] 获取K线数据: {url}?ticker={ticker}&begin_date={begin_date}')
resp = requests.get(url, headers=headers, params=params, timeout=30)
resp.raise_for_status()
body = resp.json()
if body.get('code') != 0:
raise RuntimeError(f'getKLineDataByTicker 返回错误: {body}')
# 数据嵌套在 body['data'] 中
data = body.get('data', {})
print(f'[validate] K线数据获取成功{len(data.get("labels", []))}')
return data
# ── Step 3打印摘要 ──────────────────────────────────────────────────────────
def print_summary(detail: dict, ticker: str, freq: str):
cd = detail.get('chart_data', {})
strength = detail.get('strength', 0)
direction = detail.get('direction', 'none')
is_valid = detail.get('is_valid', False)
bp_up = detail.get('breakout_price_up', '-')
bp_down = detail.get('breakout_price_down', '-')
klines = cd.get('klines', {})
print(f'\n{"=" * 52}')
print(f' {ticker} {freq}K 收敛三角形')
print(f'{"=" * 52}')
print(f' 有效形态 : {is_valid}')
print(f' 综合强度 : {strength:.4f}')
print(f' 突破方向 : {direction}')
print(f' 上突破价 : {bp_up}')
print(f' 下突破价 : {bp_down}')
print(f' 上沿触碰 : {detail.get("touches_upper", "?")}')
print(f' 下沿触碰 : {detail.get("touches_lower", "?")}')
print(f' 收敛比例 : {detail.get("width_ratio", 0):.4f}')
print(f' 窗口日期 : {cd.get("window_start_date", "?")} ~ {cd.get("window_end_date", "?")}')
print(f' K 线根数 : {len(klines.get("dates", []))}')
print(f'{"=" * 52}\n')
# ── 主函数 ────────────────────────────────────────────────────────────────────
def run(args):
# 0. 搜索资产获取正确的 ticker与前端一致
search_result = search_asset(args.ticker)
if search_result:
kline_ticker = search_result.get('ticker') # 用于 K 线 API
asset_name = search_result.get('name') # 资产名称
print(f'[validate] 资产: {asset_name}, Ticker: {kline_ticker}')
else:
# 搜索失败,使用原始输入
kline_ticker = args.ticker
print(f'[WARN] 搜索失败,使用原始输入: {kline_ticker}')
# 1. 执行公式
try:
index_id = run_formula(args.ticker, args.freq, args.mode, args.date)
except Exception as e:
print(f'[ERROR] 公式执行失败: {e}')
sys.exit(1)
# 2. 读取结果
try:
detail = fetch_detail(index_id)
except Exception as e:
print(f'[ERROR] 读取详情失败: {e}')
sys.exit(1)
# 3. 摘要
print_summary(detail, args.ticker, args.freq)
cd = detail.get('chart_data', {})
# 4. 获取 K 线数据(使用搜索到的 ticker
# 根据 window 参数计算开始日期(与前端一致)
begin_date = get_window_start_date(args.window)
print(f'[validate] 时间窗口: {args.window}, 开始日期: {begin_date or "(全部)"}')
# 获取 K 线数据
klines = None
try:
kline_data = fetch_kline_data(kline_ticker, begin_date=begin_date)
# 🔧 过滤掉 None 值(非交易日),与前端 filterAndProcessKLineData 一致
raw_labels = kline_data.get('labels', [])
raw_open = kline_data.get('open', [])
raw_high = kline_data.get('high', [])
raw_low = kline_data.get('low', [])
raw_close = kline_data.get('close', [])
valid_indices = [
i for i in range(len(raw_labels))
if raw_open[i] is not None
and raw_high[i] is not None
and raw_low[i] is not None
and raw_close[i] is not None
]
if valid_indices:
# 转换为过滤后的 klines 格式
klines = {
'dates': [raw_labels[i] for i in valid_indices],
'open': [raw_open[i] for i in valid_indices],
'high': [raw_high[i] for i in valid_indices],
'low': [raw_low[i] for i in valid_indices],
'close': [raw_close[i] for i in valid_indices],
'volume': [0] * len(valid_indices), # 暂时用0填充
}
print(f'[INFO] K 线数据已获取,原始 {len(raw_labels)} 条,有效 {len(klines["dates"])}')
else:
print(f'[WARN] K 线 API 返回空数据')
except Exception as e:
print(f'[WARN] 获取 K 线数据失败: {e}')
# 如果 K 线 API 失败或返回空数据,使用公式返回的 klines
if not klines or not klines.get('dates'):
print('[INFO] 使用公式返回的 K 线数据')
klines = cd.get('klines')
if not klines or not klines.get('dates'):
print('[ERROR] 没有 K 线数据,跳过绘图。')
return
# 根据频率聚合 K 线数据(与前端 filterAndProcessKLineData 一致)
klines = aggregate_klines(klines, args.freq)
cd['klines'] = klines
# 5. 绘图
from chart import draw_triangle_chart
safe_ticker = args.ticker.replace('/', '_').replace('\\', '_')
output_path = None
if args.save:
output_dir = _THIS_DIR / 'output'
fname = f'{safe_ticker}_{args.freq}K_{cd.get("target_date", "")}.png'
output_path = str(output_dir / fname)
try:
draw_triangle_chart(
detail=detail,
output_path=output_path,
show=not args.no_show,
)
except Exception as e:
print(f'[ERROR] 绘图失败: {e}')
import traceback
traceback.print_exc()
sys.exit(1)
# ── 入口 ──────────────────────────────────────────────────────────────────────
if __name__ == '__main__':
run(parse_args())