Changes from all commits
31 commits
8aa41b8
fix : dockeroperator bug by adding proxy
Aug 29, 2024
89233f2
feature : move get_soup to utils
Sep 3, 2024
fb0f20c
fix : readme
Sep 3, 2024
b763d17
fix : bug- typo
Sep 3, 2024
aba1054
ADD : add Instagram url collection module
Sep 3, 2024
94e2d0d
fix : close driver after url method returned
Sep 3, 2024
f299ae2
add : materialize method in inscrawler
Sep 3, 2024
9ea3206
add : generate uid based on current datetime
Sep 3, 2024
639e49f
add : crawl post data based on url (tested)
Sep 3, 2024
ddeeb45
delete : browser class
Sep 3, 2024
87248de
fix : split inscrawler constructor into dev and prod
Sep 3, 2024
0ab1ba2
add : module that extracts only brand names in the predefined categories
Sep 3, 2024
a66b2c5
ADD : instagram crawling dag
Sep 3, 2024
932b294
fix : change the args accepted by the instagram module constructor
Sep 4, 2024
f7a56ea
fix : split the instagram dag's brand-list fetching into a separate task
Sep 4, 2024
405b7e4
ADD : temporary crawler script
Sep 5, 2024
42ef085
fix : fix temporary task bug
Sep 5, 2024
4844003
fix : refactor temporary task
Sep 5, 2024
6f6da8b
fix : s3 path
Sep 5, 2024
09ffaf4
fix : crawler - avoid account getting blocked
Sep 8, 2024
5ba7f38
fix : crawler - adjust sleep, add scroll-down
Sep 8, 2024
c62a82c
add : humanize feature
Sep 28, 2024
0147d9a
add : catch suspicious account popup
Sep 28, 2024
6e1666f
fix : dummy
Sep 28, 2024
a31ea73
refactor : move driver_making component to utils
seoyeong200 Nov 11, 2024
6f8aab7
refactor : split brand and items classes
seoyeong200 Nov 15, 2024
f5ebd9d
refactor : tidy up brand class variable names
seoyeong200 Nov 15, 2024
dae3191
fix : fix broken item basic-info collector, add dataclass
seoyeong200 Nov 15, 2024
38a8c2e
add : visit each item url and collect reviews
seoyeong200 Nov 19, 2024
d1d3077
fix : dummy - add main function and function descriptions
seoyeong200 Nov 19, 2024
7967502
bug : Oliveyoung item collection code
seoyeong200 Nov 20, 2024
5 changes: 4 additions & 1 deletion .gitignore
@@ -13,4 +13,7 @@ log

# kafka data
kafka-data
aws_credentials
brickstudy_ingestion/dags/viral/tmp
brickstudy_ingestion/src/scrapper/results
.DS_Store
4 changes: 3 additions & 1 deletion brickstudy_ingestion/dags/utils/config.py
@@ -24,7 +24,9 @@ def set_env_variables():
"TWITTER_CLIENT_ID",
"TWITTER_CLIENT_PASSWORD",
"TWITTER_TOKEN"
"TWITTER_CRAWLER_AUTH_TOKEN_PASSWORD"
# Instagram
"INSTAGRAM_CLIENT_ID",
"INSTAGRAM_CLIENT_PASSWORD"
]
for ENV_VARIABLE in ALL_ENV_VARIABLES:
os.environ[ENV_VARIABLE] = Variable.get(ENV_VARIABLE, "")
89 changes: 89 additions & 0 deletions brickstudy_ingestion/dags/viral/instagram_crawler.py
@@ -0,0 +1,89 @@
from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonVirtualenvOperator, PythonOperator
from airflow.models import Variable

from src.scrapper.brand_name_getter import get_brand_list_fr_s3

# =========================================
# Change parameter
DAG_ID = "bronze_viral_instagram"
TARGET_PLATFORM = 'instagram'

# Set airflow setting
default_args = {
    'owner': 'brickstudy',
    'start_date': days_ago(0),
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    # 'on_failure_callback': on_failure_callback,
}
# =========================================


def get_brand_list():
    import os
    for ENV_VARIABLE in ['AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY']:
        os.environ[ENV_VARIABLE] = Variable.get(ENV_VARIABLE, "")
    return get_brand_list_fr_s3()


def instagram_crawling(brand_lst, id, pwd):
    import os
    import logging
    from src.common.kafka.utils import Kafka
    from src.scrapper.inscrawler import InsCrawler
    from src.scrapper.ins_url import InsURLCrawler
    from src.scrapper.ins_data import InsDataCrawler

    os.environ['INSTAGRAM_CLIENT_ID'] = id
    os.environ['INSTAGRAM_CLIENT_PASSWORD'] = pwd

    def crawl_instagram(keywords: tuple):
        crawler = InsURLCrawler(InsCrawler(keywords=keywords)).get_urls()
        post_crawler = InsDataCrawler(crawler.data)
        post_crawler.get_post_data()
        producer.send_data_to_kafka(
            kafka_topic='instagram',
            data=post_crawler.data
        )

    try:
        producer = Kafka()
        crawl_instagram(brand_lst)
    except Exception as e:
        logging.error("***entrypoint error*** %s", e)
        raise


with DAG(
    dag_id=DAG_ID,
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
):
    t1 = PythonOperator(
        task_id='get_brand_list_from_s3',
        python_callable=get_brand_list
    )

    t2 = PythonVirtualenvOperator(
        task_id='crawl_instagram_based_on_keyword',
        system_site_packages=False,
        op_kwargs={
            'brand_lst': "{{ ti.xcom_pull(task_ids='get_brand_list_from_s3') }}",
            'id': Variable.get('INSTAGRAM_CLIENT_ID'),
            'pwd': Variable.get('INSTAGRAM_CLIENT_PASSWORD')
        },
        python_version='3.10',
        requirements=['selenium==4.24.0', 'webdriver-manager==4.0.2',
                      'bs4==0.0.2', 'beautifulsoup4==4.12.3',
                      'lxml==5.3.0', 'pytz==2024.1',
                      "python-dotenv==0.19.0", "multiprocess", "kafka-python"],
        python_callable=instagram_crawling
    )

    t1 >> t2
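
A note on the DAG above: op_kwargs values are rendered as Jinja templates, so the brand list pulled via xcom_pull reaches the virtualenv callable as a string rather than a Python list (unless the DAG sets render_template_as_native_obj=True). A minimal sketch of how the callable could normalize that argument, assuming the upstream task returns a plain list of brand names; the helper name and the literal_eval fallback are illustrative, not part of this PR:

import ast


def normalize_brand_lst(brand_lst):
    # Hypothetical helper: a templated XCom value arrives as its string
    # representation, e.g. "['brandA', 'brandB']", so parse it back to a list.
    if isinstance(brand_lst, str):
        return ast.literal_eval(brand_lst)
    return brand_lst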
9 changes: 6 additions & 3 deletions brickstudy_ingestion/dags/viral/twitter_crawler.py
@@ -1,4 +1,5 @@
 from datetime import datetime
+import os
 
 from airflow import DAG
 from airflow.models import Variable
@@ -20,11 +21,11 @@
 }
 # =========================================
 
-
 OUTPUT_FILENAME = "test.csv"
 SEARCH_KEYWORD = "enhypen"
 LIMIT = 10
 TOKEN = Variable.get("TWITTER_CRAWLER_AUTH_TOKEN_PASSWORD")
+HOST_BASE_PATH = '/Users/seoyeongkim/Documents/ETL'
 
 with DAG(
     dag_id=DAG_ID,
@@ -36,15 +37,17 @@
         task_id='t_docker',
         image='brickstudy/twitter_crawler:latest',
         container_name='twitter_crawler',
+        api_version='1.37',
         auto_remove=True,
+        mount_tmp_dir=False,
         mounts=[
-            Mount(source="/opt/airflow/logs/tweets-data", target="/app/tweets-data", type="bind"),
+            Mount(source=f"{HOST_BASE_PATH}/logs", target="/app/tweets-data", type="bind"),
         ],
         command=[
             "bash", "-c",
             f"npx --yes tweet-harvest@latest -o {OUTPUT_FILENAME} -s {SEARCH_KEYWORD} -l {LIMIT} --token {TOKEN}"
         ],
-        docker_url='unix://var/run/docker.sock',
+        docker_url='tcp://docker-socket-proxy:2375',
         network_mode='bridge',
     )

34 changes: 0 additions & 34 deletions brickstudy_ingestion/src/scrapper/__init__.py
@@ -1,38 +1,4 @@
import urllib
from urllib.request import urlopen
from urllib.error import HTTPError, URLError
from bs4 import BeautifulSoup
import random
import time

from src.common.exception import ExtractError


def get_soup(url: str = None):
    user_agent_lst = ['Googlebot', 'Yeti', 'Daumoa', 'Twitterbot']
    user_agent = user_agent_lst[random.randint(0, len(user_agent_lst) - 1)]
    headers = {'User-Agent': user_agent}

    try:
        req = urllib.request.Request(url, headers=headers)
        page = urlopen(req)
        html = page.read().decode("utf-8")
        soup = BeautifulSoup(html, "html.parser")
    except (HTTPError, URLError) as e:
        err = ExtractError(
            code=000,
            message=f"**{url}** HTTPError/URLError. Sleep 5 and continue.",
            log=e
        )
        time.sleep(5)  # TODO: this url needs to be re-fetched in this case
    except (ValueError) as e:
        err = ExtractError(
            code=000,
            message=f"**{url}** ValueError. Ignore this url parameter.",
            log=e
        )
        print(err)
        soup = None  # TODO: ignore this url
    else:
        time.sleep(random.random())
    return soup
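
When a URL hits a transient HTTP error, the helper above ends up without a usable soup, and its TODO comment says such URLs should be retried. A minimal caller-side retry sketch, assuming get_soup now lives in a utils module as the "move get_soup to utils" commit suggests; the import path, attempt count, and backoff values are illustrative assumptions, not taken from the PR:

import time

from src.scrapper.utils import get_soup  # assumed new location after the move


def get_soup_with_retry(url: str, max_attempts: int = 3, backoff: float = 5.0):
    # Retry a few times before giving up, as the old TODO suggested.
    for attempt in range(1, max_attempts + 1):
        soup = get_soup(url)
        if soup is not None:
            return soup
        time.sleep(backoff * attempt)  # simple linear backoff between attempts
    return None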
61 changes: 61 additions & 0 deletions brickstudy_ingestion/src/scrapper/brand_name_getter.py
@@ -0,0 +1,61 @@
import json

from src.common.aws.s3_uploader import S3Uploader
from src.scrapper.models import OliveyoungBrand


def get_latest_dt():
    return '2024-08-20'


def category_checker(category: list) -> bool:
    """
    Return True if the category list contains at least one of the standard categories, otherwise False.
    """
    compare = set([c.split('_')[0] for c in category])
    standard = {'메이크업', '스킨케어', '향수', '헤어케어', '바디케어', '마스크팩',
                '클렌징', '선케어', '더모코스메틱', '맨즈케어'}
    if len(compare & standard) > 0:
        return True
    return False


def filter_brand(file_content: str) -> list:
    filtered = []
    for line in file_content.split('\n'):
        if line == '':
            break
        for brandname, brandinfo in json.loads(line).items():
            brandinfo_dic = OliveyoungBrand(**brandinfo)
            if category_checker(brandinfo_dic.category):
                filtered.append(brandname)
    return filtered


def get_brand_list_fr_s3():
    s3_client = S3Uploader().s3_client
    bucket = 'brickstudy'

    def file_keys_getter():
        paginator = s3_client.get_paginator('list_objects_v2')
        prefix = f"bronze/viral/oliveyoung/{get_latest_dt()}"
        file_key_lst = []
        for page in paginator.paginate(
            Bucket=bucket,
            Prefix=prefix
        ):
            if 'Contents' in page:
                for obj in page['Contents']:
                    file_key_lst.append(obj['Key'])
        return file_key_lst

    file_key_lst = file_keys_getter()
    filtered_brand_lst = []
    for filekey in file_key_lst:
        response = s3_client.get_object(
            Bucket=bucket,
            Key=filekey
        )
        file_content = response['Body'].read().decode('utf-8')
        filtered_brand_lst += filter_brand(file_content)
    return filtered_brand_lst
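
For context, a small usage sketch of filter_brand, assuming each S3 object is newline-delimited JSON where every line maps brand names to a dict that OliveyoungBrand accepts and that carries a category list; the sample data and field set below are illustrative guesses, not taken from the PR:

from src.scrapper.brand_name_getter import filter_brand

# Each line is one JSON object of {brand name: brand info}; only brands whose
# category prefix is in the standard set survive filtering. Field names beyond
# "category" are guesses here.
sample_content = '{"브랜드A": {"category": ["스킨케어_토너"]}}\n{"브랜드B": {"category": ["기타_잡화"]}}'

print(filter_brand(sample_content))  # expected: ['브랜드A'] (스킨케어 is a standard category)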
103 changes: 0 additions & 103 deletions brickstudy_ingestion/src/scrapper/browser.py

This file was deleted.
