Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import json | |
| import posixpath | |
| from fastapi import HTTPException, Path, Query, Request | |
| from fastapi.responses import StreamingResponse | |
| from gradio_huggingfacehub_search import HuggingfaceHubSearch | |
| from huggingface_hub import HfApi, HfFileSystem | |
| from typing import Annotated, Any, NamedTuple | |
| from urllib.parse import urlencode | |
| from _hf_explorer import FileExplorer | |
| from _hf_gguf import standard_metadata, GGUFValueType, HuggingGGUFstream | |
| hfapi = HfApi() | |
| class MetadataState(NamedTuple): | |
| var: dict[str, Any] | |
| key: dict[str, tuple[int, Any]] | |
| add: dict[str, Any] | |
| rem: set | |
| def init_state( | |
| ): | |
| return MetadataState( | |
| var = {}, | |
| key = {}, | |
| add = {}, | |
| rem = set(), | |
| ) | |
| def human_readable_metadata( | |
| typ: int, | |
| val: Any, | |
| ) -> tuple[str, Any]: | |
| typ = GGUFValueType(typ).name | |
| if typ == 'ARRAY': | |
| val = '[[...], ...]' | |
| elif isinstance(val, list): | |
| typ = f'[{typ}][{len(val)}]' | |
| if len(val) > 8: | |
| val = str(val[:8])[:-1] + ', ...]' | |
| else: | |
| val = str(val) | |
| return typ, val | |
| with gr.Blocks( | |
| ) as blocks: | |
| with gr.Row(): | |
| hf_search = HuggingfaceHubSearch( | |
| label = "Search Huggingface Hub", | |
| placeholder = "Search for models on Huggingface", | |
| search_type = "model", | |
| sumbit_on_select = True, | |
| scale = 2, | |
| ) | |
| hf_branch = gr.Dropdown( | |
| None, | |
| label = "Branch", | |
| scale = 1, | |
| ) | |
| gr.LoginButton( | |
| "Sign in to access gated/private repos", | |
| scale = 1, | |
| ) | |
| hf_file = FileExplorer( | |
| visible=False, | |
| ) | |
| with gr.Row(): | |
| with gr.Column(): | |
| meta_keys = gr.Dropdown( | |
| None, | |
| label = "Modify Metadata", | |
| allow_custom_value = True, | |
| visible = False, | |
| ) | |
| with gr.Column(): | |
| meta_types = gr.Dropdown( | |
| [e.name for e in GGUFValueType], | |
| label = "Metadata Type", | |
| type = "index", | |
| visible = False, | |
| ) | |
| with gr.Column(): | |
| btn_delete = gr.Button( | |
| "Remove Key", | |
| variant = 'stop', | |
| visible = False, | |
| ) | |
| meta_boolean = gr.Checkbox( | |
| visible = False, | |
| ) | |
| with gr.Row(): | |
| # Too slow unfortunately, needs a proper search box | |
| # meta_lookup = gr.Dropdown( | |
| # label = 'Lookup token', | |
| # type = 'index', | |
| # visible = False, | |
| # ) | |
| meta_number = gr.Number( | |
| visible = False, | |
| ) | |
| meta_string = gr.Textbox( | |
| visible = False, | |
| ) | |
| meta_array = gr.Matrix( | |
| None, | |
| label = "Unsupported", | |
| row_count = (1, 'fixed'), | |
| height = "1rem", | |
| interactive = False, | |
| visible = False, | |
| ) | |
| meta_changes = gr.HighlightedText( | |
| None, | |
| label = "Metadata Changes", | |
| color_map = {'add': 'green', 'rem': 'red'}, | |
| interactive = False, | |
| visible = False, | |
| ) | |
| btn_download = gr.Button( | |
| "Download GGUF", | |
| variant = "primary", | |
| visible = False, | |
| ) | |
| file_meta = gr.Matrix( | |
| None, | |
| col_count = (3, "fixed"), | |
| headers = [ | |
| "Metadata Name", | |
| "Type", | |
| "Value", | |
| ], | |
| datatype = ["str", "str", "str"], | |
| column_widths = ["35%", "15%", "50%"], | |
| wrap = True, | |
| interactive = False, | |
| visible = False, | |
| ) | |
| meta_state = gr.State() # init_state | |
| # BUG: For some reason using gr.State initial value turns tuple to list? | |
| meta_state.value = init_state() | |
| file_change_components = [ | |
| meta_changes, | |
| file_meta, | |
| meta_keys, | |
| btn_download, | |
| ] | |
| state_change_components = [ | |
| meta_state, | |
| ] + file_change_components | |
| def get_branches( | |
| repo: str, | |
| oauth_token: gr.OAuthToken | None = None, | |
| ): | |
| branches = [] | |
| try: | |
| refs = hfapi.list_repo_refs( | |
| repo, | |
| token = oauth_token.token if oauth_token else False, | |
| ) | |
| branches = [b.name for b in refs.branches] | |
| except Exception as e: | |
| raise gr.Error(e) | |
| return { | |
| hf_branch: gr.Dropdown( | |
| branches or None, | |
| value = "main" if "main" in branches else None, | |
| ), | |
| } | |
| def get_files( | |
| repo: str, | |
| branch: str | None, | |
| oauth_token: gr.OAuthToken | None = None, | |
| ): | |
| return { | |
| hf_file: FileExplorer( | |
| "**/*.gguf", | |
| file_count = "single", | |
| root_dir = repo, | |
| branch = branch, | |
| token = oauth_token.token if oauth_token else None, | |
| visible = True, | |
| ), | |
| meta_changes: gr.HighlightedText( | |
| None, | |
| visible = False, | |
| ), | |
| file_meta: gr.Matrix( | |
| # None, # FIXME (see Dataframe bug below) | |
| visible = False, | |
| ), | |
| meta_keys: gr.Dropdown( | |
| None, | |
| visible = False, | |
| ), | |
| btn_download: gr.Button( | |
| visible = False, | |
| ), | |
| } | |
| def load_metadata( | |
| repo_file: str | None, | |
| branch: str | None, | |
| progress: gr.Progress = gr.Progress(), | |
| oauth_token: gr.OAuthToken | None = None, | |
| ): | |
| m = [] | |
| meta = init_state() | |
| yield { | |
| meta_state: meta, | |
| file_meta: gr.Matrix( | |
| [['', '', '']] * 100, # FIXME: Workaround for Dataframe bug when user has selected data | |
| visible = True, | |
| ), | |
| meta_changes: gr.HighlightedText( | |
| None, | |
| visible = False, | |
| ), | |
| meta_keys: gr.Dropdown( | |
| None, | |
| visible = False, | |
| ), | |
| btn_download: gr.Button( | |
| visible = False, | |
| ), | |
| } | |
| if not repo_file: | |
| return | |
| fs = HfFileSystem( | |
| token = oauth_token.token if oauth_token else None, | |
| ) | |
| try: | |
| progress(0, desc = 'Loading file...') | |
| with fs.open( | |
| repo_file, | |
| "rb", | |
| revision = branch, | |
| block_size = 8 * 1024 * 1024, | |
| cache_type = "readahead", | |
| ) as fp: | |
| progress(0, desc = 'Reading header...') | |
| gguf = HuggingGGUFstream(fp) | |
| num_metadata = gguf.header['metadata'].value | |
| metadata = gguf.read_metadata() | |
| meta.var['repo_file'] = repo_file | |
| meta.var['branch'] = branch | |
| for k, v in progress.tqdm(metadata, desc = 'Reading metadata...', total = num_metadata, unit = f' of {num_metadata} metadata keys...'): | |
| m.append([k, *human_readable_metadata(v.type, v.value)]) | |
| meta.key[k] = (v.type, v.value) | |
| # FIXME | |
| # yield { | |
| # file_meta: gr.Matrix( | |
| # m, | |
| # ), | |
| # } | |
| except Exception as e: | |
| raise gr.Error(e) | |
| yield { | |
| meta_state: meta, | |
| file_meta: gr.Matrix( | |
| m, | |
| ), | |
| meta_keys: gr.Dropdown( | |
| sorted(meta.key.keys() | standard_metadata.keys()), | |
| value = '', | |
| visible = True, | |
| ), | |
| } | |
| def update_metakey( | |
| meta: MetadataState, | |
| key: str | None, | |
| ): | |
| typ = None | |
| if (val := meta.key.get(key, standard_metadata.get(key))) is not None: | |
| typ = GGUFValueType(val[0]).name | |
| elif key and key.startswith('tokenizer.chat_template.'): | |
| typ = GGUFValueType.STRING.name | |
| return { | |
| meta_types: gr.Dropdown( | |
| value = typ, | |
| interactive = False if key in meta.key else True, | |
| visible = True if key else False, | |
| ), | |
| btn_delete: gr.Button( | |
| visible = True if key in meta.key else False, | |
| ), | |
| } | |
| def update_metatype( | |
| meta: MetadataState, | |
| key: str, | |
| typ: int, | |
| ): | |
| val = None | |
| if (data := meta.key.get(key, standard_metadata.get(key))) is not None: | |
| typ = data[0] | |
| val = data[1] | |
| elif not key: | |
| typ = None | |
| if isinstance(val, list): | |
| # TODO: Support arrays? | |
| typ = GGUFValueType.ARRAY | |
| match typ: | |
| case GGUFValueType.INT8 | GGUFValueType.INT16 | GGUFValueType.INT32 | GGUFValueType.INT64 | GGUFValueType.UINT8 | GGUFValueType.UINT16 | GGUFValueType.UINT32 | GGUFValueType.UINT64 | GGUFValueType.FLOAT32 | GGUFValueType.FLOAT64: | |
| is_number = True | |
| case _: | |
| is_number = False | |
| return { | |
| meta_boolean: gr.Checkbox( | |
| value = val if typ == GGUFValueType.BOOL and data is not None else False, | |
| visible = True if typ == GGUFValueType.BOOL else False, | |
| ), | |
| meta_number: gr.Number( | |
| value = val if is_number and data is not None else 0, | |
| precision = 10 if typ == GGUFValueType.FLOAT32 or typ == GGUFValueType.FLOAT64 else 0, | |
| visible = True if is_number else False, | |
| ), | |
| meta_string: gr.Textbox( | |
| value = val if typ == GGUFValueType.STRING else '', | |
| visible = True if typ == GGUFValueType.STRING else False, | |
| ), | |
| meta_array: gr.Matrix( | |
| visible = True if typ == GGUFValueType.ARRAY else False, | |
| ), | |
| } | |
| # FIXME: Disabled for now due to Dataframe bug when user has selected data | |
| # @gr.on( | |
| # triggers = [ | |
| # file_meta.select, | |
| # ], | |
| # inputs = [ | |
| # ], | |
| # outputs = [ | |
| # meta_keys, | |
| # ], | |
| # ) | |
| # def select_metakey( | |
| # evt: gr.SelectData, | |
| # ): | |
| # return { | |
| # meta_keys: gr.Dropdown( | |
| # value = evt.row_value[0] if evt.selected else '', | |
| # ), | |
| # } | |
| def notify_state_change( | |
| meta: MetadataState, | |
| request: gr.Request, | |
| ): | |
| changes = [(k, 'rem') for k in meta.rem] | |
| for k, v in meta.add.items(): | |
| changes.append((k, 'add')) | |
| changes.append((str(v[1]), None)) | |
| m = [] | |
| for k, v in meta.key.items(): | |
| m.append([k, *human_readable_metadata(v[0], v[1])]) | |
| link = str(request.request.url_for('download', repo_file = meta.var['repo_file']).include_query_params(branch = meta.var['branch'])) | |
| if meta.rem or meta.add: | |
| link += '&' + urlencode( | |
| { | |
| 'rem': meta.rem, | |
| 'add': [json.dumps([k, *v], ensure_ascii = False) for k, v in meta.add.items()], | |
| }, | |
| doseq = True, | |
| safe = '[]{}:"\',', | |
| ) | |
| return { | |
| meta_state: meta, | |
| meta_changes: gr.HighlightedText( | |
| changes, | |
| visible = True if changes else False, | |
| ), | |
| file_meta: gr.Matrix( | |
| m, | |
| ), | |
| meta_keys: gr.Dropdown( | |
| sorted(meta.key.keys() | standard_metadata.keys()), | |
| value = '', | |
| ), | |
| btn_download: gr.Button( | |
| link = link, | |
| visible = True if changes else False, | |
| ), | |
| } | |
| def rem_metadata( | |
| meta: MetadataState, | |
| key: str, | |
| request: gr.Request, | |
| ): | |
| if key in meta.add: | |
| del meta.add[key] | |
| if key in meta.key: | |
| del meta.key[key] | |
| meta.rem.add(key) | |
| return notify_state_change( | |
| meta, | |
| request, | |
| ) | |
| def add_metadata( | |
| meta: MetadataState, | |
| key: str, | |
| typ: int | None, | |
| val: Any, | |
| request: gr.Request, | |
| ): | |
| if not key or typ is None: | |
| if key: | |
| gr.Warning('Missing required value type') | |
| return { | |
| meta_changes: gr.HighlightedText( | |
| ) | |
| } | |
| if key in meta.rem: | |
| meta.rem.remove(key) | |
| meta.key[key] = meta.add[key] = (typ, val) | |
| if key.startswith('tokenizer.chat_template.'): | |
| template = key[24:] | |
| if template not in meta.key.get('tokenizer.chat_templates', []): | |
| templates = [x[24:] for x in meta.key.keys() if x.startswith('tokenizer.chat_template.')] | |
| meta.key['tokenizer.chat_templates'] = meta.add['tokenizer.chat_templates'] = (GGUFValueType.STRING, templates) | |
| return notify_state_change( | |
| meta, | |
| request, | |
| ) | |
| meta_boolean.input( | |
| add_metadata, | |
| inputs = [ | |
| meta_state, | |
| meta_keys, | |
| meta_types, | |
| meta_boolean, | |
| ], | |
| outputs = [ | |
| ] + state_change_components, | |
| ) | |
| # meta_lookup.input( | |
| # lambda token: gr.Number(value = token), | |
| # inputs = meta_lookup, | |
| # outputs = meta_number, | |
| # ) | |
| meta_number.submit( | |
| add_metadata, | |
| inputs = [ | |
| meta_state, | |
| meta_keys, | |
| meta_types, | |
| meta_number, | |
| ], | |
| outputs = [ | |
| ] + state_change_components, | |
| ) | |
| meta_string.submit( | |
| add_metadata, | |
| inputs = [ | |
| meta_state, | |
| meta_keys, | |
| meta_types, | |
| meta_string, | |
| ], | |
| outputs = [ | |
| ] + state_change_components, | |
| ) | |
| meta_array.input( | |
| add_metadata, | |
| inputs = [ | |
| meta_state, | |
| meta_keys, | |
| meta_types, | |
| meta_array, | |
| ], | |
| outputs = [ | |
| ] + state_change_components, | |
| ) | |
| def stream_repo_file( | |
| repo_file: str, | |
| branch: str, | |
| add_meta: list[str] | None, | |
| rem_meta: list[str] | None, | |
| token: str | None = None, | |
| ): | |
| fs = HfFileSystem( | |
| token = token, | |
| ) | |
| with fs.open( | |
| repo_file, | |
| "rb", | |
| revision = branch, | |
| block_size = 8 * 1024 * 1024, | |
| cache_type = "readahead", | |
| ) as fp: | |
| if not rem_meta: | |
| rem_meta = [] | |
| if not add_meta: | |
| add_meta = [] | |
| gguf = HuggingGGUFstream(fp) | |
| for _ in gguf.read_metadata(): | |
| pass | |
| for k in rem_meta: | |
| gguf.remove_metadata(k) | |
| for k in add_meta: | |
| k = json.loads(k) | |
| if isinstance(k, list) and len(k) == 3: | |
| gguf.add_metadata(*k) | |
| yield gguf.filesize | |
| yield b''.join((v.data for k, v in gguf.header.items())) | |
| for k, v in gguf.metadata.items(): | |
| yield v.data | |
| while True: | |
| if not (data := fp.read(65536)): | |
| break | |
| yield data | |
| if __name__ == "__main__": | |
| blocks.queue( | |
| max_size = 10, | |
| default_concurrency_limit = 10, | |
| ) | |
| app, local_url, share_url = blocks.launch( | |
| show_api = False, | |
| prevent_thread_lock = True, | |
| ) | |
| async def download( | |
| request: Request, | |
| repo_file: Annotated[str, Path()], | |
| branch: Annotated[str, Query()] = "main", | |
| add: Annotated[list[str] | None, Query()] = None, | |
| rem: Annotated[list[str] | None, Query()] = None, | |
| ): | |
| token = request.session.get('oauth_info', {}).get('access_token') | |
| if posixpath.normpath(repo_file) != repo_file or '\\' in repo_file or repo_file.startswith('../') or repo_file.startswith('/') or repo_file.count('/') < 2: | |
| raise HTTPException( | |
| status_code = 404, | |
| detail = 'Invalid repository', | |
| ) | |
| stream = stream_repo_file( | |
| repo_file, | |
| branch, | |
| add, | |
| rem, | |
| token = token, | |
| ) | |
| size = next(stream) | |
| return StreamingResponse( | |
| stream, | |
| headers = { | |
| 'Content-Length': str(size), | |
| }, | |
| media_type = 'application/octet-stream', | |
| ) | |
| app.add_api_route( | |
| "/download/{repo_file:path}", | |
| download, | |
| methods = ["GET"], | |
| ) | |
| # app.openapi_schema = None | |
| # app.setup() | |
| blocks.block_thread() | |