| """ | |
| R2 Storage integration for OpenManus | |
| Provides interface to Cloudflare R2 storage operations | |
| """ | |
| import io | |
| from typing import Any, BinaryIO, Dict, List, Optional | |
| from app.logger import logger | |
| from .client import CloudflareClient, CloudflareError | |
class R2Storage:
    """Cloudflare R2 Storage client"""

    def __init__(
        self,
        client: CloudflareClient,
        storage_bucket: str,
        assets_bucket: Optional[str] = None,
    ):
        self.client = client
        self.storage_bucket = storage_bucket
        # Fall back to the storage bucket when no dedicated assets bucket is set
        self.assets_bucket = assets_bucket or storage_bucket
        self.base_endpoint = f"accounts/{client.account_id}/r2/buckets"

    def _get_bucket_name(self, bucket_type: str = "storage") -> str:
        """Resolve the bucket name for a logical bucket type ("storage" or "assets")"""
        if bucket_type == "assets":
            return self.assets_bucket
        return self.storage_bucket

    async def upload_file(
        self,
        key: str,
        file_data: bytes,
        content_type: str = "application/octet-stream",
        bucket_type: str = "storage",
        metadata: Optional[Dict[str, str]] = None,
        use_worker: bool = True,
    ) -> Dict[str, Any]:
        """Upload a file to R2"""
        bucket_name = self._get_bucket_name(bucket_type)
        try:
            if use_worker:
                # Route through the worker endpoint for better performance
                form_data = {
                    "file": file_data,
                    "bucket": bucket_type,
                    "key": key,
                    "contentType": content_type,
                }
                if metadata:
                    form_data["metadata"] = metadata
                response = await self.client.post(
                    "api/files", data=form_data, use_worker=True
                )
            else:
                # Call the R2 API directly; custom metadata travels as
                # x-amz-meta-* headers
                headers = {"Content-Type": content_type}
                if metadata:
                    for k, v in metadata.items():
                        headers[f"x-amz-meta-{k}"] = v
                response = await self.client.upload_file(
                    f"{self.base_endpoint}/{bucket_name}/objects/{key}",
                    file_data,
                    content_type,
                    headers,
                )
            return {
                "success": True,
                "key": key,
                "bucket": bucket_type,
                "bucket_name": bucket_name,
                "size": len(file_data),
                "content_type": content_type,
                "url": f"/{bucket_type}/{key}",
                **(response or {}),  # tolerate empty response bodies
            }
        except CloudflareError as e:
            logger.error(f"R2 upload failed: {e}")
            raise
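
    # Illustrative usage (a sketch, not part of the API surface; the key and
    # payload below are hypothetical):
    #
    #   result = await storage.upload_file(
    #       key="reports/summary.txt",
    #       file_data=b"hello r2",
    #       content_type="text/plain",
    #       metadata={"owner": "openmanus"},
    #   )
    #   # result["url"] -> "/storage/reports/summary.txt"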

    async def upload_file_stream(
        self,
        key: str,
        file_stream: BinaryIO,
        content_type: str = "application/octet-stream",
        bucket_type: str = "storage",
        metadata: Optional[Dict[str, str]] = None,
    ) -> Dict[str, Any]:
        """Upload a file from a stream (reads the entire stream into memory)"""
        file_data = file_stream.read()
        return await self.upload_file(
            key, file_data, content_type, bucket_type, metadata
        )

    async def get_file(
        self, key: str, bucket_type: str = "storage", use_worker: bool = True
    ) -> Optional[Dict[str, Any]]:
        """Get a file from R2; returns None if the object does not exist"""
        bucket_name = self._get_bucket_name(bucket_type)
        try:
            if use_worker:
                response = await self.client.get(
                    f"api/files/{key}?bucket={bucket_type}", use_worker=True
                )
                if response:
                    return {
                        "key": key,
                        "bucket": bucket_type,
                        "bucket_name": bucket_name,
                        "data": response,  # binary payloads are handled by the worker
                        "exists": True,
                    }
            else:
                response = await self.client.get(
                    f"{self.base_endpoint}/{bucket_name}/objects/{key}"
                )
                return {
                    "key": key,
                    "bucket": bucket_type,
                    "bucket_name": bucket_name,
                    "data": response,
                    "exists": True,
                }
        except CloudflareError as e:
            if e.status_code == 404:
                return None
            logger.error(f"R2 get file failed: {e}")
            raise
        return None
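
    # Illustrative usage (hypothetical key): a missing object comes back as
    # None rather than raising, so callers can branch on existence.
    #
    #   file = await storage.get_file("reports/summary.txt")
    #   if file is None:
    #       logger.warning("object not found")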

    async def delete_file(
        self, key: str, bucket_type: str = "storage", use_worker: bool = True
    ) -> Dict[str, Any]:
        """Delete a file from R2"""
        bucket_name = self._get_bucket_name(bucket_type)
        try:
            if use_worker:
                response = await self.client.delete(
                    f"api/files/{key}?bucket={bucket_type}", use_worker=True
                )
            else:
                response = await self.client.delete(
                    f"{self.base_endpoint}/{bucket_name}/objects/{key}"
                )
            return {
                "success": True,
                "key": key,
                "bucket": bucket_type,
                "bucket_name": bucket_name,
                **(response or {}),  # deletes may return an empty body
            }
        except CloudflareError as e:
            logger.error(f"R2 delete failed: {e}")
            raise

    async def list_files(
        self,
        bucket_type: str = "storage",
        prefix: str = "",
        limit: int = 1000,
        use_worker: bool = True,
    ) -> Dict[str, Any]:
        """List files in an R2 bucket, optionally filtered by key prefix"""
        bucket_name = self._get_bucket_name(bucket_type)
        try:
            if use_worker:
                params = {"bucket": bucket_type, "prefix": prefix, "limit": limit}
                # urlencode also escapes special characters in the prefix
                query_string = urlencode({k: v for k, v in params.items() if v})
                response = await self.client.get(
                    f"api/files/list?{query_string}", use_worker=True
                )
            else:
                params = {"prefix": prefix, "max-keys": limit}
                query_string = urlencode({k: v for k, v in params.items() if v})
                response = await self.client.get(
                    f"{self.base_endpoint}/{bucket_name}/objects?{query_string}"
                )
            response = response or {}
            return {
                "bucket": bucket_type,
                "bucket_name": bucket_name,
                "prefix": prefix,
                "files": response.get("objects", []),
                "truncated": response.get("truncated", False),
                **response,
            }
        except CloudflareError as e:
            logger.error(f"R2 list files failed: {e}")
            raise
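
    # Illustrative usage (hypothetical prefix): list up to 100 keys under a
    # prefix and check whether more results remain.
    #
    #   listing = await storage.list_files(prefix="reports/", limit=100)
    #   for obj in listing["files"]:
    #       print(obj.get("key"), obj.get("size"))
    #   if listing["truncated"]:
    #       ...  # fetch the next page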

    async def get_file_metadata(
        self, key: str, bucket_type: str = "storage", use_worker: bool = True
    ) -> Optional[Dict[str, Any]]:
        """Get file metadata without downloading the content"""
        bucket_name = self._get_bucket_name(bucket_type)
        try:
            if use_worker:
                response = await self.client.get(
                    f"api/files/{key}/metadata?bucket={bucket_type}", use_worker=True
                )
            else:
                # Ranged GET for a single byte as a HEAD substitute: the
                # response headers carry the metadata
                response = await self.client.get(
                    f"{self.base_endpoint}/{bucket_name}/objects/{key}",
                    headers={"Range": "bytes=0-0"},
                )
            if response:
                return {
                    "key": key,
                    "bucket": bucket_type,
                    "bucket_name": bucket_name,
                    **response,
                }
        except CloudflareError as e:
            if e.status_code == 404:
                return None
            logger.error(f"R2 get metadata failed: {e}")
            raise
        return None
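
    # Illustrative usage (hypothetical key; fields beyond key/bucket depend on
    # what the backend returns): inspect an object without fetching its body.
    #
    #   meta = await storage.get_file_metadata("reports/summary.txt")
    #   if meta:
    #       print(meta["key"], meta["bucket_name"])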

    async def copy_file(
        self,
        source_key: str,
        destination_key: str,
        source_bucket: str = "storage",
        destination_bucket: str = "storage",
        use_worker: bool = True,
    ) -> Dict[str, Any]:
        """Copy a file within R2 or between buckets"""
        try:
            if use_worker:
                copy_data = {
                    "sourceKey": source_key,
                    "destinationKey": destination_key,
                    "sourceBucket": source_bucket,
                    "destinationBucket": destination_bucket,
                }
                response = await self.client.post(
                    "api/files/copy", data=copy_data, use_worker=True
                )
            else:
                # Without the worker, fall back to download-then-reupload
                source_file = await self.get_file(source_key, source_bucket, False)
                if not source_file:
                    raise CloudflareError(f"Source file {source_key} not found")
                response = await self.upload_file(
                    destination_key,
                    source_file["data"],
                    bucket_type=destination_bucket,
                    use_worker=False,
                )
            return {
                "success": True,
                "source_key": source_key,
                "destination_key": destination_key,
                "source_bucket": source_bucket,
                "destination_bucket": destination_bucket,
                **(response or {}),
            }
        except CloudflareError as e:
            logger.error(f"R2 copy failed: {e}")
            raise
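
    # Illustrative usage (hypothetical keys): promote a staged object into the
    # assets bucket.
    #
    #   await storage.copy_file(
    #       source_key="staging/logo.png",
    #       destination_key="logo.png",
    #       source_bucket="storage",
    #       destination_bucket="assets",
    #   )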

    async def move_file(
        self,
        source_key: str,
        destination_key: str,
        source_bucket: str = "storage",
        destination_bucket: str = "storage",
        use_worker: bool = True,
    ) -> Dict[str, Any]:
        """Move a file (copy then delete; not atomic)"""
        try:
            # Copy the file first
            copy_result = await self.copy_file(
                source_key,
                destination_key,
                source_bucket,
                destination_bucket,
                use_worker,
            )
            # Then delete the source file
            delete_result = await self.delete_file(
                source_key, source_bucket, use_worker
            )
            return {
                "success": True,
                "source_key": source_key,
                "destination_key": destination_key,
                "source_bucket": source_bucket,
                "destination_bucket": destination_bucket,
                "copy_result": copy_result,
                "delete_result": delete_result,
            }
        except CloudflareError as e:
            logger.error(f"R2 move failed: {e}")
            raise

    async def generate_presigned_url(
        self,
        key: str,
        bucket_type: str = "storage",
        expires_in: int = 3600,
        method: str = "GET",
    ) -> Dict[str, Any]:
        """Generate a presigned URL for direct access

        Note: true presigned URLs would require additional R2 configuration;
        for now this delegates to a worker endpoint.
        """
        try:
            url_data = {
                "key": key,
                "bucket": bucket_type,
                "expiresIn": expires_in,
                "method": method,
            }
            response = await self.client.post(
                "api/files/presigned-url", data=url_data, use_worker=True
            )
            return {
                "success": True,
                "key": key,
                "bucket": bucket_type,
                "method": method,
                "expires_in": expires_in,
                **(response or {}),
            }
        except CloudflareError as e:
            logger.error(f"R2 presigned URL generation failed: {e}")
            raise
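
    # Illustrative usage (hypothetical key): hand out a one-hour download link;
    # the signed URL is expected in the worker's response fields.
    #
    #   signed = await storage.generate_presigned_url(
    #       "reports/summary.txt", expires_in=3600, method="GET"
    #   )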

    async def get_storage_stats(self, use_worker: bool = True) -> Dict[str, Any]:
        """Get storage statistics (file counts and total sizes per bucket)"""
        try:
            if use_worker:
                response = await self.client.get("api/files/stats", use_worker=True)
            else:
                # Without the worker, aggregate from bucket listings; this only
                # covers the first page (default limit of 1000 keys per bucket)
                storage_list = await self.list_files("storage", use_worker=False)
                assets_list = await self.list_files("assets", use_worker=False)
                storage_size = sum(
                    file.get("size", 0) for file in storage_list.get("files", [])
                )
                assets_size = sum(
                    file.get("size", 0) for file in assets_list.get("files", [])
                )
                response = {
                    "storage": {
                        "file_count": len(storage_list.get("files", [])),
                        "total_size": storage_size,
                    },
                    "assets": {
                        "file_count": len(assets_list.get("files", [])),
                        "total_size": assets_size,
                    },
                    "total": {
                        "file_count": len(storage_list.get("files", []))
                        + len(assets_list.get("files", [])),
                        "total_size": storage_size + assets_size,
                    },
                }
            return response
        except CloudflareError as e:
            logger.error(f"R2 storage stats failed: {e}")
            raise
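
    # Illustrative usage: totals across both buckets.
    #
    #   stats = await storage.get_storage_stats(use_worker=False)
    #   print(stats["total"]["file_count"], stats["total"]["total_size"])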

    def create_file_stream(self, data: bytes) -> io.BytesIO:
        """Create a file stream from bytes"""
        return io.BytesIO(data)

    def get_public_url(self, key: str, bucket_type: str = "storage") -> str:
        """Get a public URL for a file (if the bucket is configured for public access)"""
        bucket_name = self._get_bucket_name(bucket_type)
        # The correct URL depends on your R2 custom-domain configuration;
        # prefer the worker endpoint when one is available
        if self.client.worker_url:
            return f"{self.client.worker_url}/api/files/{key}?bucket={bucket_type}"
        # Fallback guess in the r2.dev style; note that Cloudflare assigns the
        # actual pub-* subdomain, so it is not generally the bucket name
        return f"https://pub-{bucket_name}.r2.dev/{key}"