# markdownconverter/core/core.py

import hashlib
import os
import re
import subprocess
import sys
import tempfile
import time
from collections import Counter
from datetime import date
from pathlib import Path

import docx
import fitz  # PyMuPDF
import markdown
import pdfkit
import pypandoc
from docx.enum.text import WD_BREAK
from docx2pdf import convert as convert_word

from ..utils.logger import get_logger

log = get_logger(__name__)


# --- Custom Exceptions ---
class TemplatePlaceholderError(ValueError):
    pass


class ConverterNotFoundError(Exception):
    pass


# --- PDFKit Configuration ---
try:
    config = pdfkit.configuration()
    log.info("pdfkit configured using wkhtmltopdf from system PATH.")
except OSError:
    # Fall back to the default Windows install location before giving up.
    WKHTMLTOPDF_PATH = r"C:\Program Files\wkhtmltopdf\bin\wkhtmltopdf.exe"
    if os.path.exists(WKHTMLTOPDF_PATH):
        config = pdfkit.configuration(wkhtmltopdf=WKHTMLTOPDF_PATH)
    else:
        config = None
        log.warning("wkhtmltopdf not found. PDF conversion may fail.")


def scan_template_for_placeholders(template_path: str) -> tuple[list[str], list[str]]:
    """Scan a DOCX template and return (dynamic, structural) placeholder lists."""
    if not os.path.exists(template_path):
        raise FileNotFoundError(f"Template file not found: {template_path}")
    log.info(f"Scanning template '{os.path.basename(template_path)}' for placeholders.")
    structural_keys = {"%%REVISION_RECORD%%", "%%DOC_TOC%%", "%%DOC_CONTENT%%"}
    placeholder_pattern = re.compile(r"%%([A-Z0-9_]+)%%")
    found_placeholders = set()
    doc = docx.Document(template_path)

    def find_in_element(element):
        if hasattr(element, "paragraphs"):
            for p in element.paragraphs:
                for match in placeholder_pattern.finditer(
                    "".join(run.text for run in p.runs)
                ):
                    found_placeholders.add(match.group(0))
        if hasattr(element, "tables"):
            for table in element.tables:
                for row in table.rows:
                    for cell in row.cells:
                        find_in_element(cell)

    find_in_element(doc)
    for section in doc.sections:
        find_in_element(section.header)
        find_in_element(section.footer)
    dynamic = sorted(p for p in found_placeholders if p not in structural_keys)
    structural = sorted(p for p in found_placeholders if p in structural_keys)
    log.info(
        f"Scan complete. Found {len(dynamic)} dynamic and {len(structural)} structural placeholders."
    )
    return dynamic, structural
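
# A minimal usage sketch (the template path below is hypothetical):
#
#     dynamic, structural = scan_template_for_placeholders("templates/report.docx")
#     # dynamic    -> e.g. ["%%DOC_DATE%%", "%%DOC_PROJECT%%"]
#     # structural -> e.g. ["%%DOC_CONTENT%%", "%%DOC_TOC%%", "%%REVISION_RECORD%%"]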


def _get_document_title(markdown_text: str) -> str:
    """Return the text of the first markdown heading, or a fallback title."""
    match = re.search(r"^\s*#+\s+(.+)", markdown_text, re.MULTILINE)
    return match.group(1).strip() if match else "Untitled Document"


def _split_markdown_by_revision_history(
    markdown_text: str, separator_heading="## Revision Record"
) -> tuple[str, str]:
    """Split the markdown into (revision_history, main_content).

    The revision section runs from the separator heading up to the next
    heading, or to the end of the document if no heading follows.
    """
    pattern = re.compile(
        f"({re.escape(separator_heading)}.*?)(?=\\n#+|\\Z)", re.DOTALL
    )
    match = pattern.search(markdown_text)
    if not match:
        log.warning(
            f"'{separator_heading}' section not found. No revision history will be added."
        )
        return "", markdown_text
    rev_history_md = match.group(0).strip()
    main_content_md = markdown_text.replace(rev_history_md, "", 1).strip()
    return rev_history_md, main_content_md
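
# Behaviour sketch: given a document such as
#
#     # My Doc
#     ## Revision Record
#     | Rev | Date | Author |
#     |:----|:-----|:-------|
#     | A   | ...  | ...    |
#     ## Introduction
#     ...
#
# the "## Revision Record" block (table included) is returned separately and
# removed from the main content.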


def _replace_text_in_paragraph(paragraph, placeholders: dict[str, str]):
    """Replace placeholder keys in a paragraph while preserving basic formatting."""
    full_text = "".join(run.text for run in paragraph.runs)
    if not any(key in full_text for key in placeholders):
        return
    for key, value in placeholders.items():
        if key in full_text:
            full_text = full_text.replace(key, str(value))
    # Capture the style/font of the first run, then rebuild the paragraph as a
    # single run carrying the replaced text.
    style = paragraph.runs[0].style if paragraph.runs else None
    font = paragraph.runs[0].font if paragraph.runs else None
    for run in reversed(paragraph.runs):
        p = paragraph._p
        p.remove(run._r)
    new_run = paragraph.add_run(full_text)
    if style:
        new_run.style = style
    if font:
        new_run.font.name = font.name
        new_run.font.size = font.size
        new_run.font.bold = font.bold
        new_run.font.italic = font.italic
        new_run.font.underline = font.underline
        if font.color and font.color.rgb:
            new_run.font.color.rgb = font.color.rgb
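
# Why merge runs first: Word frequently splits text like "%%DOC_DATE%%" across
# several runs (e.g. "%%DOC_" + "DATE%%"), so a per-run replace would miss it.
# Joining the runs, replacing in the joined string, and re-adding a single run
# is a simple workaround, at the cost of flattening mixed formatting inside
# the paragraph.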


def _replace_text_in_element(element, placeholders: dict[str, str]):
    if hasattr(element, "paragraphs"):
        for p in element.paragraphs:
            _replace_text_in_paragraph(p, placeholders)
    if hasattr(element, "tables"):
        for table in element.tables:
            for row in table.rows:
                for cell in row.cells:
                    _replace_text_in_element(cell, placeholders)


def _replace_metadata_placeholders(doc: docx.Document, placeholders: dict[str, str]):
    log.info(f"Replacing metadata placeholders: {list(placeholders.keys())}")
    _replace_text_in_element(doc, placeholders)
    for section in doc.sections:
        _replace_text_in_element(section.header, placeholders)
        _replace_text_in_element(section.footer, placeholders)


def _find_placeholder_paragraph(doc: docx.Document, placeholder: str):
    for p in doc.paragraphs:
        if placeholder in "".join(run.text for run in p.runs):
            return p
    return None


def _insert_docx_at_paragraph(paragraph, source_docx_path: str):
    """Splice the body of another DOCX into the document, replacing `paragraph`."""
    parent = paragraph._p.getparent()
    index = parent.index(paragraph._p)
    source_doc = docx.Document(source_docx_path)
    for element in source_doc.element.body:
        parent.insert(index, element)
        index += 1
    parent.remove(paragraph._p)


def _remove_paragraph(paragraph):
    if paragraph is None:
        return
    parent = paragraph._p.getparent()
    parent.remove(paragraph._p)


def _add_revision_table(doc: docx.Document, rev_history_md: str):
    placeholder_p = _find_placeholder_paragraph(doc, "%%REVISION_RECORD%%")
    if not placeholder_p:
        log.warning("Revision record placeholder not found in template. Skipping.")
        return
    if not rev_history_md:
        log.info("No revision history content found. Removing placeholder.")
        _remove_paragraph(placeholder_p)
        return
    lines = [line.strip() for line in rev_history_md.strip().split("\n")]
    # Keep table rows, dropping the markdown alignment row (e.g. "|:---|---:|").
    separator_row = re.compile(r"^\|[\s:\-|]+$")
    table_lines = [
        line for line in lines if line.startswith("|") and not separator_row.match(line)
    ]
    if not table_lines:
        log.warning(
            "Could not parse a markdown table from the revision history section."
        )
        _remove_paragraph(placeholder_p)
        return
    table_data = [
        [cell.strip() for cell in line.split("|")][1:-1] for line in table_lines
    ]
    if not table_data:
        log.warning("Revision history table is empty.")
        _remove_paragraph(placeholder_p)
        return
    log.info(f"Adding revision history table with {len(table_data)} rows.")
    table = doc.add_table(rows=1, cols=len(table_data[0]))
    table.style = "Table Grid"
    hdr_cells = table.rows[0].cells
    for i, header_text in enumerate(table_data[0]):
        hdr_cells[i].text = header_text
    for row_data in table_data[1:]:
        row_cells = table.add_row().cells
        for i, cell_text in enumerate(row_data):
            row_cells[i].text = cell_text
    # Move the table from the end of the document to the placeholder position.
    parent = placeholder_p._p.getparent()
    parent.insert(parent.index(placeholder_p._p), table._tbl)
    _remove_paragraph(placeholder_p)
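
# Expected input shape (sketch): the revision section as plain markdown, e.g.
#
#     ## Revision Record
#     | Rev | Date       | Description     |
#     |:----|:-----------|:----------------|
#     | A   | 01/01/2025 | Initial release |
#
# The first table row becomes the header row; the rest become data rows.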


def _convert_to_pdf(markdown_text: str, output_file: str, add_toc: bool):
    log.info("Starting PDF conversion using pdfkit.")
    if config is None:
        raise FileNotFoundError("wkhtmltopdf executable not found. Cannot create PDF.")

    title = _get_document_title(markdown_text)
    content_without_title = markdown_text
    match = re.search(r"^\s*#+\s+(.+)\n?", markdown_text, re.MULTILINE)
    if match:
        content_without_title = markdown_text[match.end():]

    # Use the nl2br extension to preserve line breaks (consistent with DOCX hard_line_breaks)
    md_converter = markdown.Markdown(
        extensions=["toc", "fenced_code", "tables", "nl2br"]
    )

    html_body = md_converter.convert(content_without_title)
    toc_html = ""
    if add_toc and hasattr(md_converter, "toc") and md_converter.toc:
        log.info("Generating Table of Contents for PDF.")
        toc_html = f"<h2>Table of Contents</h2>{md_converter.toc}<div style='page-break-after: always;'></div>"

    full_html = f'<!DOCTYPE html><html lang="en"><head><meta charset="UTF-8"><title>{title}</title><style>body{{font-family:sans-serif;}}h1,h2{{border-bottom:1px solid #eaecef;padding-bottom:.3em;}}</style></head><body><h1>{title}</h1>{toc_html}{html_body}</body></html>'
    pdf_options = {"encoding": "UTF-8", "enable-local-file-access": None}

    pdfkit.from_string(
        full_html, output_file, configuration=config, options=pdf_options
    )
    log.info(f"PDF successfully generated: {output_file}")


def _convert_to_docx(
    markdown_text: str,
    output_file: str,
    template_path: str,
    metadata: dict,
    add_toc: bool,
):
    """Render markdown into a placeholder-based DOCX template via Pandoc."""
    log.info("Starting DOCX conversion.")
    dynamic_placeholders, structural_placeholders = scan_template_for_placeholders(
        template_path
    )
    required_structural = {"%%REVISION_RECORD%%", "%%DOC_TOC%%", "%%DOC_CONTENT%%"}
    if not required_structural.issubset(structural_placeholders):
        missing = required_structural - set(structural_placeholders)
        raise TemplatePlaceholderError(
            f"Template is missing required structural placeholders: {', '.join(missing)}"
        )
    doc = docx.Document(template_path)
    if "%%DOC_PROJECT%%" in dynamic_placeholders and not metadata.get("%%DOC_PROJECT%%"):
        metadata["%%DOC_PROJECT%%"] = _get_document_title(markdown_text)
    if "%%DOC_DATE%%" in dynamic_placeholders:
        metadata["%%DOC_DATE%%"] = date.today().strftime("%d/%m/%Y")
    _replace_metadata_placeholders(doc, metadata)
    rev_history_md, main_content_md = _split_markdown_by_revision_history(markdown_text)
    _add_revision_table(doc, rev_history_md)
    temp_files = []

    pandoc_format = "markdown+hard_line_breaks"

    try:
        if main_content_md:
            content_for_pandoc = main_content_md

            # Step 1: Remove the main H1 document title from the content to be
            # processed. It's used for metadata, not for the body's numbering.
            match = re.search(r"^\s*#\s+(.+)\n?", content_for_pandoc, re.MULTILINE)
            if match:
                log.info("Removing main H1 title from content body.")
                content_for_pandoc = content_for_pandoc[match.end():]

            # Step 2: Strip any existing manual numbering from headings
            # (e.g., "## 1. Title") to prevent double numbering when automatic
            # numbering is applied.
            log.info("Stripping manual numbering from headings for auto-numbering.")
            content_for_pandoc = re.sub(
                r"^(\s*#+)\s+[0-9.]+\s+",
                r"\1 ",
                content_for_pandoc,
                flags=re.MULTILINE,
            )

            # Step 3: Configure Pandoc arguments for correct hierarchical numbering.
            pandoc_args = [
                # Enable automatic section numbering (e.g., 1, 1.1, 1.1.1).
                "--number-sections",
                # Shift heading levels up by one. This maps:
                #   ## (H2 in MD)  -> Heading 1 in DOCX (numbered as 1, 2, ...)
                #   ### (H3 in MD) -> Heading 2 in DOCX (numbered as 1.1, 1.2, ...)
                "--shift-heading-level-by=-1",
                # Keep text left-aligned.
                "--variable=justify:false",
            ]

            if add_toc:
                pandoc_args.append("--toc")
                log.info("Adding page break before Table of Contents.")
                toc_placeholder_p = _find_placeholder_paragraph(doc, "%%DOC_TOC%%")
                # Insert a page break before the TOC for better formatting.
                if toc_placeholder_p:
                    toc_placeholder_p.insert_paragraph_before().add_run().add_break(
                        WD_BREAK.PAGE
                    )

                with tempfile.NamedTemporaryFile(
                    delete=False, suffix=".docx"
                ) as temp_file:
                    pypandoc.convert_text(
                        content_for_pandoc,
                        "docx",
                        format=pandoc_format,
                        extra_args=pandoc_args,
                        outputfile=temp_file.name,
                    )
                temp_files.append(temp_file.name)
                if toc_placeholder_p:
                    _insert_docx_at_paragraph(toc_placeholder_p, temp_file.name)
                # The main content is now part of the generated TOC doc, so
                # remove the content placeholder.
                _remove_paragraph(_find_placeholder_paragraph(doc, "%%DOC_CONTENT%%"))
            else:
                # If no TOC, just insert the content at its placeholder.
                log.info("Adding page break before main content.")
                content_placeholder_p = _find_placeholder_paragraph(
                    doc, "%%DOC_CONTENT%%"
                )
                if content_placeholder_p:
                    content_placeholder_p.insert_paragraph_before().add_run().add_break(
                        WD_BREAK.PAGE
                    )

                with tempfile.NamedTemporaryFile(
                    delete=False, suffix=".docx"
                ) as temp_file:
                    # We don't add '--toc' to pandoc_args here.
                    pypandoc.convert_text(
                        content_for_pandoc,
                        "docx",
                        format=pandoc_format,
                        extra_args=pandoc_args,
                        outputfile=temp_file.name,
                    )
                temp_files.append(temp_file.name)
                if content_placeholder_p:
                    _insert_docx_at_paragraph(content_placeholder_p, temp_file.name)
                # The TOC placeholder is not used, so remove it.
                _remove_paragraph(_find_placeholder_paragraph(doc, "%%DOC_TOC%%"))
        else:
            # If there is no main content, remove both placeholders.
            _remove_paragraph(_find_placeholder_paragraph(doc, "%%DOC_TOC%%"))
            _remove_paragraph(_find_placeholder_paragraph(doc, "%%DOC_CONTENT%%"))

        doc.save(output_file)
        log.info(f"Document successfully created at {output_file}")
    finally:
        for temp_file in temp_files:
            if os.path.exists(temp_file):
                os.remove(temp_file)
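
# Metadata sketch: keys are the literal placeholder tokens found in the
# template, values are the replacement strings, e.g.
#
#     metadata = {
#         "%%DOC_PROJECT%%": "Example Project",  # defaults to the H1 title if unset
#         "%%DOC_DATE%%": "01/01/2025",          # always overwritten with today's date
#     }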


def convert_docx_to_pdf(
    input_docx_path: str, output_pdf_path: str, max_retries: int = 2
) -> str:
    """
    Convert DOCX to PDF using MS Word or LibreOffice with retry logic.

    Args:
        input_docx_path: Path to the input DOCX file
        output_pdf_path: Path where the PDF will be saved
        max_retries: Maximum number of retry attempts for LibreOffice (default: 2)

    Returns:
        Path to the generated PDF file
    """
    if not os.path.exists(input_docx_path):
        raise FileNotFoundError(f"Input DOCX file not found: {input_docx_path}")
    try:
        log.info("Attempting DOCX to PDF conversion using MS Word.")
        convert_word(input_docx_path, output_pdf_path)
        log.info(f"Successfully converted using MS Word: {output_pdf_path}")
        return output_pdf_path
    except Exception as e:
        log.warning(f"MS Word conversion failed. It might not be installed. Error: {e}")
        log.info("Falling back to LibreOffice conversion.")

    libreoffice_path = r"C:\Program Files\LibreOffice\program\soffice.exe"
    if not os.path.exists(libreoffice_path):
        log.error("LibreOffice executable not found. Cannot convert DOCX to PDF.")
        raise ConverterNotFoundError(
            "Neither MS Word nor LibreOffice could be used. Please install one to use this feature."
        )
    if sys.platform == "win32":
        try:
            log.debug(
                "Attempting to terminate existing LibreOffice processes on Windows."
            )
            subprocess.run(
                ["taskkill", "/f", "/im", "soffice.exe"],
                check=False,
                capture_output=True,
            )
            subprocess.run(
                ["taskkill", "/f", "/im", "soffice.bin"],
                check=False,
                capture_output=True,
            )
            log.debug("Termination commands sent.")
        except Exception as kill_e:
            log.warning(f"Could not terminate existing LibreOffice processes: {kill_e}")
    output_dir = os.path.dirname(output_pdf_path)
    log.info(f"Attempting conversion using LibreOffice at: {libreoffice_path}")

    # Retry logic for LibreOffice (it can fail on the first attempt on Windows).
    for attempt in range(1, max_retries + 1):
        try:
            if attempt > 1:
                log.info(
                    f"Retry attempt {attempt}/{max_retries} for LibreOffice conversion..."
                )
                time.sleep(2)  # Brief pause between retries

            # LibreOffice names its output after the input file, which may
            # differ from the requested output path.
            expected_lo_output = os.path.join(
                output_dir,
                os.path.splitext(os.path.basename(input_docx_path))[0] + ".pdf",
            )
            command = [
                libreoffice_path,
                "--headless",
                "--convert-to",
                "pdf",
                "--outdir",
                output_dir,
                input_docx_path,
            ]
            process = subprocess.run(
                command,
                check=True,
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="ignore",
                timeout=60,
            )
            log.debug(f"LibreOffice stdout: {process.stdout}")
            log.debug(f"LibreOffice stderr: {process.stderr}")
            if os.path.exists(output_pdf_path) and expected_lo_output != output_pdf_path:
                os.remove(output_pdf_path)
            if os.path.exists(expected_lo_output):
                if expected_lo_output != output_pdf_path:
                    os.rename(expected_lo_output, output_pdf_path)
            else:
                raise FileNotFoundError(
                    f"LibreOffice conversion process finished, but the output file was not found at the expected path: {expected_lo_output}"
                )
            log.info(f"Successfully converted using LibreOffice: {output_pdf_path}")
            return output_pdf_path

        except subprocess.TimeoutExpired:
            log.warning(
                f"LibreOffice conversion timed out (attempt {attempt}/{max_retries})"
            )
            if attempt >= max_retries:
                log.error("LibreOffice conversion failed after all retry attempts.")
                raise
        except (subprocess.CalledProcessError, FileNotFoundError) as e:
            log.warning(
                f"LibreOffice conversion failed on attempt {attempt}/{max_retries}: {e}"
            )
            if attempt >= max_retries:
                log.error(
                    f"LibreOffice conversion failed after all retry attempts. Last error: {e}",
                    exc_info=True,
                )
                raise
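
# Usage sketch (hypothetical paths):
#
#     convert_docx_to_pdf(r"C:\docs\report.docx", r"C:\docs\report.pdf")
#
# MS Word (via docx2pdf) is tried first; if it is unavailable, the function
# falls back to a headless LibreOffice run with up to `max_retries` attempts.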


def combine_markdown_files(markdown_files: list, output_path: str) -> str:
    """
    Combines multiple markdown files into a single file.

    Args:
        markdown_files: List of Path objects or strings pointing to markdown files
        output_path: Path where the combined markdown file will be saved

    Returns:
        Path to the combined markdown file
    """
    log.info(f"Combining {len(markdown_files)} markdown files into {output_path}")

    with open(output_path, "w", encoding="utf-8") as out:
        for md_file in markdown_files:
            file_name = os.path.basename(md_file)
            log.debug(f"Adding file: {file_name}")
            out.write(f"\n\n# --- {file_name} ---\n\n")
            with open(md_file, "r", encoding="utf-8") as f:
                out.write(f.read())
            out.write("\n\n")

    log.info(f"Successfully combined files into: {output_path}")
    return output_path
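
# Output layout sketch: each source file is preceded by an H1 separator, e.g.
#
#     # --- chapter1.md ---
#     ...contents of chapter1.md...
#
#     # --- chapter2.md ---
#     ...contents of chapter2.md...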


def convert_markdown_to_docx_with_pandoc(
    input_file: str,
    output_path: str,
    template_path: str = None,
    add_toc: bool = False,
    number_sections: bool = False,
) -> str:
    """
    Converts markdown to DOCX using pypandoc with an optional template.
    This is a simpler conversion without placeholder replacement.

    Args:
        input_file: Path to the markdown file
        output_path: Path where the DOCX will be saved
        template_path: Optional path to a DOCX template (reference-doc)
        add_toc: If True, adds a table of contents
        number_sections: If True, automatically numbers sections

    Returns:
        Path to the generated DOCX file
    """
    log.info(f"Converting '{os.path.basename(input_file)}' to DOCX using pypandoc.")

    if not os.path.exists(input_file):
        raise FileNotFoundError(f"Input file not found: {input_file}")

    # Build pypandoc arguments
    extra_args = ["--variable=justify:false"]

    if template_path and os.path.exists(template_path):
        log.info(f"Using template: {os.path.basename(template_path)}")
        extra_args.extend(["--reference-doc", str(template_path)])

    if add_toc:
        log.info("Adding table of contents")
        extra_args.append("--toc")

    if number_sections:
        log.info("Enabling automatic section numbering")
        extra_args.append("--number-sections")

    try:
        # Use pypandoc for a more robust conversion
        pypandoc.convert_file(
            input_file,
            "docx",
            format="markdown+hard_line_breaks",
            outputfile=output_path,
            extra_args=extra_args,
        )
        log.info(f"DOCX successfully generated: {output_path}")
        return output_path
    except Exception as e:
        log.error(f"Pandoc conversion failed: {e}")
        raise RuntimeError(f"Pandoc conversion failed: {e}") from e


def convert_markdown(
    input_file: str,
    output_path: str,
    output_format: str,
    add_toc: bool = False,
    template_path: str = None,
    metadata: dict = None,
):
    """Dispatch a markdown file to the PDF or DOCX conversion pipeline."""
    if not os.path.exists(input_file):
        raise FileNotFoundError(f"Input file not found: {input_file}")
    log.info(
        f"Starting conversion of '{os.path.basename(input_file)}' to {output_format}."
    )
    with open(input_file, "r", encoding="utf-8") as f:
        markdown_text = f.read()
    if output_format == "PDF":
        _convert_to_pdf(markdown_text, output_path, add_toc)
    elif output_format == "DOCX":
        if metadata is None:
            metadata = {}
        _convert_to_docx(markdown_text, output_path, template_path, metadata, add_toc)
    else:
        raise ValueError(f"Unsupported output format: {output_format}")
    return output_path
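
# Usage sketch (paths, template, and the %%DOC_AUTHOR%% key are hypothetical;
# DOCX output requires a template containing the three structural placeholders):
#
#     convert_markdown(
#         "docs/spec.md",
#         "out/spec.docx",
#         output_format="DOCX",
#         add_toc=True,
#         template_path="templates/report.docx",
#         metadata={"%%DOC_AUTHOR%%": "Jane Doe"},
#     )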


def convert_pdf_to_markdown(
    input_pdf_path: str,
    output_md_path: str,
    extract_images: bool = True,
    image_folder: str = None,
    progress_callback=None,
    page_limit: int = None,
) -> str:
    """
    Convert PDF to Markdown using PyMuPDF, with progress callback support,
    robust image extraction, and header/footer deduplication.

    progress_callback: optional callable(progress_percent: int, message: str)
    page_limit: if set, only the first `page_limit` pages are processed
    """
    if not os.path.exists(input_pdf_path):
        raise FileNotFoundError(f"Input PDF file not found: {input_pdf_path}")

    log.info(f"Converting PDF '{os.path.basename(input_pdf_path)}' to Markdown.")

    # Set up the image folder if needed
    if extract_images:
        if image_folder is None:
            output_path = Path(output_md_path)
            image_folder = output_path.parent / f"{output_path.stem}_images"
        image_folder = Path(image_folder)
        image_folder.mkdir(exist_ok=True)
        log.info(f"Images will be saved to: {image_folder}")

    markdown_content = []
    image_counter = 0
    failed_image_extractions = 0
    failed_image_samples = set()
    total_image_refs = 0
    # Track extracted images to avoid duplicates: map xref -> filename and hash -> filename
    _seen_xref_to_file = {}
    _seen_hash_to_file = {}
    _inserted_images = set()
    duplicates_skipped = 0

    def _normalize_header_footer(s: str) -> str:
        # Normalize by stripping whitespace, collapsing spaces, and removing
        # page numbers/digits.
        if not s:
            return ""
        ns = re.sub(r"\s+", " ", s.strip())
        ns = re.sub(r"\d+", "", ns)
        return ns.strip()
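
    # Normalization sketch: "Acme Corp - Page 12" and "Acme Corp - Page 13"
    # both normalize to "Acme Corp - Page", so the same header/footer can be
    # matched across pages regardless of the page number.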

    try:
        # Open PDF with PyMuPDF
        doc = fitz.open(input_pdf_path)
        num_pages = len(doc)
        num_pages_effective = (
            min(num_pages, int(page_limit))
            if page_limit and page_limit > 0
            else num_pages
        )
        log.info(f"PDF opened successfully. Total pages: {num_pages}")
        if progress_callback:
            try:
                progress_callback(0, f"Opened PDF, {num_pages} pages detected")
            except Exception:
                pass

        # First pass: detect common header/footer text across pages
        header_candidates = Counter()
        footer_candidates = Counter()
        for page_num in range(num_pages_effective):
            page = doc[page_num]
            blocks = page.get_text("dict").get("blocks", [])
            page_h = page.rect.height
            top_zone = page_h * 0.12
            bottom_zone = page_h * 0.88

            for block in blocks:
                if block.get("type") != 0:
                    continue
                bbox = block.get("bbox", [0, 0, 0, 0])
                y0 = bbox[1]
                y1 = bbox[3]
                # Assemble the block text
                block_text = " ".join(
                    span.get("text", "")
                    for line in block.get("lines", [])
                    for span in line.get("spans", [])
                )
                norm = _normalize_header_footer(block_text)
                if not norm:
                    continue
                if y0 <= top_zone:
                    header_candidates[norm] += 1
                if y1 >= bottom_zone:
                    footer_candidates[norm] += 1

        # Choose a header/footer only if it appears on a majority of pages
        header_text = None
        footer_text = None
        if header_candidates:
            most_common_header, count = header_candidates.most_common(1)[0]
            if count >= max(3, int(0.6 * num_pages_effective)):
                header_text = most_common_header
                log.info(
                    f"Detected common header to remove from pages: '{header_text[:80]}' (appears {count} times)"
                )
        if footer_candidates:
            most_common_footer, countf = footer_candidates.most_common(1)[0]
            if countf >= max(3, int(0.6 * num_pages_effective)):
                footer_text = most_common_footer
                log.info(
                    f"Detected common footer to remove from pages: '{footer_text[:80]}' (appears {countf} times)"
                )

        if progress_callback:
            try:
                progress_callback(2, "Detected common header/footer (if any)")
            except Exception:
                pass

        # If a common header was found, add it once at the top
        if header_text:
            markdown_content.append(header_text + "\n\n")

        # Second pass: build markdown, skipping repeated header/footer blocks
        for page_num in range(num_pages_effective):
            page = doc[page_num]
            log.debug(f"Processing page {page_num + 1}/{num_pages_effective}")
            if progress_callback:
                try:
                    pct = int((page_num / num_pages_effective) * 100)
                    progress_callback(
                        pct, f"Processing page {page_num + 1}/{num_pages_effective}"
                    )
                except Exception:
                    pass

            # Add a page separator for multi-page documents
            if page_num > 0:
                markdown_content.append("\n\n---\n\n")
            markdown_content.append(f"## Page {page_num + 1}\n\n")

            # Extract text blocks with formatting
            blocks = page.get_text("dict").get("blocks", [])

            for block in blocks:
                if block.get("type") == 0:  # Text block
                    # Assemble the block text for the header/footer check
                    block_text_full = " ".join(
                        span.get("text", "")
                        for line in block.get("lines", [])
                        for span in line.get("spans", [])
                    )
                    norm_block = _normalize_header_footer(block_text_full)
                    if norm_block:
                        if header_text and header_text == norm_block:
                            # Skip repeated header
                            continue
                        if footer_text and footer_text == norm_block:
                            # Skip repeated footer
                            continue

                    # Build a structured representation of lines and spans
                    # with x-coordinates
                    block_x0 = block.get("bbox", [0, 0, 0, 0])[0]
                    lines_info = []
                    all_x_positions = []
                    mono_line_flags = []

                    for line in block.get("lines", []):
                        spans_info = []
                        for span in line.get("spans", []):
                            text = span.get("text", "").strip()
                            if not text:
                                continue
                            bbox = (
                                span.get("bbox") or span.get("origin") or [0, 0, 0, 0]
                            )
                            x0 = bbox[0]
                            font_size = span.get("size", 0)
                            flags = span.get("flags", 0)
                            font = span.get("font", "")
                            spans_info.append(
                                {
                                    "x0": x0,
                                    "text": text,
                                    "size": font_size,
                                    "flags": flags,
                                    "font": font,
                                }
                            )
                            all_x_positions.append(x0)
                        # Detect whether this line looks monospaced (code) by font name
                        mono_flag = any(
                            "mono" in (s.get("font", "").lower())
                            or "courier" in (s.get("font", "").lower())
                            for s in spans_info
                        )
                        mono_line_flags.append(mono_flag)
                        lines_info.append(spans_info)

                    # Simple heuristic: if there are multiple consistent
                    # x-columns, treat the block as a table
                    table_md_lines = []
                    is_table = False
                    if all_x_positions:
                        # Cluster x positions into columns with a tolerance
                        tol = 8.0
                        cols = []
                        for x in sorted(set(all_x_positions)):
                            placed = False
                            for i, c in enumerate(cols):
                                if abs(x - c) <= tol:
                                    # Update the cluster center
                                    cols[i] = (cols[i] + x) / 2.0
                                    placed = True
                                    break
                            if not placed:
                                cols.append(x)
                        cols = sorted(cols)

                        # Build a matrix of cells per line
                        matrix = []
                        for spans in lines_info:
                            if not spans:
                                matrix.append([])
                                continue
                            row = [""] * len(cols)
                            for s in spans:
                                # Find the nearest column
                                idx = min(
                                    range(len(cols)),
                                    key=lambda i: abs(s["x0"] - cols[i]),
                                )
                                if row[idx]:
                                    row[idx] += " " + s["text"]
                                else:
                                    row[idx] = s["text"]
                            matrix.append(row)

                        # The matrix looks like a table if there are multiple
                        # columns and many rows have more than one non-empty cell
                        if len(cols) >= 2:
                            rows_with_multi = sum(
                                1 for r in matrix if sum(1 for c in r if c.strip()) > 1
                            )
                            if rows_with_multi >= max(2, len(matrix) // 3):
                                is_table = True
                                # Convert the matrix to a markdown table
                                for r_idx, row in enumerate(matrix):
                                    # Join the cells with pipes
                                    cleaned = [c.strip() for c in row]
                                    table_md_lines.append(
                                        "| " + " | ".join(cleaned) + " |"
                                    )
                                    # After the first row, add the header separator
                                    if r_idx == 0:
                                        sep = (
                                            "| "
                                            + " | ".join(
                                                "---" if c.strip() else ""
                                                for c in cleaned
                                            )
                                            + " |"
                                        )
                                        table_md_lines.append(sep)

                    if is_table:
                        markdown_content.append("\n")
                        markdown_content.extend([ln + "\n" for ln in table_md_lines])
                        markdown_content.append("\n")
                    else:
                        # Not a table: output lines respecting indentation and formatting
                        in_code_block = False
                        for li, spans in enumerate(lines_info):
                            if not spans:
                                if in_code_block:
                                    markdown_content.append("```")
                                    in_code_block = False
                                markdown_content.append("\n")
                                continue

                            first_x = spans[0]["x0"] if spans else block_x0
                            indent_level = max(
                                0, int(round((first_x - block_x0) / 20.0))
                            )

                            # Detect monospaced sequences and wrap them in a code block
                            mono = (
                                mono_line_flags[li]
                                if li < len(mono_line_flags)
                                else False
                            )
                            if mono and not in_code_block:
                                markdown_content.append("```\n")
                                in_code_block = True

                            line_text = ""
                            for span in spans:
                                text = span["text"]
                                font_size = span.get("size", 0)
                                flags = span.get("flags", 0)

                                # Heading detection by font size
                                if font_size > 18:
                                    text = f"# {text}"
                                elif font_size > 14:
                                    text = f"## {text}"
                                elif font_size > 12:
                                    text = f"### {text}"

                                # PyMuPDF span flags: bit 16 = bold, bit 2 = italic
                                if flags & 16:
                                    text = f"**{text}**"
                                if flags & 2:
                                    text = f"*{text}*"

                                line_text += text + " "

                            line_text = line_text.strip()
                            if in_code_block:
                                markdown_content.append(line_text + "\n")
                            else:
                                if indent_level > 0:
                                    markdown_content.append(
                                        " " * indent_level + line_text + "\n"
                                    )
                                else:
                                    markdown_content.append(line_text + "\n")

                        if in_code_block:
                            markdown_content.append("```\n")

                    # Add spacing after the block
                    markdown_content.append("\n")

                elif block.get("type") == 1 and extract_images:  # Image block
                    # Extract images for this page safely
                    try:
                        imgs = page.get_images(full=True)
                    except Exception as e:
                        # Can't list images on this page; count it as a failed
                        # attempt and continue
                        failed_image_extractions += 1
                        if len(failed_image_samples) < 3:
                            failed_image_samples.add(f"list_images_error: {str(e)}")
                        imgs = []

                    # Try to extract each referenced image, but avoid flooding the logs
                    page_image_failures = 0
                    for img in imgs:
                        total_image_refs += 1
                        try:
                            # The image tuple may vary, so locate the xref defensively
                            if not img:
                                raise ValueError("empty image tuple")
                            # Prefer the first element as the xref
                            xref = None
                            if isinstance(img, (list, tuple)) and len(img) > 0:
                                xref = img[0]
                            if xref is None:
                                raise ValueError(f"unexpected image descriptor: {img}")

                            # If we've already extracted this xref, reuse the filename
                            if xref in _seen_xref_to_file:
                                image_filename = _seen_xref_to_file[xref]
                                relative_path = f"{image_folder.name}/{image_filename}"
                                # Only insert the image tag once; skip repeated inline images
                                if image_filename in _inserted_images:
                                    duplicates_skipped += 1
                                else:
                                    markdown_content.append(
                                        f"\n\n![{image_filename}]({relative_path})\n\n"
                                    )
                                    _inserted_images.add(image_filename)
                                continue

                            # Use a short timeout for extraction to avoid long
                            # hangs on malformed images
                            try:
                                from concurrent.futures import ThreadPoolExecutor

                                def _extract(x):
                                    return doc.extract_image(x)

                                with ThreadPoolExecutor(max_workers=1) as ex:
                                    fut = ex.submit(_extract, xref)
                                    base_image = fut.result(timeout=3)
                            except Exception as tex:
                                raise RuntimeError(
                                    f"image_extraction_timeout_or_error: {tex}"
                                )
                            if not base_image or "image" not in base_image:
                                raise ValueError(f"no image bytes for xref {xref}")

                            image_bytes = base_image.get("image")
                            image_ext = base_image.get("ext", "png")

                            # Compute a hash to detect identical image content
                            img_hash = (
                                hashlib.sha256(image_bytes).hexdigest()
                                if image_bytes
                                else None
                            )

                            # If an identical image was already extracted under
                            # a different xref, reuse it
                            if img_hash and img_hash in _seen_hash_to_file:
                                image_filename = _seen_hash_to_file[img_hash]
                                # Remember the xref mapping for future references
                                _seen_xref_to_file[xref] = image_filename
                                relative_path = f"{image_folder.name}/{image_filename}"
                                if image_filename in _inserted_images:
                                    duplicates_skipped += 1
                                else:
                                    markdown_content.append(
                                        f"\n\n![{image_filename}]({relative_path})\n\n"
                                    )
                                    _inserted_images.add(image_filename)
                            else:
                                image_counter += 1
                                image_filename = f"image_{image_counter}.{image_ext}"
                                image_path = image_folder / image_filename
                                with open(image_path, "wb") as img_file:
                                    img_file.write(image_bytes)
                                # Register the mappings
                                _seen_xref_to_file[xref] = image_filename
                                if img_hash:
                                    _seen_hash_to_file[img_hash] = image_filename
                                relative_path = f"{image_folder.name}/{image_filename}"
                                markdown_content.append(
                                    f"\n\n![{image_filename}]({relative_path})\n\n"
                                )
                                _inserted_images.add(image_filename)
                        except Exception as ie:
                            page_image_failures += 1
                            failed_image_extractions += 1
                            if len(failed_image_samples) < 3:
                                failed_image_samples.add(str(ie))

                    if page_image_failures and progress_callback:
                        try:
                            progress_callback(
                                int(((page_num + 1) / num_pages_effective) * 100),
                                f"{page_image_failures} image(s) failed on page {page_num + 1}",
                            )
                        except Exception:
                            pass

        # Append the footer once if detected
        if footer_text:
            markdown_content.append("\n\n" + footer_text + "\n")

        doc.close()

        # Write the markdown file
        with open(output_md_path, "w", encoding="utf-8") as f:
            f.write("".join(markdown_content))

        log.info(f"Markdown file successfully created: {output_md_path}")
        if extract_images:
            log.info(f"Extracted {image_counter} images")
            duplicates_reused = max(
                0, total_image_refs - image_counter - failed_image_extractions
            )
            log.info(
                f"Unique images saved: {image_counter}; duplicate references reused: {duplicates_reused}"
            )
            if failed_image_extractions:
                sample_list = list(failed_image_samples)
                log.warning(
                    f"Failed to extract {failed_image_extractions} images (sample errors: {sample_list})"
                )

        if progress_callback:
            try:
                progress_callback(100, "Conversion finished")
            except Exception:
                pass

        return output_md_path

    except Exception as e:
        log.error(f"Failed to convert PDF to Markdown: {e}", exc_info=True)
        raise RuntimeError(f"PDF to Markdown conversion failed: {str(e)}") from e
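

# Usage sketch (hypothetical paths; prints coarse progress to stdout):
#
#     def report(pct, msg):
#         print(f"[{pct:3d}%] {msg}")
#
#     convert_pdf_to_markdown(
#         "input/manual.pdf",
#         "output/manual.md",
#         extract_images=True,
#         progress_callback=report,
#     )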