Spaces:
Running
Running
| # r2.py | |
| import os, traceback, json, time | |
| from datetime import datetime, timedelta | |
| import asyncio | |
| import boto3 | |
| from botocore.exceptions import NoCredentialsError, ClientError | |
| R2_ACCOUNT_ID = os.getenv("R2_ACCOUNT_ID") | |
| R2_ACCESS_KEY_ID = os.getenv("R2_ACCESS_KEY_ID") | |
| R2_SECRET_ACCESS_KEY = os.getenv("R2_SECRET_ACCESS_KEY") | |
| BUCKET_NAME = "trading" | |
| INITIAL_CAPITAL = 10.0 | |
| class R2Service: | |
| def __init__(self): | |
| try: | |
| endpoint_url = f"https://{R2_ACCOUNT_ID}.r2.cloudflarestorage.com" | |
| self.s3_client = boto3.client( | |
| 's3', | |
| endpoint_url=endpoint_url, | |
| aws_access_key_id=R2_ACCESS_KEY_ID, | |
| aws_secret_access_key=R2_SECRET_ACCESS_KEY, | |
| ) | |
| self.lock_acquired = False | |
| self.BUCKET_NAME = BUCKET_NAME | |
| self._open_trades_warning_printed = False | |
| self._portfolio_warning_printed = False | |
| self._contracts_warning_printed = False | |
| except Exception as e: | |
| raise RuntimeError(f"Failed to initialize S3 client: {e}") | |
| def acquire_lock(self, max_retries=3): | |
| lock_path = "lock.txt" | |
| for attempt in range(max_retries): | |
| try: | |
| try: | |
| self.s3_client.head_object(Bucket=BUCKET_NAME, Key=lock_path) | |
| print(f"π Lock file exists. Attempt {attempt + 1}/{max_retries}. Waiting...") | |
| time.sleep(1) | |
| except ClientError as e: | |
| if e.response['Error']['Code'] == '404': | |
| self.s3_client.put_object(Bucket=BUCKET_NAME, Key=lock_path, Body=b'') | |
| self.lock_acquired = True | |
| print("β Lock acquired.") | |
| return True | |
| else: | |
| raise | |
| except Exception as e: | |
| print(f"β Failed to acquire lock: {e}") | |
| time.sleep(1) | |
| print(f"β Failed to acquire lock after {max_retries} attempts.") | |
| return False | |
| def release_lock(self): | |
| lock_path = "lock.txt" | |
| if self.lock_acquired: | |
| try: | |
| self.s3_client.delete_object(Bucket=BUCKET_NAME, Key=lock_path) | |
| print("β Lock released.") | |
| self.lock_acquired = False | |
| except Exception as e: | |
| print(f"β Failed to release lock: {e}") | |
| async def save_candidates_data_async(self, candidates_data, reanalysis_data): | |
| try: | |
| key = "candidates_data.json" | |
| data = { | |
| "timestamp": datetime.now().isoformat(), | |
| "top_candidates": candidates_data, | |
| "reanalysis_data": reanalysis_data | |
| } | |
| data_json = json.dumps(data, indent=2, ensure_ascii=False).encode('utf-8') | |
| self.s3_client.put_object( | |
| Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json" | |
| ) | |
| print(f"β Candidates data saved to R2: {len(candidates_data) if candidates_data else 0} candidates") | |
| except Exception as e: | |
| print(f"β Failed to save candidates data: {e}") | |
| async def save_llm_responses_async(self, symbol, prompt, full_response, parsed_decision): | |
| try: | |
| key = "llm_responses.json" | |
| try: | |
| response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key) | |
| existing_data = json.loads(response['Body'].read()) | |
| except ClientError as e: | |
| if e.response['Error']['Code'] == 'NoSuchKey': | |
| existing_data = {"responses": []} | |
| else: | |
| raise | |
| new_response = { | |
| "timestamp": datetime.now().isoformat(), | |
| "symbol": symbol, | |
| "prompt": prompt[:2000] + "..." if len(prompt) > 2000 else prompt, | |
| "full_response": full_response, | |
| "parsed_decision": parsed_decision | |
| } | |
| existing_data["responses"].append(new_response) | |
| if len(existing_data["responses"]) > 1000: | |
| existing_data["responses"] = existing_data["responses"][-1000:] | |
| data_json = json.dumps(existing_data, indent=2, ensure_ascii=False).encode('utf-8') | |
| self.s3_client.put_object( | |
| Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json" | |
| ) | |
| print(f"β LLM response saved for {symbol}") | |
| except Exception as e: | |
| print(f"β Failed to save LLM response: {e}") | |
| async def save_system_logs_async(self, log_data): | |
| try: | |
| key = "system_logs.json" | |
| try: | |
| response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key) | |
| existing_logs = json.loads(response['Body'].read()) | |
| except ClientError as e: | |
| if e.response['Error']['Code'] == 'NoSuchKey': | |
| existing_logs = {"logs": []} | |
| else: | |
| raise | |
| log_entry = { | |
| "timestamp": datetime.now().isoformat(), | |
| **log_data | |
| } | |
| existing_logs["logs"].append(log_entry) | |
| if len(existing_logs["logs"]) > 2000: | |
| existing_logs["logs"] = existing_logs["logs"][-2000:] | |
| data_json = json.dumps(existing_logs, indent=2, ensure_ascii=False).encode('utf-8') | |
| self.s3_client.put_object( | |
| Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json" | |
| ) | |
| print(f"β System log saved: {log_data.get('cycle_started', log_data.get('cycle_completed', 'event'))}") | |
| except Exception as e: | |
| print(f"β Failed to save system logs: {e}") | |
| async def save_learning_data_async(self, learning_data): | |
| try: | |
| key = "learning_data.json" | |
| data = { | |
| "timestamp": datetime.now().isoformat(), | |
| "learning_data": learning_data | |
| } | |
| data_json = json.dumps(data, indent=2, ensure_ascii=False).encode('utf-8') | |
| self.s3_client.put_object( | |
| Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json" | |
| ) | |
| print("β Learning data saved to R2") | |
| except Exception as e: | |
| print(f"β Failed to save learning data: {e}") | |
| async def load_learning_data_async(self): | |
| try: | |
| key = "learning_data.json" | |
| response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key) | |
| data = json.loads(response['Body'].read()) | |
| print("β Learning data loaded from R2") | |
| return data | |
| except ClientError as e: | |
| if e.response['Error']['Code'] == 'NoSuchKey': | |
| print("β οΈ No learning data found. Starting fresh.") | |
| return {} | |
| else: | |
| raise | |
| async def get_portfolio_state_async(self): | |
| key = "portfolio_state.json" | |
| try: | |
| response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key) | |
| state = json.loads(response['Body'].read()) | |
| if hasattr(self, '_portfolio_warning_printed'): | |
| delattr(self, '_portfolio_warning_printed') | |
| print(f"π° Portfolio state loaded: Current Capital ${state.get('current_capital_usd', 0):.2f}") | |
| return state | |
| except ClientError as e: | |
| if e.response['Error']['Code'] == 'NoSuchKey': | |
| if not hasattr(self, '_portfolio_warning_printed'): | |
| print(f"β οΈ No portfolio state file found. Initializing with ${INITIAL_CAPITAL:.2f}") | |
| self._portfolio_warning_printed = True | |
| initial_state = { | |
| "current_capital_usd": INITIAL_CAPITAL, | |
| "invested_capital_usd": 0.0, | |
| "initial_capital_usd": INITIAL_CAPITAL, | |
| "total_trades": 0, | |
| "winning_trades": 0, | |
| "total_profit_usd": 0.0, | |
| "total_loss_usd": 0.0 | |
| } | |
| await self.save_portfolio_state_async(initial_state) | |
| return initial_state | |
| else: | |
| raise | |
| async def save_portfolio_state_async(self, state): | |
| key = "portfolio_state.json" | |
| try: | |
| data_json = json.dumps(state, indent=2).encode('utf-8') | |
| self.s3_client.put_object( | |
| Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json" | |
| ) | |
| print(f"πΎ Portfolio state saved: Current Capital ${state.get('current_capital_usd', 0):.2f}") | |
| except Exception as e: | |
| print(f"β Failed to save portfolio state: {e}") | |
| raise | |
| async def get_open_trades_async(self): | |
| try: | |
| response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key="open_trades.json") | |
| trades = json.loads(response['Body'].read()) | |
| if hasattr(self, '_open_trades_warning_printed'): | |
| delattr(self, '_open_trades_warning_printed') | |
| return trades | |
| except ClientError as e: | |
| if e.response['Error']['Code'] == 'NoSuchKey': | |
| if not hasattr(self, '_open_trades_warning_printed'): | |
| print("β οΈ No open trades file found. Starting with an empty list.") | |
| print("π‘ This is normal for first-time runs or when all trades are closed.") | |
| self._open_trades_warning_printed = True | |
| return [] | |
| else: | |
| raise | |
| async def save_open_trades_async(self, trades): | |
| try: | |
| data_json = json.dumps(trades, indent=2).encode('utf-8') | |
| self.s3_client.put_object( | |
| Bucket=BUCKET_NAME, Key="open_trades.json", Body=data_json, ContentType="application/json" | |
| ) | |
| print(f"β Open trades saved to R2. Total open trades: {len(trades)}") | |
| except Exception as e: | |
| print(f"β Failed to save open trades: {e}") | |
| raise | |
| async def load_contracts_db_async(self): | |
| key = "contracts.json" | |
| try: | |
| response = self.s3_client.get_object(Bucket=BUCKET_NAME, Key=key) | |
| contracts_db = json.loads(response['Body'].read()) | |
| if hasattr(self, '_contracts_warning_printed'): | |
| delattr(self, '_contracts_warning_printed') | |
| print(f"πΎ Contracts database loaded from R2. Total entries: {len(contracts_db)}") | |
| return contracts_db | |
| except ClientError as e: | |
| if e.response['Error']['Code'] == 'NoSuchKey': | |
| if not hasattr(self, '_contracts_warning_printed'): | |
| print("β οΈ No existing contracts database found. Initializing new one.") | |
| self._contracts_warning_printed = True | |
| return {} | |
| else: | |
| raise | |
| async def save_contracts_db_async(self, data): | |
| key = "contracts.json" | |
| try: | |
| data_json = json.dumps(data, indent=2).encode('utf-8') | |
| self.s3_client.put_object( | |
| Bucket=BUCKET_NAME, Key=key, Body=data_json, ContentType="application/json" | |
| ) | |
| print(f"β Contracts database saved to R2 successfully. Total entries: {len(data)}") | |
| except Exception as e: | |
| print(f"β Failed to save contracts database to R2: {e}") | |
| raise | |
| async def get_trade_by_symbol_async(self, symbol): | |
| try: | |
| open_trades = await self.get_open_trades_async() | |
| for trade in open_trades: | |
| if trade['symbol'] == symbol and trade['status'] == 'OPEN': | |
| return trade | |
| return None | |
| except Exception as e: | |
| print(f"β Failed to get trade by symbol {symbol}: {e}") | |
| return None | |
| async def update_trade_monitoring_status_async(self, symbol, is_monitored): | |
| try: | |
| open_trades = await self.get_open_trades_async() | |
| updated = False | |
| for trade in open_trades: | |
| if trade['symbol'] == symbol: | |
| trade['is_monitored'] = is_monitored | |
| updated = True | |
| break | |
| if updated: | |
| await self.save_open_trades_async(open_trades) | |
| status = "ENABLED" if is_monitored else "DISABLED" | |
| print(f"β Real-time monitoring {status} for {symbol}") | |
| else: | |
| print(f"β οΈ Trade {symbol} not found for monitoring status update") | |
| return updated | |
| except Exception as e: | |
| print(f"β Failed to update monitoring status for {symbol}: {e}") | |
| return False | |
| async def get_monitored_trades_async(self): | |
| try: | |
| open_trades = await self.get_open_trades_async() | |
| monitored_trades = [trade for trade in open_trades if trade.get('is_monitored', False)] | |
| return monitored_trades | |
| except Exception as e: | |
| print(f"β Failed to get monitored trades: {e}") | |
| return [] | |
| print("β Enhanced R2 Service Loaded - Comprehensive Logging System") |