import json
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from tqdm import tqdm


def process_item(item):
    # Read the image from disk and keep it as raw bytes
    with open(item["image"], "rb") as img_file:
        img_bytes = img_file.read()
    record = {
        "image": img_bytes,
        "conversations": json.dumps(item["conversations"])  # serialize the conversation list as a JSON string
    }
    return record


# Read the JSON annotation file
with open('merged_half.json', 'r') as file:
    data = json.load(file)

# Local output file path
local_path = 'merged_first_half.parquet'
# local_path = 'final_data_4ch.parquet'

# Size the thread pool to the number of CPU cores on the system
cpu_count = os.cpu_count()

# Process the data in batches
batch_size = 100000  # can be adjusted based on available memory
num_batches = (len(data) + batch_size - 1) // batch_size

with open(local_path, 'wb') as local_file:
    writer = None
    for batch_index in range(num_batches):
        start_index = batch_index * batch_size
        end_index = min((batch_index + 1) * batch_size, len(data))
        batch_data = data[start_index:end_index]

        # Read the images for this batch in parallel
        records = []
        with ThreadPoolExecutor(max_workers=cpu_count) as executor:
            future_to_record = {executor.submit(process_item, item): item for item in batch_data}
            for future in tqdm(as_completed(future_to_record), total=len(future_to_record),
                               desc=f"Processing Batch {batch_index + 1}/{num_batches}"):
                try:
                    records.append(future.result())
                except Exception as exc:
                    print(f'Generated an exception: {exc}')

        # Build a PyArrow table for this batch
        table = pa.Table.from_pandas(pd.DataFrame(records))

        # Create the ParquetWriter from the first batch's schema
        if writer is None:
            writer = pq.ParquetWriter(local_file, table.schema, version='2.6',
                                      use_dictionary=True, compression='snappy')

        # Append the batch to the Parquet file in 4-row slices (one row group per slice)
        for i in tqdm(range(0, len(table), 4), desc=f"Writing Batch {batch_index + 1}/{num_batches} to Parquet"):
            writer.write_table(table.slice(i, 4))

    # Close the writer once all batches have been written
    if writer is not None:
        writer.close()

print("Completed: Batches saved as Parquet files to local directory")