Spaces:
				
			
			
	
			
			
		Sleeping
		
	
	
	
			
			
	
	
	
	
		
		
		Sleeping
		
	| import gradio as gr | |
| import pandas as pd | |
| import asyncio | |
| import aiohttp | |
| import dns.resolver | |
| import smtplib | |
| import socket | |
| from typing import List, Dict, Tuple | |
| from dataclasses import dataclass | |
| import random | |
| import time | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| import re | |
| class EmailResult: | |
| email: str | |
| exists: bool | |
| smtp_valid: bool | |
| mx_valid: bool | |
| disposable: bool | |
| response_time: int | |
| risk_score: int | |
| class RealEmailValidator: | |
| def __init__(self): | |
| self.disposable_domains = self._load_disposable_domains() | |
| self.common_emails = self._load_common_patterns() | |
| def _load_disposable_domains(self): | |
| """Загрузка списка временных email-доменов""" | |
| return { | |
| 'tempmail.com', '10minutemail.com', 'guerrillamail.com', | |
| 'mailinator.com', 'yopmail.com', 'throwawaymail.com', | |
| 'fakeinbox.com', 'temp-mail.org', 'trashmail.com' | |
| } | |
| def _load_common_patterns(self): | |
| """Загрузка распространенных немецких имен для генерации""" | |
| german_first_names = [ | |
| 'max', 'paul', 'leon', 'felix', 'lukas', 'tim', 'david', 'elias', | |
| 'ben', 'jonas', 'luca', 'finn', 'sofia', 'emily', 'nina', 'jana', | |
| 'amelie', 'anna', 'lisa', 'maria', 'hannah', 'laura', 'lea', 'emma' | |
| ] | |
| german_last_names = [ | |
| 'muller', 'schmidt', 'schneider', 'fischer', 'weber', 'meyer', | |
| 'wagner', 'becker', 'schulz', 'hoffmann', 'schaefer', 'koch', | |
| 'richter', 'klein', 'wolf', 'schroeder', 'neumann', 'schwarz' | |
| ] | |
| return german_first_names, german_last_names | |
| def generate_german_emails(self, domain: str, count: int) -> List[str]: | |
| """Генерация реалистичных немецких email-адресов""" | |
| first_names, last_names = self.common_emails | |
| emails = [] | |
| patterns = [ | |
| '{first}.{last}', | |
| '{first}_{last}', | |
| '{first}{last}', | |
| '{last}.{first}', | |
| '{first[0]}{last}', | |
| '{first}{last}{number}', | |
| '{first}.{last}{number}' | |
| ] | |
| for i in range(count): | |
| pattern = random.choice(patterns) | |
| first = random.choice(first_names) | |
| last = random.choice(last_names) | |
| number = random.randint(1, 999) if '{number}' in pattern else '' | |
| email = pattern.format( | |
| first=first, | |
| last=last, | |
| number=number, | |
| first0=first[0] | |
| ) | |
| email = re.sub(r'[^a-zA-Z0-9._-]', '', email) + '@' + domain | |
| emails.append(email.lower()) | |
| return list(set(emails))[:count] | |
| async def check_mx_record(self, domain: str) -> bool: | |
| """Проверка MX-записей домена""" | |
| try: | |
| answers = dns.resolver.resolve(domain, 'MX') | |
| return len(answers) > 0 | |
| except: | |
| return False | |
| async def smtp_verify(self, email: str) -> Tuple[bool, int]: | |
| """Проверка email через SMTP""" | |
| start_time = time.time() | |
| domain = email.split('@')[1] | |
| try: | |
| # Получаем MX-записи | |
| mx_records = [] | |
| try: | |
| answers = dns.resolver.resolve(domain, 'MX') | |
| mx_records = sorted([(r.preference, str(r.exchange)) for r in answers]) | |
| except: | |
| return False, 0 | |
| if not mx_records: | |
| return False, 0 | |
| # Пробуем подключиться к SMTP-серверу | |
| for preference, mx in mx_records[:3]: # Проверяем первые 3 MX | |
| try: | |
| with smtplib.SMTP(timeout=10) as server: | |
| server.set_debuglevel(0) | |
| server.connect(mx) | |
| server.helo() | |
| server.mail('test@example.com') | |
| code, message = server.rcpt(email) | |
| response_time = int((time.time() - start_time) * 1000) | |
| # Коды ответов, указывающие на существование email | |
| if code in [250, 251]: | |
| return True, response_time | |
| else: | |
| return False, response_time | |
| except (smtplib.SMTPServerDisconnected, smtplib.SMTPConnectError, | |
| socket.timeout, socket.gaierror): | |
| continue | |
| except Exception as e: | |
| print(f"SMTP error for {email}: {e}") | |
| return False, int((time.time() - start_time) * 1000) | |
| def check_disposable(self, email: str) -> bool: | |
| """Проверка на временный email""" | |
| domain = email.split('@')[1].lower() | |
| return domain in self.disposable_domains | |
| def calculate_risk_score(self, email: str, exists: bool, disposable: bool) -> int: | |
| """Расчет оценки риска""" | |
| score = 0 | |
| if not exists: | |
| score += 80 | |
| if disposable: | |
| score += 90 | |
| # Проверка паттернов | |
| local_part = email.split('@')[0] | |
| if re.match(r'^[a-z]+\.[a-z]+$', local_part): # name.surname | |
| score -= 20 | |
| elif re.match(r'^[a-z]+[0-9]+$', local_part): # name123 | |
| score += 10 | |
| elif len(local_part) < 5: | |
| score += 30 | |
| return min(100, max(0, score)) | |
| async def validate_single_email(self, email: str) -> EmailResult: | |
| """Валидация одного email-адреса""" | |
| try: | |
| disposable = self.check_disposable(email) | |
| mx_valid = await self.check_mx_record(email.split('@')[1]) | |
| smtp_valid, response_time = await self.smtp_verify(email) | |
| exists = mx_valid and smtp_valid | |
| risk_score = self.calculate_risk_score(email, exists, disposable) | |
| return EmailResult( | |
| email=email, | |
| exists=exists, | |
| smtp_valid=smtp_valid, | |
| mx_valid=mx_valid, | |
| disposable=disposable, | |
| response_time=response_time, | |
| risk_score=risk_score | |
| ) | |
| except Exception as e: | |
| print(f"Validation error for {email}: {e}") | |
| return EmailResult( | |
| email=email, | |
| exists=False, | |
| smtp_valid=False, | |
| mx_valid=False, | |
| disposable=False, | |
| response_time=0, | |
| risk_score=100 | |
| ) | |
| async def validate_emails_batch(self, emails: List[str]) -> List[EmailResult]: | |
| """Пакетная валидация email-адресов""" | |
| tasks = [self.validate_single_email(email) for email in emails] | |
| results = await asyncio.gather(*tasks, return_exceptions=True) | |
| valid_results = [] | |
| for result in results: | |
| if isinstance(result, EmailResult): | |
| valid_results.append(result) | |
| else: | |
| print(f"Error in validation: {result}") | |
| return valid_results | |
| # Инициализация валидатора | |
| validator = RealEmailValidator() | |
| async def validate_emails_interface(domain, count, use_custom_emails, custom_emails_text): | |
| """Интерфейс для Gradio""" | |
| # Генерация или использование кастомных email-адресов | |
| if use_custom_emails and custom_emails_text: | |
| emails = [email.strip() for email in custom_emails_text.split('\n') if email.strip()] | |
| else: | |
| emails = validator.generate_german_emails(domain, count) | |
| # Валидация | |
| results = await validator.validate_emails_batch(emails) | |
| # Подготовка данных для таблицы | |
| df_data = [] | |
| stats = { | |
| 'total': len(results), | |
| 'valid': 0, | |
| 'invalid': 0, | |
| 'disposable': 0, | |
| 'high_risk': 0 | |
| } | |
| for result in results: | |
| status = "✅ Valid" if result.exists else "❌ Invalid" | |
| mx_status = "✅" if result.mx_valid else "❌" | |
| smtp_status = "✅" if result.smtp_valid else "❌" | |
| disposable_status = "⚠️ Yes" if result.disposable else "✅ No" | |
| risk_color = "🔴" if result.risk_score > 70 else "🟡" if result.risk_score > 30 else "🟢" | |
| df_data.append({ | |
| "Email": result.email, | |
| "Status": status, | |
| "MX Record": mx_status, | |
| "SMTP Check": smtp_status, | |
| "Disposable": disposable_status, | |
| "Response Time": f"{result.response_time}ms", | |
| "Risk Score": f"{risk_color} {result.risk_score}%" | |
| }) | |
| # Статистика | |
| if result.exists: | |
| stats['valid'] += 1 | |
| else: | |
| stats['invalid'] += 1 | |
| if result.disposable: | |
| stats['disposable'] += 1 | |
| if result.risk_score > 70: | |
| stats['high_risk'] += 1 | |
| # Форматирование статистики | |
| stats_text = f""" | |
| ## 📊 Validation Statistics | |
| | Metric | Value | Percentage | | |
| |--------|-------|------------| | |
| | Total Emails | {stats['total']} | 100% | | |
| | Valid Emails | {stats['valid']} | {stats['valid']/stats['total']*100:.1f}% | | |
| | Invalid Emails | {stats['invalid']} | {stats['invalid']/stats['total']*100:.1f}% | | |
| | Disposable Emails | {stats['disposable']} | {stats['disposable']/stats['total']*100:.1f}% | | |
| | High Risk Emails | {stats['high_risk']} | {stats['high_risk']/stats['total']*100:.1f}% | | |
| """ | |
| df = pd.DataFrame(df_data) | |
| return df, stats_text | |
| def validate_emails_sync(domain, count, use_custom_emails, custom_emails_text): | |
| """Синхронная обертка для асинхронной функции""" | |
| return asyncio.run(validate_emails_interface(domain, count, use_custom_emails, custom_emails_text)) | |
| # Создание интерфейса Gradio | |
| with gr.Blocks(theme=gr.themes.Soft(), title="Real Email Validator") as demo: | |
| gr.Markdown(""" | |
| # 🔍 Real Email Address Validator | |
| *Professional tool for real email validation and verification* | |
| """) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| with gr.Group(): | |
| gr.Markdown("### 🎯 Generation Settings") | |
| domain = gr.Dropdown( | |
| choices=["gmail.com", "yahoo.com", "hotmail.com", "outlook.com", | |
| "web.de", "gmx.de", "freenet.de", "t-online.de"], | |
| value="gmail.com", | |
| label="Target Domain" | |
| ) | |
| count = gr.Slider( | |
| minimum=1, | |
| maximum=50, | |
| value=10, | |
| step=1, | |
| label="Number of Emails to Generate" | |
| ) | |
| with gr.Group(): | |
| gr.Markdown("### 📧 Custom Emails") | |
| use_custom_emails = gr.Checkbox( | |
| label="Use custom email list", | |
| value=False | |
| ) | |
| custom_emails_text = gr.Textbox( | |
| label="Enter emails (one per line)", | |
| lines=5, | |
| placeholder="user1@domain.com\nuser2@domain.com\n...", | |
| visible=False | |
| ) | |
| validate_btn = gr.Button("🔍 Validate Emails", variant="primary", size="lg") | |
| with gr.Column(scale=2): | |
| stats_output = gr.Markdown(label="Statistics") | |
| with gr.Row(): | |
| results_table = gr.Dataframe( | |
| headers=["Email", "Status", "MX Record", "SMTP Check", "Disposable", "Response Time", "Risk Score"], | |
| datatype=["str", "str", "str", "str", "str", "str", "str"], | |
| label="Email Validation Results", | |
| interactive=False, | |
| wrap=True | |
| ) | |
| gr.Markdown(""" | |
| ### 🔧 Validation Methods: | |
| - **MX Record Check**: Verifies domain mail server configuration | |
| - **SMTP Verification**: Real SMTP handshake to check email existence | |
| - **Disposable Detection**: Identifies temporary email services | |
| - **Risk Assessment**: Comprehensive risk scoring based on multiple factors | |
| ### ⚠️ Important Notes: | |
| - This tool performs **real SMTP verification** - use responsibly | |
| - Some mail servers may block verification attempts | |
| - Results may vary based on server configurations | |
| - Respect rate limits and terms of service | |
| - For educational and authorized testing only | |
| """) | |
| # Обработчики видимости | |
| def toggle_custom_emails(use_custom): | |
| return gr.Textbox(visible=use_custom) | |
| use_custom_emails.change( | |
| fn=toggle_custom_emails, | |
| inputs=[use_custom_emails], | |
| outputs=[custom_emails_text] | |
| ) | |
| # Основной обработчик | |
| validate_btn.click( | |
| fn=validate_emails_sync, | |
| inputs=[domain, count, use_custom_emails, custom_emails_text], | |
| outputs=[results_table, stats_output] | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch(share=True) |