technical-patterns-lab/scripts/report_converging_triangles.py
褚宏光 8dea3fbccb Enhance converging triangle analysis with new scripts and data outputs
- Updated all_results.csv with additional stock data and breakout strength metrics.
- Revised report.md to improve clarity and detail on stock selection criteria and results.
- Expanded strong_breakout_down.csv and strong_breakout_up.csv with new entries reflecting recent analysis.
- Introduced new chart images for selected stocks to visualize breakout patterns.
- Added plot_converging_triangles.py script for generating visualizations of stocks meeting convergence criteria.
- Enhanced report_converging_triangles.py to allow for date-specific reporting and improved output formatting.
- Optimized run_converging_triangle.py for performance and added execution time logging.
2026-01-22 10:00:47 +08:00

307 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Generate the converging-triangle breakout-strength stock-picking brief (Markdown).

Reads outputs/converging_triangles/all_results.csv by default.

Usage examples:
    1) Rank the latest trading day with the default parameters
       python scripts/report_converging_triangles.py
    2) Set the strength threshold and the number of rows to output
       python scripts/report_converging_triangles.py --threshold 0.4 --top-n 30
    3) Pick the report date (YYYYMMDD)
       python scripts/report_converging_triangles.py --report-date 20260120
    4) Set the input and output paths
       python scripts/report_converging_triangles.py \
           --input outputs/converging_triangles/all_results.csv \
           --output outputs/converging_triangles/report.md
"""
from __future__ import annotations
import argparse
import csv
import os
from datetime import datetime
from typing import Any, Dict, List, Optional
def parse_bool(value: str) -> Optional[bool]:
    """Parse a CSV cell into a tri-state boolean.

    Args:
        value: Raw cell text; ``None`` is accepted as well.

    Returns:
        ``True`` for "true"/"1"/"yes", ``False`` for "false"/"0"/"no"
        (case-insensitive, surrounding whitespace ignored), and ``None``
        for ``None``, blank text, or anything unrecognised.
    """
    if value is None:
        return None
    # Strip first so padded cells such as " True " parse the same way the
    # numeric and text columns already tolerate padding.
    token = value.strip().lower()
    if token in ("true", "1", "yes"):
        return True
    if token in ("false", "0", "no"):
        return False
    return None
def to_int(value: str, default: int = 0) -> int:
    """Convert a CSV cell to ``int``, falling back to ``default``.

    Args:
        value: Raw cell text (an integer or float literal); ``None`` is
            tolerated.
        default: Value returned when the cell cannot be converted.

    Returns:
        The parsed integer. Float literals are truncated toward zero.
        Blank, non-numeric, NaN and infinite inputs yield ``default``.
    """
    # Try a direct integer parse first so large integer literals are not
    # rounded by a trip through float.
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        pass
    try:
        return int(float(value))
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(float("inf")); ValueError: NaN or junk text.
        return default
def to_float(value: str, default: float = 0.0) -> float:
    """Convert a CSV cell to ``float``, falling back to ``default``.

    Args:
        value: Raw cell text; ``None`` is tolerated.
        default: Value returned when ``float(value)`` raises
            ``TypeError`` or ``ValueError``.

    Returns:
        The parsed float, or ``default`` on failure.
    """
    try:
        parsed = float(value)
    except (TypeError, ValueError):
        parsed = default
    return parsed
def load_rows(csv_path: str) -> List[Dict[str, Any]]:
    """Read the results CSV and return one typed dict per data row.

    The file is opened as UTF-8 with an optional BOM. Each known column is
    converted with the parser listed in ``schema``; a column missing from
    the file is fed to its parser as an empty string.

    Args:
        csv_path: Path of the CSV file to read.

    Returns:
        A list of dicts keyed by the column names in ``schema``, in file
        order.
    """
    def as_text(raw: Any) -> str:
        # Missing or None cells collapse to an empty string.
        return (raw or "").strip()

    # (column name, converter), kept in the order the keys are emitted.
    schema = (
        ("stock_idx", to_int),
        ("date_idx", to_int),
        ("is_valid", parse_bool),
        ("breakout_strength_up", to_float),
        ("breakout_strength_down", to_float),
        ("upper_slope", to_float),
        ("lower_slope", to_float),
        ("width_ratio", to_float),
        ("touches_upper", to_int),
        ("touches_lower", to_int),
        ("apex_x", to_float),
        ("breakout_dir", as_text),
        ("volume_confirmed", parse_bool),
        ("false_breakout", parse_bool),
        ("window_start", to_int),
        ("window_end", to_int),
        ("stock_code", as_text),
        ("stock_name", as_text),
        ("date", to_int),
    )

    with open(csv_path, newline="", encoding="utf-8-sig") as handle:
        return [
            {column: convert(record.get(column, "")) for column, convert in schema}
            for record in csv.DictReader(handle)
        ]
def best_by_stock(
    rows: List[Dict[str, Any]],
    strength_key: str,
    threshold: float,
    breakout_dir: str,
) -> List[Dict[str, Any]]:
    """Pick each stock's strongest qualifying row, strongest first.

    A row qualifies when its ``strength_key`` value is strictly above
    ``threshold`` and its ``breakout_dir`` equals the requested direction.
    Rows are grouped by ``stock_code`` (or ``IDX<stock_idx>`` when the code
    is empty). Within a group the higher strength wins, and the later
    ``date`` breaks ties.

    Args:
        rows: Parsed result rows.
        strength_key: Name of the strength column to rank by.
        threshold: Exclusive lower bound on the strength value.
        breakout_dir: Direction label a row must carry.

    Returns:
        One row per stock, sorted by (strength, date) descending.
    """
    def qualifies(rec: Dict[str, Any]) -> bool:
        return (
            rec.get(strength_key, 0.0) > threshold
            and rec.get("breakout_dir") == breakout_dir
        )

    eligible = list(filter(qualifies, rows))

    winners: Dict[str, Dict[str, Any]] = {}
    for rec in eligible:
        ident = rec.get("stock_code") or f"IDX{rec.get('stock_idx', '')}"
        holder = winners.get(ident)
        if holder is None:
            winners[ident] = rec
            continue
        challenger, incumbent = rec[strength_key], holder[strength_key]
        stronger = challenger > incumbent
        if stronger or (
            challenger == incumbent
            and rec.get("date", 0) > holder.get("date", 0)
        ):
            winners[ident] = rec

    ordered = list(winners.values())
    ordered.sort(
        key=lambda rec: (rec.get(strength_key, 0.0), rec.get("date", 0)),
        reverse=True,
    )
    return ordered
def daily_rank_by_strength(
    rows: List[Dict[str, Any]],
    target_date: int,
    top_n: int,
) -> List[Dict[str, Any]]:
    """Rank one day's breakout rows by their stronger side.

    Only rows whose ``date`` equals ``target_date`` and whose
    ``breakout_dir`` is "up" or "down" are considered. Each selected row is
    copied and given two extra keys: ``combined_strength`` (the larger of
    the up/down strengths) and ``combined_dir`` ("up" when the up strength
    is at least the down strength, otherwise "down"). Input rows are not
    modified.

    Args:
        rows: Parsed result rows.
        target_date: Date value a row must match.
        top_n: Number of leading rows to keep (applied as a slice).

    Returns:
        The enriched copies sorted by (combined_strength, date)
        descending, truncated to ``top_n``.
    """
    def enrich(rec: Dict[str, Any]) -> Dict[str, Any]:
        up = rec.get("breakout_strength_up", 0.0)
        down = rec.get("breakout_strength_down", 0.0)
        return {
            **rec,
            "combined_strength": max(up, down),
            "combined_dir": "up" if up >= down else "down",
        }

    ranked = [
        enrich(rec)
        for rec in rows
        if rec.get("date") == target_date
        and rec.get("breakout_dir") in ("up", "down")
    ]
    ranked.sort(
        key=lambda rec: (rec.get("combined_strength", 0.0), rec.get("date", 0)),
        reverse=True,
    )
    return ranked[:top_n]
def count_by_dir(rows: List[Dict[str, Any]]) -> Dict[str, int]:
    """Tally rows by their ``breakout_dir`` label.

    A missing or empty label is counted under "none". The result always
    contains "up", "down" and "none"; any other label found is added as it
    is encountered.

    Args:
        rows: Parsed result rows.

    Returns:
        Mapping from direction label to row count.
    """
    tally: Dict[str, int] = dict.fromkeys(("up", "down", "none"), 0)
    for record in rows:
        label = record.get("breakout_dir") or "none"
        tally[label] = tally.get(label, 0) + 1
    return tally
def fmt_bool(value: Optional[bool]) -> str:
    """Render a tri-state boolean as a report table cell.

    Args:
        value: ``True``, ``False`` or ``None`` (unknown).

    Returns:
        "是" for ``True``, "否" for ``False`` and "-" for anything else.
    """
    # True and False must render differently; with both mapped to an empty
    # string the volume-confirmation column carried no information.
    if value is True:
        return "是"
    if value is False:
        return "否"
    return "-"
def _stock_label(row: Dict[str, Any]) -> str:
    """Return the row's code and name joined by a space, trimmed."""
    return f"{row.get('stock_code', '')} {row.get('stock_name', '')}".strip()


def _touch_counts(row: Dict[str, Any]) -> str:
    """Return the upper/lower touch counts formatted as ``upper/lower``."""
    return f"{row.get('touches_upper', 0)}/{row.get('touches_lower', 0)}"


def _breakout_section(
    title: str,
    ranked: List[Dict[str, Any]],
    strength_key: str,
    empty_note: str,
) -> List[str]:
    """Build the Markdown lines of one breakout table (heading, body, blank)."""
    section = [title]
    if ranked:
        section.append("| 排名 | 股票 | 突破强度 | 宽度比 | 触碰(上/下) | 放量确认 |")
        section.append("| --- | --- | --- | --- | --- | --- |")
        for position, row in enumerate(ranked, start=1):
            section.append(
                f"| {position} | {_stock_label(row)} | "
                f"{row.get(strength_key, 0.0):.4f} | "
                f"{row.get('width_ratio', 0.0):.4f} | "
                f"{_touch_counts(row)} | "
                f"{fmt_bool(row.get('volume_confirmed'))} |"
            )
    else:
        section.append(empty_note)
    section.append("")
    return section


def build_report(
    rows: List[Dict[str, Any]],
    threshold: float,
    top_n: int,
    report_date: Optional[int],
    output_path: str,
) -> None:
    """Write the daily converging-triangle brief to ``output_path`` as Markdown.

    The report covers a single date: ``report_date`` when given (and
    non-zero), otherwise the largest positive ``date`` found in ``rows``.
    Rows for that date are split by ``breakout_dir`` into up, down and
    no-breakout tables, each sorted by strength descending.

    Args:
        rows: Parsed result rows. An empty list produces a short
            "no data" report.
        threshold: Accepted for interface compatibility; not used when
            building the report.
        top_n: Accepted for interface compatibility; not used when
            building the report (every row of the day is listed).
        report_date: Date to report on, or ``None`` for the latest one.
        output_path: Destination file, written as UTF-8.
    """
    if not rows:
        with open(output_path, "w", encoding="utf-8") as f:
            f.write("# 收敛三角形当日选股简报\n\n数据为空。\n")
        return

    date_max = max(
        (r.get("date", 0) for r in rows if r.get("date", 0) > 0),
        default=0,
    )
    target_date = report_date or date_max
    daily_rows = [r for r in rows if r.get("date") == target_date]

    # Split by direction and sort each group by its own strength.
    daily_up = sorted(
        (r for r in daily_rows if r.get("breakout_dir") == "up"),
        key=lambda x: x.get("breakout_strength_up", 0.0),
        reverse=True,
    )
    daily_down = sorted(
        (r for r in daily_rows if r.get("breakout_dir") == "down"),
        key=lambda x: x.get("breakout_strength_down", 0.0),
        reverse=True,
    )
    # A missing or empty direction is treated as "none", so every row
    # counted as no-breakout in the summary also appears in the table.
    daily_none = sorted(
        (r for r in daily_rows if (r.get("breakout_dir") or "none") == "none"),
        key=lambda x: max(
            x.get("breakout_strength_up", 0.0),
            x.get("breakout_strength_down", 0.0),
        ),
        reverse=True,
    )

    now_str = datetime.now().strftime("%Y-%m-%d %H:%M")
    lines: List[str] = [
        "# 收敛三角形当日选股简报",
        "",
        "## 数据说明",
        "",
        "- **股票池**108 只个股从万得全A按顺序索引等距50取样",
        "- **检测窗口**400 个交易日",
        "- **检测算法**:收敛三角形形态识别(基于枢轴点、趋势线拟合与收敛度判定)",
        "",
        f"## {target_date} 当日统计",
        "",
        f"- 生成时间:{now_str}",
        f"- 当日满足收敛三角形的个股:{len(daily_rows)}",
        # Counts come from the same lists the tables are rendered from.
        f" - 向上突破:{len(daily_up)}",
        f" - 向下突破:{len(daily_down)}",
        f" - 无突破(形态成立但未突破):{len(daily_none)}",
        "",
    ]

    # Upward breakouts.
    lines.extend(
        _breakout_section(
            "## 向上突破", daily_up, "breakout_strength_up", "当日无向上突破记录。"
        )
    )
    # Downward breakouts.
    lines.extend(
        _breakout_section(
            "## 向下突破", daily_down, "breakout_strength_down", "当日无向下突破记录。"
        )
    )

    # Pattern present, no breakout yet.
    lines.append("## 无突破(形态成立)")
    if daily_none:
        lines.append("| 排名 | 股票 | 宽度比 | 触碰(上/下) |")
        lines.append("| --- | --- | --- | --- |")
        for position, row in enumerate(daily_none, start=1):
            lines.append(
                f"| {position} | {_stock_label(row)} | "
                f"{row.get('width_ratio', 0.0):.4f} | "
                f"{_touch_counts(row)} |"
            )
    else:
        lines.append("当日无未突破记录。")
    lines.append("")

    lines.extend(
        [
            "## 说明",
            "",
            "- **突破强度**价格突破幅度、收敛程度与成交量放大综合计算0~1",
            "- **宽度比**:三角形末端宽度 / 起始宽度(越小越收敛)",
            "- **触碰(上/下)**:价格触碰上沿和下沿的次数",
            "- **放量确认**:突破时成交量是否显著放大",
            "",
        ]
    )

    with open(output_path, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))
def main() -> None:
    """Command-line entry point: parse options, load the CSV, write the report.

    Raises:
        FileNotFoundError: If the ``--input`` path does not exist.
    """
    results_dir = os.path.join("outputs", "converging_triangles")

    cli = argparse.ArgumentParser(description="生成收敛三角形突破强度选股简报")
    cli.add_argument(
        "--input",
        default=os.path.join(results_dir, "all_results.csv"),
        help="输入 CSV 路径",
    )
    cli.add_argument(
        "--output",
        default=os.path.join(results_dir, "report.md"),
        help="输出 Markdown 路径",
    )
    cli.add_argument("--threshold", type=float, default=0.3, help="强度阈值")
    cli.add_argument("--top-n", type=int, default=20, help="每方向输出数量")
    cli.add_argument("--report-date", type=int, default=None, help="指定报告日期YYYYMMDD")
    opts = cli.parse_args()

    source_csv, report_path = opts.input, opts.output
    if not os.path.exists(source_csv):
        raise FileNotFoundError(f"输入文件不存在: {source_csv}")

    rows = load_rows(source_csv)

    # A bare file name has no directory part, so there is nothing to create.
    target_dir = os.path.dirname(report_path)
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)

    build_report(rows, opts.threshold, opts.top_n, opts.report_date, report_path)
    print(f"Report saved to: {report_path}")


if __name__ == "__main__":
    main()