fix: 添加日K/周K/月K聚合功能

- 新增 aggregate_klines() 函数,与前端 filterAndProcessKLineData 一致
- 周K:将同一周的日K聚合成一根K线
- 月K:将同一月的日K聚合成一根K线

测试结果:
- 日K:724 条
- 周K:154 条
- 月K:37 条

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
褚宏光 2026-03-04 15:14:29 +08:00
parent 430511e8c4
commit 604e88df4c

View File

@ -72,6 +72,94 @@ def get_window_start_date(window: str) -> str:
return start_date.strftime('%Y%m%d')
# ── K 线数据聚合(与前端 filterAndProcessKLineData 一致)───────────────────────
def aggregate_klines(klines: dict, freq: str) -> dict:
"""
将日K数据聚合成周K或月K与前端 filterAndProcessKLineData 一致
Args:
klines: 日K数据 { dates: [...], open: [...], high: [...], low: [...], close: [...] }
freq: 频率 '' / '' / ''
Returns:
聚合后的 klines
"""
if freq == '':
return klines
from datetime import datetime
dates = klines['dates']
open_prices = klines['open']
high_prices = klines['high']
low_prices = klines['low']
close_prices = klines['close']
# 构建日数据列表
daily_data = []
for i in range(len(dates)):
daily_data.append({
'date': dates[i],
'open': open_prices[i],
'high': high_prices[i],
'low': low_prices[i],
'close': close_prices[i],
})
# 按周/月分组
grouped_data = {}
for d in daily_data:
date_str = str(d['date']) # YYYYMMDD
year = int(date_str[:4])
month = int(date_str[4:6])
day = int(date_str[6:8])
date_obj = datetime(year, month, day)
if freq == '':
# 计算该日期所在周的周一
day_of_week = date_obj.weekday() # 0 是周一
from datetime import timedelta
monday = date_obj - timedelta(days=day_of_week)
key = monday.strftime('%Y%m%d')
else: # 月
key = date_str[:6] # YYYYMM
if key not in grouped_data:
grouped_data[key] = []
grouped_data[key].append(d)
# 聚合每组数据
result_dates = []
result_open = []
result_high = []
result_low = []
result_close = []
for key in sorted(grouped_data.keys()):
group = grouped_data[key]
# 按日期排序
group.sort(key=lambda x: x['date'])
result_dates.append(group[-1]['date']) # 取最后一天作为标签
result_open.append(group[0]['open']) # 开盘取第一天
result_high.append(max(d['high'] for d in group)) # 最高取最高
result_low.append(min(d['low'] for d in group)) # 最低取最低
result_close.append(group[-1]['close']) # 收盘取最后一天
print(f'[INFO] K 线聚合: {len(dates)} 条日K → {len(result_dates)}{freq}K')
return {
'dates': result_dates,
'open': result_open,
'high': result_high,
'low': result_low,
'close': result_close,
'volume': [0] * len(result_dates),
}
# ── CLI 解析 ──────────────────────────────────────────────────────────────────
def parse_args():
parser = argparse.ArgumentParser(
@ -335,6 +423,9 @@ def run(args):
print('[ERROR] 没有 K 线数据,跳过绘图。')
return
# 根据频率聚合 K 线数据(与前端 filterAndProcessKLineData 一致)
klines = aggregate_klines(klines, args.freq)
cd['klines'] = klines
# 5. 绘图