# speh32 / app.py
# suprimedev's picture
# Update app.py
# 4742671 verified
import gradio as gr
import speech_recognition as sr
import numpy as np
from pydub import AudioSegment
import io
import tempfile
import os
import wave
import threading
import queue
import time
# Initial settings for the shared speech recognizer.
recognizer = sr.Recognizer()
recognizer.energy_threshold = 400 # slightly higher, for real-time use
recognizer.dynamic_energy_threshold = True
recognizer.pause_threshold = 0.5 # more sensitive to pauses
# Queue that accumulates incoming real-time audio chunks.
audio_queue = queue.Queue(maxsize=10) # kept small for real-time use
PROCESS_INTERVAL = 3.0 # run recognition every 3 seconds
MIN_DURATION = 1.5 # minimum seconds of audio required before processing
# Global transcript shared between the worker thread and the UI callbacks;
# every access in this file goes through transcript_lock.
current_transcript = ""
transcript_lock = threading.Lock()
# Background worker that drains the audio queue and feeds the recognizer.
def background_processor():
    """Accumulate queued audio chunks and transcribe them periodically.

    Loops forever, so it is meant to run in its own thread. Each iteration
    waits up to 0.2 s for a ``(sample_rate, samples)`` tuple on
    ``audio_queue``. Once ``PROCESS_INTERVAL`` seconds have passed since the
    last run and at least ``MIN_DURATION`` seconds of audio are buffered, the
    buffered chunks are merged, passed to ``process_audio_chunk`` and the
    recognized text is appended to the global ``current_transcript``.
    """
    # The transcript is rebound below; without this declaration the name is
    # treated as local and the first append raises UnboundLocalError.
    global current_transcript

    # Results that must never be appended to the transcript. Both spellings of
    # the connection-error marker are listed so the check holds regardless of
    # which one the recognizer helper returns.
    ignored_results = ("", "[خطا اتصال]", "[خطا در اتصال]")

    accumulated = []
    last_process = time.time()
    while True:
        try:
            # A blocking get with a timeout doubles as the poll delay and
            # avoids the empty()/get() race with clear_transcript().
            try:
                audio_tuple = audio_queue.get(timeout=0.2)
            except queue.Empty:
                audio_tuple = None

            if audio_tuple and audio_tuple[1] is not None:
                rate, data = audio_tuple
                data = np.asarray(data)
                # Only float samples live in [-1, 1]; clipping integer PCM to
                # that range would wipe out the signal.
                if np.issubdtype(data.dtype, np.floating):
                    data = np.clip(data, -1.0, 1.0)
                if rate and len(data):
                    accumulated.append((rate, data))
                    print(f"Accumulated chunk: {len(data)/rate:.2f}s")

            # Process if the interval has passed and enough audio is buffered.
            now = time.time()
            total_dur = sum(len(d) / r for r, d in accumulated)
            if now - last_process >= PROCESS_INTERVAL and total_dur >= MIN_DURATION:
                # Reset first: if merging or recognition fails, the bad batch
                # is dropped instead of being retried on every iteration.
                batch, accumulated = accumulated, []
                last_process = now

                merged_rate = batch[0][0]
                # Chunks recorded at another rate cannot share one buffer.
                merged_data = np.concatenate(
                    [d for r, d in batch if r == merged_rate]
                )
                text = process_audio_chunk((merged_rate, merged_data))
                if text and text not in ignored_results:
                    with transcript_lock:
                        if current_transcript:
                            current_transcript += " " + text
                        else:
                            current_transcript = text
                    print(f"✅ Processed: {text[:50]}...")  # short log
        except Exception as e:
            # Top-level boundary of the worker: log and keep the thread alive.
            print(f"Background error: {e}")
            time.sleep(1)
# Start the background worker as a side effect of importing this module.
# daemon=True: the thread does not keep the interpreter alive at shutdown.
# NOTE(review): the thread is started before process_audio_chunk is defined
# further down in this file; the worker only calls it after audio has been
# queued — confirm nothing enqueues audio during import.
processor_thread = threading.Thread(target=background_processor, daemon=True)
processor_thread.start()
def numpy_to_audio_segment(audio_data, sample_rate):
    """Convert raw samples into a mono 16-bit pydub ``AudioSegment``.

    Args:
        audio_data: Array-like of samples. Floating-point data is clipped to
            [-1, 1] and scaled to int16; wider signed integers are shifted
            down to 16 bits; any other dtype is cast to int16.
        sample_rate: Frame rate in Hz.

    Returns:
        The ``AudioSegment``, or ``None`` when there are no samples or the
        sample rate is missing/zero.
    """
    if audio_data is None or not sample_rate:
        return None
    samples = np.asarray(audio_data)
    if samples.size == 0:
        return None

    # The WAV header below declares 2-byte samples, so everything has to be
    # real int16 before the bytes are written.
    if np.issubdtype(samples.dtype, np.floating):
        samples = (np.clip(samples, -1.0, 1.0) * 32767).astype(np.int16)
    elif samples.dtype != np.int16:
        wide_signed = (
            np.issubdtype(samples.dtype, np.signedinteger)
            and samples.dtype.itemsize > 2
        )
        if wide_signed:
            # Keep the most significant 16 bits of wider PCM.
            samples = samples >> (8 * (samples.dtype.itemsize - 2))
        samples = samples.astype(np.int16)

    # The header also declares one channel: average multi-channel input down
    # to mono, otherwise interleaved frames would play at the wrong speed.
    # Assumes a (samples, channels) layout — TODO confirm against the caller.
    if samples.ndim > 1:
        frames = samples.reshape(len(samples), -1).astype(np.int32)
        samples = frames.mean(axis=1).astype(np.int16)

    buffer = io.BytesIO()
    with wave.open(buffer, 'wb') as wav_file:
        wav_file.setnchannels(1)
        wav_file.setsampwidth(2)
        wav_file.setframerate(sample_rate)
        wav_file.writeframes(samples.tobytes())
    buffer.seek(0)
    return AudioSegment.from_wav(buffer)
def process_audio_chunk(audio_tuple):
    """Transcribe one ``(sample_rate, samples)`` chunk with Google recognition.

    Persian (``fa-IR``) is tried first, then English (``en-US``).

    Args:
        audio_tuple: ``(sample_rate, samples)`` or ``None``.

    Returns:
        The recognized text (stripped); ``""`` when the chunk is missing,
        shorter than ``MIN_DURATION`` seconds, contains no recognizable speech
        or processing fails; ``"[خطا اتصال]"`` when the recognition service
        cannot be reached.
    """
    if audio_tuple is None:
        return ""

    tmp_path = None
    try:
        sample_rate, audio_data = audio_tuple
        if audio_data is None or not sample_rate:
            return ""
        duration = len(audio_data) / sample_rate
        if duration < MIN_DURATION:
            return ""

        audio_segment = numpy_to_audio_segment(audio_data, sample_rate)
        # len() of an AudioSegment is its length in milliseconds.
        if audio_segment is None or len(audio_segment) < MIN_DURATION * 1000:
            return ""

        # Reserve a path and close the handle before exporting, so the file
        # is not opened twice at the same time.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            tmp_path = tmp.name
        audio_segment.export(tmp_path, format="wav")

        with sr.AudioFile(tmp_path) as source:
            # No adjust_for_ambient_noise() here: on a file source it consumes
            # the start of the recording and only tunes the energy threshold,
            # which record() does not use.
            audio = recognizer.record(source)

        # Google recognition.
        # With an API key: recognizer.recognize_google(audio, language='fa-IR', key="YOUR_GOOGLE_API_KEY")
        text = ""
        try:
            for language in ("fa-IR", "en-US"):
                try:
                    text = recognizer.recognize_google(audio, language=language)
                    break
                except sr.UnknownValueError:
                    continue
            else:
                print("No speech in chunk")
        except sr.RequestError as e:
            # Covers a failure of either language attempt.
            print(f"Google API error: {e}")
            text = "[خطا اتصال]"
        return text.strip()
    except Exception as e:
        # Called from the worker thread: report and return an empty result
        # rather than killing the caller.
        print(f"Process error: {e}")
        return ""
    finally:
        # Always remove the temp file, including on early exits and errors.
        if tmp_path and os.path.exists(tmp_path):
            try:
                os.unlink(tmp_path)
            except OSError as e:
                print(f"Process error: {e}")
def handle_realtime_audio(audio):
    """Queue a microphone chunk for the background worker without blocking.

    A ``None`` value is ignored and a full queue drops the chunk; in every
    case the output component is left unchanged via ``gr.update()``.
    """
    if audio is not None:
        try:
            audio_queue.put_nowait(audio)
        except queue.Full:
            print("Queue full, skip")
    return gr.update()
def get_current_transcript():
    """Return the shared transcript, read under the lock (polled by the UI)."""
    transcript_lock.acquire()
    try:
        snapshot = current_transcript
    finally:
        transcript_lock.release()
    return snapshot
def clear_transcript():
    """Reset the shared transcript, discard queued audio and return ``""``."""
    global current_transcript
    with transcript_lock:
        current_transcript = ""
        # Drain whatever is still waiting in the queue.
        while True:
            try:
                audio_queue.get_nowait()
            except queue.Empty:
                break
    return ""
# File transcription
def transcribe_file(audio_file, chunk_duration=30):
    """Transcribe an audio file chunk by chunk, yielding progress as it goes.

    Args:
        audio_file: Path of the audio file, or ``None``.
        chunk_duration: Chunk length in seconds (int or float, must be > 0).

    Yields:
        ``(text_so_far, status_message)`` tuples: one per processed chunk and
        a final one when done. A single ``(error_message, status)`` tuple is
        yielded if the file cannot be processed at all.
    """
    if audio_file is None:
        yield "لطفاً فایل آپلود کنید", ""
        return
    try:
        audio = AudioSegment.from_file(audio_file)
        duration_ms = len(audio)
        # The value may arrive as a float; range() needs a positive int step.
        chunk_ms = int(chunk_duration * 1000)
        if chunk_ms <= 0:
            raise ValueError("chunk_duration must be positive")
        all_text = []
        num_chunks = (duration_ms + chunk_ms - 1) // chunk_ms
        for i in range(0, duration_ms, chunk_ms):
            chunk = audio[i:i + chunk_ms]
            tmp_path = None
            try:
                # Reserve a path and close the handle before exporting.
                with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
                    tmp_path = tmp.name
                chunk.export(tmp_path, format="wav")
                try:
                    with sr.AudioFile(tmp_path) as source:
                        audio_data = recognizer.record(source)
                    text = ""
                    try:
                        # Persian first, English as a fallback.
                        for language in ("fa-IR", "en-US"):
                            try:
                                text = recognizer.recognize_google(audio_data, language=language)
                                break
                            except sr.UnknownValueError:
                                continue
                    except sr.RequestError:
                        text = "[خطا اتصال]"
                    if text and text != "[خطا اتصال]":
                        all_text.append(text)
                except Exception as e:
                    # A failing chunk is skipped; the rest of the file goes on.
                    print(f"File chunk error: {e}")
            finally:
                # Remove the temp file even when export or recognition raised.
                if tmp_path and os.path.exists(tmp_path):
                    os.unlink(tmp_path)
            progress = min((i + chunk_ms) / duration_ms * 100, 100)
            yield " ".join(all_text), f"پیشرفت: {progress:.1f}% - بخش {(i // chunk_ms)+1} از {num_chunks}"
            time.sleep(0.5)
        yield " ".join(all_text), "کامل شد! ✅"
    except Exception as e:
        yield f"خطا: {e}", "خطا ❌"
def save_text(text):
    """Write *text* to a new UTF-8 ``.txt`` temp file and return its path.

    Returns ``None`` when *text* is ``None``, empty or whitespace-only. The
    file is created with ``delete=False``, so removing it is left to the
    caller.
    """
    if not text or not text.strip():
        return None
    # The context manager closes the handle even if the write fails.
    with tempfile.NamedTemporaryFile(
        mode='w', delete=False, suffix='.txt', encoding='utf-8'
    ) as temp_file:
        temp_file.write(text)
    return temp_file.name
# Gradio UI: a live-recording tab, a file-upload tab, a help accordion and a
# footer. Multi-line string literals are kept flush-left so their content is
# not changed by code indentation.
with gr.Blocks(
    title="تبدیل گفتار به متن - Real-time Fixed",
    theme=gr.themes.Soft(),
    css="""
.gradio-container { font-family: 'Vazir', 'Tahoma', sans-serif !important; }
.rtl { direction: rtl; text-align: right; }
""",
) as demo:
    gr.HTML("""
<div style="text-align: center; max-width: 800px; margin: 0 auto;">
<h1 style="font-size: 2.5em; margin-bottom: 0.5em;">🎤 تبدیل گفتار به متن</h1>
<p style="font-size: 1.1em; color: #666; margin-bottom: 2em;">
Real-time با Google API - پردازش background | قابل توزیع روی مرورگرها
</p>
</div>
""")
    with gr.Tabs():
        with gr.TabItem("🎙️ ضبط مستقیم", id="realtime_tab") as realtime_tab:
            gr.Markdown("### فعال کنید و 5+ ثانیه واضح صحبت کنید (متن هر 3s آپدیت می‌شه)", elem_classes="rtl")
            with gr.Row():
                audio_input = gr.Audio(
                    sources=["microphone"],
                    type="numpy",
                    label="میکروفون (ضبط رو شروع کن)",
                    elem_classes="rtl",
                )
                realtime_output = gr.Textbox(
                    label="متن live",
                    placeholder="پس از 3-5s صحبت، متن ظاهر می‌شه...",
                    lines=10,
                    elem_classes="rtl",
                    rtl=True,
                    show_copy_button=True,
                    interactive=False,
                )
            clear_btn = gr.Button("🗑️ پاک کردن", variant="secondary")

            # Recorded audio is only enqueued here; the handler returns a
            # no-op update and the text itself is refreshed by the timer.
            audio_input.change(
                handle_realtime_audio,
                inputs=[audio_input],
                outputs=[realtime_output],
            )

            # Poll the transcript every 2 s. The timer is active from the
            # start because this tab is already selected on page load, so its
            # select event would not fire until the user switches tabs.
            timer = gr.Timer(value=2.0, active=True)

            def start_timer():
                """Re-activate polling and refresh the text on tab selection."""
                # A component is updated by returning a new instance as an
                # output; calling methods on `timer` in a handler has no effect.
                return gr.Timer(active=True), get_current_transcript()

            realtime_tab.select(start_timer, outputs=[timer, realtime_output])
            timer.tick(get_current_transcript, outputs=[realtime_output])
            clear_btn.click(clear_transcript, outputs=[realtime_output])

        with gr.TabItem("📁 فایل صوتی"):
            gr.Markdown("### فایل آپلود کن و تبدیل کن", elem_classes="rtl")
            with gr.Row():
                file_input = gr.Audio(sources=["upload"], type="filepath", label="فایل صوتی", elem_classes="rtl")
                # Keyword arguments: `step` is not reliably positional.
                chunk_slider = gr.Slider(
                    minimum=10,
                    maximum=60,
                    value=30,
                    step=5,
                    label="بخش‌بندی (s)",
                    elem_classes="rtl",
                )
            process_btn = gr.Button("🚀 تبدیل", variant="primary")
            progress_label = gr.Textbox(label="وضعیت", interactive=False, elem_classes="rtl")
            file_output = gr.Textbox(label="متن", lines=10, elem_classes="rtl", rtl=True, show_copy_button=True)
            with gr.Row():
                save_btn = gr.Button("💾 ذخیره", variant="secondary")
                clear_file_btn = gr.Button("🗑️ پاک", variant="secondary")
            download_file = gr.File(label="دانلود TXT", visible=False, elem_classes="rtl")

            process_btn.click(
                transcribe_file,
                inputs=[file_input, chunk_slider],
                outputs=[file_output, progress_label],
            )
            save_btn.click(
                save_text,
                inputs=file_output,
                outputs=download_file,
            ).then(lambda: gr.update(visible=True), outputs=[download_file])
            # The two textboxes are outputs. Passed positionally they were
            # taken as inputs, so the zero-argument lambda could not be called
            # and nothing was cleared.
            clear_file_btn.click(lambda: ("", ""), outputs=[file_output, progress_label])

    with gr.Accordion("📖 راهنما", open=False, elem_classes="rtl"):
        gr.Markdown("""
### استفاده:
- **Real-time**: تب رو باز کن، میکروفون فعال کن، 5s+ صحبت کن. هر 3s متن آپدیت می‌شه (background).
- **فایل**: آپلود و دکمه بزن.
### نکات:
- 🗣️ واضح صحبت کن، نویز کم.
- 🌐 اینترنت پایدار (Google API).
- 📱 روی موبایل/دسکتاپ کار می‌کنه (mic access بده).
- ⚠️ محدودیت Google: ~60 req/min - برای حجم بالا، API key اضافه کن (در کد کامنت).
- توزیع: share لینک رو share کن، همه browserها ساپورت.
""", elem_classes="rtl")
    gr.HTML('<div style="text-align: center; margin-top: 2em; padding: 1em; background: #f8f9fa;"><p style="color: #666;">نسخه 2.3 - Fixed Timer & Clicks | Google Backend</p></div>')
if __name__ == "__main__":
    # Script entry point: enable request queuing, then serve on all
    # interfaces at port 7860 with a public share link and visible errors.
    launch_options = {
        "share": True,
        "show_error": True,
        "server_name": "0.0.0.0",
        "server_port": 7860,
    }
    demo.queue().launch(**launch_options)