pyspark系列-如何於 dataframe 增加索引(index)值或 row number

How to add a index number column in pyspark dataframe?

在本範例你會學到:

  • 常見的 function monotonically_increasing_id ()
  • 本範例重點 row_number function

在本範例你需要先準備好:

  • 本範例環境是 pyspark 版本 >= 2.4
  • 歡樂愉快的學習精神

1.常見的新增 index 方法

在網路上搜尋建立 index 索引的相關方法,一定會搜索到 monotonically_increasing_id 這個方法,但是若沒有仔細看文件,有可能會無法達到預期的效果喔!!

根據文件(官網連結)有三個重點:

The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.

The current implementation puts the partition ID in the upper 31 bits, and the record number within each partition in the lower 33 bits.

The assumption is that the dataframe has less than 1 billion partitions, and each partition has less than 8 billion records.

  • 此方法保證 id 是唯一的,但是不會是一個連續號碼
  • 編排號碼的方式會與 partition id有直接的關係
  • 編排號碼的方式會與 partition 筆數有一定的關係

綜合以上說法,簡單來說,如果你想要的效果是一個連續的索引號碼,1, 2, 3......100,此方法就不符合需求,他是會跳號的!但如果是單純的需要產生一個唯一的 id,不在意號碼產生方式,那此方法就會相當的合適。

2.row_number 新增 index 方法

第二種方法之於第一種的不同就是,row_number可以依照自己所需要的條件進行排序,也可以依照不同的 key 來排序(同時有很多個1,2,3…)

我們來定義一個簡單的範例,搭配 Window() 的方法會有更多的變化:

 #!/usr/bin/env python
# -*- coding: utf-8 -*-
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql import Window

spark = SparkSession.builder.appName('index_num').getOrCreate()

# Create dataset
df = spark.createDataFrame([("Jolin", "bell fruit", "apple", "2020/05/26"), \
        ("John", "bell fruit", "banana", "2020/05/27"), \
        ("Lisa", "banana", "apple", "2020/05/28"), \
        ], ["name", "fruit_1", "fruit_2", "date"])

首先,我使用 w = Window().orderBy(F.lit("name"))來定義一個編碼的 slide window ,且使用 name 這個 column 進行排序。

接下來針對 df 進行 orderBy 的動作,寫新增一個 id 欄位給索引值(index),裡面的直將會依照條件 F.row_number().over(w) 填入號碼 1,2,3…

w = Window().orderBy(F.lit("name"))
df_order = df.orderBy("name", ascending=True)\
    .withColumn("id", F.row_number().over(w))

而得到最後的結果如下:

排序結果

這是個相對簡單的範例,個人最常使用的情境是處理 session 問題時,要排序使用者不同 session 中的瀏覽順序就可以用此方法簡單的完成 dataframe 的 index 排序,而且是可以針對不同 key 值的條件新增,而不會是單純的從頭 1,2,3…到尾,而是可以像下面的例子:

----------------------
| session| url| index|
----------------------
|   scfer|   A|     1|
|   scfer|   B|     2|
|   scfer|   C|     3|
|   tuikg|   B|     1|
|   tuikg|   C|     2|
|   redcv|   B|     1|
|   redcv|   C|     2|
|   redcv|   D|     3|
----------------------

以上就是簡單的介紹!

參考資料

若有任何問題與指教歡迎與我聯繫,若覺得我的內容不錯麻煩幫我隨便點個廣告,謝謝。

pyspark.sql.functions 其實有許多好用的小 function 可以直接使用,也不用再自己辛苦的寫 udf(User Defined function),在之後的介紹會再慢慢帶給大家。下方為今天介紹的兩種好用的小function。

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.monotonically_increasing_id

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.row_number


 上一篇
GCP 系列-新增/啟動 Dataproc clusters 時帶入多個 properties 套件 GCP 系列-新增/啟動 Dataproc clusters 時帶入多個 properties 套件
How to launch/create Dataproc clusters with multi properties (jars, packages)?在本範例你會學到: 如何在 GCP Dataproc properties 中帶入
2020-06-14
下一篇 
pyspark系列-字串轉時間格式 string to datetime or timestamp pyspark系列-字串轉時間格式 string to datetime or timestamp
How to convert pyspark string to datetime?在本範例你會學到: to_date 使用方式 (New in version 2.2) unix_timestamp 使用方式 在本範例你需要先準備好:
2020-06-04
  目錄