PySpark DataFrame column based on the values of another DataFrame

I have two DataFrames:

df1= 
+---+----------+
| id|filter    |
+---+----------+
|  1|       YES|
|  2|        NO|
|  3|        NO|
+---+----------+

df2 = 
+--------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|                   1|  2|  3|  4|  5|  6|  7|  8|  9| 10| 11| 12| 13| 14| 15|
+--------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|XXXXXX              |NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|
|YYYYYY              |NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|
+--------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+

What I want to do is create a new column in df1 that holds the values from the matching field of df2, filtered based on the row values of df1. My output would look like this:

df3 =
+---+----------+----------------+
| id|filter    | value          |
+---+----------+----------------+
|  1|       YES|[XXXXXX, YYYYYY]|
|  2|        NO|        []      |
|  3|        NO|        []      |
+---+----------+----------------+

I know how to do this with Pandas, but I don't know how to do it with PySpark.
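For context, the Pandas logic is roughly the following sketch (the DataFrames here are just an illustrative reconstruction of the tables above):

import numpy as np
import pandas as pd

# Illustrative reconstruction of the two tables above
df1 = pd.DataFrame({"id": [1, 2, 3], "filter": ["YES", "NO", "NO"]})
df2 = pd.DataFrame(
    [["XXXXXX"] + [np.nan] * 14, ["YYYYYY"] + [np.nan] * 14],
    columns=[str(i) for i in range(1, 16)],
)

# Per row of df1: take the df2 column named after the id, drop NaN, keep [] when filter is NO
df3 = df1.copy()
df3["value"] = df1.apply(
    lambda row: df2[str(row["id"])].dropna().tolist() if row["filter"] == "YES" else [],
    axis=1,
)
print(df3)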

I have tried the following, but it does not seem to work:

df3 = df1.withColumn('value', f.when(df1['filter'] == 'YES', df2.select(f.col('id')).collect()).otherwise(f.lit([])))

Thank you very much


person FranG91    schedule 26.06.2020    source


Answers (1)


Load the provided test data

    // Assumes an existing SparkSession named `spark`; implicits are needed for toDS(),
    // and the sql functions import for collect_list used further below
    import org.apache.spark.sql.functions._
    import spark.implicits._

    val data1 =
      """
        | id|filter
        |  1|       YES
        |  2|        NO
        |  3|        NO
      """.stripMargin
    val stringDS1 = data1.split(System.lineSeparator())
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(";"))
      .toSeq.toDS()
    val df1 = spark.read
      .option("sep", ";")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS1)
    df1.printSchema()
    df1.show(false)
    /**
      * root
      * |-- id: integer (nullable = true)
      * |-- filter: string (nullable = true)
      *
      * +---+------+
      * |id |filter|
      * +---+------+
      * |1  |YES   |
      * |2  |NO    |
      * |3  |NO    |
      * +---+------+
      */

    val data2 =
      """
        |                   1|  2|  3|  4|  5|  6|  7|  8|  9| 10| 11| 12| 13| 14| 15
        |XXXXXX              |NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN
        |YYYYYY              |NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN
      """.stripMargin
    val stringDS2 = data2.split(System.lineSeparator())
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(";"))
      .toSeq.toDS()
    val df2 = spark.read
      .option("sep", ";")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS2)
    df2.printSchema()
    df2.show(false)
    /**
      * root
      * |-- 1: string (nullable = true)
      * |-- 2: double (nullable = true)
      * |-- 3: double (nullable = true)
      * |-- 4: double (nullable = true)
      * |-- 5: double (nullable = true)
      * |-- 6: double (nullable = true)
      * |-- 7: double (nullable = true)
      * |-- 8: double (nullable = true)
      * |-- 9: double (nullable = true)
      * |-- 10: double (nullable = true)
      * |-- 11: double (nullable = true)
      * |-- 12: double (nullable = true)
      * |-- 13: double (nullable = true)
      * |-- 14: double (nullable = true)
      * |-- 15: double (nullable = true)
      *
      * +------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
      * |1     |2  |3  |4  |5  |6  |7  |8  |9  |10 |11 |12 |13 |14 |15 |
      * +------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
      * |XXXXXX|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|
      * |YYYYYY|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|
      * +------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
      */

Melt/unpivot dataframe 2 and then join it with df1


    // Build a stack() expression that turns every column of df2 into an (id, value) pair,
    // casting each value to string so the column-name ids and the NaN cells share one type
    val stringCol = df2.columns.map(c => s"'$c', cast(`$c` as string)").mkString(", ")

    val processedDF = df2.selectExpr(s"stack(${df2.columns.length}, $stringCol) as (id, value)")
    processedDF.show(false)

    /**
      * +---+------+
      * |id |value |
      * +---+------+
      * |1  |XXXXXX|
      * |2  |NaN   |
      * |3  |NaN   |
      * |4  |NaN   |
      * |5  |NaN   |
      * |6  |NaN   |
      * |7  |NaN   |
      * |8  |NaN   |
      * |9  |NaN   |
      * |10 |NaN   |
      * |11 |NaN   |
      * |12 |NaN   |
      * |13 |NaN   |
      * |14 |NaN   |
      * |15 |NaN   |
      * |1  |YYYYYY|
      * |2  |NaN   |
      * |3  |NaN   |
      * |4  |NaN   |
      * |5  |NaN   |
      * +---+------+
      * only showing top 20 rows
      */
    // Join on id, collect all stacked values per id, then drop the NaN entries
    // (the FILTER higher-order function requires Spark 2.4+)
    df1.join(processedDF, "id")
      .groupBy("id", "filter")
      .agg(collect_list("value").as("value"))
      .selectExpr("id", "filter", "FILTER(value, x -> x != 'NaN') as value")
      .show(false)

    /**
      * +---+------+----------------+
      * |id |filter|value           |
      * +---+------+----------------+
      * |2  |NO    |[]              |
      * |1  |YES   |[XXXXXX, YYYYYY]|
      * |3  |NO    |[]              |
      * +---+------+----------------+
      */
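For the Python side, a rough PySpark translation of the same stack-and-join approach could look like the sketch below (untested; it rebuilds the question's test data, and the filter() higher-order function in selectExpr needs Spark 2.4+):

    from pyspark.sql import SparkSession, functions as f

    spark = SparkSession.builder.getOrCreate()

    # Recreate the test data from the question
    df1 = spark.createDataFrame([(1, "YES"), (2, "NO"), (3, "NO")], ["id", "filter"])
    nan = float("nan")
    df2 = spark.createDataFrame(
        [tuple(["XXXXXX"] + [nan] * 14), tuple(["YYYYYY"] + [nan] * 14)],
        [str(i) for i in range(1, 16)],
    )

    # Unpivot df2: each column name becomes an 'id', each cell a string 'value'
    stack_expr = ", ".join(f"'{c}', cast(`{c}` as string)" for c in df2.columns)
    processed_df = df2.selectExpr(f"stack({len(df2.columns)}, {stack_expr}) as (id, value)")

    # Join on id, collect the values per id, then drop the NaN placeholders
    df3 = (df1.join(processed_df, "id")
           .groupBy("id", "filter")
           .agg(f.collect_list("value").alias("value"))
           .selectExpr("id", "filter", "filter(value, x -> x != 'NaN') as value"))

    df3.show(truncate=False)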
person Som    schedule 26.06.2020
comment
Thank you very much! Do you know how to implement this in Python? (the melt/unpivot part) - person FranG91; 28.06.2020
comment
Take help from this and convert it to Python - stackoverflow.com/a/62574110/4758823 - person Som; 28.06.2020