technical-patterns-lab/scripts/report_converging_triangles.py
褚宏光 543572667b Add initial implementation of converging triangle detection algorithm and related documentation
- Created README.md and USAGE.md for project overview and usage instructions.
- Added core algorithm in src/converging_triangle.py for batch processing of stock data.
- Introduced data files (open.pkl, high.pkl, low.pkl, close.pkl, volume.pkl) for OHLCV data.
- Developed output documentation for results and breakout strength calculations.
- Implemented scripts for running the detection and generating reports.
- Added SVG visualizations and markdown documentation for algorithm details and usage examples.
2026-01-21 18:02:58 +08:00

274 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
生成收敛三角形突破强度选股简报Markdown
默认读取 outputs/converging_triangles/all_results.csv
"""
from __future__ import annotations
import argparse
import csv
import os
from datetime import datetime
from typing import Any, Dict, List, Optional
def parse_bool(value: Optional[str]) -> Optional[bool]:
    """Interpret a CSV cell as a tri-state boolean.

    Args:
        value: Raw cell text. ``None`` is tolerated and treated like an
            empty cell.

    Returns:
        ``True`` for "true"/"1"/"yes", ``False`` for "false"/"0"/"no"
        (case-insensitive, surrounding whitespace ignored), and ``None``
        for empty or unrecognized input.
    """
    if value is None:
        return None
    # Normalize once so padded cells such as " True " are still recognized.
    token = value.strip().lower()
    if token in ("true", "1", "yes"):
        return True
    if token in ("false", "0", "no"):
        return False
    return None
def to_int(value: str, default: int = 0) -> int:
    """Convert a CSV cell to ``int``, truncating any fractional part.

    The cell is parsed as a float first, so text such as "12.0" or "3.9"
    is accepted (yielding 12 and 3 respectively).

    Args:
        value: Raw cell text (``None`` is tolerated).
        default: Value returned when the cell cannot be converted.

    Returns:
        The truncated integer, or ``default`` on any conversion failure.
    """
    try:
        return int(float(value))
    except (TypeError, ValueError, OverflowError):
        # TypeError: value is None; ValueError: non-numeric text or NaN;
        # OverflowError: +/-infinity has no integer representation.
        return default
def to_float(value: str, default: float = 0.0) -> float:
    """Convert a CSV cell to ``float``.

    Args:
        value: Raw cell text (``None`` is tolerated).
        default: Value returned when the cell cannot be parsed.

    Returns:
        The parsed float, or ``default`` if ``float()`` rejects the input.
    """
    try:
        parsed = float(value)
    except (TypeError, ValueError):
        return default
    return parsed
def load_rows(csv_path: str) -> List[Dict[str, Any]]:
    """Read the results CSV and return one typed dict per data row.

    Numeric columns go through ``to_int`` / ``to_float``, flag columns
    through ``parse_bool``, and text columns are stripped; a missing column
    therefore ends up as the respective converter's fallback value.

    Args:
        csv_path: Path of the CSV file to read.

    Returns:
        The converted rows, in file order.
    """

    def _text(record: Dict[str, Any], column: str) -> str:
        # A missing or None cell becomes the empty string.
        return (record.get(column) or "").strip()

    def _convert(record: Dict[str, Any]) -> Dict[str, Any]:
        return {
            "stock_idx": to_int(record.get("stock_idx", "")),
            "date_idx": to_int(record.get("date_idx", "")),
            "is_valid": parse_bool(record.get("is_valid", "")),
            "breakout_strength_up": to_float(record.get("breakout_strength_up", "")),
            "breakout_strength_down": to_float(record.get("breakout_strength_down", "")),
            "upper_slope": to_float(record.get("upper_slope", "")),
            "lower_slope": to_float(record.get("lower_slope", "")),
            "width_ratio": to_float(record.get("width_ratio", "")),
            "touches_upper": to_int(record.get("touches_upper", "")),
            "touches_lower": to_int(record.get("touches_lower", "")),
            "apex_x": to_float(record.get("apex_x", "")),
            "breakout_dir": _text(record, "breakout_dir"),
            "volume_confirmed": parse_bool(record.get("volume_confirmed", "")),
            "false_breakout": parse_bool(record.get("false_breakout", "")),
            "window_start": to_int(record.get("window_start", "")),
            "window_end": to_int(record.get("window_end", "")),
            "stock_code": _text(record, "stock_code"),
            "stock_name": _text(record, "stock_name"),
            "date": to_int(record.get("date", "")),
        }

    # "utf-8-sig" transparently skips a leading byte-order mark if present.
    with open(csv_path, newline="", encoding="utf-8-sig") as handle:
        return [_convert(record) for record in csv.DictReader(handle)]
def best_by_stock(
    rows: List[Dict[str, Any]],
    strength_key: str,
    threshold: float,
    breakout_dir: str,
) -> List[Dict[str, Any]]:
    """Pick each stock's strongest record for one breakout direction.

    Args:
        rows: Parsed result rows.
        strength_key: Name of the row field holding the strength to rank by.
        threshold: Only rows whose strength is strictly greater are kept.
        breakout_dir: Required value of the row's ``breakout_dir`` field.

    Returns:
        One row per stock (highest strength; on equal strength the larger
        ``date`` wins), sorted by (strength, date) in descending order.
        Stocks are keyed by ``stock_code``, falling back to
        ``IDX<stock_idx>`` when the code is empty.
    """
    winners: Dict[str, Dict[str, Any]] = {}
    for row in rows:
        if not row.get(strength_key, 0.0) > threshold:
            continue
        if row.get("breakout_dir") != breakout_dir:
            continue

        stock = row.get("stock_code") or f"IDX{row.get('stock_idx', '')}"
        current = winners.get(stock)
        if current is None:
            winners[stock] = row
            continue

        # Tuple ordering: strength decides first, date breaks ties.
        challenger_rank = (row[strength_key], row.get("date", 0))
        current_rank = (current[strength_key], current.get("date", 0))
        if challenger_rank > current_rank:
            winners[stock] = row

    ranked = list(winners.values())
    ranked.sort(
        key=lambda item: (item.get(strength_key, 0.0), item.get("date", 0)),
        reverse=True,
    )
    return ranked
def best_combined_by_stock(
    rows: List[Dict[str, Any]],
    threshold: float,
) -> List[Dict[str, Any]]:
    """Pick each stock's strongest record across both breakout directions.

    Only rows whose ``breakout_dir`` is "up" or "down" are considered. The
    combined strength is the larger of the up/down strengths that strictly
    exceed ``threshold``; NaN strengths never exceed it and are ignored, so
    they cannot leak into the ranking.

    Args:
        rows: Parsed result rows. They are not modified; selected rows are
            returned as copies.
        threshold: Minimum (exclusive) combined strength.

    Returns:
        One copied row per stock with two extra fields, ``combined_strength``
        and ``combined_dir`` ("up" or "down", "up" on a tie), sorted by
        (combined strength, date) in descending order. Stocks are keyed by
        ``stock_code``, falling back to ``IDX<stock_idx>``.
    """
    best_map: Dict[str, Dict[str, Any]] = {}
    for row in rows:
        if row.get("breakout_dir") not in ("up", "down"):
            continue

        # Keep only strengths above the threshold. "up" is listed first so
        # that max() below returns it when both directions are equally strong.
        candidates = [
            (strength, direction)
            for strength, direction in (
                (row.get("breakout_strength_up", 0.0), "up"),
                (row.get("breakout_strength_down", 0.0), "down"),
            )
            if strength > threshold
        ]
        if not candidates:
            continue
        combined, combined_dir = max(candidates, key=lambda pair: pair[0])

        # Copy before annotating so the caller's rows stay untouched.
        enriched = dict(row)
        enriched["combined_strength"] = combined
        enriched["combined_dir"] = combined_dir

        key = enriched.get("stock_code") or f"IDX{enriched.get('stock_idx', '')}"
        prev = best_map.get(key)
        if prev is None:
            best_map[key] = enriched
            continue
        # Higher strength wins; on equal strength the later date wins.
        if (combined, enriched.get("date", 0)) > (
            prev["combined_strength"],
            prev.get("date", 0),
        ):
            best_map[key] = enriched

    return sorted(
        best_map.values(),
        key=lambda x: (x.get("combined_strength", 0.0), x.get("date", 0)),
        reverse=True,
    )
def count_by_dir(rows: List[Dict[str, Any]]) -> Dict[str, int]:
    """Tally rows by their ``breakout_dir`` value.

    A missing or empty direction is counted under "none". The keys "up",
    "down" and "none" are always present; any other direction encountered
    gets its own key.

    Args:
        rows: Parsed result rows.

    Returns:
        Mapping from direction name to number of rows.
    """
    tally: Dict[str, int] = {"up": 0, "down": 0, "none": 0}
    for record in rows:
        label = record.get("breakout_dir") or "none"
        tally[label] = tally.get(label, 0) + 1
    return tally
def fmt_bool(value: Optional[bool]) -> str:
    """Render a tri-state flag as a short marker for a Markdown table cell.

    The ``True`` and ``False`` markers must differ; with identical (empty)
    markers a confirmed and an unconfirmed flag look the same in the report.

    Args:
        value: ``True``, ``False``, or ``None`` for "unknown".

    Returns:
        "✅" for ``True``, "❌" for ``False``, and "-" for anything else.
    """
    # Identity checks on purpose: only real booleans get a yes/no marker.
    if value is True:
        return "✅"
    if value is False:
        return "❌"
    return "-"
def _stock_label(row: Dict[str, Any]) -> str:
    """Return "<code> <name>" for a table cell, trimmed if either part is empty."""
    return f"{row.get('stock_code', '')} {row.get('stock_name', '')}".strip()


def _direction_section(
    title: str,
    picks: List[Dict[str, Any]],
    strength_key: str,
    empty_message: str,
) -> List[str]:
    """Render one single-direction section: heading, table or notice, blank line."""
    section: List[str] = [title]
    if picks:
        section.append("| 排名 | 股票 | 日期 | 强度 | 宽度比 | 触碰(上/下) | 放量确认 |")
        section.append("| --- | --- | --- | --- | --- | --- | --- |")
        for rank, row in enumerate(picks, start=1):
            section.append(
                f"| {rank} | {_stock_label(row)} | {row.get('date', '')} | "
                f"{row.get(strength_key, 0.0):.4f} | "
                f"{row.get('width_ratio', 0.0):.4f} | "
                f"{row.get('touches_upper', 0)}/{row.get('touches_lower', 0)} | "
                f"{fmt_bool(row.get('volume_confirmed'))} |"
            )
    else:
        section.append(empty_message)
    section.append("")
    return section


def build_report(
    rows: List[Dict[str, Any]],
    threshold: float,
    top_n: int,
    output_path: str,
) -> None:
    """Write the Markdown screening brief to ``output_path``.

    The report has a summary header, the filter settings, three candidate
    tables (combined, upward, downward) and a short notes section. With no
    rows, a minimal "data is empty" report is written instead.

    Args:
        rows: Parsed result rows.
        threshold: Strength threshold (exclusive) passed to the pickers.
        top_n: Maximum number of stocks per table. Negative values are
            treated as 0 rather than being used as a from-the-end slice.
        output_path: Destination file; it is overwritten, encoded as UTF-8.
    """
    total = len(rows)
    if total == 0:
        content = "# 收敛三角形突破强度选股简报\n\n数据为空。\n"
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(content)
        return

    # A negative slice bound would drop entries from the end instead of
    # limiting the count, so clamp it.
    limit = max(top_n, 0)

    # Dates that are 0 or negative are left out of the reported range.
    dates = [r.get("date", 0) for r in rows if r.get("date", 0) > 0]
    date_min = min(dates) if dates else 0
    date_max = max(dates) if dates else 0

    counts = count_by_dir(rows)
    up_picks = best_by_stock(rows, "breakout_strength_up", threshold, "up")[:limit]
    down_picks = best_by_stock(rows, "breakout_strength_down", threshold, "down")[:limit]
    combined_picks = best_combined_by_stock(rows, threshold)[:limit]
    now_str = datetime.now().strftime("%Y-%m-%d %H:%M")

    # Header and summary.
    lines: List[str] = [
        "# 收敛三角形突破强度选股简报",
        "",
        f"- 生成时间:{now_str}",
    ]
    if date_min and date_max:
        lines.append(f"- 数据范围:{date_min} ~ {date_max}")
    lines.append(f"- 记录数:{total}")
    lines.append(
        f"- 突破方向统计:上破 {counts.get('up', 0)} / "
        f"下破 {counts.get('down', 0)} / 无突破 {counts.get('none', 0)}"
    )
    lines.append("")

    # Filter settings.
    lines.append("## 筛选条件")
    lines.append(f"- 强度阈值:> {threshold}")
    lines.append(f"- 每方向最多输出:{limit} 只(按单只股票的最高强度去重)")
    lines.append("")

    # Combined table: its columns differ from the per-direction tables.
    lines.append("## 综合突破强度候选")
    if combined_picks:
        lines.append("| 排名 | 股票 | 日期 | 方向 | 综合强度 | 宽度比 | 放量确认 |")
        lines.append("| --- | --- | --- | --- | --- | --- | --- |")
        for i, r in enumerate(combined_picks, start=1):
            lines.append(
                f"| {i} | {_stock_label(r)} | {r.get('date', '')} | "
                f"{r.get('combined_dir', '')} | {r.get('combined_strength', 0.0):.4f} | "
                f"{r.get('width_ratio', 0.0):.4f} | {fmt_bool(r.get('volume_confirmed'))} |"
            )
    else:
        lines.append("无满足条件的综合突破记录。")
    lines.append("")

    # Per-direction tables share one layout.
    lines.extend(
        _direction_section(
            "## 向上突破候选",
            up_picks,
            "breakout_strength_up",
            "无满足条件的向上突破记录。",
        )
    )
    lines.extend(
        _direction_section(
            "## 向下突破候选",
            down_picks,
            "breakout_strength_down",
            "无满足条件的向下突破记录。",
        )
    )

    # Notes.
    lines.append("## 说明")
    lines.append("- 强度由价格突破幅度、收敛程度与成交量放大综合计算。")
    lines.append("- 仅统计 `breakout_dir` 与方向一致的记录。")
    lines.append("- 每只股票仅保留强度最高的一条记录(同强度取最新日期)。")
    lines.append("- 综合强度 = max(向上强度, 向下强度)。")
    lines.append("")

    with open(output_path, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))
def main() -> None:
    """Command-line entry point: parse options, load the CSV, write the report.

    Raises:
        FileNotFoundError: If the ``--input`` path does not exist.
    """
    default_dir = os.path.join("outputs", "converging_triangles")

    parser = argparse.ArgumentParser(description="生成收敛三角形突破强度选股简报")
    parser.add_argument(
        "--input",
        default=os.path.join(default_dir, "all_results.csv"),
        help="输入 CSV 路径",
    )
    parser.add_argument(
        "--output",
        default=os.path.join(default_dir, "report.md"),
        help="输出 Markdown 路径",
    )
    parser.add_argument("--threshold", type=float, default=0.3, help="强度阈值")
    parser.add_argument("--top-n", type=int, default=20, help="每方向输出数量")
    options = parser.parse_args()

    source_path = options.input
    target_path = options.output

    if not os.path.exists(source_path):
        raise FileNotFoundError(f"输入文件不存在: {source_path}")
    records = load_rows(source_path)

    # Create the destination folder only when the output path names one.
    target_dir = os.path.dirname(target_path)
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)

    build_report(records, options.threshold, options.top_n, target_path)
    print(f"Report saved to: {target_path}")


if __name__ == "__main__":
    main()