Pyspark игнорирует фильтрацию фрейма данных внутри pyspark-sql-functions

Доброе утро,

У меня вопрос по поводу pyspark-кода. Предполагая, что у нас есть такой фрейм данных:

+---------+--------+-------+--------+
| p_key_1 | p_key_2| status| value_1|
+---------+--------+-------+--------+
|       13|      42|   True|      33|
|       13|      42|   True|      12|
|       13|      42|   True|     106|
|       13|      42|  False|       0|
|       13|      42|  False|      27|
+---------+--------+-------+--------+

p_key_1 и p_key_1 - ключи раздела, но, чтобы уменьшить размер этого примера, я не менял их.

Разделенный на два p_keys, я хочу добавить столбец, который содержит минимум value_1 по всем строкам с status = True.

Я хотел сделать это с помощью:

my_win= Window.partitionBy('p_key_1', 'p_key_2')
my_df.withColumn('new_col', F.min(my_df.where(F.col('status') == True).value_1).over(my_win))

Моя проблема в том, что фильтрация внутри функции F.min(...) полностью игнорируется, так что все значения new_col в конечном итоге равны 0, в результате чего получается этот фрейм данных:

+---------+--------+-------+--------+--------+
| p_key_1 | p_key_2| status| value_1| new_col|
+---------+--------+-------+--------+--------+
|       13|      42|   True|      33|      12|
|       13|      42|   True|      12|      12|
|       13|      42|   True|     106|      12|
|       13|      42|  False|       0|      12|
|       13|      42|  False|      27|      12|
+---------+--------+-------+--------+--------+

Фрейм данных, который я хотел получить, выглядит так:

+---------+--------+-------+--------+--------+
| p_key_1 | p_key_2| status| value_1| new_col|
+---------+--------+-------+--------+--------+
|       13|      42|   True|      33|       0|
|       13|      42|   True|      12|       0|
|       13|      42|   True|     106|       0|
|       13|      42|  False|       0|       0|
|       13|      42|  False|      27|       0|
+---------+--------+-------+--------+--------+

Итак, мой вопрос:

Почему это не работает и какие существуют альтернативные реализации?


person Nick    schedule 12.08.2019    source источник
comment
Также добавьте, как должен выглядеть результирующий фрейм данных   -  person Prathik Kini    schedule 12.08.2019
comment
Добавлен пример фрейма данных, который я получаю, и того, который мне действительно нужен.   -  person Nick    schedule 12.08.2019
comment
Я думаю, вам нужно использовать F.when() внутри F.min().over(), а не my_df.where().   -  person samkart    schedule 12.08.2019
comment
my_df.withColumn('new_col', F.min(F.when(F.col('status') == True), F.col(value_1)).over(my_win)) Это должно делать то, что вы описываете, что хотите.   -  person samkart    schedule 12.08.2019
comment
Большое спасибо, это работает для меня. Если хотите, вы можете опубликовать свое предложение в качестве ответа, и я приму его.   -  person Nick    schedule 12.08.2019


Ответы (2)


Самый простой способ добиться того, что вам нужно, - использовать when() вместо df.where().

Взяв переменные из вашего примера -

my_win = Window.partitionBy('p_key_1', 'p_key_2') # your window spec

my_df.withColumn('new_col', F.min(F.when(F.col('status') == True), F.col(value_1)).over(my_win))

Поле new_col дает минимум поля value_1, только если поле status равно True.

person samkart    schedule 12.08.2019

Вот как это сделать:

(
  my_df
  .withColumn('temp_col', F.when(F.col('status') == True, F.col('value_1')))
  .withColumn(
      'new_col', 
      F.min('temp_col').over(my_win)
  )
  .drop('temp_col')
)

Дело в том, чтобы создать временный столбец, в котором вы храните значения, только если статус равен True и Null, если статус равен False. Затем вы берете min из этого temp_col, и значения Null будут проигнорированы.

person David Vrba    schedule 12.08.2019