""" 生成收敛三角形突破强度选股简报(Markdown) 默认读取 outputs/converging_triangles/all_results.csv 用法示例: 1) 使用默认参数生成最新交易日排名 python scripts/report_converging_triangles.py 2) 指定强度阈值与输出数量 python scripts/report_converging_triangles.py --threshold 0.4 --top-n 30 3) 指定报告日期(YYYYMMDD) python scripts/report_converging_triangles.py --report-date 20260120 4) 指定输入与输出路径 python scripts/report_converging_triangles.py \ --input outputs/converging_triangles/all_results.csv \ --output outputs/converging_triangles/report.md """ from __future__ import annotations import argparse import csv import os import sys from datetime import datetime from typing import Any, Dict, List, Optional # 让脚本能找到 src/ 下的模块 sys.path.append(os.path.join(os.path.dirname(__file__), "..", "src")) # 导入统一的参数配置 from triangle_config import DETECTION_PARAMS def parse_bool(value: str) -> Optional[bool]: if value is None or value == "": return None if value.lower() in ("true", "1", "yes"): return True if value.lower() in ("false", "0", "no"): return False return None def to_int(value: str, default: int = 0) -> int: try: return int(float(value)) except (TypeError, ValueError): return default def to_float(value: str, default: float = 0.0) -> float: try: return float(value) except (TypeError, ValueError): return default def load_rows(csv_path: str) -> List[Dict[str, Any]]: rows: List[Dict[str, Any]] = [] with open(csv_path, newline="", encoding="utf-8-sig") as f: reader = csv.DictReader(f) for row in reader: rows.append({ "stock_idx": to_int(row.get("stock_idx", "")), "date_idx": to_int(row.get("date_idx", "")), "is_valid": parse_bool(row.get("is_valid", "")), "breakout_strength_up": to_float(row.get("breakout_strength_up", "")), "breakout_strength_down": to_float(row.get("breakout_strength_down", "")), "upper_slope": to_float(row.get("upper_slope", "")), "lower_slope": to_float(row.get("lower_slope", "")), "width_ratio": to_float(row.get("width_ratio", "")), "touches_upper": to_int(row.get("touches_upper", "")), "touches_lower": to_int(row.get("touches_lower", "")), "apex_x": to_float(row.get("apex_x", "")), "breakout_dir": (row.get("breakout_dir") or "").strip(), "volume_confirmed": parse_bool(row.get("volume_confirmed", "")), "false_breakout": parse_bool(row.get("false_breakout", "")), "window_start": to_int(row.get("window_start", "")), "window_end": to_int(row.get("window_end", "")), "stock_code": (row.get("stock_code") or "").strip(), "stock_name": (row.get("stock_name") or "").strip(), "date": to_int(row.get("date", "")), }) return rows def best_by_stock( rows: List[Dict[str, Any]], strength_key: str, threshold: float, breakout_dir: str, ) -> List[Dict[str, Any]]: filtered = [ r for r in rows if r.get(strength_key, 0.0) > threshold and r.get("breakout_dir") == breakout_dir ] best_map: Dict[str, Dict[str, Any]] = {} for r in filtered: key = r.get("stock_code") or f"IDX{r.get('stock_idx', '')}" prev = best_map.get(key) if prev is None: best_map[key] = r continue if r[strength_key] > prev[strength_key]: best_map[key] = r elif r[strength_key] == prev[strength_key] and r.get("date", 0) > prev.get("date", 0): best_map[key] = r return sorted( best_map.values(), key=lambda x: (x.get(strength_key, 0.0), x.get("date", 0)), reverse=True, ) def daily_rank_by_strength( rows: List[Dict[str, Any]], target_date: int, top_n: int, ) -> List[Dict[str, Any]]: candidates: List[Dict[str, Any]] = [] for r in rows: if r.get("date") != target_date: continue if r.get("breakout_dir") not in ("up", "down"): continue strength_up = r.get("breakout_strength_up", 0.0) strength_down = r.get("breakout_strength_down", 0.0) combined = max(strength_up, strength_down) r = dict(r) r["combined_strength"] = combined r["combined_dir"] = "up" if strength_up >= strength_down else "down" candidates.append(r) return sorted( candidates, key=lambda x: (x.get("combined_strength", 0.0), x.get("date", 0)), reverse=True, )[:top_n] def count_by_dir(rows: List[Dict[str, Any]]) -> Dict[str, int]: counts = {"up": 0, "down": 0, "none": 0} for r in rows: dir_name = r.get("breakout_dir") or "none" if dir_name not in counts: counts[dir_name] = 0 counts[dir_name] += 1 return counts def fmt_bool(value: Optional[bool]) -> str: if value is True: return "是" if value is False: return "否" return "-" def build_report( rows: List[Dict[str, Any]], threshold: float, top_n: int, report_date: Optional[int], output_path: str, ) -> None: total = len(rows) if total == 0: content = "# 收敛三角形当日选股简报\n\n数据为空。\n" with open(output_path, "w", encoding="utf-8") as f: f.write(content) return dates = [r.get("date", 0) for r in rows if r.get("date", 0) > 0] date_max = max(dates) if dates else 0 target_date = report_date or date_max daily_rows = [r for r in rows if r.get("date") == target_date] daily_counts = count_by_dir(daily_rows) # 按方向分类并排序 daily_up = sorted( [r for r in daily_rows if r.get("breakout_dir") == "up"], key=lambda x: x.get("breakout_strength_up", 0.0), reverse=True ) daily_down = sorted( [r for r in daily_rows if r.get("breakout_dir") == "down"], key=lambda x: x.get("breakout_strength_down", 0.0), reverse=True ) daily_none = sorted( [r for r in daily_rows if r.get("breakout_dir") == "none"], key=lambda x: max(x.get("breakout_strength_up", 0.0), x.get("breakout_strength_down", 0.0)), reverse=True ) now_str = datetime.now().strftime("%Y-%m-%d %H:%M") lines: List[str] = [] lines.append("# 收敛三角形当日选股简报") lines.append("") lines.append("## 数据说明") lines.append("") lines.append("- **股票池**:108 只个股(从万得全A按顺序索引,等距50取样)") lines.append(f"- **检测窗口**:{DETECTION_PARAMS.window} 个交易日") lines.append("- **检测算法**:收敛三角形形态识别(基于枢轴点、趋势线拟合与收敛度判定)") lines.append("") lines.append(f"## {target_date} 当日统计") lines.append("") lines.append(f"- 生成时间:{now_str}") lines.append(f"- 当日满足收敛三角形的个股:{len(daily_rows)} 只") lines.append(f" - 向上突破:{daily_counts.get('up', 0)} 只") lines.append(f" - 向下突破:{daily_counts.get('down', 0)} 只") lines.append(f" - 无突破(形态成立但未突破):{daily_counts.get('none', 0)} 只") lines.append("") # 向上突破 lines.append("## 向上突破") if daily_up: lines.append("| 排名 | 股票 | 突破强度 | 宽度比 | 触碰(上/下) | 放量确认 |") lines.append("| --- | --- | --- | --- | --- | --- |") for i, r in enumerate(daily_up, start=1): stock = f"{r.get('stock_code', '')} {r.get('stock_name', '')}".strip() lines.append( f"| {i} | {stock} | " f"{r.get('breakout_strength_up', 0.0):.4f} | " f"{r.get('width_ratio', 0.0):.4f} | " f"{r.get('touches_upper', 0)}/{r.get('touches_lower', 0)} | " f"{fmt_bool(r.get('volume_confirmed'))} |" ) else: lines.append("当日无向上突破记录。") lines.append("") # 向下突破 lines.append("## 向下突破") if daily_down: lines.append("| 排名 | 股票 | 突破强度 | 宽度比 | 触碰(上/下) | 放量确认 |") lines.append("| --- | --- | --- | --- | --- | --- |") for i, r in enumerate(daily_down, start=1): stock = f"{r.get('stock_code', '')} {r.get('stock_name', '')}".strip() lines.append( f"| {i} | {stock} | " f"{r.get('breakout_strength_down', 0.0):.4f} | " f"{r.get('width_ratio', 0.0):.4f} | " f"{r.get('touches_upper', 0)}/{r.get('touches_lower', 0)} | " f"{fmt_bool(r.get('volume_confirmed'))} |" ) else: lines.append("当日无向下突破记录。") lines.append("") # 无突破(形态成立) lines.append("## 无突破(形态成立)") if daily_none: lines.append("| 排名 | 股票 | 宽度比 | 触碰(上/下) |") lines.append("| --- | --- | --- | --- |") for i, r in enumerate(daily_none, start=1): stock = f"{r.get('stock_code', '')} {r.get('stock_name', '')}".strip() lines.append( f"| {i} | {stock} | " f"{r.get('width_ratio', 0.0):.4f} | " f"{r.get('touches_upper', 0)}/{r.get('touches_lower', 0)} |" ) else: lines.append("当日无未突破记录。") lines.append("") lines.append("## 说明") lines.append("") lines.append("- **突破强度**:价格突破幅度、收敛程度与成交量放大综合计算(0~1)") lines.append("- **宽度比**:三角形末端宽度 / 起始宽度(越小越收敛)") lines.append("- **触碰(上/下)**:价格触碰上沿和下沿的次数") lines.append("- **放量确认**:突破时成交量是否显著放大") lines.append("") with open(output_path, "w", encoding="utf-8") as f: f.write("\n".join(lines)) def main() -> None: parser = argparse.ArgumentParser(description="生成收敛三角形突破强度选股简报") parser.add_argument( "--input", default=os.path.join("outputs", "converging_triangles", "all_results.csv"), help="输入 CSV 路径", ) parser.add_argument( "--output", default=os.path.join("outputs", "converging_triangles", "report.md"), help="输出 Markdown 路径", ) parser.add_argument("--threshold", type=float, default=0.3, help="强度阈值") parser.add_argument("--top-n", type=int, default=20, help="每方向输出数量") parser.add_argument("--report-date", type=int, default=None, help="指定报告日期(YYYYMMDD)") args = parser.parse_args() if not os.path.exists(args.input): raise FileNotFoundError(f"输入文件不存在: {args.input}") rows = load_rows(args.input) output_dir = os.path.dirname(args.output) if output_dir: os.makedirs(output_dir, exist_ok=True) build_report(rows, args.threshold, args.top_n, args.report_date, args.output) print(f"Report saved to: {args.output}") if __name__ == "__main__": main()