Trad / r2.py
Riy777's picture
Update r2.py
670fc25
raw
history blame
13.5 kB
# r2.py
import os, traceback, json, time
from datetime import datetime, timedelta
import asyncio
import boto3
from botocore.exceptions import NoCredentialsError, ClientError
R2_ACCOUNT_ID = os.getenv("R2_ACCOUNT_ID")
R2_ACCESS_KEY_ID = os.getenv("R2_ACCESS_KEY_ID")
R2_SECRET_ACCESS_KEY = os.getenv("R2_SECRET_ACCESS_KEY")
BUCKET_NAME = "trading"
INITIAL_CAPITAL = 10.0
class R2Service:
def __init__(self):
try:
endpoint_url = f"https://{R2_ACCOUNT_ID}.r2.cloudflarestorage.com"
self.s3_client = boto3.client(
's3',
endpoint_url=endpoint_url,
aws_access_key_id=R2_ACCESS_KEY_ID,
aws_secret_access_key=R2_SECRET_ACCESS_KEY,
)
self.lock_acquired = False
self.BUCKET_NAME = BUCKET_NAME
self._open_trades_warning_printed = False
self._portfolio_warning_printed = False
self._contracts_warning_printed = False
except Exception as e:
raise RuntimeError(f"Failed to initialize S3 client: {e}")
def acquire_lock(self, max_retries=3):
lock_path = "lock.txt"
for attempt in range(max_retries):
try:
try:
self.s3_client.head_object(Bucket=BUCKET_NAME, Key=lock_path)
print(f"πŸ”’ Lock file exists. Attempt {attempt + 1}/{max_retries}. Waiting...")
time.sleep(1)
except ClientError as e:
if e.response['Error']['Code'] == '404':
self.s3_client.put_object(Bucket=BUCKET_NAME, Key=lock_path, Body=b'')
self.lock_acquired = True
print("βœ… Lock acquired.")
return True
else:
raise
except Exception as e:
print(f"❌ Failed to acquire lock: {e}")
time.sleep(1)
print(f"❌ Failed to acquire lock after {max_retries} attempts.")
return False
def release_lock(self):
lock_path = "lock.txt"
if self.lock_acquired:
try:
self.s3_client.delete_object(Bucket=BUCKET_NAME, Key=lock_path)
print("βœ… Lock released.")
self.lock_acquired = False
except Exception as e:
print(f"❌ Failed to release lock: {e}")
async def save_candidates_data_async(self, candidates_data, reanalysis_data):
try:
key = "candidates_data.json"
data = {
"timestamp": datetime.now().isoformat(),
"top_candidates": candidates_data,
"reanalysis_data": reanalysis_data
}
data_json = json.dumps(data, indent=2, ensure_ascii=False).encode('utf-8')
self.s3_client.put_object(
Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json"
)
print(f"βœ… Candidates data saved to R2: {len(candidates_data) if candidates_data else 0} candidates")
except Exception as e:
print(f"❌ Failed to save candidates data: {e}")
async def save_llm_responses_async(self, symbol, prompt, full_response, parsed_decision):
try:
key = "llm_responses.json"
try:
response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key)
existing_data = json.loads(response['Body'].read())
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
existing_data = {"responses": []}
else:
raise
new_response = {
"timestamp": datetime.now().isoformat(),
"symbol": symbol,
"prompt": prompt[:2000] + "..." if len(prompt) > 2000 else prompt,
"full_response": full_response,
"parsed_decision": parsed_decision
}
existing_data["responses"].append(new_response)
if len(existing_data["responses"]) > 1000:
existing_data["responses"] = existing_data["responses"][-1000:]
data_json = json.dumps(existing_data, indent=2, ensure_ascii=False).encode('utf-8')
self.s3_client.put_object(
Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json"
)
print(f"βœ… LLM response saved for {symbol}")
except Exception as e:
print(f"❌ Failed to save LLM response: {e}")
async def save_system_logs_async(self, log_data):
try:
key = "system_logs.json"
try:
response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key)
existing_logs = json.loads(response['Body'].read())
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
existing_logs = {"logs": []}
else:
raise
log_entry = {
"timestamp": datetime.now().isoformat(),
**log_data
}
existing_logs["logs"].append(log_entry)
if len(existing_logs["logs"]) > 2000:
existing_logs["logs"] = existing_logs["logs"][-2000:]
data_json = json.dumps(existing_logs, indent=2, ensure_ascii=False).encode('utf-8')
self.s3_client.put_object(
Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json"
)
print(f"βœ… System log saved: {log_data.get('cycle_started', log_data.get('cycle_completed', 'event'))}")
except Exception as e:
print(f"❌ Failed to save system logs: {e}")
async def save_learning_data_async(self, learning_data):
try:
key = "learning_data.json"
data = {
"timestamp": datetime.now().isoformat(),
"learning_data": learning_data
}
data_json = json.dumps(data, indent=2, ensure_ascii=False).encode('utf-8')
self.s3_client.put_object(
Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json"
)
print("βœ… Learning data saved to R2")
except Exception as e:
print(f"❌ Failed to save learning data: {e}")
async def load_learning_data_async(self):
try:
key = "learning_data.json"
response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key)
data = json.loads(response['Body'].read())
print("βœ… Learning data loaded from R2")
return data
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
print("⚠️ No learning data found. Starting fresh.")
return {}
else:
raise
async def get_portfolio_state_async(self):
key = "portfolio_state.json"
try:
response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key)
state = json.loads(response['Body'].read())
if hasattr(self, '_portfolio_warning_printed'):
delattr(self, '_portfolio_warning_printed')
print(f"πŸ’° Portfolio state loaded: Current Capital ${state.get('current_capital_usd', 0):.2f}")
return state
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
if not hasattr(self, '_portfolio_warning_printed'):
print(f"⚠️ No portfolio state file found. Initializing with ${INITIAL_CAPITAL:.2f}")
self._portfolio_warning_printed = True
initial_state = {
"current_capital_usd": INITIAL_CAPITAL,
"invested_capital_usd": 0.0,
"initial_capital_usd": INITIAL_CAPITAL,
"total_trades": 0,
"winning_trades": 0,
"total_profit_usd": 0.0,
"total_loss_usd": 0.0
}
await self.save_portfolio_state_async(initial_state)
return initial_state
else:
raise
async def save_portfolio_state_async(self, state):
key = "portfolio_state.json"
try:
data_json = json.dumps(state, indent=2).encode('utf-8')
self.s3_client.put_object(
Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json"
)
print(f"πŸ’Ύ Portfolio state saved: Current Capital ${state.get('current_capital_usd', 0):.2f}")
except Exception as e:
print(f"❌ Failed to save portfolio state: {e}")
raise
async def get_open_trades_async(self):
try:
response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key="open_trades.json")
trades = json.loads(response['Body'].read())
if hasattr(self, '_open_trades_warning_printed'):
delattr(self, '_open_trades_warning_printed')
return trades
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
if not hasattr(self, '_open_trades_warning_printed'):
print("⚠️ No open trades file found. Starting with an empty list.")
print("πŸ’‘ This is normal for first-time runs or when all trades are closed.")
self._open_trades_warning_printed = True
return []
else:
raise
async def save_open_trades_async(self, trades):
try:
data_json = json.dumps(trades, indent=2).encode('utf-8')
self.s3_client.put_object(
Bucket=BUCKET_NAME, Key="open_trades.json", Body=data_json, ContentType="application/json"
)
print(f"βœ… Open trades saved to R2. Total open trades: {len(trades)}")
except Exception as e:
print(f"❌ Failed to save open trades: {e}")
raise
async def load_contracts_db_async(self):
key = "contracts.json"
try:
response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key)
contracts_db = json.loads(response['Body'].read())
if hasattr(self, '_contracts_warning_printed'):
delattr(self, '_contracts_warning_printed')
print(f"πŸ’Ύ Contracts database loaded from R2. Total entries: {len(contracts_db)}")
return contracts_db
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
if not hasattr(self, '_contracts_warning_printed'):
print("⚠️ No existing contracts database found. Initializing new one.")
self._contracts_warning_printed = True
return {}
else:
raise
async def save_contracts_db_async(self, data):
key = "contracts.json"
try:
data_json = json.dumps(data, indent=2).encode('utf-8')
self.s3_client.put_object(
Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json"
)
print(f"βœ… Contracts database saved to R2 successfully. Total entries: {len(data)}")
except Exception as e:
print(f"❌ Failed to save contracts database to R2: {e}")
raise
async def get_trade_by_symbol_async(self, symbol):
try:
open_trades = await self.get_open_trades_async()
for trade in open_trades:
if trade['symbol'] == symbol and trade['status'] == 'OPEN':
return trade
return None
except Exception as e:
print(f"❌ Failed to get trade by symbol {symbol}: {e}")
return None
async def update_trade_monitoring_status_async(self, symbol, is_monitored):
try:
open_trades = await self.get_open_trades_async()
updated = False
for trade in open_trades:
if trade['symbol'] == symbol:
trade['is_monitored'] = is_monitored
updated = True
break
if updated:
await self.save_open_trades_async(open_trades)
status = "ENABLED" if is_monitored else "DISABLED"
print(f"βœ… Real-time monitoring {status} for {symbol}")
else:
print(f"⚠️ Trade {symbol} not found for monitoring status update")
return updated
except Exception as e:
print(f"❌ Failed to update monitoring status for {symbol}: {e}")
return False
async def get_monitored_trades_async(self):
try:
open_trades = await self.get_open_trades_async()
monitored_trades = [trade for trade in open_trades if trade.get('is_monitored', False)]
return monitored_trades
except Exception as e:
print(f"❌ Failed to get monitored trades: {e}")
return []
print("βœ… Enhanced R2 Service Loaded - Comprehensive Logging System")