ฉันกำลังใช้งานไปป์ไลน์การรวมกลุ่มเดียวกันกับแอปพลิเคชัน Spark และบนคอนโซล Mongos บนคอนโซล ข้อมูลจะถูกดึงออกมาภายในพริบตา และจำเป็นต้องใช้ "it" เพียงครั้งเดียวเพื่อดึงข้อมูลที่คาดหวังทั้งหมด อย่างไรก็ตาม แอปพลิเคชัน Spark ใช้เวลาเกือบสองนาทีตาม Spark WebUI
อย่างที่คุณเห็น มีการเปิดตัวงาน 242 งานเพื่อดึงผลลัพธ์ ฉันไม่แน่ใจว่าเหตุใดจึงมีการเปิดตัวงานจำนวนมากในขณะที่การรวม MongoDB ส่งคืนเอกสารเพียง 40 ฉบับ ดูเหมือนว่ามีค่าใช้จ่ายสูง
แบบสอบถามที่ฉันเรียกใช้บนคอนโซล Mongos:
db.data.aggregate([
{
$match:{
signals:{
$elemMatch:{
signal:"SomeSignal",
value:{
$gt:0,
$lte:100
}
}
}
}
},
{
$group:{
_id:"$root_document",
firstTimestamp:{
$min:"$ts"
},
lastTimestamp:{
$max:"$ts"
},
count:{
$sum:1
}
}
}
])
รหัสแอปพลิเคชัน Spark
JavaMongoRDD<Document> rdd = MongoSpark.load(sc);
JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(Arrays.asList(
Document.parse(
"{ $match: { signals: { $elemMatch: { signal: \"SomeSignal\", value: { $gt: 0, $lte: 100 } } } } }"),
Document.parse(
"{ $group : { _id : \"$root_document\", firstTimestamp: { $min: \"$ts\"}, lastTimestamp: { $max: \"$ts\"} , count: { $sum: 1 } } }")));
JavaRDD<String> outputRdd = aggregatedRdd.map(new Function<Document, String>() {
@Override
public String call(Document arg0) throws Exception {
String output = String.format("%s;%s;%s;%s", arg0.get("_id").toString(),
arg0.get("firstTimestamp").toString(), arg0.get("lastTimestamp").toString(),
arg0.get("count").toString());
return output;
}
});
outputRdd.saveAsTextFile("/user/spark/output");
หลังจากนั้นฉันใช้ hdfs dfs -getmerge /user/spark/output/ output.csv
และเปรียบเทียบผลลัพธ์
ทำไมการรวมตัวจึงช้ามาก? การเรียก withPipeline
มีจุดประสงค์เพื่อลดปริมาณข้อมูลที่จำเป็นในการถ่ายโอนไปยัง Spark ไม่ใช่หรือ ดูเหมือนว่าจะไม่ได้ทำการรวมกลุ่มแบบเดียวกับที่คอนโซล Mongos ทำ บนคอนโซล Mongos มันทำงานเร็วมาก ฉันใช้ Spark 1.6.1 และ mongo-spark-connector_2.10 เวอร์ชัน 1.1.0
แก้ไข: อีกสิ่งหนึ่งที่ฉันสงสัยคือมีการเปิดตัวตัวดำเนินการสองตัว (เพราะฉันใช้การตั้งค่าการดำเนินการเริ่มต้น atm) แต่มีตัวดำเนินการเพียงตัวเดียวเท่านั้นที่ทำงานได้ทั้งหมด เหตุใดผู้ดำเนินการคนที่สองจึงไม่ทำงานใด ๆ
แก้ไข 2: เมื่อใช้ไปป์ไลน์การรวมอื่นและเรียก .count()
แทน saveAsTextFile(..)
จะมีการสร้างงาน 242 งานด้วย ครั้งนี้จะมีการคืนเอกสารจำนวน 65,000 ฉบับ
.count()
บนaggregatedRdd
แทนที่จะบันทึกลงใน hdfs แบบสอบถามที่แตกต่างกันส่งคืนเอกสารสองสามล้านฉบับ สถิติคอลเลกชันของฉันคือ:data : 15.01GiB docs : 45141000 chunks : 443
ฉันสงสัยว่าการเขียนไปยัง HDFS เป็นปัญหา มันเป็นเพียงการกระทำเดียวที่ถูกเรียกในแอปพลิเคชัน Spark ของฉัน นั่นเป็นเหตุผลว่าทำไมจึงถูกระบุว่าเป็นขั้นตอนเดียวใน UI ของเว็บ หรือฉันคิดผิด? - person j9dy   schedule 04.11.2016Document.parse("{ $match: {ts: {$gt: ISODate(\"2016-02-22T08:30:26.000Z\"), $lte: ISODate(\"2016-02-22T08:44:35.000Z\")} } }")
และอีก 242 งานถูกสร้างขึ้นเมื่อฉันโทร.count()
บน rdd มีความคิดอะไรผิดปกติบ้างไหม? ฉันได้เพิ่มรูปภาพอื่นในโพสต์ต้นฉบับ - person j9dy   schedule 04.11.2016count()
ถูกต้อง ซึ่งหมายความว่าการรวมจะถูกส่งต่อไปยัง MongoDB อย่างถูกต้อง ฉันคิดว่าปัญหาคือการเลือกตัวแบ่งพาร์ติชั่นที่ดีกว่าสำหรับเวิร์กโหลดนี้: ถ้าคุณโทรoutputRdd.partitions.size
ค่าคืออะไร? 242? - person Ross   schedule 04.11.2016