diff --git a/validate.py b/validate.py index 6cd5c6b..969c0d7 100644 --- a/validate.py +++ b/validate.py @@ -72,6 +72,94 @@ def get_window_start_date(window: str) -> str: return start_date.strftime('%Y%m%d') +# ── K 线数据聚合(与前端 filterAndProcessKLineData 一致)─────────────────────── +def aggregate_klines(klines: dict, freq: str) -> dict: + """ + 将日K数据聚合成周K或月K(与前端 filterAndProcessKLineData 一致) + + Args: + klines: 日K数据 { dates: [...], open: [...], high: [...], low: [...], close: [...] } + freq: 频率 '日' / '周' / '月' + + Returns: + 聚合后的 klines + """ + if freq == '日': + return klines + + from datetime import datetime + + dates = klines['dates'] + open_prices = klines['open'] + high_prices = klines['high'] + low_prices = klines['low'] + close_prices = klines['close'] + + # 构建日数据列表 + daily_data = [] + for i in range(len(dates)): + daily_data.append({ + 'date': dates[i], + 'open': open_prices[i], + 'high': high_prices[i], + 'low': low_prices[i], + 'close': close_prices[i], + }) + + # 按周/月分组 + grouped_data = {} + + for d in daily_data: + date_str = str(d['date']) # YYYYMMDD + year = int(date_str[:4]) + month = int(date_str[4:6]) + day = int(date_str[6:8]) + + date_obj = datetime(year, month, day) + + if freq == '周': + # 计算该日期所在周的周一 + day_of_week = date_obj.weekday() # 0 是周一 + from datetime import timedelta + monday = date_obj - timedelta(days=day_of_week) + key = monday.strftime('%Y%m%d') + else: # 月 + key = date_str[:6] # YYYYMM + + if key not in grouped_data: + grouped_data[key] = [] + grouped_data[key].append(d) + + # 聚合每组数据 + result_dates = [] + result_open = [] + result_high = [] + result_low = [] + result_close = [] + + for key in sorted(grouped_data.keys()): + group = grouped_data[key] + # 按日期排序 + group.sort(key=lambda x: x['date']) + + result_dates.append(group[-1]['date']) # 取最后一天作为标签 + result_open.append(group[0]['open']) # 开盘取第一天 + result_high.append(max(d['high'] for d in group)) # 最高取最高 + result_low.append(min(d['low'] for d in group)) # 最低取最低 + result_close.append(group[-1]['close']) # 收盘取最后一天 + + print(f'[INFO] K 线聚合: {len(dates)} 条日K → {len(result_dates)} 条{freq}K') + + return { + 'dates': result_dates, + 'open': result_open, + 'high': result_high, + 'low': result_low, + 'close': result_close, + 'volume': [0] * len(result_dates), + } + + # ── CLI 解析 ────────────────────────────────────────────────────────────────── def parse_args(): parser = argparse.ArgumentParser( @@ -335,6 +423,9 @@ def run(args): print('[ERROR] 没有 K 线数据,跳过绘图。') return + # 根据频率聚合 K 线数据(与前端 filterAndProcessKLineData 一致) + klines = aggregate_klines(klines, args.freq) + cd['klines'] = klines # 5. 绘图