Post

spark에서 kafka로 steam user 정보 적재

수정 필요함

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
from datetime import datetime
from airflow.decorators import dag
from airflow.operators.python import PythonOperator
import requests
import json
from confluent_kafka import Producer, Consumer, KafkaError
import pendulum

# 상수 정의
KAFKA_TOPIC = 'test_topic'
KAFKA_BROKER = 'localhost:9092'  # Docker 환경에 맞게 수정
API_KEY = '1D6B63E8C3375FDCE46BD38620595819'  # 발급받은 Steam API 키
SPECIFIED_USER_ID = 76561197960434622  # 지정된 Steam 사용자 ID

# Kafka Producer 설정
def create_kafka_producer():
    return Producer({'bootstrap.servers': KAFKA_BROKER})

def get_steam_players_data(**context):
    all_players = []
    user_id = SPECIFIED_USER_ID  # 지정된 ID 사용

    # Steam API에서 사용자 정보 요청
    url = f'http://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/?key={API_KEY}&steamids={user_id}'
    print(f"Requesting URL: {url}")  # 요청 URL 출력
    res = requests.get(url)
    
    if res.status_code != 200:
        print(f"Failed to fetch player data for {user_id}: {res.status_code}, Response: {res.text}")  # 오류 발생 시 출력
        return  # 오류 발생 시 종료
    
    data = res.json()
    players = data.get('response', {}).get('players', [])  # 플레이어 리스트 추출
    
    if not players:  # 플레이어가 없으면 종료
        print(f"No player found for {user_id}. Stopping.")
        return
    
    all_players.extend(players)  # 모든 플레이어 정보를 리스트에 추가
    print(f"Fetched player data for {user_id}: {players}")  # 성공적으로 가져온 데이터 출력

    context['task_instance'].xcom_push(key='players_data', value=json.dumps(all_players))  # JSON 문자열로 저장

def send_players_to_kafka(data, **context):
    # Kafka Producer 생성
    producer = create_kafka_producer()
    
    players = json.loads(data)  # XCom에서 가져온 JSON 문자열을 파싱

    for player in players:
        key = player['steamid']  # steamid를 키로 사용
        value = json.dumps(player)  # 각 플레이어 정보를 JSON 문자열로 변환
        
        # Kafka로 메시지 전송
        try:
            producer.produce(KAFKA_TOPIC, key=key, value=value)
            print(f"Sent player data to Kafka: {value}")  # 전송 성공 시 출력
        except Exception as e:
            print(f"Failed to send message to Kafka: {e}")  # 전송 실패 시 출력

    # 전송 완료 후 대기
    producer.flush()

    # Kafka 전송 결과를 XCom에 저장 (예: 전송 성공 메시지)
    context['task_instance'].xcom_push(key='kafka_send_status', value='Players data sent to Kafka successfully')

def process_and_send_to_kafka(**context):
    # XCom에서 플레이어 데이터 가져오기
    players_data = context['task_instance'].xcom_pull(key='players_data', task_ids='get_steam_players_task')

    # Kafka로 데이터 전송
    if players_data:
        send_players_to_kafka(players_data, **context)
    else:
        print("No player data found to send to Kafka.")

local_tz = pendulum.timezone("Asia/Seoul")

@dag(
    start_date=datetime(2024, 7, 25, tzinfo=local_tz),
    schedule='*/5 * * * *',  # 매 5분마다 실행
    catchup=False
)
def steam_usrinfo_dag():
    get_steam_players_task = PythonOperator(
        task_id='get_steam_players_task',
        python_callable=get_steam_players_data,
        provide_context=True
    )

    process_and_send_task = PythonOperator(
        task_id='process_and_send_task',
        python_callable=process_and_send_to_kafka,
        provide_context=True
    )

    get_steam_players_task >> process_and_send_task

steam_usrinfo_dag()

수정된 코드

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
from datetime import datetime
from airflow.decorators import dag
from airflow.operators.python import PythonOperator
import requests
import json
from confluent_kafka import Producer
import pendulum

# 상수 정의
KAFKA_TOPIC = 'topic_GetUserInfo'
KAFKA_BROKER = '172.31.2.88:9092'  # Docker 환경에 맞게 수정
API_KEY = '1D6B63E8C3375FDCE46BD38620595819'  # 발급받은 Steam API 키
START_USER_ID = 76561197960434622  # 시작 Steam 사용자 ID

# Kafka Producer 설정
def create_kafka_producer():
    return Producer({
        'bootstrap.servers': KAFKA_BROKER,
        'client.id': 'airflow-producer'  # 추가적인 설정
    })

def get_steam_players_data(**context):
    all_players = []
    user_id = START_USER_ID  # 시작 사용자 ID
    player_count = 0  # 카운터 초기화
    max_players = 100  # 최대 플레이어 수 설정

    while player_count < max_players:
        url = f'http://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/?key={API_KEY}&steamids={user_id}'
        print(f"Requesting URL: {url}")  # 요청 URL 출력
        res = requests.get(url)

        if res.status_code != 200:
            print(f"Failed to fetch player data for {user_id}: {res.status_code}, Response: {res.text}")  # 오류 발생 시 출력
            user_id -= 1  # 사용자 ID 감소
            continue  # 다음 사용자로 계속 진행

        data = res.json()
        players = data.get('response', {}).get('players', [])  # 플레이어 리스트 추출
        
        if players:  # 플레이어가 있으면 추가
            all_players.extend(players)  # 모든 플레이어 정보를 리스트에 추가
            player_count += len(players)  # 가져온 플레이어 수 카운트
            print(f"Fetched player data for {user_id}: {players}")  # 성공적으로 가져온 데이터 출력
        else:  # 플레이어가 없으면 다음 사용자로 진행
            print(f"No player found for {user_id}. Proceeding to next user.")
        
        user_id -= 1  # 사용자 ID 감소

    context['task_instance'].xcom_push(key='players_data', value=json.dumps(all_players))  # JSON 문자열로 저장

def send_players_to_kafka(**context):
    """ 변환된 플레이어 데이터를 Kafka에 전송하는 함수 """
    producer = create_kafka_producer()
    
    # XCom에서 플레이어 데이터 가져오기
    data = context['task_instance'].xcom_pull(task_ids='get_steam_players_task', key='players_data')
    
    if data is None:
        print("No data found in XCom. Aborting Kafka send.")
        return

    print(f"Data retrieved from XCom: {data}")  # XCom에서 가져온 데이터 출력
    players = json.loads(data)  # XCom에서 가져온 JSON 문자열을 파싱

    for player in players:
        key = player['steamid']  # steamid를 키로 사용
        value = json.dumps(player)  # 각 플레이어 정보를 JSON 문자열로 변환
        
        # Kafka로 메시지 전송
        try:
            producer.produce(KAFKA_TOPIC, key=key, value=value, callback=delivery_report)
            print(f"Sent player data to Kafka: {value}")  # 전송 성공 시 출력
        except Exception as e:
            print(f"Failed to send message to Kafka: {str(e)}")  # 전송 실패 시 출력

    # 전송 완료 후 대기
    producer.flush()

    # Kafka 전송 결과를 XCom에 저장 (예: 전송 성공 메시지)
    context['task_instance'].xcom_push(key='kafka_send_status', value='Players data sent to Kafka successfully')

def delivery_report(err, msg):
    """ 메시지 전송 결과를 확인하는 콜백 함수 """
    if err is not None:
        print(f"Message delivery failed: {err}")  # 에러 메시지 출력
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}] with key {msg.key()}")

local_tz = pendulum.timezone("Asia/Seoul")

@dag(
    start_date=datetime(2024, 7, 26, tzinfo=local_tz),
    schedule='*/5 * * * *',  # 매 5분마다 실행
    catchup=False
)
def steam_usrinfo_dag():
    get_steam_players_task = PythonOperator(
        task_id='get_steam_players_task',
        python_callable=get_steam_players_data,
        provide_context=True,
    )

    send_players_to_kafka_task = PythonOperator(
        task_id='send_players_to_kafka_task',
        python_callable=send_players_to_kafka,
        provide_context=True,
    )

    get_steam_players_task >> send_players_to_kafka_task  # 의존성 설정

# DAG 정의
dag_instance = steam_usrinfo_dag()

이슈: api 데이터에 잡다한 메세지가 섞임

순수 api 데이터만 kafka로 전송하는 코드

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
from datetime import datetime
from airflow.decorators import dag
from airflow.operators.python import PythonOperator
import requests
import json
from confluent_kafka import Producer
import pendulum

# 상수 정의
KAFKA_TOPIC = 'topic_GetUserInfo'
KAFKA_BROKER = '172.31.2.88:9092'  # Docker 환경에 맞게 수정
API_KEY = '1D6B63E8C3375FDCE46BD38620595819'  # 발급받은 Steam API 키
START_USER_ID = 76561197960434622  # 시작 Steam 사용자 ID

# Kafka Producer 설정
def create_kafka_producer():
    return Producer({
        'bootstrap.servers': KAFKA_BROKER,
        'client.id': 'airflow-producer'  # 추가적인 설정
    })

def get_steam_players_data(**context):
    all_players = []
    user_id = START_USER_ID  # 시작 사용자 ID
    player_count = 0  # 카운터 초기화
    max_players = 10  # 최대 플레이어 수 설정

    while player_count < max_players:
        url = f'http://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/?key={API_KEY}&steamids={user_id}'
        res = requests.get(url)

        if res.status_code != 200:
            user_id -= 1  # 사용자 ID 감소
            continue  # 다음 사용자로 계속 진행

        data = res.json()
        players = data.get('response', {}).get('players', [])  # 플레이어 리스트 추출
        
        if players:  # 플레이어가 있으면 추가
            all_players.extend(players)  # 모든 플레이어 정보를 리스트에 추가
            player_count += len(players)  # 가져온 플레이어 수 카운트
        
        user_id -= 1  # 사용자 ID 감소

    context['task_instance'].xcom_push(key='players_data', value=json.dumps(all_players))  # JSON 문자열로 저장

def send_players_to_kafka(**context):
    """ 변환된 플레이어 데이터를 Kafka에 전송하는 함수 """
    producer = create_kafka_producer()
    
    # XCom에서 플레이어 데이터 가져오기
    data = context['task_instance'].xcom_pull(task_ids='get_steam_players_task', key='players_data')
    
    if data is None:
        return

    players = json.loads(data)  # XCom에서 가져온 JSON 문자열을 파싱

    for player in players:
        key = player['steamid']  # steamid를 키로 사용
        value = json.dumps(player)  # 각 플레이어 정보를 JSON 문자열로 변환
        
        # Kafka로 메시지 전송
        try:
            producer.produce(KAFKA_TOPIC, key=key, value=value)
        except Exception:
            pass  # 에러를 무시하고 계속 진행

    # 전송 완료 후 대기
    producer.flush()

local_tz = pendulum.timezone("Asia/Seoul")

@dag(
    start_date=datetime(2024, 7, 26, tzinfo=local_tz),
    schedule='*/5 * * * *',  # 매 5분마다 실행
    catchup=False
)
def steam_usrinfo_dag():
    get_steam_players_task = PythonOperator(
        task_id='get_steam_players_task',
        python_callable=get_steam_players_data,
        provide_context=True,
    )

    send_players_to_kafka_task = PythonOperator(
        task_id='send_players_to_kafka_task',
        python_callable=send_players_to_kafka,
        provide_context=True,
    )

    get_steam_players_task >> send_players_to_kafka_task  # 의존성 설정

# DAG 정의
dag_instance = steam_usrinfo_dag()
This post is licensed under CC BY 4.0 by the author.