มีวิธีใดใน PySpark ที่จะเชื่อมโยงสองเฟรมข้อมูลเหมือนกับที่เราทำ cbind ใน r หรือไม่?
ตัวอย่าง:
- Data frame 1 มี 10 คอลัมน์
- Data frame 2 มี 1 คอลัมน์
ฉันต้องรวมทั้ง data frame และสร้างเป็น data frame เดียวใน PySpark
มีวิธีใดใน PySpark ที่จะเชื่อมโยงสองเฟรมข้อมูลเหมือนกับที่เราทำ cbind ใน r หรือไม่?
ตัวอย่าง:
ฉันต้องรวมทั้ง data frame และสร้างเป็น data frame เดียวใน PySpark
ก่อนอื่นเรามาสร้าง dataframes ของเรากันก่อน:
df1 = spark.createDataFrame(sc.parallelize([10*[c] for c in range(10)]), ["c"+ str(i) for i in range(10)])
df2 = spark.createDataFrame(sc.parallelize([[c] for c in range(10, 20, 1)]), ["c10"])
+---+---+---+---+---+---+---+---+---+---+
| c0| c1| c2| c3| c4| c5| c6| c7| c8| c9|
+---+---+---+---+---+---+---+---+---+---+
| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|
| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1|
| 2| 2| 2| 2| 2| 2| 2| 2| 2| 2|
| 3| 3| 3| 3| 3| 3| 3| 3| 3| 3|
| 4| 4| 4| 4| 4| 4| 4| 4| 4| 4|
| 5| 5| 5| 5| 5| 5| 5| 5| 5| 5|
| 6| 6| 6| 6| 6| 6| 6| 6| 6| 6|
| 7| 7| 7| 7| 7| 7| 7| 7| 7| 7|
| 8| 8| 8| 8| 8| 8| 8| 8| 8| 8|
| 9| 9| 9| 9| 9| 9| 9| 9| 9| 9|
+---+---+---+---+---+---+---+---+---+---+
+---+
|c10|
+---+
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+
ถ้าอย่างนั้น เราต้องการระบุแถวโดยไม่ซ้ำกัน มีฟังก์ชันสำหรับ RDD
ที่สามารถทำได้ zipWithIndex
from pyspark.sql.types import LongType
from pyspark.sql import Row
def zipindexdf(df):
schema_new = df.schema.add("index", LongType(), False)
return df.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(schema_new)
df1_index = zipindexdf(df1)
df1_index.show()
df2_index = zipindexdf(df2)
df2_index.show()
+---+---+---+---+---+---+---+---+---+---+-----+
| c0| c1| c2| c3| c4| c5| c6| c7| c8| c9|index|
+---+---+---+---+---+---+---+---+---+---+-----+
| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|
| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1|
| 2| 2| 2| 2| 2| 2| 2| 2| 2| 2| 2|
| 3| 3| 3| 3| 3| 3| 3| 3| 3| 3| 3|
| 4| 4| 4| 4| 4| 4| 4| 4| 4| 4| 4|
| 5| 5| 5| 5| 5| 5| 5| 5| 5| 5| 5|
| 6| 6| 6| 6| 6| 6| 6| 6| 6| 6| 6|
| 7| 7| 7| 7| 7| 7| 7| 7| 7| 7| 7|
| 8| 8| 8| 8| 8| 8| 8| 8| 8| 8| 8|
| 9| 9| 9| 9| 9| 9| 9| 9| 9| 9| 9|
+---+---+---+---+---+---+---+---+---+---+-----+
+---+-----+
|c10|index|
+---+-----+
| 10| 0|
| 11| 1|
| 12| 2|
| 13| 3|
| 14| 4|
| 15| 5|
| 16| 6|
| 17| 7|
| 18| 8|
| 19| 9|
+---+-----+
ในที่สุด เราก็สามารถเข้าร่วมได้:
df = df1_index.join(df2_index, "index", "inner")
+-----+---+---+---+---+---+---+---+---+---+---+---+
|index| c0| c1| c2| c3| c4| c5| c6| c7| c8| c9|c10|
+-----+---+---+---+---+---+---+---+---+---+---+---+
| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 10|
| 7| 7| 7| 7| 7| 7| 7| 7| 7| 7| 7| 17|
| 6| 6| 6| 6| 6| 6| 6| 6| 6| 6| 6| 16|
| 9| 9| 9| 9| 9| 9| 9| 9| 9| 9| 9| 19|
| 5| 5| 5| 5| 5| 5| 5| 5| 5| 5| 5| 15|
| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 11|
| 3| 3| 3| 3| 3| 3| 3| 3| 3| 3| 3| 13|
| 8| 8| 8| 8| 8| 8| 8| 8| 8| 8| 8| 18|
| 2| 2| 2| 2| 2| 2| 2| 2| 2| 2| 2| 12|
| 4| 4| 4| 4| 4| 4| 4| 4| 4| 4| 4| 14|
+-----+---+---+---+---+---+---+---+---+---+---+---+
MonotonicallyIncreasingID
มีต้นกำเนิดที่แตกต่างกันในทุกงาน
- person MaFF; 13.02.2018
zipwithindex
ทำงานในลักษณะเดียวกัน
- person Clay; 17.02.2018
zipWithIndex
เป็นวิธีระบุแถวที่ระบุ การใช้ฟังก์ชันหน้าต่างกับกรอบข้อมูลทั้งหมดนั้นไม่มีประสิทธิภาพอย่างมาก ฉันขอแนะนำให้คุณทดสอบและใช้ %timeit
- person MaFF; 20.02.2018
zipWithIndex
ไม่ทำงานในลักษณะเดียวกัน ฉันจะแก้ไขคำตอบของฉัน อย่างไรก็ตาม เนื่องจากฟังก์ชันหน้าต่างได้รับการประเมินอย่างเกียจคร้าน การทดสอบจังหวะเวลาจึงไม่ตรงไปตรงมา ฉันคิดว่าฉันสามารถหาเวลาไปที่ show()
DataFrame ด้วยค่าเทียบเท่า .filter()
ก่อนและหลังคอลัมน์ที่สร้างโดยฟังก์ชัน window ถูกเพิ่มลงใน DataFrame
- person Clay; 27.02.2018
หากต้องการรับคอลัมน์ที่มี ID ที่เพิ่มขึ้นซ้ำซาก และ ต่อเนื่องกัน ให้ใช้สิ่งต่อไปนี้กับ DataFrame แต่ละตัวของคุณ โดยที่ colName
คือชื่อคอลัมน์ที่คุณต้องการจัดเรียง DataFrame แต่ละรายการ โดย.
import pyspark.sql.functions as F
from pyspark.sql.window import Window as W
window = (
W.partitionBy(F.lit(0))
.orderBy('colName')
.rowsBetween(W.unboundedPreceding, W.currentRow)
)
df = (df
.withColumn('int', F.lit(1))
.withColumn('consec_id', F.sum('int').over(window))
.drop('int')
)
หากต้องการตรวจสอบว่าทุกอย่างเรียงกันอย่างถูกต้อง ให้ใช้โค้ดต่อไปนี้เพื่อดูส่วนท้ายหรือ rownums
สุดท้ายของ DataFrame
rownums = 10
df.where(F.col('consec_id')>df.count()-rownums).show()
ใช้โค้ดต่อไปนี้เพื่อดูแถวตั้งแต่ start_row
ถึง end_row
ของ DataFrame
start_row = 20
end_row = 30
df.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show()
#อัปเดต
อีกวิธีหนึ่งที่ใช้ได้ผลคือวิธี RDD zipWithIndex()
หากต้องการแก้ไข DataFrame ที่มีอยู่ด้วยคอลัมน์ของรหัสต่อเนื่องกันโดยใช้วิธี RDD นี้ ฉัน:
zipWithIndex()
zipWithIndex()
ฉันยังลองใช้วิธีการแก้ไข DataFrame ดั้งเดิมด้วยคอลัมน์ดัชนีที่มีเอาต์พุต zipWithIndex()
คล้ายกับที่ @MaFF ทำ แต่ผลลัพธ์ก็ช้ากว่าด้วยซ้ำ ฟังก์ชันหน้าต่างจะมีลำดับความสำคัญเร็วกว่าสิ่งใดสิ่งหนึ่งเหล่านี้ เวลาที่เพิ่มขึ้นส่วนใหญ่นี้ดูเหมือนจะมาจากการแปลง DataFrame เป็น RDD และกลับมาอีกครั้ง
โปรดแจ้งให้เราทราบหากมีวิธีที่เร็วกว่าในการเพิ่มเอาต์พุตของวิธี zipWithIndex()
RDD เป็นคอลัมน์ใน DataFrame ดั้งเดิม
การทดสอบบน DataFrame คอลัมน์ 90 แถว 42,000 แถวให้ผลลัพธ์ดังต่อไปนี้
import time
def test_zip(df):
startTime = time.time()
df_1 = df \
.rdd.zipWithIndex().toDF() \
.rdd.map(lambda row: (row._1) + (row._2,)) \
.toDF(df.columns + ['consec_id'])
start_row = 20000
end_row = 20010
df_1.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show()
endTime = time.time() - startTime
return str(round(endTime,3)) + " seconds"
[test_zip(df) for _ in range(5)]
['59.813 วินาที', '39.574 วินาที', '36.074 วินาที', '35.436 วินาที', '35.636 วินาที']
import time
import pyspark.sql.functions as F
from pyspark.sql.window import Window as W
def test_win(df):
startTime = time.time()
window = W.orderBy('colName').rowsBetween(W.unboundedPreceding, W.currentRow)
df_2 = df \
.withColumn('int', F.lit(1)) \
.withColumn('IDcol', F.sum('int').over(window)) \
.drop('int')
start_row = 20000
end_row = 20010
df_2.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show()
endTime = time.time() - startTime
return str(round(endTime,3)) + " seconds"
[test_win(df) for _ in range(5)]
['4.19 วินาที', '4.508 วินาที', '4.099 วินาที', '4.012 วินาที', '4.045 วินาที']
import time
from pyspark.sql.types import StructType, StructField
import pyspark.sql.types as T
def test_zip2(df):
startTime = time.time()
schema_new = StructType(list(df.schema) + [StructField("consec_id", T.LongType(), False)])
df_3 = df.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(schema_new)
start_row = 20000
end_row = 20010
df_3.where((F.col('IDcol')>start_row) & (F.col('consec_id')<end_row)).show()
endTime = time.time() - startTime
return str(round(endTime,3)) + " seconds"
[test_zip2(testdf) for _ in range(5)]
['82.795 วินาที', '61.689 วินาที', '58.181 วินาที', '58.01 วินาที', '57.765 วินาที']