Steam 동접자 정보 to Kafka
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
from datetime import datetime
from airflow.decorators import dag
from airflow.operators.python import PythonOperator
import requests
import json
from confluent_kafka import Producer
import pendulum
# Constants
KAFKA_TOPIC = "topic_GetUserInfo"
KAFKA_BROKER = "172.31.2.88:9092"  # adjust to match the Docker environment

# Steam app IDs, keyed by the game's display name
GAMES = {
    "몬스터 헌터 월드": "582010",
    "엘든 링": "1245620",
    "팰월드": "1801980",
}
# Kafka producer factory
def create_kafka_producer():
    """Build a ``Producer`` that connects to ``KAFKA_BROKER``.

    Returns:
        A new ``Producer`` configured with ``bootstrap.servers`` only.
    """
    settings = {'bootstrap.servers': KAFKA_BROKER}
    return Producer(settings)
def get_current_players(game_id, timeout=10):
    """Return the current player count Steam reports for one app.

    Args:
        game_id: Steam app ID, sent as the ``appid`` query parameter.
        timeout: Seconds to wait for Steam before giving up. Without a
            timeout a stalled connection would block the task indefinitely.

    Returns:
        The ``response.player_count`` value from the JSON body, or 0 when
        the request fails, the status is not 200, the body is not valid
        JSON, or the field is missing.
    """
    url = 'http://api.steampowered.com/ISteamUserStats/GetNumberOfCurrentPlayers/v1/'
    try:
        # Let requests build and encode the query string.
        res = requests.get(url, params={'appid': game_id}, timeout=timeout)
        if res.status_code != 200:
            return 0
        data = res.json()
    except (requests.RequestException, ValueError) as e:
        # Transport errors and undecodable bodies follow the same
        # "report 0" path as a non-200 status, so one bad lookup does not
        # abort the remaining games.
        print(f"Steam API 요청 실패 (appid={game_id}): {e}")
        return 0
    return data.get('response', {}).get('player_count', 0)
def get_steam_players_data(**context):
    """Fetch the player count of every game in ``GAMES`` and publish it to Kafka.

    One message per game is sent to ``KAFKA_TOPIC``: the key is the game
    name and the value is a JSON object with ``game`` and ``player_count``.

    Args:
        **context: Keyword arguments supplied by the caller; unused here.
    """

    def report_delivery(err, msg):
        # produce() only enqueues the message; the broker-side outcome is
        # reported later through this callback, so failures must be
        # surfaced here rather than in the try/except around produce().
        if err is not None:
            print(f"Kafka 전송 실패: {err}")

    producer = create_kafka_producer()
    for game_name, game_id in GAMES.items():
        player_count = get_current_players(game_id)
        value = json.dumps({'game': game_name, 'player_count': player_count})
        try:
            producer.produce(
                KAFKA_TOPIC,
                key=game_name,  # the game name doubles as the message key
                value=value,
                on_delivery=report_delivery,
            )
        except Exception as e:
            # Enqueue-time failure (e.g. the local queue is full); keep
            # going with the remaining games.
            print(f"Kafka 전송 실패: {e}")
        # Serve any delivery callbacks that are already pending.
        producer.poll(0)
    # Block until every queued message has been delivered or has failed.
    producer.flush()
# Timezone attached to the DAG's start date.
local_tz = pendulum.timezone("Asia/Seoul")


@dag(
    catchup=False,
    schedule='* * * * *',  # run every minute
    start_date=datetime(2024, 7, 30, tzinfo=local_tz),
)
def steam_current_player_dag():
    """DAG whose single task calls ``get_steam_players_data``."""
    PythonOperator(
        python_callable=get_steam_players_data,
        task_id='get_current_player_task',
    )


# Instantiate the DAG at module level.
dag_instance = steam_current_player_dag()
user info 추가 예정
This post is licensed under CC BY 4.0 by the author.