Spark Scala Split DataFrame ตามช่วงค่าบางช่วง

สมมติว่าฉันมี dataframe ที่มีคอลัมน์ชื่อ x โดยมีช่วงค่าเป็น [0, 1] ฉันหวังว่าจะแบ่งตามค่าของคอลัมน์ x โดยมีช่วงเช่น [0, 0.1), [0.1, 0.2)...[0.9, 1] มีวิธีที่ดีและรวดเร็วในการทำเช่นนั้นหรือไม่? ฉันใช้ Spark 2 ใน Scala

อัปเดต: ตามหลักการแล้วควรมี 10 dataframes ใหม่ที่มีข้อมูลสำหรับแต่ละช่วง


person Mr.cysl    schedule 29.05.2017    source แหล่งที่มา


คำตอบ (2)


การขยายโซลูชันของ @Psidom สำหรับการสร้างช่วง ต่อไปนี้เป็นวิธีหนึ่งในการสร้าง dataframe สำหรับแต่ละช่วง:

import org.apache.spark.sql.types.IntegerType
val df = Seq(0.2, 0.71, 0.95, 0.33, 0.28, 0.8, 0.73).toDF("x")
val df2 = df.withColumn("g", ($"x" * 10.0).cast(IntegerType))

df2.show
+----+---+
|   x|  g|
+----+---+
| 0.2|  2|
|0.71|  7|
|0.95|  9|
|0.33|  3|
|0.28|  2|
| 0.8|  8|
|0.73|  7|
+----+---+

val dfMap = df2.select($"g").distinct.
  collect.
  flatMap(_.toSeq).
  map( g => g -> df2.where($"g" === g) ).
  toMap

dfMap.getOrElse(3, null).show
+----+---+
|   x|  g|
+----+---+
|0.33|  3|
+----+---+

dfMap.getOrElse(7, null).show
+----+---+
|   x|  g|
+----+---+
|0.71|  7|
|0.73|  7|
+----+---+

[อัปเดต]

หากช่วงของคุณไม่สม่ำเสมอ คุณสามารถกำหนดฟังก์ชันที่จะจับคู่ Double เข้ากับรหัสช่วง Int ที่เกี่ยวข้อง จากนั้นล้อมด้วย UDF ดังตัวอย่างต่อไปนี้:

val g: Double => Int = x => x match {
  case x if (x >= 0.0 && x < 0.12345) => 1
  case x if (x >= 0.12345 && x < 0.4834) => 2
  case x if (x >= 0.4834 && x < 1.0) => 3
  case _ => 99  // catch-all
}

val groupUDF = udf(g)

val df = Seq(0.1, 0.2, 0.71, 0.95, 0.03, 0.09, 0.44, 5.0).toDF("x")
val df2 = df.withColumn("g", groupUDF($"x"))

df2.show
+----+---+
|   x|  g|
+----+---+
| 0.1|  1|
| 0.2|  2|
|0.71|  3|
|0.95|  3|
|0.03|  1|
|0.09|  1|
|0.44|  2|
| 5.0| 99|
+----+---+
person Leo C    schedule 29.05.2017
comment
คุณช่วยอธิบายเพิ่มเติมอีกเล็กน้อยเกี่ยวกับ dfMap ได้ไหม และคุณคิดว่าวิธีนี้มีประสิทธิภาพหรือไม่? ขอบคุณ! - person Mr.cysl; 29.05.2017
comment
การแปลงสำหรับการสร้าง dfMap ซึ่งเป็น Map โดยมี g ที่แตกต่างกันเป็นคีย์และ dataframes (โดยมีค่า g ที่สอดคล้องกัน) เป็นค่า ซึ่งเกี่ยวข้องกับการรวบรวม g ที่แตกต่างกันเป็นอาร์เรย์ภายในเครื่อง การแปลงเป็นรายการ และการสร้าง Map ของ dataframes ที่กรองแล้วสำหรับทุกองค์ประกอบ ในรายการ การแปลงเหล่านี้ใช้กับชุดข้อมูลทั้งหมดดังนั้นจึงไม่แพง ฉันว่ามันคุ้มค่าที่จะทำถ้าคุณวางแผนที่จะทำการคำนวณอย่างกว้างขวางกับดาต้าเฟรมที่เลือกใน dfMap - person Leo C; 29.05.2017
comment
ฉันเห็นสิ่งที่คุณชี้ สิ่งที่ฉันพยายามทำคือคำนวณผลรวมของ x สำหรับแต่ละหมวดหมู่ (g ที่นี่) มีวิธีที่ดีในการทำเช่นนั้นหรือไม่? - person Mr.cysl; 29.05.2017
comment
ในกรณีนั้น ไม่จำเป็นต้องแยก dataframes เหล่านั้นออกทุกๆ g คุณสามารถทำได้: df2.groupBy($"g").agg(sum($"x")) - person Leo C; 30.05.2017
comment
ไม่แน่ใจว่าฉันทำตามตรรกะในความคิดเห็นล่าสุดของคุณ คุณอาจต้องการเริ่มคำถามแยกต่างหากพร้อมคำอธิบายที่ละเอียดยิ่งขึ้นและกรณีตัวอย่าง - person Leo C; 30.05.2017
comment
ฉันเห็น! อย่างไรก็ตาม หากช่วงไม่ได้แบ่งเท่าๆ กัน เช่น [0, 0.12345), [0.12345, 0.4834)... มีวิธีที่ดีในการแยก (หรือ groupBy it) หรือไม่ ขอบคุณ! - person Mr.cysl; 01.06.2017
comment
@ Mr.cysl โปรดดูคำตอบเพิ่มเติมของฉัน - person Leo C; 01.06.2017
comment
udf เป็นความคิดที่ยอดเยี่ยม! ขอบคุณ! - person Mr.cysl; 01.06.2017

หากคุณต้องการแยกแยะคอลัมน์ที่พิมพ์สองครั้ง คุณอาจทำสิ่งนี้ (คูณคอลัมน์ด้วย 10 แล้วแปลงเป็นประเภทจำนวนเต็ม คอลัมน์จะถูกตัดออกเป็น 10 ถังขยะแยกกัน):

import org.apache.spark.sql.types.IntegerType

val df = Seq(0.32, 0.5, 0.99, 0.72, 0.11, 0.03).toDF("A")
// df: org.apache.spark.sql.DataFrame = [A: double]

df.withColumn("new", ($"A" * 10).cast(IntegerType)).show
+----+---+
|   A|new|
+----+---+
|0.32|  3|
| 0.5|  5|
|0.99|  9|
|0.72|  7|
|0.11|  1|
|0.03|  0|
+----+---+
person Psidom    schedule 29.05.2017
comment
ฉันต้องการ 10 dataframe ใหม่ และทำการคำนวณเพิ่มเติม ฉันกำลังคิดว่าจะใช้ .filter() แต่ฉันไม่แน่ใจว่าวิธีนี้ใช้งานได้และเร็วพอ (ข้อมูลมีขนาดใหญ่) - person Mr.cysl; 29.05.2017
comment
แทนที่จะสร้าง 10 dataframe ใหม่ คุณอาจต้องใช้ groupBy("new") แล้วทำทุกอย่างที่คุณต้องการสำหรับแต่ละกลุ่ม - person Psidom; 29.05.2017
comment
ฉันจะลองสิ่งนั้น! - person Mr.cysl; 29.05.2017