|
|
import logging |
|
|
import re |
|
|
from enum import Enum |
|
|
from io import BytesIO |
|
|
from pathlib import Path, PurePath |
|
|
from typing import ( |
|
|
TYPE_CHECKING, |
|
|
Dict, |
|
|
Iterable, |
|
|
List, |
|
|
Literal, |
|
|
Optional, |
|
|
Set, |
|
|
Type, |
|
|
Union, |
|
|
) |
|
|
|
|
|
import filetype |
|
|
from docling_core.types.doc import ( |
|
|
DocItem, |
|
|
DocItemLabel, |
|
|
DoclingDocument, |
|
|
PictureItem, |
|
|
SectionHeaderItem, |
|
|
TableItem, |
|
|
TextItem, |
|
|
) |
|
|
from docling_core.types.doc.document import ListItem |
|
|
from docling_core.types.legacy_doc.base import ( |
|
|
BaseText, |
|
|
Figure, |
|
|
GlmTableCell, |
|
|
PageDimensions, |
|
|
PageReference, |
|
|
Prov, |
|
|
Ref, |
|
|
) |
|
|
from docling_core.types.legacy_doc.base import Table as DsSchemaTable |
|
|
from docling_core.types.legacy_doc.base import TableCell |
|
|
from docling_core.types.legacy_doc.document import ( |
|
|
CCSDocumentDescription as DsDocumentDescription, |
|
|
) |
|
|
from docling_core.types.legacy_doc.document import CCSFileInfoObject as DsFileInfoObject |
|
|
from docling_core.types.legacy_doc.document import ExportedCCSDocument as DsDocument |
|
|
from docling_core.utils.file import resolve_source_to_stream |
|
|
from docling_core.utils.legacy import docling_document_to_legacy |
|
|
from pydantic import BaseModel |
|
|
from typing_extensions import deprecated |
|
|
|
|
|
from docling.backend.abstract_backend import ( |
|
|
AbstractDocumentBackend, |
|
|
PaginatedDocumentBackend, |
|
|
) |
|
|
from docling.datamodel.base_models import ( |
|
|
AssembledUnit, |
|
|
ConversionStatus, |
|
|
DocumentStream, |
|
|
ErrorItem, |
|
|
FormatToExtensions, |
|
|
FormatToMimeType, |
|
|
InputFormat, |
|
|
MimeTypeToFormat, |
|
|
Page, |
|
|
) |
|
|
from docling.datamodel.settings import DocumentLimits |
|
|
from docling.utils.profiling import ProfilingItem |
|
|
from docling.utils.utils import create_file_hash, create_hash |
|
|
|
|
|
if TYPE_CHECKING: |
|
|
from docling.document_converter import FormatOption |
|
|
|
|
|
_log = logging.getLogger(__name__)

# Lookup from a DocItemLabel to a type-name string.
# NOTE(review): the "ds" naming suggests these are the type names of the legacy
# DS document schema -- confirm against the consumers of this table.
# Several labels deliberately collapse onto the same string: list items, code,
# text and paragraphs all become "paragraph", and a document index is treated
# like a "table". FORM and KEY_VALUE_REGION reuse the enum's own value.
layout_label_to_ds_type = {
    DocItemLabel.TITLE: "title",
    DocItemLabel.DOCUMENT_INDEX: "table",
    DocItemLabel.SECTION_HEADER: "subtitle-level-1",
    DocItemLabel.CHECKBOX_SELECTED: "checkbox-selected",
    DocItemLabel.CHECKBOX_UNSELECTED: "checkbox-unselected",
    DocItemLabel.CAPTION: "caption",
    DocItemLabel.PAGE_HEADER: "page-header",
    DocItemLabel.PAGE_FOOTER: "page-footer",
    DocItemLabel.FOOTNOTE: "footnote",
    DocItemLabel.TABLE: "table",
    DocItemLabel.FORMULA: "equation",
    DocItemLabel.LIST_ITEM: "paragraph",
    DocItemLabel.CODE: "paragraph",
    DocItemLabel.PICTURE: "figure",
    DocItemLabel.TEXT: "paragraph",
    DocItemLabel.PARAGRAPH: "paragraph",
    DocItemLabel.FORM: DocItemLabel.FORM.value,
    DocItemLabel.KEY_VALUE_REGION: DocItemLabel.KEY_VALUE_REGION.value,
}

# Placeholder document; used below as the default value of
# ConversionResult.document.
_EMPTY_DOCLING_DOC = DoclingDocument(name="dummy")
|
|
|
|
|
|
|
|
class InputDocument(BaseModel):
    """A single source document together with the backend that opens it.

    The constructor never raises for unreadable input: I/O and runtime
    errors are logged and recorded by setting ``valid`` to ``False``.
    """

    file: PurePath
    document_hash: str
    valid: bool = True
    limits: DocumentLimits = DocumentLimits()
    format: InputFormat

    filesize: Optional[int] = None
    page_count: int = 0

    _backend: AbstractDocumentBackend

    def __init__(
        self,
        path_or_stream: Union[BytesIO, Path],
        format: InputFormat,
        backend: Type[AbstractDocumentBackend],
        filename: Optional[str] = None,
        limits: Optional[DocumentLimits] = None,
    ):
        """Open the document and validate it against the configured limits.

        Args:
            path_or_stream: File path or in-memory stream holding the document.
            format: Input format assigned to this document.
            backend: Backend class instantiated to read the document.
            filename: Name to use for the document; required when
                ``path_or_stream`` is a ``BytesIO``.
            limits: Size and page limits; defaults to ``DocumentLimits()``.
        """
        # The pydantic fields are required, so seed them with placeholder
        # values first and overwrite them with the real ones below.
        super().__init__(file="", document_hash="", format=InputFormat.PDF)

        self.limits = limits or DocumentLimits()
        self.format = format

        try:
            # Step 1: resolve the file name and size for the given source kind.
            if isinstance(path_or_stream, Path):
                self.file = path_or_stream
                self.filesize = path_or_stream.stat().st_size
            elif isinstance(path_or_stream, BytesIO):
                assert (
                    filename is not None
                ), "Can't construct InputDocument from stream without providing filename arg."
                self.file = PurePath(filename)
                self.filesize = path_or_stream.getbuffer().nbytes
            else:
                raise RuntimeError(
                    f"Unexpected type path_or_stream: {type(path_or_stream)}"
                )

            # Step 2: oversized inputs are rejected before hashing/opening.
            if self.filesize > self.limits.max_file_size:
                self.valid = False
            else:
                self.document_hash = create_file_hash(path_or_stream)
                self._init_doc(backend, path_or_stream)

            # Step 3: for paginated backends, enforce the page limits.
            # ``self.valid`` is tested first so ``_backend`` is only touched
            # when it has actually been created.
            if self.valid and self._backend.is_valid():
                is_paginated = self._backend.supports_pagination() and isinstance(
                    self._backend, PaginatedDocumentBackend
                )
                if is_paginated:
                    self.page_count = self._backend.page_count()
                    if (
                        not (self.page_count <= self.limits.max_num_pages)
                        or self.page_count < self.limits.page_range[0]
                    ):
                        self.valid = False

        except (FileNotFoundError, OSError) as e:
            self.valid = False
            _log.exception(
                f"File {self.file.name} not found or cannot be opened.", exc_info=e
            )

        except RuntimeError as e:
            self.valid = False
            _log.exception(
                f"An unexpected error occurred while opening the document {self.file.name}",
                exc_info=e,
            )

    def _init_doc(
        self,
        backend: Type[AbstractDocumentBackend],
        path_or_stream: Union[BytesIO, Path],
    ) -> None:
        """Instantiate the backend and mark the document invalid if it rejects the input."""
        self._backend = backend(self, path_or_stream=path_or_stream)
        if not self._backend.is_valid():
            self.valid = False
|
|
|
|
|
|
|
|
class DocumentFormat(str, Enum):
    """String-valued identifiers for the two document format versions."""

    V2 = "v2"
    V1 = "v1"
|
|
|
|
|
|
|
|
class ConversionResult(BaseModel):
    """Outcome of converting one ``InputDocument``."""

    # The document this result belongs to.
    input: InputDocument

    # Overall status (starts as PENDING) and any errors recorded for it.
    status: ConversionStatus = ConversionStatus.PENDING
    errors: List[ErrorItem] = []

    # Intermediate products: pages, the assembled unit, and timing records
    # keyed by name.
    pages: List[Page] = []
    assembled: AssembledUnit = AssembledUnit()
    timings: Dict[str, ProfilingItem] = {}

    # Final document; defaults to the module-level empty placeholder.
    document: DoclingDocument = _EMPTY_DOCLING_DOC

    @property
    @deprecated("Use document instead.")
    def legacy_document(self):
        """Return ``document`` converted with ``docling_document_to_legacy``."""
        legacy = docling_document_to_legacy(self.document)
        return legacy
|
|
|
|
|
|
|
|
class _DummyBackend(AbstractDocumentBackend):
    """Stand-in backend that always reports its document as invalid.

    It supports no input formats and no pagination.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments unchanged to the base backend."""
        super().__init__(*args, **kwargs)

    def is_valid(self) -> bool:
        """Always ``False``: nothing can be read through this backend."""
        return False

    @classmethod
    def supported_formats(cls) -> Set[InputFormat]:
        """Return an empty set; no format is supported."""
        return set()

    @classmethod
    def supports_pagination(cls) -> bool:
        """Always ``False``."""
        return False

    def unload(self):
        """Delegate to the base class ``unload``."""
        return super().unload()
|
|
|
|
|
|
|
|
class _DocumentConversionInput(BaseModel):
    """A batch of conversion sources plus the settings used to open them."""

    # Sources to convert. Strings are resolved with resolve_source_to_stream
    # (using ``headers``); paths and DocumentStreams are used as given.
    path_or_stream_iterator: Iterable[Union[Path, str, DocumentStream]]
    headers: Optional[Dict[str, str]] = None
    limits: Optional[DocumentLimits] = DocumentLimits()

    def docs(
        self, format_options: Dict[InputFormat, "FormatOption"]
    ) -> Iterable[InputDocument]:
        """Yield an ``InputDocument`` for every source in the batch.

        Args:
            format_options: Per-format options; the ``backend`` of the entry
                matching the guessed format is used to open the document.

        Yields:
            One ``InputDocument`` per source. Sources whose guessed format is
            not a key of ``format_options`` are logged and opened with
            ``_DummyBackend``, which reports itself as invalid.

        Raises:
            RuntimeError: If a resolved source is neither a ``Path`` nor a
                ``DocumentStream``.
        """
        for item in self.path_or_stream_iterator:
            obj = (
                resolve_source_to_stream(item, self.headers)
                if isinstance(item, str)
                else item
            )
            input_format = self._guess_format(obj)
            backend: Type[AbstractDocumentBackend]
            if input_format not in format_options:
                _log.error(
                    f"Input document {obj.name} does not match any allowed format."
                )
                backend = _DummyBackend
            else:
                backend = format_options[input_format].backend

            if isinstance(obj, Path):
                yield InputDocument(
                    path_or_stream=obj,
                    format=input_format,
                    filename=obj.name,
                    limits=self.limits,
                    backend=backend,
                )
            elif isinstance(obj, DocumentStream):
                yield InputDocument(
                    path_or_stream=obj.stream,
                    format=input_format,
                    filename=obj.name,
                    limits=self.limits,
                    backend=backend,
                )
            else:
                raise RuntimeError(f"Unexpected obj type in iterator: {type(obj)}")

    def _guess_format(self, obj: Union[Path, DocumentStream]) -> Optional[InputFormat]:
        """Guess the input format of a source from its MIME type and content.

        The MIME type is determined, in order, by ``filetype``, the file
        extension, an HTML/XHTML/XML sniff of the leading bytes, and finally a
        fallback to ``"text/plain"``.

        Args:
            obj: The source to inspect. A stream is rewound to position 0
                after its leading bytes have been read.

        Returns:
            The guessed format, or ``None`` if no format could be determined.
        """
        content = b""
        # Pre-bound so an unexpected ``obj`` type falls through to the plain
        # text fallback instead of raising UnboundLocalError here.
        mime: Optional[str] = None

        if isinstance(obj, Path):
            mime = filetype.guess_mime(str(obj))
            if mime is None:
                ext = obj.suffix[1:]
                mime = _DocumentConversionInput._mime_from_extension(ext)
            if mime is None:
                # Neither signature nor extension helped: sniff the content.
                with obj.open("rb") as f:
                    content = f.read(1024)

        elif isinstance(obj, DocumentStream):
            content = obj.stream.read(8192)
            obj.stream.seek(0)
            mime = filetype.guess_mime(content)
            if mime is None:
                # A leading dot alone (e.g. ".bashrc") is not an extension.
                ext = (
                    obj.name.rsplit(".", 1)[-1]
                    if ("." in obj.name and not obj.name.startswith("."))
                    else ""
                )
                mime = _DocumentConversionInput._mime_from_extension(ext)

        mime = mime or _DocumentConversionInput._detect_html_xhtml(content)
        mime = mime or "text/plain"
        formats = MimeTypeToFormat.get(mime, [])
        if not formats:
            return None

        # Plain text is the catch-all fallback, so it is always confirmed
        # against the content. Compare with ``!=``: ``in ("text/plain")`` is a
        # substring test on a str, not tuple membership.
        if len(formats) == 1 and mime != "text/plain":
            return formats[0]
        return _DocumentConversionInput._guess_from_content(content, mime, formats)

    @staticmethod
    def _guess_from_content(
        content: bytes, mime: str, formats: list[InputFormat]
    ) -> Optional[InputFormat]:
        """Guess the input format of a document by checking part of its content.

        Args:
            content: Leading bytes of the document; may be empty, truncated in
                the middle of a character, or not text at all.
            mime: The MIME type already determined for the document.
            formats: Candidate formats registered for ``mime``.

        Returns:
            The matching format, or ``None`` if the content matches no candidate.
        """
        input_format: Optional[InputFormat] = None
        # ``content`` is only a prefix of the file and may even be binary, so
        # a strict decode can fail. All markers searched for below are ASCII;
        # replacing undecodable bytes keeps them intact without raising.
        content_str = content.decode("utf-8", errors="replace")

        if mime == "application/xml":
            match_doctype = re.search(r"<!DOCTYPE [^>]+>", content_str)
            if match_doctype:
                xml_doctype = match_doctype.group()
                if InputFormat.XML_USPTO in formats and any(
                    item in xml_doctype
                    for item in (
                        "us-patent-application-v4",
                        "us-patent-grant-v4",
                        "us-grant-025",
                        "patent-application-publication",
                    )
                ):
                    input_format = InputFormat.XML_USPTO

                if (
                    InputFormat.XML_PUBMED in formats
                    and "/NLM//DTD JATS" in xml_doctype
                ):
                    input_format = InputFormat.XML_PUBMED

        elif mime == "text/plain":
            if InputFormat.XML_USPTO in formats and content_str.startswith("PATN\r\n"):
                input_format = InputFormat.XML_USPTO

        return input_format

    @staticmethod
    def _mime_from_extension(ext):
        """Map a file extension (without the dot) to a MIME type.

        Only the AsciiDoc, HTML, Markdown, Docling JSON and PDF formats are
        considered, in that order; the first entry of the format's MIME type
        list is returned.

        Args:
            ext: The extension to look up; compared as given.

        Returns:
            The MIME type, or ``None`` if the extension matches none of the
            formats above.
        """
        mime = None
        if ext in FormatToExtensions[InputFormat.ASCIIDOC]:
            mime = FormatToMimeType[InputFormat.ASCIIDOC][0]
        elif ext in FormatToExtensions[InputFormat.HTML]:
            mime = FormatToMimeType[InputFormat.HTML][0]
        elif ext in FormatToExtensions[InputFormat.MD]:
            mime = FormatToMimeType[InputFormat.MD][0]
        elif ext in FormatToExtensions[InputFormat.JSON_DOCLING]:
            mime = FormatToMimeType[InputFormat.JSON_DOCLING][0]
        elif ext in FormatToExtensions[InputFormat.PDF]:
            mime = FormatToMimeType[InputFormat.PDF][0]
        return mime

    @staticmethod
    def _detect_html_xhtml(
        content: bytes,
    ) -> Optional[Literal["application/xhtml+xml", "application/xml", "text/html"]]:
        """Guess the mime type of an XHTML, HTML, or XML file from its content.

        Args:
            content: A short piece of a document from its beginning.

        Returns:
            The mime type of an XHTML, HTML, or XML file, or None if the content does
              not match any of these formats.
        """
        # Non-ASCII bytes are dropped; the markers checked below are ASCII and
        # are matched case-insensitively via the lower-casing.
        content_str = content.decode("ascii", errors="ignore").lower()
        # Remove XML comments so a leading comment does not hide the prolog.
        content_str = re.sub(r"<!--(.*?)-->", "", content_str, flags=re.DOTALL)
        content_str = content_str.lstrip()

        if re.match(r"<\?xml", content_str):
            if "xhtml" in content_str[:1000]:
                return "application/xhtml+xml"
            else:
                return "application/xml"

        if re.match(r"<!doctype\s+html|<html|<head|<body", content_str):
            return "text/html"

        # A DOCTYPE whose declared root name is followed by that root element
        # is treated as generic XML.
        p = re.compile(
            r"<!doctype\s+(?P<root>[a-zA-Z_:][a-zA-Z0-9_:.-]*)\s+.*>\s*<(?P=root)\b"
        )
        if p.search(content_str):
            return "application/xml"

        return None
|
|
|