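"""
Gradio demo for the document-to-Markdown and chunk-evaluation tooling.

Tabs:
* to_md: convert an uploaded file (pdf, docx, pptx, xlsx, html) to Markdown with a
  selectable engine, and preview both the rendered Markdown and the raw text.
* chunk_similar: locate the span of a page_content that best matches a chunk,
  show a character-level diff, and report similarity scores.
* shell: run a shell command on the host and display its output.
"""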
import argparse
import base64
import difflib
import json
import logging
import os
from pathlib import Path
import re
import tempfile
import urllib.parse
import zipfile

import gradio as gr

import log
from project_settings import project_path, log_directory
from toolbox.os.command import Command
from toolbox.to_markdown.base_to_markdown import BaseToMarkdown
from toolbox.unstructured_eval.unstructured_eval import ChunkSearcher, ChunkSimilarity

log.setup_size_rotating(log_directory=log_directory)

logger = logging.getLogger("main")


def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--to_md_example_file_dir",
        default=(project_path / "data/files").as_posix(),
        type=str,
    )
    parser.add_argument(
        "--chunk_similar_examples_json_file",
        default=(project_path / "examples_chunk_similar.json").as_posix(),
        type=str,
    )
    args = parser.parse_args()
    return args


latex_delimiters = [
    {"left": "$$", "right": "$$", "display": True},
    {"left": "$", "right": "$", "display": False},
]


def image_to_base64(image_path):
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode("utf-8")


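# Rewrite relative image references in converted Markdown as inline base64 data
# URIs, so the Gradio Markdown preview can render them without serving the
# extracted image files separately.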
def replace_image_with_base64(markdown_text: str, image_dir: str):
    # markdown image references: ![alt](relative/path)
    pattern1 = r"\!\[(?:.*?)\]\((.+?)\)"

    def replace(match):
        relative_path = match.group(1)
        relative_path = urllib.parse.unquote(relative_path)
        full_path = os.path.join(image_dir, relative_path)
        base64_image = image_to_base64(full_path)
        # NOTE: image/jpeg is assumed as the data URI MIME type.
        return f"![](data:image/jpeg;base64,{base64_image})"

    markdown_text = re.sub(pattern1, replace, markdown_text)

    # html image references: <img src="relative/path" id="...">
    pattern2 = '<img src="(.+?)" id="(?:.*?)">'

    def replace(match):
        relative_path = match.group(1)
        full_path = os.path.join(image_dir, relative_path)
        base64_image = image_to_base64(full_path)
        # NOTE: image/jpeg is assumed as the data URI MIME type.
        return f'<img src="data:image/jpeg;base64,{base64_image}">'

    markdown_text = re.sub(pattern2, replace, markdown_text)

    return markdown_text


def load_markdown_from_zip(filename: str) -> str:
    filename = Path(filename)
    unzip_dir = filename.parent / filename.stem

    with zipfile.ZipFile(filename.as_posix(), "r") as f:
        f.extractall(unzip_dir)

    md_file = unzip_dir / f"{filename.stem}.md"

    with open(md_file, "r", encoding="utf-8") as f:
        md_content = f.read()

    return md_content


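# engine -> file extension -> converter name registered with BaseToMarkdown.by_name().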
to_md_engine_map = {
    "default": {
        "xlsx": "pandas",
        "html": "html_markdownify",
        "pdf": "pymupdf4llm",
        "pptx": "pptx2md",
        "docx": "docx2md",
    },
    "aliyun": {
        "pdf": "aliyun",
        "html": "aliyun",
    },
}


def change_to_md_engine(engine: str):
    if engine not in to_md_engine_map:
        raise ValueError(f"Unsupported engine: {engine}")

    choices = list(to_md_engine_map[engine].keys())
    return gr.Dropdown(
        choices=choices,
        value=choices[0],
        interactive=True,
    )


def run_to_markdown(filename: str, engine: str, extension: str):
    logger.info(f"engine: {engine}, extension: {extension}, filename: {filename}")

    tmp_dir = Path(tempfile.gettempdir()) / "document_loaders"
    tmp_dir.mkdir(parents=True, exist_ok=True)

    to_md_engine_name = to_md_engine_map[engine][extension]
    to_md: BaseToMarkdown = BaseToMarkdown.by_name(to_md_engine_name)(filename)
    output_zip_file = to_md.save_to_zip(output_dir=tmp_dir.as_posix())

    text_content = load_markdown_from_zip(output_zip_file)

    image_dir = tmp_dir / Path(output_zip_file).stem
    md_content = replace_image_with_base64(text_content, image_dir=image_dir.as_posix())

    return output_zip_file, md_content, text_content


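# Align `chunk` against `page_content` with ChunkSearcher, then build an HTML
# character diff (red = only in chunk, green = only in the matched span) and a
# Markdown table of similarity scores.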
def run_chunk_similar(page_content: str, chunk: str, win_size_radio: float):
    searcher = ChunkSearcher()
    match_content = searcher.chunk_search(
        chunk,
        page_content,
        win_size_radio,
    )

    # character-level diff between the chunk and the matched content
    diff_list = difflib.Differ().compare(chunk, match_content)
    markdown_diff = ""
    for diff in diff_list:
        operation = diff[0]
        char = diff[-1]

        if operation == "-":
            if char in ["\t", "\n", "\r", "\f", "\v", "\u200B"]:
                # make removed whitespace characters visible in the diff
                char = repr(char).strip("'") + char
            markdown_diff += f"<span style=\"color:#000000; background-color:#FF0000\">{char}</span>"
        elif operation == "+":
            if char in ["\t", "\n", "\r", "\f", "\v", "\u200B"]:
                char = repr(char).strip("'") + char
            markdown_diff += f"<span style=\"color:#000000; background-color:#00CD00\">{char}</span>"
        else:
            markdown_diff += char

    # similarity scores rendered as a Markdown table; the separator row is
    # emitted after the first returned row, which serves as the table header
    chunk_similarity = ChunkSimilarity()
    scores = chunk_similarity.similar(chunk, match_content)

    markdown_scores = ""
    for idx, score in enumerate(scores):
        metric, score, note = score
        row = f"| {metric} | {score} | {note} |\n"
        markdown_scores += row
        if idx == 0:
            row = "| --- | --- | --- |\n"
            markdown_scores += row

    return match_content, markdown_diff, markdown_scores


def shell(cmd: str):
    return Command.popen(cmd)


def main():
    args = get_args()

    # collect example files for the to_md tab
    to_md_example_file_dir = Path(args.to_md_example_file_dir)
    to_md_example_file_list = list()
    for filename in to_md_example_file_dir.glob("**/*.*"):
        filename = Path(filename).as_posix()
        to_md_example_file_list.append(filename)

    with open(args.chunk_similar_examples_json_file, "r", encoding="utf-8") as f:
        chunk_similar_examples = json.load(f)

    with gr.Blocks() as blocks:
        with gr.Tabs():
            with gr.TabItem("to_md"):
                with gr.Row():
                    with gr.Column(variant="panel", scale=5):
                        engine_choices = list(to_md_engine_map.keys())
                        extension_choices = set()
                        for engine in engine_choices:
                            extension_choices.update(to_md_engine_map[engine].keys())
                        extension_choices = list(extension_choices)
                        to_md_engine = gr.Dropdown(choices=engine_choices, value=engine_choices[0], label="engine")
                        to_md_extension = gr.Dropdown(choices=extension_choices, value=extension_choices[0], label="extension")
                        to_md_file = gr.File(value=None, label="file")
                        with gr.Row():
                            to_md_button = gr.Button(value="convert", variant="primary")
                            to_md_clear = gr.ClearButton(components=[to_md_file], value="clear")
                        gr.Examples(
                            examples=to_md_example_file_list,
                            inputs=[to_md_file],
                        )

                    with gr.Column(variant="panel", scale=5):
                        to_md_output_file = gr.File(label="convert result", interactive=False)
                        with gr.Tabs():
                            with gr.Tab("Markdown rendering"):
                                to_md_md = gr.Markdown(
                                    label="Markdown rendering",
                                    height=900, show_copy_button=True,
                                    latex_delimiters=latex_delimiters, line_breaks=True,
                                )
                            with gr.Tab("Markdown text"):
                                to_md_md_text = gr.TextArea(lines=45, show_copy_button=True)

                to_md_button.click(
                    fn=run_to_markdown,
                    inputs=[to_md_file, to_md_engine, to_md_extension],
                    outputs=[to_md_output_file, to_md_md, to_md_md_text],
                )
                to_md_clear.add([
                    to_md_file, to_md_engine, to_md_extension,
                    to_md_output_file, to_md_md, to_md_md_text,
                ])

                # keep the extension choices in sync with the selected engine
                to_md_engine.change(
                    change_to_md_engine,
                    inputs=to_md_engine,
                    outputs=to_md_extension,
                )

            with gr.TabItem("chunk_similar"):
                with gr.Row():
                    with gr.Column(variant="panel", scale=5):
                        cs_page_content = gr.TextArea(
                            label="page_content",
                            lines=18,
                            max_lines=100,
                        )
                    with gr.Column(variant="panel", scale=5):
                        cs_chunk = gr.TextArea(
                            label="chunk",
                        )
                        cs_match = gr.TextArea(
                            label="match",
                        )
                        cs_diff = gr.Markdown(
                            label="diff",
                            latex_delimiters=latex_delimiters, line_breaks=True,
                        )
                    with gr.Column(variant="panel", scale=5):
                        cs_win_size_radio = gr.Slider(minimum=1, maximum=3, value=1.6, step=0.1, label="win_size_radio")
                        with gr.Row():
                            cs_button = gr.Button(value="run", variant="primary")
                            cs_clear = gr.ClearButton(value="clear")

                        cs_scores = gr.Markdown(
                            label="scores",
                        )

                gr.Examples(
                    examples=chunk_similar_examples,
                    inputs=[cs_page_content, cs_chunk, cs_win_size_radio],
                    outputs=[cs_match, cs_diff, cs_scores],
                    fn=run_chunk_similar,
                )

                cs_button.click(
                    fn=run_chunk_similar,
                    inputs=[cs_page_content, cs_chunk, cs_win_size_radio],
                    outputs=[cs_match, cs_diff, cs_scores],
                )
                cs_clear.add(components=[
                    cs_page_content, cs_chunk, cs_match, cs_scores,
                ])

            with gr.TabItem("shell"):
                shell_text = gr.Textbox(label="cmd")
                shell_button = gr.Button("run")
                shell_output = gr.Textbox(label="output")

                shell_button.click(
                    shell,
                    inputs=[shell_text],
                    outputs=[shell_output],
                )

    blocks.queue().launch(
        share=False,
        server_name="0.0.0.0",
        server_port=7860,
    )
    return


if __name__ == "__main__":
    main()