# markdownconverter/core/core.py
import hashlib
import os
import re
import shutil
import subprocess
import sys
import tempfile
import time
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from datetime import date
from pathlib import Path

import docx
import fitz  # PyMuPDF
import markdown
import pdfkit
import pypandoc
from docx.enum.text import WD_BREAK
from docx2pdf import convert as convert_word
from ..utils.logger import get_logger
log = get_logger(__name__)
# --- Custom Exceptions ---
class TemplatePlaceholderError(ValueError):
pass
class ConverterNotFoundError(Exception):
pass
# --- PDFKit Configuration ---
try:
config = pdfkit.configuration()
log.info("pdfkit configured using wkhtmltopdf from system PATH.")
except OSError:
WKHTMLTOPDF_PATH = r"C:\Program Files\wkhtmltopdf\bin\wkhtmltopdf.exe"
if os.path.exists(WKHTMLTOPDF_PATH):
config = pdfkit.configuration(wkhtmltopdf=WKHTMLTOPDF_PATH)
else:
config = None
log.warning("wkhtmltopdf not found. PDF conversion may fail.")
def scan_template_for_placeholders(template_path: str) -> tuple[list[str], list[str]]:
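    """
    Scan a DOCX template for %%PLACEHOLDER%%-style markers.
    Walks the document body, nested tables, and every section header and
    footer. Matches are split into dynamic placeholders (metadata fields)
    and the known structural keys.
    Returns:
        A tuple (dynamic, structural) of sorted placeholder lists.
    """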
if not os.path.exists(template_path):
raise FileNotFoundError(f"Template file not found: {template_path}")
log.info(f"Scanning template '{os.path.basename(template_path)}' for placeholders.")
structural_keys = {"%%REVISION_RECORD%%", "%%DOC_TOC%%", "%%DOC_CONTENT%%"}
placeholder_pattern = re.compile(r"%%([A-Z0-9_]+)%%")
found_placeholders = set()
doc = docx.Document(template_path)
def find_in_element(element):
if hasattr(element, "paragraphs"):
for p in element.paragraphs:
for match in placeholder_pattern.finditer(
"".join(run.text for run in p.runs)
):
found_placeholders.add(match.group(0))
if hasattr(element, "tables"):
for table in element.tables:
for row in table.rows:
for cell in row.cells:
find_in_element(cell)
find_in_element(doc)
for section in doc.sections:
find_in_element(section.header)
find_in_element(section.footer)
dynamic = sorted([p for p in found_placeholders if p not in structural_keys])
structural = sorted([p for p in found_placeholders if p in structural_keys])
log.info(
f"Scan complete. Found {len(dynamic)} dynamic and {len(structural)} structural placeholders."
)
return dynamic, structural
def _get_document_title(markdown_text: str) -> str:
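    """Return the text of the first markdown heading (any level), or a fallback title."""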
match = re.search(r"^\s*#+\s+(.+)", markdown_text, re.MULTILINE)
return match.group(1).strip() if match else "Untitled Document"
def _split_markdown_by_revision_history(
markdown_text: str, separator_heading="## Revision Record"
) -> tuple[str, str]:
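    """
    Split markdown into (revision_history, main_content).
    The revision history block starts at separator_heading and runs up to
    the next heading (or the end of the document). Returns ("", full_text)
    if the section is absent.
    """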
    pattern = re.compile(
        rf"({re.escape(separator_heading)}.*?)(?=\n#+\s|\Z)", re.DOTALL
    )
match = pattern.search(markdown_text)
if not match:
log.warning(
f"'{separator_heading}' section not found. No revision history will be added."
)
return "", markdown_text
rev_history_md = match.group(0).strip()
main_content_md = markdown_text.replace(rev_history_md, "", 1).strip()
return rev_history_md, main_content_md
def _replace_text_in_paragraph(paragraph, placeholders: dict[str, str]):
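    """
    Replace placeholder keys in a paragraph while preserving basic formatting.
    Word often splits a placeholder such as %%DOC_PROJECT%% across several
    runs, so the run texts are joined, replaced as one string, and written
    back as a single run that copies the first run's style and font.
    """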
full_text = "".join(run.text for run in paragraph.runs)
if not any(key in full_text for key in placeholders):
return
for key, value in placeholders.items():
if key in full_text:
full_text = full_text.replace(key, str(value))
style = paragraph.runs[0].style if paragraph.runs else None
font = paragraph.runs[0].font if paragraph.runs else None
for run in reversed(paragraph.runs):
p = paragraph._p
p.remove(run._r)
new_run = paragraph.add_run(full_text)
if style:
new_run.style = style
if font:
new_run.font.name = font.name
new_run.font.size = font.size
new_run.font.bold = font.bold
new_run.font.italic = font.italic
new_run.font.underline = font.underline
if font.color and font.color.rgb:
new_run.font.color.rgb = font.color.rgb
def _replace_text_in_element(element, placeholders: dict[str, str]):
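    """Recursively replace placeholders in an element's paragraphs and tables."""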
if hasattr(element, "paragraphs"):
for p in element.paragraphs:
_replace_text_in_paragraph(p, placeholders)
if hasattr(element, "tables"):
for table in element.tables:
for row in table.rows:
for cell in row.cells:
_replace_text_in_element(cell, placeholders)
def _replace_metadata_placeholders(doc: docx.Document, placeholders: dict[str, str]):
log.info(f"Replacing metadata placeholders: {list(placeholders.keys())}")
_replace_text_in_element(doc, placeholders)
for section in doc.sections:
_replace_text_in_element(section.header, placeholders)
_replace_text_in_element(section.footer, placeholders)
def _find_placeholder_paragraph(doc: docx.Document, placeholder: str):
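    """Return the first body paragraph containing placeholder, or None."""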
for p in doc.paragraphs:
if placeholder in "".join(run.text for run in p.runs):
return p
return None
def _insert_docx_at_paragraph(paragraph, source_docx_path: str):
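    """
    Splice the body of source_docx_path into the document at paragraph.
    Every block element of the source body is inserted before the
    placeholder paragraph, which is then removed.
    """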
parent = paragraph._p.getparent()
index = parent.index(paragraph._p)
source_doc = docx.Document(source_docx_path)
for element in source_doc.element.body:
parent.insert(index, element)
index += 1
parent.remove(paragraph._p)
def _remove_paragraph(paragraph):
if paragraph is None:
return
parent = paragraph._p.getparent()
parent.remove(paragraph._p)
def _add_revision_table(doc: docx.Document, rev_history_md: str):
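    """
    Replace the %%REVISION_RECORD%% placeholder with a table built from
    rev_history_md, which is expected to contain a pipe-delimited markdown
    table, for example:
        | Rev | Date       | Description |
        |-----|------------|-------------|
        | A   | 01/01/2024 | First issue |
    The placeholder paragraph is removed if no table can be parsed.
    """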
placeholder_p = _find_placeholder_paragraph(doc, "%%REVISION_RECORD%%")
if not placeholder_p:
log.warning("Revision record placeholder not found in template. Skipping.")
return
if not rev_history_md:
log.info("No revision history content found. Removing placeholder.")
_remove_paragraph(placeholder_p)
return
    lines = [line.strip() for line in rev_history_md.strip().split("\n")]
    # Keep pipe rows but drop the alignment/separator row (e.g. |---|:--:|).
    table_lines = [
        line
        for line in lines
        if line.startswith("|") and not re.fullmatch(r"\|[\s:\-|]+\|?", line)
    ]
if not table_lines:
log.warning(
"Could not parse a markdown table from the revision history section."
)
_remove_paragraph(placeholder_p)
return
table_data = [
[cell.strip() for cell in line.split("|")][1:-1] for line in table_lines
]
    if not table_data:
log.warning("Revision history table is empty.")
_remove_paragraph(placeholder_p)
return
log.info(f"Adding revision history table with {len(table_data)} rows.")
table = doc.add_table(rows=1, cols=len(table_data[0]))
table.style = "Table Grid"
hdr_cells = table.rows[0].cells
for i, header_text in enumerate(table_data[0]):
hdr_cells[i].text = header_text
for row_data in table_data[1:]:
row_cells = table.add_row().cells
for i, cell_text in enumerate(row_data):
row_cells[i].text = cell_text
parent = placeholder_p._p.getparent()
parent.insert(parent.index(placeholder_p._p), table._tbl)
_remove_paragraph(placeholder_p)
def _convert_to_pdf(markdown_text: str, output_file: str, add_toc: bool):
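    """
    Render markdown to PDF via python-markdown and wkhtmltopdf (pdfkit).
    The first heading becomes the HTML title and top-level heading; the
    remaining text is converted with the toc, fenced_code, tables and
    nl2br extensions and handed to wkhtmltopdf as a single HTML string.
    """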
log.info("Starting PDF conversion using pdfkit.")
if config is None:
raise FileNotFoundError("wkhtmltopdf executable not found. Cannot create PDF.")
title = _get_document_title(markdown_text)
content_without_title = markdown_text
match = re.search(r"^\s*#+\s+(.+)\n?", markdown_text, re.MULTILINE)
if match:
content_without_title = markdown_text[match.end() :]
# Use nl2br extension to preserve line breaks (consistent with DOCX hard_line_breaks)
md_converter = markdown.Markdown(
extensions=["toc", "fenced_code", "tables", "nl2br"]
)
html_body = md_converter.convert(content_without_title)
toc_html = ""
if add_toc and hasattr(md_converter, "toc") and md_converter.toc:
log.info("Generating Table of Contents for PDF.")
        toc_html = f"<h2>Table of Contents</h2>{md_converter.toc}"
    # Wrap the converted body in a minimal HTML document for wkhtmltopdf.
    full_html = (
        '<html><head><meta charset="utf-8">'
        f"<title>{title}</title></head><body>"
        f"<h1>{title}</h1>{toc_html}{html_body}</body></html>"
    )
pdf_options = {"encoding": "UTF-8", "enable-local-file-access": None}
pdfkit.from_string(
full_html, output_file, configuration=config, options=pdf_options
)
log.info(f"PDF successfully generated: {output_file}")
def _convert_to_docx(
markdown_text: str,
output_file: str,
template_path: str,
metadata: dict,
add_toc: bool,
):
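    """
    Build a DOCX from markdown using a placeholder-driven template.
    The template must contain the structural placeholders %%REVISION_RECORD%%,
    %%DOC_TOC%% and %%DOC_CONTENT%%. Metadata placeholders are replaced in
    place, the revision-history section becomes a table, and the remaining
    markdown is rendered by Pandoc and spliced in at the placeholder.
    """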
log.info("Starting DOCX conversion.")
dynamic_placeholders, structural_placeholders = scan_template_for_placeholders(
template_path
)
required_structural = {"%%REVISION_RECORD%%", "%%DOC_TOC%%", "%%DOC_CONTENT%%"}
if not required_structural.issubset(structural_placeholders):
missing = required_structural - set(structural_placeholders)
raise TemplatePlaceholderError(
f"Template is missing required structural placeholders: {', '.join(missing)}"
)
doc = docx.Document(template_path)
if "%%DOC_PROJECT%%" in dynamic_placeholders and not metadata.get(
"%%DOC_PROJECT%%"
):
metadata["%%DOC_PROJECT%%"] = _get_document_title(markdown_text)
if "%%DOC_DATE%%" in dynamic_placeholders:
metadata["%%DOC_DATE%%"] = date.today().strftime("%d/%m/%Y")
_replace_metadata_placeholders(doc, metadata)
rev_history_md, main_content_md = _split_markdown_by_revision_history(markdown_text)
_add_revision_table(doc, rev_history_md)
temp_files = []
pandoc_format = "markdown+hard_line_breaks"
try:
if main_content_md:
content_for_pandoc = main_content_md
# Step 1: Remove the main H1 document title from the content to be processed.
# It's used for metadata, not for the main body's numbering.
match = re.search(r"^\s*#\s+(.+)\n?", content_for_pandoc, re.MULTILINE)
if match:
log.info("Removing main H1 title from content body.")
content_for_pandoc = content_for_pandoc[match.end() :]
# Step 2: Strip any existing manual numbering from headings (e.g., "## 1. Title")
# to prevent double numbering when automatic numbering is applied.
log.info("Stripping manual numbering from headings for auto-numbering.")
content_for_pandoc = re.sub(
r"^(\s*#+)\s+[0-9\.]+\s+",
r"\1 ",
content_for_pandoc,
flags=re.MULTILINE,
)
# Step 3: Configure Pandoc arguments for correct hierarchical numbering.
pandoc_args = [
# Enable automatic section numbering (e.g., 1, 1.1, 1.1.1).
"--number-sections",
# Shift heading levels up by one. This maps:
# ## (H2 in MD) -> Heading 1 in DOCX (numbered as 1, 2, ...)
# ### (H3 in MD) -> Heading 2 in DOCX (numbered as 1.1, 1.2, ...)
"--shift-heading-level-by=-1",
# Keep text left-aligned.
"--variable=justify:false",
]
if add_toc:
pandoc_args.append("--toc")
log.info("Adding page break before Table of Contents.")
toc_placeholder_p = _find_placeholder_paragraph(doc, "%%DOC_TOC%%")
# Insert a page break before the TOC for better formatting.
if toc_placeholder_p:
toc_placeholder_p.insert_paragraph_before().add_run().add_break(
WD_BREAK.PAGE
)
                # Create the temp path first and close the handle so Pandoc
                # can write to it (Windows locks files that are still open).
                with tempfile.NamedTemporaryFile(
                    delete=False, suffix=".docx"
                ) as temp_file:
                    temp_docx_path = temp_file.name
                temp_files.append(temp_docx_path)
                pypandoc.convert_text(
                    content_for_pandoc,
                    "docx",
                    format=pandoc_format,
                    extra_args=pandoc_args,
                    outputfile=temp_docx_path,
                )
                if toc_placeholder_p:
                    _insert_docx_at_paragraph(toc_placeholder_p, temp_docx_path)
# The main content is now part of the generated TOC doc, so remove the placeholder.
_remove_paragraph(_find_placeholder_paragraph(doc, "%%DOC_CONTENT%%"))
else:
# If no TOC, just insert the content at its placeholder.
log.info("Adding page break before main content.")
content_placeholder_p = _find_placeholder_paragraph(
doc, "%%DOC_CONTENT%%"
)
if content_placeholder_p:
content_placeholder_p.insert_paragraph_before().add_run().add_break(
WD_BREAK.PAGE
)
                with tempfile.NamedTemporaryFile(
                    delete=False, suffix=".docx"
                ) as temp_file:
                    temp_docx_path = temp_file.name
                temp_files.append(temp_docx_path)
                # We don't add '--toc' to pandoc_args here.
                pypandoc.convert_text(
                    content_for_pandoc,
                    "docx",
                    format=pandoc_format,
                    extra_args=pandoc_args,
                    outputfile=temp_docx_path,
                )
                if content_placeholder_p:
                    _insert_docx_at_paragraph(content_placeholder_p, temp_docx_path)
# TOC placeholder is not used, so remove it.
_remove_paragraph(_find_placeholder_paragraph(doc, "%%DOC_TOC%%"))
else:
# If there is no main content, remove both placeholders.
_remove_paragraph(_find_placeholder_paragraph(doc, "%%DOC_TOC%%"))
_remove_paragraph(_find_placeholder_paragraph(doc, "%%DOC_CONTENT%%"))
doc.save(output_file)
log.info(f"Document successfully created at {output_file}")
finally:
for temp_file in temp_files:
if os.path.exists(temp_file):
os.remove(temp_file)
def convert_docx_to_pdf(
input_docx_path: str, output_pdf_path: str, max_retries: int = 2
) -> str:
"""
Convert DOCX to PDF using MS Word or LibreOffice with retry logic.
Args:
input_docx_path: Path to the input DOCX file
output_pdf_path: Path where the PDF will be saved
max_retries: Maximum number of retry attempts for LibreOffice (default: 2)
Returns:
Path to the generated PDF file
"""
if not os.path.exists(input_docx_path):
raise FileNotFoundError(f"Input DOCX file not found: {input_docx_path}")
try:
log.info("Attempting DOCX to PDF conversion using MS Word.")
convert_word(input_docx_path, output_pdf_path)
log.info(f"Successfully converted using MS Word: {output_pdf_path}")
return output_pdf_path
except Exception as e:
log.warning(f"MS Word conversion failed. It might not be installed. Error: {e}")
log.info("Falling back to LibreOffice conversion.")
    # Prefer soffice from PATH, falling back to the default Windows install dir.
    libreoffice_path = shutil.which("soffice") or r"C:\Program Files\LibreOffice\program\soffice.exe"
if not os.path.exists(libreoffice_path):
log.error("LibreOffice executable not found. Cannot convert DOCX to PDF.")
raise ConverterNotFoundError(
"Neither MS Word nor LibreOffice could be used. Please install one to use this feature."
)
if sys.platform == "win32":
try:
log.debug(
"Attempting to terminate existing LibreOffice processes on Windows."
)
subprocess.run(
["taskkill", "/f", "/im", "soffice.exe"],
check=False,
capture_output=True,
)
subprocess.run(
["taskkill", "/f", "/im", "soffice.bin"],
check=False,
capture_output=True,
)
log.debug("Termination commands sent.")
except Exception as kill_e:
log.warning(f"Could not terminate existing LibreOffice processes: {kill_e}")
output_dir = os.path.dirname(output_pdf_path)
log.info(f"Attempting conversion using LibreOffice at: {libreoffice_path}")
    # Retry logic for LibreOffice (it can fail on the first attempt on Windows).
for attempt in range(1, max_retries + 1):
try:
if attempt > 1:
log.info(
f"Retry attempt {attempt}/{max_retries} for LibreOffice conversion..."
)
time.sleep(2) # Brief pause between retries
expected_lo_output = os.path.join(
output_dir,
os.path.splitext(os.path.basename(input_docx_path))[0] + ".pdf",
)
command = [
libreoffice_path,
"--headless",
"--convert-to",
"pdf",
"--outdir",
output_dir,
input_docx_path,
]
process = subprocess.run(
command,
check=True,
capture_output=True,
text=True,
encoding="utf-8",
errors="ignore",
timeout=60,
)
log.debug(f"LibreOffice stdout: {process.stdout}")
log.debug(f"LibreOffice stderr: {process.stderr}")
if (
os.path.exists(output_pdf_path)
and expected_lo_output != output_pdf_path
):
os.remove(output_pdf_path)
if os.path.exists(expected_lo_output):
if expected_lo_output != output_pdf_path:
os.rename(expected_lo_output, output_pdf_path)
else:
raise FileNotFoundError(
f"LibreOffice conversion process finished, but the output file was not found at the expected path: {expected_lo_output}"
)
log.info(f"Successfully converted using LibreOffice: {output_pdf_path}")
return output_pdf_path
except subprocess.TimeoutExpired as e:
log.warning(
f"LibreOffice conversion timed out (attempt {attempt}/{max_retries})"
)
if attempt >= max_retries:
log.error("LibreOffice conversion failed after all retry attempts.")
raise
except (subprocess.CalledProcessError, FileNotFoundError) as e:
log.warning(
f"LibreOffice conversion failed on attempt {attempt}/{max_retries}: {e}"
)
if attempt >= max_retries:
log.error(
f"LibreOffice conversion failed after all retry attempts. Last error: {e}",
exc_info=True,
)
raise
def combine_markdown_files(markdown_files: list, output_path: str) -> str:
"""
Combines multiple markdown files into a single file.
Args:
markdown_files: List of Path objects or strings pointing to markdown files
output_path: Path where the combined markdown file will be saved
Returns:
Path to the combined markdown file
"""
log.info(f"Combining {len(markdown_files)} markdown files into {output_path}")
with open(output_path, "w", encoding="utf-8") as out:
for md_file in markdown_files:
file_name = os.path.basename(md_file)
log.debug(f"Adding file: {file_name}")
out.write(f"\n\n# --- {file_name} ---\n\n")
with open(md_file, "r", encoding="utf-8") as f:
out.write(f.read())
out.write("\n\n")
log.info(f"Successfully combined files into: {output_path}")
return output_path
def convert_markdown_to_docx_with_pandoc(
input_file: str,
output_path: str,
template_path: str = None,
add_toc: bool = False,
number_sections: bool = False,
) -> str:
"""
Converts markdown to DOCX using pypandoc with optional template.
This is a simpler conversion without placeholder replacement.
Args:
input_file: Path to the markdown file
output_path: Path where the DOCX will be saved
template_path: Optional path to a DOCX template (reference-doc)
add_toc: If True, adds a table of contents
number_sections: If True, automatically numbers sections
Returns:
Path to the generated DOCX file
"""
log.info(f"Converting '{os.path.basename(input_file)}' to DOCX using pypandoc.")
if not os.path.exists(input_file):
raise FileNotFoundError(f"Input file not found: {input_file}")
# Build pypandoc arguments
extra_args = ["--variable=justify:false"]
if template_path and os.path.exists(template_path):
log.info(f"Using template: {os.path.basename(template_path)}")
extra_args.extend(["--reference-doc", str(template_path)])
if add_toc:
log.info("Adding table of contents")
extra_args.append("--toc")
if number_sections:
log.info("Enabling automatic section numbering")
extra_args.append("--number-sections")
try:
# Use pypandoc for more robust conversion
pypandoc.convert_file(
input_file,
"docx",
format="markdown+hard_line_breaks",
outputfile=output_path,
extra_args=extra_args,
)
log.info(f"DOCX successfully generated: {output_path}")
return output_path
    except Exception as e:
        log.error(f"Pandoc conversion failed: {e}")
        raise RuntimeError(f"Pandoc conversion failed: {e}") from e
def convert_markdown(
input_file: str,
output_path: str,
output_format: str,
add_toc: bool = False,
template_path: str = None,
metadata: dict = None,
):
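    """
    Convert a markdown file to PDF or DOCX.
    output_format must be "PDF" or "DOCX". For DOCX output, template_path
    and metadata are forwarded to the template-driven converter.
    Example (illustrative paths and metadata):
        convert_markdown("spec.md", "spec.docx", "DOCX", add_toc=True,
                         template_path="template.docx",
                         metadata={"%%DOC_PROJECT%%": "My Project"})
    """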
if not os.path.exists(input_file):
raise FileNotFoundError(f"Input file not found: {input_file}")
log.info(
f"Starting conversion of '{os.path.basename(input_file)}' to {output_format}."
)
with open(input_file, "r", encoding="utf-8") as f:
markdown_text = f.read()
if output_format == "PDF":
_convert_to_pdf(markdown_text, output_path, add_toc)
elif output_format == "DOCX":
if metadata is None:
metadata = {}
_convert_to_docx(markdown_text, output_path, template_path, metadata, add_toc)
else:
raise ValueError(f"Unsupported output format: {output_format}")
return output_path
def convert_pdf_to_markdown(
input_pdf_path: str,
output_md_path: str,
extract_images: bool = True,
image_folder: str = None,
progress_callback=None,
page_limit: int = None,
) -> str:
"""
Convert PDF to Markdown using PyMuPDF with progress callback support and
more robust image extraction and header/footer deduplication.
progress_callback: optional callable(progress_percent:int, message:str)
"""
if not os.path.exists(input_pdf_path):
raise FileNotFoundError(f"Input PDF file not found: {input_pdf_path}")
log.info(f"Converting PDF '{os.path.basename(input_pdf_path)}' to Markdown.")
# Setup image folder if needed
if extract_images:
if image_folder is None:
output_path = Path(output_md_path)
image_folder = output_path.parent / f"{output_path.stem}_images"
image_folder = Path(image_folder)
        image_folder.mkdir(parents=True, exist_ok=True)
log.info(f"Images will be saved to: {image_folder}")
markdown_content = []
image_counter = 0
failed_image_extractions = 0
failed_image_samples = set()
total_image_refs = 0
# Track extracted images to avoid duplicates: map xref -> filename and hash -> filename
_seen_xref_to_file = {}
_seen_hash_to_file = {}
_inserted_images = set()
duplicates_skipped = 0
def _normalize_header_footer(s: str) -> str:
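        """
        Map header/footer variants that differ only in digits to one key,
        so e.g. "Page 3 of 12" and "Page 4 of 12" count as the same block.
        """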
# normalize by stripping whitespace, collapsing spaces, and removing page numbers/digits
if not s:
return ""
ns = re.sub(r"\s+", " ", s.strip())
ns = re.sub(r"\d+", "", ns)
return ns.strip()
try:
# Open PDF with PyMuPDF
doc = fitz.open(input_pdf_path)
num_pages = len(doc)
        num_pages_effective = (
            min(num_pages, int(page_limit))
            if page_limit and page_limit > 0
            else num_pages
        )
        log.info(
            f"PDF opened successfully. Total pages: {num_pages}, "
            f"processing: {num_pages_effective}"
        )
if progress_callback:
try:
progress_callback(0, f"Opened PDF, {num_pages} pages detected")
except Exception:
pass
# First pass: detect common header/footer text across pages
header_candidates = Counter()
footer_candidates = Counter()
        for page_num in range(num_pages_effective):
page = doc[page_num]
blocks = page.get_text("dict").get("blocks", [])
page_h = page.rect.height
top_zone = page_h * 0.12
bottom_zone = page_h * 0.88
for block in blocks:
if block.get("type") != 0:
continue
bbox = block.get("bbox", [0, 0, 0, 0])
y0 = bbox[1]
y1 = bbox[3]
# assemble block text
block_text = " ".join(
span.get("text", "")
for line in block.get("lines", [])
for span in line.get("spans", [])
)
norm = _normalize_header_footer(block_text)
if not norm:
continue
if y0 <= top_zone:
header_candidates[norm] += 1
if y1 >= bottom_zone:
footer_candidates[norm] += 1
# choose header/footer if they appear on majority of pages
header_text = None
footer_text = None
if header_candidates:
most_common_header, count = header_candidates.most_common(1)[0]
            if count >= max(3, int(0.6 * num_pages_effective)):
header_text = most_common_header
log.info(
f"Detected common header to remove from pages: '{header_text[:80]}' (appears {count} times)"
)
if footer_candidates:
most_common_footer, countf = footer_candidates.most_common(1)[0]
            if countf >= max(3, int(0.6 * num_pages_effective)):
footer_text = most_common_footer
log.info(
f"Detected common footer to remove from pages: '{footer_text[:80]}' (appears {countf} times)"
)
if progress_callback:
try:
progress_callback(2, "Detected common header/footer (if any)")
except Exception:
pass
# If header/footer found, add them once at top
if header_text:
markdown_content.append(header_text + "\n\n")
# Second pass: build markdown, skipping repeated header/footer blocks
        for page_num in range(num_pages_effective):
            page = doc[page_num]
            log.debug(f"Processing page {page_num + 1}/{num_pages_effective}")
            if progress_callback:
                try:
                    pct = int((page_num / num_pages_effective) * 100)
                    progress_callback(
                        pct, f"Processing page {page_num + 1}/{num_pages_effective}"
                    )
                except Exception:
                    pass
# Add page separator for multi-page documents
if page_num > 0:
markdown_content.append("\n\n---\n\n")
markdown_content.append(f"## Page {page_num + 1}\n\n")
# Extract text blocks with formatting
blocks = page.get_text("dict").get("blocks", [])
for block in blocks:
if block.get("type") == 0: # Text block
# assemble block text for header/footer check
block_text_full = " ".join(
span.get("text", "")
for line in block.get("lines", [])
for span in line.get("spans", [])
)
norm_block = _normalize_header_footer(block_text_full)
if norm_block:
if header_text and header_text == norm_block:
# skip repeated header
continue
if footer_text and footer_text == norm_block:
# skip repeated footer
continue
# Build a structured representation of lines and spans with x-coordinates
block_x0 = block.get("bbox", [0, 0, 0, 0])[0]
lines_info = []
all_x_positions = []
mono_line_flags = []
for line in block.get("lines", []):
spans_info = []
for span in line.get("spans", []):
text = span.get("text", "").strip()
if not text:
continue
bbox = (
span.get("bbox") or span.get("origin") or [0, 0, 0, 0]
)
x0 = bbox[0]
font_size = span.get("size", 0)
flags = span.get("flags", 0)
font = span.get("font", "")
spans_info.append(
{
"x0": x0,
"text": text,
"size": font_size,
"flags": flags,
"font": font,
}
)
all_x_positions.append(x0)
# detect if this line looks monospaced (code) by font name
mono_flag = any(
"mono" in (s.get("font", "").lower())
or "courier" in (s.get("font", "").lower())
for s in spans_info
)
mono_line_flags.append(mono_flag)
lines_info.append(spans_info)
# Simple heuristic: if there are multiple consistent x-columns, treat as a table
table_md_lines = []
is_table = False
if all_x_positions:
# cluster x positions into columns with tolerance
tol = 8.0
cols = []
for x in sorted(set(all_x_positions)):
placed = False
for i, c in enumerate(cols):
if abs(x - c) <= tol:
# update center
cols[i] = (cols[i] + x) / 2.0
placed = True
break
if not placed:
cols.append(x)
cols = sorted(cols)
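                    # e.g. x positions [71.8, 72.3, 210.0] with tol=8 cluster
                    # into two columns centred near [72.05, 210.0].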
# build a matrix of cells per line
matrix = []
for spans in lines_info:
if not spans:
matrix.append([])
continue
row = [""] * len(cols)
for s in spans:
# find nearest column
idx = min(
range(len(cols)),
key=lambda i: abs(s["x0"] - cols[i]),
)
if row[idx]:
row[idx] += " " + s["text"]
else:
row[idx] = s["text"]
matrix.append(row)
# determine if matrix looks like a table: multiple columns and many rows have >1 non-empty cells
if len(cols) >= 2:
rows_with_multi = sum(
1 for r in matrix if sum(1 for c in r if c.strip()) > 1
)
if rows_with_multi >= max(2, len(matrix) // 3):
is_table = True
# convert matrix to markdown table
for r_idx, row in enumerate(matrix):
# join cells with pipe
cleaned = [c.strip() for c in row]
table_md_lines.append(
"| " + " | ".join(cleaned) + " |"
)
                                # After the first row, emit the separator row;
                                # every column needs dashes for valid markdown.
                                if r_idx == 0:
                                    sep = (
                                        "| "
                                        + " | ".join("---" for _ in cleaned)
                                        + " |"
                                    )
                                    table_md_lines.append(sep)
if is_table:
markdown_content.append("\n")
markdown_content.extend([ln + "\n" for ln in table_md_lines])
markdown_content.append("\n")
else:
# Not a table: output lines respecting indentation and formatting
in_code_block = False
for li, spans in enumerate(lines_info):
if not spans:
if in_code_block:
markdown_content.append("```")
in_code_block = False
markdown_content.append("\n")
continue
first_x = spans[0]["x0"] if spans else block_x0
indent_level = max(
0, int(round((first_x - block_x0) / 20.0))
)
# detect mono font sequences and wrap in code block
mono = (
mono_line_flags[li]
if li < len(mono_line_flags)
else False
)
if mono and not in_code_block:
markdown_content.append("```\n")
in_code_block = True
line_text = ""
for span in spans:
text = span["text"]
font_size = span.get("size", 0)
flags = span.get("flags", 0)
# headings detection
if font_size > 18:
text = f"# {text}"
elif font_size > 14:
text = f"## {text}"
elif font_size > 12:
text = f"### {text}"
if flags & 16:
text = f"**{text}**"
if flags & 2:
text = f"*{text}*"
line_text += text + " "
line_text = line_text.strip()
if in_code_block:
markdown_content.append(line_text + "\n")
else:
if indent_level > 0:
markdown_content.append(
" " * indent_level + line_text + "\n"
)
else:
markdown_content.append(line_text + "\n")
if in_code_block:
markdown_content.append("```\n")
# add spacing after block
markdown_content.append("\n")
elif block.get("type") == 1 and extract_images: # Image block
# Extract images for this page safely
try:
imgs = page.get_images(full=True)
except Exception as e:
# Can't list images on this page; count as a failed attempt and continue
failed_image_extractions += 1
if len(failed_image_samples) < 3:
failed_image_samples.add(f"list_images_error: {str(e)}")
imgs = []
# Try to extract each referenced image, but avoid flooding the logs
page_image_failures = 0
for img in imgs:
total_image_refs += 1
try:
# image tuple may vary, try to locate xref safely
if not img:
raise ValueError("empty image tuple")
# prefer first element as xref but be defensive
xref = None
if isinstance(img, (list, tuple)) and len(img) > 0:
xref = img[0]
if xref is None:
raise ValueError(f"unexpected image descriptor: {img}")
# If we've already extracted this xref, reuse filename
if xref in _seen_xref_to_file:
image_filename = _seen_xref_to_file[xref]
relative_path = f"{image_folder.name}/{image_filename}"
# Only insert the image tag once; skip repeated inline images
if image_filename in _inserted_images:
duplicates_skipped += 1
else:
                                    markdown_content.append(
                                        f"\n\n![{image_filename}]({relative_path})\n\n"
                                    )
_inserted_images.add(image_filename)
continue
                            # Extract in a worker thread with a short timeout so a
                            # malformed image cannot hang the whole conversion.
                            try:
                                ex = ThreadPoolExecutor(max_workers=1)
                                try:
                                    fut = ex.submit(doc.extract_image, xref)
                                    base_image = fut.result(timeout=3)
                                finally:
                                    # Don't block on a hung worker: a `with`
                                    # block would wait in shutdown().
                                    ex.shutdown(wait=False)
                            except Exception as tex:
                                raise RuntimeError(
                                    f"image_extraction_timeout_or_error: {tex}"
                                ) from tex
if not base_image or "image" not in base_image:
raise ValueError(f"no image bytes for xref {xref}")
image_bytes = base_image.get("image")
image_ext = base_image.get("ext", "png")
# Compute hash to detect identical image content
img_hash = (
hashlib.sha256(image_bytes).hexdigest()
if image_bytes
else None
)
# If we've already extracted an identical image (different xref), reuse it
if img_hash and img_hash in _seen_hash_to_file:
image_filename = _seen_hash_to_file[img_hash]
# remember xref mapping for future
_seen_xref_to_file[xref] = image_filename
relative_path = f"{image_folder.name}/{image_filename}"
if image_filename in _inserted_images:
duplicates_skipped += 1
else:
                                    markdown_content.append(
                                        f"\n\n![{image_filename}]({relative_path})\n\n"
                                    )
_inserted_images.add(image_filename)
else:
image_counter += 1
image_filename = f"image_{image_counter}.{image_ext}"
image_path = image_folder / image_filename
with open(image_path, "wb") as img_file:
img_file.write(image_bytes)
# register mappings
_seen_xref_to_file[xref] = image_filename
if img_hash:
_seen_hash_to_file[img_hash] = image_filename
relative_path = f"{image_folder.name}/{image_filename}"
                                markdown_content.append(
                                    f"\n\n![{image_filename}]({relative_path})\n\n"
                                )
_inserted_images.add(image_filename)
except Exception as ie:
page_image_failures += 1
failed_image_extractions += 1
if len(failed_image_samples) < 3:
failed_image_samples.add(str(ie))
if page_image_failures and progress_callback:
try:
progress_callback(
                                int(((page_num + 1) / num_pages_effective) * 100),
f"{page_image_failures} image(s) failed on page {page_num + 1}",
)
except Exception:
pass
# append footer once if detected
if footer_text:
markdown_content.append("\n\n" + footer_text + "\n")
doc.close()
# Write markdown file
with open(output_md_path, "w", encoding="utf-8") as f:
f.write("".join(markdown_content))
log.info(f"Markdown file successfully created: {output_md_path}")
if extract_images:
log.info(f"Extracted {image_counter} images")
duplicates_reused = max(
0, total_image_refs - image_counter - failed_image_extractions
)
        log.info(
            f"Unique images saved: {image_counter}; duplicate references reused: "
            f"{duplicates_reused}; repeated inline placements skipped: {duplicates_skipped}"
        )
if failed_image_extractions:
sample_list = list(failed_image_samples)
log.warning(
f"Failed to extract {failed_image_extractions} images (sample errors: {sample_list})"
)
if progress_callback:
try:
progress_callback(100, "Conversion finished")
except Exception:
pass
return output_md_path
    except Exception as e:
        log.error(f"Failed to convert PDF to Markdown: {e}", exc_info=True)
        raise RuntimeError(f"PDF to Markdown conversion failed: {e}") from e