import torch
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer,
    DataCollatorWithPadding
)
from datasets import load_dataset, DatasetDict
import numpy as np
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, classification_report
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import warnings

warnings.filterwarnings('ignore')
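
# NOTE: assumed dependency versions (the original does not pin them):
# transformers>=4.41 (for the `eval_strategy` argument name), plus datasets,
# torch, scikit-learn, pandas, matplotlib, and seaborn.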


class SentimentFineTuner:
    def __init__(self, model_name="5CD-AI/Vietnamese-Sentiment-visobert",
                 dataset_name="uitnlp/vietnamese_students_feedback"):
        self.model_name = model_name
        self.dataset_name = dataset_name
        self.tokenizer = None
        self.model = None
        self.dataset = None
        self.tokenized_datasets = None
        self.trainer = None

    def load_model_and_tokenizer(self):
        """Load the pre-trained model and tokenizer"""
        print(f"Loading model: {self.model_name}")
        print("Loading tokenizer...")
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(self.model_name)
        print("Model and tokenizer loaded successfully!")
        print(f"Model architecture: {self.model.config.architectures}")
        print(f"Number of labels: {self.model.config.num_labels}")

    def load_and_prepare_dataset(self):
        """Load and prepare the dataset"""
        print(f"Loading dataset: {self.dataset_name}")
        try:
            # Try loading the dataset directly
            self.dataset = load_dataset(self.dataset_name)
        except Exception as e:
            print(f"Error loading dataset directly: {e}")
            print("Attempting alternative dataset loading...")
            try:
                # Fall back to a different Vietnamese sentiment dataset
                self.dataset = load_dataset("linhtranvi/5cdAI-Vietnamese-sentiment")
                print("Loaded alternative Vietnamese sentiment dataset!")
            except Exception as e2:
                print(f"Alternative dataset also failed: {e2}")
                print("Creating a sample Vietnamese sentiment dataset...")
                # Last resort: build a small in-memory sample dataset
                self.create_sample_dataset()
                return
        print("Dataset loaded successfully!")
        print(f"Dataset info: {self.dataset}")
        # Inspect the dataset structure
        print("\nDataset structure:")
        for split in self.dataset:
            print(f"{split}: {len(self.dataset[split])} samples")
            print(f"Columns: {self.dataset[split].column_names}")
            if len(self.dataset[split]) > 0:
                print(f"Sample data: {self.dataset[split][0]}")
        # Report the label distribution of the training split
        if 'train' in self.dataset:
            train_df = pd.DataFrame(self.dataset['train'])
            if 'sentiment' in train_df.columns:
                print("\nSentiment distribution in training set:")
                print(train_df['sentiment'].value_counts())
            elif 'label' in train_df.columns:
                print("\nLabel distribution in training set:")
                print(train_df['label'].value_counts())

    def preprocess_function(self, examples):
        """Tokenize a batch of examples"""
        # Find the text column among common names
        text_column = None
        for col in ['sentence', 'text', 'comment', 'feedback']:
            if col in examples:
                text_column = col
                break
        if text_column is None:
            # Fall back to the first column whose values are strings
            for col in examples:
                if isinstance(examples[col][0], str):
                    text_column = col
                    break
        if text_column is None:
            raise ValueError("No text column found in the dataset")
        # Find the label column among common names
        label_column = None
        for col in ['sentiment', 'label', 'labels']:
            if col in examples:
                label_column = col
                break
        if label_column is None:
            raise ValueError("No label column found in the dataset")
        # Tokenize without padding here; DataCollatorWithPadding pads each
        # batch dynamically at training time
        tokenized_inputs = self.tokenizer(
            examples[text_column],
            truncation=True,
            padding=False,
            max_length=256  # matches the 5CD-AI/Vietnamese-Sentiment-visobert config
        )
        # Attach labels so the Trainer can compute the loss
        tokenized_inputs['labels'] = examples[label_column]
        return tokenized_inputs

    def tokenize_datasets(self):
        """Tokenize all splits and drop the raw columns"""
        print("Tokenizing datasets...")
        self.tokenized_datasets = self.dataset.map(
            self.preprocess_function,
            batched=True,
            remove_columns=self.dataset['train'].column_names
        )
        print("Tokenization completed!")

    def compute_metrics(self, eval_pred):
        """Compute evaluation metrics from logits and gold labels"""
        predictions, labels = eval_pred
        predictions = np.argmax(predictions, axis=1)
        accuracy = accuracy_score(labels, predictions)
        # Weighted averaging accounts for class imbalance
        precision, recall, f1, _ = precision_recall_fscore_support(
            labels, predictions, average='weighted'
        )
        return {
            'accuracy': accuracy,
            'f1': f1,
            'precision': precision,
            'recall': recall
        }
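
    # Worked micro-example (illustrative): for logits [[0.1, 0.2, 0.9],
    # [0.8, 0.1, 0.1]] and labels [2, 0], argmax yields predictions [2, 0],
    # so accuracy and weighted precision/recall/F1 all equal 1.0.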

    def setup_trainer(self, output_dir="./sentiment_model", learning_rate=2e-5,
                      batch_size=16, num_epochs=5):
        """Set up the Trainer for fine-tuning"""
        # Pad each batch dynamically
        data_collator = DataCollatorWithPadding(tokenizer=self.tokenizer)
        # Training arguments (following the 5CD-AI/Vietnamese-Sentiment-visobert setup)
        training_args = TrainingArguments(
            output_dir=output_dir,
            learning_rate=learning_rate,
            per_device_train_batch_size=batch_size,
            per_device_eval_batch_size=batch_size,
            num_train_epochs=num_epochs,
            weight_decay=0.01,
            eval_strategy="epoch",
            save_strategy="epoch",
            load_best_model_at_end=True,
            metric_for_best_model="f1",
            greater_is_better=True,
            push_to_hub=False,
            logging_dir=f"{output_dir}/logs",
            logging_steps=10,
            save_total_limit=2,
            seed=42,
            gradient_accumulation_steps=1,
            optim="adamw_torch",  # AdamW with default betas=(0.9, 0.999), eps=1e-08
        )
        # Fall back to the validation split when no test split exists
        eval_split = "test" if "test" in self.tokenized_datasets else "validation"
        self.trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=self.tokenized_datasets["train"],
            eval_dataset=self.tokenized_datasets[eval_split],
            tokenizer=self.tokenizer,
            data_collator=data_collator,
            compute_metrics=self.compute_metrics
        )
        print("Trainer setup completed!")

    def train_model(self):
        """Train the model and save the resulting checkpoint"""
        print("Starting training...")
        train_result = self.trainer.train()
        print("Training completed!")
        print(f"Training loss: {train_result.training_loss}")
        # Save the fine-tuned model and its tokenizer
        self.trainer.save_model()
        self.tokenizer.save_pretrained(self.trainer.args.output_dir)
        print(f"Model saved to: {self.trainer.args.output_dir}")
        return train_result

    def evaluate_model(self):
        """Evaluate the model and print a classification report"""
        print("Evaluating model...")
        eval_results = self.trainer.evaluate()
        print("Evaluation results:")
        for key, value in eval_results.items():
            print(f"{key}: {value:.4f}")
        # Get per-example predictions for detailed analysis
        eval_split = "test" if "test" in self.tokenized_datasets else "validation"
        predictions = self.trainer.predict(self.tokenized_datasets[eval_split])
        y_pred = np.argmax(predictions.predictions, axis=1)
        y_true = predictions.label_ids
        print("\nClassification Report:")
        print(classification_report(y_true, y_pred))
        return eval_results, y_pred, y_true

    def plot_training_history(self):
        """Plot training loss, evaluation loss, and evaluation F1"""
        if hasattr(self.trainer, 'state') and hasattr(self.trainer.state, 'log_history'):
            logs = self.trainer.state.log_history
            # Per-step training logs use the 'loss' key; per-epoch evaluation
            # logs use 'eval_loss' and 'eval_f1'
            train_loss = [log['loss'] for log in logs if 'loss' in log]
            eval_loss = [log['eval_loss'] for log in logs if 'eval_loss' in log]
            eval_f1 = [log['eval_f1'] for log in logs if 'eval_f1' in log]
            fig, axes = plt.subplots(1, 3, figsize=(15, 5))
            # Training loss (per logging step)
            axes[0].plot(train_loss, label='Training Loss')
            axes[0].set_title('Training Loss')
            axes[0].set_xlabel('Logging Steps')
            axes[0].set_ylabel('Loss')
            axes[0].legend()
            # Evaluation loss (per epoch)
            axes[1].plot(eval_loss, label='Evaluation Loss')
            axes[1].set_title('Evaluation Loss')
            axes[1].set_xlabel('Epoch')
            axes[1].set_ylabel('Loss')
            axes[1].legend()
            # Evaluation F1 (per epoch)
            axes[2].plot(eval_f1, label='Evaluation F1')
            axes[2].set_title('Evaluation F1 Score')
            axes[2].set_xlabel('Epoch')
            axes[2].set_ylabel('F1 Score')
            axes[2].legend()
            plt.tight_layout()
            plt.savefig('training_history.png', dpi=300, bbox_inches='tight')
            plt.show()
            print("Training history plots saved as 'training_history.png'")

    def plot_confusion_matrix(self, y_true, y_pred):
        """Plot and save the confusion matrix"""
        from sklearn.metrics import confusion_matrix
        cm = confusion_matrix(y_true, y_pred)
        plt.figure(figsize=(8, 6))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
        plt.title('Confusion Matrix')
        plt.xlabel('Predicted')
        plt.ylabel('Actual')
        plt.savefig('confusion_matrix.png', dpi=300, bbox_inches='tight')
        plt.show()
        print("Confusion matrix saved as 'confusion_matrix.png'")

    def create_sample_dataset(self):
        """Create a small Vietnamese sentiment dataset for demonstration"""
        print("Creating sample Vietnamese sentiment dataset...")
        # Sample Vietnamese student-feedback texts with sentiment labels
        sample_data = {
            "text": [
                # Positive samples
                "Giảng viên dạy rất hay và tâm huyết, tôi học được nhiều kiến thức bổ ích.",
                "Môn học này rất thú vị và practical, giúp tôi áp dụng được vào thực tế.",
                "Thầy cô rất tận tình và hỗ trợ sinh viên, không khí lớp học rất tích cực.",
                "Nội dung môn học sâu sắc, cách truyền đạt dễ hiểu, tôi rất hài lòng.",
                "Phương pháp giảng dạy mới mẻ, hấp dẫn, khiến tôi say mê học tập.",
                # Negative samples
                "Môn học quá khó và nhàm chán, không có gì để học cả.",
                "Giảng viên dạy không rõ ràng, tốc độ quá nhanh, không theo kịp.",
                "Thời lượng quá ít nhưng nội dung nhiều, không thể học hết.",
                "Thầy cô ít quan tâm đến sinh viên, không giải thích khi có thắc mắc.",
                "Đồ án quá nặng, yêu cầu không rõ ràng, deadline quá gấp.",
                # Neutral samples
                "Môn học ổn định, không có gì đặc biệt để nhận xét.",
                "Nội dung cơ bản, phù hợp với chương trình đề ra.",
                "Lớp học bình thường, giảng viên dạy đúng theo giáo trình.",
                "Đủ kiến thức cơ bản, không quá khó cũng không quá dễ.",
                "Môn học như các môn khác, không có gì nổi bật.",
                # Additional mixed samples
                "Tôi rất thích cách thầy cô tổ chức hoạt động nhóm, rất hiệu quả.",
                "Phòng học quá nóng, thiết bị cũ, ảnh hưởng đến việc học.",
                "Tài liệu học tập đầy đủ, có cả online và offline.",
                "Bài tập nhiều nhưng không quá khó, giúp củng cố kiến thức.",
                "Lịch học ổn, không trùng với môn học quan trọng khác."
            ],
            "label": [
                # Labels: 0 = Negative, 1 = Neutral, 2 = Positive
                2, 2, 2, 2, 2,  # Positive (5 samples)
                0, 0, 0, 0, 0,  # Negative (5 samples)
                1, 1, 1, 1, 1,  # Neutral (5 samples)
                2, 0, 1, 1, 1   # Additional mixed (5 samples)
            ]
        }
        from datasets import Dataset
        full_dataset = Dataset.from_dict(sample_data)
        # Split off 20% as the test set, then take 25% of the remaining 80%
        # as validation, giving a 60/20/20 overall split
        train_test_split = full_dataset.train_test_split(test_size=0.2, seed=42)
        train_val_split = train_test_split["train"].train_test_split(test_size=0.25, seed=42)
        self.dataset = DatasetDict({
            "train": train_val_split["train"],
            "validation": train_val_split["test"],
            "test": train_test_split["test"]
        })
        print(f"Created sample dataset with {len(self.dataset['train'])} training, "
              f"{len(self.dataset['validation'])} validation, and "
              f"{len(self.dataset['test'])} test samples")
        # Print the label distribution of the training split
        train_df = pd.DataFrame(self.dataset['train'])
        print("\nSentiment distribution in training set:")
        label_counts = train_df['label'].value_counts().sort_index()
        for label, count in label_counts.items():
            sentiment_name = ["Negative", "Neutral", "Positive"][label]
            print(f"  {sentiment_name} (label {label}): {count} samples")

    def run_fine_tuning(self, output_dir="./fine_tuned_sentiment_model",
                        learning_rate=2e-5, batch_size=16, num_epochs=5):
        """Run the complete fine-tuning pipeline"""
        print("=" * 60)
        print("VIETNAMESE SENTIMENT ANALYSIS FINE-TUNING")
        print("=" * 60)
        # Load model and tokenizer
        self.load_model_and_tokenizer()
        # Load and prepare dataset
        self.load_and_prepare_dataset()
        # Tokenize datasets
        self.tokenize_datasets()
        # Set up trainer
        self.setup_trainer(output_dir, learning_rate, batch_size, num_epochs)
        # Train model
        train_result = self.train_model()
        # Evaluate model
        eval_results, y_pred, y_true = self.evaluate_model()
        # Plot results
        self.plot_training_history()
        self.plot_confusion_matrix(y_true, y_pred)
        print("=" * 60)
        print("FINE-TUNING COMPLETED SUCCESSFULLY!")
        print("=" * 60)
        print(f"Model saved to: {output_dir}")
        print(f"Final evaluation F1: {eval_results['eval_f1']:.4f}")
        print(f"Final evaluation accuracy: {eval_results['eval_accuracy']:.4f}")
        return train_result, eval_results
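

# Optional inference sketch (not part of the original pipeline): a minimal
# example of loading the fine-tuned checkpoint and classifying new text.
# The 0/1/2 -> Negative/Neutral/Positive mapping matches the sample dataset
# above and is an assumption; adjust it if your dataset uses other labels.
def predict_sentiment(texts, model_dir="./vietnamese_sentiment_finetuned"):
    """Classify a list of texts with the saved fine-tuned model."""
    tokenizer = AutoTokenizer.from_pretrained(model_dir)
    model = AutoModelForSequenceClassification.from_pretrained(model_dir)
    model.eval()
    inputs = tokenizer(texts, truncation=True, padding=True,
                       max_length=256, return_tensors="pt")
    with torch.no_grad():
        logits = model(**inputs).logits
    label_names = ["Negative", "Neutral", "Positive"]  # assumed mapping
    return [label_names[i] for i in logits.argmax(dim=-1).tolist()]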


def main():
    """Main entry point for running the fine-tuning"""
    fine_tuner = SentimentFineTuner()
    # Run fine-tuning with the original model's configuration
    train_result, eval_results = fine_tuner.run_fine_tuning(
        output_dir="./vietnamese_sentiment_finetuned",
        learning_rate=2e-5,  # same as the original model
        batch_size=16,       # recommended batch size
        num_epochs=5         # same as the original model
    )
    print("Fine-tuning completed successfully!")


if __name__ == "__main__":
    main()