Trad / r2.py
Riy777's picture
Update r2.py
87db251
raw
history blame
16.6 kB
# r2.py (ู…ุญุฏุซ)
import os, traceback, json, time
from datetime import datetime, timedelta
import asyncio
import boto3
from botocore.exceptions import NoCredentialsError, ClientError
R2_ACCOUNT_ID = os.getenv("R2_ACCOUNT_ID")
R2_ACCESS_KEY_ID = os.getenv("R2_ACCESS_KEY_ID")
R2_SECRET_ACCESS_KEY = os.getenv("R2_SECRET_ACCESS_KEY")
BUCKET_NAME = "trading"
INITIAL_CAPITAL = 10.0
class R2Service:
def __init__(self):
try:
endpoint_url = f"https://{R2_ACCOUNT_ID}.r2.cloudflarestorage.com"
self.s3_client = boto3.client(
's3',
endpoint_url=endpoint_url,
aws_access_key_id=R2_ACCESS_KEY_ID,
aws_secret_access_key=R2_SECRET_ACCESS_KEY,
)
self.lock_acquired = False
self.BUCKET_NAME = BUCKET_NAME
self._open_trades_warning_printed = False
self._portfolio_warning_printed = False
self._contracts_warning_printed = False
except Exception as e:
raise RuntimeError(f"Failed to initialize S3 client: {e}")
def acquire_lock(self, max_retries=3):
lock_path = "lock.txt"
for attempt in range(max_retries):
try:
try:
self.s3_client.head_object(Bucket=BUCKET_NAME, Key=lock_path)
print(f"๐Ÿ”’ Lock file exists. Attempt {attempt + 1}/{max_retries}. Waiting...")
time.sleep(1)
except ClientError as e:
if e.response['Error']['Code'] == '404':
self.s3_client.put_object(Bucket=BUCKET_NAME, Key=lock_path, Body=b'')
self.lock_acquired = True
print("โœ… Lock acquired.")
return True
else:
raise
except Exception as e:
print(f"โŒ Failed to acquire lock: {e}")
time.sleep(1)
print(f"โŒ Failed to acquire lock after {max_retries} attempts.")
return False
def release_lock(self):
lock_path = "lock.txt"
if self.lock_acquired:
try:
self.s3_client.delete_object(Bucket=BUCKET_NAME, Key=lock_path)
print("โœ… Lock released.")
self.lock_acquired = False
except Exception as e:
print(f"โŒ Failed to release lock: {e}")
async def save_candidates_async(self, candidates):
"""ุญูุธ ุจูŠุงู†ุงุช ุงู„ู…ุฑุดุญูŠู† ุงู„ุนุดุฑุฉ ููŠ ู…ู„ู ู…ู†ูุตู„ ููŠ R2"""
try:
key = "Candidates.json"
data = {
"timestamp": datetime.now().isoformat(),
"total_candidates": len(candidates),
"candidates": candidates
}
data_json = json.dumps(data, indent=2, ensure_ascii=False).encode('utf-8')
self.s3_client.put_object(
Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json"
)
print(f"โœ… ุชู… ุญูุธ {len(candidates)} ู…ุฑุดุญ ููŠ ู…ู„ู Candidates ููŠ R2")
# ุนุฑุถ ู…ุนู„ูˆู…ุงุช ุงู„ู…ุฑุดุญูŠู† ุงู„ู…ุญููˆุธูŠู†
print("๐Ÿ“Š ุงู„ู…ุฑุดุญูˆู† ุงู„ู…ุญููˆุธูˆู†:")
for i, candidate in enumerate(candidates):
symbol = candidate.get('symbol', 'Unknown')
score = candidate.get('enhanced_final_score', 0)
strategy = candidate.get('target_strategy', 'GENERIC')
print(f" {i+1}. {symbol}: {score:.3f} - {strategy}")
except Exception as e:
print(f"โŒ ูุดู„ ุญูุธ ุงู„ู…ุฑุดุญูŠู† ููŠ R2: {e}")
async def load_candidates_async(self):
"""ุชุญู…ูŠู„ ุจูŠุงู†ุงุช ุงู„ู…ุฑุดุญูŠู† ู…ู† R2"""
try:
key = "Candidates.json"
response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key)
data = json.loads(response['Body'].read())
candidates = data.get('candidates', [])
print(f"โœ… ุชู… ุชุญู…ูŠู„ {len(candidates)} ู…ุฑุดุญ ู…ู† R2")
return candidates
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
print("โš ๏ธ ู„ุง ูŠูˆุฌุฏ ู…ู„ู ู…ุฑุดุญูŠู† ุณุงุจู‚")
return []
else:
raise
async def save_llm_prompts_async(self, symbol, prompt_type, prompt_content, analysis_data=None):
"""ุญูุธ ุงู„ู€ Prompts ุงู„ู…ุฑุณู„ุฉ ุฅู„ู‰ ุงู„ู†ู…ูˆุฐุฌ ุงู„ุถุฎู…"""
try:
key = "llm_prompts.json"
try:
response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key)
existing_data = json.loads(response['Body'].read())
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
existing_data = {"prompts": []}
else:
raise
new_prompt = {
"timestamp": datetime.now().isoformat(),
"symbol": symbol,
"prompt_type": prompt_type, # 'trading_decision' or 'trade_reanalysis'
"prompt_content": prompt_content,
"analysis_data": analysis_data
}
existing_data["prompts"].append(new_prompt)
if len(existing_data["prompts"]) > 2000:
existing_data["prompts"] = existing_data["prompts"][-2000:]
data_json = json.dumps(existing_data, indent=2, ensure_ascii=False).encode('utf-8')
self.s3_client.put_object(
Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json"
)
print(f"โœ… ุชู… ุญูุธ prompt ู„ู€ {symbol} ({prompt_type}) ููŠ R2")
except Exception as e:
print(f"โŒ ูุดู„ ุญูุธ prompt ู„ู€ {symbol}: {e}")
async def save_system_logs_async(self, log_data):
try:
key = "system_logs.json"
try:
response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key)
existing_logs = json.loads(response['Body'].read())
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
existing_logs = {"logs": []}
else:
raise
log_entry = {
"timestamp": datetime.now().isoformat(),
**log_data
}
existing_logs["logs"].append(log_entry)
if len(existing_logs["logs"]) > 2000:
existing_logs["logs"] = existing_logs["logs"][-2000:]
data_json = json.dumps(existing_logs, indent=2, ensure_ascii=False).encode('utf-8')
self.s3_client.put_object(
Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json"
)
print(f"โœ… System log saved: {log_data.get('cycle_started', log_data.get('cycle_completed', 'event'))}")
except Exception as e:
print(f"โŒ Failed to save system logs: {e}")
async def save_learning_data_async(self, learning_data):
try:
key = "learning_data.json"
data = {
"timestamp": datetime.now().isoformat(),
"learning_data": learning_data
}
data_json = json.dumps(data, indent=2, ensure_ascii=False).encode('utf-8')
self.s3_client.put_object(
Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json"
)
print("โœ… Learning data saved to R2")
except Exception as e:
print(f"โŒ Failed to save learning data: {e}")
async def load_learning_data_async(self):
try:
key = "learning_data.json"
response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key)
data = json.loads(response['Body'].read())
print("โœ… Learning data loaded from R2")
return data
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
print("โš ๏ธ No learning data found. Starting fresh.")
return {}
else:
raise
async def get_portfolio_state_async(self):
key = "portfolio_state.json"
try:
response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key)
state = json.loads(response['Body'].read())
if hasattr(self, '_portfolio_warning_printed'):
delattr(self, '_portfolio_warning_printed')
print(f"๐Ÿ’ฐ Portfolio state loaded: Current Capital ${state.get('current_capital_usd', 0):.2f}")
return state
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
if not hasattr(self, '_portfolio_warning_printed'):
print(f"โš ๏ธ No portfolio state file found. Initializing with ${INITIAL_CAPITAL:.2f}")
self._portfolio_warning_printed = True
initial_state = {
"current_capital_usd": INITIAL_CAPITAL,
"invested_capital_usd": 0.0,
"initial_capital_usd": INITIAL_CAPITAL,
"total_trades": 0,
"winning_trades": 0,
"total_profit_usd": 0.0,
"total_loss_usd": 0.0
}
await self.save_portfolio_state_async(initial_state)
return initial_state
else:
raise
async def save_portfolio_state_async(self, state):
key = "portfolio_state.json"
try:
data_json = json.dumps(state, indent=2).encode('utf-8')
self.s3_client.put_object(
Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json"
)
print(f"๐Ÿ’พ Portfolio state saved: Current Capital ${state.get('current_capital_usd', 0):.2f}")
except Exception as e:
print(f"โŒ Failed to save portfolio state: {e}")
raise
async def get_open_trades_async(self):
try:
response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key="open_trades.json")
trades = json.loads(response['Body'].read())
if hasattr(self, '_open_trades_warning_printed'):
delattr(self, '_open_trades_warning_printed')
return trades
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
if not hasattr(self, '_open_trades_warning_printed'):
print("โš ๏ธ No open trades file found. Starting with an empty list.")
print("๐Ÿ’ก This is normal for first-time runs or when all trades are closed.")
self._open_trades_warning_printed = True
return []
else:
raise
async def save_open_trades_async(self, trades):
try:
data_json = json.dumps(trades, indent=2).encode('utf-8')
self.s3_client.put_object(
Bucket=BUCKET_NAME, Key="open_trades.json", Body=data_json, ContentType="application/json"
)
print(f"โœ… Open trades saved to R2. Total open trades: {len(trades)}")
except Exception as e:
print(f"โŒ Failed to save open trades: {e}")
raise
async def load_contracts_db_async(self):
key = "contracts.json"
try:
response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key)
contracts_db = json.loads(response['Body'].read())
if hasattr(self, '_contracts_warning_printed'):
delattr(self, '_contracts_warning_printed')
print(f"๐Ÿ’พ Contracts database loaded from R2. Total entries: {len(contracts_db)}")
return contracts_db
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
if not hasattr(self, '_contracts_warning_printed'):
print("โš ๏ธ No existing contracts database found. Initializing new one.")
self._contracts_warning_printed = True
return {}
else:
raise
async def save_contracts_db_async(self, data):
key = "contracts.json"
try:
data_json = json.dumps(data, indent=2).encode('utf-8')
self.s3_client.put_object(
Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json"
)
print(f"โœ… Contracts database saved to R2 successfully. Total entries: {len(data)}")
except Exception as e:
print(f"โŒ Failed to save contracts database to R2: {e}")
raise
async def get_trade_by_symbol_async(self, symbol):
try:
open_trades = await self.get_open_trades_async()
for trade in open_trades:
if trade['symbol'] == symbol and trade['status'] == 'OPEN':
return trade
return None
except Exception as e:
print(f"โŒ Failed to get trade by symbol {symbol}: {e}")
return None
async def update_trade_monitoring_status_async(self, symbol, is_monitored):
try:
open_trades = await self.get_open_trades_async()
updated = False
for trade in open_trades:
if trade['symbol'] == symbol:
trade['is_monitored'] = is_monitored
updated = True
break
if updated:
await self.save_open_trades_async(open_trades)
status = "ENABLED" if is_monitored else "DISABLED"
print(f"โœ… Real-time monitoring {status} for {symbol}")
else:
print(f"โš ๏ธ Trade {symbol} not found for monitoring status update")
return updated
except Exception as e:
print(f"โŒ Failed to update monitoring status for {symbol}: {e}")
return False
async def get_monitored_trades_async(self):
try:
open_trades = await self.get_open_trades_async()
monitored_trades = [trade for trade in open_trades if trade.get('is_monitored', False)]
return monitored_trades
except Exception as e:
print(f"โŒ Failed to get monitored trades: {e}")
return []
#
# ๐Ÿ”ด ุฏุงู„ุฉ ุฌุฏูŠุฏุฉ: ู„ุญูุธ ุณุฌู„ ุชุฏู‚ูŠู‚ ุงู„ุชุญู„ูŠู„
#
async def save_analysis_audit_log_async(self, audit_data):
"""ุญูุธ ุณุฌู„ ุชุฏู‚ูŠู‚ ุฏูˆุฑุฉ ุงู„ุชุญู„ูŠู„ (ูŠุญุชูุธ ุจุขุฎุฑ 50 ุฏูˆุฑุฉ)"""
try:
key = "analysis_audit_log.json"
# 1. ุฌู„ุจ ุงู„ุณุฌู„ ุงู„ุญุงู„ูŠ (ุฅู† ูˆุฌุฏ)
try:
response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key)
existing_log_data = json.loads(response['Body'].read())
if isinstance(existing_log_data, list):
history = existing_log_data
else:
history = [] # ุจุฏุก ุณุฌู„ ุฌุฏูŠุฏ ุฅุฐุง ูƒุงู† ุงู„ุชู†ุณูŠู‚ ุบูŠุฑ ุตุงู„ุญ
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
history = [] # ู…ู„ู ุฌุฏูŠุฏ
else:
raise
# 2. ุฅุถุงูุฉ ุงู„ุฏูˆุฑุฉ ุงู„ุญุงู„ูŠุฉ
history.append(audit_data)
# 3. ุงู„ุญูุงุธ ุนู„ู‰ ุขุฎุฑ 50 ุณุฌู„ ูู‚ุท
if len(history) > 50:
history = history[-50:]
# 4. ุญูุธ ุงู„ู…ู„ู ุงู„ู…ุญุฏุซ
data_json = json.dumps(history, indent=2, ensure_ascii=False).encode('utf-8')
self.s3_client.put_object(
Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json"
)
print(f"๐Ÿ“Š ุชู… ุญูุธ ุณุฌู„ ุชุฏู‚ูŠู‚ ุงู„ุชุญู„ูŠู„ ุจู†ุฌุงุญ ููŠ R2 (ุฅุฌู…ุงู„ูŠ {len(history)} ุณุฌู„ุงุช)")
except Exception as e:
print(f"โŒ ูุดู„ ุญูุธ ุณุฌู„ ุชุฏู‚ูŠู‚ ุงู„ุชุญู„ูŠู„ ููŠ R2: {e}")
print("โœ… Enhanced R2 Service Loaded - Comprehensive Logging System with Candidates Support")