Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import numpy as np | |
| import os | |
| import datetime | |
| import torch | |
| import soundfile | |
| from wavmark.utils import file_reader | |
| import wavmark | |
def my_read_file(audio_path, max_second, default_sr=16000):
    """Load an audio file and cap its duration at ``max_second`` seconds.

    The file is read through ``file_reader.read_as_single_channel_16k``.

    Args:
        audio_path: Path of the audio file to load.
        max_second: Maximum duration, in seconds, to keep. May be fractional.
        default_sr: Sample rate handed to the loader and used to convert
            ``max_second`` into a number of samples.

    Returns:
        A ``(signal, sr, audio_length_second)`` tuple. When the audio is
        longer than ``max_second``, ``signal`` is truncated and
        ``audio_length_second`` is set to ``max_second``.
    """
    signal, sr, audio_length_second = file_reader.read_as_single_channel_16k(
        audio_path, default_sr
    )
    if audio_length_second > max_second:
        # Slice bounds must be integers; without the cast a fractional
        # max_second (e.g. 7.5) would raise TypeError.
        signal = signal[: int(default_sr * max_second)]
        audio_length_second = max_second
    return signal, sr, audio_length_second
def add_watermark(audio_path, watermark_text, max_second_encode=60):
    """Embed a 16-bit watermark into an audio file and save the result.

    Args:
        audio_path: Path of the audio file to watermark.
        watermark_text: The payload, a string of exactly 16 characters, each
            of which is ``"0"`` or ``"1"``.
        max_second_encode: Only this many leading seconds of the audio are
            read and encoded.

    Returns:
        Path of the newly written watermarked ``.wav`` file, created in the
        system temporary directory.

    Raises:
        ValueError: If ``watermark_text`` is not a string of 16 binary digits.
    """
    import tempfile

    # Validate explicitly: `assert` is stripped under `python -O`, and
    # int() alone would accept digits such as "2" as payload bits.
    if (
        not isinstance(watermark_text, str)
        or len(watermark_text) != 16
        or set(watermark_text) - {"0", "1"}
    ):
        raise ValueError(
            "watermark_text must be a string of exactly 16 characters, "
            "each '0' or '1'"
        )

    watermark_npy = np.array([int(bit) for bit in watermark_text])
    signal, sr, _ = my_read_file(audio_path, max_second_encode)
    watermarked_signal, _ = wavmark.encode_watermark(
        model, signal, watermark_npy, show_progress=False
    )

    # mkstemp guarantees a unique file, so two requests arriving in the same
    # second with the same payload no longer overwrite each other's output.
    timestamp = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
    fd, tmp_file_path = tempfile.mkstemp(
        prefix=timestamp + "_" + watermark_text + "_", suffix=".wav"
    )
    os.close(fd)  # soundfile reopens the file by path
    soundfile.write(tmp_file_path, watermarked_signal, sr)
    return tmp_file_path
def decode_watermark(audio_path, max_second_decode=30):
    """Try to recover a watermark payload from an audio file.

    Args:
        audio_path: Path of the audio file to inspect.
        max_second_decode: Only this many leading seconds of the audio are
            read and searched.

    Returns:
        The decoded payload with its elements concatenated into one string,
        or the literal ``"No Watermark"`` when the decoder returns ``None``.

    Raises:
        FileNotFoundError: If ``audio_path`` does not exist.
    """
    # Raise explicitly instead of asserting: assertions vanish under
    # `python -O`, and the message tells the caller which path was missing.
    if not os.path.exists(audio_path):
        raise FileNotFoundError("Audio file not found: " + str(audio_path))

    signal, _, _ = my_read_file(audio_path, max_second_decode)
    payload_decoded, _ = wavmark.decode_watermark(
        model, signal, show_progress=False
    )
    if payload_decoded is None:
        return "No Watermark"
    return "".join(str(bit) for bit in payload_decoded)
def create_default_value(len_start_bit=16):
    """Return a random string of ``32 - len_start_bit`` binary digits.

    Args:
        len_start_bit: Number of bits subtracted from 32 to get the length
            of the generated string.

    Returns:
        A string made only of the characters ``"0"`` and ``"1"``.
    """
    payload_len = 32 - len_start_bit
    random_bits = np.random.choice([0, 1], size=payload_len)
    return "".join(map(str, random_bits))
def main():
    """Build the Gradio interface for the watermarking demo and launch it."""
    with gr.Blocks() as demo:
        # Title and short description.
        with gr.Row():
            gr.Markdown("# Audio WaterMarking")
        with gr.Row():
            gr.Markdown("You can upload an audio file and encode a custom 16-bit watermark or perform decoding from a watermarked audio. See [WaveMark toolkit](https://github.com/wavmark/wavmark) for further details.")

        # Inputs: the audio file, the requested action and the payload.
        with gr.Row():
            audio_input = gr.Audio(label="Upload Audio", type="filepath")
            action_input = gr.Radio(
                ["Add Watermark", "Decode Watermark"],
                label="Select Action",
            )
            watermark_input = gr.Textbox(
                label="The watermark (0, 1 list of length-16):",
                value=create_default_value(),
            )
            submit_button = gr.Button("Submit")

        # Outputs: only one of the two is filled per submission.
        with gr.Row():
            audio_result = gr.Audio(label="Processed Audio")
            text_result = gr.Textbox(label="Decoded Watermark")

        def process_audio(audio_file, action, watermark_text):
            """Run the selected action; return ``(audio_path, decoded_text)``."""
            # Nothing to do without an uploaded file.
            if not audio_file:
                return None, None
            if action == "Add Watermark":
                return add_watermark(audio_file, watermark_text), None
            if action == "Decode Watermark":
                return None, decode_watermark(audio_file)
            # No action selected.
            return None, None

        submit_button.click(
            process_audio,
            inputs=[audio_input, action_input, watermark_input],
            outputs=[audio_result, text_result],
        )

    demo.launch()
if __name__ == "__main__":
    # Script-level settings, defined as module globals when run directly.
    default_sr = 16000
    max_second_encode = 60
    max_second_decode = 30
    len_start_bit = 16

    # Use the first CUDA device when one is available, otherwise the CPU.
    if torch.cuda.is_available():
        device = torch.device('cuda:0')
    else:
        device = torch.device('cpu')

    # `model` is read as a global by add_watermark() and decode_watermark(),
    # so it has to exist before the UI starts handling requests.
    model = wavmark.load_model()
    model = model.to(device)

    main()