# markdownconverter/core/core.py import os import re import sys from collections import Counter import tempfile import subprocess from datetime import date from pathlib import Path import docx import pypandoc import pdfkit import markdown import fitz # PyMuPDF import hashlib from docx.enum.text import WD_BREAK from docx2pdf import convert as convert_word from ..utils.logger import get_logger log = get_logger(__name__) # --- Custom Exceptions --- class TemplatePlaceholderError(ValueError): pass class ConverterNotFoundError(Exception): pass # --- PDFKit Configuration --- try: config = pdfkit.configuration() log.info("pdfkit configured using wkhtmltopdf from system PATH.") except OSError: WKHTMLTOPDF_PATH = r"C:\Program Files\wkhtmltopdf\bin\wkhtmltopdf.exe" if os.path.exists(WKHTMLTOPDF_PATH): config = pdfkit.configuration(wkhtmltopdf=WKHTMLTOPDF_PATH) else: config = None log.warning("wkhtmltopdf not found. PDF conversion may fail.") def scan_template_for_placeholders(template_path: str) -> tuple[list[str], list[str]]: if not os.path.exists(template_path): raise FileNotFoundError(f"Template file not found: {template_path}") log.info(f"Scanning template '{os.path.basename(template_path)}' for placeholders.") structural_keys = {"%%REVISION_RECORD%%", "%%DOC_TOC%%", "%%DOC_CONTENT%%"} placeholder_pattern = re.compile(r"%%([A-Z0-9_]+)%%") found_placeholders = set() doc = docx.Document(template_path) def find_in_element(element): if hasattr(element, "paragraphs"): for p in element.paragraphs: for match in placeholder_pattern.finditer( "".join(run.text for run in p.runs) ): found_placeholders.add(match.group(0)) if hasattr(element, "tables"): for table in element.tables: for row in table.rows: for cell in row.cells: find_in_element(cell) find_in_element(doc) for section in doc.sections: find_in_element(section.header) find_in_element(section.footer) dynamic = sorted([p for p in found_placeholders if p not in structural_keys]) structural = sorted([p for p in found_placeholders if p in structural_keys]) log.info( f"Scan complete. Found {len(dynamic)} dynamic and {len(structural)} structural placeholders." ) return dynamic, structural def _get_document_title(markdown_text: str) -> str: match = re.search(r"^\s*#+\s+(.+)", markdown_text, re.MULTILINE) return match.group(1).strip() if match else "Untitled Document" def _split_markdown_by_revision_history( markdown_text: str, separator_heading="## Revision Record" ) -> tuple[str, str]: pattern = re.compile( f"({re.escape(separator_heading)}.*?)(?=\n#+)", re.DOTALL | re.S ) match = pattern.search(markdown_text) if not match: log.warning( f"'{separator_heading}' section not found. No revision history will be added." ) return "", markdown_text rev_history_md = match.group(0).strip() main_content_md = markdown_text.replace(rev_history_md, "", 1).strip() return rev_history_md, main_content_md def _replace_text_in_paragraph(paragraph, placeholders: dict[str, str]): full_text = "".join(run.text for run in paragraph.runs) if not any(key in full_text for key in placeholders): return for key, value in placeholders.items(): if key in full_text: full_text = full_text.replace(key, str(value)) style = paragraph.runs[0].style if paragraph.runs else None font = paragraph.runs[0].font if paragraph.runs else None for run in reversed(paragraph.runs): p = paragraph._p p.remove(run._r) new_run = paragraph.add_run(full_text) if style: new_run.style = style if font: new_run.font.name = font.name new_run.font.size = font.size new_run.font.bold = font.bold new_run.font.italic = font.italic new_run.font.underline = font.underline if font.color and font.color.rgb: new_run.font.color.rgb = font.color.rgb def _replace_text_in_element(element, placeholders: dict[str, str]): if hasattr(element, "paragraphs"): for p in element.paragraphs: _replace_text_in_paragraph(p, placeholders) if hasattr(element, "tables"): for table in element.tables: for row in table.rows: for cell in row.cells: _replace_text_in_element(cell, placeholders) def _replace_metadata_placeholders(doc: docx.Document, placeholders: dict[str, str]): log.info(f"Replacing metadata placeholders: {list(placeholders.keys())}") _replace_text_in_element(doc, placeholders) for section in doc.sections: _replace_text_in_element(section.header, placeholders) _replace_text_in_element(section.footer, placeholders) def _find_placeholder_paragraph(doc: docx.Document, placeholder: str): for p in doc.paragraphs: if placeholder in "".join(run.text for run in p.runs): return p return None def _insert_docx_at_paragraph(paragraph, source_docx_path: str): parent = paragraph._p.getparent() index = parent.index(paragraph._p) source_doc = docx.Document(source_docx_path) for element in source_doc.element.body: parent.insert(index, element) index += 1 parent.remove(paragraph._p) def _remove_paragraph(paragraph): if paragraph is None: return parent = paragraph._p.getparent() parent.remove(paragraph._p) def _add_revision_table(doc: docx.Document, rev_history_md: str): placeholder_p = _find_placeholder_paragraph(doc, "%%REVISION_RECORD%%") if not placeholder_p: log.warning("Revision record placeholder not found in template. Skipping.") return if not rev_history_md: log.info("No revision history content found. Removing placeholder.") _remove_paragraph(placeholder_p) return lines = [line.strip() for line in rev_history_md.strip().split("\n")] table_lines = [ line for line in lines if line.startswith("|") and not line.startswith("|:--") ] if not table_lines: log.warning( "Could not parse a markdown table from the revision history section." ) _remove_paragraph(placeholder_p) return table_data = [ [cell.strip() for cell in line.split("|")][1:-1] for line in table_lines ] if not table_data or len(table_data) < 1: log.warning("Revision history table is empty.") _remove_paragraph(placeholder_p) return log.info(f"Adding revision history table with {len(table_data)} rows.") table = doc.add_table(rows=1, cols=len(table_data[0])) table.style = "Table Grid" hdr_cells = table.rows[0].cells for i, header_text in enumerate(table_data[0]): hdr_cells[i].text = header_text for row_data in table_data[1:]: row_cells = table.add_row().cells for i, cell_text in enumerate(row_data): row_cells[i].text = cell_text parent = placeholder_p._p.getparent() parent.insert(parent.index(placeholder_p._p), table._tbl) _remove_paragraph(placeholder_p) def _convert_to_pdf(markdown_text: str, output_file: str, add_toc: bool): log.info("Starting PDF conversion using pdfkit.") if config is None: raise FileNotFoundError("wkhtmltopdf executable not found. Cannot create PDF.") title = _get_document_title(markdown_text) content_without_title = markdown_text match = re.search(r"^\s*#+\s+(.+)\n?", markdown_text, re.MULTILINE) if match: content_without_title = markdown_text[match.end() :] # Use nl2br extension to preserve line breaks (consistent with DOCX hard_line_breaks) md_converter = markdown.Markdown( extensions=["toc", "fenced_code", "tables", "nl2br"] ) html_body = md_converter.convert(content_without_title) toc_html = "" if add_toc and hasattr(md_converter, "toc") and md_converter.toc: log.info("Generating Table of Contents for PDF.") toc_html = f"

Table of Contents

{md_converter.toc}
" full_html = f'{title}

{title}

{toc_html}{html_body}' pdf_options = {"encoding": "UTF-8", "enable-local-file-access": None} pdfkit.from_string( full_html, output_file, configuration=config, options=pdf_options ) log.info(f"PDF successfully generated: {output_file}") def _convert_to_docx( markdown_text: str, output_file: str, template_path: str, metadata: dict, add_toc: bool, ): log.info("Starting DOCX conversion.") dynamic_placeholders, structural_placeholders = scan_template_for_placeholders( template_path ) required_structural = {"%%REVISION_RECORD%%", "%%DOC_TOC%%", "%%DOC_CONTENT%%"} if not required_structural.issubset(structural_placeholders): missing = required_structural - set(structural_placeholders) raise TemplatePlaceholderError( f"Template is missing required structural placeholders: {', '.join(missing)}" ) doc = docx.Document(template_path) if "%%DOC_PROJECT%%" in dynamic_placeholders and not metadata.get( "%%DOC_PROJECT%%" ): metadata["%%DOC_PROJECT%%"] = _get_document_title(markdown_text) if "%%DOC_DATE%%" in dynamic_placeholders: metadata["%%DOC_DATE%%"] = date.today().strftime("%d/%m/%Y") _replace_metadata_placeholders(doc, metadata) rev_history_md, main_content_md = _split_markdown_by_revision_history(markdown_text) _add_revision_table(doc, rev_history_md) temp_files = [] pandoc_format = "markdown+hard_line_breaks" try: if main_content_md: content_for_pandoc = main_content_md # Step 1: Remove the main H1 document title from the content to be processed. # It's used for metadata, not for the main body's numbering. match = re.search(r"^\s*#\s+(.+)\n?", content_for_pandoc, re.MULTILINE) if match: log.info("Removing main H1 title from content body.") content_for_pandoc = content_for_pandoc[match.end() :] # Step 2: Strip any existing manual numbering from headings (e.g., "## 1. Title") # to prevent double numbering when automatic numbering is applied. log.info("Stripping manual numbering from headings for auto-numbering.") content_for_pandoc = re.sub( r"^(\s*#+)\s+[0-9\.]+\s+", r"\1 ", content_for_pandoc, flags=re.MULTILINE, ) # Step 3: Configure Pandoc arguments for correct hierarchical numbering. pandoc_args = [ # Enable automatic section numbering (e.g., 1, 1.1, 1.1.1). "--number-sections", # Shift heading levels up by one. This maps: # ## (H2 in MD) -> Heading 1 in DOCX (numbered as 1, 2, ...) # ### (H3 in MD) -> Heading 2 in DOCX (numbered as 1.1, 1.2, ...) "--shift-heading-level-by=-1", # Keep text left-aligned. "--variable=justify:false", ] if add_toc: pandoc_args.append("--toc") log.info("Adding page break before Table of Contents.") toc_placeholder_p = _find_placeholder_paragraph(doc, "%%DOC_TOC%%") # Insert a page break before the TOC for better formatting. if toc_placeholder_p: toc_placeholder_p.insert_paragraph_before().add_run().add_break( WD_BREAK.PAGE ) with tempfile.NamedTemporaryFile( delete=False, suffix=".docx" ) as temp_file: pypandoc.convert_text( content_for_pandoc, "docx", format=pandoc_format, extra_args=pandoc_args, outputfile=temp_file.name, ) temp_files.append(temp_file.name) if toc_placeholder_p: _insert_docx_at_paragraph(toc_placeholder_p, temp_file.name) # The main content is now part of the generated TOC doc, so remove the placeholder. _remove_paragraph(_find_placeholder_paragraph(doc, "%%DOC_CONTENT%%")) else: # If no TOC, just insert the content at its placeholder. log.info("Adding page break before main content.") content_placeholder_p = _find_placeholder_paragraph( doc, "%%DOC_CONTENT%%" ) if content_placeholder_p: content_placeholder_p.insert_paragraph_before().add_run().add_break( WD_BREAK.PAGE ) with tempfile.NamedTemporaryFile( delete=False, suffix=".docx" ) as temp_file: # We don't add '--toc' to pandoc_args here. pypandoc.convert_text( content_for_pandoc, "docx", format=pandoc_format, extra_args=pandoc_args, outputfile=temp_file.name, ) temp_files.append(temp_file.name) if content_placeholder_p: _insert_docx_at_paragraph(content_placeholder_p, temp_file.name) # TOC placeholder is not used, so remove it. _remove_paragraph(_find_placeholder_paragraph(doc, "%%DOC_TOC%%")) else: # If there is no main content, remove both placeholders. _remove_paragraph(_find_placeholder_paragraph(doc, "%%DOC_TOC%%")) _remove_paragraph(_find_placeholder_paragraph(doc, "%%DOC_CONTENT%%")) doc.save(output_file) log.info(f"Document successfully created at {output_file}") finally: for temp_file in temp_files: if os.path.exists(temp_file): os.remove(temp_file) def convert_docx_to_pdf( input_docx_path: str, output_pdf_path: str, max_retries: int = 2 ) -> str: """ Convert DOCX to PDF using MS Word or LibreOffice with retry logic. Args: input_docx_path: Path to the input DOCX file output_pdf_path: Path where the PDF will be saved max_retries: Maximum number of retry attempts for LibreOffice (default: 2) Returns: Path to the generated PDF file """ if not os.path.exists(input_docx_path): raise FileNotFoundError(f"Input DOCX file not found: {input_docx_path}") try: log.info("Attempting DOCX to PDF conversion using MS Word.") convert_word(input_docx_path, output_pdf_path) log.info(f"Successfully converted using MS Word: {output_pdf_path}") return output_pdf_path except Exception as e: log.warning(f"MS Word conversion failed. It might not be installed. Error: {e}") log.info("Falling back to LibreOffice conversion.") libreoffice_path = r"C:\Program Files\LibreOffice\program\soffice.exe" if not os.path.exists(libreoffice_path): log.error("LibreOffice executable not found. Cannot convert DOCX to PDF.") raise ConverterNotFoundError( "Neither MS Word nor LibreOffice could be used. Please install one to use this feature." ) if sys.platform == "win32": try: log.debug( "Attempting to terminate existing LibreOffice processes on Windows." ) subprocess.run( ["taskkill", "/f", "/im", "soffice.exe"], check=False, capture_output=True, ) subprocess.run( ["taskkill", "/f", "/im", "soffice.bin"], check=False, capture_output=True, ) log.debug("Termination commands sent.") except Exception as kill_e: log.warning(f"Could not terminate existing LibreOffice processes: {kill_e}") output_dir = os.path.dirname(output_pdf_path) log.info(f"Attempting conversion using LibreOffice at: {libreoffice_path}") # Retry logic for LibreOffice (can fail on first attempt on Windows) import time last_error = None for attempt in range(1, max_retries + 1): try: if attempt > 1: log.info( f"Retry attempt {attempt}/{max_retries} for LibreOffice conversion..." ) time.sleep(2) # Brief pause between retries expected_lo_output = os.path.join( output_dir, os.path.splitext(os.path.basename(input_docx_path))[0] + ".pdf", ) command = [ libreoffice_path, "--headless", "--convert-to", "pdf", "--outdir", output_dir, input_docx_path, ] process = subprocess.run( command, check=True, capture_output=True, text=True, encoding="utf-8", errors="ignore", timeout=60, ) log.debug(f"LibreOffice stdout: {process.stdout}") log.debug(f"LibreOffice stderr: {process.stderr}") if ( os.path.exists(output_pdf_path) and expected_lo_output != output_pdf_path ): os.remove(output_pdf_path) if os.path.exists(expected_lo_output): if expected_lo_output != output_pdf_path: os.rename(expected_lo_output, output_pdf_path) else: raise FileNotFoundError( f"LibreOffice conversion process finished, but the output file was not found at the expected path: {expected_lo_output}" ) log.info(f"Successfully converted using LibreOffice: {output_pdf_path}") return output_pdf_path except subprocess.TimeoutExpired as e: last_error = e log.warning( f"LibreOffice conversion timed out (attempt {attempt}/{max_retries})" ) if attempt >= max_retries: log.error("LibreOffice conversion failed after all retry attempts.") raise except (subprocess.CalledProcessError, FileNotFoundError) as e: last_error = e log.warning( f"LibreOffice conversion failed on attempt {attempt}/{max_retries}: {e}" ) if attempt >= max_retries: log.error( f"LibreOffice conversion failed after all retry attempts. Last error: {e}", exc_info=True, ) raise def combine_markdown_files(markdown_files: list, output_path: str) -> str: """ Combines multiple markdown files into a single file. Args: markdown_files: List of Path objects or strings pointing to markdown files output_path: Path where the combined markdown file will be saved Returns: Path to the combined markdown file """ log.info(f"Combining {len(markdown_files)} markdown files into {output_path}") with open(output_path, "w", encoding="utf-8") as out: for md_file in markdown_files: file_name = os.path.basename(md_file) log.debug(f"Adding file: {file_name}") out.write(f"\n\n# --- {file_name} ---\n\n") with open(md_file, "r", encoding="utf-8") as f: out.write(f.read()) out.write("\n\n") log.info(f"Successfully combined files into: {output_path}") return output_path def convert_markdown_to_docx_with_pandoc( input_file: str, output_path: str, template_path: str = None, add_toc: bool = False, number_sections: bool = False, ) -> str: """ Converts markdown to DOCX using pypandoc with optional template. This is a simpler conversion without placeholder replacement. Args: input_file: Path to the markdown file output_path: Path where the DOCX will be saved template_path: Optional path to a DOCX template (reference-doc) add_toc: If True, adds a table of contents number_sections: If True, automatically numbers sections Returns: Path to the generated DOCX file """ log.info(f"Converting '{os.path.basename(input_file)}' to DOCX using pypandoc.") if not os.path.exists(input_file): raise FileNotFoundError(f"Input file not found: {input_file}") # Build pypandoc arguments extra_args = ["--variable=justify:false"] if template_path and os.path.exists(template_path): log.info(f"Using template: {os.path.basename(template_path)}") extra_args.extend(["--reference-doc", str(template_path)]) if add_toc: log.info("Adding table of contents") extra_args.append("--toc") if number_sections: log.info("Enabling automatic section numbering") extra_args.append("--number-sections") try: # Use pypandoc for more robust conversion pypandoc.convert_file( input_file, "docx", format="markdown+hard_line_breaks", outputfile=output_path, extra_args=extra_args, ) log.info(f"DOCX successfully generated: {output_path}") return output_path except Exception as e: log.error(f"Pandoc conversion failed: {e}") raise RuntimeError(f"Pandoc conversion failed: {str(e)}") def convert_markdown( input_file: str, output_path: str, output_format: str, add_toc: bool = False, template_path: str = None, metadata: dict = None, ): if not os.path.exists(input_file): raise FileNotFoundError(f"Input file not found: {input_file}") log.info( f"Starting conversion of '{os.path.basename(input_file)}' to {output_format}." ) with open(input_file, "r", encoding="utf-8") as f: markdown_text = f.read() if output_format == "PDF": _convert_to_pdf(markdown_text, output_path, add_toc) elif output_format == "DOCX": if metadata is None: metadata = {} _convert_to_docx(markdown_text, output_path, template_path, metadata, add_toc) else: raise ValueError(f"Unsupported output format: {output_format}") return output_path def convert_pdf_to_markdown( input_pdf_path: str, output_md_path: str, extract_images: bool = True, image_folder: str = None, progress_callback=None, page_limit: int = None, ) -> str: """ Convert PDF to Markdown using PyMuPDF with progress callback support and more robust image extraction and header/footer deduplication. progress_callback: optional callable(progress_percent:int, message:str) """ if not os.path.exists(input_pdf_path): raise FileNotFoundError(f"Input PDF file not found: {input_pdf_path}") log.info(f"Converting PDF '{os.path.basename(input_pdf_path)}' to Markdown.") # Setup image folder if needed if extract_images: if image_folder is None: output_path = Path(output_md_path) image_folder = output_path.parent / f"{output_path.stem}_images" image_folder = Path(image_folder) image_folder.mkdir(exist_ok=True) log.info(f"Images will be saved to: {image_folder}") markdown_content = [] image_counter = 0 failed_image_extractions = 0 failed_image_samples = set() total_image_refs = 0 # Track extracted images to avoid duplicates: map xref -> filename and hash -> filename _seen_xref_to_file = {} _seen_hash_to_file = {} _inserted_images = set() duplicates_skipped = 0 def _normalize_header_footer(s: str) -> str: # normalize by stripping whitespace, collapsing spaces, and removing page numbers/digits if not s: return "" ns = re.sub(r"\s+", " ", s.strip()) ns = re.sub(r"\d+", "", ns) return ns.strip() try: # Open PDF with PyMuPDF doc = fitz.open(input_pdf_path) num_pages = len(doc) num_pages_effective = ( min(num_pages, int(page_limit)) if page_limit and page_limit > 0 else num_pages ) log.info(f"PDF opened successfully. Total pages: {num_pages}") if progress_callback: try: progress_callback(0, f"Opened PDF, {num_pages} pages detected") except Exception: pass # First pass: detect common header/footer text across pages header_candidates = Counter() footer_candidates = Counter() for page_num in range(num_pages): page = doc[page_num] blocks = page.get_text("dict").get("blocks", []) page_h = page.rect.height top_zone = page_h * 0.12 bottom_zone = page_h * 0.88 for block in blocks: if block.get("type") != 0: continue bbox = block.get("bbox", [0, 0, 0, 0]) y0 = bbox[1] y1 = bbox[3] # assemble block text block_text = " ".join( span.get("text", "") for line in block.get("lines", []) for span in line.get("spans", []) ) norm = _normalize_header_footer(block_text) if not norm: continue if y0 <= top_zone: header_candidates[norm] += 1 if y1 >= bottom_zone: footer_candidates[norm] += 1 # choose header/footer if they appear on majority of pages header_text = None footer_text = None if header_candidates: most_common_header, count = header_candidates.most_common(1)[0] if count >= max(3, int(0.6 * num_pages)): header_text = most_common_header log.info( f"Detected common header to remove from pages: '{header_text[:80]}' (appears {count} times)" ) if footer_candidates: most_common_footer, countf = footer_candidates.most_common(1)[0] if countf >= max(3, int(0.6 * num_pages)): footer_text = most_common_footer log.info( f"Detected common footer to remove from pages: '{footer_text[:80]}' (appears {countf} times)" ) if progress_callback: try: progress_callback(2, "Detected common header/footer (if any)") except Exception: pass # If header/footer found, add them once at top if header_text: markdown_content.append(header_text + "\n\n") # Second pass: build markdown, skipping repeated header/footer blocks for page_num in range(num_pages): page = doc[page_num] log.debug(f"Processing page {page_num + 1}/{num_pages}") if progress_callback: try: pct = int((page_num / num_pages) * 100) progress_callback( pct, f"Processing page {page_num + 1}/{num_pages}" ) except Exception: pass # Add page separator for multi-page documents if page_num > 0: markdown_content.append("\n\n---\n\n") markdown_content.append(f"## Page {page_num + 1}\n\n") # Extract text blocks with formatting blocks = page.get_text("dict").get("blocks", []) for block in blocks: if block.get("type") == 0: # Text block # assemble block text for header/footer check block_text_full = " ".join( span.get("text", "") for line in block.get("lines", []) for span in line.get("spans", []) ) norm_block = _normalize_header_footer(block_text_full) if norm_block: if header_text and header_text == norm_block: # skip repeated header continue if footer_text and footer_text == norm_block: # skip repeated footer continue # Build a structured representation of lines and spans with x-coordinates block_x0 = block.get("bbox", [0, 0, 0, 0])[0] lines_info = [] all_x_positions = [] mono_line_flags = [] for line in block.get("lines", []): spans_info = [] for span in line.get("spans", []): text = span.get("text", "").strip() if not text: continue bbox = ( span.get("bbox") or span.get("origin") or [0, 0, 0, 0] ) x0 = bbox[0] font_size = span.get("size", 0) flags = span.get("flags", 0) font = span.get("font", "") spans_info.append( { "x0": x0, "text": text, "size": font_size, "flags": flags, "font": font, } ) all_x_positions.append(x0) # detect if this line looks monospaced (code) by font name mono_flag = any( "mono" in (s.get("font", "").lower()) or "courier" in (s.get("font", "").lower()) for s in spans_info ) mono_line_flags.append(mono_flag) lines_info.append(spans_info) # Simple heuristic: if there are multiple consistent x-columns, treat as a table table_md_lines = [] is_table = False if all_x_positions: # cluster x positions into columns with tolerance tol = 8.0 cols = [] for x in sorted(set(all_x_positions)): placed = False for i, c in enumerate(cols): if abs(x - c) <= tol: # update center cols[i] = (cols[i] + x) / 2.0 placed = True break if not placed: cols.append(x) cols = sorted(cols) # build a matrix of cells per line matrix = [] for spans in lines_info: if not spans: matrix.append([]) continue row = [""] * len(cols) for s in spans: # find nearest column idx = min( range(len(cols)), key=lambda i: abs(s["x0"] - cols[i]), ) if row[idx]: row[idx] += " " + s["text"] else: row[idx] = s["text"] matrix.append(row) # determine if matrix looks like a table: multiple columns and many rows have >1 non-empty cells if len(cols) >= 2: rows_with_multi = sum( 1 for r in matrix if sum(1 for c in r if c.strip()) > 1 ) if rows_with_multi >= max(2, len(matrix) // 3): is_table = True # convert matrix to markdown table for r_idx, row in enumerate(matrix): # join cells with pipe cleaned = [c.strip() for c in row] table_md_lines.append( "| " + " | ".join(cleaned) + " |" ) # after first row add header separator if r_idx == 0: sep = ( "| " + " | ".join( [ "---" if c.strip() else "" for c in cleaned ] ) + " |" ) table_md_lines.append(sep) if is_table: markdown_content.append("\n") markdown_content.extend([ln + "\n" for ln in table_md_lines]) markdown_content.append("\n") else: # Not a table: output lines respecting indentation and formatting in_code_block = False for li, spans in enumerate(lines_info): if not spans: if in_code_block: markdown_content.append("```") in_code_block = False markdown_content.append("\n") continue first_x = spans[0]["x0"] if spans else block_x0 indent_level = max( 0, int(round((first_x - block_x0) / 20.0)) ) # detect mono font sequences and wrap in code block mono = ( mono_line_flags[li] if li < len(mono_line_flags) else False ) if mono and not in_code_block: markdown_content.append("```\n") in_code_block = True line_text = "" for span in spans: text = span["text"] font_size = span.get("size", 0) flags = span.get("flags", 0) # headings detection if font_size > 18: text = f"# {text}" elif font_size > 14: text = f"## {text}" elif font_size > 12: text = f"### {text}" if flags & 16: text = f"**{text}**" if flags & 2: text = f"*{text}*" line_text += text + " " line_text = line_text.strip() if in_code_block: markdown_content.append(line_text + "\n") else: if indent_level > 0: markdown_content.append( " " * indent_level + line_text + "\n" ) else: markdown_content.append(line_text + "\n") if in_code_block: markdown_content.append("```\n") # add spacing after block markdown_content.append("\n") elif block.get("type") == 1 and extract_images: # Image block # Extract images for this page safely try: imgs = page.get_images(full=True) except Exception as e: # Can't list images on this page; count as a failed attempt and continue failed_image_extractions += 1 if len(failed_image_samples) < 3: failed_image_samples.add(f"list_images_error: {str(e)}") imgs = [] # Try to extract each referenced image, but avoid flooding the logs page_image_failures = 0 for img in imgs: total_image_refs += 1 try: # image tuple may vary, try to locate xref safely if not img: raise ValueError("empty image tuple") # prefer first element as xref but be defensive xref = None if isinstance(img, (list, tuple)) and len(img) > 0: xref = img[0] if xref is None: raise ValueError(f"unexpected image descriptor: {img}") # If we've already extracted this xref, reuse filename if xref in _seen_xref_to_file: image_filename = _seen_xref_to_file[xref] relative_path = f"{image_folder.name}/{image_filename}" # Only insert the image tag once; skip repeated inline images if image_filename in _inserted_images: duplicates_skipped += 1 else: markdown_content.append( f"\n![Image]({relative_path})\n\n" ) _inserted_images.add(image_filename) continue # Use a short timeout for extraction to avoid long hangs on malformed images try: from concurrent.futures import ( ThreadPoolExecutor, TimeoutError, ) def _extract(x): return doc.extract_image(x) with ThreadPoolExecutor(max_workers=1) as ex: fut = ex.submit(_extract, xref) base_image = fut.result(timeout=3) except Exception as tex: raise RuntimeError( f"image_extraction_timeout_or_error: {tex}" ) if not base_image or "image" not in base_image: raise ValueError(f"no image bytes for xref {xref}") image_bytes = base_image.get("image") image_ext = base_image.get("ext", "png") # Compute hash to detect identical image content img_hash = ( hashlib.sha256(image_bytes).hexdigest() if image_bytes else None ) # If we've already extracted an identical image (different xref), reuse it if img_hash and img_hash in _seen_hash_to_file: image_filename = _seen_hash_to_file[img_hash] # remember xref mapping for future _seen_xref_to_file[xref] = image_filename relative_path = f"{image_folder.name}/{image_filename}" if image_filename in _inserted_images: duplicates_skipped += 1 else: markdown_content.append( f"\n![Image]({relative_path})\n\n" ) _inserted_images.add(image_filename) else: image_counter += 1 image_filename = f"image_{image_counter}.{image_ext}" image_path = image_folder / image_filename with open(image_path, "wb") as img_file: img_file.write(image_bytes) # register mappings _seen_xref_to_file[xref] = image_filename if img_hash: _seen_hash_to_file[img_hash] = image_filename relative_path = f"{image_folder.name}/{image_filename}" markdown_content.append( f"\n![Image {image_counter}]({relative_path})\n\n" ) _inserted_images.add(image_filename) except Exception as ie: page_image_failures += 1 failed_image_extractions += 1 if len(failed_image_samples) < 3: failed_image_samples.add(str(ie)) if page_image_failures and progress_callback: try: progress_callback( int(((page_num + 1) / num_pages) * 100), f"{page_image_failures} image(s) failed on page {page_num + 1}", ) except Exception: pass # append footer once if detected if footer_text: markdown_content.append("\n\n" + footer_text + "\n") doc.close() # Write markdown file with open(output_md_path, "w", encoding="utf-8") as f: f.write("".join(markdown_content)) log.info(f"Markdown file successfully created: {output_md_path}") if extract_images: log.info(f"Extracted {image_counter} images") duplicates_reused = max( 0, total_image_refs - image_counter - failed_image_extractions ) log.info( f"Unique images saved: {image_counter}; duplicate references reused: {duplicates_reused}" ) if failed_image_extractions: sample_list = list(failed_image_samples) log.warning( f"Failed to extract {failed_image_extractions} images (sample errors: {sample_list})" ) if progress_callback: try: progress_callback(100, "Conversion finished") except Exception: pass return output_md_path except Exception as e: log.error(f"Failed to convert PDF to Markdown: {e}", exc_info=True) raise RuntimeError(f"PDF to Markdown conversion failed: {str(e)}")