""" 生成包含内嵌数据的股票查看器HTML 用法: python scripts/generate_stock_viewer.py python scripts/generate_stock_viewer.py --date 20260120 python scripts/generate_stock_viewer.py --all-stocks # 显示所有108只股票 """ from __future__ import annotations import argparse import csv import os import json import sys import pickle import numpy as np def load_all_stocks_list(data_dir: str) -> tuple: """从close.pkl加载所有股票列表""" # 创建假模块以加载pickle sys.modules['model'] = type('FakeModule', (), {'ndarray': np.ndarray, '__path__': []})() sys.modules['model.index_info'] = sys.modules['model'] close_path = os.path.join(data_dir, 'close.pkl') with open(close_path, 'rb') as f: data = pickle.load(f) return data['tkrs'], data['tkrs_name'] def load_stock_data(csv_path: str, target_date: int = None, all_stocks_mode: bool = False, data_dir: str = 'data') -> tuple: """从CSV加载股票数据""" stocks_map = {} max_date = 0 with open(csv_path, 'r', encoding='utf-8-sig') as f: reader = csv.DictReader(f) for row in reader: try: date = int(row.get('date', '0')) if date > max_date: max_date = date except: continue # 使用指定日期或最新日期 use_date = target_date if target_date else max_date # 从CSV读取有强度分的股票 with open(csv_path, 'r', encoding='utf-8-sig') as f: reader = csv.DictReader(f) for row in reader: try: date = int(row.get('date', '0')) if date != use_date: continue stock_code = row.get('stock_code', '') stock = { 'idx': int(row.get('stock_idx', '0')), 'code': stock_code, 'name': row.get('stock_name', ''), 'strengthUp': float(row.get('breakout_strength_up', '0')), 'strengthDown': float(row.get('breakout_strength_down', '0')), 'direction': row.get('breakout_dir', ''), 'widthRatio': float(row.get('width_ratio', '0')), 'touchesUpper': int(row.get('touches_upper', '0')), 'touchesLower': int(row.get('touches_lower', '0')), 'volumeConfirmed': row.get('volume_confirmed', ''), 'boundaryUtilization': float(row.get('boundary_utilization', '0')), 'date': date, 'hasTriangle': True # 标记为有三角形形态 } stock['strength'] = max(stock['strengthUp'], stock['strengthDown']) # 清理文件名中的非法字符 clean_name = stock['name'].replace('*', '').replace('?', '').replace('"', '').replace('<', '').replace('>', '').replace('|', '').replace(':', '').replace('/', '').replace('\\', '') stock['chartPath'] = f"charts/{date}_{stock_code}_{clean_name}.png" stock['chartPathDetail'] = f"charts/{date}_{stock_code}_{clean_name}_detail.png" if stock_code not in stocks_map or stocks_map[stock_code]['strength'] < stock['strength']: stocks_map[stock_code] = stock except Exception as e: continue # 如果是all_stocks模式,添加所有股票 if all_stocks_mode: all_codes, all_names = load_all_stocks_list(data_dir) for idx, (code, name) in enumerate(zip(all_codes, all_names)): if code not in stocks_map: # 没有三角形形态的股票,强度分为0 clean_name = name.replace('*', '').replace('?', '').replace('"', '').replace('<', '').replace('>', '').replace('|', '').replace(':', '').replace('/', '').replace('\\', '') stocks_map[code] = { 'idx': idx, 'code': code, 'name': name, 'strengthUp': 0.0, 'strengthDown': 0.0, 'strength': 0.0, 'direction': 'none', 'widthRatio': 0.0, 'touchesUpper': 0, 'touchesLower': 0, 'volumeConfirmed': '', 'boundaryUtilization': 0.0, 'date': use_date, 'chartPath': f"charts/{use_date}_{code}_{clean_name}.png", 'chartPathDetail': f"charts/{use_date}_{code}_{clean_name}_detail.png", 'hasTriangle': False # 标记为无三角形形态 } stocks = list(stocks_map.values()) stocks.sort(key=lambda x: x['strength'], reverse=True) return stocks, use_date def generate_html(stocks: list, date: int, output_path: str): """生成包含数据的HTML""" html_template = ''' 收敛三角形强度分选股系统

收敛三角形选股系统

数据日期: DATA_DATE 监控股票: TOTAL_STOCKS
排序
全部
↑ 向上
↓ 向下
— 无
全部
✓ 已确认
✗ 未确认
0.00
📊
Total
0
Showing
0
Average
0.00
''' # 替换数据 stocks_json = json.dumps(stocks, ensure_ascii=False, indent=2) html = html_template.replace('STOCK_DATA', stocks_json) html = html.replace('DATA_DATE', str(date)) html = html.replace('TOTAL_STOCKS', str(len(stocks))) with open(output_path, 'w', encoding='utf-8') as f: f.write(html) def main(): parser = argparse.ArgumentParser(description="生成包含内嵌数据的股票查看器HTML") parser.add_argument( "--date", type=int, default=None, help="指定日期(YYYYMMDD),默认为最新日期" ) parser.add_argument( "--input", default=os.path.join("outputs", "converging_triangles", "all_results.csv"), help="输入CSV路径" ) parser.add_argument( "--output", default=os.path.join("outputs", "converging_triangles", "stock_viewer.html"), help="输出HTML路径" ) parser.add_argument( "--all-stocks", action="store_true", help="显示所有108只股票(包括不满足条件的)" ) args = parser.parse_args() print("=" * 70) print("生成股票查看器HTML") print("=" * 70) if not os.path.exists(args.input): print(f"错误: CSV文件不存在: {args.input}") print("请先运行: python scripts/pipeline_converging_triangle.py") return print(f"读取数据: {args.input}") print(f"模式: {'所有股票' if args.all_stocks else '仅满足条件的股票'}") data_dir = "data" stocks, date = load_stock_data(args.input, args.date, args.all_stocks, data_dir) print(f"数据日期: {date}") print(f"股票数量: {len(stocks)}") if args.all_stocks: has_triangle = sum(1 for s in stocks if s.get('hasTriangle', False)) print(f" - 有三角形形态: {has_triangle}") print(f" - 无三角形形态: {len(stocks) - has_triangle}") print(f"生成HTML: {args.output}") generate_html(stocks, date, args.output) print("\n完成!") print(f"\n用浏览器打开: {args.output}") print("=" * 70) if __name__ == "__main__": main()