ไม่สามารถรวบรวมข้อมูลจากชุดข้อมูล / dataframe ใน Spark 2.0.1; รับ ClassCastException

ฉันมีข้อมูล json บางส่วนที่เป็นคู่ของค่าคีย์โดยมี int เป็นคีย์และรายการ ints เป็นค่า ฉันต้องการอ่านข้อมูลนี้ลงในแผนที่แล้วออกอากาศเพื่อให้ RDD อื่นนำไปใช้ในการค้นหาอย่างรวดเร็ว

ฉันมีโค้ดที่ทำงานกับคลัสเตอร์ Spark 1.6.1 ที่อยู่ในศูนย์ข้อมูล แต่โค้ดเดียวกันนี้ใช้ไม่ได้กับคลัสเตอร์ Spark 2.0.1 ใน AWS รหัส 1.6.1 ที่ใช้งานได้:

import scala.collection.mutable.WrappedArray
sc.broadcast(sqlContext.read.schema(mySchema).json(myPath).map(r => (r.getInt(0), r.getAs[WrappedArray[Int]].toArray)).collectAsMap)

ฉันเป็น 2.0.1 ฉันได้ลองแล้ว:

val myData = sqlContext.read.schema(mySchema).json(myPath).map(r => (r.getInt(0), r.getSeq[Int].toArray))

สิ่งนี้ทำให้ฉันได้รับสิ่งที่ฉันต้องการ ณ จุดนี้:

org.apache.spark.sql.Dataset[(Int, Array[Int])] = [_1: int, _2: array<int>]

แต่แล้วเมื่อฉันทำ:

sc.broadcast(myData.rdd.collectAsMap) 

ฉันเข้าใจ:

java.lang.ClassCastException: ไม่สามารถกำหนดอินสแตนซ์ของ scala.collection.immutable.List$SerializationProxy ให้กับฟิลด์ org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ ของประเภท scala.collection.Seq ในอินสแตนซ์ของ org.apache.spark.rdd.MapPartitionsRDD

ไม่มีใครรู้ว่าฉันสามารถทำได้ใน 2.0.1 อย่างไร มันเป็นเรื่องง่ายมากที่ฉันพยายามทำ

ขอบคุณล่วงหน้า,

โรบิน


person robinlmorris    schedule 25.10.2016    source แหล่งที่มา
comment
เฮ้ ฉันก็โพสต์คำถามเดียวกันเมื่อกี้เหมือนกัน.. ต้องการความช่วยเหลือในกรณีที่คุณพบคำถามนี้..... stackoverflow.com/questions/40233215/   -  person Tanny    schedule 25.10.2016


คำตอบ (1)


ฉันคิดว่าปัญหาของฉันอยู่ที่ประกายไฟใน 2.0.1 โค้ดที่ฉันโพสต์ทำงานได้ดีหากฉันใช้ sc และ sqlContext ที่มีอยู่ซึ่งเป็นส่วนหนึ่งของเซสชัน spark ที่เชลล์สร้างขึ้น ถ้าฉันเรียกหยุดและสร้างเซสชันใหม่ด้วยการกำหนดค่าแบบกำหนดเอง ฉันจะได้รับข้อผิดพลาดแปลก ๆ ข้างต้น ฉันไม่ชอบสิ่งนี้เพราะฉันต้องการเปลี่ยน spark.driver.maxResultSize

อย่างไรก็ตาม บทเรียนก็คือ: หากคุณกำลังทดสอบโค้ดโดยใช้ Spark Shell ให้ใช้เซสชันที่มีอยู่ ไม่เช่นนั้นอาจไม่ทำงาน

person robinlmorris    schedule 01.11.2016
comment
คุณควรจะสามารถหยุดเซสชันและสร้างเซสชันใหม่ได้ อาจเป็นสภาพการแข่งขันบางประเภทหรือไม่? คุณเริ่มเซสชั่นใหม่ทันทีหลังจากหยุดเซสชั่นเก่าหรือไม่? - person Jakob Odersky; 02.11.2016