คำถามเกี่ยวกับการออกแบบ Spark Streaming

ฉันเป็นคนใหม่ในจุดประกาย ฉันต้องการตั้งค่าการสตรีมแบบ Spark เพื่อดึงคู่ค่าคีย์ของไฟล์รูปแบบด้านล่าง:

ไฟล์: info1

ป้อนคำอธิบายรูปภาพที่นี่

หมายเหตุ: แต่ละไฟล์ข้อมูลจะมีบันทึกเหล่านี้ประมาณ 1,000 รายการ และระบบของเรากำลังสร้างไฟล์ข้อมูลเหล่านี้อย่างต่อเนื่อง ฉันต้องการทำการแมปหมายเลขบรรทัดและไฟล์ข้อมูลและต้องการผลลัพธ์รวม

เราสามารถป้อนข้อมูลให้กับไฟล์ประเภทนี้ให้กับคลัสเตอร์ Spark ได้หรือไม่? ฉันสนใจตัวคั่น "SF" และ "DA" เท่านั้น "SF" สอดคล้องกับไฟล์ต้นฉบับและ "DA" สอดคล้องกับ ( หมายเลขบรรทัด, นับ)

เนื่องจากข้อมูลอินพุตนี้ไม่ใช่รูปแบบเส้น ดังนั้นนี่เป็นความคิดที่ดีที่จะใช้ไฟล์เหล่านี้สำหรับอินพุตแบบประกายไฟ หรือฉันควรต้องทำขั้นตอนตัวกลางซึ่งฉันต้องล้างไฟล์เหล่านี้เพื่อสร้างไฟล์ใหม่ซึ่งจะมีข้อมูลบันทึกแต่ละรายการ เข้าแถวแทนที่จะเป็นบล็อกเหรอ?

หรือเราสามารถบรรลุเป้าหมายนี้ได้ด้วย Spark เอง? แนวทางที่ถูกต้องควรเป็นอย่างไร?

ฉันต้องการบรรลุอะไร? ฉันต้องการรับข้อมูลระดับบรรทัด หมายถึงการรับบรรทัด (เป็นคีย์) และไฟล์ข้อมูล (เป็นค่า)

ผลลัพธ์สุดท้ายที่ฉันต้องการมีดังนี้: line178 -> (info1, info2, info7.................)

บรรทัด 2908 -> (info3, info90, ..., ... ,)

โปรดแจ้งให้เราทราบหากคำอธิบายของฉันไม่ชัดเจนหรือถ้าฉันขาดหายไป

ขอขอบคุณและขอแสดงความนับถือ Vinti


person Vibhuti    schedule 01.02.2016    source แหล่งที่มา


คำตอบ (1)


คุณสามารถทำอะไรแบบนี้ได้ มีสตรีม DStream ของคุณ:

// this gives you DA & FP lines, with the line number as the key
val validLines =  stream.map(_.split(":")).
  filter(line => Seq("DA", "FP").contains(line._1)).
  map(_._2.split(","))
  map(line => (line._1, line._2))

// now you should accumulate values
val state = validLines.updateStateByKey[Seq[String]](updateFunction _)

def updateFunction(newValues: Seq[Seq[String]], runningValues: Option[Seq[String]]): Option[Seq[String]] = {
  // add the new values 
  val newVals = runnigValues match {
    case Some(list) => list :: newValues
    case _ => newValues
  }
  Some(newVals)
}

สิ่งนี้ควรสะสมสำหรับแต่ละคีย์ตามลำดับที่มีค่าที่เกี่ยวข้อง โดยเก็บไว้ใน สถานะ

person rhernando    schedule 02.02.2016