การสะท้อน Scala พร้อมการทำให้เป็นอนุกรม (บน Spark) - สัญลักษณ์ไม่สามารถทำให้เป็นอนุกรมได้

เริ่มต้นด้วยฉันใช้ scala 2.10.4 และตัวอย่างด้านบนทำงานใน Spark 1.6 (แม้ว่าฉันจะสงสัยว่า Spark มีส่วนเกี่ยวข้องกับสิ่งนี้ แต่ก็เป็นเพียงปัญหาการทำให้เป็นอนุกรม)

นี่คือปัญหาของฉัน: สมมติว่าฉันมีลักษณะ Base ที่นำไปใช้โดยบอกว่ามีสองคลาส B1 และ B2 ตอนนี้ฉันมีลักษณะทั่วไปที่ถูกขยายออกไปโดยกลุ่มของคลาส หนึ่งในนั้นมีลักษณะที่มากกว่าชนิดย่อยของ Base เช่น (ที่นี่ฉันยังคงแนวคิดของ Spark เกี่ยวกับ RDD ไว้ แต่อาจเป็นอย่างอื่นจริง ๆ ทันทีที่มีการซีเรียลไลซ์ บางสิ่งเป็นเพียงผลลัพธ์ไม่ว่าอะไรจะเกิดขึ้นจริง):

trait Foo[T] { def function(rdd: RDD[T]): Something }
class Foo1[B <: Base] extends Foo[B] { def function(rdd: RDD[B]): Something  = ... }
class Foo2 extends Foo[A] { def function(rdd: RDD[A]): Something  = ... }
...

ตอนนี้ฉันต้องการวัตถุที่จะใช้ RDD[T] (สมมติว่าไม่มีความคลุมเครือที่นี่ มันเป็นเพียงเวอร์ชันที่เรียบง่าย) และส่งคืน Something ที่สอดคล้องกับผลลัพธ์ของฟังก์ชันที่สอดคล้องกับประเภท T แต่ควรใช้ได้กับ Array[T] ด้วยกลยุทธ์การรวมเข้าด้วยกัน จนถึงตอนนี้ดูเหมือนว่า:

object Obj {
   def compute[T: TypeTag](input: RDD[T]): Something = {
      typeOf[T] match {
         case t if t <:< typeOf[A] => 
            val foo = new Foo[T]
            foo.function(rdd)
         case t if t <:< typeOf[Array[A]] => 
            val foo = new Foo[A]
            foo.function(rdd.map(x => mergeArray(x.asInstance[Array[A]])))
         case t if t <:< typeOf[Base] => 
            val foo = new Foo[T]
            foo.function(rdd)
         // here it gets ugly...
         case t if t <:< typeOf[Array[_]] => // doesn't fall through with Array[Base]... why?
            val tt = getSubInfo[T](0)
            val tpe = tt.tpe
            val foo = new Foo[tpe.type]
            foo.function(rdd.map(x => (x._1, mergeArray(x._2.asInstanceOf[Array[tpe.type]]))
      }
   }

   // strategy to transform arrays of T into a T object when possible
   private def mergeArray[T: TypeTag](a: Array[T]): T = ... 

  // extract the subtype, e.g. if Array[Int] then at position 0 extracts a type tag for Int, I can provide the code but not fondamental for the comprehension of the problem though
   private def getSubInfo[T: TypeTag](i: Int): TypeTag[_] = ... 
}

น่าเสียดายที่ดูเหมือนว่าจะทำงานได้ดีบนเครื่องท้องถิ่น แต่เมื่อถูกส่งไปยัง Spark (แบบอนุกรม) ฉันจะได้รับ org.apache.spark.SparkException: Task not serializable ด้วย:

Caused by: java.io.NotSerializableException: scala.reflect.internal.Symbols$PackageClassSymbol
Serialization stack:
    - object not serializable (class: scala.reflect.internal.Symbols$PackageClassSymbol, value: package types)
    - field (class: scala.reflect.internal.Types$ThisType, name: sym, type: class scala.reflect.internal.Symbols$Symbol)

ฉันมีวิธีแก้ไข (ค่อนข้างชัดเจน และแจกแจงความเป็นไปได้) แต่ด้วยความอยากรู้อยากเห็น มีวิธีแก้ไขปัญหานี้หรือไม่ และเหตุใด Symbol จึงไม่สามารถทำให้เป็นอนุกรมได้ในขณะที่สิ่งที่เทียบเท่าใน Manifests เป็นอย่างไร

ขอบคุณสำหรับความช่วยเหลือ


person Vince.Bdn    schedule 11.02.2016    source แหล่งที่มา


คำตอบ (1)


โดยทั่วไปแล้ว TypeTags จะสามารถซีเรียลไลซ์ได้ในสกาล่า แต่ที่น่าแปลกคือไม่ใช่ประเภทโดยตรง (ซึ่งแปลกเพราะ typetags มีสัญลักษณ์ที่ไม่ใช่ :-/)

นี่อาจทำสิ่งที่คุณต้องการ

// implicit constructor TypeTag parameter is serialized.
abstract class TypeAware[T:TypeTag] extends Serializable {
  def typ:Type = _typeCached

  @transient
  lazy val _typeCached:Type = typeOf[T]
}

trait Foo[T] extends Serializable { 
  def function(rdd: RDD[T]): Something  {... impl here?...}
  def typ:Type 
}

class Concrete[T:TypeTag] extends TypeAware[T] with Foo[T] with Serializable{
   def function(rdd: RDD[T]): Something  {... impl here?...}
}
person user48956    schedule 23.08.2016
comment
ฉันเชื่อว่า TypeApi มี scala.reflect.internal.Symbols$PackageClassSymbol ด้วย ดังนั้นสิ่งนี้ก็จะใช้งานไม่ได้เช่นกัน - person tribbloid; 21.11.2016
comment
ได้ผลแน่นอน (ในสกาล่า 2.11) นี่คือคำขอผสาน github.com/scala/scala/pull/3817< /ก> - person user48956; 21.11.2016