# Hugging Face Space client app for an FHE malware-detection demo.
# (Recovered from a web paste; the Space status page reported "Runtime error".)
import base64
import json
import os
import pdb
import pickle
import shutil
import subprocess
import time
from pathlib import Path

import gradio as gr
import numpy
import numpy as np
import pandas as pd
import requests
from requests import head

from concrete.ml.deployment import FHEModelClient
# This repository's directory
REPO_DIR = Path(__file__).parent

# Start the FHE inference server in the background.
# BUG FIX: run_fhe() posts to http://localhost:3000/predict, but the server
# was previously launched on uvicorn's default port (8000), so every request
# failed. Bind explicitly to port 3000 to match the client URL.
subprocess.Popen(["uvicorn", "server:app", "--port", "3000"], cwd=REPO_DIR)

# Create the directory for the per-user FHE keys (no-op if already present).
os.makedirs(".fhe_keys", exist_ok=True)
# Create the directory for temporary files (evaluation keys, encrypted blobs).
os.makedirs("tmp", exist_ok=True)

# Wait 4 sec for the server to start before the UI can send requests.
time.sleep(4)

# Encrypted data limit for the browser to display
# (encrypted data is too large to display in the browser)
ENCRYPTED_DATA_BROWSER_LIMIT = 500
# Maximum number of user key directories kept under .fhe_keys
N_USER_KEY_STORED = 20
def clean_tmp_directory():
    """Bound the key store and purge tmp files of evicted users.

    At most ``N_USER_KEY_STORED`` sub-directories are kept under
    ``.fhe_keys/``; the oldest ones (by mtime) are deleted, together with
    every ``tmp/*.npy`` file belonging to the deleted user ids.
    """
    # Key directories, oldest first.
    path_sub_directories = sorted(
        [f for f in Path(".fhe_keys/").iterdir() if f.is_dir()], key=os.path.getmtime
    )

    user_ids = []
    if len(path_sub_directories) > N_USER_KEY_STORED:
        n_files_to_delete = len(path_sub_directories) - N_USER_KEY_STORED
        for p in path_sub_directories[:n_files_to_delete]:
            user_ids.append(p.name)
            shutil.rmtree(p)

    # Delete all tmp files related to the evicted user ids.
    # BUG FIX: the old code unlinked inside a nested loop without stopping
    # after the first match; when one user id is a string suffix of another
    # (e.g. "3" and "123"), the second unlink() raised FileNotFoundError.
    for file in Path("tmp/").iterdir():
        if any(file.name.endswith(f"{user_id}.npy") for user_id in user_ids):
            file.unlink()
def keygen():
    """Create a fresh user id plus FHE key pair and persist the eval key.

    Returns a two-item list: a browser-safe prefix of the serialized
    evaluation key, and the random user id naming the key directory.
    """
    # Evict old key material first so the on-disk store stays bounded.
    clean_tmp_directory()

    print("Initializing FHEModelClient...")

    # A random 32-bit id identifies this user's key material on disk.
    user_id = numpy.random.randint(0, 2**32)
    client = FHEModelClient("fhe_model", f".fhe_keys/{user_id}")
    client.load()

    # Always generate a brand-new private/evaluation key pair.
    client.generate_private_and_evaluation_keys(force=True)
    serialized_eval_key = client.get_serialized_evaluation_keys()
    numpy.save(f"tmp/tmp_evaluation_key_{user_id}.npy", serialized_eval_key)

    # Only a prefix is returned: the full key is far too large for the browser.
    return [list(serialized_eval_key)[:ENCRYPTED_DATA_BROWSER_LIMIT], user_id]
def encode_quantize_encrypt(test_file, eval_key):
    """Extract PE features from *test_file*, then quantize and encrypt them.

    Args:
        test_file: path of the PE file to analyze.
        eval_key: the ``[key_prefix, user_id]`` pair returned by ``keygen()``.

    Returns:
        Tuple of (clear feature vector, quantized feature vector,
        hex-encoded prefix of the encrypted payload for browser display).
    """
    # Feature names in the exact column order the model was trained on.
    feature_order = ['Machine', 'SizeOfOptionalHeader', 'Characteristics',
                     'MajorLinkerVersion', 'MinorLinkerVersion', 'SizeOfCode',
                     'SizeOfInitializedData', 'SizeOfUninitializedData',
                     'AddressOfEntryPoint', 'BaseOfCode', 'BaseOfData', 'ImageBase',
                     'SectionAlignment', 'FileAlignment', 'MajorOperatingSystemVersion',
                     'MinorOperatingSystemVersion', 'MajorImageVersion', 'MinorImageVersion',
                     'MajorSubsystemVersion', 'MinorSubsystemVersion', 'SizeOfImage',
                     'SizeOfHeaders', 'CheckSum', 'Subsystem', 'DllCharacteristics',
                     'SizeOfStackReserve', 'SizeOfStackCommit', 'SizeOfHeapReserve',
                     'SizeOfHeapCommit', 'LoaderFlags', 'NumberOfRvaAndSizes', 'SectionsNb',
                     'SectionsMeanEntropy', 'SectionsMinEntropy', 'SectionsMaxEntropy',
                     'SectionsMeanRawsize', 'SectionsMinRawsize',
                     'SectionsMeanVirtualsize', 'SectionsMinVirtualsize',
                     'SectionMaxVirtualsize', 'ImportsNbDLL', 'ImportsNb',
                     'ImportsNbOrdinal', 'ExportNb', 'ResourcesNb', 'ResourcesMeanEntropy',
                     'ResourcesMinEntropy', 'ResourcesMaxEntropy', 'ResourcesMeanSize',
                     'ResourcesMinSize', 'ResourcesMaxSize', 'LoadConfigurationSize',
                     'VersionInformationSize']

    user_id = eval_key[1]
    # BUG FIX: the key directory must be derived from the user id alone.
    # Previously the whole eval_key list was formatted into the path
    # (f".fhe_keys/{eval_key}"), which never matches the directory that
    # keygen() created, so the client could not find its keys.
    fhe_api = FHEModelClient("fhe_model", f".fhe_keys/{user_id}")
    fhe_api.load()

    from PE_main import extract_infos

    # extract_infos returns a mapping of feature name -> value; project it
    # onto the training column order and shape it as a (1, 53) batch.
    raw_features = extract_infos(test_file)
    encodings = np.array([raw_features[name] for name in feature_order]).reshape(1, -1)

    # Quantized copy is only for display; encryption quantizes internally.
    quantized_encodings = fhe_api.model.quantize_input(encodings).astype(numpy.uint8)
    encrypted_quantized_encoding = fhe_api.quantize_encrypt_serialize(encodings)

    numpy.save(
        f"tmp/tmp_encrypted_quantized_encoding_{user_id}.npy",
        encrypted_quantized_encoding,
    )

    # Only show a short hex prefix in the browser; the payload is huge.
    shortened = list(encrypted_quantized_encoding)[:ENCRYPTED_DATA_BROWSER_LIMIT]
    shortened_hex = "".join(f"{i:02x}" for i in shortened)
    return (encodings[0], quantized_encodings[0], shortened_hex)
def run_fhe(user_id):
    """POST the stored encrypted encoding and evaluation key to the server.

    Saves the server's encrypted prediction under tmp/ and returns a
    hex-encoded, browser-sized prefix of it.
    """
    # Load the artifacts that keygen() / encode_quantize_encrypt() stored.
    encrypted_quantized_encoding = numpy.load(
        Path(f"tmp/tmp_encrypted_quantized_encoding_{user_id}.npy")
    )
    evaluation_key = numpy.load(f"tmp/tmp_evaluation_key_{user_id}.npy")

    # Both binary blobs travel as base64 text inside the JSON payload.
    query = {
        "evaluation_key": base64.b64encode(evaluation_key).decode(),
        "encrypted_encoding": base64.b64encode(encrypted_quantized_encoding).decode(),
    }
    response = requests.post(
        "http://localhost:3000/predict",
        data=json.dumps(query),
        headers={"Content-type": "application/json"},
    )

    # Persist the encrypted prediction for decrypt_prediction().
    encrypted_prediction = base64.b64decode(response.json()["encrypted_prediction"])
    numpy.save(f"tmp/tmp_encrypted_prediction_{user_id}.npy", encrypted_prediction)

    # Return only a hex prefix: the full ciphertext is too big to display.
    prediction_prefix = list(encrypted_prediction)[:ENCRYPTED_DATA_BROWSER_LIMIT]
    return "".join(f"{byte:02x}" for byte in prediction_prefix)
def decrypt_prediction(user_id):
    """Decrypt and dequantize the stored encrypted prediction for *user_id*."""
    prediction_path = Path(f"tmp/tmp_encrypted_prediction_{user_id}.npy")
    # The .npy file holds raw ciphertext bytes.
    encrypted_prediction = numpy.load(prediction_path).tobytes()

    client = FHEModelClient("fhe_model", f".fhe_keys/{user_id}")
    client.load()
    # Re-derive keys matching the client specs without overwriting the
    # existing private key (see issue #18).
    client.generate_private_and_evaluation_keys(force=False)

    return client.deserialize_decrypt_dequantize(encrypted_prediction)
def process_pipeline(test_file):
    """Run the full demo flow: keygen -> encrypt -> server inference -> decrypt."""
    eval_key = keygen()
    user_id = eval_key[1]

    encodings = encode_quantize_encrypt(test_file, eval_key)
    encrypted_quantized_encoding = run_fhe(user_id)
    encrypted_prediction = decrypt_prediction(user_id)

    return eval_key, encodings, encrypted_quantized_encoding, encrypted_prediction
if __name__ == "__main__":
    print("Starting the FHE Model")
    # BUG FIX: the previous code assigned fn/inputs/outputs/title/description
    # to local variables (each with a stray trailing comma that turned the
    # value into a tuple) inside an empty gr.Blocks context, so nothing was
    # ever wired into the UI and the app launched a blank page. Build a
    # gr.Interface that actually connects process_pipeline to the widgets.
    demo = gr.Interface(
        fn=process_pipeline,
        inputs=[
            gr.File(label="Test File"),
        ],
        # One textbox per value returned by process_pipeline.
        outputs=[
            gr.Textbox(label="Evaluation Key"),
            gr.Textbox(label="Encodings"),
            gr.Textbox(label="Encrypted Quantized Encoding"),
            gr.Textbox(label="Encrypted Prediction"),
        ],
        title="FHE Model",
        description="This is a FHE Model",
    )
    demo.launch()  # pass share=True to expose a public link