Чтобы получить столбец с монотонно увеличивающимися идентификаторами, уникальными и последовательными, используйте следующее для каждого из ваших DataFrame, где colName
- это имя столбца, которое вы хотите отсортировать для каждого DataFrame. к.
import pyspark.sql.functions as F
from pyspark.sql.window import Window as W
window = (
W.partitionBy(F.lit(0))
.orderBy('colName')
.rowsBetween(W.unboundedPreceding, W.currentRow)
)
df = (df
.withColumn('int', F.lit(1))
.withColumn('consec_id', F.sum('int').over(window))
.drop('int')
)
Чтобы убедиться, что все выстроено правильно, используйте следующий код, чтобы посмотреть на хвост или последний rownums
фрейма данных.
rownums = 10
df.where(F.col('consec_id')>df.count()-rownums).show()
Используйте следующий код, чтобы просмотреть строки от start_row
до end_row
фрейма данных.
start_row = 20
end_row = 30
df.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show()
#Обновлять
Другой эффективный метод - это метод RDD zipWithIndex()
. Чтобы просто изменить существующий DataFrame столбцом последовательных идентификаторов с помощью этого метода RDD, я:
- преобразовал df в RDD,
- применил метод
zipWithIndex()
,
- преобразовал возвращенный RDD в DataFrame,
- преобразовал DataFrame в RDD,
- сопоставил лямбда-функцию RDD для объединения объекта строки RDD исходного DataFrame с индексами,
- преобразовал окончательный RDD в DataFrame с исходными именами столбцов + столбец идентификатора из целых чисел, созданных
zipWithIndex()
.
Я также попробовал метод изменения исходного DataFrame с помощью столбца индекса, содержащего результат zipWithIndex()
, аналогично тому, что сделал @MaFF, но результаты были еще медленнее. Оконная функция примерно на порядок быстрее, чем любая из них. Большую часть этого времени увеличение связано с преобразованием DataFrame в RDD и обратно.
Сообщите мне, есть ли более быстрый способ добавить результат zipWithIndex()
RDD в качестве столбца в исходный DataFrame.
Тестирование DataFrame на 42 000 строк и 90 столбцов дает следующее.
import time
def test_zip(df):
startTime = time.time()
df_1 = df \
.rdd.zipWithIndex().toDF() \
.rdd.map(lambda row: (row._1) + (row._2,)) \
.toDF(df.columns + ['consec_id'])
start_row = 20000
end_row = 20010
df_1.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show()
endTime = time.time() - startTime
return str(round(endTime,3)) + " seconds"
[test_zip(df) for _ in range(5)]
['59,813 секунды ',' 39,574 секунды ',' 36,074 секунды ', '35,436 секунды', '35,636 секунды ']
import time
import pyspark.sql.functions as F
from pyspark.sql.window import Window as W
def test_win(df):
startTime = time.time()
window = W.orderBy('colName').rowsBetween(W.unboundedPreceding, W.currentRow)
df_2 = df \
.withColumn('int', F.lit(1)) \
.withColumn('IDcol', F.sum('int').over(window)) \
.drop('int')
start_row = 20000
end_row = 20010
df_2.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show()
endTime = time.time() - startTime
return str(round(endTime,3)) + " seconds"
[test_win(df) for _ in range(5)]
[«4,19 секунды», «4,508 секунды», «4,099 секунды», «4,012 секунды», «4,045 секунды»]
import time
from pyspark.sql.types import StructType, StructField
import pyspark.sql.types as T
def test_zip2(df):
startTime = time.time()
schema_new = StructType(list(df.schema) + [StructField("consec_id", T.LongType(), False)])
df_3 = df.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(schema_new)
start_row = 20000
end_row = 20010
df_3.where((F.col('IDcol')>start_row) & (F.col('consec_id')<end_row)).show()
endTime = time.time() - startTime
return str(round(endTime,3)) + " seconds"
[test_zip2(testdf) for _ in range(5)]
['82 0,795 секунды', '61 0,689 секунды', '58 0,181 секунды', '58 0,01 секунды', '57 0,765 секунды']
person
Clay
schedule
17.02.2018