534 lines
21 KiB
Python
534 lines
21 KiB
Python
# markdownconverter/core/core.py
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import tempfile
|
|
import subprocess
|
|
from datetime import date
|
|
import docx
|
|
import pypandoc
|
|
import pdfkit
|
|
import markdown
|
|
from docx.enum.text import WD_BREAK
|
|
from docx2pdf import convert as convert_word
|
|
from ..utils.logger import get_logger
|
|
|
|
# Module-level logger shared by every function in this file.
log = get_logger(__name__)
|
|
|
|
|
|
# --- Custom Exceptions ---
|
|
class TemplatePlaceholderError(ValueError):
    """Raised when a DOCX template lacks required structural placeholders."""
|
|
|
|
|
|
class ConverterNotFoundError(Exception):
    """Raised when neither MS Word nor LibreOffice can convert DOCX to PDF."""
|
|
|
|
|
|
# --- PDFKit Configuration ---
|
|
# Resolve the wkhtmltopdf binary once, at import time. `config` ends up as a
# pdfkit configuration object, or as None when no binary could be located
# (in which case PDF conversion refuses to run).
try:
    config = pdfkit.configuration()
    log.info("pdfkit configured using wkhtmltopdf from system PATH.")
except OSError:
    # Default lookup failed: try a fixed Windows installation path instead.
    WKHTMLTOPDF_PATH = r"C:\Program Files\wkhtmltopdf\bin\wkhtmltopdf.exe"
    if not os.path.exists(WKHTMLTOPDF_PATH):
        config = None
        log.warning("wkhtmltopdf not found. PDF conversion may fail.")
    else:
        config = pdfkit.configuration(wkhtmltopdf=WKHTMLTOPDF_PATH)
|
|
|
|
|
|
def scan_template_for_placeholders(template_path: str) -> tuple[list[str], list[str]]:
    """Collect the ``%%NAME%%`` placeholders used in a DOCX template.

    The document body, every table cell (recursively) and the header and
    footer of each section are searched.

    Args:
        template_path: Path to the ``.docx`` template file.

    Returns:
        A ``(dynamic, structural)`` pair of sorted lists. Entries keep their
        surrounding ``%%``. ``structural`` contains the layout markers
        ``%%REVISION_RECORD%%``, ``%%DOC_TOC%%`` and ``%%DOC_CONTENT%%`` that
        were found; ``dynamic`` contains every other placeholder.

    Raises:
        FileNotFoundError: If ``template_path`` does not exist.
    """
    if not os.path.exists(template_path):
        raise FileNotFoundError(f"Template file not found: {template_path}")
    log.info(f"Scanning template '{os.path.basename(template_path)}' for placeholders.")

    structural_keys = {"%%REVISION_RECORD%%", "%%DOC_TOC%%", "%%DOC_CONTENT%%"}
    placeholder_pattern = re.compile(r"%%([A-Z0-9_]+)%%")
    found_placeholders = set()
    doc = docx.Document(template_path)

    def collect(container):
        # Run texts are joined first so that a placeholder that is split
        # across several runs is still recognised.
        for paragraph in getattr(container, "paragraphs", ()):
            joined = "".join(run.text for run in paragraph.runs)
            found_placeholders.update(
                hit.group(0) for hit in placeholder_pattern.finditer(joined)
            )
        for table in getattr(container, "tables", ()):
            for row in table.rows:
                for cell in row.cells:
                    collect(cell)

    collect(doc)
    for section in doc.sections:
        collect(section.header)
        collect(section.footer)

    dynamic, structural = [], []
    for name in sorted(found_placeholders):
        bucket = structural if name in structural_keys else dynamic
        bucket.append(name)

    log.info(
        f"Scan complete. Found {len(dynamic)} dynamic and {len(structural)} structural placeholders."
    )
    return dynamic, structural
|
|
|
|
|
|
def _get_document_title(markdown_text: str) -> str:
    """Return the text of the first Markdown heading (any level).

    Falls back to ``"Untitled Document"`` when the text has no heading.
    """
    heading = re.search(r"^\s*#+\s+(.+)", markdown_text, re.MULTILINE)
    if heading is None:
        return "Untitled Document"
    return heading.group(1).strip()
|
|
|
|
|
|
def _split_markdown_by_revision_history(
    markdown_text: str, separator_heading="## Revision Record"
) -> tuple[str, str]:
    """Separate the revision-history section from the rest of the Markdown.

    The section starts at ``separator_heading`` and runs up to (but not
    including) the next heading line, or to the end of the text when it is
    the last section of the document.

    Args:
        markdown_text: Full Markdown source.
        separator_heading: Heading text that opens the revision section.

    Returns:
        ``(revision_section, remaining_markdown)``, both stripped of leading
        and trailing whitespace. When the heading is absent the result is
        ``("", markdown_text)`` with the text returned unchanged.
    """
    # `\Z` lets the section match even when no further heading follows it.
    pattern = re.compile(
        rf"{re.escape(separator_heading)}.*?(?=\n#+|\Z)", re.DOTALL
    )
    match = pattern.search(markdown_text)
    if not match:
        log.warning(
            f"'{separator_heading}' section not found. No revision history will be added."
        )
        return "", markdown_text

    rev_history_md = match.group(0).strip()
    # Remove only the first occurrence so identical text later on survives.
    main_content_md = markdown_text.replace(rev_history_md, "", 1).strip()
    return rev_history_md, main_content_md
|
|
|
|
|
|
def _replace_text_in_paragraph(paragraph, placeholders: dict[str, str]):
    """Substitute placeholder keys in one paragraph, in place.

    The texts of all runs are joined before searching, so a key that is split
    across runs is still replaced. If at least one key occurs, every existing
    run is removed and a single new run holding the substituted text is
    added; it receives the style and basic font settings of the former first
    run. Paragraphs that contain no key are left untouched.

    Args:
        paragraph: Paragraph object exposing ``runs``, ``_p`` and ``add_run``.
        placeholders: Mapping of placeholder key to replacement value; values
            are converted with ``str()``.
    """
    existing_runs = paragraph.runs
    merged = "".join(run.text for run in existing_runs)
    if all(key not in merged for key in placeholders):
        return

    for key, value in placeholders.items():
        if key in merged:
            merged = merged.replace(key, str(value))

    # Remember the first run's look before the runs are discarded.
    lead = existing_runs[0] if existing_runs else None
    style = lead.style if lead is not None else None
    font = lead.font if lead is not None else None

    paragraph_xml = paragraph._p
    for run in reversed(existing_runs):
        paragraph_xml.remove(run._r)

    replacement = paragraph.add_run(merged)
    if style:
        replacement.style = style
    if font:
        target_font = replacement.font
        for attribute in ("name", "size", "bold", "italic", "underline"):
            setattr(target_font, attribute, getattr(font, attribute))
        if font.color and font.color.rgb:
            target_font.color.rgb = font.color.rgb
|
|
|
|
|
|
def _replace_text_in_element(element, placeholders: dict[str, str]):
    """Apply placeholder substitution to a container and its nested tables.

    Every paragraph of ``element`` is processed, then every cell of every
    table in it, recursively. Attributes the element does not have
    (``paragraphs`` / ``tables``) are simply skipped.
    """
    for paragraph in getattr(element, "paragraphs", ()):
        _replace_text_in_paragraph(paragraph, placeholders)
    for table in getattr(element, "tables", ()):
        for row in table.rows:
            for cell in row.cells:
                _replace_text_in_element(cell, placeholders)
|
|
|
|
|
|
def _replace_metadata_placeholders(doc: docx.Document, placeholders: dict[str, str]):
    """Substitute placeholders in the body and in all section headers/footers.

    Args:
        doc: Document to modify in place.
        placeholders: Mapping of placeholder key to replacement value.
    """
    log.info(f"Replacing metadata placeholders: {list(placeholders.keys())}")
    _replace_text_in_element(doc, placeholders)
    for section in doc.sections:
        for part in (section.header, section.footer):
            _replace_text_in_element(part, placeholders)
|
|
|
|
|
|
def _find_placeholder_paragraph(doc: docx.Document, placeholder: str):
    """Return the first paragraph in ``doc.paragraphs`` containing ``placeholder``.

    Run texts are joined before the check, so a placeholder split across runs
    is found as well. Only ``doc.paragraphs`` is searched. Returns ``None``
    when nothing matches.
    """
    candidates = (
        paragraph
        for paragraph in doc.paragraphs
        if placeholder in "".join(run.text for run in paragraph.runs)
    )
    return next(candidates, None)
|
|
|
|
|
|
def _insert_docx_at_paragraph(paragraph, source_docx_path: str):
    """Replace ``paragraph`` with the body content of another DOCX file.

    The body elements of the document at ``source_docx_path`` are moved, in
    order, to the position of ``paragraph``; the paragraph itself is then
    removed from its parent.

    Args:
        paragraph: Placeholder paragraph marking the insertion point.
        source_docx_path: Path of the DOCX whose body is inserted.
    """
    parent = paragraph._p.getparent()
    index = parent.index(paragraph._p)
    source_doc = docx.Document(source_docx_path)

    # Iterate over a snapshot: inserting an element elsewhere takes it out of
    # the source body.
    for element in list(source_doc.element.body):
        # The source's section properties (<w:sectPr>) describe the source
        # document's own page setup. Copying them into the middle of the
        # target would add a stray section there, so they are left behind.
        tag = element.tag
        if isinstance(tag, str) and tag.endswith("}sectPr"):
            continue
        parent.insert(index, element)
        index += 1

    parent.remove(paragraph._p)
|
|
|
|
|
|
def _remove_paragraph(paragraph):
    """Detach ``paragraph`` from its parent element; ``None`` is ignored."""
    if paragraph is not None:
        node = paragraph._p
        node.getparent().remove(node)
|
|
|
|
|
|
def _parse_markdown_table(markdown_block: str) -> list[list[str]]:
    """Return the cell texts of each pipe-table row, header row first.

    Only lines starting with ``|`` are considered. Delimiter rows such as
    ``|---|---|``, ``| :-- | --: |`` or ``|:--|`` are dropped. A missing
    trailing pipe does not lose the last cell.
    """
    delimiter_row = re.compile(r"^\|?\s*:?-+:?\s*(\|\s*:?-+:?\s*)*\|?$")
    rows = []
    for raw_line in markdown_block.strip().split("\n"):
        line = raw_line.strip()
        if not line.startswith("|") or delimiter_row.match(line):
            continue
        inner = line[1:]
        if inner.endswith("|"):
            inner = inner[:-1]
        rows.append([cell.strip() for cell in inner.split("|")])
    return rows


def _add_revision_table(doc: docx.Document, rev_history_md: str):
    """Replace the ``%%REVISION_RECORD%%`` paragraph with a revision table.

    The table is built from the Markdown pipe table found in
    ``rev_history_md``; its first row is used as the header. When there is no
    revision text, or no table can be parsed from it, the placeholder
    paragraph is removed instead. When the placeholder itself is not found
    the document is left unchanged.

    Args:
        doc: Document to modify in place.
        rev_history_md: Markdown of the revision-history section (may be "").
    """
    placeholder_p = _find_placeholder_paragraph(doc, "%%REVISION_RECORD%%")
    if placeholder_p is None:
        log.warning("Revision record placeholder not found in template. Skipping.")
        return
    if not rev_history_md:
        log.info("No revision history content found. Removing placeholder.")
        _remove_paragraph(placeholder_p)
        return

    table_data = _parse_markdown_table(rev_history_md)
    if not table_data:
        log.warning(
            "Could not parse a markdown table from the revision history section."
        )
        _remove_paragraph(placeholder_p)
        return

    log.info(f"Adding revision history table with {len(table_data)} rows.")
    header = table_data[0]
    table = doc.add_table(rows=1, cols=len(header))
    try:
        table.style = "Table Grid"
    except KeyError:
        # The template does not define this style; keep the default look
        # rather than aborting the whole conversion.
        log.warning("Template has no 'Table Grid' style; using the default table style.")

    for cell, header_text in zip(table.rows[0].cells, header):
        cell.text = header_text
    for row_data in table_data[1:]:
        # zip() tolerates ragged rows: surplus cells are ignored and missing
        # ones stay empty instead of raising IndexError.
        for cell, cell_text in zip(table.add_row().cells, row_data):
            cell.text = cell_text

    # add_table() appended the table at the end of the document; move it to
    # the placeholder's position, then drop the placeholder.
    parent = placeholder_p._p.getparent()
    parent.insert(parent.index(placeholder_p._p), table._tbl)
    _remove_paragraph(placeholder_p)
|
|
|
|
|
|
def _convert_to_pdf(markdown_text: str, output_file: str, add_toc: bool):
    """Render Markdown to a PDF file via HTML and wkhtmltopdf (pdfkit).

    The first heading of the document is used as the page title and as the
    top ``<h1>``. That heading line, together with anything that precedes it,
    is left out of the rendered body.

    Args:
        markdown_text: Markdown source.
        output_file: Path of the PDF to write.
        add_toc: When true and the document yields a table of contents, it is
            placed after the title, followed by a page break.

    Raises:
        FileNotFoundError: If no wkhtmltopdf configuration is available.
    """
    import html  # stdlib; needed only here, to escape the title

    log.info("Starting PDF conversion using pdfkit.")
    if config is None:
        raise FileNotFoundError("wkhtmltopdf executable not found. Cannot create PDF.")

    # The title comes from user-written Markdown and is interpolated into
    # HTML, so characters such as "<" and "&" must be escaped.
    title = html.escape(_get_document_title(markdown_text))
    content_without_title = markdown_text
    match = re.search(r"^\s*#+\s+(.+)\n?", markdown_text, re.MULTILINE)
    if match:
        content_without_title = markdown_text[match.end():]

    # "nl2br" keeps single newlines as line breaks, in line with the
    # hard_line_breaks option used for the DOCX output.
    md_converter = markdown.Markdown(
        extensions=["toc", "fenced_code", "tables", "nl2br"]
    )
    html_body = md_converter.convert(content_without_title)

    toc_html = ""
    if add_toc and getattr(md_converter, "toc", ""):
        log.info("Generating Table of Contents for PDF.")
        toc_html = (
            f"<h2>Table of Contents</h2>{md_converter.toc}"
            "<div style='page-break-after: always;'></div>"
        )

    full_html = (
        '<!DOCTYPE html><html lang="en"><head><meta charset="UTF-8">'
        f"<title>{title}</title>"
        "<style>body{font-family:sans-serif;}"
        "h1,h2{border-bottom:1px solid #eaecef;padding-bottom:.3em;}</style>"
        f"</head><body><h1>{title}</h1>{toc_html}{html_body}</body></html>"
    )
    pdf_options = {"encoding": "UTF-8", "enable-local-file-access": None}

    pdfkit.from_string(
        full_html, output_file, configuration=config, options=pdf_options
    )
    log.info(f"PDF successfully generated: {output_file}")
|
|
|
|
|
|
def _convert_to_docx(
    markdown_text: str,
    output_file: str,
    template_path: str,
    metadata: dict,
    add_toc: bool,
):
    """Build a DOCX from Markdown by filling a placeholder-based template.

    Steps:
      1. Check that the template has all structural placeholders.
      2. Replace metadata placeholders (document body, headers, footers).
      3. Turn the revision-history section into a table.
      4. Convert the remaining Markdown with Pandoc and insert the result at
         ``%%DOC_TOC%%`` (when ``add_toc`` is true; Pandoc then emits the TOC
         together with the content) or at ``%%DOC_CONTENT%%`` otherwise. The
         placeholder that is not used is removed.

    Args:
        markdown_text: Markdown source.
        output_file: Path of the DOCX to write.
        template_path: Path of the DOCX template.
        metadata: Placeholder key -> value mapping. It is not modified.
        add_toc: Whether Pandoc should generate a table of contents.

    Raises:
        FileNotFoundError: If the template file does not exist.
        TemplatePlaceholderError: If a structural placeholder is missing.
    """
    log.info("Starting DOCX conversion.")
    dynamic_placeholders, structural_placeholders = scan_template_for_placeholders(
        template_path
    )
    required_structural = {"%%REVISION_RECORD%%", "%%DOC_TOC%%", "%%DOC_CONTENT%%"}
    missing = required_structural - set(structural_placeholders)
    if missing:
        raise TemplatePlaceholderError(
            f"Template is missing required structural placeholders: {', '.join(sorted(missing))}"
        )

    doc = docx.Document(template_path)

    # Work on a copy so the defaults filled in below never leak into the
    # caller's dictionary.
    metadata = dict(metadata or {})
    if "%%DOC_PROJECT%%" in dynamic_placeholders and not metadata.get(
        "%%DOC_PROJECT%%"
    ):
        metadata["%%DOC_PROJECT%%"] = _get_document_title(markdown_text)
    if "%%DOC_DATE%%" in dynamic_placeholders:
        metadata["%%DOC_DATE%%"] = date.today().strftime("%d/%m/%Y")
    _replace_metadata_placeholders(doc, metadata)

    rev_history_md, main_content_md = _split_markdown_by_revision_history(markdown_text)
    _add_revision_table(doc, rev_history_md)

    pandoc_format = "markdown+hard_line_breaks"
    temp_path = None
    try:
        if main_content_md:
            content_for_pandoc = main_content_md

            # Step 1: Remove the main H1 document title from the content to
            # be processed. It is used for metadata, not for body numbering.
            match = re.search(r"^\s*#\s+(.+)\n?", content_for_pandoc, re.MULTILINE)
            if match:
                log.info("Removing main H1 title from content body.")
                content_for_pandoc = content_for_pandoc[match.end():]

            # Step 2: Strip manual numbering from headings ("## 1. Title") to
            # prevent double numbering once automatic numbering is applied.
            log.info("Stripping manual numbering from headings for auto-numbering.")
            content_for_pandoc = re.sub(
                r"^(\s*#+)\s+[0-9\.]+\s+",
                r"\1 ",
                content_for_pandoc,
                flags=re.MULTILINE,
            )

            # Step 3: Pandoc arguments for hierarchical numbering.
            pandoc_args = [
                # Enable automatic section numbering (1, 1.1, 1.1.1, ...).
                "--number-sections",
                # Shift heading levels up by one, so that
                #   ## (H2 in MD)  -> Heading 1 in DOCX (numbered 1, 2, ...)
                #   ### (H3 in MD) -> Heading 2 in DOCX (numbered 1.1, ...)
                "--shift-heading-level-by=-1",
                # Keep text left-aligned.
                "--variable=justify:false",
            ]

            if add_toc:
                pandoc_args.append("--toc")
                log.info("Adding page break before Table of Contents.")
                target_key, unused_key = "%%DOC_TOC%%", "%%DOC_CONTENT%%"
            else:
                log.info("Adding page break before main content.")
                target_key, unused_key = "%%DOC_CONTENT%%", "%%DOC_TOC%%"

            target_p = _find_placeholder_paragraph(doc, target_key)
            if target_p is not None:
                target_p.insert_paragraph_before().add_run().add_break(WD_BREAK.PAGE)

            # Reserve a temp file name and close the handle before Pandoc
            # writes to it. The path is recorded first so the `finally`
            # clause removes the file even if the conversion fails.
            with tempfile.NamedTemporaryFile(
                delete=False, suffix=".docx"
            ) as temp_file:
                temp_path = temp_file.name
            pypandoc.convert_text(
                content_for_pandoc,
                "docx",
                format=pandoc_format,
                extra_args=pandoc_args,
                outputfile=temp_path,
            )

            if target_p is not None:
                _insert_docx_at_paragraph(target_p, temp_path)
            else:
                log.warning(
                    f"Placeholder {target_key} not found among body paragraphs; "
                    "converted content was not inserted."
                )
            # The other structural placeholder is not used in this mode.
            _remove_paragraph(_find_placeholder_paragraph(doc, unused_key))
        else:
            # No main content: remove both placeholders.
            _remove_paragraph(_find_placeholder_paragraph(doc, "%%DOC_TOC%%"))
            _remove_paragraph(_find_placeholder_paragraph(doc, "%%DOC_CONTENT%%"))

        doc.save(output_file)
        log.info(f"Document successfully created at {output_file}")
    finally:
        if temp_path and os.path.exists(temp_path):
            os.remove(temp_path)
|
|
|
|
|
|
def convert_docx_to_pdf(input_docx_path: str, output_pdf_path: str) -> str:
    """Convert a DOCX file to PDF using MS Word, falling back to LibreOffice.

    Args:
        input_docx_path: Path of the DOCX to convert.
        output_pdf_path: Path of the PDF to create.

    Returns:
        ``output_pdf_path``.

    Raises:
        FileNotFoundError: If the input is missing, or LibreOffice finished
            without producing the expected PDF.
        ConverterNotFoundError: If Word failed and LibreOffice is not found.
        subprocess.TimeoutExpired: If LibreOffice runs longer than 60 seconds.
        subprocess.CalledProcessError: If LibreOffice exits with an error.
    """
    import shutil  # stdlib; used to locate LibreOffice on PATH

    if not os.path.exists(input_docx_path):
        raise FileNotFoundError(f"Input DOCX file not found: {input_docx_path}")

    try:
        log.info("Attempting DOCX to PDF conversion using MS Word.")
        convert_word(input_docx_path, output_pdf_path)
        log.info(f"Successfully converted using MS Word: {output_pdf_path}")
        return output_pdf_path
    except Exception as e:
        # Deliberately broad: any failure of the Word route triggers the
        # LibreOffice fallback.
        log.warning(f"MS Word conversion failed. It might not be installed. Error: {e}")
        log.info("Falling back to LibreOffice conversion.")

    # Prefer the standard Windows install path, then whatever is on PATH.
    libreoffice_path = r"C:\Program Files\LibreOffice\program\soffice.exe"
    if not os.path.exists(libreoffice_path):
        libreoffice_path = shutil.which("soffice") or shutil.which("libreoffice")
    if not libreoffice_path:
        log.error("LibreOffice executable not found. Cannot convert DOCX to PDF.")
        raise ConverterNotFoundError(
            "Neither MS Word nor LibreOffice could be used. Please install one to use this feature."
        )

    if sys.platform == "win32":
        # NOTE(review): this force-kills every running LibreOffice instance,
        # including ones the user may have open. Kept as in the original.
        try:
            log.debug(
                "Attempting to terminate existing LibreOffice processes on Windows."
            )
            for image_name in ("soffice.exe", "soffice.bin"):
                subprocess.run(
                    ["taskkill", "/f", "/im", image_name],
                    check=False,
                    capture_output=True,
                )
            log.debug("Termination commands sent.")
        except Exception as kill_e:
            log.warning(f"Could not terminate existing LibreOffice processes: {kill_e}")

    # Use absolute paths so a bare file name does not yield an empty --outdir.
    target_pdf = os.path.abspath(output_pdf_path)
    output_dir = os.path.dirname(target_pdf)
    log.info(f"Attempting conversion using LibreOffice at: {libreoffice_path}")
    try:
        # LibreOffice names its output after the input file.
        expected_lo_output = os.path.join(
            output_dir, os.path.splitext(os.path.basename(input_docx_path))[0] + ".pdf"
        )
        command = [
            libreoffice_path,
            "--headless",
            "--convert-to",
            "pdf",
            "--outdir",
            output_dir,
            input_docx_path,
        ]
        process = subprocess.run(
            command,
            check=True,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="ignore",
            timeout=60,
        )
        log.debug(f"LibreOffice stdout: {process.stdout}")
        log.debug(f"LibreOffice stderr: {process.stderr}")

        if not os.path.exists(expected_lo_output):
            raise FileNotFoundError(
                f"LibreOffice conversion process finished, but the output file was not found at the expected path: {expected_lo_output}"
            )
        if os.path.normcase(expected_lo_output) != os.path.normcase(target_pdf):
            # os.replace overwrites an existing target, so an older PDF is
            # only lost once the new one actually exists.
            os.replace(expected_lo_output, target_pdf)

        log.info(f"Successfully converted using LibreOffice: {output_pdf_path}")
        return output_pdf_path
    except subprocess.TimeoutExpired:
        log.error("LibreOffice conversion timed out after 60 seconds.")
        raise
    except (subprocess.CalledProcessError, FileNotFoundError) as e:
        log.error(f"LibreOffice conversion failed. Error: {e}", exc_info=True)
        raise
|
|
|
|
|
|
def combine_markdown_files(markdown_files: list, output_path: str) -> str:
    """Concatenate several Markdown files into a single file.

    Each input is written in the given order, preceded by a
    ``# --- <file name> ---`` heading and followed by a blank line.

    Args:
        markdown_files: Paths (strings or path objects) of the files to merge.
        output_path: Where the combined Markdown file is written.

    Returns:
        ``output_path``.
    """
    log.info(f"Combining {len(markdown_files)} markdown files into {output_path}")

    with open(output_path, "w", encoding="utf-8") as combined:
        for source_path in markdown_files:
            file_name = os.path.basename(source_path)
            log.debug(f"Adding file: {file_name}")

            banner = f"\n\n# --- {file_name} ---\n\n"
            combined.write(banner)
            with open(source_path, "r", encoding="utf-8") as source:
                combined.write(source.read())
            combined.write("\n\n")

    log.info(f"Successfully combined files into: {output_path}")
    return output_path
|
|
|
|
|
|
def convert_markdown_to_docx_with_pandoc(
    input_file: str,
    output_path: str,
    template_path: str = None
) -> str:
    """Convert a Markdown file to DOCX by calling the ``pandoc`` executable.

    This is the simple conversion path: no placeholder replacement is done.

    Args:
        input_file: Path to the Markdown file.
        output_path: Path where the DOCX will be saved.
        template_path: Optional DOCX passed to Pandoc as ``--reference-doc``.
            A path that does not exist is ignored, with a warning.

    Returns:
        ``output_path``.

    Raises:
        FileNotFoundError: If ``input_file`` does not exist.
        RuntimeError: If Pandoc exits with a non-zero status; the original
            ``CalledProcessError`` is attached as the cause.
    """
    log.info(f"Converting '{os.path.basename(input_file)}' to DOCX using Pandoc.")

    if not os.path.exists(input_file):
        raise FileNotFoundError(f"Input file not found: {input_file}")

    cmd = ["pandoc", str(input_file), "-o", str(output_path)]

    if template_path:
        if os.path.exists(template_path):
            log.info(f"Using template: {os.path.basename(template_path)}")
            cmd.extend(["--reference-doc", str(template_path)])
        else:
            log.warning(
                f"Template not found, converting without it: {template_path}"
            )

    try:
        subprocess.run(cmd, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError as e:
        log.error(f"Pandoc conversion failed: {e.stderr}")
        raise RuntimeError(f"Pandoc conversion failed: {e.stderr}") from e

    log.info(f"DOCX successfully generated: {output_path}")
    return output_path
|
|
|
|
|
|
def convert_markdown(
    input_file: str,
    output_path: str,
    output_format: str,
    add_toc: bool = False,
    template_path: str = None,
    metadata: dict = None,
):
    """Convert a Markdown file to PDF or DOCX.

    Args:
        input_file: Path to the Markdown source.
        output_path: Path of the file to create.
        output_format: ``"PDF"`` or ``"DOCX"`` (case-sensitive).
        add_toc: Whether to include a table of contents.
        template_path: DOCX template; required when ``output_format`` is
            ``"DOCX"``, ignored for PDF.
        metadata: Placeholder values for the DOCX template; ``None`` means
            no values.

    Returns:
        ``output_path``.

    Raises:
        FileNotFoundError: If ``input_file`` does not exist.
        ValueError: If ``output_format`` is unsupported, or DOCX output is
            requested without a ``template_path``.
    """
    if not os.path.exists(input_file):
        raise FileNotFoundError(f"Input file not found: {input_file}")
    log.info(
        f"Starting conversion of '{os.path.basename(input_file)}' to {output_format}."
    )
    with open(input_file, "r", encoding="utf-8") as f:
        markdown_text = f.read()

    if output_format == "PDF":
        _convert_to_pdf(markdown_text, output_path, add_toc)
    elif output_format == "DOCX":
        # Fail with a clear message instead of an obscure error raised deep
        # inside the template scan when no template was supplied.
        if not template_path:
            raise ValueError("A template_path is required for DOCX output.")
        if metadata is None:
            metadata = {}
        _convert_to_docx(markdown_text, output_path, template_path, metadata, add_toc)
    else:
        raise ValueError(f"Unsupported output format: {output_format}")
    return output_path
|