import concurrent.futures
import datetime
import json
import os
import re
import subprocess
import sys
import threading
import time
import traceback
import uuid
import warnings
import zipfile
from io import StringIO, BytesIO
from typing import List, Dict, Any

import numpy as np
import pandas as pd
import requests

# Silence urllib3's InsecureRequestWarning triggered by verify=False
warnings.filterwarnings("ignore")

# Characters used to encode 3GPP version digits (base 36: 0-9, then a-z for 10-35)
chars = "0123456789abcdefghijklmnopqrstuvwxyz"

# Locks for thread-safe operations
print_lock = threading.Lock()
dict_lock = threading.Lock()
scope_lock = threading.Lock()

# Global state shared by the worker threads
indexed_specifications = {}
scopes_by_spec_num = {}
processed_count = 0
total_count = 0
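
# Threading model: the worker threads all mutate the globals above, so
# print_lock serializes console output, dict_lock guards the index, and
# scope_lock guards the per-spec scope cache.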

def get_text(specification: str, version: str):
    """Download the spec archive from 3gpp.org and return the document text as a list of non-empty lines."""
    doc_id = specification
    series = doc_id.split(".")[0]
    response = requests.get(
        f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{version}.zip",
        verify=False,
        headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
    )
    if response.status_code != 200:
        raise Exception(f"ZIP download failed for {specification}-{version}")
    zip_bytes = BytesIO(response.content)

    def convert_doc_to_txt(doc_bytes: bytes, ext: str) -> List[str]:
        """Convert a .doc/.docx payload to plain text through headless LibreOffice."""
        temp_id = str(uuid.uuid4())
        input_path = f"/tmp/{temp_id}.{ext}"
        output_path = f"/tmp/{temp_id}.txt"
        with open(input_path, "wb") as f:
            f.write(doc_bytes)
        subprocess.run([
            "libreoffice",
            "--headless",
            # Separate user profile per run so parallel conversions do not
            # contend for LibreOffice's profile lock
            f"-env:UserInstallation=file:///tmp/lo_{temp_id}",
            "--convert-to", "txt",
            "--outdir", "/tmp",
            input_path
        ], check=True)
        with open(output_path, "r", encoding="utf-8") as f:
            txt_data = [line.strip() for line in f if line.strip()]
        os.remove(input_path)
        os.remove(output_path)
        return txt_data

    with zipfile.ZipFile(zip_bytes) as zf:
        for file_name in zf.namelist():
            if file_name.endswith(".zip"):
                # Some archives nest the document inside a second ZIP
                print("Nested ZIP found")
                inner = zipfile.ZipFile(BytesIO(zf.read(file_name)))
                for inner_name in inner.namelist():
                    if inner_name.endswith((".doc", ".docx")):
                        if "cover" in inner_name.lower():
                            continue  # skip cover pages
                        return convert_doc_to_txt(inner.read(inner_name), inner_name.split(".")[-1])
            elif file_name.endswith((".doc", ".docx")):
                if "cover" in file_name.lower():
                    continue  # skip cover pages
                return convert_doc_to_txt(zf.read(file_name), file_name.split(".")[-1])
    raise Exception(f"No .doc/.docx file found in the ZIP for {specification}-{version}")
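
# Illustrative call (spec number and version code are examples only):
#   get_text("38.331", "h00")  # TS 38.331 v17.0.0 -> 17 encodes to "h" in base 36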

def get_scope(specification: str, version: str):
    """Return the text between the 'Scope' heading and the 'References' heading, or 'Not found'."""
    try:
        spec_text = get_text(specification, version)
        scp_i = 0
        nxt_i = 0
        for x, text in enumerate(spec_text):
            # The last match wins, which skips table-of-contents entries
            if re.search(r"scope$", text, flags=re.IGNORECASE):
                scp_i = x
                nxt_i = scp_i + 10  # fallback window if no 'References' heading follows
            if re.search(r"references$", text, flags=re.IGNORECASE):
                nxt_i = x
        scope_lines = spec_text[scp_i + 1:nxt_i]
        # Fewer than 2 lines means the headings were not located properly
        if len(scope_lines) < 2:
            return "Not found"
        return re.sub(r"\s+", " ", " ".join(scope_lines))
    except Exception as e:
        traceback.print_exception(e)
        return "Not found (error)"

def process_specification(spec: Dict[str, Any], columns: List[str]) -> None:
    """Process a single specification; run concurrently by the thread pool."""
    global processed_count, indexed_specifications, scopes_by_spec_num
    try:
        if spec.get('vers', None) is None:
            return
        doc_id = str(spec["spec_num"])
        series = doc_id.split(".")[0]
        a, b, c = str(spec["vers"]).split(".")
        # Build the version code for the archive URL: one base-36 character
        # per component when each fits in 0-35, otherwise three zero-padded
        # two-digit numbers
        if not (int(a) > 35 or int(b) > 35 or int(c) > 35):
            version_code = f"{chars[int(a)]}{chars[int(b)]}{chars[int(c)]}"
        else:
            version_code = f"{int(a):02d}{int(b):02d}{int(c):02d}"
        spec_url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{version_code}.zip"
        string = f"{spec['spec_num']}+-+{spec['title']}+-+{spec['type']}+-+{spec['vers']}+-+{spec['WG']}+-+Rel-{spec['vers'].split('.')[0]}"
        metadata = {
            "id": str(spec["spec_num"]),
            "title": spec["title"],
            "type": spec["type"],
            "release": str(spec["vers"].split(".")[0]),
            "version": str(spec["vers"]),
            "working_group": spec["WG"],
            "url": spec_url
        }
        # Reuse the cached scope when this spec number was already processed.
        # Holding scope_lock for the whole block serializes extraction, which
        # guarantees each spec number is downloaded and converted only once.
        spec_num = str(spec["spec_num"])
        with scope_lock:
            if spec_num in scopes_by_spec_num:
                metadata["scope"] = scopes_by_spec_num[spec_num]
                with print_lock:
                    print(f"\nReusing cached scope for {spec_num}")
            else:
                with print_lock:
                    print(f"\nExtracting scope for {spec_num} (version {version_code})")
                try:
                    scope = get_scope(metadata["id"], version_code)
                    # Cache the scope for the other versions of this spec
                    scopes_by_spec_num[spec_num] = scope
                    metadata["scope"] = scope
                except Exception as e:
                    error_msg = f"Error while extracting the scope: {str(e)}"
                    metadata["scope"] = error_msg
                    scopes_by_spec_num[spec_num] = error_msg
        # Update the global index under lock
        with dict_lock:
            scope = metadata["scope"]
            # Append the scope to the key only when it carries real content
            if scope.strip() and "not found" not in scope.lower():
                string += f"+-+{scope}"
            indexed_specifications[string] = metadata
            processed_count += 1
        # Progress display under lock
        with print_lock:
            sys.stdout.write(f"\rProcessed: {processed_count}/{total_count} specifications")
            sys.stdout.flush()
    except Exception as e:
        with print_lock:
            print(f"\nError while processing {spec.get('spec_num', 'unknown')}: {str(e)}")

def main():
    global total_count, scopes_by_spec_num
    before = 0  # specs already present in the cache file from a previous run
    start_time = time.time()
    # Fetch the specification status report from the 3GPP site
    print("Fetching specifications from 3GPP...")
    response = requests.get(
        'https://www.3gpp.org/dynareport?code=status-report.htm',
        headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'},
        verify=False
    )
    # Parse the HTML tables
    dfs = pd.read_html(StringIO(response.text), encoding="utf-8")
    for x in range(len(dfs)):
        dfs[x] = dfs[x].replace({np.nan: None})
    # Keep only the needed columns
    columns_needed = [0, 1, 2, 3, 4]
    extracted_dfs = [df.iloc[:, columns_needed] for df in dfs]
    columns = [x.replace("\xa0", "_") for x in extracted_dfs[0].columns]
    # Turn every table row into a spec dict keyed by the column names
    specifications = []
    for df in extracted_dfs:
        for _, row in df.iterrows():
            specifications.append(dict(zip(columns, row.to_list())))
    total_count = len(specifications)
    print(f"Processing {total_count} specifications with multithreading...")
    try:
        # Load cached scopes from a previous run, if available
        if os.path.exists("indexed_specifications.json"):
            with open("indexed_specifications.json", "r", encoding="utf-8") as f:
                f_up = json.load(f)
                scopes_by_spec_num = f_up['scopes']
                before = len(f_up['specs'])
            print(f"Loaded {len(scopes_by_spec_num)} scopes from the cache.")
        # Multithreaded processing
        with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
            futures = [executor.submit(process_specification, spec, columns) for spec in specifications]
            concurrent.futures.wait(futures)
    finally:
        # Save the results even if the run was interrupted
        result = {
            "specs": indexed_specifications,
            "scopes": scopes_by_spec_num,
            "last_indexed_date": datetime.datetime.today().strftime("%d-%m-%Y")
        }
        with open("indexed_specifications.json", "w", encoding="utf-8") as f:
            json.dump(result, f, indent=4, ensure_ascii=False)
        elapsed_time = time.time() - start_time
        print(f"\nFinished in {elapsed_time:.2f} seconds")
        print(f"New specifications: {len(indexed_specifications) - before}")
        print("Results saved to indexed_specifications.json")

if __name__ == "__main__":
    main()