# 枢轴点拟合算法详解 **日期**: 2026-01-26 (v2 更新) **文件**: `src/converging_triangle.py` - `fit_pivot_line()` 函数 --- ## 📋 目录 1. [算法概述](#算法概述) 2. [迭代离群点移除(当前算法)](#迭代离群点移除当前算法) 3. [历史版本:分段选择算法](#历史版本分段选择算法) 4. [独立处理机制](#独立处理机制) 5. [代码实现](#代码实现) 6. [实际案例分析](#实际案例分析) 7. [边界情况处理](#边界情况处理) 8. [可视化说明](#可视化说明) --- ## 算法概述 ### 核心思想 对于识别出的所有枢轴点,我们采用**迭代离群点移除**算法来拟合趋势线: 1. **初始拟合**:先用所有枢轴点做线性回归 2. **识别离群点**:计算残差,找出偏离拟合线的"弱"点 3. **移除离群点**:移除最差的离群点 4. **迭代优化**:重复步骤1-3直到收敛 ### 设计目标 ```python # 上沿线:移除价格明显低于拟合线的点(弱高点) # 下沿线:移除价格明显高于拟合线的点(弱低点) ``` --- ## 迭代离群点移除(当前算法) ### 为什么需要这个算法? **问题场景**: ``` 价格 ^ | * ← 强高点 (7.9元) | \ | * ← 弱高点 (5.8元) ← 问题! | \ | * ← 强高点 (6.8元) | \ | * ← 强高点 (6.0元) └──────────────> 时间 ``` 如果使用所有4个点拟合,5.8元的"弱高点"会把上沿线拉低,导致: - ❌ 拟合线不是真正的上边界 - ❌ 与主观判断不符 - ❌ 可能错过真实的突破信号 ### 解决方案:迭代移除 ``` 第1次迭代: 拟合线: y = ax + b (用全部4点) 残差: [0.1, 1.8, 0.2, 0.3] ← 5.8元点的残差最大(1.8) 判定: 1.8 > 1.5*std → 离群点 操作: 移除5.8元点 第2次迭代: 拟合线: y = a'x + b' (用剩余3点) 残差: [0.05, 0.08, 0.12] 判定: 无离群点 → 收敛 最终结果: 只用3个强高点拟合 ``` ### 算法流程图 ```mermaid flowchart TD A[输入全部枢轴点] --> B[线性回归拟合] B --> C[计算方向性残差] C --> D{残差 > 阈值?} D -->|是| E[移除最大残差点] E --> F{剩余点 >= 3?} F -->|是| G{迭代次数 < 3?} G -->|是| B G -->|否| H[返回当前拟合] F -->|否| H D -->|否| H ``` ### 关键参数 | 参数 | 默认值 | 说明 | |------|--------|------| | `outlier_threshold` | 1.5 | 残差超过 1.5 倍标准差视为离群点 | | `max_iterations` | 3 | 最多迭代 3 次,避免过度过滤 | | `min_keep` | 3 | 至少保留 3 个点用于拟合 | ### 残差计算逻辑 **上沿线(upper)**: ```python # 残差 = 拟合值 - 实际值 # 正残差 = 点在拟合线下方 = 弱高点(应移除) residuals = fitted_values - actual_values outliers = residuals > threshold ``` **下沿线(lower)**: ```python # 残差 = 实际值 - 拟合值 # 正残差 = 点在拟合线上方 = 弱低点(应移除) residuals = actual_values - fitted_values outliers = residuals > threshold ``` ### 算法优势 | 优势 | 说明 | |------|------| | **自适应** | 自动识别偏离点,无需人工干预 | | **稳健性** | 不会被单个异常点影响整条线 | | **可控性** | 通过阈值和迭代次数控制过滤强度 | | **保守性** | 至少保留3个点,避免过度过滤 | --- ## 历史版本:分段选择算法 > **注意**:以下是 v1 版本的分段算法,已被迭代离群点移除算法替代。保留此节供参考。 ### 原核心思想 对于识别出的所有枢轴点: 1. **分段策略**:将枢轴点按时间顺序分成 **3 个时间段** 2. **代表点选择**:从每段中选出 **最极端的点**(上沿选最高,下沿选最低) 3. **线性回归**:用这 3 个代表点进行线性回归,拟合趋势线 ### 原触发条件 ```python if 枢轴点数量 > 4: 使用分段策略(3段,每段1个代表点) else: 使用全部枢轴点 ``` ### 原算法的局限性 分段算法会选中一些"弱"枢轴点: - 某个高点虽然是时间段内最高,但可能明显低于其他时间段的高点 - 这些点会拉低/拉高拟合线,导致与主观判断不符 --- ## 为什么需要分段 ### 问题1: 仅用两点的缺陷 如果只用首尾两点画线: ``` 价格 ^ | * * ← 两个高点 | \ / | \ / | X ← 两点连线 | / \ | / \ | * * ← 被忽略的中间高点 └──────────────> 时间 ``` **问题**: - ❌ 中间的极值点被忽略 - ❌ 线可能不是真正的边界 - ❌ 容易被噪声影响(首尾点恰好是噪声) ### 问题2: 使用全部点的问题 如果用所有枢轴点进行回归: ``` 价格 ^ | * * * * * * ← 6个高点,但分布不均 | └─┬─┘ └──┬──┘ | 前期 后期 | 密集 稀疏 └──────────────> 时间 ``` **问题**: - ❌ 某些时间段的点过多,权重过大 - ❌ 回归结果偏向点密集的区域 - ❌ 不能均衡反映整个周期的趋势 ### 解决方案: 时间均衡分段 ``` 价格 ^ | * * * * * * ← 6个高点 | └─┬─┘ └┬┘ └┬┘ | 第1段 第2段 第3段 | ↓ ↓ ↓ | * * * ← 每段选1个最高点(3个拟合点) └──────────────> 时间 ``` **优点**: - ✅ 时间均衡(前、中、后都有代表点) - ✅ 代表性强(每段选最极端的点) - ✅ 稳健性好(不易被局部噪声影响) - ✅ 覆盖性好(确保边界线包络所有点) --- ## 分段算法详解 ### 第1步: 排序枢轴点 ```python # 按时间顺序排序(从早到晚) sort_idx = np.argsort(pivot_indices) x_sorted = pivot_indices[sort_idx] # 时间索引 y_sorted = pivot_values[sort_idx] # 价格值 ``` **示例**: ``` 原始枢轴点(未排序): 索引: [50, 10, 80, 30, 90, 60] 价格: [95, 100, 98, 96, 92, 94] 排序后: 索引: [10, 30, 50, 60, 80, 90] ← 时间从早到晚 价格: [100, 96, 95, 94, 98, 92] ``` ### 第2步: 计算分段大小 ```python n = len(x_sorted) # 总点数,例如 n = 6 segment_size = n // 3 # 整除,segment_size = 6 // 3 = 2 ``` **分段规则**: ``` 总数 n → segment_size → 分段方式 ───────────────────────────── n = 5 → 1 → [0:1], [1:2], [2:5] (1,1,3个点) n = 6 → 2 → [0:2], [2:4], [4:6] (2,2,2个点) n = 7 → 2 → [0:2], [2:4], [4:7] (2,2,3个点) n = 8 → 2 → [0:2], [2:4], [4:8] (2,2,4个点) n = 9 → 3 → [0:3], [3:6], [6:9] (3,3,3个点) n = 10 → 3 → [0:3], [3:6], [6:10] (3,3,4个点) ``` **重要特性**: - 第3段可能比前两段多点(余数分配给第3段) - 确保每段至少有1个点 - 第3段包含最新的数据点 ### 第3步: 定义三个时间段 ```python segments = [ range(0, segment_size), # 第1段: [0, segment_size) range(segment_size, 2 * segment_size), # 第2段: [segment_size, 2*segment_size) range(2 * segment_size, n), # 第3段: [2*segment_size, n) ] ``` **以 n=6, segment_size=2 为例**: ``` 索引位置: 0 1 | 2 3 | 4 5 [ 第1段 | 第2段 | 第3段 ] 时间: 早期 中期 晚期 范围: [0:2) [2:4) [4:6) 点数: 2个 2个 2个 ``` **以 n=7, segment_size=2 为例**: ``` 索引位置: 0 1 | 2 3 | 4 5 6 [ 第1段 | 第2段 | 第3段 ] 时间: 早期 中期 晚期 范围: [0:2) [2:4) [4:7) 点数: 2个 2个 3个 ← 第3段多1个 ``` ### 第4步: 从每段选择极值点 ```python for seg in segments: seg_list = list(seg) seg_y = y_sorted[seg_list] # 该段的所有价格 if mode == "upper": # 上沿:选该段最高点 best_idx_in_seg = np.argmax(seg_y) else: # mode == "lower" # 下沿:选该段最低点 best_idx_in_seg = np.argmin(seg_y) # 标记为选中 selected_mask[seg_list[best_idx_in_seg]] = True ``` **上沿示例(选最高点)**: ``` 第1段 [0:2): 索引0: 价格100 ← 最高 ✓ 索引1: 价格96 第2段 [2:4): 索引2: 价格95 索引3: 价格94 第3段 [4:6): 索引4: 价格98 ← 最高 ✓ 索引5: 价格92 结果: 选中索引 0, 3, 4 (没选错,第2段最高是索引2的95) ``` 等等,让我重新计算: ``` 排序后的数据: 索引: [10, 30, 50, 60, 80, 90] 价格: [100, 96, 95, 94, 98, 92] 位置: 0 1 2 3 4 5 第1段 [0:2) - 位置0,1: 位置0: 价格100 ← 最高 ✓ 位置1: 价格96 第2段 [2:4) - 位置2,3: 位置2: 价格95 ← 最高 ✓ 位置3: 价格94 第3段 [4:6) - 位置4,5: 位置4: 价格98 ← 最高 ✓ 位置5: 价格92 选中的拟合点: 位置0 (时间10, 价格100) 位置2 (时间50, 价格95) 位置4 (时间80, 价格98) ``` **下沿示例(选最低点)**: ``` 排序后的数据: 索引: [15, 35, 55, 75] 价格: [92, 90, 88, 86] 位置: 0 1 2 3 n = 4 ≤ 4,不分段,全部使用 ✓ ``` --- ## 独立处理机制 ### 关键点:高点和低点分别独立处理 ```python # 伪代码展示独立性 高点枢轴点 = [6个] → 分3段 → 选3个拟合点 低点枢轴点 = [4个] → 不分段 → 全部4个拟合点 # 两者完全独立,互不影响 ``` ### 为什么要独立分段? **原因1: 时间分布不同** ``` 价格 ^ | H H H H H H ← 6个高点(分布较均匀) | \ / | \ / | \ / | \ / | \ / | \ / | X | X | / \ | / \ | / \ | / \ | L L L L ← 4个低点(集中在两侧) └───────────────────────> 时间 ``` - 高点和低点出现的时间点不同 - 如果混合分段,会破坏各自的时间均衡性 **原因2: 数量可能不同** ``` 高点: 6个 > 4 → 需要分段 低点: 4个 ≤ 4 → 不需要分段 ``` - 如果混合判断,要么都分段,要么都不分段 - 独立判断更灵活,更合理 **原因3: 含义不同** - 高点定义上沿(压力线) - 低点定义下沿(支撑线) - 两条线的拟合完全独立 ### 独立分段的实现 ```python # 1. 高点独立处理 if len(高点枢轴) > 4: 高点分3段 → 选3个高点拟合上沿线 else: 全部高点拟合上沿线 # 2. 低点独立处理(完全独立的逻辑) if len(低点枢轴) > 4: 低点分3段 → 选3个低点拟合下沿线 else: 全部低点拟合下沿线 ``` --- ## 代码实现 ### 当前算法:迭代离群点移除(v2) ```python def fit_pivot_line( pivot_indices: np.ndarray, pivot_values: np.ndarray, mode: str = "upper", min_points: int = 2, outlier_threshold: float = 1.5, # 离群点阈值(标准差倍数) max_iterations: int = 3, # 最大迭代次数 ) -> Tuple[float, float, np.ndarray]: """ 迭代离群点移除的枢轴点拟合算法 策略: 1. 先用所有点做初始拟合 2. 识别并移除偏离拟合线的"弱"点 3. 迭代直到收敛 对于上沿线:移除价格明显低于拟合线的点(弱高点) 对于下沿线:移除价格明显高于拟合线的点(弱低点) """ if len(pivot_indices) < min_points: return 0.0, 0.0, np.array([]) # 按时间排序 sort_idx = np.argsort(pivot_indices) x_sorted = pivot_indices[sort_idx].astype(float) y_sorted = pivot_values[sort_idx] n = len(x_sorted) if n < 2: return 0.0, 0.0, np.array([]) # 初始化:所有点都参与 active_mask = np.ones(n, dtype=bool) min_keep = max(3, min_points) # 至少保留3个点 # ───────────────────────────────────────────────────────── # 迭代离群点移除 # ───────────────────────────────────────────────────────── for iteration in range(max_iterations): active_indices = np.where(active_mask)[0] if len(active_indices) <= min_keep: break # 当前活跃点 x_active = x_sorted[active_mask] y_active = y_sorted[active_mask] # 线性回归 a, b = fit_line(x_active, y_active) # 计算拟合值 fitted_values = a * x_active + b # ───────────────────────────────────────────────────── # 计算方向性残差 # ───────────────────────────────────────────────────── if mode == "upper": # 上沿:残差 = 拟合值 - 实际值 # 正残差表示点在拟合线下方(弱高点) residuals = fitted_values - y_active else: # 下沿:残差 = 实际值 - 拟合值 # 正残差表示点在拟合线上方(弱低点) residuals = y_active - fitted_values # 计算标准差 std_residual = np.std(residuals) if std_residual < 1e-10: # 所有点几乎在一条线上,无需移除 break # ───────────────────────────────────────────────────── # 识别并移除最大离群点 # ───────────────────────────────────────────────────── threshold = outlier_threshold * std_residual max_outlier_idx = np.argmax(residuals) if residuals[max_outlier_idx] <= threshold: # 最大残差不超过阈值,收敛 break if np.sum(active_mask) <= min_keep: break # 移除离群点 original_idx = active_indices[max_outlier_idx] active_mask[original_idx] = False # ───────────────────────────────────────────────────────── # 最终拟合 # ───────────────────────────────────────────────────────── selected_indices_sorted = np.where(active_mask)[0] selected_x = x_sorted[active_mask] selected_y = y_sorted[active_mask] if len(selected_x) < 2: # 兜底:使用首尾两点 active_mask = np.zeros(n, dtype=bool) active_mask[0] = True active_mask[-1] = True selected_x = x_sorted[active_mask] selected_y = y_sorted[active_mask] selected_indices_sorted = np.where(active_mask)[0] a, b = fit_line(selected_x, selected_y) # 返回原始索引 selected_original = sort_idx[selected_indices_sorted] return float(a), float(b), selected_original ``` ### 历史算法:分段选择(v1) > 以下代码已被替代,保留供参考。 ```python def fit_pivot_line_v1( pivot_indices: np.ndarray, pivot_values: np.ndarray, mode: str = "upper", min_points: int = 2, ) -> Tuple[float, float, np.ndarray]: """ 分段选择策略(已废弃) 策略: - 如果枢轴点 > 4:分3段,每段选1个极值点(共3个拟合点) - 如果枢轴点 ≤ 4:全部使用 """ # ... 详见历史版本 ``` --- ## 实际案例分析 ### 案例1: SZ002343 慈文传媒 **高点枢轴点(6个)**: ``` 时间索引: [2025-05-12, 2025-09-23, 2025-11-04, 2025-11-26, 2025-12-09, 2026-01-13] 价格: [9.50, 9.36, 10.07, 9.63, 9.21, 8.92] 位置: 0 1 2 3 4 5 ``` **分段处理**: ``` n = 6 > 4,需要分段 segment_size = 6 // 3 = 2 第1段 [0:2) - 位置0,1(2025-05到2025-09): 位置0: 9.50 位置1: 9.36 → 选最高: 位置0, 价格9.50 ✓ 第2段 [2:4) - 位置2,3(2025-11): 位置2: 10.07 ← 最高 ✓ 位置3: 9.63 → 选最高: 位置2, 价格10.07 ✓ 第3段 [4:6) - 位置4,5(2025-12到2026-01): 位置4: 9.21 位置5: 8.92 → 选最高: 位置4, 价格9.21 ✓ 拟合点(3个): 2025-05-12: 9.50 (早期高点) 2025-11-04: 10.07 (中期高点) ← 最高点 2025-12-09: 9.21 (晚期高点) ``` **低点枢轴点(4个)**: ``` 时间索引: [2025-08-08, 2025-12-11, 2025-12-30, 2026-01-20] 价格: [5.17, 7.37, 7.42, 6.87] 位置: 0 1 2 3 ``` **不分段处理**: ``` n = 4 ≤ 4,全部使用 拟合点(4个): 2025-08-08: 5.17 ✓ 2025-12-11: 7.37 ✓ 2025-12-30: 7.42 ✓ 2026-01-20: 6.87 ✓ ``` **结果对比**: | 项目 | 高点枢轴 | 低点枢轴 | |------|---------|---------| | 总数 | 6个 | 4个 | | 是否分段 | 是(>4) | 否(≤4) | | 拟合点数 | 3个 | 4个 | | 时间跨度 | 2025-05 → 2026-01 | 2025-08 → 2026-01 | --- ## 边界情况处理 ### 情况1: 点数恰好等于4 ```python n = 4 # 不分段 selected_mask = np.ones(4, dtype=bool) # 全部选中 ``` **理由**: - 4个点已经足够稳定 - 分3段会导致某些段只有1个点 - 全部使用能获得更好的拟合效果 ### 情况2: 点数为5 ```python n = 5 > 4 # 需要分段 segment_size = 5 // 3 = 1 第1段: [0:1) → 1个点 → 选1个 第2段: [1:2) → 1个点 → 选1个 第3段: [2:5) → 3个点 → 选最极值的1个 拟合点: 3个 ``` ### 情况3: 点数很多(如10个) ```python n = 10 > 4 # 需要分段 segment_size = 10 // 3 = 3 第1段: [0:3) → 3个点 → 选最极值的1个 第2段: [3:6) → 3个点 → 选最极值的1个 第3段: [6:10) → 4个点 → 选最极值的1个 拟合点: 3个 ``` **效果**: - ✅ 无论点数多少,最终都是3个代表点 - ✅ 时间均衡(前1/3、中1/3、后1/3) - ✅ 代表性强(每段最极端) ### 情况4: 分段后某段为空 ```python for seg in segments: if len(seg) == 0: # 跳过空段 continue # 处理非空段... ``` **何时发生**: - 理论上不会发生(`segment_size ≥ 1`) - 但代码仍然防御性地检查 ### 情况5: 选中点少于2个 ```python if len(selected_x) < 2: # 保底方案:强制选首尾两点 selected_mask[0] = True selected_mask[-1] = True ``` **何时触发**: - 极端异常情况(理论上不应发生) - 确保至少有两点可以画线 --- ## 可视化说明 ### 图表元素对应关系 在详细模式(`--show-details`)下,图表显示: ``` 图表元素 对应内容 ───────────────────────────────────────────── 浅红色小实心圆 所有高点枢轴点 深红色大空心圆 上沿拟合点(迭代算法选出) 浅绿色小实心圆 所有低点枢轴点 深绿色大空心圆 下沿拟合点(迭代算法选出) ``` **注意**:自 v2.0 起,分段竖线已被移除(因算法不再使用分段策略)。 --- ## 总结 ### 迭代离群点移除算法的核心要点 1. **初始状态**: 所有枢轴点参与拟合 2. **迭代移除**: 每次移除最大残差的离群点 3. **方向性残差**: 上沿移除低于线的点,下沿移除高于线的点 4. **收敛条件**: 无离群点 / 达到最大迭代次数 / 点数达到下限 5. **独立性**: 高点和低点各自独立处理 6. **保底机制**: 至少保留3个点用于拟合 ### 算法优势 | 优势 | 说明 | |------|------| | **自适应** | 自动识别并移除偏离点 | | **稳健性** | 不受单个异常点影响 | | **可控性** | 通过阈值和迭代次数控制 | | **保守性** | 至少保留3个点,避免过度过滤 | | **符合直觉** | 拟合结果与主观判断一致 | ### 与其他方法的对比 | 方法 | 优点 | 缺点 | |------|------|------| | **两点连线** | 简单快速 | 忽略中间点,易受噪声影响 | | **全部点回归** | 利用所有信息 | 权重不均,异常点影响大 | | **分段选择(v1)** | 时间均衡 | 可能选中弱枢轴点 | | **迭代离群点移除(当前)** | 自适应,稳健 | 需要迭代计算 | --- ## 参考资料 - [枢轴点拟合改进.md](./2026-01-26_枢轴点拟合改进.md) - 改进历程 - [图表详细模式功能.md](./2026-01-26_图表详细模式功能.md) - 可视化说明 - [converging_triangle.py](../src/converging_triangle.py) - 源代码实现 --- **文档版本**: v2.0 **最后更新**: 2026-01-26 **变更记录**: - v2.0: 采用迭代离群点移除算法,替代原分段选择策略 - v1.0: 初始版本,分段选择算法