MongoDB Spark Connector - การรวมตัวช้า

ฉันกำลังใช้งานไปป์ไลน์การรวมกลุ่มเดียวกันกับแอปพลิเคชัน Spark และบนคอนโซล Mongos บนคอนโซล ข้อมูลจะถูกดึงออกมาภายในพริบตา และจำเป็นต้องใช้ "it" เพียงครั้งเดียวเพื่อดึงข้อมูลที่คาดหวังทั้งหมด อย่างไรก็ตาม แอปพลิเคชัน Spark ใช้เวลาเกือบสองนาทีตาม Spark WebUI

ป้อนคำอธิบายรูปภาพที่นี่

อย่างที่คุณเห็น มีการเปิดตัวงาน 242 งานเพื่อดึงผลลัพธ์ ฉันไม่แน่ใจว่าเหตุใดจึงมีการเปิดตัวงานจำนวนมากในขณะที่การรวม MongoDB ส่งคืนเอกสารเพียง 40 ฉบับ ดูเหมือนว่ามีค่าใช้จ่ายสูง

แบบสอบถามที่ฉันเรียกใช้บนคอนโซล Mongos:

db.data.aggregate([
   {
      $match:{
         signals:{
            $elemMatch:{
               signal:"SomeSignal",
               value:{
                  $gt:0,
                  $lte:100
               }
            }
         }
      }
   },
   {
      $group:{
         _id:"$root_document",
         firstTimestamp:{
            $min:"$ts"
         },
         lastTimestamp:{
            $max:"$ts"
         },
         count:{
            $sum:1
         }
      }
   }
])

รหัสแอปพลิเคชัน Spark

    JavaMongoRDD<Document> rdd = MongoSpark.load(sc);

    JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(Arrays.asList(
            Document.parse(
                    "{ $match: { signals: { $elemMatch: { signal: \"SomeSignal\", value: { $gt: 0, $lte: 100 } } } } }"),
            Document.parse(
                    "{ $group : { _id : \"$root_document\", firstTimestamp: { $min: \"$ts\"}, lastTimestamp: { $max: \"$ts\"} , count: { $sum: 1 } } }")));

    JavaRDD<String> outputRdd = aggregatedRdd.map(new Function<Document, String>() {
        @Override
        public String call(Document arg0) throws Exception {
            String output = String.format("%s;%s;%s;%s", arg0.get("_id").toString(),
                    arg0.get("firstTimestamp").toString(), arg0.get("lastTimestamp").toString(),
                    arg0.get("count").toString());
            return output;
        }
    });

    outputRdd.saveAsTextFile("/user/spark/output");

หลังจากนั้นฉันใช้ hdfs dfs -getmerge /user/spark/output/ output.csv และเปรียบเทียบผลลัพธ์

ทำไมการรวมตัวจึงช้ามาก? การเรียก withPipeline มีจุดประสงค์เพื่อลดปริมาณข้อมูลที่จำเป็นในการถ่ายโอนไปยัง Spark ไม่ใช่หรือ ดูเหมือนว่าจะไม่ได้ทำการรวมกลุ่มแบบเดียวกับที่คอนโซล Mongos ทำ บนคอนโซล Mongos มันทำงานเร็วมาก ฉันใช้ Spark 1.6.1 และ mongo-spark-connector_2.10 เวอร์ชัน 1.1.0

แก้ไข: อีกสิ่งหนึ่งที่ฉันสงสัยคือมีการเปิดตัวตัวดำเนินการสองตัว (เพราะฉันใช้การตั้งค่าการดำเนินการเริ่มต้น atm) แต่มีตัวดำเนินการเพียงตัวเดียวเท่านั้นที่ทำงานได้ทั้งหมด เหตุใดผู้ดำเนินการคนที่สองจึงไม่ทำงานใด ๆ

ป้อนคำอธิบายรูปภาพที่นี่

แก้ไข 2: เมื่อใช้ไปป์ไลน์การรวมอื่นและเรียก .count() แทน saveAsTextFile(..) จะมีการสร้างงาน 242 งานด้วย ครั้งนี้จะมีการคืนเอกสารจำนวน 65,000 ฉบับ ป้อนคำอธิบายรูปภาพที่นี่


person j9dy    schedule 04.11.2016    source แหล่งที่มา
comment
ฉันจะดู UI มากขึ้นเพื่อพยายามทำความเข้าใจว่า 242 งานคืออะไร ด้วยเอกสาร 40 ฉบับ ฉันจินตนาการว่าเอกสารทั้งหมดจะพอดีกับพาร์ติชันเดียว   -  person Ross    schedule 04.11.2016
comment
งาน @Ross 242 ยังถูกสร้างขึ้นเมื่อฉันเรียกใช้แบบสอบถามอื่นและ .count() บน aggregatedRdd แทนที่จะบันทึกลงใน hdfs แบบสอบถามที่แตกต่างกันส่งคืนเอกสารสองสามล้านฉบับ สถิติคอลเลกชันของฉันคือ: data : 15.01GiB docs : 45141000 chunks : 443 ฉันสงสัยว่าการเขียนไปยัง HDFS เป็นปัญหา มันเป็นเพียงการกระทำเดียวที่ถูกเรียกในแอปพลิเคชัน Spark ของฉัน นั่นเป็นเหตุผลว่าทำไมจึงถูกระบุว่าเป็นขั้นตอนเดียวใน UI ของเว็บ หรือฉันคิดผิด?   -  person j9dy    schedule 04.11.2016
comment
@Ross ฉันรู้สึกเหมือนไม่ได้ดำเนินการไปป์ไลน์การรวม ฉันจำเป็นต้องดำเนินการไปป์ไลน์การรวมกลุ่มโดยเฉพาะหรือไม่   -  person j9dy    schedule 04.11.2016
comment
@Ross ฉันเพิ่งดำเนินการรวมอีกครั้งโดยมีเพียงสิ่งนี้เป็นไปป์ไลน์: Document.parse("{ $match: {ts: {$gt: ISODate(\"2016-02-22T08:30:26.000Z\"), $lte: ISODate(\"2016-02-22T08:44:35.000Z\")} } }") และอีก 242 งานถูกสร้างขึ้นเมื่อฉันโทร .count() บน rdd มีความคิดอะไรผิดปกติบ้างไหม? ฉันได้เพิ่มรูปภาพอื่นในโพสต์ต้นฉบับ   -  person j9dy    schedule 04.11.2016
comment
ฉันคาดว่าผลลัพธ์ของ count() ถูกต้อง ซึ่งหมายความว่าการรวมจะถูกส่งต่อไปยัง MongoDB อย่างถูกต้อง ฉันคิดว่าปัญหาคือการเลือกตัวแบ่งพาร์ติชั่นที่ดีกว่าสำหรับเวิร์กโหลดนี้: ถ้าคุณโทร outputRdd.partitions.size ค่าคืออะไร? 242?   -  person Ross    schedule 04.11.2016
comment
ให้เราสนทนาต่อในการแชท   -  person Ross    schedule 04.11.2016
comment
@Ross ฉันกลับมาจากพักร้อนและได้เพิ่มความคิดเห็นในการแชท ขอบคุณที่พยายามช่วยฉัน   -  person j9dy    schedule 11.11.2016


คำตอบ (1)


งานจำนวนมากมีสาเหตุมาจากกลยุทธ์ตัวแบ่งพาร์ติชัน Mongo Spark ดีฟอลต์ โดยจะละเว้นไปป์ไลน์การรวมเมื่อคำนวณพาร์ติชัน ด้วยเหตุผลหลักสองประการ:

  1. ช่วยลดต้นทุนในการคำนวณพาร์ติชั่น
  2. ตรวจสอบให้แน่ใจว่าลักษณะการทำงานเดียวกันสำหรับพาร์ติชันแบบแบ่งส่วนและไม่แบ่งส่วน

อย่างไรก็ตาม ตามที่คุณพบว่าพวกมันสามารถสร้างพาร์ติชั่นว่างได้ ซึ่งในกรณีของคุณมีค่าใช้จ่ายสูง

ทางเลือกในการแก้ไขอาจเป็นดังนี้:

  1. เปลี่ยนกลยุทธ์การแบ่งพาร์ติชัน

    สำหรับการเลือกพาร์ติชั่นสำรองเพื่อลดจำนวนพาร์ติชั่น ตัวอย่างเช่น PaginateByCount จะแบ่งฐานข้อมูลออกเป็นพาร์ติชันตามจำนวนที่กำหนด

    สร้างตัวแบ่งพาร์ติชั่นของคุณเอง - เพียงใช้คุณสมบัติแล้วคุณจะสามารถใช้ไปป์ไลน์การรวมและแบ่งพาร์ติชั่นผลลัพธ์ได้ ดู HalfwayPartitioner และ การทดสอบตัวแบ่งพาร์ติชันแบบกำหนดเอง สำหรับตัวอย่าง

  2. รวมผลลัพธ์ไว้ล่วงหน้าเป็นคอลเลกชันโดยใช้ $out และอ่านจากที่นั่น

  3. ใช้ coalesce(N) เพื่อรวมพาร์ติชันเข้าด้วยกันและลดจำนวนพาร์ติชัน
  4. เพิ่มการกำหนดค่า spark.mongodb.input.partitionerOptions.partitionSizeMB เพื่อสร้างพาร์ติชันให้น้อยลง

ตัวแบ่งพาร์ติชั่นแบบกำหนดเองควรสร้างทางออกที่ดีที่สุด แต่มีหลายวิธีในการใช้ประโยชน์จากพาร์ติชั่นเริ่มต้นที่มีอยู่ให้ดีขึ้น

หากคุณคิดว่าควรมีตัวแบ่งพาร์ติชันเริ่มต้นที่ใช้ไปป์ไลน์การรวมเพื่อคำนวณพาร์ติชัน โปรดเพิ่มตั๋วลงใน MongoDB โครงการสปาร์ค จิรา

person Ross    schedule 11.11.2016
comment
ฉันสามารถใช้ MongoShardedPartitioner สำหรับคอลเลกชันที่มีการแฮชชาร์ดดิ้งได้หรือไม่ เอกสารระบุว่า shardkey - The field should be indexed and contain unique values. ในกรณีของฉัน ฉันมีคีย์การแบ่งส่วนข้อมูลรวมกันจากฟิลด์ของฉัน log_file_name:day_of_timestamp:hour_of_timestamp ซึ่งส่งผลให้สามารถบันทึกข้อมูลที่เกี่ยวข้องไว้ใกล้กัน - อย่างน้อยฉันก็หวังว่าจะเป็นเช่นนั้น แต่ค่าการแฮชล่วงหน้านั้นไม่ซ้ำกัน เอกสารพูดถึงค่าแฮชหรือไม่ นอกจากนี้ ฉันมีคำถามติดตามเล็กๆ น้อยๆ เกี่ยวกับวิธีใช้ MongoSpark สำหรับการสอบถามหลายรายการในการแชท หากคุณสนใจจะลองดู - person j9dy; 14.11.2016