Spaces:
Paused
Paused
| import inspect | |
| import re | |
| from typing import Any, Callable, Dict, Type, Union, Tuple, Optional | |
| import requests | |
| from requests.adapters import HTTPAdapter | |
| from urllib3.util.retry import Retry | |
# Helper: translate Python type annotations into JSON Schema fragments.
| def _map_python_type_to_json_schema(py_type: Type[Any]) -> Dict[str, Any]: | |
| """Maps Python types to JSON Schema type definitions.""" | |
| origin = getattr(py_type, "__origin__", None) | |
| args = getattr(py_type, "__args__", ()) | |
| if py_type is str: | |
| return {"type": "string"} | |
| elif py_type is int: | |
| return {"type": "integer"} | |
| elif py_type is float: | |
| return {"type": "number"} | |
| elif py_type is bool: | |
| return {"type": "boolean"} | |
| elif py_type is list or origin is list: | |
| items_schema = {"type": "string"} # Default for untyped lists | |
| if args and args[0] is not Any: | |
| items_schema = _map_python_type_to_json_schema(args[0]) | |
| return {"type": "array", "items": items_schema} | |
| elif py_type is dict or origin is dict: | |
| # Basic object type, doesn't specify properties for simplicity | |
| # Might need refinement if nested structures are common and need schema | |
| return {"type": "object", "additionalProperties": True} # Allow any properties | |
| elif origin is Union: | |
| # Handle Optional[T] -> T (represented as Union[T, NoneType]) | |
| # And basic Union types, favouring the first non-None type | |
| non_none_types = [t for t in args if t is not type(None)] | |
| if len(non_none_types) > 0: | |
| # If it was Optional[T], is_optional flag handles 'required' status later | |
| # For Union[A, B], just use the first type's schema for simplicity. | |
| # A more complex approach could use 'anyOf'. | |
| return _map_python_type_to_json_schema(non_none_types[0]) | |
| else: | |
| return {"type": "null"} # Should only happen for Union[NoneType] | |
| elif py_type is Any: | |
| # Any type can be represented loosely, e.g., allowing any type | |
| return {} # Represents any type in JSON Schema when empty | |
| else: | |
| # Default or unknown types | |
| # Consider logging a warning for unknown types | |
| return {"type": "string"} # Default to string if unknown | |
def _parse_docstring(docstring: str) -> Tuple[Dict[str, str], str]:
    """
    Split a docstring into per-parameter descriptions and a main description.

    The main description is every non-blank line (stripped, joined with
    newlines) that appears before the first line equal to "Parameters",
    "Args:" or "Arguments:" (case-insensitive). A ``---`` rule directly under
    the header is skipped.

    Below the header, a line of the form ``name: text`` or ``name : text``
    starts a parameter entry. Following lines that are indented deeper than
    the entry line are appended to that entry's description. The section ends
    at the first line that neither starts an entry nor continues one, or at a
    ``name:`` line that is indented less than the first entry (for example a
    following ``Returns:`` header).

    Args:
        docstring: The docstring text to parse. May be empty.

    Returns:
        A ``(param_descriptions, main_description)`` tuple. Entries whose
        description ends up empty are left out of the mapping.
    """
    if not docstring:
        return {}, ""

    lines = docstring.strip().splitlines()
    section_headers = {"parameters", "args:", "arguments:"}

    # 1. Collect the main description and locate the first entry line.
    main_description_lines = []
    params_start = -1
    for index, line in enumerate(lines):
        text = line.strip()
        if text.lower() in section_headers:
            has_rule = index + 1 < len(lines) and lines[index + 1].strip().startswith(
                "---"
            )
            params_start = index + 2 if has_rule else index + 1
            break
        if text:
            main_description_lines.append(text)
    main_description = "\n".join(main_description_lines).strip()

    # 2. Walk the parameter section, if there is one.
    param_descriptions: Dict[str, str] = {}
    if -1 < params_start < len(lines):
        # Compiled once instead of on every line.
        param_line = re.compile(r"^(?P<name>[a-zA-Z_]\w*)\s*:(?P<desc_start>.*)$")

        current_name: Optional[str] = None
        current_desc: list = []
        first_param_indent = -1  # indent of the first entry in the section
        param_def_indent = -1  # indent of the current entry's `name:` line
        desc_indent = -1  # indent of the current entry's first body line

        for line in lines[params_start:]:
            indent = len(line) - len(line.lstrip(" "))
            text = line.strip()
            match = param_line.match(text)

            # All entries of one section share an indent. A `name:` line that
            # is dedented below the first entry is the header of a following
            # section (e.g. "Returns:"), not another parameter.
            if match and first_param_indent != -1 and indent < first_param_indent:
                break

            # A matching line only opens a new entry when it is not part of
            # the current entry's indented description body.
            starts_entry = False
            if match:
                if not current_name:
                    starts_entry = True
                elif desc_indent != -1:
                    starts_entry = indent < desc_indent
                else:
                    starts_entry = indent <= param_def_indent

            if starts_entry:
                if current_name:
                    param_descriptions[current_name] = "\n".join(current_desc).strip()
                current_name = match.group("name")
                first_text = match.group("desc_start").strip()
                current_desc = [first_text] if first_text else []
                param_def_indent = indent
                desc_indent = -1
                if first_param_indent == -1:
                    first_param_indent = indent
            elif current_name:
                if not text:
                    # Keep a blank line only when it sits inside the
                    # description's indentation; otherwise ignore it.
                    inside_body = (desc_indent != -1 and indent >= desc_indent) or (
                        desc_indent == -1 and indent > param_def_indent
                    )
                    if inside_body:
                        current_desc.append("")
                    continue
                if desc_indent == -1:
                    if indent <= param_def_indent:
                        break  # not indented as a description: section over
                    desc_indent = indent
                    current_desc.append(text)
                elif indent >= desc_indent:
                    current_desc.append(text)
                else:
                    break  # dedented below the description: section over
            elif text:
                # Content before any entry was opened: not a parameter list.
                break

        # Every exit path leaves the pending entry here to be stored once.
        if current_name:
            param_descriptions[current_name] = "\n".join(current_desc).strip()

    # Drop entries that ended up with no description text.
    param_descriptions = {k: v for k, v in param_descriptions.items() if v}
    return param_descriptions, main_description
# Public entry point: build an Anthropic tool schema from a function's signature and docstring.
def generate_anthropic_tool_schema(
    func: Callable[..., Any], description: str | None = None
) -> Dict[str, Any]:
    """
    Generate an Anthropic tool schema dictionary from a Python function.

    Each named parameter of ``func`` becomes a property of ``input_schema``.
    Its JSON type comes from the annotation (``string`` when unannotated) and
    its description from the function's docstring, with ``Default: <repr>.``
    appended for non-``None`` defaults. ``self``/``cls`` and variadic
    ``*args``/``**kwargs`` parameters are skipped, because they cannot be
    expressed as named properties.

    A parameter is listed under ``required`` when it has no default and its
    annotation is neither ``Any`` nor a union that includes ``None``
    (``Optional[T]``, ``Union[T, None]`` or ``T | None``).

    Args:
        func: The function to generate the schema for.
        description: Optional override for the tool description. If None, the
            main description parsed from the docstring is used.

    Returns:
        A dict with ``name``, ``description`` and ``input_schema`` keys. The
        ``required`` list is sorted and is only present when non-empty.
    """
    # Function-local imports keep this block self-contained; the module
    # header does not import these names.
    import types
    from typing import get_args, get_origin

    signature = inspect.signature(func)
    param_docs, main_description = _parse_docstring(inspect.getdoc(func) or "")
    final_description = description if description is not None else main_description

    variadic_kinds = (
        inspect.Parameter.VAR_POSITIONAL,
        inspect.Parameter.VAR_KEYWORD,
    )
    # types.UnionType (the ``X | Y`` form) only exists on Python 3.10+.
    pep604_union = getattr(types, "UnionType", None)

    properties = {}
    required_params = []
    for name, param in signature.parameters.items():
        if name in ("self", "cls"):  # Skip self/cls for methods
            continue
        if param.kind in variadic_kinds:
            # *args / **kwargs have no fixed name a caller could supply.
            continue

        param_type = param.annotation

        # --- Type mapping ---
        if param_type is inspect.Parameter.empty:
            schema = {"type": "string"}  # No type hint: default to string
        else:
            schema = dict(_map_python_type_to_json_schema(param_type))

        # --- Description ---
        schema["description"] = param_docs.get(name, "").strip()

        # --- Required status & default ---
        if param.default is not inspect.Parameter.empty:
            # A default makes the parameter optional; surface the value in
            # the description unless it is just None.
            if param.default is not None:
                default_str = f"Default: {param.default!r}."
                if schema["description"]:
                    schema["description"] += f" {default_str}"
                else:
                    schema["description"] = default_str
        else:
            origin = get_origin(param_type)
            is_union = origin is Union or (
                pep604_union is not None and origin is pep604_union
            )
            accepts_none = is_union and type(None) in get_args(param_type)
            # `Any` is treated as possibly-None, hence not strictly required.
            if not (accepts_none or param_type is Any):
                required_params.append(name)

        properties[name] = schema

    tool_schema = {
        "name": func.__name__,
        "description": final_description,
        "input_schema": {
            "type": "object",
            "properties": properties,
        },
    }
    # Only add 'required' when there is something to list. Parameter names
    # are unique within a signature, so sorting alone is enough.
    if required_params:
        tool_schema["input_schema"]["required"] = sorted(required_params)
    return tool_schema
def get_retry_session(
    retries: int = 3,
    backoff_factor: float = 1.0,
    status_forcelist: Tuple[int, ...] = (429, 500, 502, 503, 504),
    allowed_methods: Tuple[str, ...] = ("POST", "GET"),
) -> requests.Session:
    """Build a ``requests.Session`` whose HTTP(S) adapters retry requests.

    The arguments are forwarded to a single ``urllib3`` ``Retry`` policy, and
    that policy is installed for both the ``https://`` and ``http://``
    prefixes of the session.

    Args:
        retries: Total number of retries to allow (passed as ``total``).
        backoff_factor: A backoff factor to apply between attempts after the second try.
            {backoff factor} * (2 ** ({number of total retries} - 1))
        status_forcelist: HTTP status codes that should force a retry.
        allowed_methods: Uppercase HTTP method verbs that retries are allowed on.

    Returns:
        A requests.Session object configured with retry logic.
    """
    retrying_adapter = HTTPAdapter(
        max_retries=Retry(
            total=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
            # Handed over as a list rather than the tuple callers pass in.
            allowed_methods=list(allowed_methods),
        )
    )

    session = requests.Session()
    # One shared adapter serves both schemes.
    for prefix in ("https://", "http://"):
        session.mount(prefix, retrying_adapter)
    return session