Python에서 실시간 데이터 스트리밍을 구현하는 방법

Python Eseo Silsigan Deiteo Seuteuliming Eul Guhyeonhaneun Bangbeob



Python에서 실시간 데이터 스트리밍 구현을 마스터하는 것은 오늘날 데이터와 관련된 세계에서 필수적인 기술로 작용합니다. 이 가이드에서는 Python에서 실시간 데이터 스트리밍을 확실하게 활용하기 위한 핵심 단계와 필수 도구를 살펴봅니다. Apache Kafka 또는 Apache Pulsar와 같은 피팅 프레임워크 선택부터 손쉬운 데이터 소비, 처리 및 효과적인 시각화를 위한 Python 코드 작성에 이르기까지 민첩하고 효율적인 실시간 데이터 채널을 구성하는 데 필요한 기술을 습득합니다.

예제 1: Python에서 실시간 데이터 스트리밍 구현

Python에서 실시간 데이터 스트리밍을 구현하는 것은 오늘날의 데이터 중심 시대와 세계에서 매우 중요합니다. 이 상세한 예에서는 Google Colab에서 Apache Kafka와 Python을 사용하여 실시간 데이터 스트리밍 시스템을 구축하는 과정을 안내합니다.







코딩을 시작하기 전에 예제를 초기화하려면 Google Colab에서 특정 환경을 구축하는 것이 필수적입니다. 가장 먼저 해야 할 일은 필요한 라이브러리를 설치하는 것입니다. Kafka 통합을 위해 'kafka-python' 라이브러리를 사용합니다.



! 설치하다 카프카 파이썬


이 명령은 Python 기능과 Apache Kafka에 대한 바인딩을 제공하는 'kafka-python' 라이브러리를 설치합니다. 다음으로 프로젝트에 필요한 라이브러리를 가져옵니다. 'KafkaProducer' 및 'KafkaConsumer'를 포함한 필수 라이브러리 가져오기는 Kafka 브로커와 상호 작용할 수 있게 해주는 'kafka-python' 라이브러리의 클래스입니다. JSON은 메시지를 직렬화 및 역직렬화하는 데 사용하는 JSON 데이터로 작업하는 Python 라이브러리입니다.



kafka에서 KafkaProducer, KafkaConsumer 가져오기
JSON 가져오기


카프카 프로듀서 생성





Kafka 생산자가 데이터를 Kafka 주제로 보내기 때문에 이는 중요합니다. 이 예에서는 시뮬레이션된 실시간 데이터를 'real-time-topic'이라는 주제로 보내는 생산자를 만듭니다.

Kafka 브로커의 주소를 'localhost:9092'로 지정하는 'KafkaProducer' 인스턴스를 생성합니다. 그런 다음 데이터를 Kafka로 보내기 전에 직렬화하는 함수인 'value_serializer'를 사용합니다. 우리의 경우 람다 함수는 데이터를 UTF-8로 인코딩된 JSON으로 인코딩합니다. 이제 일부 실시간 데이터를 시뮬레이션하여 Kafka 주제로 보내겠습니다.



프로듀서 = 카프카프로듀서 ( bootstrap_servers = '로컬호스트:9092' ,
value_serializer =람다 v: json.dumps ( ~에 ) .인코드 ( 'UTF-8' ) )
# 시뮬레이션된 실시간 데이터
데이터 = { '센서_ID' : 1 , '온도' : 25.5 , '습기' : 60.2 }
# 주제에 데이터 보내기
생산자.보내기 ( '실시간 주제' , 데이터 )


이 줄에서는 시뮬레이션된 센서 데이터를 나타내는 '데이터' 사전을 정의합니다. 그런 다음 'send' 메소드를 사용하여 이 데이터를 'real-time-topic'에 게시합니다.

그런 다음 Kafka 소비자를 생성하려고 하며 Kafka 소비자는 Kafka 주제에서 데이터를 읽습니다. '실시간 주제'의 메시지를 소비하고 처리하기 위한 소비자를 생성합니다. (실시간 주제) 및 Kafka 브로커 주소와 같이 사용하려는 주제를 지정하여 'KafkaConsumer' 인스턴스를 생성합니다. 그리고 'value_deserializer'는 Kafka로부터 받은 데이터를 deserialize하는 함수입니다. 우리의 경우 람다 함수는 데이터를 UTF-8로 인코딩된 JSON으로 디코딩합니다.

소비자 = KafkaConsumer ( '실시간 주제' ,
bootstrap_servers = '로컬호스트:9092' ,
value_deserializer =람다 x: json.loads ( x.디코드 ( 'utf-8' ) ) )


반복 루프를 사용하여 주제의 메시지를 지속적으로 소비하고 처리합니다.

# 실시간 데이터 읽기 및 처리
~을 위한 메시지 ~에 소비자:
데이터 = 메시지.값
인쇄 ( 에프 '수신된 데이터: {데이터}' )


루프 내에서 각 메시지의 값과 시뮬레이션된 센서 데이터를 검색하여 콘솔에 인쇄합니다. Kafka 생산자 및 소비자를 실행하려면 Google Colab에서 이 코드를 실행하고 코드 셀을 개별적으로 실행해야 합니다. 생산자는 시뮬레이션된 데이터를 Kafka 토픽으로 보내고, 소비자는 수신된 데이터를 읽고 인쇄합니다.


코드 실행에 따른 출력 분석

실시간으로 생산되고 소비되는 데이터를 관찰해보겠습니다. 데이터 형식은 시뮬레이션이나 실제 데이터 소스에 따라 달라질 수 있습니다. 이 상세한 예에서는 Google Colab에서 Apache Kafka 및 Python을 사용하여 실시간 데이터 스트리밍 시스템을 설정하는 전체 프로세스를 다룹니다. 우리는 이 시스템을 구축하는 데 있어 각 코드 줄과 그 중요성을 설명할 것입니다. 실시간 데이터 스트리밍은 강력한 기능이며 이 예는 보다 복잡한 실제 애플리케이션의 기반이 됩니다.

예제 2: 주식 시장 데이터를 사용하여 Python에서 실시간 데이터 스트리밍 구현

다른 시나리오를 사용하여 Python에서 실시간 데이터 스트리밍을 구현하는 또 다른 독특한 예를 살펴보겠습니다. 이번에는 주식시장 데이터에 집중하겠습니다. Google Colab에서 Apache Kafka 및 Python을 사용하여 주가 변동을 캡처하고 처리하는 실시간 데이터 스트리밍 시스템을 만듭니다. 이전 예에서 설명한 것처럼 Google Colab에서 환경을 구성하는 것부터 시작합니다. 먼저 필요한 라이브러리를 설치합니다.

! 설치하다 카프카 파이썬 yfinance


여기에 실시간 주식 시장 데이터를 얻을 수 있는 'yfinance' 라이브러리를 추가합니다. 다음으로 필요한 라이브러리를 가져옵니다. 우리는 Kafka 상호 작용을 위해 'kafka-python' 라이브러리의 'KafkaProducer' 및 'KafkaConsumer' 클래스를 계속 사용합니다. JSON 데이터를 사용하기 위해 JSON을 가져옵니다. 또한 실시간 주식 시장 데이터를 얻기 위해 “yfinance”를 사용합니다. 또한 실시간 업데이트를 시뮬레이션하기 위해 시간 지연을 추가하기 위해 '시간' 라이브러리를 가져옵니다.

kafka에서 KafkaProducer, KafkaConsumer 가져오기
JSON 가져오기
수입금융 ~처럼 yf
수입 시간


이제 주식 데이터용 Kafka 생산자를 만듭니다. Kafka 생산자는 실시간 주식 데이터를 가져와 'stock-price'라는 Kafka 주제로 보냅니다.

프로듀서 = 카프카프로듀서 ( bootstrap_servers = '로컬호스트:9092' ,
value_serializer =람다 v: json.dumps ( ~에 ) .인코드 ( 'utf-8' ) )

~하는 동안 진실:
주식 = yf.티커 ( 'AAPL' ) # 예: Apple Inc. 주식
주식_데이터 = 주식.역사 ( 기간 = '1일' )
last_price = 재고_데이터 [ '닫다' ] .iloc [ - 1 ]
데이터 = { '상징' : 'AAPL' , '가격' : 마지막 가격 }
생산자.보내기 ( '주가' , 데이터 )
시간.수면 ( 10 ) # 10초마다 실시간 업데이트 시뮬레이션


이 코드에서는 Kafka 브로커의 주소를 사용하여 'KafkaProducer' 인스턴스를 생성합니다. 루프 내에서 'yfinance'를 사용하여 Apple Inc.('AAPL')의 최신 주가를 가져옵니다. 그런 다음 최종 종가를 추출하여 “stock-price” 주제로 보냅니다. 결국 우리는 10초마다 실시간 업데이트를 시뮬레이션하기 위해 시간 지연을 도입했습니다.

'stock-price' 주제에서 주가 데이터를 읽고 처리하는 Kafka 소비자를 만들어 보겠습니다.

소비자 = KafkaConsumer ( '주가' ,
bootstrap_servers = '로컬호스트:9092' ,
value_deserializer =람다 x: json.loads ( x.디코드 ( 'utf-8' ) ) )

~을 위한 메시지 ~에 소비자:
stock_data = 메시지.값
인쇄 ( 에프 '수신된 주식 데이터: {stock_data['symbol']} - 가격: {stock_data['price']}' )


이 코드는 이전 예제의 소비자 설정과 유사합니다. 'stock-price' 주제의 메시지를 지속적으로 읽고 처리하며 주식 기호와 가격을 콘솔에 인쇄합니다. 생산자와 소비자를 실행하기 위해 Google Colab에서 코드 셀을 순차적으로(예: 하나씩) 실행합니다. 생산자는 실시간 주가 업데이트를 받고 전송하는 반면 소비자는 이 데이터를 읽고 표시합니다.

! 설치하다 카프카 파이썬 yfinance
kafka에서 KafkaProducer, KafkaConsumer 가져오기
JSON 가져오기
수입금융 ~처럼 yf
수입 시간
프로듀서 = 카프카프로듀서 ( bootstrap_servers = '로컬호스트:9092' ,
value_serializer =람다 v: json.dumps ( ~에 ) .인코드 ( 'UTF-8' ) )

~하는 동안 진실:
주식 = yf.티커 ( 'AAPL' ) # 애플 주식
주식_데이터 = 주식.역사 ( 기간 = '1일' )
last_price = 재고_데이터 [ '닫다' ] .iloc [ - 1 ]

데이터 = { '상징' : 'AAPL' , '가격' : 마지막 가격 }

생산자.보내기 ( '주가' , 데이터 )

시간.수면 ( 10 ) # 10초마다 실시간 업데이트 시뮬레이션
소비자 = KafkaConsumer ( '주가' ,
bootstrap_servers = '로컬호스트:9092' ,
value_deserializer =람다 x: json.loads ( x.디코드 ( 'utf-8' ) ) )

~을 위한 메시지 ~에 소비자:
stock_data = 메시지.값
인쇄 ( 에프 '수신된 주식 데이터: {stock_data['symbol']} - 가격: {stock_data['price']}' )


코드 실행 후 출력 분석에서는 Apple Inc.의 실시간 주가 업데이트가 생산되고 소비되는 것을 관찰합니다.

결론

이 독특한 예에서는 주식 시장 데이터를 캡처하고 처리하기 위해 Apache Kafka와 'yfinance' 라이브러리를 사용하여 Python에서 실시간 데이터 스트리밍을 구현하는 방법을 시연했습니다. 코드의 각 줄을 철저하게 설명했습니다. 실시간 데이터 스트리밍은 다양한 분야에 적용되어 금융, IoT 등의 실제 애플리케이션을 구축할 수 있습니다.