Мудрая привязка столбца PySpark

Есть ли какой-то конкретный способ в PySpark связать два фрейма данных, как мы делаем cbind в r?

Пример:

  1. Фрейм данных 1 имеет 10 столбцов
  2. Фрейм данных 2 имеет 1 столбец

Мне нужно cbind как фрейм данных, так и сделать как один фрейм данных в PySpark.


person Vigneshwar Thiyagarajan    schedule 30.08.2017    source источник


Ответы (2)


Сначала давайте создадим наши фреймы данных:

df1 = spark.createDataFrame(sc.parallelize([10*[c] for c in range(10)]), ["c"+ str(i) for i in range(10)])
df2 = spark.createDataFrame(sc.parallelize([[c] for c in range(10, 20, 1)]), ["c10"])
    +---+---+---+---+---+---+---+---+---+---+
    | c0| c1| c2| c3| c4| c5| c6| c7| c8| c9|
    +---+---+---+---+---+---+---+---+---+---+
    |  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
    |  1|  1|  1|  1|  1|  1|  1|  1|  1|  1|
    |  2|  2|  2|  2|  2|  2|  2|  2|  2|  2|
    |  3|  3|  3|  3|  3|  3|  3|  3|  3|  3|
    |  4|  4|  4|  4|  4|  4|  4|  4|  4|  4|
    |  5|  5|  5|  5|  5|  5|  5|  5|  5|  5|
    |  6|  6|  6|  6|  6|  6|  6|  6|  6|  6|
    |  7|  7|  7|  7|  7|  7|  7|  7|  7|  7|
    |  8|  8|  8|  8|  8|  8|  8|  8|  8|  8|
    |  9|  9|  9|  9|  9|  9|  9|  9|  9|  9|
    +---+---+---+---+---+---+---+---+---+---+

    +---+
    |c10|
    +---+
    | 10|
    | 11|
    | 12|
    | 13|
    | 14|
    | 15|
    | 16|
    | 17|
    | 18|
    | 19|
    +---+

Затем мы хотим однозначно идентифицировать строки, есть функция для RDD, которая может это сделать zipWithIndex

from pyspark.sql.types import LongType
from pyspark.sql import Row
def zipindexdf(df):
    schema_new = df.schema.add("index", LongType(), False)
    return df.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(schema_new)

df1_index = zipindexdf(df1)
df1_index.show()
df2_index = zipindexdf(df2)
df2_index.show()

    +---+---+---+---+---+---+---+---+---+---+-----+
    | c0| c1| c2| c3| c4| c5| c6| c7| c8| c9|index|
    +---+---+---+---+---+---+---+---+---+---+-----+
    |  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|    0|
    |  1|  1|  1|  1|  1|  1|  1|  1|  1|  1|    1|
    |  2|  2|  2|  2|  2|  2|  2|  2|  2|  2|    2|
    |  3|  3|  3|  3|  3|  3|  3|  3|  3|  3|    3|
    |  4|  4|  4|  4|  4|  4|  4|  4|  4|  4|    4|
    |  5|  5|  5|  5|  5|  5|  5|  5|  5|  5|    5|
    |  6|  6|  6|  6|  6|  6|  6|  6|  6|  6|    6|
    |  7|  7|  7|  7|  7|  7|  7|  7|  7|  7|    7|
    |  8|  8|  8|  8|  8|  8|  8|  8|  8|  8|    8|
    |  9|  9|  9|  9|  9|  9|  9|  9|  9|  9|    9|
    +---+---+---+---+---+---+---+---+---+---+-----+

    +---+-----+
    |c10|index|
    +---+-----+
    | 10|    0|
    | 11|    1|
    | 12|    2|
    | 13|    3|
    | 14|    4|
    | 15|    5|
    | 16|    6|
    | 17|    7|
    | 18|    8|
    | 19|    9|
    +---+-----+

Наконец, мы можем к ним присоединиться:

df = df1_index.join(df2_index, "index", "inner")

    +-----+---+---+---+---+---+---+---+---+---+---+---+
    |index| c0| c1| c2| c3| c4| c5| c6| c7| c8| c9|c10|
    +-----+---+---+---+---+---+---+---+---+---+---+---+
    |    0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0| 10|
    |    7|  7|  7|  7|  7|  7|  7|  7|  7|  7|  7| 17|
    |    6|  6|  6|  6|  6|  6|  6|  6|  6|  6|  6| 16|
    |    9|  9|  9|  9|  9|  9|  9|  9|  9|  9|  9| 19|
    |    5|  5|  5|  5|  5|  5|  5|  5|  5|  5|  5| 15|
    |    1|  1|  1|  1|  1|  1|  1|  1|  1|  1|  1| 11|
    |    3|  3|  3|  3|  3|  3|  3|  3|  3|  3|  3| 13|
    |    8|  8|  8|  8|  8|  8|  8|  8|  8|  8|  8| 18|
    |    2|  2|  2|  2|  2|  2|  2|  2|  2|  2|  2| 12|
    |    4|  4|  4|  4|  4|  4|  4|  4|  4|  4|  4| 14|
    +-----+---+---+---+---+---+---+---+---+---+---+---+
person MaFF    schedule 30.08.2017
comment
Это не работает для двух отдельных больших фреймов данных, которые могут храниться в разных разделах, и каждый фрейм данных разделен между разделами в разных строках. Из документации < / a> Текущая реализация помещает идентификатор раздела в верхний 31 бит, а номер записи в каждом разделе - в нижние 33 бита. - person Clay; 13.02.2018
comment
Вы правы, я не могу поверить, что написал это ... Количество MonotonicallyIncreasingID имеет разное происхождение для каждой задачи - person MaFF; 13.02.2018
comment
Часто цитируемая функция rdd zipwithindex работает таким же образом. - person Clay; 17.02.2018
comment
zipWithIndex - это указанный способ перечисления строк. Использование оконной функции для всего фрейма данных ужасно неэффективно. Я рекомендую вам протестировать его и использовать% timeit. - person MaFF; 20.02.2018
comment
Теперь ты прав. zipWithIndex не работает так же. Я поправлю свой ответ. Однако из-за того, что оконная функция вычисляется лениво, проверить синхронизацию не так просто. Я полагаю, я мог бы получить время show() DataFrame с эквивалентным .filter() до и после того, как столбец, созданный оконной функцией, был добавлен в DataFrame. - person Clay; 27.02.2018

Чтобы получить столбец с монотонно увеличивающимися идентификаторами, уникальными и последовательными, используйте следующее для каждого из ваших DataFrame, где colName - это имя столбца, которое вы хотите отсортировать для каждого DataFrame. к.

import pyspark.sql.functions as F
from pyspark.sql.window import Window as W

window = (
    W.partitionBy(F.lit(0))
    .orderBy('colName')
    .rowsBetween(W.unboundedPreceding, W.currentRow)
)

df = (df
 .withColumn('int', F.lit(1))
 .withColumn('consec_id', F.sum('int').over(window))
 .drop('int')
)

Чтобы убедиться, что все выстроено правильно, используйте следующий код, чтобы посмотреть на хвост или последний rownums фрейма данных.

rownums = 10
df.where(F.col('consec_id')>df.count()-rownums).show()

Используйте следующий код, чтобы просмотреть строки от start_row до end_row фрейма данных.

start_row = 20
end_row = 30
df.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show()

#Обновлять

Другой эффективный метод - это метод RDD zipWithIndex(). Чтобы просто изменить существующий DataFrame столбцом последовательных идентификаторов с помощью этого метода RDD, я:

  1. преобразовал df в RDD,
  2. применил метод zipWithIndex(),
  3. преобразовал возвращенный RDD в DataFrame,
  4. преобразовал DataFrame в RDD,
  5. сопоставил лямбда-функцию RDD для объединения объекта строки RDD исходного DataFrame с индексами,
  6. преобразовал окончательный RDD в DataFrame с исходными именами столбцов + столбец идентификатора из целых чисел, созданных zipWithIndex().

Я также попробовал метод изменения исходного DataFrame с помощью столбца индекса, содержащего результат zipWithIndex(), аналогично тому, что сделал @MaFF, но результаты были еще медленнее. Оконная функция примерно на порядок быстрее, чем любая из них. Большую часть этого времени увеличение связано с преобразованием DataFrame в RDD и обратно.

Сообщите мне, есть ли более быстрый способ добавить результат zipWithIndex() RDD в качестве столбца в исходный DataFrame.

Тестирование DataFrame на 42 000 строк и 90 столбцов дает следующее.

import time

def test_zip(df):
  startTime = time.time()
  df_1 = df \
  .rdd.zipWithIndex().toDF() \
  .rdd.map(lambda row: (row._1) + (row._2,)) \
  .toDF(df.columns + ['consec_id'])

  start_row = 20000
  end_row = 20010
  df_1.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show()
  endTime = time.time() - startTime
  return str(round(endTime,3)) + " seconds"

[test_zip(df) for _ in range(5)]

['59,813 секунды ',' 39,574 секунды ',' 36,074 секунды ', '35,436 секунды', '35,636 секунды ']

import time
import pyspark.sql.functions as F
from pyspark.sql.window import Window as W

def test_win(df):
  startTime = time.time()
  window = W.orderBy('colName').rowsBetween(W.unboundedPreceding, W.currentRow)
  df_2 = df \
  .withColumn('int', F.lit(1)) \
  .withColumn('IDcol', F.sum('int').over(window)) \
  .drop('int')

  start_row = 20000
  end_row = 20010
  df_2.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show()
  endTime = time.time() - startTime
  return str(round(endTime,3)) + " seconds"  

[test_win(df) for _ in range(5)]

[«4,19 секунды», «4,508 секунды», «4,099 секунды», «4,012 секунды», «4,045 секунды»]

import time
from pyspark.sql.types import StructType, StructField
import pyspark.sql.types as T

def test_zip2(df):
  startTime = time.time()
  schema_new = StructType(list(df.schema) + [StructField("consec_id", T.LongType(), False)])
  df_3 = df.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(schema_new)

  start_row = 20000
  end_row = 20010
  df_3.where((F.col('IDcol')>start_row) & (F.col('consec_id')<end_row)).show()
  endTime = time.time() - startTime
  return str(round(endTime,3)) + " seconds"

[test_zip2(testdf) for _ in range(5)]

['82 0,795 секунды', '61 0,689 секунды', '58 0,181 секунды', '58 0,01 секунды', '57 0,765 секунды']

person Clay    schedule 17.02.2018