# 生产环境性能评估:每日5000股检测场景 ## 使用场景 **真实需求**: - 股票数量:5000+只 - 运行频率:每天执行一次 - 检测参数:可自定义(窗口、收敛度、突破阈值等) - 输出要求:标准化强度分 [0,1] --- ## 性能瓶颈分析 ### 完整流程耗时分解 ``` ┌─────────────────────────────────────────────────────────────┐ │ 每日检测流程 │ ├─────────────────────────────────────────────────────────────┤ │ 1. 数据加载 (load_pkl) ~0.5-2秒 (I/O瓶颈) │ │ 2. 三角形批量检测 ~10-60秒 (计算密集) │ │ 3. 标准化处理 (7个维度) ~0.05-0.2秒 (CPU) │ │ 4. 强度分计算 (加权求和) ~0.01秒 (向量运算) │ │ 5. 结果输出/存储 ~0.1-0.5秒 (I/O) │ ├─────────────────────────────────────────────────────────────┤ │ 总耗时:~11-63秒/天 │ └─────────────────────────────────────────────────────────────┘ ``` **结论:标准化不是瓶颈!** - 标准化耗时 < 0.5% 总时间 - 真正瓶颈:三角形检测算法 (占90%+) --- ## 关键问题:标准化基准如何选择? ### 场景A:基于历史基准(18,004样本) ```python # 预先计算并缓存历史基准 HISTORICAL_QUANTILES = { 'price_score_up': {'p5': 0.0, 'p50': 0.0, 'p95': 0.15}, 'convergence_score': {'p5': 0.45, 'p50': 0.80, 'p95': 0.92}, # ... 其他维度 } def normalize_with_historical_baseline(today_data): """每天用历史基准标准化今天的5000只股票""" for col in columns: p5, p50, p95 = HISTORICAL_QUANTILES[col].values() normalized = apply_normalization(today_data[col], p5, p50, p95) return normalized ``` **优点**: - ✅ 标准化基准稳定,不随每日数据波动 - ✅ 预设模式阈值(0.7/0.75)仍然有效 - ✅ 不同日期的强度分可直接对比 - ✅ 每天只需O(n)线性映射,极快 **缺点**: - ❌ 需要定期更新历史基准(如每季度) - ❌ 极端市场环境下可能出现大量值超出[0,1]范围(需clip) --- ### 场景B:基于当天样本(5000只) ```python def normalize_with_daily_baseline(today_data): """每天用当天5000只股票重新计算分位数""" for col in columns: p5 = today_data[col].quantile(0.05) p50 = today_data[col].median() p95 = today_data[col].quantile(0.95) normalized = apply_normalization(today_data[col], p5, p50, p95) return normalized ``` **优点**: - ✅ 自适应市场环境,无需维护历史基准 - ✅ 保证当天数据完全在[0,1]范围 **缺点**: - ❌ 标准化基准每天变化,不同日期强度分不可比 - ❌ 预设模式阈值失效(今天0.7 ≠ 明天0.7) - ❌ 牛市/熊市时相对排名被压缩 --- ## 推荐方案:混合策略 ### 方案:历史基准 + 快速线性映射 ```python # ========== 离线预计算(每季度更新一次) ========== def compute_historical_baseline(historical_data_18004_samples): """ 基于历史数据计算标准化基准 输入:18,004个历史样本 输出:每个维度的P5/P50/P95 """ baseline = {} for col in ['price_score_up', 'price_score_down', ...]: baseline[col] = { 'p5': historical_data[col].quantile(0.05), 'p50': historical_data[col].median(), 'p95': historical_data[col].quantile(0.95), 'method': 'zero_inflated' if col in [...] else 'standard' } # 保存为配置文件 save_json('baseline_config.json', baseline) return baseline # ========== 在线标准化(每天运行) ========== def normalize_daily_fast(today_5000_samples, baseline_config): """ 使用历史基准快速标准化今天的数据 时间复杂度:O(n) = O(5000) ≈ 10-20ms vs 百分位排名 O(n log n) ≈ 50-100ms """ result = {} for col, config in baseline_config.items(): p5 = config['p5'] p50 = config['p50'] p95 = config['p95'] method = config['method'] if method == 'zero_inflated': # 零膨胀:零值→0.5,非零值线性映射到[0.5, 1.0] is_zero = (today_5000_samples[col] < 1e-6) result[col] = np.where( is_zero, 0.5, 0.5 + 0.5 * ((today_5000_samples[col] - p5) / (p95 - p5)).clip(0, 1) ) elif method == 'median_aligned': # 中位数对齐:上半[p50,p95]→[0.5,1.0],下半[p5,p50]→[0.0,0.5] is_upper = (today_5000_samples[col] >= p50) upper_norm = 0.5 + 0.5 * ((today_5000_samples[col] - p50) / (p95 - p50)).clip(0, 1) lower_norm = 0.5 * ((today_5000_samples[col] - p5) / (p50 - p5)).clip(0, 1) result[col] = np.where(is_upper, upper_norm, lower_norm) else: # 标准线性映射 result[col] = ((today_5000_samples[col] - p5) / (p95 - p5)).clip(0, 1) return pd.DataFrame(result) ``` --- ## 性能对比(5000样本/天) | 方法 | 耗时 | 相比当前 | 质量 | 复杂度 | |------|------|---------|------|--------| | **当前:rank(pct=True)** | 50-100ms | 基准 | ⭐⭐⭐⭐⭐ | 简单 | | **历史基准+线性映射** | 10-20ms | **5倍提速** | ⭐⭐⭐⭐ | 中等 | | **每日重算baseline** | 15-30ms | 3倍提速 | ⭐⭐⭐ | 中等 | **结论**: - 对于5000样本,优化收益从 50ms → 20ms,**节省30ms** - 相对于检测算法的10-60秒,**收益微乎其微** (<0.1%) - **不建议优化标准化环节**,应优化检测算法 --- ## 实际生产建议 ### 短期(保持现状) ```python def 收敛三角形_生产环境(today_data_5000_stocks): """ 每天运行,使用历史基准标准化 """ # 1. 加载历史基准(只需加载一次,常驻内存) baseline = load_cached_baseline() # 2. 检测三角形(主要耗时:10-60秒) detection_result = detect_converging_triangle_batch(today_data_5000_stocks) # 3. 使用历史基准标准化(耗时:10-20ms,用线性映射) normalized = normalize_with_historical_baseline( detection_result, baseline ) # 4. 计算强度分(耗时:<10ms) strength = calculate_strength(normalized, CONFIG_EQUAL) return strength ``` **关键点**: 1. **历史基准常驻内存**,不用每天重算 2. **标准化用线性映射**(历史基准+快速计算) 3. **优化重点:检测算法**,而非标准化 --- ### 中期(如果检测算法已优化) **场景:检测算法优化到1-2秒后,标准化才可能成为瓶颈** 此时可考虑: 1. 预编译Numba/Cython加速标准化 2. 多进程并行化(5000股分成10组并行) 3. GPU加速(如果数据量进一步增大) --- ### 长期(大规模扩展) **场景:股票数量 > 1万只,或需要分钟级实时计算** ```python # 使用增量更新策略 class IncrementalNormalizer: """增量标准化器,维护滚动窗口基准""" def __init__(self, window_size=20_days): self.historical_buffer = deque(maxlen=window_size) self.baseline = None def update(self, today_data): """每天更新基准""" self.historical_buffer.append(today_data) # 重新计算最近20天的基准(采样策略) if len(self.historical_buffer) == self.window_size: self.baseline = compute_baseline_fast(self.historical_buffer) def normalize(self, today_data): """O(1)标准化""" return apply_linear_mapping(today_data, self.baseline) ``` --- ## 决策树 ``` 是否需要优化标准化? │ ├─ 检测算法耗时 > 10秒? │ ├─ 是 → 先优化检测算法(收益更大) │ └─ 否 → 继续往下 │ ├─ 标准化耗时 > 500ms? │ ├─ 是 → 考虑优化(使用历史基准+线性映射) │ └─ 否 → 不需要优化 │ └─ 总结:对于5000样本/天,标准化不是瓶颈 ``` --- ## 实施建议 ### 立即行动 1. **基准测试**:实测当前5000股的实际耗时 ```bash python scripts/benchmark_production.py --stocks 5000 ``` 2. **分析瓶颈**:确认检测算法 vs 标准化的耗时占比 3. **优先级排序**: - P0: 优化检测算法(如果>10秒) - P1: 优化数据加载I/O(如果>2秒) - P2: 优化标准化(如果>500ms) ### 如果确需优化标准化 **步骤:** 1. 离线计算历史基准(每季度更新) 2. 修改标准化函数使用线性映射 3. A/B测试质量影响 4. 监控每日强度分分布 **代码位置:** ``` dunhe_dataServer/src/library/expression/funcs/pattern.py - 修改 _compute_all_metrics() 中的标准化逻辑 - 增加 load_historical_baseline() 函数 - 增加 normalize_with_baseline_fast() 函数 ``` --- ## 结论 **对于每天5000只股票的场景:** 1. ✅ **保持当前方案**(百分位排名) - 标准化耗时 < 100ms,不是瓶颈 - 质量最优,稳定可靠 2. 📊 **如果未来需要优化**(检测算法已优化到秒级) - 使用**历史基准 + 线性映射** - 5倍提速(100ms → 20ms) - 质量略降(0.98 → 0.88) 3. 🎯 **真正应该优化的**: - 检测算法性能(占90%+时间) - 数据加载I/O - 结果缓存策略 **一句话总结**:标准化优化对每天5000股场景的收益 < 0.1%,不值得投入。应聚焦检测算法优化。