Ikatan bijaksana kolom PySpark

Apakah ada cara khusus di PySpark untuk melakukan cbind dua bingkai data seperti yang kita lakukan cbind di r?

Contoh:

  1. Bingkai data 1 memiliki 10 kolom
  2. Bingkai data 2 memiliki 1 kolom

Saya perlu menggabungkan kedua bingkai data dan menjadikannya sebagai satu bingkai data di PySpark.


person Vigneshwar Thiyagarajan    schedule 30.08.2017    source sumber


Jawaban (2)


Pertama mari kita buat kerangka data kita:

df1 = spark.createDataFrame(sc.parallelize([10*[c] for c in range(10)]), ["c"+ str(i) for i in range(10)])
df2 = spark.createDataFrame(sc.parallelize([[c] for c in range(10, 20, 1)]), ["c10"])
    +---+---+---+---+---+---+---+---+---+---+
    | c0| c1| c2| c3| c4| c5| c6| c7| c8| c9|
    +---+---+---+---+---+---+---+---+---+---+
    |  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
    |  1|  1|  1|  1|  1|  1|  1|  1|  1|  1|
    |  2|  2|  2|  2|  2|  2|  2|  2|  2|  2|
    |  3|  3|  3|  3|  3|  3|  3|  3|  3|  3|
    |  4|  4|  4|  4|  4|  4|  4|  4|  4|  4|
    |  5|  5|  5|  5|  5|  5|  5|  5|  5|  5|
    |  6|  6|  6|  6|  6|  6|  6|  6|  6|  6|
    |  7|  7|  7|  7|  7|  7|  7|  7|  7|  7|
    |  8|  8|  8|  8|  8|  8|  8|  8|  8|  8|
    |  9|  9|  9|  9|  9|  9|  9|  9|  9|  9|
    +---+---+---+---+---+---+---+---+---+---+

    +---+
    |c10|
    +---+
    | 10|
    | 11|
    | 12|
    | 13|
    | 14|
    | 15|
    | 16|
    | 17|
    | 18|
    | 19|
    +---+

Lalu kita ingin mengidentifikasi baris secara unik, ada fungsi untuk RDD yang bisa melakukan ini zipWithIndex

from pyspark.sql.types import LongType
from pyspark.sql import Row
def zipindexdf(df):
    schema_new = df.schema.add("index", LongType(), False)
    return df.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(schema_new)

df1_index = zipindexdf(df1)
df1_index.show()
df2_index = zipindexdf(df2)
df2_index.show()

    +---+---+---+---+---+---+---+---+---+---+-----+
    | c0| c1| c2| c3| c4| c5| c6| c7| c8| c9|index|
    +---+---+---+---+---+---+---+---+---+---+-----+
    |  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|    0|
    |  1|  1|  1|  1|  1|  1|  1|  1|  1|  1|    1|
    |  2|  2|  2|  2|  2|  2|  2|  2|  2|  2|    2|
    |  3|  3|  3|  3|  3|  3|  3|  3|  3|  3|    3|
    |  4|  4|  4|  4|  4|  4|  4|  4|  4|  4|    4|
    |  5|  5|  5|  5|  5|  5|  5|  5|  5|  5|    5|
    |  6|  6|  6|  6|  6|  6|  6|  6|  6|  6|    6|
    |  7|  7|  7|  7|  7|  7|  7|  7|  7|  7|    7|
    |  8|  8|  8|  8|  8|  8|  8|  8|  8|  8|    8|
    |  9|  9|  9|  9|  9|  9|  9|  9|  9|  9|    9|
    +---+---+---+---+---+---+---+---+---+---+-----+

    +---+-----+
    |c10|index|
    +---+-----+
    | 10|    0|
    | 11|    1|
    | 12|    2|
    | 13|    3|
    | 14|    4|
    | 15|    5|
    | 16|    6|
    | 17|    7|
    | 18|    8|
    | 19|    9|
    +---+-----+

Akhirnya, kita bisa bergabung dengan mereka:

df = df1_index.join(df2_index, "index", "inner")

    +-----+---+---+---+---+---+---+---+---+---+---+---+
    |index| c0| c1| c2| c3| c4| c5| c6| c7| c8| c9|c10|
    +-----+---+---+---+---+---+---+---+---+---+---+---+
    |    0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0| 10|
    |    7|  7|  7|  7|  7|  7|  7|  7|  7|  7|  7| 17|
    |    6|  6|  6|  6|  6|  6|  6|  6|  6|  6|  6| 16|
    |    9|  9|  9|  9|  9|  9|  9|  9|  9|  9|  9| 19|
    |    5|  5|  5|  5|  5|  5|  5|  5|  5|  5|  5| 15|
    |    1|  1|  1|  1|  1|  1|  1|  1|  1|  1|  1| 11|
    |    3|  3|  3|  3|  3|  3|  3|  3|  3|  3|  3| 13|
    |    8|  8|  8|  8|  8|  8|  8|  8|  8|  8|  8| 18|
    |    2|  2|  2|  2|  2|  2|  2|  2|  2|  2|  2| 12|
    |    4|  4|  4|  4|  4|  4|  4|  4|  4|  4|  4| 14|
    +-----+---+---+---+---+---+---+---+---+---+---+---+
person MaFF    schedule 30.08.2017
comment
Ini tidak berfungsi untuk dua DataFrame besar terpisah yang mungkin disimpan di partisi berbeda dan setiap DataFrame dipisahkan antar partisi pada baris berbeda. Dari dokumentasi Implementasi saat ini menempatkan ID partisi di 31 bit atas, dan nomor record dalam setiap partisi di 33 bit bawah. - person Clay; 13.02.2018
comment
Anda benar, saya tidak percaya saya menulis itu... Hitungan MonotonicallyIncreasingID memiliki asal yang berbeda pada setiap tugas - person MaFF; 13.02.2018
comment
Fungsi rdd yang sering dikutip zipwithindex bekerja dengan cara yang sama. - person Clay; 17.02.2018
comment
zipWithIndex adalah cara yang ditunjukkan untuk menghitung baris. Menggunakan fungsi jendela pada seluruh bingkai data sangat tidak efisien. Saya mendorong Anda untuk mengujinya dan menggunakan %timeit. - person MaFF; 20.02.2018
comment
Sekarang kamu benar. zipWithIndex tidak bekerja dengan cara yang sama. Saya akan mengubah jawaban saya. Namun, karena fungsi jendela dievaluasi dengan lambat, pengujian waktunya tidak mudah. Saya kira saya bisa mendapatkan waktu untuk show() DataFrame dengan .filter() yang setara sebelum dan sesudah kolom yang dibuat oleh fungsi jendela ditambahkan ke DataFrame. - person Clay; 27.02.2018

Untuk mendapatkan kolom dengan ID yang meningkat secara monoton, unik dan berturut-turut, gunakan yang berikut ini pada setiap DataFrame Anda, dengan colName adalah nama kolom yang ingin Anda urutkan setiap DataFrame oleh.

import pyspark.sql.functions as F
from pyspark.sql.window import Window as W

window = (
    W.partitionBy(F.lit(0))
    .orderBy('colName')
    .rowsBetween(W.unboundedPreceding, W.currentRow)
)

df = (df
 .withColumn('int', F.lit(1))
 .withColumn('consec_id', F.sum('int').over(window))
 .drop('int')
)

Untuk memeriksa apakah semuanya sudah tersusun dengan benar, gunakan kode berikut untuk melihat bagian ekor, atau rownums terakhir dari DataFrame.

rownums = 10
df.where(F.col('consec_id')>df.count()-rownums).show()

Gunakan kode berikut untuk melihat baris dari start_row hingga end_row DataFrame.

start_row = 20
end_row = 30
df.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show()

#Memperbarui

Metode lain yang berhasil adalah metode RDD zipWithIndex(). Untuk sekadar mengubah DataFrame yang ada dengan kolom id berturut-turut menggunakan metode RDD ini, saya:

  1. mengonversi df ke RDD,
  2. menerapkan metode zipWithIndex(),
  3. mengonversi RDD yang dikembalikan ke DataFrame,
  4. mengonversi DataFrame menjadi RDD,
  5. memetakan fungsi lambda RDD untuk menggabungkan objek baris RDD dari DataFrame asli dengan indeks,
  6. mengonversi RDD akhir menjadi DataFrame dengan nama kolom asli + kolom ID dari bilangan bulat yang dibuat oleh zipWithIndex().

Saya juga mencoba metode mengubah DataFrame asli dengan kolom indeks yang berisi output zipWithIndex() mirip dengan yang dilakukan @MaFF, tetapi hasilnya malah lebih lambat. Fungsi jendela berada pada urutan besarnya lebih cepat daripada keduanya. Sebagian besar peningkatan kali ini tampaknya berasal dari konversi DataFrame ke RDD dan kembali lagi.

Tolong beri tahu saya jika ada cara yang lebih cepat untuk menambahkan output metode zipWithIndex() RDD sebagai kolom di DataFrame asli.

Pengujian pada DataFrame 42.000 baris 90 kolom menghasilkan yang berikut.

import time

def test_zip(df):
  startTime = time.time()
  df_1 = df \
  .rdd.zipWithIndex().toDF() \
  .rdd.map(lambda row: (row._1) + (row._2,)) \
  .toDF(df.columns + ['consec_id'])

  start_row = 20000
  end_row = 20010
  df_1.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show()
  endTime = time.time() - startTime
  return str(round(endTime,3)) + " seconds"

[test_zip(df) for _ in range(5)]

['59,813 detik', '39,574 detik', '36,074 detik', '35,436 detik', '35,636 detik']

import time
import pyspark.sql.functions as F
from pyspark.sql.window import Window as W

def test_win(df):
  startTime = time.time()
  window = W.orderBy('colName').rowsBetween(W.unboundedPreceding, W.currentRow)
  df_2 = df \
  .withColumn('int', F.lit(1)) \
  .withColumn('IDcol', F.sum('int').over(window)) \
  .drop('int')

  start_row = 20000
  end_row = 20010
  df_2.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show()
  endTime = time.time() - startTime
  return str(round(endTime,3)) + " seconds"  

[test_win(df) for _ in range(5)]

['4,19 detik', '4,508 detik', '4,099 detik', '4,012 detik', '4,045 detik']

import time
from pyspark.sql.types import StructType, StructField
import pyspark.sql.types as T

def test_zip2(df):
  startTime = time.time()
  schema_new = StructType(list(df.schema) + [StructField("consec_id", T.LongType(), False)])
  df_3 = df.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(schema_new)

  start_row = 20000
  end_row = 20010
  df_3.where((F.col('IDcol')>start_row) & (F.col('consec_id')<end_row)).show()
  endTime = time.time() - startTime
  return str(round(endTime,3)) + " seconds"

[test_zip2(testdf) for _ in range(5)]

['82,795 detik', '61,689 detik', '58,181 detik', '58,01 detik', '57,765 detik']

person Clay    schedule 17.02.2018