Spaces:
				
			
			
	
			
			
					
		Running
		
			on 
			
			CPU Upgrade
	
	
	
			
			
	
	
	
	
		
		
					
		Running
		
			on 
			
			CPU Upgrade
	| import os | |
| import json | |
| import io | |
| import zipfile | |
| from google.oauth2.service_account import Credentials | |
| from googleapiclient.discovery import build | |
| from googleapiclient.http import MediaIoBaseDownload | |
class ZipDownloader:
    """Download a ZIP archive from Google Drive and extract it locally.

    The Drive v3 client is built from a service-account key supplied as a
    JSON string. Progress and results are reported with ``print``.
    """

    # NOTE(review): the non-ASCII prefixes in the status messages below look
    # like mis-decoded emoji. They are runtime strings, so they are kept as
    # found; confirm the intended characters against the upstream file.

    def __init__(self, service_account_json: str) -> None:
        """Create the Drive v3 client.

        Args:
            service_account_json: Service-account key as a JSON *string*
                (it is parsed with ``json.loads``, so this is the key's
                content, not a path to it).
        """
        self.creds = Credentials.from_service_account_info(json.loads(service_account_json))
        self.service = build("drive", "v3", credentials=self.creds)

    def get_drive_file_size(self, file_id: str) -> int:
        """Return the size in bytes that Drive reports for ``file_id``.

        Args:
            file_id: Drive file identifier.

        Returns:
            The ``size`` metadata field converted to ``int``.

        Raises:
            KeyError: If the metadata response carries no ``size`` field.
        """
        metadata = self.service.files().get(fileId=file_id, fields='size').execute()
        try:
            return int(metadata['size'])
        except KeyError:
            # Still a KeyError (callers may already catch it), but with a
            # message that says what is wrong. Per the Drive API reference,
            # ``size`` is not populated for items such as folders/shortcuts.
            raise KeyError(
                f"Drive file {file_id!r} has no 'size' field; it is probably "
                "not a downloadable binary file"
            ) from None

    def download_zip_from_drive(self, file_id: str, output_path: str) -> str:
        """Download a Drive file and save it as ``downloaded.zip``.

        The file is downloaded fully into memory, its length is compared with
        the size reported by Drive, and only then is it written to disk, so a
        size-mismatched download never reaches ``output_path``.

        Args:
            file_id: Drive file identifier.
            output_path: Directory to write into; created if missing.

        Returns:
            Path of the saved archive: ``<output_path>/downloaded.zip``.

        Raises:
            IOError: If the downloaded byte count differs from the size
                reported by Drive.
        """
        expected_size = self.get_drive_file_size(file_id)
        local_zip_path = os.path.join(output_path, "downloaded.zip")
        os.makedirs(output_path, exist_ok=True)

        # Buffer in memory instead of writing directly to disk.
        buffer = io.BytesIO()
        request = self.service.files().get_media(fileId=file_id)
        downloader = MediaIoBaseDownload(buffer, request)

        print(f"β¬οΈ Downloading ZIP file from Drive ID: {file_id}")
        done = False
        while not done:
            status, done = downloader.next_chunk()
            print(f" β¬ Progress: {int(status.progress() * 100)}%")

        # One zero-copy view serves both the size check and the disk write;
        # getvalue()/read() would each duplicate the whole archive in memory.
        with buffer.getbuffer() as payload:
            actual_size = payload.nbytes
            print(f"π¦ Expected size: {expected_size} bytes")
            print(f"π₯ Downloaded size: {actual_size} bytes")
            if actual_size != expected_size:
                # The check is "!=", so the message must not claim "<".
                raise IOError(
                    f"β Downloaded file size mismatch: got {actual_size} bytes, "
                    f"expected {expected_size} bytes"
                )
            with open(local_zip_path, 'wb') as f:
                f.write(payload)

        print(f"β ZIP saved to: {local_zip_path}")
        return local_zip_path

    def unzip(self, zip_path: str, extract_to: str) -> None:
        """Verify a ZIP archive and extract every member into a directory.

        Args:
            zip_path: Path of the archive to extract.
            extract_to: Destination directory; created if missing.

        Raises:
            zipfile.BadZipFile: If the file is not a valid ZIP or a member
                fails the integrity check.
            EOFError: If the archive ends unexpectedly while being read.
        """
        print(f"π Extracting ZIP: {zip_path} -> {extract_to}")
        os.makedirs(extract_to, exist_ok=True)
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                # testzip() returns the first corrupt member's name, or None
                # when every member passes its check.
                bad_file = zip_ref.testzip()
                if bad_file is not None:
                    raise zipfile.BadZipFile(f"Corrupted file inside ZIP: {bad_file}")
                zip_ref.extractall(extract_to)
            print("β Extraction complete.")
        except zipfile.BadZipFile as e:
            print(f"β ZIP file is corrupted: {e}")
            raise
        except EOFError as e:
            print(f"β Unexpected end of file during extraction: {e}")
            raise