import os
import time
import warnings

from dotenv import load_dotenv
import numpy as np
import requests
import pandas as pd

warnings.filterwarnings("ignore")

# Disable TLS certificate verification for every HTTP client used below
# (requests, curl-based tooling, and huggingface_hub).
os.environ["CURL_CA_BUNDLE"] = ""
load_dotenv()

from huggingface_hub import configure_http_backend

def backend_factory() -> requests.Session:
    session = requests.Session()
    session.verify = False
    return session

configure_http_backend(backend_factory=backend_factory)
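# From this point on, every request issued by huggingface_hub (dataset
# downloads, push_to_hub, ...) goes through the session returned by
# backend_factory, i.e. with certificate verification turned off.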
from datasets import load_dataset, Dataset
from datasets.data_files import EmptyDatasetError
import threading
import zipfile
import sys
import fitz  # PyMuPDF, used to parse the downloaded PDFs
import re
import json
import traceback
import io
import concurrent.futures
import hashlib

CHARS = "0123456789abcdefghijklmnopqrstuvwxyz"

# Locks guarding the shared dictionaries below; STOP_EVENT lets Ctrl+C
# interrupt the worker threads cleanly.
DICT_LOCK = threading.Lock()
DOCUMENT_LOCK = threading.Lock()
STOP_EVENT = threading.Event()

documents_by_spec_num = {}
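# documents_by_spec_num maps a spec number to its extracted sections plus a
# hash of (spec number, version), e.g. (illustrative values):
# {
#     "103 666-1": {
#         "content": {"1 Scope": "...", "2 References": "..."},
#         "hash": "5d41402abc4b2a76b9719d911017c592",
#     }
# }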
try:
    spec_contents = load_dataset("OrganizedProgrammers/ETSISpecContent", token=os.environ.get("HF_TOKEN"))
    spec_contents = spec_contents["train"].to_list()
    # Rebuild the in-memory cache from the flat (doc_id, section, content) rows
    for section in spec_contents:
        if section["doc_id"] not in documents_by_spec_num:
            documents_by_spec_num[section["doc_id"]] = {"content": {section["section"]: section["content"]}, "hash": section["hash"]}
        else:
            documents_by_spec_num[section["doc_id"]]["content"][section["section"]] = section["content"]
except EmptyDatasetError:
    print("Database is empty!")
indexed_specifications = {}
specifications_passed = set()
processed_count = 0
total_count = 0

# Log in to the ETSI portal; the authenticated session is reused for the PDF
# downloads below.
session = requests.Session()
req = session.post(
    "https://portal.etsi.org/ETSIPages/LoginEOL.ashx",
    verify=False,
    headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"},
    data=json.dumps({"username": os.environ.get("EOL_USER"), "password": os.environ.get("EOL_PASSWORD")}),
)
print("Fetching specifications from ETSI...", req.status_code)
# Standards-search CSV export: one query for Technical Specifications (TS),
# one for Technical Reports (TR).
url_ts = "https://www.etsi.org/?option=com_standardssearch&view=data&format=csv&includeScope=1&page=1&search=&title=1&etsiNumber=1&content=0&version=0&onApproval=0&published=1&withdrawn=0&historical=0&isCurrent=1&superseded=0&harmonized=0&keyword=&TB=&stdType=TS&frequency=&mandate=&collection=&sort=1"
url_tr = url_ts.replace("stdType=TS", "stdType=TR")
data_ts = requests.get(url_ts, verify=False).content
data_tr = requests.get(url_tr, verify=False).content
df_ts = pd.read_csv(io.StringIO(data_ts.decode('utf-8')), sep=";", skiprows=1, index_col=False)
df_tr = pd.read_csv(io.StringIO(data_tr.decode('utf-8')), sep=";", skiprows=1, index_col=False)

# Reduce "ETSI deliverable" to the bare spec number and pull the version
# string out of the original cell.
backup_ts = df_ts["ETSI deliverable"]
backup_tr = df_tr["ETSI deliverable"]
df_ts["ETSI deliverable"] = df_ts["ETSI deliverable"].str.extract(r"\s*ETSI TS (\d+ \d+(?:-\d+(?:-\d+)?)?)")
df_tr["ETSI deliverable"] = df_tr["ETSI deliverable"].str.extract(r"\s*ETSI TR (\d+ \d+(?:-\d+(?:-\d+)?)?)")
version1 = backup_ts.str.extract(r"\s*ETSI TS \d+ \d+(?:-\d+(?:-\d+)?)? V(\d+\.\d+\.\d+)")
version2 = backup_tr.str.extract(r"\s*ETSI TR \d+ \d+(?:-\d+(?:-\d+)?)? V(\d+\.\d+\.\d+)")
df_ts["Version"] = version1[0]
df_tr["Version"] = version2[0]
def ver_tuple(v):
    return tuple(map(int, v.split(".")))
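# ver_tuple("15.10.0") -> (15, 10, 0); comparing tuples instead of raw strings
# keeps "15.10.0" above "15.9.0", which a plain string sort would invert.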
| df_ts["temp"] = df_ts["Version"].apply(ver_tuple) | |
| df_tr["temp"] = df_tr["Version"].apply(ver_tuple) | |
| df_ts["Type"] = "TS" | |
| df_tr["Type"] = "TR" | |
| df = pd.concat([df_ts, df_tr]) | |
| unique_df = df.loc[df.groupby("ETSI deliverable")["temp"].idxmax()] | |
| unique_df = unique_df.drop(columns="temp") | |
| unique_df = unique_df[(~unique_df["title"].str.contains("3GPP", case=True, na=False))] | |
| df = df.drop(columns="temp") | |
| df = df[(~df["title"].str.contains("3GPP", case=True, na=False))] | |
def get_text(specification: str):
    """Download the newest PDF of a specification; return (document, table of contents)."""
    if STOP_EVENT.is_set():
        return None, []
    print(f"\n[INFO] Trying to fetch specification {specification}", flush=True)
    response = session.get(
        unique_df[unique_df["ETSI deliverable"] == specification].iloc[0]["PDF link"],
        verify=False,
        headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
    )
    if response.status_code != 200:
        print(f"\n[ERROR] Failed to download the PDF for {specification}. {response.status_code}", flush=True)
        return None, []
    pdf = fitz.open(stream=response.content, filetype="pdf")
    return pdf, pdf.get_toc()
def get_spec_content(specification: str):
    def extract_sections(text, titles):
        sections = {}
        # Sort the titles by their position in the text
        sorted_titles = sorted(titles, key=lambda t: text.find(t))
        for i, title in enumerate(sorted_titles):
            start = text.find(title)
            if i + 1 < len(sorted_titles):
                end = text.find(sorted_titles[i + 1])
                sections[re.sub(r"\s+", " ", title)] = re.sub(r"\s+", " ", text[start:end].replace(title, "").strip())
            else:
                sections[re.sub(r"\s+", " ", title)] = re.sub(r"\s+", " ", text[start:].replace(title, "").strip())
        return sections

    if STOP_EVENT.is_set():
        return {}
    print("\n[INFO] Trying to fetch the text", flush=True)
    pdf, doc_toc = get_text(specification)
    if pdf is None or not doc_toc or STOP_EVENT.is_set():
        print("\n[ERROR] No text/table of contents found!")
        return {}
    # Start extraction at the first page referenced by the table of contents
    first = doc_toc[0][2] - 1
    text = []
    for page in pdf[first:]:
        text.append("\n".join([line.strip() for line in page.get_text().splitlines()]))
    text = "\n".join(text)
    if not text:
        print("\n[ERROR] No text/table of contents found!")
        return {}
    print(f"\n[INFO] Text of {specification} retrieved", flush=True)
    titles = []
    for level, title, page in doc_toc:
        if STOP_EVENT.is_set():
            return {}
        # In the extracted text a heading's section number and its title sit on
        # separate lines, so "4.2 Architecture" is searched as "4.2\nArchitecture".
        if title[0].isnumeric() and '\n'.join(title.strip().split(" ", 1)) in text:
            titles.append('\n'.join(title.strip().split(" ", 1)))
    return extract_sections(text, titles)
def hasher(specification: str, version: str):
    return hashlib.md5(f"{specification}{version}".encode()).hexdigest()
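# hasher("103 666-1", "15.2.0") hashes the string "103 666-115.2.0"; the digest
# changes whenever the version does, which is how a cached document is
# detected as stale.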
def get_scope(content):
    # Return the text of the first section whose title ends with "Scope"
    for title, text in content.items():
        if title.lower().endswith("scope"):
            return text
    return ""
def process_specification(spec):
    global processed_count, indexed_specifications, documents_by_spec_num
    if STOP_EVENT.is_set():
        return
    try:
        version = spec.get('Version')
        if not version:
            return
        doc_id = str(spec.get("ETSI deliverable"))
        document = None
        with DOCUMENT_LOCK:
            if doc_id in documents_by_spec_num and documents_by_spec_num[doc_id]["hash"] == hasher(doc_id, version) and doc_id not in specifications_passed:
                # Cache hit: the stored hash matches the current version
                document = documents_by_spec_num[doc_id]
                specifications_passed.add(doc_id)
                print(f"\n[INFO] Document already present for {doc_id} (version {spec['Version']})", flush=True)
            elif doc_id in specifications_passed:
                document = documents_by_spec_num[doc_id]
                print(f"\n[INFO] Document already present for {doc_id} [latest version present]")
            else:
                print(f"\n[INFO] Trying to fetch document {doc_id} (version {spec['Version']})", flush=True)
                document = get_spec_content(doc_id)
                if document:
                    documents_by_spec_num[doc_id] = {"content": document, "hash": hasher(doc_id, version)}
                    document = {"content": document, "hash": hasher(doc_id, version)}
                    specifications_passed.add(doc_id)
                    print(f"\n[INFO] Document extracted for {doc_id} (version {spec['Version']})", flush=True)
        string_key = f"{doc_id}+-+{spec['title']}+-+{spec['Type']}+-+{spec['Version']}"
        metadata = {
            "id": str(doc_id),
            "title": spec["title"],
            "type": spec["Type"],
            "version": version,
            "url": spec["PDF link"],
            "scope": "" if not document else get_scope(document["content"])
        }
        with DICT_LOCK:
            indexed_specifications[string_key] = metadata
            processed_count += 1
            sys.stdout.write(f"\rProcessing: {processed_count}/{total_count} specifications...")
            sys.stdout.flush()
    except Exception as e:
        traceback.print_exception(e)
        print(f"\n[ERROR] Failed to process {doc_id} {version}: {e}", flush=True)
def sauvegarder(indexed_specifications, documents_by_spec_num):
    print("\nSaving in progress...", flush=True)
    flat_metadata = list(indexed_specifications.values())
    # Flatten the nested {doc_id: {"content": {section: text}}} cache into one
    # row per section before pushing to the Hub
    flat_docs = []
    for doc_id, data in documents_by_spec_num.items():
        for title, content in data["content"].items():
            flat_docs.append({"hash": data["hash"], "doc_id": doc_id, "section": title, "content": content})
    push_spec_content = Dataset.from_list(flat_docs)
    push_spec_metadata = Dataset.from_list(flat_metadata)
    push_spec_content.push_to_hub("OrganizedProgrammers/ETSISpecContent", token=os.environ["HF_TOKEN"])
    push_spec_metadata.push_to_hub("OrganizedProgrammers/ETSISpecMetadata", token=os.environ["HF_TOKEN"])
    print("Save complete.", flush=True)
def main():
    global total_count
    start_time = time.time()
    specifications = df.to_dict(orient="records")
    total_count = len(specifications)
    print(f"Processing {total_count} specifications with multithreading...")
    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
            futures = [executor.submit(process_specification, spec) for spec in specifications]
            # Poll instead of joining so STOP_EVENT (Ctrl+C) can break out early
            while True:
                if all(f.done() for f in futures):
                    break
                if STOP_EVENT.is_set():
                    break
                time.sleep(0.35)
    except Exception as e:
        print(f"\nUnexpected error in the ThreadPool: {e}", flush=True)
    print("\nSaving results...", flush=True)
    sauvegarder(indexed_specifications, documents_by_spec_num)
    elapsed_time = time.time() - start_time
    print(f"\nProcessing finished in {elapsed_time:.2f} seconds.", flush=True)
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\nInterrupt detected (Ctrl+C). Stopping running tasks...", flush=True)
        STOP_EVENT.set()
        time.sleep(2)
        sauvegarder(indexed_specifications, documents_by_spec_num)
        print("Clean shutdown of the script.", flush=True)
        sys.exit(0)
    except Exception as e:
        print(f"\nUnexpected error: {e}", flush=True)
        sauvegarder(indexed_specifications, documents_by_spec_num)
        sys.exit(1)
# print(get_spec_content("188 005-1"))
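# Assumed invocation: put HF_TOKEN, EOL_USER and EOL_PASSWORD in a .env file
# next to this script, then run it directly, e.g. `python app.py` (the actual
# filename may differ).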