- Add scripts/scoring/ module with normalizer, sensitivity analysis, and config - Enhance stock_viewer.html with standardized scoring display - Add integration tests and normalization verification scripts - Add documentation for standardization implementation and usage guides - Add data distribution analysis reports for strength scoring dimensions - Update discussion documents with algorithm optimization plans
286 lines
8.4 KiB
Python
286 lines
8.4 KiB
Python
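"""
Strength score normalization module.

Each score field is normalized with a strategy chosen for its distribution
type, so that all dimensions are comparable after normalization and can be
summed with equal weights.

Core problems addressed:
1. Zero-inflated distributions (breakout magnitude scores, volume score): median = 0
2. Point-mass distribution (tilt score): 75% of the values = 0.5
3. Low discrimination (pattern regularity score): extremely low median
"""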
|
||
|
||
import pandas as pd
|
||
import numpy as np
|
||
from typing import Literal
|
||
|
||
|
||
def normalize_zero_inflated(series: pd.Series) -> pd.Series:
    """Normalize a zero-inflated score column.

    Intended for the breakout magnitude scores (up/down) and the volume
    score, where a zero means "the event did not happen".

    Strategy:
        - Values that are not greater than 1e-6 (zeros, negatives, and NaN,
          since NaN compares False) receive the neutral baseline 0.5.
        - Values greater than 1e-6 are percentile-ranked among themselves
          and mapped to ``0.5 + 0.5 * rank``.

    This keeps the qualitative "zero vs. non-zero" split while preserving
    the ordering inside the non-zero group.

    Args:
        series: Raw score series.

    Returns:
        A float series on the same index. Baseline entries are exactly 0.5;
        ranked entries fall in (0.5, 1.0] because a percentile rank is
        always greater than 0 and the largest value gets rank 1.0.
    """
    # A small tolerance instead of ``> 0`` guards against float noise.
    is_positive = series > 1e-6

    # Start with every entry at the neutral baseline.
    normalized = pd.Series(0.5, index=series.index, dtype=float)
    if not is_positive.any():
        return normalized

    # Percentile rank computed only within the non-zero subset.
    pct_rank = series[is_positive].rank(pct=True)
    normalized[is_positive] = pct_rank * 0.5 + 0.5
    return normalized
|
||
|
||
|
||
def normalize_point_mass(series: pd.Series, center: float = 0.5, tol: float = 0.001) -> pd.Series:
    """Normalize a score whose distribution has a point mass at ``center``.

    Intended for the tilt score, where (per the original author's analysis)
    most values sit exactly on the centre value.

    Strategy:
        - Values within ``center ± tol`` are set to ``center``.
        - Values above ``center + tol`` are percentile-ranked among
          themselves and stretched into ``(center, 1.0]``; the largest
          value maps to 1.0.
        - Values below ``center - tol`` are percentile-ranked by the
          magnitude of their deviation and stretched into ``[0.0, center)``;
          the value farthest below the centre maps to 0.0.

    The mapping is monotonic: a larger raw value never receives a smaller
    normalized value. NaN inputs satisfy neither comparison and therefore
    come out as ``center``.

    Args:
        series: Raw score series.
        center: The point-mass value (default 0.5).
        tol: Tolerance; anything within ``center ± tol`` counts as centre.

    Returns:
        A float series on the same index, with values in [0, 1] when
        ``center`` itself lies in [0, 1].
    """
    result = pd.Series(center, index=series.index, dtype=float)
    deviation = series - center

    # Positive side: rank 1.0 (largest deviation) must land on 1.0, so the
    # available span is (1 - center), not a hard-coded 0.5.
    pos_mask = deviation > tol
    if pos_mask.any():
        ranks = deviation[pos_mask].rank(pct=True)
        result[pos_mask] = center + (1.0 - center) * ranks

    # Negative side: rank the deviation magnitudes in ascending order so the
    # most extreme value gets rank 1.0 and maps to 0.0, while values just
    # below the centre stay close to it. Ranking in descending order here
    # would invert the ordering of the below-centre values.
    neg_mask = deviation < -tol
    if neg_mask.any():
        ranks = deviation[neg_mask].abs().rank(pct=True)
        result[neg_mask] = center * (1.0 - ranks)

    return result
|
||
|
||
|
||
def normalize_standard(series: pd.Series) -> pd.Series:
    """Convert a score column to plain percentile ranks.

    Intended for the convergence score and the price activity score, whose
    distributions are described by the original author as roughly uniform
    or roughly normal, so a direct rank is sufficient.

    Args:
        series: Raw score series.

    Returns:
        The percentile rank of each value. Ties share their average rank,
        and NaN inputs stay NaN. The largest value maps to 1.0; the
        smallest maps to ``1 / n`` (not 0), so the range is (0, 1].
    """
    # Spelled-out arguments are pandas' defaults for ``Series.rank``.
    percentile = series.rank(method='average', ascending=True, pct=True)
    return percentile
|
||
|
||
|
||
def normalize_low_variance(series: pd.Series, expansion_factor: float = 10.0) -> pd.Series:
    """Normalize a low-discrimination score column.

    Intended for the pattern regularity (geometry) score, which the
    original author describes as being extremely small for most rows.

    Strategy:
        1. Scale by ``expansion_factor`` and apply ``log1p`` to spread out
           the small values, e.g. 0.001 -> log1p(0.01) ~= 0.0099 and
           0.010 -> log1p(0.10) ~= 0.0953.
        2. Percentile-rank the transformed values.

    NOTE(review): ``log1p`` is strictly increasing, so for inputs where it
    is defined the percentile ranks are expected to match those of the raw
    series (up to floating-point ties); the transform only affects the
    intermediate values, not the final ordering.

    Args:
        series: Raw score series.
        expansion_factor: Multiplier applied before taking the logarithm.

    Returns:
        Percentile ranks of the transformed values, in (0, 1].
    """
    # Stretch the small-value region before ranking.
    scaled = series * expansion_factor
    stretched = np.log1p(scaled)

    # Percentile rank of the stretched values.
    return stretched.rank(pct=True)
|
||
|
||
|
||
def normalize_all(df: pd.DataFrame) -> pd.DataFrame:
    """Apply the per-distribution normalization to every known score column.

    Column-to-strategy mapping:
        - price_score_up, price_score_down, volume_score: zero-inflated
        - tilt_score: point mass (center 0.5)
        - convergence_score, activity_score: standard percentile rank
        - geometry_score: low-variance (log-stretched) rank

    Columns that are absent from ``df`` are silently skipped.

    Args:
        df: Raw results frame; expected to hold some or all of the score
            columns listed above.

    Returns:
        A copy of ``df`` with one additional ``<column>_norm`` column per
        score column that was present. The input frame is not modified.
    """
    # Ordered (column, normalizer) plan; the order fixes the order in which
    # the new *_norm columns are appended.
    plan = (
        ('price_score_up', normalize_zero_inflated),
        ('price_score_down', normalize_zero_inflated),
        ('volume_score', normalize_zero_inflated),
        ('tilt_score', lambda values: normalize_point_mass(values, center=0.5)),
        ('convergence_score', normalize_standard),
        ('activity_score', normalize_standard),
        ('geometry_score', normalize_low_variance),
    )

    normalized = df.copy()
    for column, normalizer in plan:
        if column not in df.columns:
            continue
        normalized[f'{column}_norm'] = normalizer(df[column])
    return normalized
|
||
|
||
|
||
def calculate_strength_equal_weight(
    df_normalized: pd.DataFrame,
    direction: Literal['up', 'down'] = 'up'
) -> pd.Series:
    """Compute the equal-weight strength score from normalized columns.

    Six normalized dimensions are averaged with weight 1/6 each: the
    breakout magnitude for the chosen direction, convergence, volume,
    geometry, activity, and tilt.

    Args:
        df_normalized: Frame that already holds the ``*_norm`` columns.
        direction: Breakout direction, ``'up'`` or ``'down'``; selects
            ``price_score_<direction>_norm``.

    Returns:
        The per-row mean of the six normalized columns. A NaN in any of
        them propagates to the result for that row.

    Raises:
        KeyError: If any of the six required columns is missing.
    """
    # The direction only affects which breakout-magnitude column is used.
    components = [
        f'price_score_{direction}_norm',
        'convergence_score_norm',
        'volume_score_norm',
        'geometry_score_norm',
        'activity_score_norm',
        'tilt_score_norm',
    ]

    # Accumulate left to right so the addition order is fixed.
    total = df_normalized[components[0]]
    for name in components[1:]:
        total = total + df_normalized[name]

    return total / 6.0
|
||
|
||
|
||
def normalize_and_score(
    df: pd.DataFrame,
    direction: Literal['up', 'down'] = 'up'
) -> pd.DataFrame:
    """Normalize all score columns and append the equal-weight strength.

    Convenience wrapper that runs ``normalize_all`` and then
    ``calculate_strength_equal_weight`` on its output.

    Args:
        df: Raw results frame.
        direction: Breakout direction, ``'up'`` or ``'down'``.

    Returns:
        The normalized frame with an extra
        ``strength_<direction>_equal`` column.
    """
    # Step 1: add the *_norm columns.
    scored = normalize_all(df)

    # Step 2: average them into a single strength column.
    strength_column = f'strength_{direction}_equal'
    scored[strength_column] = calculate_strength_equal_weight(scored, direction=direction)
    return scored
|
||
|
||
|
||
if __name__ == "__main__":
    # Smoke test / demo: load the results CSV if it exists, normalize it,
    # and print the median of every score column before and after.
    import sys
    import os

    here = os.path.dirname(__file__)

    # Add the directory two levels up to the import path.
    sys.path.append(os.path.join(here, "..", ".."))

    # Location of the input data, relative to this file.
    data_path = os.path.join(
        here,
        "..", "..", "outputs", "converging_triangles", "all_results.csv"
    )

    if not os.path.exists(data_path):
        print(f"数据文件不存在: {data_path}")
        print("请先运行检测脚本生成数据")
    else:
        banner = "=" * 80
        print(banner)
        print("强度分标准化模块测试")
        print(banner)

        df = pd.read_csv(data_path)
        print(f"\n加载数据: {len(df)} 条记录")

        # Normalize and report which columns were added.
        df_norm = normalize_all(df)
        print(f"标准化完成: 新增 {df_norm.columns.difference(df.columns).tolist()} 字段")

        # Median of each score column before vs. after normalization.
        print("\n标准化后中位数对比:")
        score_columns = [
            'price_score_up', 'price_score_down', 'convergence_score',
            'volume_score', 'geometry_score', 'activity_score', 'tilt_score',
        ]
        for col in score_columns:
            if col not in df.columns:
                continue
            before = df[col].median()
            after = df_norm[f'{col}_norm'].median()
            print(f" {col:20s}: {before:.4f} -> {after:.4f}")

        print("\n测试通过!")
|