Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import speech_recognition as sr | |
| import numpy as np | |
| from pydub import AudioSegment | |
| import io | |
| import tempfile | |
| import os | |
| import wave | |
| import threading | |
| import queue | |
| import time | |
# Initial settings for the shared speech recognizer.
recognizer = sr.Recognizer()
recognizer.energy_threshold = 400 # slightly higher, for real-time use
recognizer.dynamic_energy_threshold = True
recognizer.pause_threshold = 0.5 # more sensitive to pauses
# Queue that accumulates the real-time audio chunks.
audio_queue = queue.Queue(maxsize=10) # kept small for real-time use
PROCESS_INTERVAL = 3.0 # run recognition every 3 seconds
MIN_DURATION = 1.5 # minimum 1.5 s of audio before recognition is attempted
# Global transcript, guarded by transcript_lock.
current_transcript = ""
transcript_lock = threading.Lock()
# Background worker that drains the audio queue and transcribes it in batches.
def background_processor():
    """Accumulate queued audio chunks and transcribe them periodically.

    Runs forever (intended for a daemon thread). Chunks are taken from
    ``audio_queue`` as ``(sample_rate, samples)`` tuples. Once at least
    ``PROCESS_INTERVAL`` seconds have passed since the last run and at least
    ``MIN_DURATION`` seconds of audio are buffered, the buffered chunks are
    merged, passed to ``process_audio_chunk`` and the recognized text is
    appended to the global ``current_transcript`` under ``transcript_lock``.
    """
    # The transcript is rebound below, so it must be declared global;
    # without this the first read raises UnboundLocalError.
    global current_transcript

    # Connection-error markers that must never be appended to the transcript.
    error_markers = ("[خطا اتصال]", "[خطا در اتصال]")

    accumulated = []
    last_process = time.time()
    while True:
        try:
            # Block briefly on the queue; the timeout doubles as the poll delay.
            try:
                audio_tuple = audio_queue.get(timeout=0.2)
            except queue.Empty:
                audio_tuple = None

            if audio_tuple and audio_tuple[1] is not None:
                rate, data = audio_tuple
                data = np.asarray(data)
                if rate and data.size:
                    # Only floating-point audio lives in [-1, 1]; clipping
                    # integer PCM to that range would wipe out the signal.
                    if np.issubdtype(data.dtype, np.floating):
                        data = np.clip(data, -1.0, 1.0)
                    accumulated.append((rate, data))
                    print(f"Accumulated chunk: {len(data)/rate:.2f}s")

            # Process if the interval has passed and enough audio is buffered.
            now = time.time()
            total_dur = sum(len(d) / r for r, d in accumulated)
            if accumulated and now - last_process >= PROCESS_INTERVAL and total_dur >= MIN_DURATION:
                # Take ownership of the batch before processing so that a
                # failure cannot leave a poisoned buffer that fails forever.
                batch, accumulated = accumulated, []
                last_process = now

                # NOTE: all chunks are assumed to share the first chunk's rate.
                merged_rate = batch[0][0]
                merged_data = np.concatenate([d for _, d in batch])
                text = process_audio_chunk((merged_rate, merged_data))
                if text and text not in error_markers:
                    with transcript_lock:
                        if current_transcript:
                            current_transcript += " " + text
                        else:
                            current_transcript = text
                    print(f"✅ Processed: {text[:50]}...")  # short log
        except Exception as e:
            # Top-level boundary of the worker thread: log and keep running.
            print(f"Background error: {e}")
            accumulated = []
            time.sleep(1)
# Start the background worker as soon as the module is loaded.
# daemon=True: the thread does not keep the interpreter alive on shutdown.
processor_thread = threading.Thread(target=background_processor, daemon=True)
processor_thread.start()
def numpy_to_audio_segment(audio_data, sample_rate):
    """Convert a NumPy sample array into a mono 16-bit ``AudioSegment``.

    Args:
        audio_data: 1-D array of samples, or 2-D array shaped
            ``(samples, channels)``. Floating-point data is treated as being
            in ``[-1, 1]``; signed integer data is rescaled from its own
            full range to 16 bits.
        sample_rate: Sampling rate in Hz written to the WAV header.

    Returns:
        A pydub ``AudioSegment``, or ``None`` when there is no audio.
    """
    if audio_data is None:
        return None
    samples = np.asarray(audio_data)
    if samples.size == 0:
        return None

    if np.issubdtype(samples.dtype, np.floating):
        pcm = np.clip(samples, -1.0, 1.0) * 32767
    elif samples.dtype != np.int16 and np.issubdtype(samples.dtype, np.signedinteger):
        # e.g. int32 PCM: rescale to the int16 range instead of writing the
        # raw bytes under a 2-byte sample width.
        pcm = samples.astype(np.float64) / np.iinfo(samples.dtype).max * 32767
    else:
        pcm = samples

    # The header below declares one channel, so multi-channel input is
    # down-mixed; writing interleaved frames as mono would garble the audio.
    if pcm.ndim > 1:
        pcm = pcm.mean(axis=1)
    pcm = np.clip(pcm, -32768, 32767).astype(np.int16)

    buffer = io.BytesIO()
    with wave.open(buffer, 'wb') as wav_file:
        wav_file.setnchannels(1)
        wav_file.setsampwidth(2)
        wav_file.setframerate(sample_rate)
        wav_file.writeframes(pcm.tobytes())
    buffer.seek(0)
    return AudioSegment.from_wav(buffer)
def process_audio_chunk(audio_tuple):
    """Transcribe one ``(sample_rate, samples)`` chunk with Google recognition.

    Persian (``fa-IR``) is tried first, then English (``en-US``).

    Args:
        audio_tuple: ``(sample_rate, numpy_samples)`` or ``None``.

    Returns:
        The recognized text (stripped); ``""`` when the chunk is missing,
        shorter than ``MIN_DURATION``, contains no recognizable speech, or any
        unexpected error occurs; ``"[خطا اتصال]"`` when the recognition
        service cannot be reached.
    """
    if audio_tuple is None:
        return ""

    tmp_path = None
    try:
        sample_rate, audio_data = audio_tuple
        if not sample_rate or audio_data is None:
            return ""
        if len(audio_data) / sample_rate < MIN_DURATION:
            return ""

        audio_segment = numpy_to_audio_segment(audio_data, sample_rate)
        if audio_segment is None or len(audio_segment) < MIN_DURATION * 1000:
            return ""

        # Reserve a path, close the handle, then export: writing to a file
        # that is still open fails on some platforms.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            tmp_path = tmp.name
        audio_segment.export(tmp_path, format="wav")

        with sr.AudioFile(tmp_path) as source:
            # No adjust_for_ambient_noise() here: it reads its calibration
            # audio from the source, so the start of the chunk would be
            # missing from record().
            audio = recognizer.record(source)

        # Google recognition
        try:
            try:
                text = recognizer.recognize_google(audio, language='fa-IR')  # Persian first
                # With an API key: recognizer.recognize_google(audio, language='fa-IR', key="YOUR_GOOGLE_API_KEY")
            except sr.UnknownValueError:
                try:
                    text = recognizer.recognize_google(audio, language='en-US')
                except sr.UnknownValueError:
                    print("No speech in chunk")
                    text = ""
        except sr.RequestError as e:
            # Covers connection failures of both the primary and fallback call.
            print(f"Google API error: {e}")
            text = "[خطا اتصال]"

        return text.strip()
    except Exception as e:
        # Best-effort boundary: callers expect "" rather than an exception.
        print(f"Process error: {e}")
        return ""
    finally:
        # Always remove the temporary WAV, including on error paths.
        if tmp_path and os.path.exists(tmp_path):
            os.unlink(tmp_path)
def handle_realtime_audio(audio):
    """Queue a microphone payload for the background worker without blocking.

    A ``None`` payload is ignored; when the queue is full the payload is
    dropped and a message is logged. The UI output is left unchanged in
    every case.
    """
    if audio is not None:
        try:
            audio_queue.put_nowait(audio)
        except queue.Full:
            print("Queue full, skip")
    return gr.update()
def get_current_transcript():
    """Return the transcript accumulated so far (read under the lock)."""
    transcript_lock.acquire()
    try:
        return current_transcript
    finally:
        transcript_lock.release()
def clear_transcript():
    """Reset the transcript, discard queued audio, and return ``""`` for the UI."""
    global current_transcript
    with transcript_lock:
        current_transcript = ""
    # Drain whatever is still waiting in the queue.
    try:
        while True:
            audio_queue.get_nowait()
    except queue.Empty:
        pass
    return ""
# File transcription
def transcribe_file(audio_file, chunk_duration=30):
    """Transcribe an audio file chunk by chunk, yielding progress as it goes.

    Args:
        audio_file: Path of the audio file, or ``None`` when nothing was
            uploaded.
        chunk_duration: Chunk length in seconds (int or float, must be > 0).

    Yields:
        ``(text_so_far, status_message)`` tuples: one per processed chunk,
        then a final completion tuple. On failure a single
        ``(error_text, error_status)`` tuple is yielded instead.
    """
    if audio_file is None:
        yield "لطفاً فایل آپلود کنید", ""
        return
    try:
        audio = AudioSegment.from_file(audio_file)
        duration_ms = len(audio)
        # UI sliders may deliver floats; range() below needs an int step.
        chunk_ms = int(chunk_duration * 1000)
        if chunk_ms <= 0:
            raise ValueError("chunk_duration must be positive")
        all_text = []
        num_chunks = (duration_ms + chunk_ms - 1) // chunk_ms
        for index, start in enumerate(range(0, duration_ms, chunk_ms), start=1):
            chunk = audio[start:start + chunk_ms]
            tmp_path = None
            try:
                # Reserve a path, close the handle, then export to it.
                with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
                    tmp_path = tmp.name
                chunk.export(tmp_path, format="wav")
                try:
                    with sr.AudioFile(tmp_path) as source:
                        audio_data = recognizer.record(source)
                    try:
                        try:
                            text = recognizer.recognize_google(audio_data, language='fa-IR')
                        except sr.UnknownValueError:
                            try:
                                text = recognizer.recognize_google(audio_data, language='en-US')
                            except sr.UnknownValueError:
                                text = ""
                    except sr.RequestError as e:
                        # Log instead of dropping the failure silently.
                        print(f"Google API error: {e}")
                        text = "[خطا اتصال]"
                    if text and text != "[خطا اتصال]":
                        all_text.append(text)
                except Exception as e:
                    # One bad chunk must not abort the whole file.
                    print(f"File chunk error: {e}")
            finally:
                # Remove the temporary WAV on every path.
                if tmp_path and os.path.exists(tmp_path):
                    os.unlink(tmp_path)
            progress = min((start + chunk_ms) / duration_ms * 100, 100)
            yield " ".join(all_text), f"پیشرفت: {progress:.1f}% - بخش {index} از {num_chunks}"
            time.sleep(0.5)  # short pause between chunks
        yield " ".join(all_text), "کامل شد! ✅"
    except Exception as e:
        yield f"خطا: {e}", "خطا ❌"
def save_text(text):
    """Write ``text`` to a new UTF-8 ``.txt`` temp file and return its path.

    Args:
        text: The text to save; may be ``None`` or blank.

    Returns:
        The path of the created file, or ``None`` when there is nothing to
        save. The file is not deleted automatically.
    """
    # Guard against None as well as empty / whitespace-only input.
    if not text or not text.strip():
        return None
    with tempfile.NamedTemporaryFile(
        mode='w', delete=False, suffix='.txt', encoding='utf-8'
    ) as temp_file:
        temp_file.write(text)
    return temp_file.name
# Gradio UI: a live-microphone tab, a file-upload tab, and a help section.
with gr.Blocks(
    title="تبدیل گفتار به متن - Real-time Fixed",
    theme=gr.themes.Soft(),
    css="""
    .gradio-container { font-family: 'Vazir', 'Tahoma', sans-serif !important; }
    .rtl { direction: rtl; text-align: right; }
    """
) as demo:
    gr.HTML("""
    <div style="text-align: center; max-width: 800px; margin: 0 auto;">
    <h1 style="font-size: 2.5em; margin-bottom: 0.5em;">🎤 تبدیل گفتار به متن</h1>
    <p style="font-size: 1.1em; color: #666; margin-bottom: 2em;">
    Real-time با Google API - پردازش background | قابل توزیع روی مرورگرها
    </p>
    </div>
    """)
    with gr.Tabs():
        with gr.TabItem("🎙️ ضبط مستقیم", id="realtime_tab") as realtime_tab:
            gr.Markdown("### فعال کنید و 5+ ثانیه واضح صحبت کنید (متن هر 3s آپدیت میشه)", elem_classes="rtl")
            with gr.Row():
                audio_input = gr.Audio(
                    sources=["microphone"],
                    type="numpy",
                    label="میکروفون (ضبط رو شروع کن)",
                    elem_classes="rtl"
                )
                realtime_output = gr.Textbox(
                    label="متن live",
                    placeholder="پس از 3-5s صحبت، متن ظاهر میشه...",
                    lines=10,
                    elem_classes="rtl",
                    rtl=True,
                    show_copy_button=True,
                    interactive=False
                )
            clear_btn = gr.Button("🗑️ پاک کردن", variant="secondary")

            # Events
            audio_input.change(
                handle_realtime_audio,
                inputs=[audio_input],
                outputs=[realtime_output]
            )

            # Timer that polls the transcript for the live textbox.
            timer = gr.Timer(value=2.0, active=False)

            def start_timer():
                """Activate the polling timer and return the current transcript."""
                # A timer is (de)activated by returning an updated gr.Timer as
                # an event output; timer.change(...) only registers a listener.
                return gr.Timer(active=True), get_current_transcript()

            realtime_tab.select(start_timer, outputs=[timer, realtime_output])
            # This tab is the first one, so a select event may never fire for
            # it; also start the timer when the page loads.
            demo.load(start_timer, outputs=[timer, realtime_output])
            timer.tick(get_current_transcript, outputs=[realtime_output])
            clear_btn.click(clear_transcript, outputs=[realtime_output])

        with gr.TabItem("📁 فایل صوتی"):
            gr.Markdown("### فایل آپلود کن و تبدیل کن", elem_classes="rtl")
            with gr.Row():
                file_input = gr.Audio(sources=["upload"], type="filepath", label="فایل صوتی", elem_classes="rtl")
                # step is passed by keyword so it cannot be mistaken for another parameter.
                chunk_slider = gr.Slider(minimum=10, maximum=60, value=30, step=5, label="بخشبندی (s)", elem_classes="rtl")
            process_btn = gr.Button("🚀 تبدیل", variant="primary")
            progress_label = gr.Textbox(label="وضعیت", interactive=False, elem_classes="rtl")
            file_output = gr.Textbox(label="متن", lines=10, elem_classes="rtl", rtl=True, show_copy_button=True)
            with gr.Row():
                save_btn = gr.Button("💾 ذخیره", variant="secondary")
                clear_file_btn = gr.Button("🗑️ پاک", variant="secondary")
            download_file = gr.File(label="دانلود TXT", visible=False, elem_classes="rtl")

            process_btn.click(transcribe_file, [file_input, chunk_slider], [file_output, progress_label])
            save_btn.click(save_text, file_output, download_file).then(lambda: gr.update(visible=True), outputs=[download_file])
            # The cleared components are outputs; the second positional
            # argument of click() is `inputs`, which would feed the lambda
            # two unexpected arguments.
            clear_file_btn.click(lambda: ("", ""), outputs=[file_output, progress_label])

    with gr.Accordion("📖 راهنما", open=False, elem_classes="rtl"):
        gr.Markdown("""
        ### استفاده:
        - **Real-time**: تب رو باز کن، میکروفون فعال کن، 5s+ صحبت کن. هر 3s متن آپدیت میشه (background).
        - **فایل**: آپلود و دکمه بزن.
        ### نکات:
        - 🗣️ واضح صحبت کن، نویز کم.
        - 🌐 اینترنت پایدار (Google API).
        - 📱 روی موبایل/دسکتاپ کار میکنه (mic access بده).
        - ⚠️ محدودیت Google: ~60 req/min - برای حجم بالا، API key اضافه کن (در کد کامنت).
        - توزیع: share لینک رو share کن، همه browserها ساپورت.
        """, elem_classes="rtl")
    gr.HTML('<div style="text-align: center; margin-top: 2em; padding: 1em; background: #f8f9fa;"><p style="color: #666;">نسخه 2.3 - Fixed Timer & Clicks | Google Backend</p></div>')
# Script entry point: enable request queuing, then serve on all interfaces.
if __name__ == "__main__":
    launch_options = dict(
        share=True,
        show_error=True,
        server_name="0.0.0.0",
        server_port=7860,
    )
    demo.queue().launch(**launch_options)