Skip to content

Commit

Permalink
[feat] add README.md & glue scripts
Browse files Browse the repository at this point in the history
  • Loading branch information
poriz committed Mar 14, 2024
1 parent 12ae175 commit 88c39d8
Show file tree
Hide file tree
Showing 12 changed files with 471 additions and 1 deletion.
60 changes: 59 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,59 @@
-
# 스트리머의 게임 방송과 게임 흥행 사이의 관계

## 주제
실시간으로 스트리머의 게임 방송 데이터를 수집하여 스트리머가 게임 흥행에 미치는 파급력을 확인

## 활용 데이터
| source | data | link |
|-----------|---------------------------------------------------------|------|
| Chzzk | liveTitle, status, concurrentUserCount | https://chzzk.naver.com/ |
| AfreecaTV | broadcastNo, title, category, currentSumViewer | https://www.afreecatv.com/ |
| Steam | gameName, playerCount, discountPercent, positiveNum ... | https://partner.steamgames.com/doc/home |


## 활용 기술 및 프레임 워크
| 분류 | 기술 |
| --- | --- |
| 사용 언어 (Programming Language) | Python, SQL |
| 클라우드 서비스 (Cloud Service) | GCP, AWS (Terraform) |
| 데이터 베이스 (Database) | AWS S3, AWS Redshift, AWS RDS(MySQL) |
| 프로세싱 (Data Processing) | Airflow, AWS Glue(Spark), AWS Athena, AWS Lambda |
| 시각화 (Visualization) | Grafana(dashboard service) |
| 협업 도구 (Collaborative Software) | GitHub, Notion, Slack |
![architecture](/img/architecture.png)

## 프로젝트 보고서
- https://poriz98.notion.site/f49a712bdfa547308bb8516e0c278b5e?pvs=4

## 프로젝트 세부 결과
### Infra
- 시간과 추후 데이터 파이프라인의 규모를 고려하여, 제한된 리소스 내에서도 분산 환경을 고려한 Airflow 환경을 추가로 성공적으로 구축하였습니다.
### Dashboard
![home](/img/dashboard1.png)
![streamer](/img/dashboard2.png)
![game](/img/dashboard3.png)

### 결론
- 이 프로젝트는 스트리밍 플랫폼의 스트리머와 게임에 대한 실시간 통계를 제공하기 위해 효율적인 데이터 수집, 변환, 분석 인프라를 구축하였다. 이를 위해 AWS와 같은 클라우드 서비스를 활용하였고, Airflow, Redshift, Athena 등의 다양한 데이터 엔지니어링 도구를 사용하였다.
- 이 프로젝트를 통해 수집된 데이터는 스트리머의 활동 패턴, 게임의 인기도 및 시청자 수와 같은 다양한 요소를 파악하는 데 도움이 되며, 스트리밍 플랫폼의 현재 상황을 실시간으로 파악하고, 스트리밍 산업의 트렌드를 이해하는 데 기여할 수 있다.
- 데이터 분석 결과는 대시보드를 통해 시각화되어 제공되며, 스트리머, 게임, 전체 플랫폼에 대한 다양한 정보를 포함하고 있다.
사용자는 이 대시보드를 통해 스트리머와 게임의 동향을 쉽게 이해하고 분석할 수 있다. 이러한 정보는 플랫폼 운영자나 스트리머, 게임 제작사 등에게 유용한 인사이트를 제공하며, 비즈니스 전략 수립에 도움이 될 수 있다.

## 개선점 & 회고
- **Liked** 좋았던 점
- 여러가지 AWS 리소스를 사용해보고, 익숙해질 수 있었다.
- 데이터 파이프라인을 자동화 하는 과정을 심도 있게 고민하고 여러가지 기술을 사용해서 구현에 성공한 것
- 시간 및 리소스 제한을 고려하여 GCP를 사용하기로 빠르게 결정한 것
- **Learned** 배운 점
- 데이터 엔지니어링에 유효한 AWS 서비스들을 직접 사용해본 것
- 데이터 처리 과정에 있어서 ERD 및 S3구조 등 사전 설계의 중요성
- Github에 대한 여러가지 기능(Pull Request, Action, Issue, …) 사용
- **Lacked** 부족했던 점
- 코드 리뷰 및 리팩토링에 대해 고민하고 작성한 시간이 부족
- raw data compaction에 대한 DAG 개발 미진행
- 스트리머와 게임 데이터의 크기가 작아서 빅데이터 처리까지 진행 불가
- **Longed for** 갈망했던 점 (하고 싶었으나 하지 못한 것)
- 사용 가능한 리소스가 너무 부족
- Airflow 컴포넌트 분리 등, 필요한 개발 환경을 조성하는데 너무 많은 시간을 소요
- 프로젝트에 취지에 맞는 실시간성 데이터를 확보하지 못함
- 데브코스 과정에서 학습한 기술들을 전반적으로 복습 및 활용함과 동시에 실시간성을 갖춘 데이터를 찾기 힘들었음
88 changes: 88 additions & 0 deletions glue_scripts/de-2-1_followers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
## live_viewer glue script template

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
from awsglue.dynamicframe import DynamicFrame

# SparkContext와 GlueContext 초기화
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Job 초기화 (Job Bookmark 활성화 포함)
job = Job(glueContext)
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'input_path', 'output_path'])
job.init(args["JOB_NAME"], args)

input_path = args['input_path']
output_path = args['output_path']

# S3에서 데이터를 읽어오는 부분
datasource = glueContext.create_dynamic_frame.from_options(
"s3",
{"paths": [input_path], "recurse": True},
format="json",
transformation_ctx="datasource",
)
# 데이터 가공
datasource_df = datasource.toDF()

chzzk_source = datasource_df.select("stream_data.chzzk").select(explode("chzzk"))
afreeca_source = datasource_df.select("stream_data.afreeca").select(explode("afreeca"))

# chzzk_source.printSchema()
chzzk_df = chzzk_source.select(
col("col.streamer_id").alias("STREAMER_ID"),
col("col.current_time").alias("COLLECT_TIME_CHZZK"),
col("col.content.followerCount").alias("CHZ_FOLLOWER_NUM"),
)

# afreeca_source.printSchema()
afreeca_df = afreeca_source.select(
col("col.streamer_id").alias("STREAMER_ID"),
col("col.current_time").alias("COLLECT_TIME_AFREECA"),
col("col.broad_info.station.upd.fan_cnt").alias("AFC_FOLLOWER_NUM"),
)

# Perform the outer join
result_df = chzzk_df.join(afreeca_df, on='STREAMER_ID', how='outer')

# Use when() and otherwise() to select the appropriate COLLECT_TIME
result_df = result_df.withColumn(
"COLLECT_TIME",
when(
col("COLLECT_TIME_CHZZK") != "", col("COLLECT_TIME_CHZZK")
).otherwise(
col("COLLECT_TIME_AFREECA")
)
).select(
col("STREAMER_ID"),
col("COLLECT_TIME"),
col("CHZ_FOLLOWER_NUM"),
col("AFC_FOLLOWER_NUM")
)


# 스키마 정보를 로깅
print("Schema Information:")
result_df.printSchema()

coalesce_df = result_df.coalesce(1)

# 파티션된 Spark DataFrame을 DynamicFrame으로 변환
dynamic_frame = DynamicFrame.fromDF(coalesce_df, glueContext, "dynamic_frame")


# Parquet으로 변환하여 S3에 저장
glueContext.write_dynamic_frame.from_options(
frame=dynamic_frame,
connection_type="s3",
connection_options={"path": output_path},
format="parquet",
transformation_ctx="dynamic_frame",
)
71 changes: 71 additions & 0 deletions glue_scripts/glue_game_ccu_script.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import sys

from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import StringType

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.utils import getResolvedOptions

# SparkContext와 GlueContext 초기화
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Job 초기화 (Job Bookmark 활성화 포함)
job = Job(glueContext)
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'input_path', 'output_path'])
job.init(args['JOB_NAME'], args)

input_path = args['input_path']
output_path = args['output_path']

# S3에서 데이터를 읽어오는 부분
datasource = glueContext.create_dynamic_frame.from_options(
"s3",
{"paths": [input_path], "recurse": True},
format="json",
transformation_ctx="datasource",
)

# 전처리를 위해 DF로 변환하기
game_ccu_datasource = datasource.toDF()

# 최상위 레벨의 key를 중심으로 explode하기
df = game_ccu_datasource.select(
explode(game_ccu_datasource.raw_game_ccu).alias("raw_game_ccu"),
col("collect_time").alias("COLLECT_TIME"),
)

df = df.select(
col("raw_game_ccu.game_id").cast("string").alias("GAME_ID"),
col("COLLECT_TIME"),
col("raw_game_ccu.player_count").alias("GAME_CCU"),
)

# 한국 시간대로 타임존을 출력하기 위해 하드코딩.
# spark에 시간대를 지정하고 'Z'를 사용하는 방식도 존재함
df = df.withColumn(
"COLLECT_TIME",
date_format(
to_timestamp("COLLECT_TIME", "yyyy-M-d H:m"),
"yyyy-MM-dd'T'HH:mm:ss'+09:00'"
)
)

dynamic_frame = DynamicFrame.fromDF(df, glueContext, "dynamic_frame")

# Parquet으로 변환하여 S3에 저장
glueContext.write_dynamic_frame.from_options(
frame=dynamic_frame,
connection_type="s3",
connection_options={"path": output_path},
format="parquet",
transformation_ctx="dynamic_frame",
)


# Job Bookmark의 상태를 최종적으로 커밋
job.commit()
88 changes: 88 additions & 0 deletions glue_scripts/glue_game_price_script.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import sys

from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import StringType

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.utils import getResolvedOptions


def remove_krw(price):
if price is None:
return 0
return int(price.replace("₩", "").replace(",", "").strip())


# SparkContext와 GlueContext 초기화
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Job 초기화 (Job Bookmark 활성화 포함)
job = Job(glueContext)
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'input_path', 'output_path', 'collect_date'])
job.init(args['JOB_NAME'], args)

input_path = args['input_path']
output_path = args['output_path']
collect_date = args['collect_date']

# S3에서 데이터를 읽어오는 부분
datasource = glueContext.create_dynamic_frame.from_options(
"s3",
{"paths": [input_path], "recurse": True},
format="json",
transformation_ctx="datasource",
)

# 전처리를 위해 DF로 변환하기
game_price_datasource = datasource.toDF()

# 최상위 레벨의 key를 중심으로 explode하기
df = game_price_datasource.select(
explode(game_price_datasource.raw_game_price).alias("raw_game_price")
)

# 게임 가격을 전처리하는 udf 정의하기
remove_krw_udf = udf(remove_krw, StringType())

# 이제 nested된 필드(key)들에 접근할 수 있음
df = df.select(
col("raw_game_price.data.steam_appid").cast("string").alias("GAME_ID"),
to_date(lit(collect_date), "yyyy-M-d").alias("COLLECT_DATE"),
when(col("raw_game_price.data.price_overview").isNull(), 0)
.otherwise(
when(
col("raw_game_price.data.price_overview.final_formatted").isNull(), 0
).otherwise(
remove_krw_udf(col("raw_game_price.data.price_overview.final_formatted")).cast("int")
)
)
.alias("CURRENT_PRICE"),
when(col("raw_game_price.data.price_overview").isNull(), False)
.otherwise(
when(
col("raw_game_price.data.price_overview.discount_percent") == 0, False
).otherwise(True)
)
.alias("IS_DISCOUNT"),
)


dynamic_frame = DynamicFrame.fromDF(df, glueContext, "dynamic_frame")

# Parquet으로 변환하여 S3에 저장
glueContext.write_dynamic_frame.from_options(
frame=dynamic_frame,
connection_type="s3",
connection_options={"path": output_path},
format="parquet",
transformation_ctx="dynamic_frame",
)


# Job Bookmark의 상태를 최종적으로 커밋
job.commit()
63 changes: 63 additions & 0 deletions glue_scripts/glue_game_rating_script.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import sys

from pyspark.context import SparkContext
from pyspark.sql.functions import *

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.utils import getResolvedOptions

# SparkContext와 GlueContext 초기화
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Job 초기화 (Job Bookmark 활성화 포함)
job = Job(glueContext)
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'input_path', 'output_path', 'collect_date'])
job.init(args['JOB_NAME'], args)

input_path = args['input_path']
output_path = args['output_path']
collect_date = args['collect_date']

# S3에서 데이터를 읽어오는 부분
datasource = glueContext.create_dynamic_frame.from_options(
"s3",
{"paths": [input_path], "recurse": True},
format="json",
transformation_ctx="datasource",
)

# 전처리를 위해 DF로 변환하기
game_rating_datasource = datasource.toDF()

# 최상위 레벨의 key를 중심으로 explode하기
df = game_rating_datasource.select(
explode(game_rating_datasource.raw_game_rating).alias("raw_game_rating")
)

df = df.select(
col("raw_game_rating.game_id").cast("string").alias("GAME_ID"),
to_date(lit(collect_date), "yyyy-M-d").alias("COLLECT_DATE"),
col("raw_game_rating.recent_positive_num").alias("RECENT_POSITIVE_NUM"),
col("raw_game_rating.recent_positive_percent").alias("RECENT_POSITIVE_PERCENT"),
col("raw_game_rating.all_positive_num").alias("ALL_POSITIVE_NUM"),
col("raw_game_rating.all_positive_percent").alias("ALL_POSITIVE_PERCENT"),
)

dynamic_frame = DynamicFrame.fromDF(df, glueContext, "dynamic_frame")

# Parquet으로 변환하여 S3에 저장
glueContext.write_dynamic_frame.from_options(
frame=dynamic_frame,
connection_type="s3",
connection_options={"path": output_path},
format="parquet",
transformation_ctx="dynamic_frame",
)


# Job Bookmark의 상태를 최종적으로 커밋
job.commit()
Loading

0 comments on commit 88c39d8

Please sign in to comment.