import os
import random
import json
from pathlib import Path

import requests
import torch
import spacy
import nltk
from nltk.corpus import wordnet
from textblob import TextBlob
from transformers import pipeline, AutoTokenizer, AutoModelForSeq2SeqLM


class AdvancedTextAugmenter:

    def __init__(self):
        self.setup_dependencies()
        self.setup_models()

    def setup_dependencies(self):
        """Set up the required NLP resources."""
        try:
            # WordNet data; 'omw-1.4' provides the multilingual (French) mappings
            # required for the lang='fra' lookups used below.
            nltk.download('wordnet', quiet=True)
            nltk.download('omw-1.4', quiet=True)
            nltk.download('averaged_perceptron_tagger', quiet=True)
            nltk.download('punkt', quiet=True)

            # French spaCy model, installed on the fly if it is missing.
            try:
                self.nlp = spacy.load("fr_core_news_sm")
            except OSError:
                print("French spaCy model not found. Installing...")
                os.system("python -m spacy download fr_core_news_sm")
                self.nlp = spacy.load("fr_core_news_sm")

        except Exception as e:
            print(f"Setup error: {e}")
            print("Install the dependencies with: pip install spacy nltk transformers textblob torch")

    def setup_models(self):
        """Load the transformation models (paraphrase and translation pipelines)."""
        try:
            # French text2text model used for paraphrasing. Note: this checkpoint
            # appears to be a French summarization model (per its name), so its
            # "paraphrases" tend to compress the input.
            self.paraphraser = pipeline(
                "text2text-generation",
                model="plguillou/t5-base-fr-sum-cnndm",
                tokenizer="plguillou/t5-base-fr-sum-cnndm",
                device=0 if torch.cuda.is_available() else -1
            )

            # Translation pipelines used for back-translation (FR -> EN -> FR).
            self.translator_fr_en = pipeline(
                "translation_fr_to_en",
                model="Helsinki-NLP/opus-mt-fr-en",
                device=0 if torch.cuda.is_available() else -1
            )

            self.translator_en_fr = pipeline(
                "translation_en_to_fr",
                model="Helsinki-NLP/opus-mt-en-fr",
                device=0 if torch.cuda.is_available() else -1
            )

        except Exception as e:
            print(f"Error while loading models: {e}")
            print("Falling back to alternative methods...")
            self.paraphraser = None
            self.translator_fr_en = None
            self.translator_en_fr = None
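
    # Quick smoke test of the loaded pipelines (illustrative sketch, not executed
    # by the script):
    #
    #   aug = AdvancedTextAugmenter()
    #   if aug.translator_fr_en:
    #       print(aug.translator_fr_en("Bonjour le monde", max_length=64))
    #
    # Translation pipelines return a list of dicts with a 'translation_text' key,
    # which is why the methods below index [0]['translation_text'].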

    def get_wordnet_synonyms(self, word, pos_tag):
        """Look up French synonyms through WordNet (Open Multilingual Wordnet)."""
        synonyms = set()

        wordnet_pos = self.get_wordnet_pos(pos_tag)

        if wordnet_pos:
            for syn in wordnet.synsets(word, pos=wordnet_pos, lang='fra'):
                for lemma in syn.lemmas(lang='fra'):
                    synonym = lemma.name().replace('_', ' ')
                    if synonym.lower() != word.lower():
                        synonyms.add(synonym)

        return list(synonyms)

    def get_wordnet_pos(self, treebank_tag):
        """Map a POS tag to the corresponding WordNet constant.

        Accepts both Penn Treebank tags (JJ, VB, NN, RB) and the UD-style tags
        produced by the French spaCy models (ADJ, VERB, NOUN, ADV), since the
        caller passes spaCy's token.tag_.
        """
        if treebank_tag.startswith('J') or treebank_tag.startswith('ADJ'):
            return wordnet.ADJ
        elif treebank_tag.startswith('V'):
            return wordnet.VERB
        elif treebank_tag.startswith('N'):
            return wordnet.NOUN
        elif treebank_tag.startswith('R') or treebank_tag.startswith('ADV'):
            return wordnet.ADV
        else:
            return None
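
    # The lang='fra' lookups rely on the Open Multilingual Wordnet data
    # ('omw-1.4', downloaded in setup_dependencies). A manual check of the API
    # (illustrative; the exact synsets depend on the installed WordNet data):
    #
    #   from nltk.corpus import wordnet
    #   wordnet.synsets('maison', lang='fra')
    #   [l.name() for s in wordnet.synsets('maison', lang='fra')
    #    for l in s.lemmas(lang='fra')]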

    def synonym_replacement(self, text, replace_ratio=0.3):
        """Method 1: synonym replacement via WordNet and spaCy (fixed: preserves original whitespace)."""
        doc = self.nlp(text)
        result_tokens = []

        for token in doc:
            # Re-insert the original whitespace between the previous token and
            # this one, so the output keeps the source formatting.
            if token.i > 0:
                prev_token = doc[token.i - 1]
                spaces_between = text[prev_token.idx + len(prev_token.text):token.idx]
                result_tokens.append(spaces_between)

            # Only content words (non-stopword nouns, adjectives, verbs, adverbs)
            # are candidates for replacement, sampled at replace_ratio.
            if (not token.is_stop and not token.is_punct and
                    token.pos_ in ['NOUN', 'ADJ', 'VERB', 'ADV'] and
                    random.random() < replace_ratio):

                synonyms = self.get_wordnet_synonyms(token.lemma_, token.tag_)

                if synonyms:
                    synonym = random.choice(synonyms)
                    # Preserve the capitalisation of the original token.
                    if token.text[0].isupper():
                        synonym = synonym.capitalize()
                    result_tokens.append(synonym)
                else:
                    result_tokens.append(token.text)
            else:
                result_tokens.append(token.text)

        return ''.join(result_tokens)
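
    # Example call (illustrative; results vary because candidate tokens are
    # sampled with random.random()):
    #
    #   aug = AdvancedTextAugmenter()
    #   aug.synonym_replacement("Le petit chat dort sur le canapé.", replace_ratio=0.5)
    #
    # Roughly half of the eligible content words are swapped for a WordNet
    # synonym; spacing and punctuation are rebuilt from the original offsets.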

    def back_translation(self, text):
        """Method 2: back-translation FR -> EN -> FR."""
        if not self.translator_fr_en or not self.translator_en_fr:
            return self.fallback_paraphrase(text)

        try:
            # Translate to English, then back to French; the round trip tends to
            # produce a natural-sounding rewording of the original text.
            english = self.translator_fr_en(text, max_length=512)[0]['translation_text']
            back_translated = self.translator_en_fr(english, max_length=512)[0]['translation_text']
            return back_translated

        except Exception as e:
            print(f"Back-translation error: {e}")
            return self.fallback_paraphrase(text)
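
    # Illustrative round trip, assuming both MarianMT pipelines loaded:
    #
    #   en = aug.translator_fr_en("Le chat dort.", max_length=512)[0]['translation_text']
    #   fr = aug.translator_en_fr(en, max_length=512)[0]['translation_text']
    #
    # The translation step itself is deterministic for a given model version;
    # variation across output files comes from the random choice between
    # back-translation and neural paraphrasing in process_single_file.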

    def neural_paraphrasing(self, text):
        """Method 3: neural paraphrasing with T5."""
        if not self.paraphraser:
            return self.fallback_paraphrase(text)

        try:
            # Task prefix passed to the text2text pipeline; sampling adds diversity.
            input_text = f"paraphrase: {text}"

            result = self.paraphraser(
                input_text,
                # max_length is counted in tokens, not words; doubling the word
                # count is a rough upper bound on the generated length.
                max_length=len(text.split()) * 2,
                num_return_sequences=1,
                temperature=0.8,
                do_sample=True
            )

            return result[0]['generated_text']

        except Exception as e:
            print(f"Neural paraphrasing error: {e}")
            return self.fallback_paraphrase(text)

    def fallback_paraphrase(self, text):
        """Fallback method using rule-based linguistic substitutions (fixed: preserves original whitespace)."""
        doc = self.nlp(text)

        # Work sentence by sentence so token offsets stay local to each sentence.
        sentences = [sent.text.strip() for sent in doc.sents]

        paraphrased_sentences = []
        for sentence in sentences:
            sent_doc = self.nlp(sentence)

            result_tokens = []
            for token in sent_doc:
                # Re-insert the original whitespace between consecutive tokens.
                if token.i > 0:
                    prev_token = sent_doc[token.i - 1]
                    spaces_between = sentence[prev_token.idx + len(prev_token.text):token.idx]
                    result_tokens.append(spaces_between)

                # Swap a handful of common prepositions for near-equivalent phrases.
                if token.pos_ == 'ADP':
                    prep_alternatives = {
                        'dans': 'à travers', 'sur': 'au-dessus de',
                        'avec': 'en compagnie de', 'pour': 'en faveur de'
                    }
                    result_tokens.append(prep_alternatives.get(token.text.lower(), token.text))
                else:
                    result_tokens.append(token.text)

            paraphrased_sentences.append(''.join(result_tokens))

        return ' '.join(paraphrased_sentences)
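
    # Example of the preposition swap performed by the fallback (deterministic
    # for the prepositions listed above):
    #
    #   "Il marche dans le parc"  ->  "Il marche à travers le parc"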

    def contextual_word_insertion(self, text, insert_ratio=0.1):
        """Method 4: contextual word insertion (fixed: preserves original whitespace)."""
        doc = self.nlp(text)
        result = ""

        adverb_intensifiers = ['vraiment', 'particulièrement', 'extrêmement', 'assez', 'plutôt']
        conjunctions = ['également', 'aussi', 'de plus', 'par ailleurs']

        for token in doc:
            # Re-insert the original whitespace between consecutive tokens.
            if token.i > 0:
                prev_token = doc[token.i - 1]
                spaces_between = text[prev_token.idx + len(prev_token.text):token.idx]
                result += spaces_between

            # Occasionally intensify an adjective.
            if token.pos_ == 'ADJ' and random.random() < insert_ratio:
                result += random.choice(adverb_intensifiers) + " "

            result += token.text

            # Occasionally append a connective after a sentence-final punctuation mark.
            if (token.text in ['.', '!', '?'] and token.i < len(doc) - 1 and
                    random.random() < insert_ratio):
                result += " " + random.choice(conjunctions) + ","

        return result
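
    # Illustrative effect (not guaranteed on any given run, since insertions are
    # sampled at insert_ratio):
    #
    #   "Le film était intéressant." -> "Le film était vraiment intéressant."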

    def process_single_file(self, file_path, output_counter, output_prefix="template"):
        """Process one input file and write its two augmented variations."""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                original_text = f.read().strip()

            if not original_text:
                return output_counter

            print(f"Processing: {file_path.name}")

            # Variation 1: synonym replacement followed by contextual insertion.
            print("  → Generating variation 1 (synonyms + insertion)...")
            variation_1 = self.synonym_replacement(original_text)
            variation_1 = self.contextual_word_insertion(variation_1)

            # Variation 2: either back-translation or neural paraphrasing, at random.
            print("  → Generating variation 2 (back-translation/paraphrase)...")
            if random.choice([True, False]):
                variation_2 = self.back_translation(original_text)
            else:
                variation_2 = self.neural_paraphrasing(original_text)

            # Write the two variations; file names use the configured prefix and
            # a running counter.
            output_file_1 = f"{output_prefix}{output_counter}.txt"
            with open(output_file_1, 'w', encoding='utf-8') as f:
                f.write(variation_1)

            output_file_2 = f"{output_prefix}{output_counter + 1}.txt"
            with open(output_file_2, 'w', encoding='utf-8') as f:
                f.write(variation_2)

            print(f"  ✓ Created: {output_file_1}, {output_file_2}")

            return output_counter + 2

        except Exception as e:
            print(f"Error while processing {file_path}: {e}")
            return output_counter
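
    # Note: the augmented files are written to the current working directory
    # (not to the input directory); with the defaults the first processed file
    # produces template419.txt and template420.txt.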

    def augment_dataset(self, input_directory=".", output_prefix="template", start_number=419):
        """Run the augmentation over every .txt file in the input directory."""

        print("=== ADVANCED TEXT DATA AUGMENTATION ===\n")

        # Collect the input files in a stable (sorted) order.
        text_files = sorted(Path(input_directory).glob("*.txt"))

        if not text_files:
            print("❌ No .txt file found in the directory.")
            return

        print(f"📁 Found {len(text_files)} files to process...")
        print(f"🚀 Starting generation from {output_prefix}{start_number}.txt\n")

        output_counter = start_number
        processed_files = 0

        for file_path in text_files:
            output_counter = self.process_single_file(file_path, output_counter, output_prefix)
            processed_files += 1

            if processed_files % 50 == 0:
                print(f"📊 Progress: {processed_files}/{len(text_files)} files processed\n")

        total_generated = output_counter - start_number
        print("\n🎉 DONE!")
        print("📈 Statistics:")
        print(f"  • Original files: {len(text_files)}")
        print(f"  • New files generated: {total_generated}")
        print(f"  • Final total: {len(text_files) + total_generated}")
        print(f"  • Multiplication factor: x{(len(text_files) + total_generated) / len(text_files):.1f}")


def install_dependencies():
    """Install the required packages with pip."""
    import subprocess
    import sys

    packages = [
        "spacy", "nltk", "transformers", "textblob", "torch", "sentencepiece"
    ]

    for package in packages:
        try:
            subprocess.check_call([sys.executable, "-m", "pip", "install", package])
        except Exception:
            print(f"Could not install {package}")
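
# Manual alternative to install_dependencies() (assumes pip and network access;
# mirrors the package list above):
#
#   pip install spacy nltk transformers textblob torch sentencepiece
#   python -m spacy download fr_core_news_sm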


if __name__ == "__main__":
    print("Checking dependencies...")

    try:
        augmenter = AdvancedTextAugmenter()

        augmenter.augment_dataset(
            input_directory="data_txt",
            output_prefix="template",
            start_number=419
        )

    except ImportError as e:
        # Note: failures of the top-level imports abort the script before this
        # point; this branch mainly catches packages imported lazily by the
        # libraries themselves.
        print(f"Missing dependencies: {e}")
        print("Installing automatically...")
        install_dependencies()
        print("Re-run the script after the installation.")