How to use pyspark connect with kafka to write and read?
在本範例你會學到:
- 於
GCP Dataproc環境中加入spark-sql-kafka的 jar 檔 - 於 pyspark 中對 kafka 進行讀/寫
在本範例你需要先準備好:
- 熟悉 gcp 環境
- kafka 的
broker - kafka 的
Topic
1.引入 jar 檔
基本上在 spark 環境都會引入各種需要的 jar 檔來完成很多事,kafka 也不例外,這裡要使用的是 spark-sql-kafka-0-10_2.11:2.3.2,相關版本號請自行依需求微調:版本列表!決定好版本後我們就開始吧~
2.啟動 Dataproc
這邊我就不贅述了,就是把 jar 檔案加入 --properties 中即可,其中的格式寫法可參考稍早之前的文章 GCP 系列-新增/啟動 Dataproc clusters 時帶入多個 properties 套件,其他參數就依照自己需求調整!
gcloud dataproc clusters create kafka-pyspark \ |
3.pyspark write data into kafka
這邊必須要填入 kafka 的 topic 與 broker,都查到這篇了我就預設大家都知道這是要幹嘛的了!
# 定義 Topic 與 Kafka_brokers(隨便打的範例) |
基本上就是填入兩個參數到相對應的位置即可,簡單易懂!
4.pyspark read data from kafka
寫的部分都這麼容易了,讀其實也是差不多的:
df = spark \ |
今天就介紹到這吧!
參考資料
更多的參數與寫法調整,請參考官網:
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
properties 格式的用法請參照:
若有任何問題與指教歡迎與我聯繫,若覺得我的內容不錯麻煩幫我隨便點個廣告,謝謝。