pyspark系列-如何使用pyspark連結kafka教學

How to use pyspark connect with kafka to write and read?

在本範例你會學到:

  • GCP Dataproc 環境中加入spark-sql-kafka 的 jar 檔
  • 於 pyspark 中對 kafka 進行讀/寫

在本範例你需要先準備好:

  • 熟悉 gcp 環境
  • kafka 的 broker
  • kafka 的 Topic

1.引入 jar 檔

基本上在 spark 環境都會引入各種需要的 jar 檔來完成很多事,kafka 也不例外,這裡要使用的是 spark-sql-kafka-0-10_2.11:2.3.2,相關版本號請自行依需求微調:版本列表!決定好版本後我們就開始吧~

2.啟動 Dataproc

這邊我就不贅述了,就是把 jar 檔案加入 --properties 中即可,其中的格式寫法可參考稍早之前的文章 GCP 系列-新增/啟動 Dataproc clusters 時帶入多個 properties 套件,其他參數就依照自己需求調整!

gcloud dataproc clusters create kafka-pyspark \
    --region=global \
    --bucket my-bucket \
    --master-boot-disk-size 100GB \
    --num-workers 2 \
    --worker-boot-disk-size 500GB  \
    --worker-machine-type n1-standard-4 \
    --zone asia-east1-b  \
    --labels pro=report,user=test  \
    --image-version=1.4 \
    --metadata 'PIP_PACKAGES=configparser==4.0.2 PyMySQL==0.9.3'\
    --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \    
    --properties 'spark:spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2,dataproc:dataproc.monitoring.stackdriver.enable=true'

3.pyspark write data into kafka

這邊必須要填入 kafka 的 topicbroker,都查到這篇了我就預設大家都知道這是要幹嘛的了!

# 定義 Topic 與 Kafka_brokers(隨便打的範例)

Topic = "test.data.com.tw"
Kafka_brokers =  "11.242.290.235:9092"

df.select(to_json(struct("*")).alias("value"))\
    .write\
    .format("kafka")\
    .option("kafka.bootstrap.servers", Kafka_brokers)\
    .option("topic", Topic)\
    .save()

基本上就是填入兩個參數到相對應的位置即可,簡單易懂!

4.pyspark read data from kafka

寫的部分都這麼容易了,讀其實也是差不多的:

df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", Kafka_brokers) \
  .option("subscribe", Topic) \
  .load()

今天就介紹到這吧!

參考資料

更多的參數與寫法調整,請參考官網:

https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

properties 格式的用法請參照:

https://chilunhuang.github.io/posts/14115/

若有任何問題與指教歡迎與我聯繫,若覺得我的內容不錯麻煩幫我隨便點個廣告,謝謝。


 上一篇
pyspark系列-如何使用pyspark連結clickhouse教學 pyspark系列-如何使用pyspark連結clickhouse教學
How to use pyspark connect with cilckhouse to read?在本範例你會學到: 於 GCP Dataproc 環境中加入ru.yandex.clickhouse:clickhouse-jdbc:0
2020-10-14
下一篇 
登山系列-玉山一日單攻(主峰)攻略&紀錄 登山系列-玉山一日單攻(主峰)攻略&紀錄
玉山一日單攻(主峰)攻略&紀錄登上玉山的計畫從 2020/03 就開始了,但是可以的時間都一直抽不到簽,超級沒有籤運QQ,與排雲山莊非常的沒有緣分,只好開始準備玉山一日單攻抽籤。很幸運的在8月改制後(先搶先贏改成電腦選)就中籤了,也
2020-10-08
  目錄