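"""Index 3GPP TDoc ZIP files from the 3GPP FTP server (https://3gpp.org/ftp).

The indexer walks the TSG working-group and workshop folders, maps each document ID
(e.g. S1-xxxx, C1-xxxx) to the URL of its ZIP file, and pushes the result to the
OrganizedProgrammers/3GPPTDocLocation dataset on the Hugging Face Hub.
A write-enabled HF_TOKEN environment variable is required.
"""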
from datetime import datetime
import requests
from bs4 import BeautifulSoup
from datasets import load_dataset, Dataset
import json
import os
import time
import warnings
import re
import concurrent.futures
import threading
from typing import List, Dict, Any
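# requests is called with verify=False below, so urllib3's InsecureRequestWarning
# (and any other warnings) are silenced globally here.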
warnings.filterwarnings("ignore")

class TsgDocIndexer:
    def __init__(self, max_workers=10):
        self.indexer = self.load_indexer()
        self.main_ftp_url = "https://3gpp.org/ftp"
        self.dataset = load_dataset("OrganizedProgrammers/3GPPTDocLocation")
        self.valid_doc_pattern = re.compile(r'^(S[1-6P]|C[1-6P]|R[1-6P])-\d+', flags=re.IGNORECASE)
        self.max_workers = max_workers

        # Locks for thread-safe operations
        self.print_lock = threading.Lock()
        self.indexer_lock = threading.Lock()

        # Progress counters
        self.total_indexed = 0
        self.processed_count = 0
        self.total_count = 0
    def load_indexer(self):
        """Load the existing index if available"""
        all_docs = {}
        tdoc_locations = load_dataset("OrganizedProgrammers/3GPPTDocLocation", token=os.environ["HF_TOKEN"])
        tdoc_locations = tdoc_locations["train"].to_list()
        for doc in tdoc_locations:
            all_docs[doc["doc_id"]] = doc["url"]
        return all_docs
    def save_indexer(self):
        """Save the updated index"""
        data = []
        for doc_id, url in self.indexer.items():
            data.append({"doc_id": doc_id, "url": url})
        dataset = Dataset.from_list(data)
        dataset.push_to_hub("OrganizedProgrammers/3GPPTDocLocation", token=os.environ["HF_TOKEN"])
    def get_docs_from_url(self, url):
        """Fetch the list of documents/directories from a URL"""
        try:
            response = requests.get(url, verify=False, timeout=10)
            soup = BeautifulSoup(response.text, "html.parser")
            return [item.get_text() for item in soup.select("tr td a")]
        except Exception as e:
            with self.print_lock:
                print(f"Error while accessing {url}: {e}")
            return []
    def is_valid_document_pattern(self, filename):
        """Check whether the file name matches the required pattern"""
        return bool(self.valid_doc_pattern.match(filename))

    def is_zip_file(self, filename):
        """Check whether the file is a ZIP archive"""
        return filename.lower().endswith('.zip')
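    # Illustrative mapping performed by extract_doc_id (hypothetical file names):
    #   "S2-241234.zip"      -> "S2-241234"
    #   "C3-990123_rev2.zip" -> "C3-990123"  (extension and "_" suffix dropped)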
    def extract_doc_id(self, filename):
        """Extract the document identifier from the file name if it matches the pattern"""
        if self.is_valid_document_pattern(filename):
            match = self.valid_doc_pattern.match(filename)
            if match:
                # Return the full identifier (e.g. S1-12345)
                full_id = filename.split('.')[0]  # Drop the extension if present
                return full_id.split('_')[0]  # Drop any suffix after an underscore
        return None
    def process_zip_files(self, files_list, base_url, workshop=False):
        """Scan a list of files and index the valid ZIP files"""
        indexed_count = 0
        for file in files_list:
            if file in ['./', '../', 'ZIP/', 'zip/']:
                continue

            # Check that it is a ZIP file and that it matches the pattern
            if self.is_zip_file(file) and (self.is_valid_document_pattern(file) or workshop):
                file_url = f"{base_url}/{file}"

                # Extract the document ID
                doc_id = self.extract_doc_id(file)
                if doc_id is None:
                    doc_id = file.split('.')[0]
                if doc_id:
                    # Skip files that are already indexed at the same URL
                    with self.indexer_lock:
                        if doc_id in self.indexer and self.indexer[doc_id] == file_url:
                            continue

                        # Add or update the index entry
                        self.indexer[doc_id] = file_url
                        indexed_count += 1
                        self.total_indexed += 1
        return indexed_count
    def process_meeting(self, meeting, wg_url, workshop=False):
        """Process a single meeting (run from the thread pool)"""
        try:
            if meeting in ['./', '../']:
                return 0

            meeting_url = f"{wg_url}/{meeting}"
            with self.print_lock:
                print(f" Checking meeting: {meeting}")

            # Inspect the meeting contents
            meeting_contents = self.get_docs_from_url(meeting_url)
            key = None
            if "docs" in [x.lower() for x in meeting_contents]:
                key = "docs"
            elif "tdocs" in [x.lower() for x in meeting_contents]:
                key = "tdocs"
            elif "tdoc" in [x.lower() for x in meeting_contents]:
                key = "tdoc"

            if key is not None:
                docs_url = f"{meeting_url}/{key}"
                with self.print_lock:
                    print(f" Checking documents in {docs_url}")

                # Fetch the list of files in the Docs folder
                docs_files = self.get_docs_from_url(docs_url)

                # 1. Index the ZIP files located directly in the Docs folder
                docs_indexed_count = self.process_zip_files(docs_files, docs_url, workshop)
                if docs_indexed_count > 0:
                    with self.print_lock:
                        print(f" {docs_indexed_count} new ZIP files indexed in Docs")

                # 2. Check the ZIP subfolder if it exists
                if "zip" in [x.lower() for x in docs_files]:
                    zip_url = f"{docs_url}/zip"
                    with self.print_lock:
                        print(f" Checking ZIP folder: {zip_url}")

                    # Fetch the files in the ZIP subfolder
                    zip_files = self.get_docs_from_url(zip_url)

                    # Index the ZIP files found in the ZIP subfolder
                    zip_indexed_count = self.process_zip_files(zip_files, zip_url, workshop)
                    if zip_indexed_count > 0:
                        with self.print_lock:
                            print(f" {zip_indexed_count} new ZIP files indexed in the ZIP folder")

            # Update the progress counter
            with self.indexer_lock:
                self.processed_count += 1

            # Display progress
            with self.print_lock:
                progress = (self.processed_count / self.total_count) * 100 if self.total_count > 0 else 0
                print(f"\rProgress: {self.processed_count}/{self.total_count} meetings processed ({progress:.1f}%)", end="")

            return 1  # Meeting processed successfully
        except Exception as e:
            with self.print_lock:
                print(f"\nError while processing meeting {meeting}: {str(e)}")
            return 0
    def process_workgroup(self, wg, main_url):
        """Process a working group, handling its meetings in parallel"""
        if wg in ['./', '../']:
            return

        wg_url = f"{main_url}/{wg}"
        with self.print_lock:
            print(f" Checking working group: {wg}")

        # Fetch the meeting folders
        meeting_folders = self.get_docs_from_url(wg_url)

        # Add them to the total meeting counter
        self.total_count += len([m for m in meeting_folders if m not in ['./', '../']])

        # Use a ThreadPoolExecutor to process the meetings in parallel
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(self.process_meeting, meeting, wg_url)
                       for meeting in meeting_folders if meeting not in ['./', '../']]

            # Wait for all tasks to finish
            concurrent.futures.wait(futures)
    def index_all_tdocs(self):
        """Index all ZIP documents in the 3GPP FTP structure using multithreading"""
        print("Starting indexing of 3GPP ZIP documents (S1-S6, SP, C1-C6, CP)...")
        start_time = time.time()
        docs_count_before = len(self.indexer)

        # Main TSG groups
        main_groups = ["tsg_sa", "tsg_ct", "tsg_ran"]  # Add others if needed

        for main_tsg in main_groups:
            print(f"\nIndexing {main_tsg.upper()}...")
            main_url = f"{self.main_ftp_url}/{main_tsg}"

            # Fetch the working groups
            workgroups = self.get_docs_from_url(main_url)

            # Process each working group sequentially
            # (the meetings inside it are processed in parallel)
            for wg in workgroups:
                self.process_workgroup(wg, main_url)

        docs_count_after = len(self.indexer)
        new_docs_count = docs_count_after - docs_count_before

        print(f"\nIndexing finished in {time.time() - start_time:.2f} seconds")
        print(f"New ZIP documents indexed: {new_docs_count}")
        print(f"Total documents in the index: {docs_count_after}")

        return self.indexer
    def index_all_workshops(self):
        """Index all workshop ZIP documents using multithreading"""
        print("Starting indexing of 3GPP workshop ZIP documents...")
        start_time = time.time()
        docs_count_before = len(self.indexer)

        print("\nIndexing the 'workshop' folder")
        main_url = f"{self.main_ftp_url}/workshop"

        # Fetch the meeting folders
        meeting_folders = self.get_docs_from_url(main_url)

        # Add them to the total meeting counter
        self.total_count += len([m for m in meeting_folders if m not in ['./', '../']])

        # Use a ThreadPoolExecutor to process the meetings in parallel
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(self.process_meeting, meeting, main_url, workshop=True)
                       for meeting in meeting_folders if meeting not in ['./', '../']]
            concurrent.futures.wait(futures)

        docs_count_after = len(self.indexer)
        new_docs_count = docs_count_after - docs_count_before

        print(f"\nIndexing finished in {time.time() - start_time:.2f} seconds")
        print(f"New ZIP documents indexed: {new_docs_count}")
        print(f"Total documents in the index: {docs_count_after}")

        return self.indexer

def main():
    # Number of worker threads (adjust to the available resources)
    max_workers = 64

    indexer = TsgDocIndexer(max_workers=max_workers)

    try:
        indexer.index_all_tdocs()
        indexer.index_all_workshops()
    except Exception as e:
        print(f"Error during indexing: {e}")
    finally:
        indexer.save_indexer()
        print("Index saved.")

if __name__ == "__main__":
    main()
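# Minimal usage sketch when importing this module instead of running it as a script
# (assumes HF_TOKEN is set; the document ID below is hypothetical):
#
#   indexer = TsgDocIndexer(max_workers=16)
#   indexer.index_all_tdocs()
#   print(indexer.indexer.get("S1-123456"))  # URL of the ZIP, or None if not indexed
#   indexer.save_indexer()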