How to use pyspark connect with kafka to write and read?
在本範例你會學到:
- 於
GCP Dataproc
環境中加入spark-sql-kafka
的 jar 檔 - 於 pyspark 中對 kafka 進行讀/寫
在本範例你需要先準備好:
- 熟悉 gcp 環境
- kafka 的
broker
- kafka 的
Topic
1.引入 jar 檔
基本上在 spark 環境都會引入各種需要的 jar 檔來完成很多事,kafka 也不例外,這裡要使用的是 spark-sql-kafka-0-10_2.11:2.3.2
,相關版本號請自行依需求微調:版本列表!決定好版本後我們就開始吧~
2.啟動 Dataproc
這邊我就不贅述了,就是把 jar 檔案加入 --properties
中即可,其中的格式寫法可參考稍早之前的文章 GCP 系列-新增/啟動 Dataproc clusters 時帶入多個 properties 套件,其他參數就依照自己需求調整!
gcloud dataproc clusters create kafka-pyspark \
--region=global \
--bucket my-bucket \
--master-boot-disk-size 100GB \
--num-workers 2 \
--worker-boot-disk-size 500GB \
--worker-machine-type n1-standard-4 \
--zone asia-east1-b \
--labels pro=report,user=test \
--image-version=1.4 \
--metadata 'PIP_PACKAGES=configparser==4.0.2 PyMySQL==0.9.3'\
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
--properties 'spark:spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2,dataproc:dataproc.monitoring.stackdriver.enable=true'
3.pyspark write data into kafka
這邊必須要填入 kafka 的 topic
與 broker
,都查到這篇了我就預設大家都知道這是要幹嘛的了!
# 定義 Topic 與 Kafka_brokers(隨便打的範例)
Topic = "test.data.com.tw"
Kafka_brokers = "11.242.290.235:9092"
df.select(to_json(struct("*")).alias("value"))\
.write\
.format("kafka")\
.option("kafka.bootstrap.servers", Kafka_brokers)\
.option("topic", Topic)\
.save()
基本上就是填入兩個參數到相對應的位置即可,簡單易懂!
4.pyspark read data from kafka
寫的部分都這麼容易了,讀其實也是差不多的:
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", Kafka_brokers) \
.option("subscribe", Topic) \
.load()
今天就介紹到這吧!
參考資料
更多的參數與寫法調整,請參考官網:
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
properties 格式的用法請參照:
若有任何問題與指教歡迎與我聯繫,若覺得我的內容不錯麻煩幫我隨便點個廣告,謝謝。