Доброе утро,
У меня вопрос по поводу pyspark-кода. Предполагая, что у нас есть такой фрейм данных:
+---------+--------+-------+--------+
| p_key_1 | p_key_2| status| value_1|
+---------+--------+-------+--------+
| 13| 42| True| 33|
| 13| 42| True| 12|
| 13| 42| True| 106|
| 13| 42| False| 0|
| 13| 42| False| 27|
+---------+--------+-------+--------+
p_key_1
и p_key_1
- ключи раздела, но, чтобы уменьшить размер этого примера, я не менял их.
Разделенный на два p_keys
, я хочу добавить столбец, который содержит минимум value_1
по всем строкам с status = True
.
Я хотел сделать это с помощью:
my_win= Window.partitionBy('p_key_1', 'p_key_2')
my_df.withColumn('new_col', F.min(my_df.where(F.col('status') == True).value_1).over(my_win))
Моя проблема в том, что фильтрация внутри функции F.min(...)
полностью игнорируется, так что все значения new_col
в конечном итоге равны 0, в результате чего получается этот фрейм данных:
+---------+--------+-------+--------+--------+
| p_key_1 | p_key_2| status| value_1| new_col|
+---------+--------+-------+--------+--------+
| 13| 42| True| 33| 12|
| 13| 42| True| 12| 12|
| 13| 42| True| 106| 12|
| 13| 42| False| 0| 12|
| 13| 42| False| 27| 12|
+---------+--------+-------+--------+--------+
Фрейм данных, который я хотел получить, выглядит так:
+---------+--------+-------+--------+--------+
| p_key_1 | p_key_2| status| value_1| new_col|
+---------+--------+-------+--------+--------+
| 13| 42| True| 33| 0|
| 13| 42| True| 12| 0|
| 13| 42| True| 106| 0|
| 13| 42| False| 0| 0|
| 13| 42| False| 27| 0|
+---------+--------+-------+--------+--------+
Итак, мой вопрос:
Почему это не работает и какие существуют альтернативные реализации?
F.when()
внутриF.min().over()
, а неmy_df.where()
. - person samkart   schedule 12.08.2019my_df.withColumn('new_col', F.min(F.when(F.col('status') == True), F.col(value_1)).over(my_win))
Это должно делать то, что вы описываете, что хотите. - person samkart   schedule 12.08.2019