Не удается собрать данные из набора данных/фрейма данных в Spark 2.0.1; получить исключение ClassCastException

У меня есть некоторые данные json, которые представляют собой пары ключевых значений с целыми числами в качестве ключей и списками целых чисел в качестве значений. Я хочу прочитать эти данные на карте, а затем передать их, чтобы их можно было использовать в другом RDD для быстрого поиска.

У меня есть код, который работал на искровом кластере 1.6.1 в центре обработки данных, но тот же код не будет работать на искровом кластере 2.0.1 в AWS. Код 1.6.1, который работает:

import scala.collection.mutable.WrappedArray
sc.broadcast(sqlContext.read.schema(mySchema).json(myPath).map(r => (r.getInt(0), r.getAs[WrappedArray[Int]].toArray)).collectAsMap)

Я для 2.0.1 пробовал:

val myData = sqlContext.read.schema(mySchema).json(myPath).map(r => (r.getInt(0), r.getSeq[Int].toArray))

Это дает мне то, что я хочу на данный момент:

org.apache.spark.sql.Dataset[(Int, Array[Int])] = [_1: int, _2: array<int>]

Но тогда, когда я делаю:

sc.broadcast(myData.rdd.collectAsMap) 

Я получил:

java.lang.ClassCastException: невозможно назначить экземпляр scala.collection.immutable.List$SerializationProxy в поле org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ типа scala.collection.Seq в случае org.apache.spark.rdd.MapPartitionsRDD

Кто-нибудь знает, как я могу сделать это в 2.0.1? Это очень простая вещь, которую я пытаюсь сделать.

Заранее спасибо,

Робин


person robinlmorris    schedule 25.10.2016    source источник
comment
Эй, я тоже только что опубликовал тот же вопрос. Нужна помощь, если вы ее найдете..... stackoverflow.com/questions/40233215/   -  person Tanny    schedule 25.10.2016


Ответы (1)


Я понял, что моя проблема была с искровой оболочкой в ​​2.0.1. Код, который я разместил, отлично работает, если я использую существующие sc и sqlContext, которые являются частью сеанса spark, создаваемого оболочкой. Если я вызову остановку и создам новый сеанс с пользовательской конфигурацией, я получу странную ошибку выше. Мне это не нравится, так как я хочу изменить spark.driver.maxResultSize.

В любом случае, урок таков: если вы тестируете код с помощью искровой оболочки, используйте существующую сессию, иначе она может не сработать.

person robinlmorris    schedule 01.11.2016
comment
Вы должны иметь возможность остановить сеанс и создать новый. Может быть, это какое-то состояние гонки? Вы начинаете новую сессию сразу после остановки старой? - person Jakob Odersky; 02.11.2016