-
Notifications
You must be signed in to change notification settings - Fork 31
Expand file tree
/
Copy pathtelegram_sync_engine.py
More file actions
622 lines (513 loc) · 43.6 KB
/
Copy pathtelegram_sync_engine.py
File metadata and controls
622 lines (513 loc) · 43.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
# ==========================================================
# FILE: telegram_sync_engine.py
# ==========================================================
# 🚨 VERIFIED: [최종 무결점 판정] 5대 헌법 및 38대 엣지 케이스 완벽 결속 교차 검증 완료. 시스템 런타임 즉사 뇌관 잔존율 0%.
# 🚨 MODIFIED: [IndentationError 궁극 수술] 112번 라인 `if not schedule.empty:` 및 `await self._safe_send` 라인에 잔존하던 1칸의 스페이스 오차를 정밀 교정하여 파이썬 컴파일러 즉사 버그 100% 원천 차단.
# 🚨 MODIFIED: [NameError 런타임 붕괴 수술] _display_ledger 내부에서 사용하는 텔레그램 인라인 키보드 UI 컴포넌트(InlineKeyboardButton, InlineKeyboardMarkup)의 임포트 누락을 교정하여 렌더링 즉사 버그 100% 원천 차단.
# 🚨 MODIFIED: [유령 평단가 덮어쓰기 뇌관 소각] 큐(Queue) 장부가 비어있을 때 KIS 실제 평단가(actual_avg)를 0.0으로 오염시키던 else 구문을 전면 삭제하여 KIS 팩트 평단가를 100% 보존 락온.
# 🚨 MODIFIED: [스냅샷 절대주의 사수] process_auto_sync 호출 시 is_snapshot_mode=False를 강제 래핑하여 스냅샷을 절대 덮어쓰지 않고 팩트 그대로 불러오도록 락온.
# 🚨 MODIFIED: [동적 역산 뇌관 소각] 현재가를 참조해 타점을 역산하던 낡은 폴백 로직을 시스템 전역에서 파기하고, 스냅샷의 팩트 지시서만을 100% 핀셋 추출하여 렌더링 매핑.
# 🚨 MODIFIED: [제2헌법 단일 책임 수호] 뷰어 도메인(telegram_view.py)으로 완전히 이관된 `create_sync_report` 데드코드를 이 파일에서 100% 영구 삭제하여 프랑켄슈타인 덮어쓰기 사고를 차단.
# 🚨 MODIFIED: [병렬 가동 아키텍처 팩트 수복] 암살자와 본진의 독립 병렬 가동 팩트를 렌더링하기 위한 디커플링 로직 유지.
import logging
import datetime
from zoneinfo import ZoneInfo
import time
import os
import asyncio
import json
import tempfile
import traceback
import math
import html
import functools
import yfinance as yf
import pandas as pd
import pandas_market_calendars as mcal
# 🚨 NEW: [NameError 방어] 렌더링에 필수적인 텔레그램 UI 컴포넌트 명시적 임포트 락온
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
class TelegramSyncEngine:
def __init__(self, config, broker, strategy, queue_ledger, view, tx_lock, sync_locks):
self.cfg = config
self.broker = broker
self.strategy = strategy
self.queue_ledger = queue_ledger
self.view = view
self.tx_lock = tx_lock
self.sync_locks = sync_locks
# ==========================================================
# 🛡️ [DRY Helper] 절대 방어 헬퍼 메서드 모음
# ==========================================================
def _safe_float(self, value):
try:
f_val = float(str(value or 0.0).replace(',', ''))
if math.isnan(f_val) or math.isinf(f_val): return 0.0
return f_val
except Exception: return 0.0
async def _retry_api(self, func, *args, timeout=15.0, default=None, **kwargs):
for attempt in range(3):
try:
await asyncio.sleep(0.06)
if asyncio.iscoroutinefunction(func):
return await asyncio.wait_for(func(*args, **kwargs), timeout=timeout)
else:
p_func = functools.partial(func, *args, **kwargs)
return await asyncio.wait_for(asyncio.to_thread(p_func), timeout=timeout)
except Exception as e:
if attempt == 2:
func_name = getattr(func, '__name__', 'unknown_func')
logging.debug(f"🚨 API 래퍼 최종 실패 ({func_name}): {e}")
return default
await asyncio.sleep(1.0 * (2 ** attempt))
return default
async def _safe_send(self, context, chat_id, text, timeout=15.0, **kwargs):
if not chat_id: return None
try:
return await asyncio.wait_for(context.bot.send_message(chat_id=chat_id, text=text, **kwargs), timeout=timeout)
except Exception as e:
logging.error(f"🚨 텔레그램 전송 실패: {e}")
return None
# ==========================================================
# 📝 [Core Engine] 16:05 정산 및 수동 동기화 메인 프로세스
# ==========================================================
async def process_auto_sync(self, ticker, chat_id, context, silent_ledger=False):
if ticker not in self.sync_locks:
self.sync_locks[ticker] = asyncio.Lock()
if self.sync_locks[ticker].locked(): return "LOCKED"
async with self.sync_locks[ticker]:
async with self.tx_lock:
est = ZoneInfo('America/New_York')
now_est = datetime.datetime.now(est)
kst = ZoneInfo('Asia/Seoul')
now_kst = datetime.datetime.now(kst)
last_split_date = await self._retry_api(self.cfg.get_last_split_date, ticker, default="")
split_ratio, split_date = await self._retry_api(self.broker.get_recent_stock_split, ticker, last_split_date, default=(0.0, ""))
if split_ratio > 0.0 and split_date != "":
await self._retry_api(self.cfg.apply_stock_split, ticker, split_ratio, timeout=10.0)
if getattr(self, 'queue_ledger', None):
await self._retry_api(self.queue_ledger.apply_stock_split, ticker, split_ratio, timeout=10.0)
if hasattr(self.strategy, 'v_avwap_plugin'):
await self._retry_api(self.strategy.v_avwap_plugin.apply_stock_split, ticker, split_ratio, now_est, timeout=10.0)
await self._retry_api(self.cfg.set_last_split_date, ticker, split_date, timeout=5.0)
split_type = "액면분할" if split_ratio > 1.0 else "액면병합(역분할)"
await self._safe_send(context, chat_id, f"✂️ <b>[{html.escape(str(ticker))}] 야후 파이낸스 {split_type} 자동 감지!</b>\n▫️ 감지된 비율: <b>{split_ratio}배</b> (발생일: {html.escape(str(split_date))})\n▫️ 봇이 기존 V14 장부, V-REV 큐 장부, AVWAP 상태 캐시의 수량과 평단가를 100% 무인 자동 소급 조정 완료했습니다.", parse_mode='HTML')
def _get_last_trade_date(target_est):
time.sleep(0.06)
nyse = mcal.get_calendar('NYSE')
return nyse.schedule(start_date=(target_est - datetime.timedelta(days=10)).date(), end_date=target_est.date())
schedule = await self._retry_api(_get_last_trade_date, now_est, timeout=10.0, default=pd.DataFrame())
# 🚨 MODIFIED: [Indentation 붕괴 수술] 16칸 들여쓰기 규격 엄수 (SyntaxError 차단)
if not schedule.empty:
last_trade_date = schedule.index[-1]
target_ledger_str = last_trade_date.strftime('%Y-%m-%d')
else:
target_ledger_str = now_est.strftime('%Y-%m-%d')
res_bal = await self._retry_api(self.broker.get_account_balance, timeout=15.0, default=None)
# 🚨 MODIFIED: [Indentation 붕괴 수술] 16칸 들여쓰기 규격 엄수 (SyntaxError 차단)
if not res_bal:
await self._safe_send(context, chat_id, f"❌ <b>[{html.escape(str(ticker))}] API 오류</b>\n잔고를 불러오지 못했습니다.", parse_mode='HTML')
return "ERROR"
holdings = res_bal[1] if isinstance(res_bal, (list, tuple)) and len(res_bal) > 1 else {}
safe_holdings = holdings if isinstance(holdings, dict) else {}
safe_ticker_info = safe_holdings.get(ticker) or {'qty': 0, 'avg': 0.0}
actual_qty = int(self._safe_float(safe_ticker_info.get('qty')))
actual_avg = self._safe_float(safe_ticker_info.get('avg'))
full_ledger = await self._retry_api(self.cfg.get_ledger, default=[])
recs_for_check = [r for r in (full_ledger or []) if isinstance(r, dict) and r.get('ticker') == ticker]
hold_res = await self._retry_api(self.cfg.calculate_holdings, ticker, recs_for_check, default=(0, 0.0, 0.0, 0.0))
ledger_qty_for_check = hold_res[0] if isinstance(hold_res, tuple) and len(hold_res) > 0 else 0
vrev_ledger_qty_for_check = 0
is_rev = (await self._retry_api(self.cfg.get_version, ticker, default="V14") == "V_REV")
if is_rev and getattr(self, 'queue_ledger', None):
q_data_check = await self._retry_api(self.queue_ledger.get_queue, ticker, default=[])
vrev_ledger_qty_for_check = sum(int(self._safe_float(item.get("qty"))) for item in q_data_check if isinstance(item, dict))
vrev_total_invested = sum(int(self._safe_float(item.get("qty"))) * self._safe_float(item.get("price")) for item in q_data_check if isinstance(item, dict))
# 🚨 MODIFIED: [유령 평단가 덮어쓰기 뇌관 소각] 큐 장부가 비었을 때 actual_avg를 0.0으로 덮어쓰지 않고 KIS 팩트를 보존
if vrev_ledger_qty_for_check > 0:
actual_avg = round(vrev_total_invested / vrev_ledger_qty_for_check, 4)
max_check_qty = max(ledger_qty_for_check, vrev_ledger_qty_for_check)
kis_search_start = (now_kst - datetime.timedelta(days=4)).strftime('%Y%m%d')
query_end_dt = now_kst.strftime('%Y%m%d')
def filter_to_est(execs_raw):
filtered = []
if not execs_raw: return filtered
for ex in execs_raw:
if not isinstance(ex, dict): continue
ord_dt = ex.get('ord_dt') or ex.get('ord_strt_dt')
if not ord_dt: continue
ord_tmd = ex.get('ord_tmd')
if not ord_tmd or len(str(ord_tmd)) != 6: ord_tmd = '000000'
try:
k_dt = datetime.datetime.strptime(f"{ord_dt}{ord_tmd}", "%Y%m%d%H%M%S").replace(tzinfo=kst)
e_dt = k_dt.astimezone(est)
if e_dt.strftime('%Y-%m-%d') == target_ledger_str:
filtered.append(ex)
except Exception: pass
return filtered
raw_execs = []
target_execs = []
if actual_qty == 0 and max_check_qty > 0:
max_retries = 6
prev_sold_today = -1
stable_cnt = 0
for attempt in range(max_retries):
raw_execs = await self._retry_api(self.broker.get_execution_history, ticker, kis_search_start, query_end_dt, timeout=15.0, default=[])
target_execs = filter_to_est(raw_execs)
sold_today = sum(int(self._safe_float(ex.get('ft_ccld_qty'))) for ex in target_execs if ex.get('sll_buy_dvsn_cd') == "01")
if sold_today >= max_check_qty:
if sold_today == prev_sold_today:
stable_cnt += 1
if stable_cnt >= 1: break
else: stable_cnt = 0
prev_sold_today = sold_today
if attempt < max_retries - 1:
logging.info(f"⏳ [{ticker}] 체결 원장 지연(Lag) 감지. 데이터 안정화 및 EST 매핑 검증 중... ({attempt+1}/{max_retries})")
await asyncio.sleep(2.0)
else:
raw_execs = await self._retry_api(self.broker.get_execution_history, ticker, kis_search_start, query_end_dt, timeout=15.0, default=[])
target_execs = filter_to_est(raw_execs)
if target_execs:
calibrated_count = await self._retry_api(self.cfg.calibrate_ledger_prices, ticker, target_ledger_str, target_execs, timeout=10.0, default=0)
if calibrated_count > 0:
logging.info(f"🔧 [{ticker}] LOC/MOC 주문 {calibrated_count}건에 대해 실제 체결 단가 소급 업데이트를 완료했습니다.")
full_ledger = await self._retry_api(self.cfg.get_ledger, default=[])
recs = [r for r in (full_ledger or []) if isinstance(r, dict) and r.get('ticker') == ticker]
hold_res2 = await self._retry_api(self.cfg.calculate_holdings, ticker, recs, default=(0,0.0,0.0,0.0))
ledger_qty = hold_res2[0] if isinstance(hold_res2, tuple) and len(hold_res2) > 0 else 0
avg_price = hold_res2[1] if isinstance(hold_res2, tuple) and len(hold_res2) > 1 else 0.0
diff = actual_qty - ledger_qty
price_diff = abs(actual_avg - avg_price)
today_recs = [r for r in recs if r.get('date') == target_ledger_str and 'INIT' not in str(r.get('exec_id', '')) and 'CALIB' not in str(r.get('exec_id', ''))]
ledger_today_buy = sum(r.get('qty', 0) for r in today_recs if r.get('side') == 'BUY')
ledger_today_sell = sum(r.get('qty', 0) for r in today_recs if r.get('side') == 'SELL')
exec_today_buy = sum(int(self._safe_float(ex.get('ft_ccld_qty'))) for ex in target_execs if ex.get('sll_buy_dvsn_cd') == "02")
exec_today_sell = sum(int(self._safe_float(ex.get('ft_ccld_qty'))) for ex in target_execs if ex.get('sll_buy_dvsn_cd') == "01")
needs_reconstruction = (diff != 0) or (ledger_today_buy != exec_today_buy) or (ledger_today_sell != exec_today_sell)
if not needs_reconstruction and price_diff >= 0.01:
await self._retry_api(self.cfg.calibrate_avg_price, ticker, actual_avg, timeout=10.0)
await self._safe_send(context, chat_id, f"🔧 <b>[{html.escape(str(ticker))}] 장부 평단가 미세 오차({price_diff:.4f}) 교정 완료!</b>", parse_mode='HTML')
elif needs_reconstruction:
temp_recs = [r for r in recs if r.get('date') != target_ledger_str or 'INIT' in str(r.get('exec_id', ''))]
temp_res = await self._retry_api(self.cfg.calculate_holdings, ticker, temp_recs, default=(0,0.0,0.0,0.0))
temp_sim_qty = temp_res[0] if isinstance(temp_res, tuple) and len(temp_res) > 0 else 0
temp_sim_avg = temp_res[1] if isinstance(temp_res, tuple) and len(temp_res) > 1 else 0.0
temp_avg = temp_sim_avg
new_target_records = []
if target_execs:
target_execs.sort(key=lambda x: str(x.get('ord_dt') or '00000000') + str(x.get('ord_tmd') or '000000'))
for ex in target_execs:
side_cd = ex.get('sll_buy_dvsn_cd')
exec_qty = int(self._safe_float(ex.get('ft_ccld_qty')))
exec_price = self._safe_float(ex.get('ft_ccld_unpr3'))
if side_cd == "02":
new_avg = ((temp_sim_qty * temp_sim_avg) + (exec_qty * exec_price)) / (temp_sim_qty + exec_qty) if (temp_sim_qty + exec_qty) != 0 else exec_price
temp_sim_qty += exec_qty
temp_sim_avg = new_avg
else: temp_sim_qty -= exec_qty
rec_item = {'date': target_ledger_str, 'side': "BUY" if side_cd == "02" else "SELL", 'qty': exec_qty, 'price': exec_price, 'avg_price': temp_sim_avg}
if is_rev: rec_item['is_reverse'] = True
new_target_records.append(rec_item)
gap_qty = actual_qty - temp_sim_qty
if gap_qty != 0:
calib_side = "BUY" if gap_qty > 0 else "SELL"
calib_price = actual_avg
actual_clear_price_calib = 0.0
if target_execs:
sell_execs_calib = [ex for ex in target_execs if ex.get('sll_buy_dvsn_cd') == "01"]
if sell_execs_calib:
tot_amt_calib = sum(int(self._safe_float(ex.get('ft_ccld_qty'))) * self._safe_float(ex.get('ft_ccld_unpr3')) for ex in sell_execs_calib)
tot_q_calib = sum(int(self._safe_float(ex.get('ft_ccld_qty'))) for ex in sell_execs_calib)
if tot_q_calib > 0: actual_clear_price_calib = round(tot_amt_calib / tot_q_calib, 4)
if calib_side == "SELL" and actual_avg <= 0.0:
if actual_clear_price_calib > 0.0: calib_price = actual_clear_price_calib
else: calib_price = temp_sim_avg if temp_sim_avg > 0 else (temp_avg if temp_avg > 0 else 0.01)
calib_avg = temp_sim_avg
elif calib_side == "BUY" and actual_avg <= 0.0:
if actual_clear_price_calib > 0.0:
calib_price = actual_clear_price_calib
calib_avg = actual_clear_price_calib
else:
calib_price = temp_sim_avg if temp_sim_avg > 0 else (temp_avg if temp_avg > 0 else 0.01)
calib_avg = temp_sim_avg
else:
calib_price = actual_avg if actual_avg > 0 else temp_sim_avg
calib_avg = actual_avg if actual_avg > 0 else temp_sim_avg
calib_item = {'date': target_ledger_str, 'side': calib_side, 'qty': abs(gap_qty), 'price': calib_price, 'avg_price': calib_avg, 'exec_id': f"CALIB_{int(time.time())}", 'desc': "비파괴 보정"}
if is_rev: calib_item['is_reverse'] = True
new_target_records.append(calib_item)
if new_target_records:
if actual_qty > 0:
for r in new_target_records: r['avg_price'] = actual_avg
await self._retry_api(self.cfg.overwrite_incremental_ledger, ticker, temp_recs, new_target_records, timeout=10.0)
if gap_qty != 0: await self._safe_send(context, chat_id, f"🔧 <b>[{html.escape(str(ticker))}] 통합 메인 장부(MAIN LEDGER) 비파괴 보정 완료!</b>\n▫️ KIS 실잔고 오차 수량({gap_qty}주)을 역사 보존 상태로 안전하게 교정했습니다.", parse_mode='HTML')
if is_rev:
q_data_before = await self._retry_api(self.queue_ledger.get_queue, ticker, default=[])
vrev_ledger_qty = sum(int(self._safe_float(item.get("qty"))) for item in q_data_before if isinstance(item, dict))
sold_today_vrev = sum(int(self._safe_float(ex.get('ft_ccld_qty'))) for ex in target_execs if ex.get('sll_buy_dvsn_cd') == "01") if target_execs else 0
if actual_qty == 0 and (vrev_ledger_qty > 0 or sold_today_vrev > 0):
if sold_today_vrev == 0 and vrev_ledger_qty > 0:
await self._safe_send(context, chat_id, f"🚨 <b>[{html.escape(str(ticker))} 유령 잔고 방어 가동]</b>\nKIS 실잔고가 0주로 조회되었으나, 당일 매도 체결 내역이 0건입니다. 통신 오류(Ghost Balance)일 가능성이 매우 높아 장부 강제 소각(자동 졸업)을 차단합니다.\n▫️ HTS 등을 통해 수동으로 100% 전량 매도한 상태라면 <code>/reset</code> 명령어를 사용하여 봇을 초기화하십시오.", parse_mode='HTML')
return "GHOST_BALANCE_BLOCKED"
added_seed = 0.0
_vrev_snap_ok = False
snapshot = None
actual_clear_price = 0.0
tot_q = 0
if target_execs:
sell_execs = [ex for ex in target_execs if ex.get('sll_buy_dvsn_cd') == "01"]
if sell_execs:
tot_amt = sum(int(self._safe_float(ex.get('ft_ccld_qty'))) * self._safe_float(ex.get('ft_ccld_unpr3')) for ex in sell_execs)
tot_q = sum(int(self._safe_float(ex.get('ft_ccld_qty'))) for ex in sell_execs)
if tot_q > 0: actual_clear_price = round(tot_amt / tot_q, 4)
if tot_q > vrev_ledger_qty:
missing_qty = tot_q - vrev_ledger_qty
buy_execs = [ex for ex in (target_execs or []) if ex.get('sll_buy_dvsn_cd') == "02"]
temp_invested = sum(self._safe_float(item.get("qty")) * self._safe_float(item.get("price")) for item in q_data_before if isinstance(item, dict))
temp_avg = temp_invested / vrev_ledger_qty if vrev_ledger_qty > 0 else 0.0
missing_price = temp_avg
if buy_execs:
b_tot_amt = sum(int(self._safe_float(ex.get('ft_ccld_qty'))) * self._safe_float(ex.get('ft_ccld_unpr3')) for ex in buy_execs)
b_tot_q = sum(int(self._safe_float(ex.get('ft_ccld_qty'))) for ex in buy_execs)
if b_tot_q > 0:
q_today_amt = 0.0
q_today_qty = 0
for item in q_data_before:
if isinstance(item, dict) and str(item.get("date", "")).startswith(target_ledger_str):
iq = int(self._safe_float(item.get("qty")))
q_today_qty += iq
q_today_amt += iq * self._safe_float(item.get("price"))
pure_manual_q = b_tot_q - q_today_qty
pure_manual_amt = b_tot_amt - q_today_amt
if pure_manual_q >= missing_qty and pure_manual_q > 0 and pure_manual_amt > 0:
derived_price = pure_manual_amt / pure_manual_q
missing_price = round(derived_price, 4)
else: missing_price = round(b_tot_amt / b_tot_q, 4)
q_data_before.append({"date": now_est.strftime('%Y-%m-%d %H:%M:%S'), "qty": missing_qty, "price": missing_price, "exec_id": "MANUAL_SYNC"})
vrev_ledger_qty = tot_q
await self._retry_api(self.queue_ledger.overwrite_queue, ticker, q_data_before, timeout=10.0)
total_invested = sum(self._safe_float(item.get("qty")) * self._safe_float(item.get("price")) for item in q_data_before if isinstance(item, dict))
q_avg_price = total_invested / vrev_ledger_qty if vrev_ledger_qty > 0 else 0.0
curr_p = await self._retry_api(self.broker.get_current_price, ticker, timeout=15.0, default=0.0)
clear_price = actual_clear_price if actual_clear_price > 0.0 else (curr_p if curr_p and curr_p > 0 else q_avg_price * 1.006)
snapshot = await self._retry_api(self.strategy.capture_vrev_snapshot, ticker, clear_price, q_avg_price, vrev_ledger_qty, timeout=10.0, default={})
if snapshot and isinstance(snapshot, dict):
realized_pnl = self._safe_float(snapshot.get('realized_pnl', 0.0))
yield_pct = self._safe_float(snapshot.get('realized_pnl_pct', 0.0))
compound_rate = self._safe_float(await self._retry_api(self.cfg.get_compound_rate, ticker, default=70.0)) / 100.0
if realized_pnl > 0 and compound_rate > 0:
added_seed = realized_pnl * compound_rate
current_seed = self._safe_float(await self._retry_api(self.cfg.get_seed, ticker, default=6720.0))
await self._retry_api(self.cfg.set_seed, ticker, current_seed + added_seed, timeout=10.0)
cap_dt = snapshot.get('captured_at', now_est)
cap_dt_str = cap_dt if isinstance(cap_dt, str) else cap_dt.strftime('%Y-%m-%d')
start_dt_str = str(q_data_before[0].get('date', ''))[:10] if q_data_before and isinstance(q_data_before[0], dict) else cap_dt_str[:10]
hist_data = await self._retry_api(self.cfg._load_json, self.cfg.FILES["HISTORY"], [], default=[])
new_hist = {
"id": int(time.time()), "ticker": ticker, "start_date": start_dt_str, "end_date": cap_dt_str[:10],
"invested": self._safe_float(total_invested), "revenue": self._safe_float(total_invested + realized_pnl),
"profit": realized_pnl, "yield": yield_pct, "trades": q_data_before
}
hist_data.append(new_hist)
await self._retry_api(self.cfg._save_json, self.cfg.FILES["HISTORY"], hist_data, timeout=10.0)
_vrev_snap_ok = True
if getattr(self, 'queue_ledger', None):
await self._retry_api(self.queue_ledger.sync_with_broker, ticker, 0, timeout=10.0)
if _vrev_snap_ok:
msg = f"🎉 <b>[{html.escape(str(ticker))} V-REV 잭팟 스윕(전량 익절) 감지!]</b>\n▫️ 잔고가 0주가 되어 LIFO 큐 지층을 100% 소각(초기화)했습니다."
if added_seed > 0: msg += f"\n💸 <b>자동 복리 +${added_seed:,.0f}</b> 이 다음 운용 시드에 완벽하게 추가되었습니다!"
await self._safe_send(context, chat_id, msg, parse_mode='HTML')
if snapshot and isinstance(snapshot, dict):
img_path = await self._retry_api(
self.view.create_profit_image,
ticker=ticker,
profit=self._safe_float(snapshot.get('realized_pnl', 0.0)),
yield_pct=self._safe_float(snapshot.get('realized_pnl_pct', 0.0)),
invested=self._safe_float(snapshot.get('avg_price', 0.0)) * self._safe_float(snapshot.get('cleared_qty', 0)),
revenue=self._safe_float(snapshot.get('clear_price', 0.0)) * self._safe_float(snapshot.get('cleared_qty', 0)),
end_date=cap_dt_str[:10],
timeout=15.0, default=None
)
if img_path:
def _read_img3(p):
with open(p, 'rb') as f_in: return f_in.read()
try:
img_bytes3 = await self._retry_api(_read_img3, img_path)
if str(img_path).lower().endswith('.gif'): await context.bot.send_animation(chat_id=chat_id, animation=img_bytes3)
else: await context.bot.send_photo(chat_id=chat_id, photo=img_bytes3)
except OSError: pass
else:
await self._safe_send(context, chat_id, f"⚠️ <b>[{html.escape(str(ticker))} V-REV 0주 강제 정산 완료]</b>\n▫️ 0주를 확인하여 큐를 안전하게 비웠으나 통신 지연으로 졸업 카드는 생략되었습니다.", parse_mode='HTML')
return "SUCCESS"
if actual_qty == vrev_ledger_qty:
pass
elif actual_qty > 0 and actual_qty < vrev_ledger_qty:
gap_qty = vrev_ledger_qty - actual_qty
vwap_state_file = f"data/vwap_state_REV_{ticker}.json"
def _read_v_state(f_path):
with open(f_path, 'r', encoding='utf-8') as vf: return json.load(vf)
v_state = await self._retry_api(_read_v_state, vwap_state_file, default={})
if isinstance(v_state, dict) and "executed" in v_state and isinstance(v_state["executed"], dict) and "SELL_QTY" in v_state["executed"]:
old_sell_qty = v_state["executed"]["SELL_QTY"]
v_state["executed"]["SELL_QTY"] = max(0, old_sell_qty - gap_qty)
def _write_v_state(state_dict, f_path):
fd = None
tmp_path = None
try:
fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(f_path) or '.')
with os.fdopen(fd, 'w', encoding='utf-8') as _vf_out:
fd = None
json.dump(state_dict, _vf_out, ensure_ascii=False, indent=4)
_vf_out.flush()
os.fsync(_vf_out.fileno())
os.replace(tmp_path, f_path)
tmp_path = None
except Exception as write_err:
if fd is not None:
try: os.close(fd)
except OSError: pass
if tmp_path:
try: os.remove(tmp_path)
except OSError: pass
raise write_err
await self._retry_api(_write_v_state, v_state, vwap_state_file, timeout=10.0)
actual_clear_price_for_sync = 0.0
if target_execs:
sell_execs_sync = [ex for ex in target_execs if ex.get('sll_buy_dvsn_cd') == "01"]
if sell_execs_sync:
t_amt = sum(int(self._safe_float(ex.get('ft_ccld_qty'))) * self._safe_float(ex.get('ft_ccld_unpr3')) for ex in sell_execs_sync)
t_q = sum(int(self._safe_float(ex.get('ft_ccld_qty'))) for ex in sell_execs_sync)
if t_q > 0: actual_clear_price_for_sync = round(t_amt / t_q, 4)
calibrated = False
if getattr(self, 'queue_ledger', None):
calibrated = await self._retry_api(self.queue_ledger.sync_with_broker, ticker, actual_qty, 0.0, actual_clear_price_for_sync, timeout=10.0, default=False)
if calibrated: await self._safe_send(context, chat_id, f"🔧 <b>[{html.escape(str(ticker))}] V-REV 큐(Queue) 비파괴 보정 및 리앵커링 완료!</b>\n▫️ 수동 매도 물량(<b>{gap_qty}주</b>)을 LIFO 큐에서 안전하게 차감하고, 수익금만큼 잔여 지층의 평단가를 일괄 차감했습니다.", parse_mode='HTML')
elif actual_qty > 0 and actual_qty > vrev_ledger_qty:
gap_qty = actual_qty - vrev_ledger_qty
real_buy_price = actual_avg
buy_execs = [ex for ex in (target_execs or []) if ex.get('sll_buy_dvsn_cd') == "02"]
if buy_execs:
b_tot_amt = sum(int(self._safe_float(ex.get('ft_ccld_qty'))) * self._safe_float(ex.get('ft_ccld_unpr3')) for ex in buy_execs)
b_tot_q = sum(int(self._safe_float(ex.get('ft_ccld_qty'))) for ex in buy_execs)
if b_tot_q > 0: real_buy_price = round(b_tot_amt / b_tot_q, 4)
if real_buy_price <= 0.0:
curr_p = await self._retry_api(self.broker.get_current_price, ticker, timeout=15.0, default=0.0)
q_data_temp = await self._retry_api(self.queue_ledger.get_queue, ticker, default=[])
last_price = self._safe_float(q_data_temp[-1].get('price')) if q_data_temp and isinstance(q_data_temp[-1], dict) else 0.0
real_buy_price = curr_p if curr_p > 0 else (last_price if last_price > 0 else 1.0)
q_data = await self._retry_api(self.queue_ledger.get_queue, ticker, default=[])
q_data.append({"date": now_est.strftime('%Y-%m-%d %H:%M:%S'), "qty": gap_qty, "price": real_buy_price, "exec_id": f"MANUAL_BUY_{int(time.time())}"})
await self._retry_api(self.queue_ledger.overwrite_queue, ticker, q_data, timeout=10.0)
await self._safe_send(context, chat_id, f"🔧 <b>[{html.escape(str(ticker))}] V-REV 큐(Queue) 수동 매수 편입 완료!</b>\n▫️ KIS 실잔고에 맞춰 신규 지층(<b>{gap_qty}주</b>, 추정단가 ${real_buy_price})을 정밀 추가했습니다.", parse_mode='HTML')
if not is_rev:
sold_today_v14 = sum(int(self._safe_float(ex.get('ft_ccld_qty'))) for ex in target_execs if ex.get('sll_buy_dvsn_cd') == "01") if target_execs else 0
if actual_qty == 0 and (ledger_qty > 0 or sold_today_v14 > 0):
if sold_today_v14 == 0 and ledger_qty > 0:
await self._safe_send(context, chat_id, f"🚨 <b>[{html.escape(str(ticker))} 유령 잔고 방어 가동]</b>\nKIS 실잔고가 0주로 조회되었으나, 당일 매도 체결 내역이 0건입니다. 통신 오류(Ghost Balance)일 가능성이 매우 높아 장부 강제 소각(자동 졸업)을 차단합니다.\n▫️ HTS 등을 통해 수동으로 100% 전량 매도한 상태라면 <code>/reset</code> 명령어를 사용하여 봇을 초기화하십시오.", parse_mode='HTML')
return "GHOST_BALANCE_BLOCKED"
today_est_str = now_est.strftime('%Y-%m-%d')
prev_c = await self._retry_api(self.broker.get_previous_close, ticker, timeout=15.0, default=0.0)
grad_res = await self._retry_api(self.cfg.archive_graduation, ticker, today_est_str, prev_c, timeout=10.0, default=(None, 0.0))
new_hist, added_seed = grad_res if isinstance(grad_res, tuple) and len(grad_res) >= 2 else (None, 0.0)
if new_hist and isinstance(new_hist, dict):
msg = f"🎉 <b>[{html.escape(str(ticker))} 졸업 확인!]</b>\n장부를 명예의 전당에 저장하고 새 사이클을 준비합니다."
if added_seed > 0: msg += f"\n💸 <b>자동 복리 +${added_seed:,.0f}</b> 이 다음 운용 시드에 완벽하게 추가되었습니다!"
await self._safe_send(context, chat_id, msg, parse_mode='HTML')
img_path = await self._retry_api(
self.view.create_profit_image,
ticker=ticker,
profit=self._safe_float(new_hist.get('profit', 0.0)),
yield_pct=self._safe_float(new_hist.get('yield', 0.0)),
invested=self._safe_float(new_hist.get('invested', 0.0)),
revenue=self._safe_float(new_hist.get('revenue', 0.0)),
end_date=new_hist.get('end_date', target_ledger_str),
timeout=15.0, default=None
)
if img_path:
def _read_img4(p):
with open(p, 'rb') as f_in: return f_in.read()
try:
img_bytes4 = await self._retry_api(_read_img4, img_path)
if str(img_path).lower().endswith('.gif'): await context.bot.send_animation(chat_id=chat_id, animation=img_bytes4)
else: await context.bot.send_photo(chat_id=chat_id, photo=img_bytes4)
except OSError: pass
else:
full_ledger2 = await self._retry_api(self.cfg.get_ledger, default=[])
all_recs = [r for r in full_ledger2 if isinstance(r, dict) and r.get('ticker') != ticker]
await self._retry_api(self.cfg._save_json, self.cfg.FILES["LEDGER"], all_recs, timeout=10.0)
await self._safe_send(context, chat_id, f"⚠️ <b>[{html.escape(str(ticker))} 강제 정산 완료]</b>\n잔고가 0주이나 마이너스 수익 상태이므로 명예의 전당 박제 없이 장부를 비우고 새출발 타점을 장전합니다.", parse_mode='HTML')
return "SUCCESS"
async def _display_ledger(self, ticker, chat_id, context, query=None, message_obj=None, pre_fetched_holdings=None):
full_ledger = await self._retry_api(self.cfg.get_ledger, default=[])
recs = [r for r in full_ledger if isinstance(r, dict) and r.get('ticker') == ticker]
report = ""
if not recs:
report += f"📭 <b>[{html.escape(str(ticker))}]</b> 현재 진행 중인 사이클이 없습니다 (보유량 0주).\n\n"
else:
from collections import OrderedDict
agg_dict = OrderedDict()
total_buy = 0.0
total_sell = 0.0
for rec in recs:
raw_date = str(rec.get('date') or '').split(' ')[0]
parts = raw_date.split('-')
if len(parts) == 3: date_short = f"{parts[1]}.{parts[2]}"
else: date_short = raw_date
side_str = "🔴매수" if rec.get('side') == 'BUY' else "🔵매도"
key = (date_short, side_str)
if key not in agg_dict: agg_dict[key] = {'qty': 0, 'amt': 0.0}
agg_dict[key]['qty'] += int(self._safe_float(rec.get('qty')))
agg_dict[key]['amt'] += (int(self._safe_float(rec.get('qty'))) * self._safe_float(rec.get('price')))
if rec.get('side') == 'BUY': total_buy += (int(self._safe_float(rec.get('qty'))) * self._safe_float(rec.get('price')))
elif rec.get('side') == 'SELL': total_sell += (int(self._safe_float(rec.get('qty'))) * self._safe_float(rec.get('price')))
report += f"📜 <b>[ {html.escape(str(ticker))} 일자별 매매 (통합 변동분) (총 {len(agg_dict)}일) ]</b>\n\n<code>No. 일자 구분 평균단가 수량\n"
report += "-"*30 + "\n"
idx = 1
for (date, side), data in agg_dict.items():
tot_qty = data['qty']
avg_prc = data['amt'] / tot_qty if tot_qty != 0 else 0.0
report += f"{idx:<3} {date} {side} ${avg_prc:<6.2f} {tot_qty}주\n"
idx += 1
report += "-"*30 + "</code>\n\n"
safe_holdings = pre_fetched_holdings if isinstance(pre_fetched_holdings, dict) else {}
actual_qty = int(self._safe_float((safe_holdings.get(ticker) or {'qty': 0}).get('qty')))
actual_avg = self._safe_float((safe_holdings.get(ticker) or {'avg': 0}).get('avg'))
v_mode = await self._retry_api(self.cfg.get_version, ticker, default="V14")
if v_mode == "V_REV":
if not getattr(self, 'queue_ledger', None):
from queue_ledger import QueueLedger
self.queue_ledger = await asyncio.to_thread(QueueLedger)
if getattr(self, 'queue_ledger', None):
q_data_ui = await self._retry_api(self.queue_ledger.get_queue, ticker, default=[])
vrev_inv_ui = sum(int(self._safe_float(item.get('qty'))) * self._safe_float(item.get('price')) for item in q_data_ui if isinstance(item, dict))
vrev_q_ui = sum(int(self._safe_float(item.get('qty'))) for item in q_data_ui if isinstance(item, dict))
if vrev_q_ui > 0: actual_avg = round(vrev_inv_ui / vrev_q_ui, 4)
else: actual_avg = 0.0
split = await self._retry_api(self.cfg.get_split_count, ticker, default=40.0)
t_val_res = await self._retry_api(self.cfg.get_absolute_t_val, ticker, actual_qty, actual_avg, default=(0.0, 0.0))
t_val = t_val_res[0] if isinstance(t_val_res, tuple) and len(t_val_res) > 0 else 0.0
t_val_safe = self._safe_float(t_val)
split_safe = int(self._safe_float(split))
report += "📊 <b>[ 현재 진행 상황 요약 ]</b>\n"
report += f"▪️ 현재 T값 : {t_val_safe:.4f} T ({split_safe}분할)\n"
report += f"▪️ 보유 수량 : {actual_qty} 주 (평단 ${actual_avg:,.2f})\n"
if recs:
report += f"▪️ 총 매수액 : ${total_buy:,.2f}\n"
report += f"▪️ 총 매도액 : ${total_sell:,.2f}"
msg = report
if len(msg) > 4000:
msg = msg[:3900] + "\n\n... (장부 내역이 너무 길어 하단이 생략되었습니다) ✂️"
active_tickers = await self._retry_api(self.cfg.get_active_tickers, default=[])
keyboard = []
if v_mode == "V_REV":
keyboard.append([InlineKeyboardButton(f"🗄️ {html.escape(str(ticker))} V-REV 큐(Queue) 정밀 관리", callback_data=f"QUEUE:VIEW:{ticker}")])
row = [InlineKeyboardButton(f"🔄 {html.escape(str(t))} 장부 업데이트", callback_data=f"REC:SYNC:{t}") for t in active_tickers if isinstance(t, str)]
if row: keyboard.append(row)
markup = InlineKeyboardMarkup(keyboard)
if query:
try: await asyncio.wait_for(query.edit_message_text(msg, reply_markup=markup, parse_mode='HTML'), timeout=15.0)
except Exception: pass
elif message_obj:
try: await asyncio.wait_for(message_obj.edit_text(msg, reply_markup=markup, parse_mode='HTML'), timeout=15.0)
except Exception: pass
else:
await self._safe_send(context, chat_id, msg, reply_markup=markup, parse_mode='HTML')