Pyspark mengabaikan pemfilteran kerangka data di dalam fungsi pyspark-sql

Selamat pagi,

Saya punya pertanyaan tentang beberapa kode pyspark. Dengan asumsi kita memiliki kerangka data seperti ini:

+---------+--------+-------+--------+
| p_key_1 | p_key_2| status| value_1|
+---------+--------+-------+--------+
|       13|      42|   True|      33|
|       13|      42|   True|      12|
|       13|      42|   True|     106|
|       13|      42|  False|       0|
|       13|      42|  False|      27|
+---------+--------+-------+--------+

p_key_1 dan p_key_1 adalah kunci partisi, tetapi untuk memperkecil contoh ini, saya tidak memvariasikannya.

Dipartisi oleh dua p_keys Saya ingin menambahkan kolom yang berisi minimal value_1 di semua baris dengan status = True.

Saya ingin melakukan ini dengan:

my_win= Window.partitionBy('p_key_1', 'p_key_2')
my_df.withColumn('new_col', F.min(my_df.where(F.col('status') == True).value_1).over(my_win))

Masalah saya adalah, pemfilteran di dalam fungsi F.min(...) diabaikan sepenuhnya, sehingga semua nilai new_col menjadi 0, menghasilkan kerangka data ini:

+---------+--------+-------+--------+--------+
| p_key_1 | p_key_2| status| value_1| new_col|
+---------+--------+-------+--------+--------+
|       13|      42|   True|      33|      12|
|       13|      42|   True|      12|      12|
|       13|      42|   True|     106|      12|
|       13|      42|  False|       0|      12|
|       13|      42|  False|      27|      12|
+---------+--------+-------+--------+--------+

Kerangka data yang ingin saya dapatkan terlihat seperti ini:

+---------+--------+-------+--------+--------+
| p_key_1 | p_key_2| status| value_1| new_col|
+---------+--------+-------+--------+--------+
|       13|      42|   True|      33|       0|
|       13|      42|   True|      12|       0|
|       13|      42|   True|     106|       0|
|       13|      42|  False|       0|       0|
|       13|      42|  False|      27|       0|
+---------+--------+-------+--------+--------+

Jadi pertanyaan saya adalah:

Mengapa hal ini tidak berhasil dan penerapan alternatif apa yang ada?


person Nick    schedule 12.08.2019    source sumber
comment
Tambahkan juga tampilan kerangka data yang dihasilkan   -  person Prathik Kini    schedule 12.08.2019
comment
Menambahkan contoh kerangka data yang saya dapatkan dan yang sebenarnya saya inginkan.   -  person Nick    schedule 12.08.2019
comment
Saya pikir Anda perlu menggunakan F.when() di dalam F.min().over(), dan bukan my_df.where().   -  person samkart    schedule 12.08.2019
comment
my_df.withColumn('new_col', F.min(F.when(F.col('status') == True), F.col(value_1)).over(my_win)) Itu akan sesuai dengan apa yang Anda inginkan.   -  person samkart    schedule 12.08.2019
comment
Terima kasih banyak, ini berhasil untuk saya. Jika mau, Anda dapat mengirimkan saran Anda sebagai jawaban dan saya akan menerimanya.   -  person Nick    schedule 12.08.2019


Jawaban (2)


Cara termudah untuk mencapai apa yang Anda perlukan adalah dengan menggunakan when() alih-alih df.where().

Mengambil variabel dari contoh Anda -

my_win = Window.partitionBy('p_key_1', 'p_key_2') # your window spec

my_df.withColumn('new_col', F.min(F.when(F.col('status') == True), F.col(value_1)).over(my_win))

Bidang new_col memberikan nilai minimum bidang value_1 hanya jika bidang status adalah True.

person samkart    schedule 12.08.2019

Berikut cara melakukannya:

(
  my_df
  .withColumn('temp_col', F.when(F.col('status') == True, F.col('value_1')))
  .withColumn(
      'new_col', 
      F.min('temp_col').over(my_win)
  )
  .drop('temp_col')
)

Intinya adalah membuat kolom temporal di mana Anda menyimpan nilai hanya jika statusnya True dan Null jika statusnya False. Kemudian Anda mengambil min dari nilai temp_col dan Null ini akan diabaikan.

person David Vrba    schedule 12.08.2019