How to use pyspark connect with kafka to write and read?
在本範例你會學到:
- 於 
GCP Dataproc環境中加入spark-sql-kafka的 jar 檔 - 於 pyspark 中對 kafka 進行讀/寫
 
在本範例你需要先準備好:
- 熟悉 gcp 環境
 - kafka 的 
broker - kafka 的 
Topic 
1.引入 jar 檔
基本上在 spark 環境都會引入各種需要的 jar 檔來完成很多事,kafka 也不例外,這裡要使用的是 spark-sql-kafka-0-10_2.11:2.3.2,相關版本號請自行依需求微調:版本列表!決定好版本後我們就開始吧~
2.啟動 Dataproc
這邊我就不贅述了,就是把 jar 檔案加入 --properties 中即可,其中的格式寫法可參考稍早之前的文章 GCP 系列-新增/啟動 Dataproc clusters 時帶入多個 properties 套件,其他參數就依照自己需求調整!
gcloud dataproc clusters create kafka-pyspark \
    --region=global \
    --bucket my-bucket \
    --master-boot-disk-size 100GB \
    --num-workers 2 \
    --worker-boot-disk-size 500GB  \
    --worker-machine-type n1-standard-4 \
    --zone asia-east1-b  \
    --labels pro=report,user=test  \
    --image-version=1.4 \
    --metadata 'PIP_PACKAGES=configparser==4.0.2 PyMySQL==0.9.3'\
    --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \    
    --properties 'spark:spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2,dataproc:dataproc.monitoring.stackdriver.enable=true'
3.pyspark write data into kafka
這邊必須要填入 kafka 的 topic 與 broker,都查到這篇了我就預設大家都知道這是要幹嘛的了!
# 定義 Topic 與 Kafka_brokers(隨便打的範例)
Topic = "test.data.com.tw"
Kafka_brokers =  "11.242.290.235:9092"
df.select(to_json(struct("*")).alias("value"))\
    .write\
    .format("kafka")\
    .option("kafka.bootstrap.servers", Kafka_brokers)\
    .option("topic", Topic)\
    .save()
基本上就是填入兩個參數到相對應的位置即可,簡單易懂!
4.pyspark read data from kafka
寫的部分都這麼容易了,讀其實也是差不多的:
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", Kafka_brokers) \
  .option("subscribe", Topic) \
  .load()
今天就介紹到這吧!
參考資料
更多的參數與寫法調整,請參考官網:
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
properties 格式的用法請參照:
若有任何問題與指教歡迎與我聯繫,若覺得我的內容不錯麻煩幫我隨便點個廣告,謝謝。