app.py(文件已创建)
| @@ -0,0 +1,322 @@ | |||
| 1 | + | #pdf ocr. 파싱 가능한지 확인해보고, 불가 or 내용이 너무 적으면 OCR수행 | |
| 2 | + | import io | |
| 3 | + | import cv2 | |
| 4 | + | import re | |
| 5 | + | import os | |
| 6 | + | import numpy as np | |
| 7 | + | from fastapi import FastAPI, File, UploadFile, HTTPException, Form | |
| 8 | + | from paddleocr import PaddleOCR | |
| 9 | + | from PIL import Image | |
| 10 | + | import statistics | |
| 11 | + | from enum import Enum | |
| 12 | + | import fitz # PyMuPDF | |
| 13 | + | from pdf2image import convert_from_bytes | |
| 14 | + | # from typing import List, Tuple, Any, Dict, Optional | |
| 15 | + | POPPLER_PATH = '/usr/bin' # Poppler 설치 경로 (필요시 수정) | |
| 16 | + | # --- Configuration --- | |
| 17 | + | ||
| 18 | + | # 지원할 언어 목록 정의 (필요에 따라 추가/수정) | |
| 19 | + | # PaddleOCR에서 지원하는 언어 코드를 사용해야 합니다. (예: 'en', 'ko', 'ch', 'japan', 'fr', 'de' 등) | |
| 20 | + | SUPPORTED_LANGUAGES = ["en", "ko"] # 예시: 영어, 한국어 지원 | |
| 21 | + | ||
| 22 | + | # 각 언어별 PaddleOCR 엔진을 저장할 딕셔너리 | |
| 23 | + | ocr_engines: dict[str, PaddleOCR] = {} | |
| 24 | + | ||
| 25 | + | # 애플리케이션 시작 시 OCR 엔진 초기화 | |
| 26 | + | print("Initializing PaddleOCR engines...") | |
| 27 | + | for lang_code in SUPPORTED_LANGUAGES: | |
| 28 | + | try: | |
| 29 | + | print(f" Initializing for language: {lang_code}") | |
| 30 | + | # CPU 버전을 사용하려면 use_gpu=False 설정 | |
| 31 | + | ocr_engines[lang_code] = PaddleOCR(use_angle_cls=True, lang=lang_code, use_gpu=False, show_log=False) | |
| 32 | + | print(f" Successfully initialized for {lang_code}") | |
| 33 | + | except Exception as e: | |
| 34 | + | # 특정 언어 초기화 실패 시 오류 로그 출력 (서버는 계속 실행) | |
| 35 | + | print(f" ERROR initializing PaddleOCR for language '{lang_code}': {e}") | |
| 36 | + | print(f" Language '{lang_code}' will not be available.") | |
| 37 | + | print("PaddleOCR engine initialization complete.") | |
| 38 | + | ||
| 39 | + | # 초기화된 엔진이 하나도 없으면 에러 발생 | |
| 40 | + | if not ocr_engines: | |
| 41 | + | raise RuntimeError("FATAL: No PaddleOCR engines could be initialized. The application cannot start.") | |
| 42 | + | ||
| 43 | + | # FastAPI 앱 생성 | |
| 44 | + | app = FastAPI() | |
| 45 | + | ||
| 46 | + | # --- Helper Functions --- | |
| 47 | + | ||
| 48 | + | def get_bounding_box_details(box_data): | |
| 49 | + | """OCR 결과에서 바운딩 박스 좌표와 텍스트/점수를 추출합니다.""" | |
| 50 | + | if not box_data or not isinstance(box_data, list) or len(box_data) != 2: | |
| 51 | + | return None, None, None, None, None, None | |
| 52 | + | ||
| 53 | + | points = box_data[0] | |
| 54 | + | text_info = box_data[1] | |
| 55 | + | ||
| 56 | + | # 텍스트와 점수 분리 (형식: ('Text', score)) | |
| 57 | + | if isinstance(text_info, tuple) and len(text_info) == 2: | |
| 58 | + | text, score = text_info | |
| 59 | + | else: | |
| 60 | + | # 예상치 못한 형식 처리 (예: 텍스트만 있는 경우) | |
| 61 | + | text = str(text_info) | |
| 62 | + | score = None # 점수 정보 없음 | |
| 63 | + | ||
| 64 | + | # 좌표 유효성 검사 및 추출 | |
| 65 | + | if not isinstance(points, list) or len(points) != 4: | |
| 66 | + | return None, text, score, None, None, None # 좌표 정보가 잘못된 경우 | |
| 67 | + | ||
| 68 | + | try: | |
| 69 | + | y_coords = [p[1] for p in points] | |
| 70 | + | min_y = min(y_coords) | |
| 71 | + | max_y = max(y_coords) | |
| 72 | + | ||
| 73 | + | x_coords = [p[0] for p in points] | |
| 74 | + | min_x = min(x_coords) | |
| 75 | + | except (TypeError, IndexError): | |
| 76 | + | # 좌표 내 값이 숫자가 아니거나 구조가 잘못된 경우 | |
| 77 | + | return None, text, score, None, None, None | |
| 78 | + | ||
| 79 | + | return points, text, score, min_y, max_y, min_x | |
| 80 | + | ||
| 81 | + | def process_ocr_results(ocr_result: list[list[any]], line_break_threshold_factor: float = 0.5) -> str: | |
| 82 | + | """ | |
| 83 | + | OCR 결과를 받아 지능적으로 줄바꿈을 적용한 텍스트를 생성합니다. | |
| 84 | + | (이 함수는 언어 설정과 직접적인 관련은 없습니다) | |
| 85 | + | """ | |
| 86 | + | if not ocr_result: | |
| 87 | + | return "" | |
| 88 | + | ||
| 89 | + | lines_data = [] | |
| 90 | + | box_heights = [] | |
| 91 | + | for item in ocr_result: | |
| 92 | + | points, text, score, min_y, max_y, min_x = get_bounding_box_details(item) | |
| 93 | + | # 유효한 데이터만 처리 (좌표와 텍스트가 모두 있어야 함) | |
| 94 | + | if text and min_y is not None and max_y is not None and min_x is not None: | |
| 95 | + | lines_data.append({ | |
| 96 | + | "text": text, | |
| 97 | + | "min_y": min_y, | |
| 98 | + | "max_y": max_y, | |
| 99 | + | "min_x": min_x, | |
| 100 | + | "box": points | |
| 101 | + | }) | |
| 102 | + | box_heights.append(max_y - min_y) | |
| 103 | + | ||
| 104 | + | if not lines_data: | |
| 105 | + | return "" | |
| 106 | + | ||
| 107 | + | lines_data.sort(key=lambda x: (x['min_y'], x['min_x'])) | |
| 108 | + | ||
| 109 | + | if not box_heights: | |
| 110 | + | avg_height = 10 | |
| 111 | + | else: | |
| 112 | + | # 높이가 0인 경우 제외 (매우 드물지만 오류 방지) | |
| 113 | + | valid_heights = [h for h in box_heights if h > 0] | |
| 114 | + | if not valid_heights: | |
| 115 | + | avg_height = 10 # 유효한 높이가 없으면 기본값 | |
| 116 | + | else: | |
| 117 | + | avg_height = statistics.mean(valid_heights) | |
| 118 | + | ||
| 119 | + | ||
| 120 | + | processed_text = "" | |
| 121 | + | for i, current_line in enumerate(lines_data): | |
| 122 | + | processed_text += current_line["text"] | |
| 123 | + | ||
| 124 | + | if i < len(lines_data) - 1: | |
| 125 | + | next_line = lines_data[i+1] | |
| 126 | + | vertical_gap = next_line["min_y"] - current_line["max_y"] | |
| 127 | + | ||
| 128 | + | # 평균 높이가 0보다 클 때만 비율 계산 (0으로 나누기 방지) | |
| 129 | + | if avg_height > 0 and vertical_gap > avg_height * line_break_threshold_factor: | |
| 130 | + | processed_text += "\\n" | |
| 131 | + | elif vertical_gap >= 0: # 간격이 0 이상이면 (겹치지 않으면) 공백 추가 | |
| 132 | + | processed_text += " " | |
| 133 | + | # else: 간격이 음수면 (겹치면) 아무것도 추가 안 함 (이미 붙어있다고 간주) | |
| 134 | + | ||
| 135 | + | return processed_text.strip() | |
| 136 | + | ||
| 137 | + | def perform_ocr_on_image_data(image_data: bytes, ocr_engine: PaddleOCR) -> tuple[list[list[any]], str]: | |
| 138 | + | """주어진 이미지 데이터(bytes)에 대해 OCR을 수행하고 결과를 반환""" | |
| 139 | + | try: | |
| 140 | + | img_pil = Image.open(io.BytesIO(image_data)) | |
| 141 | + | if img_pil.mode != 'RGB': | |
| 142 | + | img_pil = img_pil.convert('RGB') | |
| 143 | + | img_cv = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR) | |
| 144 | + | ||
| 145 | + | result = ocr_engine.ocr(img_cv, cls=True) | |
| 146 | + | ocr_raw_result = result[0] if result and len(result) > 0 else [] | |
| 147 | + | processed_text = process_ocr_results(ocr_raw_result) | |
| 148 | + | return ocr_raw_result, processed_text | |
| 149 | + | except Exception as e: | |
| 150 | + | print(f"Error during OCR on image data: {e}") | |
| 151 | + | # 개별 이미지 OCR 실패 시 빈 결과 반환 또는 예외 처리 방식 결정 | |
| 152 | + | return [], f"[OCR Error: {e}]" | |
| 153 | + | ||
| 154 | + | ||
| 155 | + | # --- Constants for PDF Processing --- | |
| 156 | + | # 이 값들을 조정하여 민감도 변경 가능 | |
| 157 | + | MIN_MEANINGFUL_TEXT_LENGTH_PER_PAGE = 50 # 페이지당 최소 유의미 텍스트 길이 | |
| 158 | + | COMMON_WATERMARK_PATTERNS = [ | |
| 159 | + | r"CamScanner", | |
| 160 | + | r"Scanned with", | |
| 161 | + | # 필요시 다른 흔한 워터마크 패턴 추가 (정규식 사용 가능) | |
| 162 | + | ] | |
| 163 | + | # 워터마크 패턴을 하나의 정규식으로 컴파일 (대소문자 무시) | |
| 164 | + | watermark_regex = re.compile("|".join(COMMON_WATERMARK_PATTERNS), re.IGNORECASE) | |
| 165 | + | ||
| 166 | + | ||
| 167 | + | # --- API Endpoint --- | |
| 168 | + | ||
| 169 | + | # 지원하는 언어를 Enum으로 정의하여 FastAPI의 자동 유효성 검사 및 문서화 활용 | |
| 170 | + | class Language(str, Enum): | |
| 171 | + | en = "en" | |
| 172 | + | ko = "ko" | |
| 173 | + | # SUPPORTED_LANGUAGES에 정의된 다른 언어들도 필요시 추가 | |
| 174 | + | # 예: ch = "ch", ja = "japan" 등 | |
| 175 | + | ||
| 176 | + | @app.post("/ocr/") | |
| 177 | + | async def process_file( | |
| 178 | + | file: UploadFile = File(..., description="OCR을 수행할 이미지 또는 PDF 파일"), | |
| 179 | + | lang: Language = Form(Language.en, description=f"OCR 언어 선택 (지원: {', '.join(SUPPORTED_LANGUAGES)})") | |
| 180 | + | ): | |
| 181 | + | """ | |
| 182 | + | 이미지 또는 PDF 파일을 업로드 받아 OCR 또는 텍스트 추출을 수행합니다. | |
| 183 | + | PDF의 경우, 먼저 텍스트 추출을 시도하고, 추출된 텍스트가 충분하지 않거나 | |
| 184 | + | 워터마크로 판단되면 이미지로 변환하여 OCR을 진행합니다. | |
| 185 | + | """ | |
| 186 | + | # 1. 언어 엔진 선택 (이전과 동일) | |
| 187 | + | selected_lang = lang.value | |
| 188 | + | if selected_lang not in ocr_engines: | |
| 189 | + | raise HTTPException(status_code=400, detail=f"Language '{selected_lang}' not supported...") | |
| 190 | + | selected_ocr_engine = ocr_engines[selected_lang] | |
| 191 | + | ||
| 192 | + | # 2. 파일 내용 읽기 (이전과 동일) | |
| 193 | + | contents = await file.read() | |
| 194 | + | filename = file.filename | |
| 195 | + | content_type = file.content_type | |
| 196 | + | ||
| 197 | + | raw_ocr_results_agg = [] | |
| 198 | + | processed_texts_agg = [] | |
| 199 | + | processing_method = "unknown" | |
| 200 | + | extracted_text_from_pdf = None # PDF 텍스트 추출 결과를 저장할 변수 | |
| 201 | + | ||
| 202 | + | # 3. 파일 타입에 따른 처리 분기 | |
| 203 | + | if content_type.startswith("image/"): | |
| 204 | + | # ... (이미지 처리 로직은 이전과 동일) ... | |
| 205 | + | print(f"Processing image file: {filename}") | |
| 206 | + | processing_method = "ocr_image" | |
| 207 | + | try: | |
| 208 | + | raw_result, processed_text = perform_ocr_on_image_data(contents, selected_ocr_engine) | |
| 209 | + | raw_ocr_results_agg = raw_result | |
| 210 | + | processed_texts_agg.append(processed_text) | |
| 211 | + | except Exception as e: | |
| 212 | + | raise HTTPException(status_code=500, detail=f"Failed to perform OCR on image: {e}") | |
| 213 | + | ||
| 214 | + | elif content_type == "application/pdf": | |
| 215 | + | print(f"Processing PDF file: {filename}") | |
| 216 | + | force_ocr = False # OCR을 강제할지 여부 플래그 | |
| 217 | + | pdf_page_count = 0 | |
| 218 | + | ||
| 219 | + | # 3-1. PDF 텍스트 추출 시도 (PyMuPDF) | |
| 220 | + | try: | |
| 221 | + | print("Attempting text extraction from PDF...") | |
| 222 | + | with fitz.open(stream=contents, filetype="pdf") as doc: | |
| 223 | + | pdf_page_count = len(doc) | |
| 224 | + | if doc.needs_pass: | |
| 225 | + | print("PDF is password protected. Text extraction skipped, proceeding to OCR.") | |
| 226 | + | force_ocr = True # 비밀번호가 있으면 OCR 강제 | |
| 227 | + | else: | |
| 228 | + | all_text = "" | |
| 229 | + | for page_num in range(pdf_page_count): | |
| 230 | + | page = doc.load_page(page_num) | |
| 231 | + | page_text = page.get_text("text") | |
| 232 | + | all_text += page_text | |
| 233 | + | if page_num < pdf_page_count - 1: | |
| 234 | + | all_text += "\n--- Page Break ---\n" # 페이지 구분 추가 | |
| 235 | + | ||
| 236 | + | # 추출된 텍스트 유효성 검사 | |
| 237 | + | if all_text and not all_text.isspace(): | |
| 238 | + | print(f"Text extracted ({len(all_text)} chars). Validating content...") | |
| 239 | + | # 워터마크 제거 후 텍스트 확인 | |
| 240 | + | cleaned_text = watermark_regex.sub("", all_text).strip() | |
| 241 | + | # 페이지 구분자도 제거하고 길이 계산 | |
| 242 | + | cleaned_text_for_length = cleaned_text.replace("\n--- Page Break ---\n", "") | |
| 243 | + | ||
| 244 | + | # 1. 전체 텍스트 길이가 페이지 수 * 임계값보다 긴지 확인 | |
| 245 | + | # 2. 워터마크 제거 후에도 텍스트가 남아있는지 확인 | |
| 246 | + | min_total_length = MIN_MEANINGFUL_TEXT_LENGTH_PER_PAGE * pdf_page_count | |
| 247 | + | if len(cleaned_text_for_length) >= min_total_length and cleaned_text: | |
| 248 | + | print("Meaningful text found after validation.") | |
| 249 | + | extracted_text_from_pdf = all_text.strip() # 원본 추출 텍스트 저장 | |
| 250 | + | processing_method = "pdf_text_extraction" | |
| 251 | + | else: | |
| 252 | + | print(f"Extracted text seems insufficient or mostly watermarks (Cleaned length: {len(cleaned_text_for_length)}, Threshold: {min_total_length}). Forcing OCR.") | |
| 253 | + | force_ocr = True | |
| 254 | + | else: | |
| 255 | + | print("No text found via direct extraction. Proceeding to OCR.") | |
| 256 | + | force_ocr = True # 텍스트가 아예 없으면 OCR 강제 | |
| 257 | + | ||
| 258 | + | except Exception as e: | |
| 259 | + | print(f"Error during PDF text extraction: {e}. Proceeding to OCR.") | |
| 260 | + | force_ocr = True # 텍스트 추출 중 에러 발생 시 OCR 강제 | |
| 261 | + | ||
| 262 | + | # 3-2. OCR 강제 플래그가 True이면 -> 이미지 변환 및 OCR 수행 | |
| 263 | + | if force_ocr: | |
| 264 | + | # extracted_text_from_pdf가 None이거나 유효하지 않다고 판단된 경우 | |
| 265 | + | processing_method = "ocr_pdf_pages" | |
| 266 | + | print("Converting PDF pages to images for OCR...") | |
| 267 | + | try: | |
| 268 | + | images = convert_from_bytes(contents, dpi=200, poppler_path=POPPLER_PATH) | |
| 269 | + | print(f"Converted PDF to {len(images)} image(s). Performing OCR on each page...") | |
| 270 | + | ||
| 271 | + | for i, page_image in enumerate(images): | |
| 272 | + | print(f" Processing page {i+1}/{len(images)}...") | |
| 273 | + | img_byte_arr = io.BytesIO() | |
| 274 | + | page_image.save(img_byte_arr, format='PNG') | |
| 275 | + | img_byte_arr = img_byte_arr.getvalue() | |
| 276 | + | ||
| 277 | + | raw_result, processed_text = perform_ocr_on_image_data(img_byte_arr, selected_ocr_engine) | |
| 278 | + | raw_ocr_results_agg.extend(raw_result) | |
| 279 | + | processed_texts_agg.append(processed_text) | |
| 280 | + | ||
| 281 | + | print("Finished OCR on all PDF pages.") | |
| 282 | + | # OCR을 수행했으므로, 이전에 추출된 (부적절한) 텍스트는 사용하지 않음 | |
| 283 | + | extracted_text_from_pdf = None | |
| 284 | + | ||
| 285 | + | except Exception as e: | |
| 286 | + | print(f"Error during PDF to image conversion or OCR: {e}") | |
| 287 | + | error_detail = f"Failed processing PDF as image: {e}" | |
| 288 | + | if "poppler" in str(e).lower(): | |
| 289 | + | error_detail = f"Failed to convert PDF to images. Ensure 'poppler' is installed and in PATH, or configure POPPLER_PATH. Original error: {e}" | |
| 290 | + | # PDF 처리 실패 시, 텍스트 추출도 안됐거나 부적절했으므로 오류 반환 | |
| 291 | + | raise HTTPException(status_code=500, detail=error_detail) | |
| 292 | + | ||
| 293 | + | else: | |
| 294 | + | # ... (지원하지 않는 파일 타입 처리) ... | |
| 295 | + | raise HTTPException(status_code=400, detail=f"Unsupported file type: {content_type}...") | |
| 296 | + | ||
| 297 | + | # 4. 최종 결과 조합 | |
| 298 | + | final_processed_text = "" | |
| 299 | + | if processing_method == "pdf_text_extraction" and extracted_text_from_pdf is not None: | |
| 300 | + | # 텍스트 추출이 성공했고 유효하다고 판단된 경우 | |
| 301 | + | final_processed_text = extracted_text_from_pdf | |
| 302 | + | elif processing_method.startswith("ocr"): | |
| 303 | + | # OCR 결과 조합 (페이지별로 구분) | |
| 304 | + | final_processed_text = "\n--- Page Break ---\n".join(processed_texts_agg) | |
| 305 | + | ||
| 306 | + | # 5. 결과 반환 | |
| 307 | + | return { | |
| 308 | + | "filename": filename, | |
| 309 | + | "content_type": content_type, | |
| 310 | + | "language_used": selected_lang, | |
| 311 | + | "processing_method": processing_method, # 어떤 방식으로 처리했는지 정보 추가 | |
| 312 | + | "raw_ocr_result": raw_ocr_results_agg, # OCR 수행 시 결과 (텍스트 추출 시 빈 리스트) | |
| 313 | + | "processed_text": final_processed_text # 최종 텍스트 결과 | |
| 314 | + | } | |
| 315 | + | ||
| 316 | + | ||
| 317 | + | # --- Uvicorn 실행 (개발용) --- | |
| 318 | + | if __name__ == "__main__": | |
| 319 | + | import uvicorn | |
| 320 | + | uvicorn.run(app, host="0.0.0.0", port=8000) | |
| 321 | + | ||
| 322 | + | # 실행 명령어: python main.py | |
上一页
下一页