keras系列 - keras model部署與序列化於spark進行預測

How to serialize keras model and apply to all workers on spark?

Keras 先天設計上無法序列化 (serialize),只能在單機上使用 (利用cpu 或 gpu)。但我又有需要大量預測的需求,基本上查詢的到的方法都會有個類序列化的方式(ex:broadcast、pickle),將 model 傳遞到每個 worker上,達到分散的目的,本文將記錄嘗試的各種方式。

在本範例你會學到:

  • 各種傳遞檔案給 worker 的方式或套件

可能需要注意的地方:

  • 各種套件或是方法可能因為作者或是版本更新而有不同結果,若有任何疑問歡迎與我聯絡。
  • spark 叢集的機器有兩種常見的說法,一個是 workerexecutor,以下都用 worker 稱呼。
  • 我使用的 keras model 大小超過 6GB

1.broadcast

這是一個 spark 常見的傳遞參數給 worker 的方式,這裡就簡略的介紹,有需要更多資訊請見官網 pyspark.Broadcast

個人常使用的情境是當我需要讀取一份不是 spark 所產生的分散式檔案時,又需要處理這份檔案得到某個清單(或是從 SQL 處理完的清單),就會從 master 讀取進來,處理成我要的序列後再利用broadcast傳遞給所有 worker。

以下為使用方式:

#傳遞方式
b = sc.broadcast([1, 2, 3, 4, 5])
#取值方式
b.value

將參數傳入sc.broadcast後就會將相關資料帶到所有的 worker 上面,而取用的時候相當簡單, 用b.value就可以在每個 worker 取到值了。

不過此方法無法傳遞一個 model 過去,不符合需求。

2.elephas

model serialize (圖源:來自於套件作者)

這個套件的標題是Distributed Deep Learning with Keras & Spark,這個看似非常符合需求的套件但並不適合所有的情境:

  • 適合利用 spark 線上預測、線上產生結果與 spark 的 ml 非常地相似,但目前無法留下 model (請見相關 issue)
  • 是個前景看好的套件,不過目前並不符合我需要留下 model 的需求(若有理解錯誤請告知)

3.pickle

  • 網路上有非常多種 pickle 使用方式和介紹,他主要的功能就是編碼與序列化,反序列化,是各種模型非常常見的格式,大家應該相對熟悉。
    • 主要針對keras序列的方法都來自於作者 (相關介紹),有許多嘗試都是針對他的方法修改,例如此範例
    • 實測上會一直遇到奇怪的問題,一直說無法 serialize
_pickle.PicklingError: Could not serialize object: error: 'I' format requires 0 <= number <= 4294967295

最後不符合使用上的需求,模型檔案過大。

4.keras pickle wrapper

有作者自己寫出了方便一點的方法,類似懶人包,不像之前自己打包 pickle 需要寫很多東西。

簡易安裝介紹 https://github.com/wwoods/keras_pickle_wrapper

簡單的把自己的 model 丟進去 mw = KerasPickleWrapper(model)就大概完成了,以下為完整範例:

from keras.models import *
import pickle
from keras_pickle_wrapper import KerasPickleWrapper
mw = KerasPickleWrapper(model)
data = pickle.dumps(mw)
mw2 = pickle.loads(data)
#進行預測,相關參數請自行修改
a = mw2().predict([[title_0], [title_1], [title_2], [title_3], [title_4], [hour_matrix_array]], batch_size=1200, verbose=1)

不過他的底層還是用 pickle 所以問題並沒有解決,只是操作上方便許多。不符合使用上的需求,檔案過大。

5.SparkFile

SparkFile 也是常見的用法,是Spark 原生的套件,可以傳遞各種檔案到 worker(文件、py file)。一句語法就可以完成spark.sparkContext.addPyFile("/root/test.py")
用法相對單純。

相對的我想要將我的 model 當成檔案傳過去,完整範例如下,將路徑改為自己的 model 路徑即可:

from pyspark import SparkFiles
spark.sparkContext.addFile("/root/my_keras_model.h5")
model_path = '/root/my_keras_model.h5'
class Mymodel_Classifier:
    clf = None
    @staticmethod
    def is_loaded():
        return Mymodel_Classifier.clf is not None
    @staticmethod
    def load_models(config):
        path = SparkFiles.get(config)
        Mymodel_Classifier.clf = path

# Executed once per interpreter 
Mymodel_Classifier.load_models(model_path)

在測試的時候都可以正常的運行,換成我的 model,會有OOM error記憶體問題,更改 spark config 中的 memory 後並沒有改善,推測應該與 pickle 一樣 超過了可容許的大小。最後不符合使用上的需求,檔案過大。

6.Deep Learning Pipelines for Apache Spark

由 databrick 開發的套件,可以分散式訓練並讀取過去的 model 來預測

https://github.com/databricks/spark-deep-learning

  • 文章中大多數都是關於image的相關應用 在第二版 0.3.0才出現 KerasTransformer & TFTransformer 針對非image的資料訓練與處理
  • 在安裝上必須確認版本問題是否一致,否則後續問題會非常的多
  • 若是使用 gcpdataproc,啟動 cluster 時記得要加上properties參數如下:
# 請自行更換相關版本號,包含spark與此套件
--properties spark:spark.jars.packages=databricks:spark-deep-learning:1.0.0-spark2.3-s_2.11
  • 使用pyspark啟動要自己指向路徑才讀得到,後續就會方便很多,簡單操作:
import sys, glob, os
sys.path.extend(glob.glob(os.path.join(os.path.expanduser("~"), ".ivy2/jars/*.jar"))) 

num_features = 124
num_examples = 1
input_data = [{"features" : [0]*(num_features)} for i in range(num_examples)]
input_df = sqlContext.createDataFrame(input_data)

transformer = KerasTransformer(inputCol="features", outputCol="predictions", modelFile=model_path)
final_df = transformer.transform(input_df)
  • 跑官方的範例程式很順利,但是將自己的model放進去時遇到了幾個問題
    • 該套件無法接受多輸入與多輸出,也就是所有的輸入輸出都要自己合成一個array,相當不便,model必須重新訓練
    • 重新訓練符合該模式後,預測時一直出問題:相關issue。有兩種說法,一個是tensorbroad出問題,一個是他們使用了protobufgoogle 的套件,會出現錯誤Protobuf has a hard limit of 2GB
    • 仔細追 code 後發現 是tensorflow graph的問題,graph 有大小的限制,相關參考如下:

https://stackoverflow.com/questions/36349049/overcome-graphdef-cannot-be-larger-than-2gb-in-tensorflow
https://stackoverflow.com/questions/41439136/wide-deep-learning-for-large-data-error-graphdef-cannot-be-larger-than-2gb

最後不符合使用上的需求,依然有大小的問題。

結語

  • 雖然目前的 survey 還是沒有辦法解決我的問題(model 過大無法分散),不過過程中也發現了許多值得關注 repository,後續可期。
  • 以後要產生模型時要特別注意整個模型的大小,盡量不要超過以上所有套件限制的大小,不然在分散運算會遭遇許多困難。
  • 也提供目前的解決方式,由於服務都是在 gcp 上面進行,最後採用了gcp 的 AI platform (舊稱為 ml engine),利用它上面的 GPU 資源來加速運算速度(就是花錢解決問題拉)!

以上的分享有任何問題或是討論歡迎隨時與我聯絡。


 上一篇
GCP系列-使用 Dataproc initialization actions 安裝 python package GCP系列-使用 Dataproc initialization actions 安裝 python package
How to use dataproc initialization actions to install python packages on cluser (on all workers)?在 GCP 的 Dataproc 要啟動 py
2019-12-09
下一篇 
NLP 系列 - Tensorboard 視覺化 word2vec 詞向量 NLP 系列 - Tensorboard 視覺化 word2vec 詞向量
How to use tensorboard(Embedding Projector) to visualize/project word2vec model? word2vec 的應用已經相當的的普及,但是該模型為向量組成,充滿了一般人
2019-11-26
  目錄