File size: 17,051 Bytes
1f29f01
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
# import os
# _LOCAL_TMP = "/fsx/guilherme/tmp"
# try:
#     os.makedirs(_LOCAL_TMP, exist_ok=True)
#     os.environ.setdefault("TMPDIR", _LOCAL_TMP)
#     os.environ.setdefault("TEMP", _LOCAL_TMP)
#     os.environ.setdefault("TMP", _LOCAL_TMP)
#     _GRADIO_TMP = os.path.join(_LOCAL_TMP, "gradio")
#     os.makedirs(_GRADIO_TMP, exist_ok=True)
#     os.environ.setdefault("GRADIO_TEMP_DIR", _GRADIO_TMP)
# except Exception:
#     pass

import html

import gradio as gr
from datatrove.pipeline.readers import ParquetReader

from default_wiki_pipeline import _parse_and_clean_wikicode, mwparserfromhell

# Wikipedia subdomain/language codes served by the finewiki dataset; they
# appear to use underscores where the wiki subdomains use hyphens (e.g.
# 'zh_min_nan', 'bat_smg') — presumably matching the dataset folder names.
lang_list = ['ab', 'ace', 'ady', 'af', 'als', 'alt', 'ami', 'am', 'ang', 'anp', 'an', 'arc', 'ar', 'ary', 'arz', 'ast', 'as', 'atj', 'avk', 'av', 'awa', 'ay', 'azb', 'az', 'ban', 'bar', 'bat_smg', 'ba', 'bbc', 'bcl', 'be', 'bg', 'bh', 'bi', 'bjn', 'blk', 'bm', 'bn', 'bo', 'bpy', 'br', 'bs', 'bug', 'bxr', 'ca', 'cbk_zam', 'cdo', 'ceb', 'ce', 'chr', 'ch', 'chy', 'ckb', 'co', 'crh', 'cr', 'csb', 'cs', 'cu', 'cv', 'cy', 'dag', 'da', 'de', 'dga', 'din', 'diq', 'dsb', 'dty', 'dv', 'dz', 'ee', 'el', 'eml', 'en', 'eo', 'es', 'et', 'eu', 'ext', 'fat', 'fa', 'ff', 'fiu_vro', 'fi', 'fj', 'fon', 'fo', 'frp', 'frr', 'fr', 'fur', 'fy', 'gag', 'gan', 'ga', 'gcr', 'gd', 'glk', 'gl', 'gn', 'gom', 'gor', 'got', 'gpe', 'guc', 'gur', 'gu', 'guw', 'gv', 'hak', 'ha', 'haw', 'he', 'hif', 'hi', 'hr', 'hsb', 'ht', 'hu', 'hy', 'hyw', 'ia', 'id', 'ie', 'ig', 'ik', 'ilo', 'inh', 'io', 'is', 'it', 'iu', 'jam', 'ja', 'jbo', 'jv', 'kaa', 'kab', 'ka', 'kbd', 'kbp', 'kcg', 'kg', 'ki', 'kk', 'kl', 'km', 'kn', 'koi', 'ko', 'krc', 'ksh', 'ks', 'ku', 'kv', 'kw', 'ky', 'lad', 'la', 'lbe', 'lb', 'lez', 'lfn', 'lg', 'lij', 'li', 'lld', 'lmo', 'ln', 'lo', 'ltg', 'lt', 'lv', 'mad', 'mai', 'map_bms', 'mdf', 'mg', 'mhr', 'min', 'mi', 'mk', 'ml', 'mni', 'mn', 'mnw', 'mrj', 'mr', 'ms', 'mt', 'mwl', 'myv', 'my', 'mzn', 'nah', 'nap', 'nds_nl', 'nds', 'ne', 'new', 'nia', 'nl', 'nn', 'nov', 'no', 'nqo', 'nrm', 'nso', 'nv', 'ny', 'oc', 'olo', 'om', 'or', 'os', 'pag', 'pam', 'pap', 'pa', 'pcd', 'pcm', 'pdc', 'pfl', 'pih', 'pi', 'pl', 'pms', 'pnb', 'pnt', 'ps', 'pt', 'pwn', 'qu', 'rm', 'rmy', 'rn', 'roa_rup', 'roa_tara', 'ro', 'rue', 'ru', 'rw', 'sah', 'sat', 'sa', 'scn', 'sco', 'sc', 'sd', 'se', 'sg', 'shi', 'shn', 'sh', 'simple', 'si', 'skr', 'sk', 'sl', 'smn', 'sm', 'sn', 'so', 'sq', 'srn', 'sr', 'ss', 'stq', 'st', 'su', 'sv', 'sw', 'szl', 'szy', 'ta', 'tay', 'tcy', 'tet', 'te', 'tg', 'th', 'ti', 'tk', 'tl', 'tly', 'tn', 'to', 'tpi', 'trv', 'tr', 'ts', 'tt', 'tum', 'tw', 'tyv', 'ty', 'udm', 'ug', 'uk', 'ur', 'uz', 
'vec', 'vep', 've', 'vi', 'vls', 'vo', 'war', 'wa', 'wo', 'wuu', 'xal', 'xh', 'xmf', 'yi', 'yo', 'za', 'zea', 'zgh', 'zh_classical', 'zh_min_nan', 'zh_yue', 'zh', 'zu']


def _build_header_markdown(doc) -> str:
    meta = doc.metadata or {}
    title = meta.get("title") or ""
    page_id = meta.get("page_id") or meta.get("id") or ""
    wikidata_id = meta.get("wikidata_id") or ""
    url = meta.get("url") or ""
    parts = []
    if title:
        parts.append(f"**Title**: {title}")
    if page_id:
        parts.append(f"**Page ID**: {page_id}")
    if wikidata_id:
        parts.append(f"**Wikidata ID**: {wikidata_id}")
    header = "  |  ".join(parts)
    if url:
        header += f"\n[{url}]({url})"
    return header
def matches_filters(doc, require_has_math: bool | None, require_has_infobox: bool | None) -> bool:
    meta = doc.metadata or {}
    if require_has_math and not bool(meta.get("has_math")):
        return False
    if require_has_infobox and not meta.get("infoboxes"):
        return False
    return True


def find_next_matching_from(docs_cache, reader_iter, start_idx: int, require_has_math: bool | None, require_has_infobox: bool | None):
    """Find the first document strictly after *start_idx* passing the filters.

    Returns (index, docs_cache, reader_iter); index is -1 when no further
    document matches. Cached documents are scanned first, then the reader is
    streamed one document at a time until a match appears or it is exhausted.
    """
    # Phase 1: scan what is already cached.
    for pos in range(max(-1, start_idx) + 1, len(docs_cache)):
        if matches_filters(docs_cache[pos], require_has_math, require_has_infobox):
            return pos, docs_cache, reader_iter
    # Phase 2: pull one document at a time from the reader.
    while True:
        before = len(docs_cache)
        docs_cache, reader_iter = _ensure_until_index(docs_cache, reader_iter, before)
        if len(docs_cache) == before:
            # Reader missing or exhausted: nothing more to try.
            return -1, docs_cache, reader_iter
        if matches_filters(docs_cache[-1], require_has_math, require_has_infobox):
            return len(docs_cache) - 1, docs_cache, reader_iter

def render_iframe(url: str, height: int = 800) -> str:
    """Return HTML embedding *url* in a full-width iframe.

    Falls back to ``about:blank`` when *url* is empty/None. The URL is
    HTML-escaped before interpolation: the previous version injected it raw
    into the ``src="..."`` attribute, so a URL containing a double quote
    could break out of the attribute (HTML/attribute injection).
    """
    safe_url = html.escape(url, quote=True) if url else "about:blank"
    return (
        f'<iframe src="{safe_url}" '
        f'style="width:100%; height:{height}px; border:0;" loading="lazy"></iframe>'
    )


def _safe_url_from_metadata(meta: dict) -> str:
    meta = meta or {}
    return meta.get("url") or ""


def _extract_language(meta: dict) -> str:
    # Try common metadata fields for language code
    meta = meta or {}
    lang = meta.get("lang") or meta.get("language")
    if lang:
        return str(lang)
    wiki = meta.get("wiki") or meta.get("wikiname") or ""
    base = str(wiki).removesuffix("_namespace_0") if wiki else ""
    if base.endswith("wiki"):
        return base[:-4]
    return base or "en"


def _ensure_until_index(docs_cache, reader_iter, target_idx: int):
    if reader_iter is None:
        return docs_cache, reader_iter
    while len(docs_cache) <= target_idx:
        try:
            nxt = next(reader_iter)
        except StopIteration:
            break
        docs_cache.append(nxt)
    return docs_cache, reader_iter


def on_select_language(lang: str, require_has_math: bool, require_has_infobox: bool):
    """Load documents for the selected language from HF Parquet and display.

    Returns the 9-tuple bound to the Gradio outputs:
    (idx, docs_cache, reader_iter, left_text, left_meta, header_md,
     right_markdown, right_infoboxes, right_iframe).
    """
    def _error(message, docs, it):
        # Failure paths must still supply all 9 output values: the previous
        # version returned only 8 (the header_md slot was missing), which
        # breaks the Gradio outputs binding.
        return (-1, docs, it, message, {}, "", "", [], render_iframe(""))

    language = (lang or "").strip()
    if not language:
        return _error("Select a language.", [], None)
    try:
        path = f"hf://datasets/HuggingFaceFW/finewiki/data/{language}wiki"
        reader_iter = ParquetReader(path)()
    except Exception as e:
        return _error(f"Failed to read: {e}", [], None)
    docs_cache, reader_iter = _ensure_until_index([], reader_iter, 0)
    if not docs_cache:
        return _error("No documents found.", docs_cache, reader_iter)
    # Start before index 0 so the very first matching document is selected.
    idx, docs_cache, reader_iter = find_next_matching_from(docs_cache, reader_iter, -1, require_has_math, require_has_infobox)
    if idx == -1:
        return _error("No documents match filters.", docs_cache, reader_iter)
    left, left_meta, md, info, right, header = render_idx(docs_cache, idx)
    return (idx, docs_cache, reader_iter, left, left_meta, header, md, info, right)


def on_find(docs_cache, idx: int, reader_iter, id_query: str, require_has_math: bool, require_has_infobox: bool):
    """Jump to the document identified by *id_query* (document id, wikidata
    id, full URL, or a trailing id/URL fragment), honoring the filters.

    Search order: exact match in the cache, then suffix match in the cache,
    then stream from the reader; with an empty query (or no hit at all) it
    falls back to the first filtered document.

    Returns the 9-tuple bound to the Gradio outputs:
    (idx, docs_cache, reader_iter, left_text, left_meta, header_md,
     right_markdown, right_infoboxes, right_iframe).
    """
    def _error(message):
        # Failure paths must still supply all 9 output values: the previous
        # version returned only 8 (the header_md slot was missing), which
        # breaks the Gradio outputs binding.
        return (-1, docs_cache, reader_iter, message, {}, "", "", [], render_iframe(""))

    def _show(i):
        # Render document i and package the full success tuple.
        left, left_meta, md, info, right, header = render_idx(docs_cache, i)
        return (i, docs_cache, reader_iter, left, left_meta, header, md, info, right)

    def _resolve(i):
        # Show doc i if it passes the filters, otherwise the next match after it.
        nonlocal docs_cache, reader_iter
        if matches_filters(docs_cache[i], require_has_math, require_has_infobox):
            return _show(i)
        new_idx, docs_cache, reader_iter = find_next_matching_from(docs_cache, reader_iter, i, require_has_math, require_has_infobox)
        if new_idx == -1:
            return _error("No documents match filters.")
        return _show(new_idx)

    def _first_filtered():
        # Fall back to the first document passing the filters.
        nonlocal docs_cache, reader_iter
        new_idx, docs_cache, reader_iter = find_next_matching_from(docs_cache, reader_iter, -1, require_has_math, require_has_infobox)
        if new_idx == -1:
            return _error("No documents match filters.")
        return _show(new_idx)

    query = (id_query or "").strip()
    if not docs_cache and reader_iter is None:
        return _error("No documents loaded.")
    if not query:
        docs_cache, reader_iter = _ensure_until_index(docs_cache, reader_iter, 0)
        return _first_filtered()
    # Pass 1: exact id / wikidata id / URL match in the cache.
    for i, doc in enumerate(docs_cache):
        meta = (getattr(doc, "metadata", None) or {})
        doc_id = (getattr(doc, "id", None) or "")
        url = meta.get("url") or ""
        if doc_id == query or meta.get("wikidata_id") == query or url == query:
            return _resolve(i)
    # Pass 2: suffix match (trailing page id or URL fragment) in the cache.
    for i, doc in enumerate(docs_cache):
        meta = (getattr(doc, "metadata", None) or {})
        doc_id = (getattr(doc, "id", None) or "")
        url = meta.get("url") or ""
        if doc_id.endswith(f"/{query}") or url.endswith(query):
            return _resolve(i)
    # Pass 3: stream more documents until the query matches or data runs out.
    while True:
        prev_len = len(docs_cache)
        docs_cache, reader_iter = _ensure_until_index(docs_cache, reader_iter, prev_len)
        if len(docs_cache) == prev_len:
            break
        doc = docs_cache[-1]
        meta = (getattr(doc, "metadata", None) or {})
        doc_id = (getattr(doc, "id", None) or "")
        url = meta.get("url") or ""
        if doc_id == query or meta.get("wikidata_id") == query or url == query or url.endswith(query) or doc_id.endswith(f"/{query}"):
            # Re-apply filters starting at (and including) the found document.
            found_idx = len(docs_cache) - 1
            new_idx, docs_cache, reader_iter = find_next_matching_from(docs_cache, reader_iter, found_idx - 1, require_has_math, require_has_infobox)
            if new_idx == -1:
                return _error("No documents match filters.")
            return _show(new_idx)
    # Query not found anywhere: show the first filtered document instead.
    if not docs_cache:
        return _error("No documents found.")
    return _first_filtered()


def show_doc(doc):
    """Render a single document into the UI payload tuple:
    (text, metadata, cleaned markdown, infoboxes, iframe HTML, header md)."""
    meta = getattr(doc, "metadata", None) or {}
    raw_text = getattr(doc, "text", "")
    # Re-clean the stored wikitext with the shared pipeline helper so the
    # right-hand panel shows the wikimedia/wikipedia-style extraction.
    cleaned_md = _parse_and_clean_wikicode(
        meta.get("wikitext"),
        parser=mwparserfromhell,
        language=_extract_language(meta),
    )
    infoboxes = meta.get("infoboxes", [])
    iframe_html = render_iframe(_safe_url_from_metadata(meta))
    header = _build_header_markdown(doc)
    return raw_text, meta, cleaned_md, infoboxes, iframe_html, header


def render_idx(docs, idx: int):
    """Render docs[idx] (clamped into range); a placeholder tuple when *docs*
    is empty. Returns (text, meta, markdown, infoboxes, iframe, header)."""
    if not docs:
        return "No documents.", {}, "", [], render_iframe(""), ""
    clamped = min(max(idx, 0), len(docs) - 1)
    return show_doc(docs[clamped])


def on_prev(docs_cache, idx: int, reader_iter, require_has_math: bool, require_has_infobox: bool):
    """Step to the closest filtered document at or before idx - 1.

    Returns the 9-tuple bound to the Gradio outputs:
    (idx, docs_cache, reader_iter, left_text, left_meta, header_md,
     right_markdown, right_infoboxes, right_iframe).
    """
    if not docs_cache:
        # Lazily load at least the first document if nothing is cached yet.
        docs_cache, reader_iter = _ensure_until_index(docs_cache, reader_iter, 0)
        if not docs_cache:
            # Failure paths must still supply all 9 output values: the
            # previous version returned 8 (header_md slot missing), breaking
            # the Gradio outputs binding.
            return idx, docs_cache, reader_iter, "No documents.", {}, "", "", [], render_iframe("")
    new_idx = max(0, idx - 1)
    # Scan backwards from new_idx for a document passing the filters; if none
    # matches, new_idx itself is shown unfiltered (existing behavior).
    filtered_idx = new_idx
    for i in range(new_idx, -1, -1):
        if matches_filters(docs_cache[i], require_has_math, require_has_infobox):
            filtered_idx = i
            break
    left, left_meta, md, info, right, header = render_idx(docs_cache, filtered_idx)
    return filtered_idx, docs_cache, reader_iter, left, left_meta, header, md, info, right


def on_next(docs_cache, idx: int, reader_iter, require_has_math: bool, require_has_infobox: bool):
    """Advance to the next document after *idx* that passes the filters.

    Returns the 9-tuple bound to the Gradio outputs:
    (idx, docs_cache, reader_iter, left_text, left_meta, header_md,
     right_markdown, right_infoboxes, right_iframe).
    """
    target_idx = idx + 1 if idx >= 0 else 0
    docs_cache, reader_iter = _ensure_until_index(docs_cache, reader_iter, target_idx)
    if not docs_cache:
        # Failure paths must still supply all 9 output values: the previous
        # version returned 8 (header_md slot missing), breaking the Gradio
        # outputs binding.
        return idx, docs_cache, reader_iter, "No documents.", {}, "", "", [], render_iframe("")
    # find_next_matching_from scans forward from idx (exclusive), streaming
    # further documents as needed. (A dead pre-clamp of new_idx was removed.)
    new_idx, docs_cache, reader_iter = find_next_matching_from(docs_cache, reader_iter, idx, require_has_math, require_has_infobox)
    if new_idx == -1:
        return idx, docs_cache, reader_iter, "No documents match filters.", {}, "", "", [], render_iframe("")
    left, left_meta, md, info, right, header = render_idx(docs_cache, new_idx)
    return new_idx, docs_cache, reader_iter, left, left_meta, header, md, info, right


# UI layout and event wiring. Handlers must return one value per component in
# their `outputs` list (9 values for the navigation/find handlers).
with gr.Blocks() as demo:
    # Per-session state: current index, cached documents, and the (lazy)
    # parquet reader iterator used to stream further documents on demand.
    idx_state = gr.State(value=-1)
    docs_state = gr.State(value=[])
    iter_state = gr.State(value=None)

    with gr.Row():
        # Full-width controls row for navigation
        with gr.Column():
            with gr.Row():
                language_select = gr.Dropdown(choices=lang_list, value="en", label="Language")
            with gr.Row():
                prev_btn = gr.Button("Previous")
                next_btn = gr.Button("Next")
            header_md = gr.Markdown()
        with gr.Column():
            # Filter toggles + id/URL lookup.
            with gr.Row():
                require_has_math = gr.Checkbox(label="Has math", value=False)
                require_has_infobox = gr.Checkbox(label="Has infobox", value=False)
            with gr.Row():
                id_input = gr.Textbox(label="Wikidata ID/URL/Page ID", placeholder="e.g., Q42 or https://... or 12345", lines=1)
                find_btn = gr.Button("Find")
            with gr.Row():
                show_wiki = gr.Checkbox(label="Show wikimedia/wikipedia extraction", value=False)
                show_preview = gr.Checkbox(label="Show preview", value=True)
                show_infoboxes = gr.Checkbox(label="Show infoboxes", value=True)
    with gr.Row():
        # Side-by-side comparison: FineWiki extraction (left) vs. the
        # wikimedia/wikipedia extraction, live page preview, and infoboxes (right).
        with gr.Column():
            left_text = gr.Textbox(label="FineWiki extractions", lines=30)
            left_meta = gr.JSON(label="Metadata")
        with gr.Column():
            right_markdown = gr.Textbox(label="wikimedia/wikipedia extraction", lines=30)
            right_iframe = gr.HTML(label="Original Page")
            right_infoboxes = gr.JSON(label="Infoboxes")

    # Language change and initial page load both (re)load the dataset.
    language_select.change(on_select_language, inputs=[language_select, require_has_math, require_has_infobox], outputs=[idx_state, docs_state, iter_state, left_text, left_meta, header_md, right_markdown, right_infoboxes, right_iframe])
    demo.load(on_select_language, inputs=[language_select, require_has_math, require_has_infobox], outputs=[idx_state, docs_state, iter_state, left_text, left_meta, header_md, right_markdown, right_infoboxes, right_iframe])
    find_btn.click(on_find, inputs=[docs_state, idx_state, iter_state, id_input, require_has_math, require_has_infobox], outputs=[idx_state, docs_state, iter_state, left_text, left_meta, header_md, right_markdown, right_infoboxes, right_iframe])

    # Visibility toggles driven directly by checkbox changes
    show_wiki.change(lambda v: gr.update(visible=v), inputs=[show_wiki], outputs=[right_markdown])
    show_preview.change(lambda v: gr.update(visible=v), inputs=[show_preview], outputs=[right_iframe])
    show_infoboxes.change(lambda v: gr.update(visible=v), inputs=[show_infoboxes], outputs=[right_infoboxes])
    prev_btn.click(on_prev, inputs=[docs_state, idx_state, iter_state, require_has_math, require_has_infobox], outputs=[idx_state, docs_state, iter_state, left_text, left_meta, header_md, right_markdown, right_infoboxes, right_iframe])
    next_btn.click(on_next, inputs=[docs_state, idx_state, iter_state, require_has_math, require_has_infobox], outputs=[idx_state, docs_state, iter_state, left_text, left_meta, header_md, right_markdown, right_infoboxes, right_iframe])


if __name__ == "__main__":
    # Bind on all interfaces so the app is reachable from other hosts.
    demo.launch(server_name="0.0.0.0", server_port=7641)