Saya menjalankan pipa agregasi yang sama dengan Aplikasi Spark dan di konsol Mongos. Di konsol, data diambil dalam sekejap mata, dan hanya diperlukan penggunaan "itu" untuk kedua kalinya untuk mengambil semua data yang diharapkan. Namun Aplikasi Spark membutuhkan waktu hampir dua menit menurut Spark WebUI.
Seperti yang Anda lihat, 242 tugas diluncurkan untuk mengambil hasilnya. Saya tidak yakin mengapa begitu banyak tugas yang diluncurkan sementara hanya ada 40 dokumen yang dikembalikan oleh agregasi MongoDB. Sepertinya ada overhead yang tinggi.
Kueri yang saya jalankan di konsol Mongos:
db.data.aggregate([
{
$match:{
signals:{
$elemMatch:{
signal:"SomeSignal",
value:{
$gt:0,
$lte:100
}
}
}
}
},
{
$group:{
_id:"$root_document",
firstTimestamp:{
$min:"$ts"
},
lastTimestamp:{
$max:"$ts"
},
count:{
$sum:1
}
}
}
])
Kode Aplikasi Spark
JavaMongoRDD<Document> rdd = MongoSpark.load(sc);
JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(Arrays.asList(
Document.parse(
"{ $match: { signals: { $elemMatch: { signal: \"SomeSignal\", value: { $gt: 0, $lte: 100 } } } } }"),
Document.parse(
"{ $group : { _id : \"$root_document\", firstTimestamp: { $min: \"$ts\"}, lastTimestamp: { $max: \"$ts\"} , count: { $sum: 1 } } }")));
JavaRDD<String> outputRdd = aggregatedRdd.map(new Function<Document, String>() {
@Override
public String call(Document arg0) throws Exception {
String output = String.format("%s;%s;%s;%s", arg0.get("_id").toString(),
arg0.get("firstTimestamp").toString(), arg0.get("lastTimestamp").toString(),
arg0.get("count").toString());
return output;
}
});
outputRdd.saveAsTextFile("/user/spark/output");
Setelah itu saya menggunakan hdfs dfs -getmerge /user/spark/output/ output.csv
dan membandingkan hasilnya.
Mengapa agregasinya sangat lambat? Bukankah panggilan ke withPipeline
dimaksudkan untuk mengurangi jumlah data yang diperlukan untuk ditransfer ke Spark? Sepertinya ia tidak melakukan agregasi yang sama seperti yang dilakukan konsol Mongos. Di konsol Mongos, kecepatannya sangat tinggi. Saya menggunakan Spark 1.6.1 dan mongo-spark-connector_2.10 versi 1.1.0.
Edit: Hal lain yang saya ingin tahu adalah bahwa dua pelaksana diluncurkan (karena saya menggunakan atm pengaturan eksekusi default), tetapi hanya satu pelaksana yang melakukan semua pekerjaan. Mengapa pelaksana kedua tidak melakukan pekerjaan apa pun?
Sunting 2: Saat menggunakan pipa agregasi berbeda dan memanggil .count()
alih-alih saveAsTextFile(..)
, ada juga 242 tugas yang dibuat. Kali ini akan ada 65.000 dokumen yang dikembalikan.
.count()
padaaggregatedRdd
alih-alih menyimpannya ke hdfs. Kueri yang berbeda mengembalikan beberapa juta dokumen. Statistik koleksi saya adalah:data : 15.01GiB docs : 45141000 chunks : 443
. Saya ragu menulisnya ke HDFS adalah masalahnya. Itu hanya satu-satunya tindakan yang dipanggil di aplikasi percikan saya, itulah sebabnya tindakan itu terdaftar sebagai satu-satunya tahapan di UI web. Atau apakah saya salah? - person j9dy   schedule 04.11.2016Document.parse("{ $match: {ts: {$gt: ISODate(\"2016-02-22T08:30:26.000Z\"), $lte: ISODate(\"2016-02-22T08:44:35.000Z\")} } }")
dan lagi 242 tugas dibuat ketika saya memanggil.count()
di rdd. Tahu ada apa? Saya telah menambahkan gambar lain ke postingan asli. - person j9dy   schedule 04.11.2016count()
benar, artinya agregasi diteruskan ke MongoDB dengan benar. Saya pikir masalahnya adalah memilih partisi yang lebih baik untuk beban kerja ini: Jika Anda memanggiloutputRdd.partitions.size
berapa nilainya? 242? - person Ross   schedule 04.11.2016