Pyspark ไม่สนใจการกรอง dataframe ภายใน pyspark-sql-functions

สวัสดีตอนเช้า,

ฉันมีคำถามเกี่ยวกับรหัส pyspark สมมติว่าเรามี dataframe เช่นนี้:

+---------+--------+-------+--------+
| p_key_1 | p_key_2| status| value_1|
+---------+--------+-------+--------+
|       13|      42|   True|      33|
|       13|      42|   True|      12|
|       13|      42|   True|     106|
|       13|      42|  False|       0|
|       13|      42|  False|      27|
+---------+--------+-------+--------+

p_key_1 และ p_key_1 เป็นคีย์พาร์ติชั่น แต่เพื่อให้ตัวอย่างนี้เล็กลง ฉันไม่ได้เปลี่ยนแปลงมัน

แบ่งพาร์ติชันด้วย p_keys สองตัว ฉันต้องการเพิ่มคอลัมน์ที่มีค่าน้อยที่สุดของ value_1 เหนือแถวทั้งหมดด้วย status = True

ฉันต้องการทำสิ่งนี้ด้วย:

my_win= Window.partitionBy('p_key_1', 'p_key_2')
my_df.withColumn('new_col', F.min(my_df.where(F.col('status') == True).value_1).over(my_win))

ปัญหาของฉันคือ การกรองภายในฟังก์ชัน F.min(...) ถูกละเว้นโดยสิ้นเชิง ดังนั้นค่าทั้งหมดของ new_col จะกลายเป็น 0 ส่งผลให้ dataframe นี้:

+---------+--------+-------+--------+--------+
| p_key_1 | p_key_2| status| value_1| new_col|
+---------+--------+-------+--------+--------+
|       13|      42|   True|      33|      12|
|       13|      42|   True|      12|      12|
|       13|      42|   True|     106|      12|
|       13|      42|  False|       0|      12|
|       13|      42|  False|      27|      12|
+---------+--------+-------+--------+--------+

dataframe ที่ฉันต้องการได้รับมีลักษณะดังนี้:

+---------+--------+-------+--------+--------+
| p_key_1 | p_key_2| status| value_1| new_col|
+---------+--------+-------+--------+--------+
|       13|      42|   True|      33|       0|
|       13|      42|   True|      12|       0|
|       13|      42|   True|     106|       0|
|       13|      42|  False|       0|       0|
|       13|      42|  False|      27|       0|
+---------+--------+-------+--------+--------+

ดังนั้นคำถามของฉันคือ:

เหตุใดจึงใช้งานไม่ได้และมีการใช้งานทางเลือกใดบ้าง


person Nick    schedule 12.08.2019    source แหล่งที่มา
comment
เพิ่มว่า dataframe ผลลัพธ์ควรมีลักษณะอย่างไร   -  person Prathik Kini    schedule 12.08.2019
comment
เพิ่มตัวอย่างของ dataframe ที่ฉันได้รับและอันที่ฉันต้องการจริงๆ   -  person Nick    schedule 12.08.2019
comment
ฉันคิดว่าคุณต้องใช้ F.when() ภายใน F.min().over() ไม่ใช่ my_df.where()   -  person samkart    schedule 12.08.2019
comment
my_df.withColumn('new_col', F.min(F.when(F.col('status') == True), F.col(value_1)).over(my_win)) นั่นควรทำตามที่คุณต้องการ   -  person samkart    schedule 12.08.2019
comment
ขอบคุณมาก มันได้ผลสำหรับฉัน หากคุณต้องการคุณสามารถโพสต์ข้อเสนอแนะของคุณเป็นคำตอบและฉันจะยอมรับมัน   -  person Nick    schedule 12.08.2019


คำตอบ (2)


วิธีที่ง่ายที่สุดในการบรรลุสิ่งที่คุณต้องการคือการใช้ when() แทน df.where()

รับตัวแปรจากตัวอย่างของคุณ -

my_win = Window.partitionBy('p_key_1', 'p_key_2') # your window spec

my_df.withColumn('new_col', F.min(F.when(F.col('status') == True), F.col(value_1)).over(my_win))

ฟิลด์ new_col จะให้ค่าขั้นต่ำของฟิลด์ value_1 เฉพาะในกรณีที่ฟิลด์ status คือ True

person samkart    schedule 12.08.2019

นี่คือวิธีการ:

(
  my_df
  .withColumn('temp_col', F.when(F.col('status') == True, F.col('value_1')))
  .withColumn(
      'new_col', 
      F.min('temp_col').over(my_win)
  )
  .drop('temp_col')
)

ประเด็นคือการสร้างคอลัมน์ชั่วคราวที่คุณจัดเก็บค่าเฉพาะในกรณีที่สถานะเป็น True และ Null หากสถานะเป็น False จากนั้นคุณรับ min ของ temp_col นี้และค่า Null จะถูกละเว้น

person David Vrba    schedule 12.08.2019