docs: 更新 README 示例以包含比亚迪的日、周、月 K 线图 fix: 修复趋势线绘制逻辑,支持周K/月K聚合后的日期匹配 docs: 添加 K 线形态参数调整建议文档
490 lines
18 KiB
Python
490 lines
18 KiB
Python
"""
|
||
triangle-validator / validate.py
|
||
|
||
收敛三角形形态可视化验证脚本(HTTP 版)
|
||
|
||
流程:
|
||
1. 调 gpt_server POST /api/gptAgent/runFormula 执行公式 → 拿到 _id
|
||
2. 调 timeSeries GET /api/timeSeries/requestIndexDetail?id=_id → 拿到完整 detail(含 klines)
|
||
3. 调 chart.draw_triangle_chart() 绘图
|
||
|
||
用法:
|
||
python validate.py <股票名或代码> [频率] [--save] [--no-show] [--date 日期] [--window 窗口]
|
||
|
||
示例:
|
||
python validate.py 中远海控
|
||
python validate.py 中远海控 周 --save
|
||
python validate.py SH600519 月 --no-show --save
|
||
python validate.py 贵州茅台 日 --date 20260120 --save
|
||
python validate.py SH688777 日 --window 1Y --save
|
||
python validate.py SH688777 日 --window ALL --save
|
||
"""
|
||
|
||
import sys
|
||
import json
|
||
import argparse
|
||
from pathlib import Path
|
||
|
||
import requests
|
||
|
||
from config import GPT_SERVER_URL, DATA_SERVER_URL, KLINE_API_URL, SEARCH_API_URL, AUTH_TOKEN
|
||
from datetime import datetime, timedelta
|
||
|
||
_THIS_DIR = Path(__file__).parent.resolve()
|
||
|
||
|
||
# ── 时间窗口计算(与前端一致)───────────────────────────────────────────────
|
||
def get_window_start_date(window: str) -> str:
|
||
"""
|
||
根据时间窗口字符串计算相对于今天的开始日期(与前端 getWindowStartDate 一致)
|
||
|
||
Args:
|
||
window: 时间窗口,如 '1Y', '3Y', '5Y', 'ALL'
|
||
|
||
Returns:
|
||
开始日期,格式为 YYYYMMDD,如 '20240130'
|
||
"""
|
||
if window == 'ALL':
|
||
return ''
|
||
|
||
today = datetime.now()
|
||
|
||
# 提取数字和单位
|
||
import re
|
||
match = re.match(r'^(\d+)([YMD])$', window, re.IGNORECASE)
|
||
if not match:
|
||
print(f'[WARN] 无效的时间窗口格式: {window},使用默认3年')
|
||
window = '3Y'
|
||
match = re.match(r'^(\d+)([YMD])$', window, re.IGNORECASE)
|
||
|
||
amount = int(match.group(1))
|
||
unit = match.group(2).upper()
|
||
|
||
if unit == 'Y': # 年
|
||
start_date = today - timedelta(days=amount * 365)
|
||
elif unit == 'M': # 月
|
||
start_date = today - timedelta(days=amount * 30)
|
||
elif unit == 'D': # 日
|
||
start_date = today - timedelta(days=amount)
|
||
else:
|
||
start_date = today
|
||
|
||
return start_date.strftime('%Y%m%d')
|
||
|
||
|
||
# ── K 线数据聚合(与前端 filterAndProcessKLineData 一致)───────────────────────
|
||
def aggregate_klines(klines: dict, freq: str) -> dict:
|
||
"""
|
||
将日K数据聚合成周K或月K(与前端 filterAndProcessKLineData 一致)
|
||
|
||
Args:
|
||
klines: 日K数据 { dates: [...], open: [...], high: [...], low: [...], close: [...] }
|
||
freq: 频率 '日' / '周' / '月'
|
||
|
||
Returns:
|
||
聚合后的 klines
|
||
"""
|
||
if freq == '日':
|
||
return klines
|
||
|
||
from datetime import datetime
|
||
|
||
dates = klines['dates']
|
||
open_prices = klines['open']
|
||
high_prices = klines['high']
|
||
low_prices = klines['low']
|
||
close_prices = klines['close']
|
||
|
||
# 构建日数据列表
|
||
daily_data = []
|
||
for i in range(len(dates)):
|
||
daily_data.append({
|
||
'date': dates[i],
|
||
'open': open_prices[i],
|
||
'high': high_prices[i],
|
||
'low': low_prices[i],
|
||
'close': close_prices[i],
|
||
})
|
||
|
||
# 按周/月分组
|
||
grouped_data = {}
|
||
|
||
for d in daily_data:
|
||
date_str = str(d['date']) # YYYYMMDD
|
||
year = int(date_str[:4])
|
||
month = int(date_str[4:6])
|
||
day = int(date_str[6:8])
|
||
|
||
date_obj = datetime(year, month, day)
|
||
|
||
if freq == '周':
|
||
# 计算该日期所在周的周一
|
||
day_of_week = date_obj.weekday() # 0 是周一
|
||
from datetime import timedelta
|
||
monday = date_obj - timedelta(days=day_of_week)
|
||
key = monday.strftime('%Y%m%d')
|
||
else: # 月
|
||
key = date_str[:6] # YYYYMM
|
||
|
||
if key not in grouped_data:
|
||
grouped_data[key] = []
|
||
grouped_data[key].append(d)
|
||
|
||
# 聚合每组数据
|
||
result_dates = []
|
||
result_open = []
|
||
result_high = []
|
||
result_low = []
|
||
result_close = []
|
||
|
||
for key in sorted(grouped_data.keys()):
|
||
group = grouped_data[key]
|
||
# 按日期排序
|
||
group.sort(key=lambda x: x['date'])
|
||
|
||
result_dates.append(group[-1]['date']) # 取最后一天作为标签
|
||
result_open.append(group[0]['open']) # 开盘取第一天
|
||
result_high.append(max(d['high'] for d in group)) # 最高取最高
|
||
result_low.append(min(d['low'] for d in group)) # 最低取最低
|
||
result_close.append(group[-1]['close']) # 收盘取最后一天
|
||
|
||
print(f'[INFO] K 线聚合: {len(dates)} 条日K → {len(result_dates)} 条{freq}K')
|
||
|
||
return {
|
||
'dates': result_dates,
|
||
'open': result_open,
|
||
'high': result_high,
|
||
'low': result_low,
|
||
'close': result_close,
|
||
'volume': [0] * len(result_dates),
|
||
}
|
||
|
||
|
||
# ── CLI 解析 ──────────────────────────────────────────────────────────────────
|
||
def parse_args():
|
||
parser = argparse.ArgumentParser(
|
||
description='收敛三角形形态可视化验证',
|
||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||
epilog=__doc__,
|
||
)
|
||
parser.add_argument('ticker', help='股票名称或代码,如:中远海控 / SH601919')
|
||
parser.add_argument(
|
||
'freq', nargs='?', default='日',
|
||
choices=['日', '周', '月'],
|
||
help='K 线频率(默认:日)',
|
||
)
|
||
parser.add_argument('--save', action='store_true', help='保存图片到 output/ 目录')
|
||
parser.add_argument('--no-show', action='store_true', help='不弹出图形窗口')
|
||
parser.add_argument('--date', type=int, default=None, metavar='YYYYMMDD',
|
||
help='截止日期,如 20260120')
|
||
parser.add_argument('--mode', type=int, default=0, choices=[0, 1, 2, 3],
|
||
help='强度模式:0=等权 1=激进 2=保守 3=放量(默认:0)')
|
||
parser.add_argument('--window', type=str, default='3Y',
|
||
choices=['1Y', '3Y', '5Y', 'ALL'],
|
||
help='K线时间窗口:1Y=1年 3Y=3年 5Y=5年 ALL=全部(默认:3Y,与前端一致)')
|
||
parser.add_argument('--pivot-k', type=int, default=-1, metavar='N',
|
||
help='枢轴点窗口(默认:日K=15, 周K=5, 月K=3)')
|
||
parser.add_argument('--shrink-ratio', type=float, default=-1.0, metavar='R',
|
||
help='收敛比,三角形开口缩小程度(默认:0.8)')
|
||
parser.add_argument('--min-convergence', type=float, default=-1.0, metavar='C',
|
||
help='最小收敛比例(默认:日K=0.45, 周K=0.45, 月K=0.50)')
|
||
return parser.parse_args()
|
||
|
||
|
||
# ── Step 1:执行公式,拿 _id ──────────────────────────────────────────────────
|
||
def run_formula(ticker: str, freq: str, mode: int, date,
|
||
pivot_k: int = -1, shrink_ratio: float = -1.0,
|
||
min_convergence: float = -1.0) -> str:
|
||
"""
|
||
调 innerServer/runOneFormula 接口执行收敛三角形详情公式。
|
||
Returns:
|
||
index_info._id (str) — 直接从 response.output 取
|
||
"""
|
||
import uuid
|
||
|
||
# 构建参数列表
|
||
params = [ticker, freq, 'True', str(mode)]
|
||
|
||
# target_date
|
||
if date:
|
||
params.append(str(date))
|
||
else:
|
||
params.append('None')
|
||
|
||
# window, min_convergence, breakout_threshold, volume_multiplier 使用默认值
|
||
params.extend(['-1', '-1', '-1', '-1'])
|
||
|
||
# pivot_k, shrink_ratio
|
||
params.append(str(pivot_k))
|
||
params.append(str(shrink_ratio))
|
||
|
||
# 如果有自定义 min_convergence,需要调整参数位置
|
||
# 参数顺序: ticker, freq, include_klines, mode, target_date,
|
||
# window, min_convergence, breakout_threshold, volume_multiplier,
|
||
# pivot_k, shrink_ratio, display_days, custom_weights, fast
|
||
if min_convergence > 0:
|
||
params[6] = str(min_convergence) # min_convergence 在索引 6
|
||
|
||
formula = f'结果=收敛三角形详情({",".join(params)})'
|
||
|
||
url = f'{GPT_SERVER_URL}/innerServer/runOneFormula'
|
||
payload = {
|
||
'formula_str': formula,
|
||
'begin_date': 0,
|
||
'user_name': 'mcp',
|
||
'tool_name': 'triangle_validate',
|
||
'session_id': f'tv-{uuid.uuid4().hex[:8]}',
|
||
}
|
||
|
||
print(f'[validate] 执行公式: {formula}')
|
||
resp = requests.post(url, json=payload, timeout=120)
|
||
resp.raise_for_status()
|
||
|
||
body = resp.json()
|
||
_id = (body.get('response', {}) or {}).get('output')
|
||
if not _id:
|
||
_id = (
|
||
(body.get('response', {}) or {})
|
||
.get('data', {})
|
||
.get('index_info', {})
|
||
.get('_id')
|
||
)
|
||
if not _id:
|
||
raise RuntimeError(
|
||
f'公式执行失败,未返回 _id\n响应: {json.dumps(body, ensure_ascii=False)[:500]}'
|
||
)
|
||
|
||
print(f'[validate] 公式执行完成,_id = {_id}')
|
||
return str(_id)
|
||
|
||
|
||
# ── Step 2:读取完整 detail(含 klines)───────────────────────────────────────
|
||
def fetch_detail(index_id: str) -> dict:
|
||
"""
|
||
调 requestIndexDetail 接口读取 detail(含 chart_data.klines)。
|
||
Returns:
|
||
detail dict:包含 strength / chart_data / klines 等所有字段
|
||
"""
|
||
url = f'{DATA_SERVER_URL}/api/timeSeries/requestIndexDetail'
|
||
headers = {'Authorization': f'Bearer {AUTH_TOKEN}'}
|
||
params = {'id': index_id}
|
||
|
||
print(f'[validate] 读取详情: {url}?id={index_id}')
|
||
resp = requests.get(url, headers=headers, params=params, timeout=30)
|
||
resp.raise_for_status()
|
||
|
||
body = resp.json()
|
||
if body.get('code') != 0:
|
||
raise RuntimeError(f'requestIndexDetail 返回错误: {body}')
|
||
|
||
return body['data']['data_info']['detail']
|
||
|
||
|
||
# ── Step 2.4:搜索资产获取 ticker(与前端一致)───────────────────────────────────
|
||
def search_asset(keyword: str) -> dict:
|
||
"""
|
||
调用与前端相同的资产搜索 API。
|
||
|
||
Args:
|
||
keyword: 搜索关键词,如 "中控技术"、"SH688777"、"英伟达"
|
||
|
||
Returns:
|
||
dict: { code, name, ticker, type } 或 None
|
||
"""
|
||
url = f'{SEARCH_API_URL}/smartstock/assets/search'
|
||
params = {'q': keyword}
|
||
|
||
print(f'[validate] 搜索资产: {url}?q={keyword}')
|
||
resp = requests.get(url, params=params, timeout=10)
|
||
resp.raise_for_status()
|
||
|
||
body = resp.json()
|
||
if body.get('code') != 0:
|
||
raise RuntimeError(f'搜索返回错误: {body}')
|
||
|
||
items = body.get('data', {}).get('items', [])
|
||
if items:
|
||
result = items[0]
|
||
print(f'[validate] 搜索结果: {result}')
|
||
return result
|
||
else:
|
||
print(f'[validate] 搜索无结果')
|
||
return None
|
||
|
||
|
||
# ── Step 2.5:单独获取 K 线数据(与前端一致)───────────────────────────────────
|
||
def fetch_kline_data(ticker: str, begin_date: int | str = None) -> dict:
|
||
"""
|
||
调用与前端相同的 K 线数据 API。
|
||
|
||
Args:
|
||
ticker: 股票代码,如 "SH601919"
|
||
begin_date: 开始日期 YYYYMMDD 格式,如 20250101
|
||
|
||
Returns:
|
||
dict: { labels: [...], open: [...], high: [...], low: [...], close: [...] }
|
||
"""
|
||
url = f'{KLINE_API_URL}/dataSupport/getKLineDataByTicker'
|
||
headers = {'Authorization': f'Bearer {AUTH_TOKEN}'}
|
||
params = {'ticker': ticker}
|
||
if begin_date:
|
||
params['begin_date'] = begin_date
|
||
|
||
print(f'[validate] 获取K线数据: {url}?ticker={ticker}&begin_date={begin_date}')
|
||
resp = requests.get(url, headers=headers, params=params, timeout=30)
|
||
resp.raise_for_status()
|
||
|
||
body = resp.json()
|
||
if body.get('code') != 0:
|
||
raise RuntimeError(f'getKLineDataByTicker 返回错误: {body}')
|
||
|
||
# 数据嵌套在 body['data'] 中
|
||
data = body.get('data', {})
|
||
print(f'[validate] K线数据获取成功,共 {len(data.get("labels", []))} 条')
|
||
return data
|
||
|
||
|
||
# ── Step 3:打印摘要 ──────────────────────────────────────────────────────────
|
||
def print_summary(detail: dict, ticker: str, freq: str):
|
||
cd = detail.get('chart_data', {})
|
||
strength = detail.get('strength', 0)
|
||
direction = detail.get('direction', 'none')
|
||
is_valid = detail.get('is_valid', False)
|
||
bp_up = detail.get('breakout_price_up', '-')
|
||
bp_down = detail.get('breakout_price_down', '-')
|
||
klines = cd.get('klines', {})
|
||
|
||
print(f'\n{"=" * 52}')
|
||
print(f' {ticker} {freq}K 收敛三角形')
|
||
print(f'{"=" * 52}')
|
||
print(f' 有效形态 : {is_valid}')
|
||
print(f' 综合强度 : {strength:.4f}')
|
||
print(f' 突破方向 : {direction}')
|
||
print(f' 上突破价 : {bp_up}')
|
||
print(f' 下突破价 : {bp_down}')
|
||
print(f' 上沿触碰 : {detail.get("touches_upper", "?")}')
|
||
print(f' 下沿触碰 : {detail.get("touches_lower", "?")}')
|
||
print(f' 收敛比例 : {detail.get("width_ratio", 0):.4f}')
|
||
print(f' 窗口日期 : {cd.get("window_start_date", "?")} ~ {cd.get("window_end_date", "?")}')
|
||
print(f' K 线根数 : {len(klines.get("dates", []))}')
|
||
print(f'{"=" * 52}\n')
|
||
|
||
|
||
# ── 主函数 ────────────────────────────────────────────────────────────────────
|
||
def run(args):
|
||
# 0. 搜索资产获取正确的 ticker(与前端一致)
|
||
search_result = search_asset(args.ticker)
|
||
if search_result:
|
||
kline_ticker = search_result.get('ticker') # 用于 K 线 API
|
||
asset_name = search_result.get('name') # 资产名称
|
||
print(f'[validate] 资产: {asset_name}, Ticker: {kline_ticker}')
|
||
else:
|
||
# 搜索失败,使用原始输入
|
||
kline_ticker = args.ticker
|
||
print(f'[WARN] 搜索失败,使用原始输入: {kline_ticker}')
|
||
|
||
# 1. 执行公式
|
||
try:
|
||
index_id = run_formula(
|
||
args.ticker, args.freq, args.mode, args.date,
|
||
pivot_k=args.pivot_k,
|
||
shrink_ratio=args.shrink_ratio,
|
||
min_convergence=args.min_convergence,
|
||
)
|
||
except Exception as e:
|
||
print(f'[ERROR] 公式执行失败: {e}')
|
||
sys.exit(1)
|
||
|
||
# 2. 读取结果
|
||
try:
|
||
detail = fetch_detail(index_id)
|
||
except Exception as e:
|
||
print(f'[ERROR] 读取详情失败: {e}')
|
||
sys.exit(1)
|
||
|
||
# 3. 摘要
|
||
print_summary(detail, args.ticker, args.freq)
|
||
|
||
cd = detail.get('chart_data', {})
|
||
|
||
# 4. 获取 K 线数据(使用搜索到的 ticker)
|
||
# 根据 window 参数计算开始日期(与前端一致)
|
||
begin_date = get_window_start_date(args.window)
|
||
print(f'[validate] 时间窗口: {args.window}, 开始日期: {begin_date or "(全部)"}')
|
||
|
||
# 获取 K 线数据
|
||
klines = None
|
||
try:
|
||
kline_data = fetch_kline_data(kline_ticker, begin_date=begin_date)
|
||
|
||
# 🔧 过滤掉 None 值(非交易日),与前端 filterAndProcessKLineData 一致
|
||
raw_labels = kline_data.get('labels', [])
|
||
raw_open = kline_data.get('open', [])
|
||
raw_high = kline_data.get('high', [])
|
||
raw_low = kline_data.get('low', [])
|
||
raw_close = kline_data.get('close', [])
|
||
|
||
valid_indices = [
|
||
i for i in range(len(raw_labels))
|
||
if raw_open[i] is not None
|
||
and raw_high[i] is not None
|
||
and raw_low[i] is not None
|
||
and raw_close[i] is not None
|
||
]
|
||
|
||
if valid_indices:
|
||
# 转换为过滤后的 klines 格式
|
||
klines = {
|
||
'dates': [raw_labels[i] for i in valid_indices],
|
||
'open': [raw_open[i] for i in valid_indices],
|
||
'high': [raw_high[i] for i in valid_indices],
|
||
'low': [raw_low[i] for i in valid_indices],
|
||
'close': [raw_close[i] for i in valid_indices],
|
||
'volume': [0] * len(valid_indices), # 暂时用0填充
|
||
}
|
||
print(f'[INFO] K 线数据已获取,原始 {len(raw_labels)} 条,有效 {len(klines["dates"])} 条')
|
||
else:
|
||
print(f'[WARN] K 线 API 返回空数据')
|
||
except Exception as e:
|
||
print(f'[WARN] 获取 K 线数据失败: {e}')
|
||
|
||
# 如果 K 线 API 失败或返回空数据,使用公式返回的 klines
|
||
if not klines or not klines.get('dates'):
|
||
print('[INFO] 使用公式返回的 K 线数据')
|
||
klines = cd.get('klines')
|
||
if not klines or not klines.get('dates'):
|
||
print('[ERROR] 没有 K 线数据,跳过绘图。')
|
||
return
|
||
|
||
# 根据频率聚合 K 线数据(与前端 filterAndProcessKLineData 一致)
|
||
klines = aggregate_klines(klines, args.freq)
|
||
|
||
cd['klines'] = klines
|
||
|
||
# 5. 绘图
|
||
from chart import draw_triangle_chart
|
||
|
||
safe_ticker = args.ticker.replace('/', '_').replace('\\', '_')
|
||
output_path = None
|
||
if args.save:
|
||
output_dir = _THIS_DIR / 'output'
|
||
fname = f'{safe_ticker}_{args.freq}K_{cd.get("target_date", "")}.png'
|
||
output_path = str(output_dir / fname)
|
||
|
||
try:
|
||
draw_triangle_chart(
|
||
detail=detail,
|
||
output_path=output_path,
|
||
show=not args.no_show,
|
||
)
|
||
except Exception as e:
|
||
print(f'[ERROR] 绘图失败: {e}')
|
||
import traceback
|
||
traceback.print_exc()
|
||
sys.exit(1)
|
||
|
||
|
||
# ── 入口 ──────────────────────────────────────────────────────────────────────
|
||
if __name__ == '__main__':
|
||
run(parse_args())
|