Spark Scala Разделить DataFrame по некоторому диапазону значений

Предположим, у меня есть фрейм данных со столбцом с именем x с диапазоном значений [0, 1]. Я надеюсь разделить его по значению столбца x с такими диапазонами, как [0, 0.1), [0.1, 0.2)...[0.9, 1]. Есть ли хороший и быстрый способ сделать это? Я использую Spark 2 в Scala.

Обновление: в идеале должно быть 10 новых фреймов данных, содержащих данные для каждого диапазона.


person Mr.cysl    schedule 29.05.2017    source источник


Ответы (2)


Расширяя решение @Psidom для создания диапазонов, вот один из подходов к созданию фрейма данных для каждого диапазона:

import org.apache.spark.sql.types.IntegerType
val df = Seq(0.2, 0.71, 0.95, 0.33, 0.28, 0.8, 0.73).toDF("x")
val df2 = df.withColumn("g", ($"x" * 10.0).cast(IntegerType))

df2.show
+----+---+
|   x|  g|
+----+---+
| 0.2|  2|
|0.71|  7|
|0.95|  9|
|0.33|  3|
|0.28|  2|
| 0.8|  8|
|0.73|  7|
+----+---+

val dfMap = df2.select($"g").distinct.
  collect.
  flatMap(_.toSeq).
  map( g => g -> df2.where($"g" === g) ).
  toMap

dfMap.getOrElse(3, null).show
+----+---+
|   x|  g|
+----+---+
|0.33|  3|
+----+---+

dfMap.getOrElse(7, null).show
+----+---+
|   x|  g|
+----+---+
|0.71|  7|
|0.73|  7|
+----+---+

[ОБНОВИТЬ]

Если ваши диапазоны нерегулярны, вы можете определить функцию, которая сопоставляет Double с соответствующим идентификатором диапазона Int, а затем заключает его в UDF, как показано ниже:

val g: Double => Int = x => x match {
  case x if (x >= 0.0 && x < 0.12345) => 1
  case x if (x >= 0.12345 && x < 0.4834) => 2
  case x if (x >= 0.4834 && x < 1.0) => 3
  case _ => 99  // catch-all
}

val groupUDF = udf(g)

val df = Seq(0.1, 0.2, 0.71, 0.95, 0.03, 0.09, 0.44, 5.0).toDF("x")
val df2 = df.withColumn("g", groupUDF($"x"))

df2.show
+----+---+
|   x|  g|
+----+---+
| 0.1|  1|
| 0.2|  2|
|0.71|  3|
|0.95|  3|
|0.03|  1|
|0.09|  1|
|0.44|  2|
| 5.0| 99|
+----+---+
person Leo C    schedule 29.05.2017
comment
Не могли бы вы рассказать немного больше о dfMap? И считаете ли вы этот способ эффективным? Спасибо! - person Mr.cysl; 29.05.2017
comment
Преобразования для создания dfMap, которое представляет собой Map с отдельными g в качестве ключей и фреймами данных (с соответствующими g) в качестве значений, включают сбор различных g в локальный массив, преобразование в список и создание Map отфильтрованных фреймов данных для каждого элемента. в списке. Эти преобразования применяются ко всему набору данных, поэтому стоят недешево. Я бы сказал, что их стоит делать, если вы планируете выполнять обширные вычисления с выбранными фреймами данных в dfMap. - person Leo C; 29.05.2017
comment
Я вижу, что вы указываете. Я пытаюсь вычислить сумму x для каждой категории (здесь g). Есть ли хороший способ сделать это? - person Mr.cysl; 29.05.2017
comment
В этом случае нет необходимости выделять эти кадры данных для каждого g. Вы можете просто сделать это: df2.groupBy($"g").agg(sum($"x")) - person Leo C; 30.05.2017
comment
Не уверен, что следую логике вашего последнего комментария. Возможно, вы захотите начать отдельный вопрос с более подробным описанием и примерами. - person Leo C; 30.05.2017
comment
Я понимаю! Однако, если диапазон разделен неравномерно, например, [0, 0,12345), [0,12345, 0,4834)..., есть ли хороший способ разделить его (или сгруппировать по нему)? Спасибо! - person Mr.cysl; 01.06.2017
comment
@Mr.cysl, пожалуйста, посмотрите мой расширенный ответ. - person Leo C; 01.06.2017
comment
udf отличная идея! Спасибо! - person Mr.cysl; 01.06.2017

Если вы хотели дискретизировать столбец с двойным типом, вы можете просто сделать это (умножить столбец на 10, а затем привести его к целочисленному типу, столбец будет разделен на 10 дискретных ячеек):

import org.apache.spark.sql.types.IntegerType

val df = Seq(0.32, 0.5, 0.99, 0.72, 0.11, 0.03).toDF("A")
// df: org.apache.spark.sql.DataFrame = [A: double]

df.withColumn("new", ($"A" * 10).cast(IntegerType)).show
+----+---+
|   A|new|
+----+---+
|0.32|  3|
| 0.5|  5|
|0.99|  9|
|0.72|  7|
|0.11|  1|
|0.03|  0|
+----+---+
person Psidom    schedule 29.05.2017
comment
Я хочу 10 новых фреймов данных и делаю дальнейшие вычисления. Я думаю использовать .filter(), но я не уверен, что это работает, и это достаточно быстро (данные огромны). - person Mr.cysl; 29.05.2017
comment
Вместо создания 10 новых фреймов данных вам может понадобиться groupBy("new"), а затем делать все, что вам нужно для каждой группы. - person Psidom; 29.05.2017
comment
Я попробую это! - person Mr.cysl; 29.05.2017