import gradio as gr import pandas as pd import random import string import re import asyncio import aiohttp import smtplib import dns.resolver from email.utils import parseaddr import time from datetime import datetime import json from typing import List, Dict, Tuple, Optional import threading from concurrent.futures import ThreadPoolExecutor import csv # Конфигурация CONFIG = { "domains": ["gmail.com", "yahoo.com", "hotmail.com", "outlook.com", "example.com"], "common_names": ["john", "jane", "mike", "sarah", "david", "lisa", "robert", "emma"], "smtp_servers": { "gmail.com": ("smtp.gmail.com", 587), "yahoo.com": ("smtp.mail.yahoo.com", 587), "hotmail.com": ("smtp-mail.outlook.com", 587), "outlook.com": ("smtp-mail.outlook.com", 587), } } class EmailValidator: def __init__(self): self.session = None self.results_cache = {} async def __aenter__(self): self.session = aiohttp.ClientSession( timeout=aiohttp.ClientTimeout(total=10), connector=aiohttp.TCPConnector(limit=100) ) return self async def __aexit__(self, exc_type, exc_val, exc_tb): if self.session: await self.session.close() def validate_format(self, email: str) -> bool: """Проверка формата email""" pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' return re.match(pattern, email) is not None def check_mx_record(self, domain: str) -> bool: """Проверка MX записи домена""" try: records = dns.resolver.resolve(domain, 'MX') return len(records) > 0 except: return False async def check_smtp_connection(self, email: str) -> Dict[str, any]: """Проверка SMTP соединения""" domain = email.split('@')[1] if domain not in CONFIG["smtp_servers"]: return {"exists": "unknown", "method": "smtp_not_supported"} smtp_host, smtp_port = CONFIG["smtp_servers"][domain] try: with smtplib.SMTP(smtp_host, smtp_port, timeout=10) as server: server.starttls() # Проверка существования email через VRFY (если поддерживается) try: response = server.verify(email) if response.startswith("250"): return {"exists": "valid", "method": "smtp_vrfy"} except: pass # Альтернативная проверка через RCPT TO server.mail("test@example.com") code, message = server.rcpt(email) if code == 250: return {"exists": "valid", "method": "smtp_rcpt"} else: return {"exists": "invalid", "method": "smtp_rcpt"} except Exception as e: return {"exists": "error", "method": "smtp_error", "error": str(e)} async def check_disposable_email(self, email: str) -> bool: """Проверка на временный email""" domain = email.split('@')[1] disposable_domains = [ "10minutemail.com", "guerrillamail.com", "mailinator.com", "tempmail.org", "yopmail.com", "throwaway.email" ] return domain in disposable_domains async def validate_email(self, email: str) -> Dict[str, any]: """Комплексная проверка email""" if email in self.results_cache: return self.results_cache[email] result = { "email": email, "format_valid": False, "domain_valid": False, "mx_valid": False, "smtp_valid": "unknown", "is_disposable": False, "timestamp": datetime.now().isoformat() } # Проверка формата result["format_valid"] = self.validate_format(email) if not result["format_valid"]: result["overall_status"] = "invalid" self.results_cache[email] = result return result # Проверка домена domain = email.split('@')[1] result["domain_valid"] = len(domain) > 0 and '.' in domain # Проверка MX записи result["mx_valid"] = self.check_mx_record(domain) # Проверка на временный email result["is_disposable"] = await self.check_disposable_email(email) # SMTP проверка (асинхронная) smtp_result = await self.check_smtp_connection(email) result["smtp_valid"] = smtp_result["exists"] result["smtp_method"] = smtp_result["method"] # Определение общего статуса if result["smtp_valid"] == "valid": result["overall_status"] = "valid" elif result["smtp_valid"] == "invalid": result["overall_status"] = "invalid" elif result["is_disposable"]: result["overall_status"] = "disposable" elif not result["mx_valid"]: result["overall_status"] = "invalid_domain" else: result["overall_status"] = "unknown" self.results_cache[email] = result return result class EmailGenerator: @staticmethod def generate_single(domain: str = None, name_pattern: str = "random") -> str: """Генерация одного email""" if domain is None: domain = random.choice(CONFIG["domains"]) if name_pattern == "random": name = random.choice(CONFIG["common_names"]) elif name_pattern == "numbers": name = random.choice(CONFIG["common_names"]) + str(random.randint(1, 999)) else: # Случайная строка name = ''.join(random.choices(string.ascii_lowercase, k=8)) return f"{name}@{domain}" @staticmethod def generate_batch(count: int, domain: str = None, name_pattern: str = "random") -> List[str]: """Генерация списка email""" emails = [] for _ in range(count): emails.append(EmailGenerator.generate_single(domain, name_pattern)) return emails def generate_emails(count: int, domain: str, name_pattern: str) -> List[str]: """Генерация email для Gradio""" try: count = int(count) if count <= 0 or count > 1000: return ["Ошибка: количество должно быть от 1 до 1000"] domain = domain if domain else None emails = EmailGenerator.generate_batch(count, domain, name_pattern) return emails except Exception as e: return [f"Ошибка генерации: {str(e)}"] async def validate_emails_batch(emails: List[str], progress=gr.Progress()) -> Tuple[List[Dict], str]: """Проверка списка email""" if not emails: return [], "Нет email для проверки" results = [] async with EmailValidator() as validator: tasks = [] for i, email in enumerate(emails): if email and '@' in email: tasks.append(validator.validate_email(email.strip())) # Обработка с прогресс-баром total = len(tasks) for i, task in enumerate(asyncio.as_completed(tasks)): try: result = await task results.append(result) progress((i + 1) / total, f"Проверено {i + 1}/{total} email") except Exception as e: results.append({ "email": emails[i], "error": str(e), "overall_status": "error" }) return results, f"Проверено {len(results)} email" def validate_emails_sync(emails: List[str]) -> Tuple[List[Dict], str]: """Синхронная обертка для асинхронной проверки""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: return loop.run_until_complete(validate_emails_batch(emails)) finally: loop.close() def export_results(results: List[Dict], format_type: str) -> str: """Экспорт результатов""" if not results: return "Нет результатов для экспорта" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") try: if format_type == "CSV": filename = f"email_validation_{timestamp}.csv" with open(filename, 'w', newline='', encoding='utf-8') as csvfile: if results and "error" not in results[0]: fieldnames = ["email", "overall_status", "format_valid", "domain_valid", "mx_valid", "smtp_valid", "is_disposable", "timestamp"] else: fieldnames = ["email", "error", "overall_status"] writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer.writeheader() writer.writerows(results) return f"Экспортировано в {filename}" elif format_type == "JSON": filename = f"email_validation_{timestamp}.json" with open(filename, 'w', encoding='utf-8') as jsonfile: json.dump(results, jsonfile, indent=2, ensure_ascii=False) return f"Экспортировано в {filename}" elif format_type == "Excel": filename = f"email_validation_{timestamp}.xlsx" df = pd.DataFrame(results) df.to_excel(filename, index=False) return f"Экспортировано в {filename}" except Exception as e: return f"Ошибка экспорта: {str(e)}" def get_statistics(results: List[Dict]) -> Dict: """Получение статистики""" if not results: return {} stats = { "total": len(results), "valid": 0, "invalid": 0, "disposable": 0, "unknown": 0, "error": 0 } for result in results: status = result.get("overall_status", "error") if status in stats: stats[status] += 1 return stats def create_interface(): """Создание интерфейса""" with gr.Blocks( title="Email Generator & Validator", theme=gr.themes.Soft(), css=""" .header { text-align: center; margin-bottom: 2rem; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); padding: 2rem; border-radius: 10px; color: white; } .stats-box { background: #f8f9fa; padding: 1rem; border-radius: 8px; margin: 1rem 0; } .success { color: #28a745; font-weight: bold; } .error { color: #dc3545; font-weight: bold; } .warning { color: #ffc107; font-weight: bold; } """ ) as demo: with gr.Row(): gr.HTML("""

📧 Email Generator & Validator

Генерация и проверка email адресов с расширенной функциональностью

Built with anycoder

""") with gr.Tabs(): # Таб 1: Генерация email with gr.TabItem("🔧 Генерация Email"): with gr.Row(): with gr.Column(scale=2): gr.Markdown("### Настройки генерации") count_input = gr.Number( label="Количество email", value=10, minimum=1, maximum=1000, step=1 ) domain_input = gr.Dropdown( label="Домен (оставьте пустым для случайного)", choices=CONFIG["domains"] + [""], value="" ) pattern_input = gr.Radio( label="Шаблон имени", choices=[ ("Случайное имя", "random"), ("Имя + цифры", "numbers"), ("Случайная строка", "random_string") ], value="random" ) generate_btn = gr.Button("🚀 Сгенерировать", variant="primary") with gr.Column(scale=3): generated_emails = gr.Textbox( label="Сгенерированные email", lines=10, placeholder="Здесь появятся сгенерированные email..." ) generate_btn.click( fn=generate_emails, inputs=[count_input, domain_input, pattern_input], outputs=generated_emails ) # Таб 2: Валидация email with gr.TabItem("✅ Валидация Email"): with gr.Row(): with gr.Column(): gr.Markdown("### Ввод email для проверки") email_input = gr.Textbox( label="Email адреса (каждый с новой строки)", lines=10, placeholder="Введите email адреса для проверки..." ) validate_btn = gr.Button("🔍 Проверить", variant="primary") with gr.Column(): gr.Markdown("### Результаты проверки") validation_results = gr.Dataframe( label="Результаты", headers=["Email", "Статус", "Формат", "Домен", "MX", "SMTP"], datatype=["str", "str", "bool", "bool", "bool", "str"], interactive=False ) validate_btn.click( fn=validate_emails_sync, inputs=email_input, outputs=[validation_results, gr.Textbox(visible=False)] ) # Таб 3: Статистика и экспорт with gr.TabItem("📊 Статистика и Экспорт"): with gr.Row(): with gr.Column(): gr.Markdown("### Статистика проверки") stats_display = gr.JSON(label="Статистика") gr.Markdown("### Экспорт результатов") format_choice = gr.Radio( label="Формат экспорта", choices=["CSV", "JSON", "Excel"], value="CSV" ) export_btn = gr.Button("💾 Экспортировать", variant="secondary") export_status = gr.Textbox(label="Статус экспорта") with gr.Column(): gr.Markdown("### Детальные результаты") detailed_results = gr.Dataframe( label="Все результаты", interactive=False ) # Функции обновления def update_stats_and_details(emails_list): results, _ = validate_emails_sync(emails_list.split('\n') if emails_list else []) stats = get_statistics(results) # Форматирование результатов для отображения formatted_results = [] for r in results: formatted_results.append([ r.get("email", ""), r.get("overall_status", ""), r.get("format_valid", False), r.get("domain_valid", False), r.get("mx_valid", False), r.get("smtp_valid", "unknown") ]) return stats, formatted_results, results # Привязка событий email_input.change( fn=update_stats_and_details, inputs=email_input, outputs=[stats_display, detailed_results, gr.State()] ) def export_from_state(results, format_type): return export_results(results, format_type) export_btn.click( fn=export_from_state, inputs=[gr.State(), format_choice], outputs=export_status ) return demo if __name__ == "__main__": demo = create_interface() demo.launch( server_name="0.0.0.0", server_port=7860, share=False )