คอลัมน์ PySpark ผูกอย่างชาญฉลาด

มีวิธีใดใน PySpark ที่จะเชื่อมโยงสองเฟรมข้อมูลเหมือนกับที่เราทำ cbind ใน r หรือไม่?

ตัวอย่าง:

  1. Data frame 1 มี 10 คอลัมน์
  2. Data frame 2 มี 1 คอลัมน์

ฉันต้องรวมทั้ง data frame และสร้างเป็น data frame เดียวใน PySpark


person Vigneshwar Thiyagarajan    schedule 30.08.2017    source แหล่งที่มา


คำตอบ (2)


ก่อนอื่นเรามาสร้าง dataframes ของเรากันก่อน:

df1 = spark.createDataFrame(sc.parallelize([10*[c] for c in range(10)]), ["c"+ str(i) for i in range(10)])
df2 = spark.createDataFrame(sc.parallelize([[c] for c in range(10, 20, 1)]), ["c10"])
    +---+---+---+---+---+---+---+---+---+---+
    | c0| c1| c2| c3| c4| c5| c6| c7| c8| c9|
    +---+---+---+---+---+---+---+---+---+---+
    |  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
    |  1|  1|  1|  1|  1|  1|  1|  1|  1|  1|
    |  2|  2|  2|  2|  2|  2|  2|  2|  2|  2|
    |  3|  3|  3|  3|  3|  3|  3|  3|  3|  3|
    |  4|  4|  4|  4|  4|  4|  4|  4|  4|  4|
    |  5|  5|  5|  5|  5|  5|  5|  5|  5|  5|
    |  6|  6|  6|  6|  6|  6|  6|  6|  6|  6|
    |  7|  7|  7|  7|  7|  7|  7|  7|  7|  7|
    |  8|  8|  8|  8|  8|  8|  8|  8|  8|  8|
    |  9|  9|  9|  9|  9|  9|  9|  9|  9|  9|
    +---+---+---+---+---+---+---+---+---+---+

    +---+
    |c10|
    +---+
    | 10|
    | 11|
    | 12|
    | 13|
    | 14|
    | 15|
    | 16|
    | 17|
    | 18|
    | 19|
    +---+

ถ้าอย่างนั้น เราต้องการระบุแถวโดยไม่ซ้ำกัน มีฟังก์ชันสำหรับ RDD ที่สามารถทำได้ zipWithIndex

from pyspark.sql.types import LongType
from pyspark.sql import Row
def zipindexdf(df):
    schema_new = df.schema.add("index", LongType(), False)
    return df.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(schema_new)

df1_index = zipindexdf(df1)
df1_index.show()
df2_index = zipindexdf(df2)
df2_index.show()

    +---+---+---+---+---+---+---+---+---+---+-----+
    | c0| c1| c2| c3| c4| c5| c6| c7| c8| c9|index|
    +---+---+---+---+---+---+---+---+---+---+-----+
    |  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|    0|
    |  1|  1|  1|  1|  1|  1|  1|  1|  1|  1|    1|
    |  2|  2|  2|  2|  2|  2|  2|  2|  2|  2|    2|
    |  3|  3|  3|  3|  3|  3|  3|  3|  3|  3|    3|
    |  4|  4|  4|  4|  4|  4|  4|  4|  4|  4|    4|
    |  5|  5|  5|  5|  5|  5|  5|  5|  5|  5|    5|
    |  6|  6|  6|  6|  6|  6|  6|  6|  6|  6|    6|
    |  7|  7|  7|  7|  7|  7|  7|  7|  7|  7|    7|
    |  8|  8|  8|  8|  8|  8|  8|  8|  8|  8|    8|
    |  9|  9|  9|  9|  9|  9|  9|  9|  9|  9|    9|
    +---+---+---+---+---+---+---+---+---+---+-----+

    +---+-----+
    |c10|index|
    +---+-----+
    | 10|    0|
    | 11|    1|
    | 12|    2|
    | 13|    3|
    | 14|    4|
    | 15|    5|
    | 16|    6|
    | 17|    7|
    | 18|    8|
    | 19|    9|
    +---+-----+

ในที่สุด เราก็สามารถเข้าร่วมได้:

df = df1_index.join(df2_index, "index", "inner")

    +-----+---+---+---+---+---+---+---+---+---+---+---+
    |index| c0| c1| c2| c3| c4| c5| c6| c7| c8| c9|c10|
    +-----+---+---+---+---+---+---+---+---+---+---+---+
    |    0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0| 10|
    |    7|  7|  7|  7|  7|  7|  7|  7|  7|  7|  7| 17|
    |    6|  6|  6|  6|  6|  6|  6|  6|  6|  6|  6| 16|
    |    9|  9|  9|  9|  9|  9|  9|  9|  9|  9|  9| 19|
    |    5|  5|  5|  5|  5|  5|  5|  5|  5|  5|  5| 15|
    |    1|  1|  1|  1|  1|  1|  1|  1|  1|  1|  1| 11|
    |    3|  3|  3|  3|  3|  3|  3|  3|  3|  3|  3| 13|
    |    8|  8|  8|  8|  8|  8|  8|  8|  8|  8|  8| 18|
    |    2|  2|  2|  2|  2|  2|  2|  2|  2|  2|  2| 12|
    |    4|  4|  4|  4|  4|  4|  4|  4|  4|  4|  4| 14|
    +-----+---+---+---+---+---+---+---+---+---+---+---+
person MaFF    schedule 30.08.2017
comment
สิ่งนี้ใช้ไม่ได้กับ DataFrame ขนาดใหญ่สองตัวที่แยกกันซึ่งอาจจัดเก็บไว้ในพาร์ติชั่นที่แตกต่างกัน และ DataFrame แต่ละอันจะถูกแยกระหว่างพาร์ติชั่นในแถวที่ต่างกัน จากเอกสารประกอบ การใช้งานปัจจุบันทำให้ ID พาร์ติชันอยู่ที่ 31 บิตบน และหมายเลขบันทึกภายในแต่ละพาร์ติชันอยู่ที่ 33 บิตล่าง - person Clay; 13.02.2018
comment
คุณพูดถูก ฉันไม่อยากจะเชื่อเลยว่าฉันจะเขียนแบบนั้น... จำนวน MonotonicallyIncreasingID มีต้นกำเนิดที่แตกต่างกันในทุกงาน - person MaFF; 13.02.2018
comment
ฟังก์ชัน rdd ที่อ้างถึงบ่อยครั้ง zipwithindex ทำงานในลักษณะเดียวกัน - person Clay; 17.02.2018
comment
zipWithIndex เป็นวิธีระบุแถวที่ระบุ การใช้ฟังก์ชันหน้าต่างกับกรอบข้อมูลทั้งหมดนั้นไม่มีประสิทธิภาพอย่างมาก ฉันขอแนะนำให้คุณทดสอบและใช้ %timeit - person MaFF; 20.02.2018
comment
ตอนนี้คุณพูดถูก zipWithIndexไม่ทำงานในลักษณะเดียวกัน ฉันจะแก้ไขคำตอบของฉัน อย่างไรก็ตาม เนื่องจากฟังก์ชันหน้าต่างได้รับการประเมินอย่างเกียจคร้าน การทดสอบจังหวะเวลาจึงไม่ตรงไปตรงมา ฉันคิดว่าฉันสามารถหาเวลาไปที่ show() DataFrame ด้วยค่าเทียบเท่า .filter() ก่อนและหลังคอลัมน์ที่สร้างโดยฟังก์ชัน window ถูกเพิ่มลงใน DataFrame - person Clay; 27.02.2018

หากต้องการรับคอลัมน์ที่มี ID ที่เพิ่มขึ้นซ้ำซาก และ ต่อเนื่องกัน ให้ใช้สิ่งต่อไปนี้กับ DataFrame แต่ละตัวของคุณ โดยที่ colName คือชื่อคอลัมน์ที่คุณต้องการจัดเรียง DataFrame แต่ละรายการ โดย.

import pyspark.sql.functions as F
from pyspark.sql.window import Window as W

window = (
    W.partitionBy(F.lit(0))
    .orderBy('colName')
    .rowsBetween(W.unboundedPreceding, W.currentRow)
)

df = (df
 .withColumn('int', F.lit(1))
 .withColumn('consec_id', F.sum('int').over(window))
 .drop('int')
)

หากต้องการตรวจสอบว่าทุกอย่างเรียงกันอย่างถูกต้อง ให้ใช้โค้ดต่อไปนี้เพื่อดูส่วนท้ายหรือ rownums สุดท้ายของ DataFrame

rownums = 10
df.where(F.col('consec_id')>df.count()-rownums).show()

ใช้โค้ดต่อไปนี้เพื่อดูแถวตั้งแต่ start_row ถึง end_row ของ DataFrame

start_row = 20
end_row = 30
df.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show()

#อัปเดต

อีกวิธีหนึ่งที่ใช้ได้ผลคือวิธี RDD zipWithIndex() หากต้องการแก้ไข DataFrame ที่มีอยู่ด้วยคอลัมน์ของรหัสต่อเนื่องกันโดยใช้วิธี RDD นี้ ฉัน:

  1. แปลง df เป็น RDD
  2. ใช้วิธี zipWithIndex()
  3. แปลง RDD ที่ส่งคืนเป็น DataFrame
  4. แปลง DataFrame เป็น RDD
  5. แมปฟังก์ชันแลมบ์ดา RDD เพื่อรวมวัตถุแถว RDD ของ DataFrame ดั้งเดิมเข้ากับดัชนี
  6. แปลง RDD สุดท้ายเป็น DataFrame ด้วยชื่อคอลัมน์ดั้งเดิม + คอลัมน์ ID จากจำนวนเต็มที่สร้างโดย zipWithIndex()

ฉันยังลองใช้วิธีการแก้ไข DataFrame ดั้งเดิมด้วยคอลัมน์ดัชนีที่มีเอาต์พุต zipWithIndex() คล้ายกับที่ @MaFF ทำ แต่ผลลัพธ์ก็ช้ากว่าด้วยซ้ำ ฟังก์ชันหน้าต่างจะมีลำดับความสำคัญเร็วกว่าสิ่งใดสิ่งหนึ่งเหล่านี้ เวลาที่เพิ่มขึ้นส่วนใหญ่นี้ดูเหมือนจะมาจากการแปลง DataFrame เป็น RDD และกลับมาอีกครั้ง

โปรดแจ้งให้เราทราบหากมีวิธีที่เร็วกว่าในการเพิ่มเอาต์พุตของวิธี zipWithIndex() RDD เป็นคอลัมน์ใน DataFrame ดั้งเดิม

การทดสอบบน DataFrame คอลัมน์ 90 แถว 42,000 แถวให้ผลลัพธ์ดังต่อไปนี้

import time

def test_zip(df):
  startTime = time.time()
  df_1 = df \
  .rdd.zipWithIndex().toDF() \
  .rdd.map(lambda row: (row._1) + (row._2,)) \
  .toDF(df.columns + ['consec_id'])

  start_row = 20000
  end_row = 20010
  df_1.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show()
  endTime = time.time() - startTime
  return str(round(endTime,3)) + " seconds"

[test_zip(df) for _ in range(5)]

['59.813 วินาที', '39.574 วินาที', '36.074 วินาที', '35.436 วินาที', '35.636 วินาที']

import time
import pyspark.sql.functions as F
from pyspark.sql.window import Window as W

def test_win(df):
  startTime = time.time()
  window = W.orderBy('colName').rowsBetween(W.unboundedPreceding, W.currentRow)
  df_2 = df \
  .withColumn('int', F.lit(1)) \
  .withColumn('IDcol', F.sum('int').over(window)) \
  .drop('int')

  start_row = 20000
  end_row = 20010
  df_2.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show()
  endTime = time.time() - startTime
  return str(round(endTime,3)) + " seconds"  

[test_win(df) for _ in range(5)]

['4.19 วินาที', '4.508 วินาที', '4.099 วินาที', '4.012 วินาที', '4.045 วินาที']

import time
from pyspark.sql.types import StructType, StructField
import pyspark.sql.types as T

def test_zip2(df):
  startTime = time.time()
  schema_new = StructType(list(df.schema) + [StructField("consec_id", T.LongType(), False)])
  df_3 = df.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(schema_new)

  start_row = 20000
  end_row = 20010
  df_3.where((F.col('IDcol')>start_row) & (F.col('consec_id')<end_row)).show()
  endTime = time.time() - startTime
  return str(round(endTime,3)) + " seconds"

[test_zip2(testdf) for _ in range(5)]

['82.795 วินาที', '61.689 วินาที', '58.181 วินาที', '58.01 วินาที', '57.765 วินาที']

person Clay    schedule 17.02.2018