한국 주식 시장 분석, 금융 공시 및 지식 그래프 구축을 위한 Apache Airflow 기반 데이터 오케스트레이션 및 ETL 파이프라인입니다.
Stockelper Airflow는 한국 금융 시장 데이터의 수집, 처리 및 저장을 자동화하는 종합적인 데이터 엔지니어링 플랫폼입니다. KRX(한국거래소), DART(전자공시시스템), 다양한 금융 리포트 플랫폼 등 여러 데이터 소스와 통합하여 통합 데이터 웨어하우스 및 지식 그래프를 구축합니다.
- 자동화된 데이터 수집: 주가, 금융 리포트, 규제 공시의 일일 자동 수집
- 다중 소스 통합: KRX, DART, Wisereport 및 기타 금융 데이터 제공자
- 지식 그래프 구축: 고급 분석을 위한 Neo4j 기반 관계형 데이터 모델링
- 확장 가능한 아키텍처: 영구 스토리지를 갖춘 Docker 기반 배포
- 모니터링 및 유지보수: 내장된 로그 관리 및 상태 확인 기능
- PostgreSQL 및 MongoDB: 구조화/비구조화 데이터를 위한 이중 데이터베이스 아키텍처
- 오케스트레이션: Apache Airflow 2.10.4
- 데이터베이스:
- PostgreSQL (구조화 데이터: 주가, DART 공시)
- MongoDB (비구조화 데이터: 리포트, 경쟁사 정보)
- Neo4j (지식 그래프)
- 웹 스크래핑: Selenium 4.27+ with Chrome/ChromeDriver
- 데이터 처리: Pandas, FinanceDataReader
- 컨테이너 플랫폼: Docker & Docker Compose
┌─────────────────┐
│ 데이터 소스 │
│ - KRX │
│ - DART │
│ - Wisereport │
└────────┬────────┘
│
▼
┌─────────────────┐
│ Airflow DAGs │
│ - 크롤러 │
│ - ETL 작업 │
└────────┬────────┘
│
├──────────┐
▼ ▼
┌──────────┐ ┌─────────┐
│PostgreSQL│ │ MongoDB │
│ (주가/공시)│ │(리포트) │
└─────┬────┘ └────┬────┘
│ │
└────┬───────┘
▼
┌──────────────┐
│ Neo4j │
│ (지식 그래프) │
└──────────────┘
stockelper-airflow/
├── dags/ # Airflow DAG 정의
│ ├── stock_report_crawler_dag.py
│ ├── competitor_crawler_dag.py
│ ├── stock_to_postgres_dag.py
│ ├── dart_disclosure_collection_dag.py
│ ├── dart_disclosure_collection_backfill_dag.py
│ ├── neo4j_kg_etl_dag.py
│ ├── neo4j_kg_rebuild_dag.py
│ └── log_cleanup_dag.py
│
├── modules/ # Python 모듈
│ ├── common/ # 공통 유틸리티
│ │ ├── logging_config.py
│ │ ├── airflow_settings.py
│ │ └── db_connections.py
│ ├── report_crawler/ # 금융 리포트 크롤러
│ │ └── stock_report_crawler.py
│ ├── company_crawler/ # 기업 데이터 크롤러
│ │ └── compete_company_crawler.py
│ ├── stock_price/ # 주가 ETL
│ │ ├── stock_to_db.py
│ │ └── fetch_stock_operators.py
│ ├── dart_disclosure/ # DART API 통합
│ │ ├── runner.py
│ │ ├── opendart_api.py
│ │ ├── llm_extractor.py
│ │ ├── mongo_repo.py
│ │ └── universe.py
│ ├── postgres/ # PostgreSQL 커넥터
│ │ └── postgres_connector.py
│ ├── neo4j/ # Neo4j 오퍼레이터
│ │ └── neo4j_operators.py
│ └── api/ # API 유틸리티
│ └── data_validator.py
│
├── scripts/ # 배포 및 유지보수 스크립트
│ ├── deploy.sh
│ ├── stop.sh
│ ├── bootstrap_airflow.sh
│ ├── setup_network.sh
│ ├── cleanup_logs.sh
│ └── smoke_test_dart_disclosure.py
│
├── config/ # 설정 파일
│ └── airflow.cfg
│
├── data/ # 데이터 저장소 (마운트 볼륨)
├── docker-compose.yml # Docker 오케스트레이션
├── Dockerfile # 컨테이너 이미지 정의
├── requirements.txt # Python 의존성
├── .env.example # 환경 변수 템플릿
└── README.md # 이 파일
- 스케줄: 매일 00:00 UTC
- 목적: 한국 금융 플랫폼에서 금융 리포트 크롤링
- 기술: Selenium 기반 웹 스크래핑
- 출력: MongoDB
stock_reports컬렉션 - 작업:
- MongoDB 연결 체크
- 헤드리스 Chrome으로 리포트 크롤링
- 결과 리포트 및 검증
- 스케줄: 매일 00:00 UTC
- 목적: Wisereport에서 경쟁사 정보 수집
- 데이터 소스: KOSPI, KOSDAQ, KONEX 시장 구분
- 출력: MongoDB
competitors컬렉션 - 방법: REST API 통합
- 스케줄: @daily
- 목적: KRX에서 일일 주가 데이터 가져오기 및 저장
- 데이터 소스: FinanceDataReader (KRX API)
- 출력: PostgreSQL
daily_stock_price테이블 - 작업:
- 테이블 스키마 생성 확인
- 주식 심볼 유니버스 업데이트
- 일일 가격 데이터 가져오기
- PostgreSQL에 upsert 로직으로 로드
- 스케줄: 매일 08:00 KST
- 목적: DART에서 36개 주요 공시 유형 수집
- 데이터 소스: OpenDART API
- 출력: PostgreSQL
dart_*테이블 (공시 유형별 1개) - 기능:
- 엄선된 주요 리포트 엔드포인트
- API 키 로테이션 지원
- 자동 커서 관리
- 대용량 데이터셋을 위한 청크 처리
- 참고: 2025-01-06 기준 LLM 기반 이벤트 추출은 보류됨
- 스케줄: 수동 트리거
- 목적: DART 공시 과거 데이터 백필
- 사용 사례: 초기 데이터 로드 또는 누락 구간 채우기
- 스케줄: 매일 02:00 UTC
- 목적: 7일 이상 된 Airflow 로그 자동 정리
- 작업:
- 정리 전 통계
- 로그 파일 삭제
- 정리 후 통계
- 스케줄: 매일 20:10 KST (주가 수집 완료 후)
- 목적: Neo4j 지식 그래프 구축 및 업데이트
- 아키텍처: 날짜 기반 허브 모델
- 데이터 소스:
- PostgreSQL
daily_stock_price - PostgreSQL
dart_*테이블
- PostgreSQL
- 작업:
- 기본 KG 스키마 생성 (제약조건, 인덱스)
- 증분 로드를 위한 대상 날짜 결정
- 일일 주가 로드
- DART 공시 이벤트 로드
- 경쟁사 관계 로드 (MongoDB에서)
- 스케줄: 수동 트리거
- 목적: Neo4j 지식 그래프 전체 재구축
- 사용 사례: 스키마 변경 또는 데이터 손상 복구
- Docker & Docker Compose
- Git
- Docker에 최소 4GB RAM 할당
- 다음에 대한 네트워크 액세스:
- KRX 데이터 소스
- DART OpenAPI (api.opendart.fsk.or.kr)
- MongoDB Atlas (클라우드 MongoDB 사용 시)
-
저장소 클론
cd /path/to/your/workspace git clone <repository-url> stockelper-airflow cd stockelper-airflow
-
환경 변수 설정
cp .env.example .env
.env파일을 편집하여 설정:# MongoDB 연결 MONGODB_URI=mongodb+srv://username:password@cluster.mongodb.net/ MONGO_DATABASE=stockelper # PostgreSQL 연결 POSTGRES_HOST=stockelper-postgresql POSTGRES_PORT=5432 POSTGRES_USER=stockelper POSTGRES_PASSWORD=your_secure_password POSTGRES_DB=postgres # Airflow 보안 AIRFLOW_SECRET_KEY=$(python -c "import secrets; print(secrets.token_urlsafe(32))") AIRFLOW_ADMIN_USERNAME=admin AIRFLOW_ADMIN_PASSWORD=change_this_password AIRFLOW_ADMIN_EMAIL=admin@yourdomain.com # DART API OPEN_DART_API_KEY=your_dart_api_key_here # Neo4j (선택사항) NEO4J_HOST=stockelper-neo4j NEO4J_PORT=7687 NEO4J_USER=neo4j NEO4J_PASSWORD=your_neo4j_password
-
Docker 네트워크 생성
./scripts/setup_network.sh
-
Docker Compose로 배포
./scripts/deploy.sh
-
Airflow UI 접속
- URL: http://localhost:21003
- 사용자명: admin (또는 설정한 값)
- 비밀번호: admin (또는 설정한 값)
-
설치 확인
docker logs stockelper-airflow
-
가상 환경 생성
python3.12 -m venv .venv source .venv/bin/activate # Windows: .venv\Scripts\activate
-
의존성 설치
pip install --upgrade pip pip install -r requirements.txt
-
데이터베이스 설정
- PostgreSQL 및 MongoDB 인스턴스 시작
.env에서 연결 문자열 설정
-
Airflow 초기화
export AIRFLOW_HOME=$(pwd) airflow db init
-
관리자 사용자 생성
airflow users create \ --username admin \ --password admin \ --firstname Admin \ --lastname User \ --role Admin \ --email admin@stockelper.com
-
Airflow 서비스 시작
# 터미널 1: 스케줄러 시작 airflow scheduler # 터미널 2: 웹서버 시작 airflow webserver --port 8080
-
Airflow UI 접속
| 변수 | 필수 | 기본값 | 설명 |
|---|---|---|---|
MONGODB_URI |
예 | - | MongoDB 연결 문자열 |
MONGO_DATABASE |
예 | stockelper |
MongoDB 데이터베이스 이름 |
POSTGRES_HOST |
예 | stockelper-postgresql |
PostgreSQL 호스트 |
POSTGRES_PORT |
아니오 | 5432 |
PostgreSQL 포트 |
POSTGRES_USER |
예 | stockelper |
PostgreSQL 사용자명 |
POSTGRES_PASSWORD |
예 | - | PostgreSQL 비밀번호 |
POSTGRES_DB |
아니오 | postgres |
PostgreSQL 데이터베이스 이름 |
AIRFLOW_SECRET_KEY |
예 | - | Airflow 웹 서버 비밀 키 |
AIRFLOW_ADMIN_USERNAME |
아니오 | admin |
초기 관리자 사용자명 |
AIRFLOW_ADMIN_PASSWORD |
아니오 | admin |
초기 관리자 비밀번호 |
AIRFLOW_ADMIN_EMAIL |
아니오 | admin@stockelper.com |
관리자 이메일 주소 |
OPEN_DART_API_KEY |
예 | - | DART OpenAPI 키 |
DART36_LOOKBACK_DAYS |
아니오 | 30 |
DART 수집 조회 기간 |
DART36_SLEEP_SECONDS |
아니오 | 0.2 |
DART API 호출 간 지연 시간 |
DART36_TIMEOUT_SECONDS |
아니오 | 30 |
DART API 요청 타임아웃 |
DART36_MAX_RETRIES |
아니오 | 3 |
DART API 호출 최대 재시도 횟수 |
STOCK_PRICE_EOD_CUTOFF_HOUR |
아니오 | 18 |
장 마감 기준 시간 (KST) |
NEO4J_HOST |
아니오 | stockelper-neo4j |
Neo4j 호스트 |
NEO4J_PORT |
아니오 | 7687 |
Neo4j bolt 포트 |
NEO4J_USER |
아니오 | neo4j |
Neo4j 사용자명 |
NEO4J_PASSWORD |
아니오 | - | Neo4j 비밀번호 |
다음 Airflow 연결은 부트스트랩 스크립트에 의해 자동으로 생성됩니다:
- postgres_default: 주식 데이터를 위한 PostgreSQL 연결
- neo4j_default: 지식 그래프를 위한 Neo4j 연결
Airflow UI (Admin > Variables)에서 다음을 설정하세요:
OPEN_DART_API_KEY: DART API 키DART_CURATED_UNIVERSE_JSON: 유니버스 JSON 파일 경로DART_CURATED_LOOKBACK_DAYS: DART 데이터 조회 일수
CREATE TABLE daily_stock_price (
symbol VARCHAR(20) NOT NULL,
date DATE NOT NULL,
open DECIMAL(15, 2),
high DECIMAL(15, 2),
low DECIMAL(15, 2),
close DECIMAL(15, 2),
volume BIGINT,
adjusted_close DECIMAL(15, 2),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (symbol, date)
);각 DART 공시 유형별로 다음 패턴을 따르는 여러 테이블:
CREATE TABLE dart_{report_type} (
rcept_no VARCHAR(50) PRIMARY KEY,
corp_code VARCHAR(8),
corp_name VARCHAR(255),
stock_code VARCHAR(6),
report_nm VARCHAR(255),
rcept_dt DATE,
flr_nm VARCHAR(255),
-- 리포트 유형별 추가 필드
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);{
_id: ObjectId,
date: ISODate,
company: String,
code: String,
title: String,
summary: String,
url: String,
crawled_at: ISODate
}
// 인덱스: { date: 1, company: 1, code: 1 }{
_id: String, // 기업 코드
target_company: String,
competitors: [
{
name: String,
code: String,
similarity_score: Number
}
],
last_crawled_at: ISODate
}Date: 날짜 허브 노드 (YYYY-MM-DD)Company: 주식 심볼/기업Event: DART 공시 이벤트Sector: 산업 섹터
HAS_PRICE: Company -> Date (주가 데이터)HAS_EVENT: Company -> Event -> DateCOMPETES_WITH: Company -> CompanyBELONGS_TO: Company -> Sector
-
Airflow UI에서 DAG 활성화
- http://localhost:21003 으로 이동
- DAG를 "On" 상태로 토글
-
수동 실행 (선택사항)
- DAG 이름 클릭
- "Trigger DAG" 버튼 클릭
-
실행 모니터링
- Airflow UI에서 로그 확인
- 작업 상태 및 실행 시간 확인
DART 과거 데이터를 백필하려면:
dart_disclosure_collection_backfill_dag로 이동- "Trigger DAG w/ config" 클릭
- 설정 제공:
{ "start_date": "2024-01-01", "end_date": "2024-12-31" }
- DAG 실행 이력: http://localhost:21003/dags
- 작업 로그: 작업 인스턴스 클릭 -> View Log
- 작업 실행 시간: Graph View에서 실행 시간 표시
최근 주가 확인:
SELECT * FROM daily_stock_price
WHERE date >= CURRENT_DATE - INTERVAL '7 days'
ORDER BY date DESC, symbol
LIMIT 100;DART 공시 확인:
SELECT report_nm, COUNT(*) as count
FROM dart_major_reports
WHERE rcept_dt >= CURRENT_DATE - INTERVAL '7 days'
GROUP BY report_nm
ORDER BY count DESC;지식 그래프 검증:
// 레이블별 노드 수
MATCH (n) RETURN labels(n) AS label, COUNT(*) AS count
// 최근 주가
MATCH (c:Company)-[r:HAS_PRICE]->(d:Date)
WHERE d.date >= date() - duration('P7D')
RETURN c.symbol, d.date, r.close
ORDER BY d.date DESC, c.symbol
LIMIT 100로그는 log_cleanup_dag에 의해 자동으로 정리됩니다 (매일 02:00 UTC 실행). 수동 정리:
# 컨테이너 내부
docker exec stockelper-airflow bash /opt/airflow/scripts/cleanup_logs.sh
# 호스트에서
./scripts/cleanup_logs_container.shPostgreSQL vacuum 및 analyze:
VACUUM ANALYZE daily_stock_price;
VACUUM ANALYZE dart_major_reports;MongoDB 인덱스 최적화:
db.stock_reports.reIndex();
db.competitors.reIndex();-
DAG 파일 수정
# DAG는 ./dags 디렉토리에서 마운트됨 vim dags/my_custom_dag.py -
Airflow 자동 변경 감지
- 30-60초 대기하여 Airflow가 새로고침
- 또는 즉시 업데이트를 위해 스케줄러 재시작
-
Airflow 재시작 (필요시)
docker-compose restart airflow
Error: MongoServerError: Authentication failed
해결책:
MONGODB_URI가 올바른지 확인- MongoDB Atlas IP 화이트리스트 확인
- 네트워크 연결 확인
Error: could not connect to server
해결책:
- PostgreSQL 컨테이너가 실행 중인지 확인:
docker ps | grep postgres POSTGRES_*환경 변수 확인- 네트워크 확인:
docker network inspect stockelper
Error: selenium.common.exceptions.WebDriverException: Message: unknown error: Chrome failed to start
해결책:
- Chrome/ChromeDriver는 Docker 이미지에 사전 설치됨
- 충분한 메모리 확보 (Docker에 4GB 이상)
- 로그 확인:
docker logs stockelper-airflow
DAG 파일이 Airflow UI에 표시되지 않음
해결책:
- Python 구문 오류 확인:
python dags/my_dag.py - 파일이
./dags디렉토리에 있는지 확인 - 스케줄러 로그 확인:
docker logs stockelper-airflow | grep scheduler - DAG 새로고침: Admin > Reload DAGs
Error: Bind for 0.0.0.0:21003 failed: port is already allocated
해결책:
docker-compose.yml에서 포트 변경:ports: - "21004:8080" # 21003을 사용 가능한 포트로 변경
Error: MemoryError or container killed
해결책:
- Docker 메모리 제한 증가 (Docker Desktop > Settings > Resources)
- 동시 작업 수 줄이기:
airflow.cfg편집 ->parallelism = 4
디버그 로깅 활성화:
-
환경 변수
# docker-compose.yml에서 - AIRFLOW__CORE__LOGGING_LEVEL=DEBUG -
서비스 재시작
docker-compose restart airflow
-
로그 확인
docker logs -f stockelper-airflow
Airflow 상태 엔드포인트:
curl http://localhost:21003/health데이터베이스 연결 테스트:
# PostgreSQL
docker exec stockelper-postgresql psql -U stockelper -c "SELECT 1"
# MongoDB
docker exec stockelper-airflow python -c "from pymongo import MongoClient; print(MongoClient('$MONGODB_URI').server_info())"-
DAG 파일 생성
# dags/my_new_dag.py from airflow import DAG from airflow.operators.python import PythonOperator import pendulum def my_task(): print("Hello from my task") with DAG( dag_id="my_new_dag", start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"), schedule="@daily", catchup=False, ) as dag: task = PythonOperator( task_id="my_task", python_callable=my_task, )
-
버전 관리에 추가
git add dags/my_new_dag.py git commit -m "Add new DAG for [목적]" -
로컬 테스트
# DAG 검증 python dags/my_new_dag.py # 작업 테스트 airflow tasks test my_new_dag my_task 2025-01-01
단위 테스트 실행:
pytest tests/DART 수집 스모크 테스트 실행:
python scripts/smoke_test_dart_disclosure.py- 저장소 포크
- 기능 브랜치 생성:
git checkout -b feature/my-feature - 변경사항 커밋:
git commit -am 'Add new feature' - 브랜치에 푸시:
git push origin feature/my-feature - Pull Request 생성
-
외부 데이터베이스 사용
- 관리형 PostgreSQL (AWS RDS, GCP Cloud SQL)
- MongoDB Atlas
- Neo4j AuraDB
-
인증 활성화
# 강력한 비밀번호 AIRFLOW_ADMIN_PASSWORD=$(openssl rand -base64 32) AIRFLOW_SECRET_KEY=$(python -c "import secrets; print(secrets.token_urlsafe(32))")
-
백업 구성
- PostgreSQL: 30일 보관 기간의 일일 백업
- MongoDB: 특정 시점 복구 활성화
- Airflow 로그: S3/GCS에 아카이브
-
병렬 처리를 위한 CeleryExecutor 사용
environment: - AIRFLOW__CORE__EXECUTOR=CeleryExecutor
-
모니터링 설정
- 메트릭을 위한 Prometheus + Grafana
- Airflow StatsD 통합
- 데이터베이스 성능 모니터링
-
HTTPS 활성화
- 리버스 프록시 사용 (nginx, traefik)
- SSL/TLS 인증서 (Let's Encrypt)
airflow.cfg에서 실험적 API 활성화:
[api]
auth_backend = airflow.api.auth.backend.basic_auth예제: API를 통한 DAG 트리거
curl -X POST \
http://localhost:21003/api/v1/dags/stock_to_postgres_dag/dagRuns \
-H 'Content-Type: application/json' \
-u admin:admin \
-d '{}'인덱스 존재 확인:
-- PostgreSQL
CREATE INDEX idx_stock_price_date ON daily_stock_price(date);
CREATE INDEX idx_stock_price_symbol ON daily_stock_price(symbol);
CREATE INDEX idx_dart_rcept_dt ON dart_major_reports(rcept_dt);- 독립적인 실행을 위해
depends_on_past=False설정 - 동시 실행 방지를 위해
max_active_runs=1사용 - 리소스 집약적 작업에
pool구현
config/airflow.cfg의 주요 설정:
[core]
parallelism = 32
dag_concurrency = 16
max_active_runs_per_dag = 1
[scheduler]
scheduler_heartbeat_sec = 5
min_file_process_interval = 30-
비밀 정보 커밋 금지
.env파일 사용 (gitignore됨)- 또는 환경 변수
- 또는 Airflow Variables (암호화됨)
-
네트워크 액세스 제한
- 데이터베이스 포트에 대한 방화벽 규칙
- 데이터베이스 액세스를 위한 VPN/VPC
-
정기적 업데이트
pip install --upgrade apache-airflow docker-compose pull
-
감사 로깅
- Airflow 감사 로그 활성화
- 로그인 실패 시도 모니터링
MIT License - 자세한 내용은 LICENSE 파일을 참조하세요.
Copyright (c) 2025 Stockelper-Lab
- 이슈: GitHub Issues
- 문서:
docs/디렉토리 참조 - 이메일: admin@stockelper.com
- Apache Airflow 커뮤니티
- OpenDART API
- FinanceDataReader 라이브러리
- 한국 금융 데이터 제공자
- 포괄적인 문서로 README 개선
- 문제 해결 섹션 추가
- 설정 예제 확장
- LLM 기반 이벤트 추출 보류
- DART 수집을 주요 리포트 유형을 이벤트로 사용하도록 업데이트
- 지식 그래프 ETL 파이프라인 단순화
- Neo4j 지식 그래프 통합 추가
- 엄선된 DART 주요 리포트 수집 구현
- Docker 배포 스크립트 개선
- 초기 릴리스
- 주가 및 DART 수집을 위한 핵심 DAG
- MongoDB 및 PostgreSQL 통합