목표: 개발 환경 구축 및 프로젝트 구조 생성
# Python 3.9+ 설치 확인
python --version
# Node.js 16+ 설치 확인
node --version
# Git 설치 확인
git --versionmkdir easycrawler
cd easycrawler
# 폴더 구조 생성
mkdir -p extension/{popup,content,background}
mkdir -p backend/{app/{api,core,models,services},crawler,scheduler,tests}
mkdir -p templates docs deploy
mkdir -p data/{templates,results,logs}파일: requirements.txt
fastapi==0.104.1
uvicorn==0.24.0
beautifulsoup4==4.12.2
selenium==4.15.2
playwright==1.40.0
pandas==2.1.3
openpyxl==3.1.2
requests==2.31.0
pydantic==2.5.0
sqlalchemy==2.0.23
redis==5.0.1
schedule==1.2.0
python-multipart==0.0.6
jinja2==3.1.2파일: package.json (확장프로그램용)
{
"name": "easycrawler-extension",
"version": "1.0.0",
"description": "Easy Web Crawling Tool",
"scripts": {
"build": "webpack --mode=production",
"dev": "webpack --mode=development --watch"
},
"devDependencies": {
"webpack": "^5.89.0",
"webpack-cli": "^5.1.4"
}
}목표: URL의 robots.txt를 자동으로 확인하여 크롤링 허용 여부 판단
파일: backend/app/core/site_analyzer.py
import requests
from urllib.robotparser import RobotFileParser
from urllib.parse import urljoin, urlparse
from typing import Dict, Optional
import logging
class SiteAnalyzer:
def __init__(self):
self.logger = logging.getLogger(__name__)
def check_robots_txt(self, url: str, user_agent: str = "*") -> Dict:
"""
robots.txt 확인
Args:
url: 확인할 웹사이트 URL
user_agent: 사용할 User-Agent
Returns:
{
"allowed": bool,
"robots_url": str,
"crawl_delay": int,
"sitemap": List[str]
}
"""
try:
parsed_url = urlparse(url)
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
robots_url = urljoin(base_url, "/robots.txt")
# RobotFileParser 사용
rp = RobotFileParser()
rp.set_url(robots_url)
rp.read()
# 크롤링 허용 여부 확인
allowed = rp.can_fetch(user_agent, url)
# Crawl-delay 확인
crawl_delay = rp.crawl_delay(user_agent) or 1
# Sitemap 확인
sitemaps = list(rp.site_maps()) if hasattr(rp, 'site_maps') else []
return {
"allowed": allowed,
"robots_url": robots_url,
"crawl_delay": crawl_delay,
"sitemaps": sitemaps,
"status": "success"
}
except Exception as e:
self.logger.error(f"robots.txt 확인 실패: {str(e)}")
return {
"allowed": True, # 기본값으로 허용
"robots_url": None,
"crawl_delay": 1,
"sitemaps": [],
"status": "error",
"error": str(e)
}파일: backend/tests/test_site_analyzer.py
import pytest
from app.core.site_analyzer import SiteAnalyzer
from app.models.schemas import AnalyzeRequest, AnalyzeResponse
router = APIRouter()
logger = logging.getLogger(__name__)
class AnalyzeRequest(BaseModel):
url: HttpUrl
user_agent: str = "*"
class AnalyzeResponse(BaseModel):
success: bool
url: str
robots: Dict[str, Any]
javascript: Dict[str, Any]
difficulty: Dict[str, Any]
recommendations: list[str]
error: str = None
@router.post("/site", response_model=AnalyzeResponse)
async def analyze_site(request: AnalyzeRequest):
"""
웹사이트 크롤링 가능성 분석
"""
try:
analyzer = SiteAnalyzer()
url = str(request.url)
logger.info(f"사이트 분석 시작: {url}")
# robots.txt 확인
robots_result = analyzer.check_robots_txt(url, request.user_agent)
# JavaScript 렌더링 확인
js_result = analyzer.detect_javascript_rendering(url)
# 크롤링 난이도 계산
difficulty_result = analyzer.calculate_difficulty_score(url)
# 추천사항 생성
recommendations = generate_recommendations(robots_result, js_result, difficulty_result)
return AnalyzeResponse(
success=True,
url=url,
robots=robots_result,
javascript=js_result,
difficulty=difficulty_result,
recommendations=recommendations
)
except Exception as e:
logger.error(f"사이트 분석 실패: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"사이트 분석 중 오류가 발생했습니다: {str(e)}"
)
def generate_recommendations(robots_result, js_result, difficulty_result):
"""분석 결과를 바탕으로 추천사항 생성"""
recommendations = []
if not robots_result["allowed"]:
recommendations.append("⚠️ robots.txt에서 크롤링을 금지하고 있습니다. 사이트 이용약관을 확인하세요.")
if robots_result["crawl_delay"] > 3:
recommendations.append(f"⏱️ {robots_result['crawl_delay']}초 지연을 권장합니다.")
if js_result["needs_js"]:
recommendations.append("🔧 JavaScript 렌더링이 필요합니다. 동적 크롤링 모드를 사용하세요.")
else:
recommendations.append("✅ 정적 크롤링으로 충분합니다.")
if difficulty_result["score"] >= 4:
recommendations.append("❌ 크롤링이 매우 어려울 수 있습니다. 대안을 고려해보세요.")
elif difficulty_result["score"] >= 3:
recommendations.append("⚡ 고급 설정이 필요할 수 있습니다.")
else:
recommendations.append("✨ 쉽게 크롤링할 수 있습니다!")
return recommendations
@router.get("/test-connection")
async def test_connection():
"""API 연결 테스트"""
return {"status": "connected", "message": "API가 정상적으로 작동 중입니다"}목표: 실제 데이터 크롤링 수행 API
파일: backend/app/api/endpoints/crawl.py
from fastapi import APIRouter, HTTPException, BackgroundTasks
from pydantic import BaseModel, HttpUrl
from typing import List, Dict, Any, Optional
import logging
import uuid
from datetime import datetime
from app.services.crawler_service import CrawlerService
from app.services.excel_service import ExcelService
from app.models.schemas import CrawlRequest, CrawlResponse, SelectorData
router = APIRouter()
logger = logging.getLogger(__name__)
class SelectorData(BaseModel):
name: str
selector: str
tagName: str
text: str
attributes: Dict[str, str]
class CrawlRequest(BaseModel):
url: HttpUrl
selectors: List[SelectorData]
options: Optional[Dict[str, Any]] = {}
class CrawlResponse(BaseModel):
success: bool
job_id: str
data: List[Dict[str, Any]]
download_url: str
stats: Dict[str, Any]
error: str = None
@router.post("/execute", response_model=CrawlResponse)
async def execute_crawl(request: CrawlRequest, background_tasks: BackgroundTasks):
"""
크롤링 실행
"""
try:
job_id = str(uuid.uuid4())
url = str(request.url)
logger.info(f"크롤링 시작 - Job ID: {job_id}, URL: {url}")
# 크롤링 서비스 초기화
crawler_service = CrawlerService()
# 크롤링 실행
crawl_result = await crawler_service.crawl_page(
url=url,
selectors=request.selectors,
job_id=job_id,
options=request.options
)
if not crawl_result["success"]:
raise Exception(crawl_result["error"])
# Excel 파일 생성
excel_service = ExcelService()
excel_file_path = await excel_service.create_excel(
data=crawl_result["data"],
job_id=job_id,
url=url,
selectors=request.selectors
)
# 다운로드 URL 생성
download_url = f"/downloads/{excel_file_path.name}"
# 통계 정보
stats = {
"total_items": len(crawl_result["data"]),
"execution_time": crawl_result.get("execution_time", 0),
"success_rate": crawl_result.get("success_rate", 100),
"timestamp": datetime.now().isoformat()
}
logger.info(f"크롤링 완료 - Job ID: {job_id}, 수집 항목: {stats['total_items']}개")
return CrawlResponse(
success=True,
job_id=job_id,
data=crawl_result["data"],
download_url=download_url,
stats=stats
)
except Exception as e:
logger.error(f"크롤링 실패 - Job ID: {job_id}, Error: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"크롤링 중 오류가 발생했습니다: {str(e)}"
)
@router.post("/preview")
async def preview_crawl(request: CrawlRequest):
"""
크롤링 미리보기 (최대 5개 항목만)
"""
try:
url = str(request.url)
logger.info(f"크롤링 미리보기 - URL: {url}")
# 미리보기용 옵션 설정
preview_options = request.options.copy() if request.options else {}
preview_options["limit"] = 5
preview_options["preview_mode"] = True
crawler_service = CrawlerService()
result = await crawler_service.crawl_page(
url=url,
selectors=request.selectors,
job_id="preview",
options=preview_options
)
return {
"success": result["success"],
"data": result["data"][:5], # 최대 5개만
"total_found": len(result["data"]),
"selectors_found": result.get("selectors_found", {})
}
except Exception as e:
logger.error(f"미리보기 실패: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"미리보기 중 오류가 발생했습니다: {str(e)}"
)
@router.get("/job/{job_id}/status")
async def get_job_status(job_id: str):
"""
크롤링 작업 상태 확인
"""
# 나중에 데이터베이스 연동 시 구현
return {"job_id": job_id, "status": "completed", "message": "작업이 완료되었습니다"}목표: 실제 크롤링 로직 구현
파일: backend/app/services/crawler_service.py
import asyncio
import time
from typing import List, Dict, Any, Optional
from urllib.parse import urljoin, urlparse
import logging
import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from app.core.config import settings
from app.models.schemas import SelectorData
logger = logging.getLogger(__name__)
class CrawlerService:
def __init__(self):
self.session = requests.Session()
self.session.headers.update({
'User-Agent': settings.DEFAULT_USER_AGENT
})
async def crawl_page(
self,
url: str,
selectors: List[SelectorData],
job_id: str,
options: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
페이지 크롤링 실행
Args:
url: 크롤링할 URL
selectors: 추출할 요소들의 선택자 정보
job_id: 작업 ID
options: 크롤링 옵션
Returns:
크롤링 결과 딕셔너리
"""
start_time = time.time()
options = options or {}
try:
logger.info(f"크롤링 시작 - URL: {url}")
# JavaScript 렌더링 필요 여부 확인
needs_js = await self._needs_javascript_rendering(url)
if needs_js or options.get("force_selenium", False):
logger.info("Selenium을 사용한 동적 크롤링")
data = await self._crawl_with_selenium(url, selectors, options)
else:
logger.info("requests를 사용한 정적 크롤링")
data = await self._crawl_with_requests(url, selectors, options)
execution_time = time.time() - start_time
return {
"success": True,
"data": data,
"execution_time": execution_time,
"method": "selenium" if needs_js else "requests",
"total_items": len(data)
}
except Exception as e:
logger.error(f"크롤링 실패: {str(e)}")
return {
"success": False,
"error": str(e),
"data": []
}
async def _needs_javascript_rendering(self, url: str) -> bool:
"""JavaScript 렌더링 필요 여부 간단 체크"""
try:
response = self.session.get(url, timeout=10)
content = response.text.lower()
# 간단한 휴리스틱으로 SPA 감지
js_indicators = [
'react', 'vue', 'angular', 'spa',
'document.getelementbyid',
'ajax', 'fetch(',
'window.onload'
]
js_count = sum(1 for indicator in js_indicators if indicator in content)
return js_count >= 2
except:
return True # 에러 시 안전하게 Selenium 사용
async def _crawl_with_requests(
self,
url: str,
selectors: List[SelectorData],
options: Dict[str, Any]
) -> List[Dict[str, Any]]:
"""requests와 BeautifulSoup을 사용한 정적 크롤링"""
response = self.session.get(url, timeout=settings.REQUEST_TIMEOUT)
response.raise_for_status()
soup = BeautifulSoup(response.content, 'html.parser')
# 페이지네이션 처리
all_data = []
current_url = url
page_count = 0
max_pages = options.get("max_pages", 1)
while current_url and page_count < max_pages:
page_count += 1
logger.info(f"크롤링 중 - 페이지 {page_count}: {current_url}")
if page_count > 1:
response = self.session.get(current_url, timeout=settings.REQUEST_TIMEOUT)
response.raise_for_status()
soup = BeautifulSoup(response.content, 'html.parser')
# 데이터 추출
page_data = self._extract_data_from_soup(soup, selectors)
all_data.extend(page_data)
# 다음 페이지 URL 찾기
current_url = self._find_next_page_url(soup, current_url, options)
# 미리보기 모드에서는 한 페이지만
if options.get("preview_mode", False):
break
# 제한 확인
if options.get("limit") and len(all_data) >= options["limit"]:
all_data = all_data[:options["limit"]]
break
# 크롤링 지연
await asyncio.sleep(settings.DEFAULT_CRAWL_DELAY)
return all_data
async def _crawl_with_selenium(
self,
url: str,
selectors: List[SelectorData],
options: Dict[str, Any]
) -> List[Dict[str, Any]]:
"""Selenium을 사용한 동적 크롤링"""
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument(f"--user-agent={settings.DEFAULT_USER_AGENT}")
driver = webdriver.Chrome(options=chrome_options)
try:
all_data = []
page_count = 0
max_pages = options.get("max_pages", 1)
driver.get(url)
# 페이지 로딩 대기
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.TAG_NAME, "body"))
)
while page_count < max_pages:
page_count += 1
logger.info(f"Selenium 크롤링 중 - 페이지 {page_count}")
# JavaScript 실행 완료 대기
await asyncio.sleep(3)
# 데이터 추출
page_data = self._extract_data_from_selenium(driver, selectors)
all_data.extend(page_data)
# 미리보기 모드에서는 한 페이지만
if options.get("preview_mode", False):
break
# 다음 페이지 이동 시도
if not self._navigate_to_next_page(driver, options):
break
# 제한 확인
if options.get("limit") and len(all_data) >= options["limit"]:
all_data = all_data[:options["limit"]]
break
await asyncio.sleep(settings.DEFAULT_CRAWL_DELAY)
return all_data
finally:
driver.quit()
def _extract_data_from_soup(
self,
soup: BeautifulSoup,
selectors: List[SelectorData]
) -> List[Dict[str, Any]]:
"""BeautifulSoup에서 데이터 추출"""
# 첫 번째 선택자로 반복 요소들 찾기
first_selector = selectors[0]
elements = soup.select(first_selector.selector)
if not elements:
logger.warning(f"선택자로 요소를 찾을 수 없음: {first_selector.selector}")
return []
extracted_data = []
for i, element in enumerate(elements):
item_data = {}
for selector_data in selectors:
try:
# 상대적 선택자 처리
if selector_data == first_selector:
target_element = element
else:
# 현재 요소 내에서 찾기
target_element = element.select_one(
self._make_relative_selector(selector_data.selector)
)
if not target_element:
# 전체 문서에서 같은 인덱스의 요소 찾기
all_elements = soup.select(selector_data.selector)
target_element = all_elements[i] if i < len(all_elements) else None
if target_element:
# 데이터 추출
value = self._extract_element_value(target_element, selector_data)
item_data[selector_data.name] = value
else:
item_data[selector_data.name] = None
except Exception as e:
logger.warning(f"요소 추출 실패 - {selector_data.name}: {str(e)}")
item_data[selector_data.name] = None
extracted_data.append(item_data)
return extracted_data
def _extract_data_from_selenium(
self,
driver: webdriver.Chrome,
selectors: List[SelectorData]
) -> List[Dict[str, Any]]:
"""Selenium WebDriver에서 데이터 추출"""
first_selector = selectors[0]
try:
elements = driver.find_elements(By.CSS_SELECTOR, first_selector.selector)
except NoSuchElementException:
logger.warning(f"선택자로 요소를 찾을 수 없음: {first_selector.selector}")
return []
if not elements:
return []
extracted_data = []
for i, element in enumerate(elements):
item_data = {}
for selector_data in selectors:
try:
if selector_data == first_selector:
target_element = element
else:
try:
target_element = element.find_element(
By.CSS_SELECTOR,
self._make_relative_selector(selector_data.selector)
)
except NoSuchElementException:
# 전체 문서에서 같은 인덱스의 요소 찾기
all_elements = driver.find_elements(By.CSS_SELECTOR, selector_data.selector)
target_element = all_elements[i] if i < len(all_elements) else None
if target_element:
value = self._extract_selenium_element_value(target_element, selector_data)
item_data[selector_data.name] = value
else:
item_data[selector_data.name] = None
except Exception as e:
logger.warning(f"Selenium 요소 추출 실패 - {selector_data.name}: {str(e)}")
item_data[selector_data.name] = None
extracted_data.append(item_data)
return extracted_data
def _extract_element_value(self, element, selector_data: SelectorData) -> str:
"""BeautifulSoup 요소에서 값 추출"""
# 속성값 추출
if 'href' in selector_data.attributes or element.name == 'a':
href = element.get('href')
if href:
return href
if 'src' in selector_data.attributes or element.name == 'img':
src = element.get('src')
if src:
return src
# 텍스트 추출
text = element.get_text(strip=True)
return text if text else ""
def _extract_selenium_element_value(self, element, selector_data: SelectorData) -> str:
"""Selenium 요소에서 값 추출"""
tag_name = element.tag_name.lower()
# 속성값 추출
if tag_name == 'a' or 'href' in selector_data.attributes:
href = element.get_attribute('href')
if href:
return href
if tag_name == 'img' or 'src' in selector_data.attributes:
src = element.get_attribute('src')
if src:
return src
# 텍스트 추출
text = element.text.strip()
return text if text else ""
def _make_relative_selector(self, selector: str) -> str:
"""절대 선택자를 상대 선택자로 변환"""
if selector.startswith('#') or selector.startswith('.'):
return selector
# 간단한 변환 로직
if ' > ' in selector:
parts = selector.split(' > ')
return ' > '.join(parts[1:]) # 첫 번째 부분 제거
return selector
def _find_next_page_url(self, soup: BeautifulSoup, current_url: str, options: Dict) -> Optional[str]:
"""다음 페이지 URL 찾기"""
# 일반적인 다음 페이지 링크 선택자들
next_selectors = [
'a[rel="next"]',
'a.next',
'a.pagination-next',
'.pagination .next a',
'.pager .next a',
'a:contains("다음")',
'a:contains("Next")'
]
for selector in next_selectors:
try:
next_link = soup.select_one(selector)
if next_link and next_link.get('href'):
next_url = urljoin(current_url, next_link['href'])
return next_url
except:
continue
return None
def _navigate_to_next_page(self, driver: webdriver.Chrome, options: Dict) -> bool:
"""Selenium으로 다음 페이지 이동"""
next_selectors = [
'a[rel="next"]',
'a.next',
'a.pagination-next',
'.pagination .next a',
'.pager .next a'
]
for selector in next_selectors:
try:
next_button = driver.find_element(By.CSS_SELECTOR, selector)
if next_button.is_enabled():
driver.execute_script("arguments[0].click();", next_button)
# 페이지 로딩 대기
WebDriverWait(driver, 10).until(
EC.staleness_of(next_button)
)
return True
except:
continue
return False목표: 크롤링 결과를 Excel 파일로 변환
파일: backend/app/services/excel_service.py
import pandas as pd
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Any
import logging
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
from openpyxl.utils.dataframe import dataframe_to_rows
from app.core.config import settings
from app.models.schemas import SelectorData
logger = logging.getLogger(__name__)
class ExcelService:
def __init__(self):
self.results_dir = Path(settings.RESULTS_DIR)
self.results_dir.mkdir(parents=True, exist_ok=True)
async def create_excel(
self,
data: List[Dict[str, Any]],
job_id: str,
url: str,
selectors: List[SelectorData]
) -> Path:
"""
크롤링 데이터를 Excel 파일로 변환
Args:
data: 크롤링된 데이터
job_id: 작업 ID
url: 크롤링한 URL
selectors: 사용된 선택자들
Returns:
생성된 Excel 파일 경로
"""
try:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"crawl_result_{job_id}_{timestamp}.xlsx"
file_path = self.results_dir / filename
logger.info(f"Excel 파일 생성 시작: {filename}")
# 데이터가 없는 경우 빈 파일 생성
if not data:
await self._create_empty_excel(file_path, url, selectors)
return file_path
# DataFrame 생성
df = pd.DataFrame(data)
# Excel 파일 생성 (고급 포맷팅 포함)
await self._create_formatted_excel(df, file_path, url, selectors, job_id)
logger.info(f"Excel 파일 생성 완료: {filename}")
return file_path
except Exception as e:
logger.error(f"Excel 파일 생성 실패: {str(e)}")
raise
async def _create_formatted_excel(
self,
df: pd.DataFrame,
file_path: Path,
url: str,
selectors: List[SelectorData],
job_id: str
):
"""포맷팅이 적용된 Excel 파일 생성"""
wb = Workbook()
# 메인 데이터 시트
ws_data = wb.active
ws_data.title = "크롤링 결과"
# 헤더 스타일
header_font = Font(bold=True, color="FFFFFF")
header_fill = PatternFill(start_color="366092", end_color="366092", fill_type="solid")
header_alignment = Alignment(horizontal="center", vertical="center")
# 테두리 스타일
thin_border = Border(
left=Side(style='thin'),
right=Side(style='thin'),
top=Side(style='thin'),
bottom=Side(style='thin')
)
# 메타 정보 추가
ws_data['A1'] = "크롤링 정보"
ws_data['A1'].font = Font(bold=True, size=14)
ws_data['A2'] = f"URL: {url}"
ws_data['A3'] = f"작업 ID: {job_id}"
ws_data['A4'] = f"수집 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
ws_data['A5'] = f"총 항목 수: {len(df)}"
# 데이터 시작 행
start_row = 7
# DataFrame을 시트에 추가
for r in dataframe_to_rows(df, index=False, header=True):
ws_data.append(r)
# 헤더 행 찾기 및 스타일 적용
header_row = start_row
for col_num, column_title in enumerate(df.columns, 1):
cell = ws_data.cell(row=header_row, column=col_num)
cell.value = column_title
cell.font = header_font
cell.fill = header_fill
cell.alignment = header_alignment
cell.border = thin_border
# 데이터 행 스타일 적용
for row_num in range(header_row + 1, header_row + len(df) + 1):
for col_num in range(1, len(df.columns) + 1):
cell = ws_data.cell(row=row_num, column=col_num)
cell.border = thin_border
cell.alignment = Alignment(vertical="center")
# 컬럼 너비 자동 조정
for column in ws_data.columns:
max_length = 0
column_letter = column[0].column_letter
for cell in column:
try:
if len(str(cell.value)) > max_length:
max_length = len(str(cell.value))
except:
pass
adjusted_width = min(max_length + 2, 50)
ws_data.column_dimensions[column_letter].width = adjusted_width
# 선택자 정보 시트 생성
ws_selectors = wb.create_sheet("선택자 정보")
ws_selectors['A1'] = "사용된 선택자 정보"
ws_selectors['A1'].font = Font(bold=True, size=14)
# 선택자 테이블 헤더
selector_headers = ['이름', '선택자', '태그', '샘플 텍스트']
for col_num, header in enumerate(selector_headers, 1):
cell = ws_selectors.cell(row=3, column=col_num)
cell.value = header
cell.font = header_font
cell.fill = header_fill
cell.alignment = header_alignment
cell.border = thin_border
# 선택자 데이터 추가
for row_num, selector in enumerate(selectors, 4):
ws_selectors.cell(row=row_num, column=1).value = selector.name
ws_selectors.cell(row=row_num, column=2).value = selector.selector
ws_selectors.cell(row=row_num, column=3).value = selector.tagName
ws_selectors.cell(row=row_num, column=4).value = selector.text[:50] + "..." if len(selector.text) > 50 else selector.text
# 테두리 적용
for col_num in range(1, 5):
ws_selectors.cell(row=row_num, column=col_num).border = thin_border
# 선택자 시트 컬럼 너비 조정
ws_selectors.column_dimensions['A'].width = 20
ws_selectors.column_dimensions['B'].width = 40
ws_selectors.column_dimensions['C'].width = 15
ws_selectors.column_dimensions['D'].width = 30
# 통계 시트 생성
ws_stats = wb.create_sheet("통계")
ws_stats['A1'] = "크롤링 통계"
ws_stats['A1'].font = Font(bold=True, size=14)
# 기본 통계 정보
stats_data = [
["총 수집 항목 수", len(df)],
["수집된 필드 수", len(df.columns)],
["빈 값 비율", f"{(df.isnull().sum().sum() / (len(df) * len(df.columns)) * 100):.1f}%"],
["수집 시간", datetime.now().strftime('%Y-%m-%d %H:%M:%S')]
]
for row_num, (label, value) in enumerate(stats_data, 3):
ws_stats.cell(row=row_num, column=1).value = label
ws_stats.cell(row=row_num, column=2).value = value
ws_stats.cell(row=row_num, column=1).font = Font(bold=True)
# 필드별 통계
ws_stats['A8'] = "필드별 통계"
ws_stats['A8'].font = Font(bold=True, size=12)
field_stats_headers = ['필드명', '비어있지 않은 값', '비어있는 값', '완성도']
for col_num, header in enumerate(field_stats_headers, 1):
cell = ws_stats.cell(row=9, column=col_num)
cell.value = header
cell.font = header_font
cell.fill = header_fill
cell.alignment = header_alignment
cell.border = thin_border
for row_num, column in enumerate(df.columns, 10):
non_null_count = df[column].notna().sum()
null_count = df[column].isna().sum()
completion_rate = (non_null_count / len(df) * 100) if len(df) > 0 else 0
ws_stats.cell(row=row_num, column=1).value = column
ws_stats.cell(row=row_num, column=2).value = non_null_count
ws_stats.cell(row=row_num, column=3).value = null_count
ws_stats.cell(row=row_num, column=4).value = f"{completion_rate:.1f}%"
for col_num in range(1, 5):
ws_stats.cell(row=row_num, column=col_num).border = thin_border
# Excel 파일 저장
wb.save(file_path)
async def _create_empty_excel(self, file_path: Path, url: str, selectors: List[SelectorData]):
"""데이터가 없는 경우 빈 Excel 파일 생성"""
wb = Workbook()
ws = wb.active
ws.title = "크롤링 결과"
ws['A1'] = "크롤링 결과 없음"
ws['A1'].font = Font(bold=True, size=14, color="FF0000")
ws['A2'] = f"URL: {url}"
ws['A3'] = f"수집 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
ws['A4'] = "선택한 요소들에서 데이터를 찾을 수 없습니다."
# 선택자 정보 추가
ws['A6'] = "사용된 선택자:"
ws['A6'].font = Font(bold=True)
for i, selector in enumerate(selectors, 7):
ws[f'A{i}'] = f"- {selector.name}: {selector.selector}"
wb.save(file_path)
async def create_csv(
self,
data: List[Dict[str, Any]],
job_id: str,
url: str
) -> Path:
"""CSV 파일 생성 (간단한 형태)"""
try:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"crawl_result_{job_id}_{timestamp}.csv"
file_path = self.results_dir / filename
if not data:
# 빈 CSV 파일 생성
pd.DataFrame().to_csv(file_path, index=False, encoding='utf-8-sig')
else:
df = pd.DataFrame(data)
df.to_csv(file_path, index=False, encoding='utf-8-sig')
return file_path
except Exception as e:
logger.error(f"CSV 파일 생성 실패: {str(e)}")
raise목표: 크롤링 패턴을 템플릿으로 저장하고 재사용
파일: backend/app/models/schemas.py
from pydantic import BaseModel, HttpUrl
from typing import List, Dict, Any, Optional
from datetime import datetime
class SelectorData(BaseModel):
name: str
selector: str
tagName: str
text: str
attributes: Dict[str, str]
class CrawlTemplate(BaseModel):
id: Optional[str] = None
name: str
description: Optional[str] = None
url_pattern: str
selectors: List[SelectorData]
options: Optional[Dict[str, Any]] = {}
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
usage_count: Optional[int] = 0
class TemplateCreateRequest(BaseModel):
name: str
description: Optional[str] = None
url_pattern: str
selectors: List[SelectorData]
options: Optional[Dict[str, Any]] = {}
class TemplateUpdateRequest(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
url_pattern: Optional[str] = None
selectors: Optional[List[SelectorData]] = None
options: Optional[Dict[str, Any]] = None
class TemplateResponse(BaseModel):
success: bool
template: Optional[CrawlTemplate] = None
message: str
error: Optional[str] = None
class TemplateListResponse(BaseModel):
success: bool
templates: List[CrawlTemplate]
total: int
message: str파일: backend/app/api/endpoints/templates.py
from fastapi import APIRouter, HTTPException, Query
from typing import List, Optional
import uuid
import json
from datetime import datetime
from pathlib import Path
from app.models.schemas import (
CrawlTemplate,
TemplateCreateRequest,
TemplateUpdateRequest,
TemplateResponse,
TemplateListResponse
)
from app.services.template_service import TemplateService
router = APIRouter()
@router.post("/", response_model=TemplateResponse)
async def create_template(template_data: TemplateCreateRequest):
"""새 템플릿 생성"""
try:
template_service = TemplateService()
# 템플릿 객체 생성
template = CrawlTemplate(
id=str(uuid.uuid4()),
name=template_data.name,
description=template_data.description,
url_pattern=template_data.url_pattern,
selectors=template_data.selectors,
options=template_data.options,
created_at=datetime.now(),
updated_at=datetime.now(),
usage_count=0
)
# 템플릿 저장
saved_template = await template_service.save_template(template)
return TemplateResponse(
success=True,
template=saved_template,
message="템플릿이 성공적으로 생성되었습니다"
)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"템플릿 생성 실패: {str(e)}"
)
@router.get("/", response_model=TemplateListResponse)
async def list_templates(
page: int = Query(1, ge=1),
limit: int = Query(10, ge=1, le=100),
search: Optional[str] = Query(None),
url_pattern: Optional[str] = Query(None)
):
"""템플릿 목록 조회"""
try:
template_service = TemplateService()
templates, total = await template_service.list_templates(
page=page,
limit=limit,
search=search,
url_pattern=url_pattern
)
return TemplateListResponse(
success=True,
templates=templates,
total=total,
message=f"{total}개의 템플릿을 찾았습니다"
)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"템플릿 목록 조회 실패: {str(e)}"
)
@router.get("/{template_id}", response_model=TemplateResponse)
async def get_template(template_id: str):
"""특정 템플릿 조회"""
try:
template_service = TemplateService()
template = await template_service.get_template(template_id)
if not template:
raise HTTPException(
status_code=404,
detail="템플릿을 찾을 수 없습니다"
)
return TemplateResponse(
success=True,
template=template,
message="템플릿을 성공적으로 조회했습니다"
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"템플릿 조회 실패: {str(e)}"
)
@router.put("/{template_id}", response_model=TemplateResponse)
async def update_template(template_id: str, template_data: TemplateUpdateRequest):
"""템플릿 수정"""
try:
template_service = TemplateService()
updated_template = await template_service.update_template(
template_id,
template_data
)
if not updated_template:
raise HTTPException(
status_code=404,
detail="템플릿을 찾을 수 없습니다"
)
return TemplateResponse(
success=True,
template=updated_template,
message="템플릿이 성공적으로 수정되었습니다"
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"템플릿 수정 실패: {str(e)}"
)
@router.delete("/{template_id}")
async def delete_template(template_id: str):
"""템플릿 삭제"""
try:
template_service = TemplateService()
success = await template_service.delete_template(template_id)
if not success:
raise HTTPException(
status_code=404,
detail="템플릿을 찾을 수 없습니다"
)
return {
"success": True,
"message": "템플릿이 성공적으로 삭제되었습니다"
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"템플릿 삭제 실패: {str(e)}"
)
@router.post("/{template_id}/execute")
async def execute_template(template_id: str, url: str):
"""템플릿을 사용하여 크롤링 실행"""
try:
template_service = TemplateService()
template = await template_service.get_template(template_id)
if not template:
raise HTTPException(
status_code=404,
detail="템플릿을 찾을 수 없습니다"
)
# 사용 횟수 증가
await template_service.increment_usage_count(template_id)
# 크롤링 실행 (크롤링 서비스 호출)
from app.services.crawler_service import CrawlerService
crawler_service = CrawlerService()
result = await crawler_service.crawl_page(
url=url,
selectors=template.selectors,
job_id=f"template_{template_id}_{uuid.uuid4()}",
options=template.options
)
return {
"success": result["success"],
"data": result["data"],
"template_used": template.name,
"message": f"템플릿 '{template.name}'을 사용하여 크롤링을 완료했습니다"
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"템플릿 실행 실패: {str(e)}"
)
@router.get("/search/by-url")
async def find_templates_by_url(url: str):
"""URL에 맞는 템플릿 찾기"""
try:
template_service = TemplateService()
templates = await template_service.find_templates_by_url(url)
return {
"success": True,
"templates": templates,
"count": len(templates),
"message": f"{len(templates)}개의 적합한 템플릿을 찾았습니다"
}
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"템플릿 검색 실패: {str(e)}"
)
@router.post("/import")
async def import_template(template_json: dict):
"""템플릿 가져오기 (JSON 형태)"""
try:
template_service = TemplateService()
# JSON에서 템플릿 객체 생성
template = CrawlTemplate(**template_json)
template.id = str(uuid.uuid4()) # 새 ID 할당
template.created_at = datetime.now()
template.updated_at = datetime.now()
saved_template = await template_service.save_template(template)
return TemplateResponse(
success=True,
template=saved_template,
message="템플릿을 성공적으로 가져왔습니다"
)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"템플릿 가져오기 실패: {str(e)}"
)
@router.get("/{template_id}/export")
async def export_template(template_id: str):
"""템플릿 내보내기 (JSON 형태)"""
try:
template_service = TemplateService()
template = await template_service.get_template(template_id)
if not template:
raise HTTPException(
status_code=404,
detail="템플릿을 찾을 수 없습니다"
)
# 민감한 정보 제거
export_data = template.dict()
export_data.pop('id', None)
export_data.pop('usage_count', None)
return {
"success": True,
"template": export_data,
"message": "템플릿을 성공적으로 내보냈습니다"
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"템플릿 내보내기 실패: {str(e)}"
)파일: backend/app/services/template_service.py
import json
import re
from pathlib import Path
from typing import List, Optional, Tuple
from datetime import datetime
from urllib.parse import urlparse
from app.models.schemas import CrawlTemplate, TemplateUpdateRequest
from app.core.config import settings
class TemplateService:
def __init__(self):
self.templates_dir = Path(settings.TEMPLATES_DIR)
self.templates_dir.mkdir(parents=True, exist_ok=True)
async def save_template(self, template: CrawlTemplate) -> CrawlTemplate:
"""템플릿 저장"""
# 파일 경로 생성
file_path = self.templates_dir / f"{template.id}.json"
# 템플릿 데이터 준비
template_data = template.dict()
template_data['created_at'] = template_data['created_at'].isoformat()
template_data['updated_at'] = template_data['updated_at'].isoformat()
# JSON 파일로 저장
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(template_data, f, ensure_ascii=False, indent=2)
return template
async def get_template(self, template_id: str) -> Optional[CrawlTemplate]:
"""특정 템플릿 조회"""
file_path = self.templates_dir / f"{template_id}.json"
if not file_path.exists():
return None
try:
with open(file_path, 'r', encoding='utf-8') as f:
template_data = json.load(f)
# 날짜 문자열을 datetime 객체로 변환
if 'created_at' in template_data:
template_data['created_at'] = datetime.fromisoformat(template_data['created_at'])
if 'updated_at' in template_data:
template_data['updated_at'] = datetime.fromisoformat(template_data['updated_at'])
return CrawlTemplate(**template_data)
except Exception as e:
print(f"템플릿 로드 실패: {e}")
return None
async def list_templates(
self,
page: int = 1,
limit: int = 10,
search: Optional[str] = None,
url_pattern: Optional[str] = None
) -> Tuple[List[CrawlTemplate], int]:
"""템플릿 목록 조회"""
templates = []
# 모든 템플릿 파일 로드
for file_path in self.templates_dir.glob("*.json"):
template = await self.get_template(file_path.stem)
if template:
templates.append(template)
# 필터링
filtered_templates = templates
if search:
filtered_templates = [
t for t in filtered_templates
if search.lower() in t.name.lower() or
(t.description and search.lower() in t.description.lower())
]
if url_pattern:
filtered_templates = [
t for t in filtered_templates
if url_pattern in t.url_pattern
]
# 정렬 (최근 생성 순)
filtered_templates.sort(key=lambda x: x.created_at, reverse=True)
# 페이지네이션
total = len(filtered_templates)
start_idx = (page - 1) * limit
end_idx = start_idx + limit
page_templates = filtered_templates[start_idx:end_idx]
return page_templates, total
async def update_template(
self,
template_id: str,
update_data: TemplateUpdateRequest
) -> Optional[CrawlTemplate]:
"""템플릿 수정"""
template = await self.get_template(template_id)
if not template:
return None
# 수정할 필드들 업데이트
update_dict = update_data.dict(exclude_unset=True)
for field, value in update_dict.items():
setattr(template, field, value)
template.updated_at = datetime.now()
# 저장
await self.save_template(template)
return template
async def delete_template(self, template_id: str) -> bool:
"""템플릿 삭제"""
file_path = self.templates_dir / f"{template_id}.json"
if not file_path.exists():
return False
try:
file_path.unlink()
return True
except Exception as e:
print(f"템플릿 삭제 실패: {e}")
return False
async def find_templates_by_url(self, url: str) -> List[CrawlTemplate]:
"""URL에 적합한 템플릿 찾기"""
templates, _ = await self.list_templates(limit=1000) # 모든 템플릿 가져오기
matching_templates = []
parsed_url = urlparse(url)
for template in templates:
if self._url_matches_pattern(url, template.url_pattern):
matching_templates.append(template)
# 매칭 점수로 정렬 (사용 횟수도 고려)
matching_templates.sort(
key=lambda t: (self._calculate_match_score(url, t.url_pattern), t.usage_count),
reverse=True
)
return matching_templates
def _url_matches_pattern(self, url: str, pattern: str) -> bool:
"""URL이 패턴과 매치되는지 확인"""
try:
# 간단한 패턴 매칭
parsed_url = urlparse(url)
parsed_pattern = urlparse(pattern)
# 도메인 확인
if parsed_url.netloc != parsed_pattern.netloc:
return False
# 경로 패턴 확인 (* 와일드카드 지원)
url_path = parsed_url.path
pattern_path = parsed_pattern.path
# 와일드카드를 정규표현식으로 변환
regex_pattern = pattern_path.replace('*', '.*')
regex_pattern = f"^{regex_pattern}$"
return bool(re.match(regex_pattern, url_path))
except Exception:
return False
def _calculate_match_score(self, url: str, pattern: str) -> float:
"""URL과 패턴의 매칭 점수 계산"""
try:
parsed_url = urlparse(url)
parsed_pattern = urlparse(pattern)
score = 0.0
# 도메인 매치 (기본 점수)
if parsed_url.netloc == parsed_pattern.netloc:
score += 1.0
# 경로 유사도
url_parts = parsed_url.path.split('/')
pattern_parts = parsed_pattern.path.split('/')
# 공통 경로 부분 계산
common_parts = 0
for i in range(min(len(url_parts), len(pattern_parts))):
if url_parts[i] == pattern_parts[i] or pattern_parts[i] == '*':
common_parts += 1
else:
break
if len(pattern_parts) > 0:
score += common_parts / len(pattern_parts)
return score
except Exception:
return 0.0
async def increment_usage_count(self, template_id: str):
"""템플릿 사용 횟수 증가"""
template = await self.get_template(template_id)
if template:
template.usage_count = (template.usage_count or 0) + 1
template.updated_at = datetime.now()
await self.save_template(template)
async def export_templates(self) -> List[dict]:
"""모든 템플릿 내보내기"""
templates, _ = await self.list_templates(limit=1000)
exported_templates = []
for template in templates:
template_data = template.dict()
# 민감한 정보 제거
template_data.pop('id', None)
template_data.pop('usage_count', None)
exported_templates.append(template_data)
return exported_templates
async def import_templates(self, templates_data: List[dict]) -> List[CrawlTemplate]:
"""템플릿 일괄 가져오기"""
imported_templates = []
for template_data in templates_data:
try:
# 새로운 ID 생성
template = CrawlTemplate(**template_data)
template.id = str(uuid.uuid4())
template.created_at = datetime.now()
template.updated_at = datetime.now()
template.usage_count = 0
saved_template = await self.save_template(template)
imported_templates.append(saved_template)
except Exception as e:
print(f"템플릿 가져오기 실패: {e}")
continue
return imported_templates목표: 쉬운 개발 환경 구축
파일: backend/run_dev.py
#!/usr/bin/env python3
"""
개발 서버 실행 스크립트
"""
import os
import sys
import subprocess
import uvicorn
from pathlib import Path
def setup_environment():
"""개발 환경 설정"""
# 필요한 디렉토리 생성
dirs_to_create = [
"data/templates",
"data/results",
"data/logs"
]
for dir_path in dirs_to_create:
Path(dir_path).mkdir(parents=True, exist_ok=True)
print(f"✓ 디렉토리 생성: {dir_path}")
# 환경 변수 설정
os.environ.setdefault("DEBUG", "True")
os.environ.setdefault("DATABASE_URL", "sqlite:///./easycrawler.db")
print("✓ 환경 설정 완료")
def check_dependencies():
"""의존성 패키지 확인"""
required_packages = [
"fastapi", "uvicorn", "beautifulsoup4",
"selenium", "pandas", "openpyxl"
]
missing_packages = []
for package in required_packages:
try:
__import__(package)
print(f"✓ {package}")
except ImportError:
missing_packages.append(package)
print(f"✗ {package} (누락)")
if missing_packages:
print(f"\n누락된 패키지가 있습니다. 다음 명령어로 설치하세요:")
print(f"pip install {' '.join(missing_packages)}")
return False
return True
def start_server():
"""개발 서버 시작"""
print("\n🚀 EasyCrawler 개발 서버를 시작합니다...")
print("📍 API 문서: http://localhost:8000/docs")
print("🛑 종료하려면 Ctrl+C를 누르세요\n")
uvicorn.run(
"main:app",
host="0.0.0.0",
port=8000,
reload=True,
log_level="info"
)
if __name__ == "__main__":
print("🕷️ EasyCrawler 개발 환경 설정")
print("=" * 50)
setup_environment()
if not check_dependencies():
sys.exit(1)
start_server()파일: Dockerfile
FROM python:3.9-slim
# 시스템 패키지 설치
RUN apt-get update && apt-get install -y \
wget \
gnupg \
unzip \
&& rm -rf /var/lib/apt/lists/*
# Chrome 설치 (Selenium용)
RUN wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add - \
&& echo "deb http://dl.google.com/linux/chrome/deb/ stable main" >> /etc/apt/sources.list.d/google.list \
&& apt-get update \
&& apt-get install -y google-chrome-stable \
&& rm -rf /var/lib/apt/lists/*
# ChromeDriver 설치
RUN wget -O /tmp/chromedriver.zip https://chromedriver.storage.googleapis.com/91.0.4472.101/chromedriver_linux64.zip \
&& unzip /tmp/chromedriver.zip -d /usr/local/bin/ \
&& rm /tmp/chromedriver.zip \
&& chmod +x /usr/local/bin/chromedriver
# 작업 디렉토리 설정
WORKDIR /app
# Python 의존성 설치
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 애플리케이션 코드 복사
COPY . .
# 데이터 디렉토리 생성
RUN mkdir -p data/templates data/results data/logs
# 포트 노출
EXPOSE 8000
# 서버 시작
CMD ["python", "run_dev.py"]파일: docker-compose.yml
version: '3.8'
services:
easycrawler-api:
build: .
ports:
- "8000:8000"
volumes:
- ./data:/app/data
- ./logs:/app/logs
environment:
- DEBUG=True
- DATABASE_URL=sqlite:///./data/easycrawler.db
restart: unless-stopped
# 향후 Redis 추가용
# redis:
# image: redis:alpine
# ports:
# - "6379:6379"
# restart: unless-stopped목표: 주요 기능 테스트 케이스 작성
파일: backend/tests/test_integration.py
import pytest
import asyncio
from fastapi.testclient import TestClient
from pathlib import Path
import json
from main import app
from app.services.template_service import TemplateService
from app.models.schemas import SelectorData, CrawlTemplate
client = TestClient(app)
class TestIntegration:
"""통합 테스트"""
def test_health_check(self):
"""헬스 체크 테스트"""
response = client.get("/health")
assert response.status_code == 200
assert response.json()["status"] == "healthy"
def test_site_analysis(self):
"""사이트 분석 테스트"""
test_data = {
"url": "https://quotes.toscrape.com",
"user_agent": "*"
}
response = client.post("/api/v1/analyze/site", json=test_data)
assert response.status_code == 200
result = response.json()
assert result["success"] is True
assert "robots" in result
assert "javascript" in result
assert "difficulty" in result
def test_crawl_preview(self):
"""크롤링 미리보기 테스트"""
test_selectors = [
{
"name": "quote_text",
"selector": ".quote .text",
"tagName": "span",
"text": "Sample quote text",
"attributes": {}
},
{
"name": "author",
"selector": ".quote .author",
"tagName": "small",
"text": "Sample author",
"attributes": {}
}
]
test_data = {
"url": "https://quotes.toscrape.com",
"selectors": test_selectors,
"options": {"preview_mode": True}
}
response = client.post("/api/v1/crawl/preview", json=test_data)
assert response.status_code == 200
result = response.json()
assert result["success"] is True
assert "data" in result
assert len(result["data"]) <= 5 # 미리보기는 최대 5개
def test_template_crud(self):
"""템플릿 CRUD 테스트"""
# 1. 템플릿 생성
template_data = {
"name": "테스트 템플릿",
"description": "통합 테스트용 템플릿",
"url_pattern": "https://example.com/*",
"selectors": [
{
"name": "title",
"selector": "h1",
"tagName": "h1",
"text": "Sample title",
"attributes": {}
}
],
"options": {}
}
response = client.post("/api/v1/templates/", json=template_data)
assert response.status_code == 200
created_template = response.json()
assert created_template["success"] is True
template_id = created_template["template"]["id"]
# 2. 템플릿 조회
response = client.get(f"/api/v1/templates/{template_id}")
assert response.status_code == 200
retrieved_template = response.json()
assert retrieved_template["success"] is True
assert retrieved_template["template"]["name"] == "테스트 템플릿"
# 3. 템플릿 수정
update_data = {
"name": "수정된 테스트 템플릿",
"description": "수정된 설명"
}
response = client.put(f"/api/v1/templates/{template_id}", json=update_data)
assert response.status_code == 200
updated_template = response.json()
assert updated_template["success"] is True
assert updated_template["template"]["name"] == "수정된 테스트 템플릿"
# 4. 템플릿 목록 조회
response = client.get("/api/v1/templates/")
assert response.status_code == 200
templates_list = response.json()
assert templates_list["success"] is True
assert templates_list["total"] >= 1
# 5. 템플릿 삭제
response = client.delete(f"/api/v1/templates/{template_id}")
assert response.status_code == 200
# 6. 삭제 확인
response = client.get(f"/api/v1/templates/{template_id}")
assert response.status_code == 404
class TestCrawlerService:
"""크롤링 서비스 테스트"""
@pytest.mark.asyncio
async def test_requests_crawling(self):
"""requests 기반 크롤링 테스트"""
from app.services.crawler_service import CrawlerService
selectors = [
SelectorData(
name="quote",
selector=".quote .text",
tagName="span",
text="",
attributes={}
)
]
crawler = CrawlerService()
result = await crawler.crawl_page(
url="https://quotes.toscrape.com",
selectors=selectors,
job_id="test_job"
)
assert result["success"] is True
assert len(result["data"]) > 0
assert "quote" in result["data"][0]
class TestTemplateService:
"""템플릿 서비스 테스트"""
@pytest.mark.asyncio
async def test_template_operations(self):
"""템플릿 기본 연산 테스트"""
service = TemplateService()
# 템플릿 생성
template = CrawlTemplate(
id="test_template_001",
name="테스트 템플릿",
url_pattern="https://test.com/*",
selectors=[
SelectorData(
name="title",
selector="h1",
tagName="h1",
text="Test Title",
attributes={}
)
]
)
# 저장
saved_template = await service.save_template(template)
assert saved_template.id == "test_template_001"
# 조회
retrieved_template = await service.get_template("test_template_001")
assert retrieved_template is not None
assert retrieved_template.name == "테스트 템플릿"
# 삭제
deleted = await service.delete_template("test_template_001")
assert deleted is True
# 삭제 확인
not_found = await service.get_template("test_template_001")
assert not_found is None
@pytest.mark.asyncio
async def test_url_pattern_matching(self):
"""URL 패턴 매칭 테스트"""
service = TemplateService()
# 테스트용 템플릿 생성
templates = [
CrawlTemplate(
id="template_1",
name="쇼핑몰 템플릿",
url_pattern="https://shop.example.com/products/*",
selectors=[]
),
CrawlTemplate(
id="template_2",
name="블로그 템플릿",
url_pattern="https://blog.example.com/posts/*",
selectors=[]
)
]
# 템플릿 저장
for template in templates:
await service.save_template(template)
# URL 매칭 테스트
matching_templates = await service.find_templates_by_url(
"https://shop.example.com/products/123"
)
assert len(matching_templates) >= 1
assert matching_templates[0].name == "쇼핑몰 템플릿"
# 정리
for template in templates:
await service.delete_template(template.id)
class TestExcelService:
"""Excel 서비스 테스트"""
@pytest.mark.asyncio
async def test_excel_creation(self):
"""Excel 파일 생성 테스트"""
from app.services.excel_service import ExcelService
# 테스트 데이터
test_data = [
{"title": "제목 1", "price": "10000원", "rating": "4.5"},
{"title": "제목 2", "price": "20000원", "rating": "4.2"},
{"title": "제목 3", "price": "15000원", "rating": "4.8"}
]
selectors = [
SelectorData(
name="title",
selector=".title",
tagName="h2",
text="",
attributes={}
)
]
excel_service = ExcelService()
file_path = await excel_service.create_excel(
data=test_data,
job_id="test_excel",
url="https://test.com",
selectors=selectors
)
# 파일 생성 확인
assert file_path.exists()
assert file_path.suffix == ".xlsx"
# 정리
if file_path.exists():
file_path.unlink()
# 테스트 실행 유틸리티
def run_tests():
"""모든 테스트 실행"""
print("🧪 EasyCrawler 테스트 실행")
print("=" * 50)
# pytest 실행
exit_code = pytest.main([
"tests/",
"-v",
"--tb=short",
"--color=yes"
])
if exit_code == 0:
print("\n✅ 모든 테스트가 성공했습니다!")
else:
print("\n❌ 일부 테스트가 실패했습니다.")
return exit_code
if __name__ == "__main__":
run_tests()목표: 사용자를 위한 가이드 문서
파일: docs/user_guide.md
# EasyCrawler 사용자 가이드
## 🚀 시작하기
### 1. Chrome 확장프로그램 설치
1. Chrome 웹스토어에서 "EasyCrawler" 검색
2. "Chrome에 추가" 클릭
3. 확장프로그램 아이콘이 브라우저 상단에 나타남
### 2. 첫 번째 크롤링
1. 크롤링하고 싶은 웹페이지 방문
2. EasyCrawler 아이콘 클릭
3. "📍 요소 선택 모드" 버튼 클릭
4. 수집하고 싶은 데이터 요소들을 클릭하여 선택
5. "▶️ 크롤링 실행" 버튼 클릭
6. 결과를 Excel 파일로 다운로드
## 📋 주요 기능
### 사이트 분석
- **자동 분석**: 페이지 접속 시 크롤링 가능성 자동 판단
- **난이도 표시**: ★☆☆ (쉬움) ~ ★★★★★ (어려움)
- **추천사항**: 크롤링 방법 및 주의사항 제공
### 요소 선택
- **시각적 선택**: 마우스로 직접 클릭하여 요소 선택
- **실시간 미리보기**: 선택한 요소의 정보 즉시 확인
- **패턴 인식**: 유사한 구조의 요소들 자동 감지
### 템플릿 관리
- **저장**: 자주 사용하는 패턴을 템플릿으로 저장
- **재사용**: 저장된 템플릿으로 빠른 크롤링
- **공유**: 템플릿을 JSON 파일로 내보내기/가져오기
### 결과 출력
- **Excel 파일**: 즉시 다운로드 가능한 .xlsx 형태
- **CSV 파일**: 간단한 데이터 형태
- **통계 정보**: 수집 결과 요약
## 💡 사용 팁
### 효과적인 요소 선택
1. **대표 요소 먼저**: 가장 중요한 데이터부터 선택
2. **계층 구조 고려**: 상위 요소부터 하위 요소 순으로
3. **텍스트가 명확한 요소**: 빈 값이 적은 요소 우선 선택
### 템플릿 활용예시: 쇼핑몰 상품 정보
### 자주 발생하는 문제 해결
#### 1. 선택한 요소에서 데이터가 나오지 않음
- **원인**: 잘못된 선택자 또는 동적 로딩
- **해결**: 다른 요소 선택 시도 또는 페이지 새로고침 후 재시도
#### 2. 크롤링 속도가 느림
- **원인**: JavaScript 렌더링 필요 또는 서버 응답 지연
- **해결**: 미리보기로 테스트 후 실행
#### 3. 일부 페이지에서 접근 불가
- **원인**: robots.txt 제한 또는 로그인 필요
- **해결**: 사이트 이용약관 확인 또는 다른 접근 방법 시도
## 🔒 법적 주의사항
### 허용되는 사용
- 공개된 정보의 개인적 연구 목적
- 공정 이용 범위 내의 데이터 수집
- 사이트 이용약관에 위배되지 않는 범위
### 주의할 점
- 개인정보 수집 금지
- 상업적 재사용 시 저작권 확인 필요
- 서버에 과부하를 주지 않도록 적절한 간격 유지
## 🆘 문제 해결
### 자주 묻는 질문
**Q: 로그인이 필요한 사이트도 크롤링할 수 있나요?**
A: 현재 버전에서는 로그인이 필요한 사이트는 지원하지 않습니다.
**Q: 여러 페이지를 한 번에 크롤링할 수 있나요?**
A: 페이지네이션이 있는 사이트는 자동으로 다음 페이지를 찾아 크롤링합니다.
**Q: 크롤링 결과를 다른 형태로 받을 수 있나요?**
A: 현재는 Excel과 CSV를 지원하며, 향후 JSON 등 추가 형태를 지원할 예정입니다.
### 지원 및 문의
- 이메일: support@easycrawler.com
- GitHub: https://github.com/easycrawler/easycrawler
- 문서: https://docs.easycrawler.com위의 기획서와 작업 지침서를 바탕으로 Claude Code에게 다음과 같이 의뢰하세요:
"EasyCrawler 프로젝트의 기본 폴더 구조를 생성하고,
requirements.txt와 기본 설정 파일들을 만들어줘""FastAPI 기반의 백엔드 서버를 구현해줘.
사이트 분석, 크롤링 실행, 템플릿 관리 API가 필요해""Chrome Extension의 manifest.json부터 시작해서
팝업 UI와 Content Script를 구현해줘""BeautifulSoup과 Selenium을 사용한 크롤링 서비스를 구현하고,
Excel 출력 기능도 추가해줘""전체 시스템이 연동되도록 통합하고,
기본적인 테스트 케이스를 작성해줘"각 단계별로 완성된 후 다음 단계로 진행하시면 됩니다! .site_analyzer import SiteAnalyzer
def test_robots_checker(): analyzer = SiteAnalyzer()
# 테스트 URL들
test_urls = [
"https://example.com",
"https://httpbin.org",
"https://quotes.toscrape.com"
]
for url in test_urls:
result = analyzer.check_robots_txt(url)
assert "allowed" in result
assert "crawl_delay" in result
assert isinstance(result["allowed"], bool)
assert isinstance(result["crawl_delay"], (int, float))
### Task 1.2: JavaScript 렌더링 감지
**목표**: 페이지가 JavaScript로 동적 생성되는지 감지
#### 1.2.1 정적/동적 콘텐츠 비교
**파일**: `backend/app/core/site_analyzer.py` (추가)
```python
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import time
class SiteAnalyzer:
# ... 기존 코드 ...
def detect_javascript_rendering(self, url: str) -> Dict:
"""
JavaScript 렌더링 필요 여부 감지
Returns:
{
"needs_js": bool,
"static_content_length": int,
"dynamic_content_length": int,
"difference_ratio": float
}
"""
try:
# 1. 정적 콘텐츠 가져오기
response = requests.get(url, timeout=10)
static_content = response.text
static_length = len(static_content.strip())
# 2. JavaScript 렌더링된 콘텐츠 가져오기
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
driver = webdriver.Chrome(options=chrome_options)
driver.get(url)
time.sleep(3) # JavaScript 실행 대기
dynamic_content = driver.page_source
dynamic_length = len(dynamic_content.strip())
driver.quit()
# 3. 차이 비율 계산
if static_length == 0:
difference_ratio = 1.0
else:
difference_ratio = abs(dynamic_length - static_length) / static_length
needs_js = difference_ratio > 0.3 # 30% 이상 차이나면 JS 필요
return {
"needs_js": needs_js,
"static_content_length": static_length,
"dynamic_content_length": dynamic_length,
"difference_ratio": difference_ratio,
"recommendation": "selenium" if needs_js else "requests"
}
except Exception as e:
return {
"needs_js": True, # 에러 시 안전하게 JS 필요로 판단
"error": str(e)
}목표: 사이트별 크롤링 난이도를 1~5점으로 점수화
파일: backend/app/core/site_analyzer.py (추가)
class SiteAnalyzer:
# ... 기존 코드 ...
def calculate_difficulty_score(self, url: str) -> Dict:
"""
크롤링 난이도 점수 계산 (1: 매우 쉬움 ~ 5: 매우 어려움)
"""
score = 1
factors = []
# robots.txt 확인
robots_result = self.check_robots_txt(url)
if not robots_result["allowed"]:
score += 2
factors.append("robots.txt에서 크롤링 금지")
elif robots_result["crawl_delay"] > 5:
score += 1
factors.append(f"긴 크롤링 지연시간: {robots_result['crawl_delay']}초")
# JavaScript 렌더링 확인
js_result = self.detect_javascript_rendering(url)
if js_result["needs_js"]:
score += 1
factors.append("JavaScript 렌더링 필요")
# 로그인 감지 (간단한 휴리스틱)
try:
response = requests.get(url)
content = response.text.lower()
login_keywords = ["login", "sign in", "로그인", "회원", "password"]
if any(keyword in content for keyword in login_keywords):
score += 1
factors.append("로그인이 필요할 수 있음")
except:
pass
# 점수 제한 (1-5)
score = min(5, max(1, score))
# 난이도 레벨 문자열
levels = {
1: "매우 쉬움 ★☆☆☆☆",
2: "쉬움 ★★☆☆☆",
3: "보통 ★★★☆☆",
4: "어려움 ★★★★☆",
5: "매우 어려움 ★★★★★"
}
return {
"score": score,
"level": levels[score],
"factors": factors,
"recommendation": self._get_recommendation(score)
}
def _get_recommendation(self, score: int) -> str:
"""점수별 권장사항"""
recommendations = {
1: "기본 크롤링으로 충분합니다",
2: "간단한 설정으로 크롤링 가능합니다",
3: "약간의 추가 설정이 필요할 수 있습니다",
4: "고급 설정과 주의가 필요합니다",
5: "크롤링이 매우 어렵거나 불가능할 수 있습니다"
}
return recommendations[score]목표: Chrome Extension의 기본 골격 구성
파일: extension/manifest.json
{
"manifest_version": 3,
"name": "EasyCrawler",
"version": "1.0.0",
"description": "쉬운 웹 크롤링 도구",
"permissions": [
"activeTab",
"storage",
"scripting"
],
"host_permissions": [
"http://*/*",
"https://*/*"
],
"background": {
"service_worker": "background/background.js"
},
"content_scripts": [
{
"matches": ["<all_urls>"],
"js": ["content/content.js"],
"css": ["content/content.css"]
}
],
"action": {
"default_popup": "popup/popup.html",
"default_title": "EasyCrawler"
},
"icons": {
"16": "icons/icon16.png",
"48": "icons/icon48.png",
"128": "icons/icon128.png"
}
}파일: extension/popup/popup.html
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
body { width: 350px; padding: 20px; font-family: Arial, sans-serif; }
.header { text-align: center; margin-bottom: 20px; }
.url-info { background: #f5f5f5; padding: 10px; border-radius: 5px; margin-bottom: 15px; }
.difficulty { display: flex; align-items: center; gap: 10px; }
.button {
width: 100%; padding: 12px; margin: 5px 0;
border: none; border-radius: 5px; cursor: pointer;
font-size: 14px; font-weight: bold;
}
.btn-primary { background: #007bff; color: white; }
.btn-success { background: #28a745; color: white; }
.btn-warning { background: #ffc107; color: black; }
.btn-secondary { background: #6c757d; color: white; }
.selected-elements { margin-top: 15px; }
.element-tag {
display: inline-block; background: #e9ecef;
padding: 4px 8px; margin: 2px; border-radius: 3px; font-size: 12px;
}
.status { padding: 5px; border-radius: 3px; text-align: center; margin: 5px 0; }
.status.success { background: #d4edda; color: #155724; }
.status.error { background: #f8d7da; color: #721c24; }
.status.warning { background: #fff3cd; color: #856404; }
</style>
</head>
<body>
<div class="header">
<h3>🕷️ EasyCrawler</h3>
<div class="url-info">
<div><strong>현재 페이지:</strong></div>
<div id="current-url">로딩 중...</div>
</div>
<div class="difficulty">
<span><strong>크롤링 난이도:</strong></span>
<span id="difficulty-score">분석 중...</span>
</div>
</div>
<div id="status" class="status" style="display: none;"></div>
<div class="selected-elements">
<strong>선택된 요소들:</strong>
<div id="selected-list">아직 선택된 요소가 없습니다</div>
</div>
<button id="select-mode" class="button btn-primary">
📍 요소 선택 모드
</button>
<button id="load-template" class="button btn-secondary">
📁 템플릿 불러오기
</button>
<button id="run-crawl" class="button btn-success" disabled>
▶️ 크롤링 실행
</button>
<button id="save-template" class="button btn-warning" disabled>
💾 템플릿 저장
</button>
<button id="settings" class="button btn-secondary">
⚙️ 설정
</button>
<script src="popup.js"></script>
</body>
</html>파일: extension/popup/popup.js
class EasyCrawlerPopup {
constructor() {
this.selectedElements = [];
this.currentUrl = '';
this.init();
}
async init() {
await this.loadCurrentTab();
this.bindEvents();
await this.analyzeSite();
this.updateUI();
}
async loadCurrentTab() {
const [tab] = await chrome.tabs.query({ active: true, currentWindow: true });
this.currentUrl = tab.url;
document.getElementById('current-url').textContent =
this.currentUrl.length > 50 ? this.currentUrl.substring(0, 50) + '...' : this.currentUrl;
}
bindEvents() {
document.getElementById('select-mode').addEventListener('click', () => {
this.toggleSelectMode();
});
document.getElementById('run-crawl').addEventListener('click', () => {
this.runCrawling();
});
document.getElementById('save-template').addEventListener('click', () => {
this.saveTemplate();
});
document.getElementById('load-template').addEventListener('click', () => {
this.loadTemplate();
});
document.getElementById('settings').addEventListener('click', () => {
this.openSettings();
});
// Content script에서 메시지 수신
chrome.runtime.onMessage.addListener((message, sender, sendResponse) => {
if (message.type === 'ELEMENT_SELECTED') {
this.addSelectedElement(message.element);
}
});
}
async analyzeSite() {
try {
this.showStatus('사이트 분석 중...', 'warning');
// Backend API 호출하여 사이트 분석
const response = await fetch('http://localhost:8000/api/analyze', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ url: this.currentUrl })
});
const result = await response.json();
if (result.success) {
document.getElementById('difficulty-score').textContent = result.difficulty.level;
this.showStatus('분석 완료', 'success');
} else {
throw new Error(result.error);
}
} catch (error) {
console.error('사이트 분석 실패:', error);
document.getElementById('difficulty-score').textContent = '분석 실패';
this.showStatus('분석 실패: ' + error.message, 'error');
}
}
async toggleSelectMode() {
const [tab] = await chrome.tabs.query({ active: true, currentWindow: true });
await chrome.scripting.executeScript({
target: { tabId: tab.id },
func: () => {
if (window.easyCrawlerSelector) {
window.easyCrawlerSelector.toggle();
} else {
console.error('Content script not loaded');
}
}
});
this.showStatus('요소 선택 모드가 활성화되었습니다', 'success');
}
addSelectedElement(element) {
this.selectedElements.push(element);
this.updateSelectedElementsList();
this.updateButtonStates();
}
updateSelectedElementsList() {
const listElement = document.getElementById('selected-list');
if (this.selectedElements.length === 0) {
listElement.textContent = '아직 선택된 요소가 없습니다';
return;
}
listElement.innerHTML = '';
this.selectedElements.forEach((element, index) => {
const tag = document.createElement('span');
tag.className = 'element-tag';
tag.textContent = `${element.name} (${element.selector})`;
const removeBtn = document.createElement('button');
removeBtn.textContent = '×';
removeBtn.style.marginLeft = '5px';
removeBtn.onclick = () => this.removeSelectedElement(index);
tag.appendChild(removeBtn);
listElement.appendChild(tag);
});
}
removeSelectedElement(index) {
this.selectedElements.splice(index, 1);
this.updateSelectedElementsList();
this.updateButtonStates();
}
updateButtonStates() {
const hasElements = this.selectedElements.length > 0;
document.getElementById('run-crawl').disabled = !hasElements;
document.getElementById('save-template').disabled = !hasElements;
}
showStatus(message, type) {
const statusElement = document.getElementById('status');
statusElement.textContent = message;
statusElement.className = `status ${type}`;
statusElement.style.display = 'block';
setTimeout(() => {
statusElement.style.display = 'none';
}, 3000);
}
async runCrawling() {
if (this.selectedElements.length === 0) {
this.showStatus('선택된 요소가 없습니다', 'error');
return;
}
try {
this.showStatus('크롤링 실행 중...', 'warning');
const response = await fetch('http://localhost:8000/api/crawl', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
url: this.currentUrl,
selectors: this.selectedElements
})
});
const result = await response.json();
if (result.success) {
this.showStatus(`크롤링 완료! ${result.data.length}개 항목 수집`, 'success');
// 결과 다운로드 링크 표시
this.showDownloadLink(result.download_url);
} else {
throw new Error(result.error);
}
} catch (error) {
this.showStatus('크롤링 실패: ' + error.message, 'error');
}
}
showDownloadLink(downloadUrl) {
const linkElement = document.createElement('a');
linkElement.href = downloadUrl;
linkElement.textContent = '📥 Excel 파일 다운로드';
linkElement.className = 'button btn-success';
linkElement.style.textDecoration = 'none';
linkElement.style.display = 'block';
linkElement.style.textAlign = 'center';
document.body.appendChild(linkElement);
}
async saveTemplate() {
const templateName = prompt('템플릿 이름을 입력하세요:');
if (!templateName) return;
try {
const template = {
name: templateName,
url_pattern: this.extractUrlPattern(this.currentUrl),
selectors: this.selectedElements,
created_at: new Date().toISOString()
};
await chrome.storage.local.set({
[`template_${Date.now()}`]: template
});
this.showStatus('템플릿이 저장되었습니다', 'success');
} catch (error) {
this.showStatus('템플릿 저장 실패: ' + error.message, 'error');
}
}
extractUrlPattern(url) {
// URL에서 패턴 추출 (도메인 + 경로 패턴)
try {
const urlObj = new URL(url);
return `${urlObj.origin}${urlObj.pathname.replace(/\/\d+/g, '/*')}`;
} catch {
return url;
}
}
async loadTemplate() {
try {
const templates = await chrome.storage.local.get();
const templateKeys = Object.keys(templates).filter(key => key.startsWith('template_'));
if (templateKeys.length === 0) {
this.showStatus('저장된 템플릿이 없습니다', 'warning');
return;
}
// 간단한 선택 UI (나중에 개선)
const templateNames = templateKeys.map(key => templates[key].name);
const selectedTemplate = prompt('템플릿을 선택하세요:\n' + templateNames.join('\n'));
if (selectedTemplate) {
const template = Object.values(templates).find(t => t.name === selectedTemplate);
if (template) {
this.selectedElements = template.selectors;
this.updateSelectedElementsList();
this.updateButtonStates();
this.showStatus('템플릿이 로드되었습니다', 'success');
}
}
} catch (error) {
this.showStatus('템플릿 로드 실패: ' + error.message, 'error');
}
}
openSettings() {
// 설정 페이지 열기 (나중에 구현)
chrome.tabs.create({ url: chrome.runtime.getURL('settings/settings.html') });
}
}
// 팝업이 로드되면 초기화
document.addEventListener('DOMContentLoaded', () => {
new EasyCrawlerPopup();
});목표: 웹페이지에서 요소를 시각적으로 선택할 수 있는 기능
파일: extension/content/content.js
class ElementSelector {
constructor() {
this.isActive = false;
this.selectedElements = [];
this.overlay = null;
this.tooltip = null;
this.init();
}
init() {
this.createOverlay();
this.createTooltip();
this.bindEvents();
}
createOverlay() {
this.overlay = document.createElement('div');
this.overlay.id = 'easycrawler-overlay';
this.overlay.style.cssText = `
position: fixed;
top: 0;
left: 0;
width: 100%;
height: 100%;
background: rgba(0, 123, 255, 0.1);
z-index: 999999;
display: none;
pointer-events: none;
`;
document.body.appendChild(this.overlay);
}
createTooltip() {
this.tooltip = document.createElement('div');
this.tooltip.id = 'easycrawler-tooltip';
this.tooltip.style.cssText = `
position: fixed;
background: #333;
color: white;
padding: 8px 12px;
border-radius: 4px;
font-size: 12px;
font-family: Arial, sans-serif;
z-index: 1000000;
display: none;
pointer-events: none;
max-width: 300px;
word-wrap: break-word;
`;
document.body.appendChild(this.tooltip);
}
bindEvents() {
this.handleMouseMove = this.handleMouseMove.bind(this);
this.handleClick = this.handleClick.bind(this);
this.handleKeyPress = this.handleKeyPress.bind(this);
}
toggle() {
if (this.isActive) {
this.deactivate();
} else {
this.activate();
}
}
activate() {
this.isActive = true;
this.overlay.style.display = 'block';
document.addEventListener('mousemove', this.handleMouseMove);
document.addEventListener('click', this.handleClick);
document.addEventListener('keydown', this.handleKeyPress);
document.body.style.cursor = 'crosshair';
this.showInstructions();
}
deactivate() {
this.isActive = false;
this.overlay.style.display = 'none';
this.tooltip.style.display = 'none';
document.removeEventListener('mousemove', this.handleMouseMove);
document.removeEventListener('click', this.handleClick);
document.removeEventListener('keydown', this.handleKeyPress);
document.body.style.cursor = 'default';
this.clearHighlight();
this.hideInstructions();
}
handleMouseMove(event) {
if (!this.isActive) return;
const element = document.elementFromPoint(event.clientX, event.clientY);
if (!element || element === this.overlay || element === this.tooltip) return;
this.highlightElement(element);
this.showTooltip(event, element);
}
handleClick(event) {
if (!this.isActive) return;
event.preventDefault();
event.stopPropagation();
const element = document.elementFromPoint(event.clientX, event.clientY);
if (!element || element === this.overlay || element === this.tooltip) return;
this.selectElement(element);
}
handleKeyPress(event) {
if (!this.isActive) return;
if (event.key === 'Escape') {
this.deactivate();
}
}
highlightElement(element) {
this.clearHighlight();
const rect = element.getBoundingClientRect();
const highlight = document.createElement('div');
highlight.className = 'easycrawler-highlight';
highlight.style.cssText = `
position: fixed;
top: ${rect.top}px;
left: ${rect.left}px;
width: ${rect.width}px;
height: ${rect.height}px;
border: 2px solid #007bff;
background: rgba(0, 123, 255, 0.2);
z-index: 999998;
pointer-events: none;
`;
document.body.appendChild(highlight);
}
clearHighlight() {
const highlights = document.querySelectorAll('.easycrawler-highlight');
highlights.forEach(highlight => highlight.remove());
}
showTooltip(event, element) {
const selector = this.generateSelector(element);
const text = this.getElementText(element);
this.tooltip.innerHTML = `
<strong>태그:</strong> ${element.tagName.toLowerCase()}<br>
<strong>선택자:</strong> ${selector}<br>
<strong>텍스트:</strong> ${text.substring(0, 50)}${text.length > 50 ? '...' : ''}<br>
<em>클릭하여 선택</em>
`;
this.tooltip.style.display = 'block';
this.tooltip.style.left = (event.clientX + 10) + 'px';
this.tooltip.style.top = (event.clientY + 10) + 'px';
// 화면 경계 체크
const tooltipRect = this.tooltip.getBoundingClientRect();
if (tooltipRect.right > window.innerWidth) {
this.tooltip.style.left = (event.clientX - tooltipRect.width - 10) + 'px';
}
if (tooltipRect.bottom > window.innerHeight) {
this.tooltip.style.top = (event.clientY - tooltipRect.height - 10) + 'px';
}
}
selectElement(element) {
const selector = this.generateSelector(element);
const elementData = {
name: this.generateElementName(element),
selector: selector,
tagName: element.tagName.toLowerCase(),
text: this.getElementText(element),
attributes: this.getElementAttributes(element)
};
this.selectedElements.push(elementData);
// 선택된 요소 시각적 표시
this.markAsSelected(element);
// 팝업에 알림
chrome.runtime.sendMessage({
type: 'ELEMENT_SELECTED',
element: elementData
});
console.log('선택된 요소:', elementData);
}
markAsSelected(element) {
element.style.outline = '3px solid #28a745';
element.style.backgroundColor = 'rgba(40, 167, 69, 0.1)';
// 선택 표시 제거 (3초 후)
setTimeout(() => {
element.style.outline = '';
element.style.backgroundColor = '';
}, 3000);
}
generateSelector(element) {
// CSS 선택자 생성 (우선순위: id > class > tag)
if (element.id) {
return `#${element.id}`;
}
if (element.className) {
const classes = element.className.split(' ').filter(c => c.trim());
if (classes.length > 0) {
return `.${classes.join('.')}`;
}
}
// 부모 요소와의 관계를 고려한 선택자
const tagName = element.tagName.toLowerCase();
const parent = element.parentElement;
if (parent) {
const siblings = Array.from(parent.children).filter(child =>
child.tagName.toLowerCase() === tagName
);
if (siblings.length > 1) {
const index = siblings.indexOf(element) + 1;
return `${this.generateSelector(parent)} > ${tagName}:nth-child(${index})`;
} else {
return `${this.generateSelector(parent)} > ${tagName}`;
}
}
return tagName;
}
generateElementName(element) {
// 요소의 의미있는 이름 생성
const text = this.getElementText(element);
const tagName = element.tagName.toLowerCase();
if (text && text.length > 0) {
const shortText = text.substring(0, 20).replace(/\s+/g, '_');
return `${tagName}_${shortText}`;
}
if (element.getAttribute('name')) {
return element.getAttribute('name');
}
if (element.getAttribute('id')) {
return element.getAttribute('id');
}
return `${tagName}_${Date.now()}`;
}
getElementText(element) {
// 요소의 텍스트 내용 추출 (자식 요소 제외)
let text = '';
for (let node of element.childNodes) {
if (node.nodeType === Node.TEXT_NODE) {
text += node.textContent.trim() + ' ';
}
}
return text.trim();
}
getElementAttributes(element) {
const attributes = {};
for (let attr of element.attributes) {
attributes[attr.name] = attr.value;
}
return attributes;
}
showInstructions() {
const instructions = document.createElement('div');
instructions.id = 'easycrawler-instructions';
instructions.style.cssText = `
position: fixed;
top: 20px;
left: 50%;
transform: translateX(-50%);
background: #333;
color: white;
padding: 15px;
border-radius: 8px;
font-family: Arial, sans-serif;
font-size: 14px;
z-index: 1000001;
text-align: center;
`;
instructions.innerHTML = `
<strong>🎯 요소 선택 모드</strong><br>
마우스를 올려서 요소를 확인하고 클릭하여 선택하세요<br>
<small>ESC 키로 종료</small>
`;
document.body.appendChild(instructions);
// 5초 후 자동 제거
setTimeout(() => {
if (instructions.parentNode) {
instructions.remove();
}
}, 5000);
}
hideInstructions() {
const instructions = document.getElementById('easycrawler-instructions');
if (instructions) {
instructions.remove();
}
}
}
// 전역에서 접근 가능하도록 설정
window.easyCrawlerSelector = new ElementSelector();
// Chrome Extension 메시지 리스너
chrome.runtime.onMessage.addListener((message, sender, sendResponse) => {
if (message.type === 'TOGGLE_SELECTOR') {
window.easyCrawlerSelector.toggle();
sendResponse({ success: true });
}
if (message.type === 'GET_SELECTED_ELEMENTS') {
sendResponse({
elements: window.easyCrawlerSelector.selectedElements
});
}
});파일: extension/content/content.css
/* EasyCrawler Content Script Styles */
#easycrawler-overlay {
position: fixed !important;
top: 0 !important;
left: 0 !important;
width: 100% !important;
height: 100% !important;
background: rgba(0, 123, 255, 0.05) !important;
z-index: 999999 !important;
pointer-events: none !important;
}
#easycrawler-tooltip {
position: fixed !important;
background: #333 !important;
color: white !important;
padding: 8px 12px !important;
border-radius: 4px !important;
font-size: 12px !important;
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif !important;
z-index: 1000000 !important;
display: none !important;
pointer-events: none !important;
max-width: 300px !important;
word-wrap: break-word !important;
box-shadow: 0 4px 12px rgba(0, 0, 0, 0.3) !important;
}
.easycrawler-highlight {
position: fixed !important;
border: 2px solid #007bff !important;
background: rgba(0, 123, 255, 0.15) !important;
z-index: 999998 !important;
pointer-events: none !important;
animation: easycrawler-pulse 1s infinite !important;
}
@keyframes easycrawler-pulse {
0% { background: rgba(0, 123, 255, 0.15); }
50% { background: rgba(0, 123, 255, 0.25); }
100% { background: rgba(0, 123, 255, 0.15); }
}
#easycrawler-instructions {
position: fixed !important;
top: 20px !important;
left: 50% !important;
transform: translateX(-50%) !important;
background: linear-gradient(135deg, #333, #555) !important;
color: white !important;
padding: 15px 20px !important;
border-radius: 8px !important;
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif !important;
font-size: 14px !important;
z-index: 1000001 !important;
text-align: center !important;
box-shadow: 0 6px 20px rgba(0, 0, 0, 0.3) !important;
animation: easycrawler-slideDown 0.3s ease-out !important;
}
@keyframes easycrawler-slideDown {
from {
opacity: 0;
transform: translateX(-50%) translateY(-20px);
}
to {
opacity: 1;
transform: translateX(-50%) translateY(0);
}
}
/* 선택된 요소 스타일 */
.easycrawler-selected {
outline: 3px solid #28a745 !important;
background: rgba(40, 167, 69, 0.1) !important;
}목표: 백엔드 API 서버 구축
파일: backend/main.py
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
import uvicorn
import os
from pathlib import Path
from app.api import router as api_router
from app.core.config import settings
# FastAPI 애플리케이션 생성
app = FastAPI(
title="EasyCrawler API",
description="쉬운 웹 크롤링 도구 API",
version="1.0.0"
)
# CORS 설정
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 개발 환경용, 운영에서는 제한
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# API 라우터 등록
app.include_router(api_router, prefix="/api/v1")
# 정적 파일 서빙 (결과 다운로드용)
results_dir = Path("data/results")
results_dir.mkdir(parents=True, exist_ok=True)
app.mount("/downloads", StaticFiles(directory=str(results_dir)), name="downloads")
@app.get("/")
async def root():
return {
"message": "EasyCrawler API",
"version": "1.0.0",
"docs": "/docs"
}
@app.get("/health")
async def health_check():
return {"status": "healthy"}
if __name__ == "__main__":
uvicorn.run(
"main:app",
host="0.0.0.0",
port=8000,
reload=True,
log_level="info"
)파일: backend/app/core/config.py
from pydantic import BaseSettings
from typing import List
class Settings(BaseSettings):
# 기본 설정
APP_NAME: str = "EasyCrawler"
DEBUG: bool = True
# 데이터베이스
DATABASE_URL: str = "sqlite:///./easycrawler.db"
# 크롤링 설정
DEFAULT_USER_AGENT: str = "EasyCrawler/1.0 (+https://easycrawler.com)"
DEFAULT_CRAWL_DELAY: int = 1
MAX_CONCURRENT_REQUESTS: int = 10
REQUEST_TIMEOUT: int = 30
# 파일 경로
TEMPLATES_DIR: str = "data/templates"
RESULTS_DIR: str = "data/results"
LOGS_DIR: str = "data/logs"
# 보안
ALLOWED_DOMAINS: List[str] = [] # 빈 리스트면 모든 도메인 허용
class Config:
env_file = ".env"
settings = Settings()파일: backend/app/api/__init__.py
from fastapi import APIRouter
from .endpoints import analyze, crawl, templates
router = APIRouter()
# 엔드포인트 등록
router.include_router(analyze.router, prefix="/analyze", tags=["analyze"])
router.include_router(crawl.router, prefix="/crawl", tags=["crawl"])
router.include_router(templates.router, prefix="/templates", tags=["templates"])목표: 웹사이트 크롤링 가능성 분석 API
파일: backend/app/api/endpoints/analyze.py
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, HttpUrl
from typing import Dict, Any
import logging
from app.core