-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcli.py
More file actions
389 lines (319 loc) · 15.3 KB
/
cli.py
File metadata and controls
389 lines (319 loc) · 15.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Tushare缓存系统命令行接口
"""
import argparse
import os
import sys
from dotenv import load_dotenv
# 添加当前目录到Python路径,确保能够导入本地模块
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# 设置环境变量,确保Python使用UTF-8编码处理所有I/O
os.environ['PYTHONIOENCODING'] = 'utf-8'
# 简单的打印调试信息
print("DEBUG: CLI模块开始执行")
try:
# 尝试从安装的包中导入
from tushare_cache_system import TushareCacheSystem
from tushare_cache_system.shared_logger import get_logger
print("DEBUG: 从安装的包中成功导入")
except ImportError:
# 如果失败,尝试使用相对导入
try:
from . import TushareCacheSystem
from .shared_logger import get_logger
print("DEBUG: 使用相对导入成功")
except ImportError:
# 如果还失败,尝试直接导入模块
try:
from __init__ import TushareCacheSystem
from shared_logger import get_logger
print("DEBUG: 直接导入模块成功")
except ImportError:
# 最后尝试直接导入核心模块
try:
from core import TushareDataCacheCore
from stock_data import StockDataMixin
# 创建一个简单的类来模拟TushareCacheSystem
class TushareCacheSystem(TushareDataCacheCore, StockDataMixin):
def __init__(self, token=None, data_root=None, cache_size=1000, max_workers=4):
TushareDataCacheCore.__init__(self, data_root, token, cache_size, max_workers)
StockDataMixin.__init__(self)
from shared_logger import get_logger
print("DEBUG: 直接导入核心模块成功")
except Exception as e:
print(f"无法导入必要的模块: {e}")
sys.exit(1)
# 设置日志
logger = get_logger(__name__)
print("DEBUG: CLI模块已成功导入并初始化")
def parse_args():
"""解析命令行参数"""
parser = argparse.ArgumentParser(description='Tushare缓存系统命令行工具')
# 子命令
subparsers = parser.add_subparsers(dest='command', help='可用命令')
# 初始化命令
init_parser = subparsers.add_parser('init', help='初始化缓存系统')
init_parser.add_argument('--token', help='Tushare API Token')
init_parser.add_argument('--data-root', default='e:/code/optimized_quant_data/', help='数据根目录')
init_parser.add_argument('--cache-size', type=int, default=1000, help='缓存大小')
init_parser.add_argument('--max-workers', type=int, default=4, help='最大工作线程数')
# 获取数据命令
get_parser = subparsers.add_parser('get', help='获取数据')
get_parser.add_argument('type', choices=['stock', 'index', 'financial', 'market', 'fund'],
help='数据类型')
get_parser.add_argument('--symbol', help='股票/指数代码')
get_parser.add_argument('--symbols', help='多股票代码(逗号分隔)')
get_parser.add_argument('--symbols-file', help='从文件读取股票代码(每行一个)')
get_parser.add_argument('--start-date', help='开始日期 (YYYYMMDD)')
get_parser.add_argument('--end-date', help='结束日期 (YYYYMMDD)')
get_parser.add_argument('--save-fields', help='保存到本地时的字段集合(逗号分隔)')
get_parser.add_argument('--force-full-sync', action='store_true', help='强制全量同步指定日期范围')
get_parser.add_argument('--output', help='输出文件路径')
# 下载数据命令
download_parser = subparsers.add_parser('download', help='下载数据')
download_parser.add_argument('type', choices=['stock', 'index', 'financial', 'market', 'fund'],
help='数据类型')
download_parser.add_argument('--symbol', help='股票/指数代码')
download_parser.add_argument('--symbols', help='多股票代码(逗号分隔)')
download_parser.add_argument('--symbols-file', help='从文件读取股票代码(每行一个)')
download_parser.add_argument('--start-date', help='开始日期 (YYYYMMDD)')
download_parser.add_argument('--end-date', help='结束日期 (YYYYMMDD)')
download_parser.add_argument('--save-fields', help='保存到本地时的字段集合(逗号分隔)')
download_parser.add_argument('--force-full-sync', action='store_true', help='强制全量同步指定日期范围')
download_parser.add_argument('--output', help='输出文件路径')
# 缓存管理命令
cache_parser = subparsers.add_parser('cache', help='缓存管理')
cache_subparsers = cache_parser.add_subparsers(dest='cache_command', help='缓存操作')
cache_subparsers.add_parser('stats', help='显示缓存统计')
cache_subparsers.add_parser('save', help='保存缓存到磁盘')
clear_parser = cache_subparsers.add_parser('clear', help='清理缓存')
clear_parser.add_argument('type', choices=['memory', 'sqlite', 'file', 'all'],
help='缓存类型')
# 测试命令
test_parser = subparsers.add_parser('test', help='运行测试')
test_parser.add_argument('--module', help='测试特定模块')
return parser.parse_args()
def init_system(args):
"""初始化缓存系统"""
print("DEBUG: 开始初始化系统...")
# 尝试从环境变量加载token
load_dotenv()
token = getattr(args, 'token', None) or os.getenv('TUSHARE_TOKEN')
print(f"DEBUG: 从参数或环境变量获取Token: {token is not None}")
# 如果环境变量中没有token,尝试从配置文件读取
if not token:
try:
from config_reader import read_tushare_token
token = read_tushare_token()
if token:
print("DEBUG: 成功从配置文件读取Tushare Token")
except ImportError:
print("DEBUG: 未找到config_reader模块,无法从配置文件读取token")
except Exception as e:
print(f"DEBUG: 从配置文件读取token时出错: {e}")
# 对于不需要token的命令(如cache),可以使用测试token
if not token and args.command in ['cache']:
token = 'test_token'
print("DEBUG: 使用测试token")
if not token:
print("错误: 请提供Tushare Token (通过--token参数、TUSHARE_TOKEN环境变量或config.json配置文件)")
return None
# 添加调试信息
print(f"DEBUG: 使用的Token: {token[:10] if token else 'None'}...")
data_root = getattr(args, 'data_root', 'e:/code/optimized_quant_data/')
cache_size = getattr(args, 'cache_size', 1000)
max_workers = getattr(args, 'max_workers', 4)
print(f"DEBUG: 初始化参数 - data_root: {data_root}, cache_size: {cache_size}, max_workers: {max_workers}")
# 在初始化之前测试token
if token:
print(f"DEBUG: 尝试测试Token...")
try:
import tushare as ts
ts.set_token(token)
pro = ts.pro_api()
# 测试API连接
today = pro.trade_cal(exchange='SSE', start_date='20230101', end_date='20230101')
print(f"DEBUG: Token测试成功")
except Exception as e:
print(f"DEBUG: Token测试失败: {e}")
system = TushareCacheSystem(
token=token,
data_root=data_root,
cache_size=cache_size,
max_workers=max_workers
)
print(f"DEBUG: 系统初始化完成")
print(f"DEBUG: Tushare API状态: {getattr(system, 'pro', None) is not None}")
# 添加更多调试信息
if hasattr(system, 'pro'):
print(f"DEBUG: Tushare API对象: {system.pro}")
return system
def get_data(system, args):
"""获取数据"""
if not system:
print("错误: 系统未初始化,请先运行init命令")
return
print(f"DEBUG: 系统初始化状态: {system is not None}")
print(f"DEBUG: Tushare API状态: {getattr(system, 'pro', None) is not None}")
# 添加更多调试信息
if hasattr(system, 'pro'):
print(f"DEBUG: Tushare API对象: {system.pro}")
# 确保args有output属性
if not hasattr(args, 'output'):
args.output = None
if args.type == 'stock':
if (not args.symbol and not args.symbols and not args.symbols_file) or not args.start_date or not args.end_date:
print("错误: 获取股票数据需要提供--symbol/--symbols/--symbols-file与--start-date、--end-date")
return
def _is_cn_trading_hours():
from datetime import datetime, time as dtime
now = datetime.now()
if now.weekday() >= 5:
return False
t = now.time()
morning = (dtime(9, 30), dtime(11, 30))
afternoon = (dtime(13, 0), dtime(15, 0))
in_morning = morning[0] <= t <= morning[1]
in_afternoon = afternoon[0] <= t <= afternoon[1]
return in_morning or in_afternoon
save_fields = None
if args.save_fields:
save_fields = [f.strip() for f in args.save_fields.split(',') if f.strip()]
if args.force_full_sync and _is_cn_trading_hours():
print("当前为交易时段,已跳过全量同步以避免影响主脚本效率。请在收盘后重试。")
return
symbols = []
if args.symbol:
symbols.append(args.symbol)
if args.symbols:
symbols.extend([s.strip() for s in args.symbols.split(',') if s.strip()])
if args.symbols_file:
try:
with open(args.symbols_file, 'r', encoding='utf-8') as f:
for line in f:
code = line.strip()
if code:
symbols.append(code)
except Exception as e:
print(f"读取symbols文件失败: {e}")
return
# 检查Tushare API是否可用
if not hasattr(system, 'pro') or system.pro is None:
print("❌ 错误: Tushare API未正确初始化,请检查Token配置")
print(" 解决方案:")
print(" 1. 在config.json文件中配置有效的Tushare Token")
print(" 2. 设置环境变量TUSHARE_TOKEN")
print(" 3. 使用--token参数提供Token")
return
if len(symbols) > 1:
results = system.batch_get_stock_daily(symbols, args.start_date, args.end_date, fields=None, parallel=True, save_fields=save_fields, force_full_sync=args.force_full_sync)
# 检查结果并给出警告
successful_count = sum(1 for df in results.values() if df is not None and not df.empty)
failed_count = len(results) - successful_count
if failed_count > 0:
print(f"⚠️ 警告: {failed_count} 只股票数据获取失败,请检查日志")
total = sum(len(df) for df in results.values() if df is not None)
print(f"批量获取完成,共 {len(results)} 只股票,成功 {successful_count} 只,失败 {failed_count} 只,合计 {total} 条")
if args.output:
import os
os.makedirs(args.output, exist_ok=True)
for sym, df in results.items():
out = os.path.join(args.output, sym.replace('.', '_') + '.csv')
if df is not None:
df.to_csv(out, index=False)
else:
print(f"⚠️ 跳过保存 {sym}:无数据")
print(f"已保存到目录: {args.output}")
# 对于批量获取,我们不需要在函数末尾处理data变量
return
else:
sym = symbols[0]
data = system.get_stock_daily(sym, args.start_date, args.end_date, save_fields=save_fields, force_full_sync=args.force_full_sync)
elif args.type == 'index':
if not args.symbol or not args.start_date or not args.end_date:
print("错误: 获取指数数据需要提供--symbol, --start-date和--end-date参数")
return
data = system.get_index_daily(args.symbol, args.start_date, args.end_date)
elif args.type == 'financial':
if not args.symbol or not args.start_date or not args.end_date:
print("错误: 获取财务数据需要提供--symbol, --start-date和--end-date参数")
return
data = system.get_income(args.symbol, args.start_date, args.end_date)
else:
print(f"错误: 不支持的数据类型 {args.type}")
return
if data is not None and not data.empty:
print(f"获取到 {len(data)} 条数据")
print(data.head())
if args.output:
data.to_csv(args.output)
print(f"数据已保存到 {args.output}")
else:
print("❌ 未获取到数据,请检查:")
print(" 1. Token是否有效")
print(" 2. 股票代码是否正确")
print(" 3. 日期范围是否合理")
print(" 4. 网络连接是否正常")
def download_data(system, args):
"""下载数据"""
if not system:
print("错误: 系统未初始化,请先运行init命令")
return
print("开始下载数据...")
# 复用get_data的逻辑
get_data(system, args)
def manage_cache(system, args):
"""管理缓存"""
if not system:
print("错误: 系统未初始化,请先运行init命令")
return
if args.cache_command == 'stats':
stats = system.get_cache_stats()
print("缓存统计:")
for key, value in stats.items():
print(f" {key}: {value}")
elif args.cache_command == 'save':
system.save_cache()
print("缓存已保存到磁盘")
elif args.cache_command == 'clear':
system.clear_cache(args.type)
print(f"已清理 {args.type} 缓存")
def run_tests(args):
"""运行测试"""
if args.module:
print(f"运行 {args.module} 模块测试...")
# 这里可以添加特定模块的测试逻辑
print("测试功能待实现")
else:
print("运行系统测试...")
# 运行内置测试
from test_refactored_system import main
main()
def main():
"""主函数"""
args = parse_args()
if not args.command:
print("请指定命令,使用 --help 查看帮助")
return
system = None
# 对于需要系统的命令,先初始化系统
if args.command in ['get', 'cache', 'download']:
print("DEBUG: 正在初始化系统...")
system = init_system(args)
print(f"DEBUG: 系统初始化完成: {system is not None}")
# 执行相应命令
if args.command == 'init':
init_system(args)
elif args.command == 'get':
get_data(system, args)
elif args.command == 'download':
download_data(system, args)
elif args.command == 'cache':
manage_cache(system, args)
elif args.command == 'test':
run_tests(args)
if __name__ == '__main__':
main()