Untuk mendapatkan kolom dengan ID yang meningkat secara monoton, unik dan berturut-turut, gunakan yang berikut ini pada setiap DataFrame Anda, dengan colName
adalah nama kolom yang ingin Anda urutkan setiap DataFrame oleh.
import pyspark.sql.functions as F
from pyspark.sql.window import Window as W
window = (
W.partitionBy(F.lit(0))
.orderBy('colName')
.rowsBetween(W.unboundedPreceding, W.currentRow)
)
df = (df
.withColumn('int', F.lit(1))
.withColumn('consec_id', F.sum('int').over(window))
.drop('int')
)
Untuk memeriksa apakah semuanya sudah tersusun dengan benar, gunakan kode berikut untuk melihat bagian ekor, atau rownums
terakhir dari DataFrame.
rownums = 10
df.where(F.col('consec_id')>df.count()-rownums).show()
Gunakan kode berikut untuk melihat baris dari start_row
hingga end_row
DataFrame.
start_row = 20
end_row = 30
df.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show()
#Memperbarui
Metode lain yang berhasil adalah metode RDD zipWithIndex()
. Untuk sekadar mengubah DataFrame yang ada dengan kolom id berturut-turut menggunakan metode RDD ini, saya:
- mengonversi df ke RDD,
- menerapkan metode
zipWithIndex()
,
- mengonversi RDD yang dikembalikan ke DataFrame,
- mengonversi DataFrame menjadi RDD,
- memetakan fungsi lambda RDD untuk menggabungkan objek baris RDD dari DataFrame asli dengan indeks,
- mengonversi RDD akhir menjadi DataFrame dengan nama kolom asli + kolom ID dari bilangan bulat yang dibuat oleh
zipWithIndex()
.
Saya juga mencoba metode mengubah DataFrame asli dengan kolom indeks yang berisi output zipWithIndex()
mirip dengan yang dilakukan @MaFF, tetapi hasilnya malah lebih lambat. Fungsi jendela berada pada urutan besarnya lebih cepat daripada keduanya. Sebagian besar peningkatan kali ini tampaknya berasal dari konversi DataFrame ke RDD dan kembali lagi.
Tolong beri tahu saya jika ada cara yang lebih cepat untuk menambahkan output metode zipWithIndex()
RDD sebagai kolom di DataFrame asli.
Pengujian pada DataFrame 42.000 baris 90 kolom menghasilkan yang berikut.
import time
def test_zip(df):
startTime = time.time()
df_1 = df \
.rdd.zipWithIndex().toDF() \
.rdd.map(lambda row: (row._1) + (row._2,)) \
.toDF(df.columns + ['consec_id'])
start_row = 20000
end_row = 20010
df_1.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show()
endTime = time.time() - startTime
return str(round(endTime,3)) + " seconds"
[test_zip(df) for _ in range(5)]
['59,813 detik', '39,574 detik', '36,074 detik', '35,436 detik', '35,636 detik']
import time
import pyspark.sql.functions as F
from pyspark.sql.window import Window as W
def test_win(df):
startTime = time.time()
window = W.orderBy('colName').rowsBetween(W.unboundedPreceding, W.currentRow)
df_2 = df \
.withColumn('int', F.lit(1)) \
.withColumn('IDcol', F.sum('int').over(window)) \
.drop('int')
start_row = 20000
end_row = 20010
df_2.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show()
endTime = time.time() - startTime
return str(round(endTime,3)) + " seconds"
[test_win(df) for _ in range(5)]
['4,19 detik', '4,508 detik', '4,099 detik', '4,012 detik', '4,045 detik']
import time
from pyspark.sql.types import StructType, StructField
import pyspark.sql.types as T
def test_zip2(df):
startTime = time.time()
schema_new = StructType(list(df.schema) + [StructField("consec_id", T.LongType(), False)])
df_3 = df.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(schema_new)
start_row = 20000
end_row = 20010
df_3.where((F.col('IDcol')>start_row) & (F.col('consec_id')<end_row)).show()
endTime = time.time() - startTime
return str(round(endTime,3)) + " seconds"
[test_zip2(testdf) for _ in range(5)]
['82,795 detik', '61,689 detik', '58,181 detik', '58,01 detik', '57,765 detik']
person
Clay
schedule
17.02.2018