Konektor MongoDB Spark - agregasinya lambat

Saya menjalankan pipa agregasi yang sama dengan Aplikasi Spark dan di konsol Mongos. Di konsol, data diambil dalam sekejap mata, dan hanya diperlukan penggunaan "itu" untuk kedua kalinya untuk mengambil semua data yang diharapkan. Namun Aplikasi Spark membutuhkan waktu hampir dua menit menurut Spark WebUI.

masukkan deskripsi gambar di sini

Seperti yang Anda lihat, 242 tugas diluncurkan untuk mengambil hasilnya. Saya tidak yakin mengapa begitu banyak tugas yang diluncurkan sementara hanya ada 40 dokumen yang dikembalikan oleh agregasi MongoDB. Sepertinya ada overhead yang tinggi.

Kueri yang saya jalankan di konsol Mongos:

db.data.aggregate([
   {
      $match:{
         signals:{
            $elemMatch:{
               signal:"SomeSignal",
               value:{
                  $gt:0,
                  $lte:100
               }
            }
         }
      }
   },
   {
      $group:{
         _id:"$root_document",
         firstTimestamp:{
            $min:"$ts"
         },
         lastTimestamp:{
            $max:"$ts"
         },
         count:{
            $sum:1
         }
      }
   }
])

Kode Aplikasi Spark

    JavaMongoRDD<Document> rdd = MongoSpark.load(sc);

    JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(Arrays.asList(
            Document.parse(
                    "{ $match: { signals: { $elemMatch: { signal: \"SomeSignal\", value: { $gt: 0, $lte: 100 } } } } }"),
            Document.parse(
                    "{ $group : { _id : \"$root_document\", firstTimestamp: { $min: \"$ts\"}, lastTimestamp: { $max: \"$ts\"} , count: { $sum: 1 } } }")));

    JavaRDD<String> outputRdd = aggregatedRdd.map(new Function<Document, String>() {
        @Override
        public String call(Document arg0) throws Exception {
            String output = String.format("%s;%s;%s;%s", arg0.get("_id").toString(),
                    arg0.get("firstTimestamp").toString(), arg0.get("lastTimestamp").toString(),
                    arg0.get("count").toString());
            return output;
        }
    });

    outputRdd.saveAsTextFile("/user/spark/output");

Setelah itu saya menggunakan hdfs dfs -getmerge /user/spark/output/ output.csv dan membandingkan hasilnya.

Mengapa agregasinya sangat lambat? Bukankah panggilan ke withPipeline dimaksudkan untuk mengurangi jumlah data yang diperlukan untuk ditransfer ke Spark? Sepertinya ia tidak melakukan agregasi yang sama seperti yang dilakukan konsol Mongos. Di konsol Mongos, kecepatannya sangat tinggi. Saya menggunakan Spark 1.6.1 dan mongo-spark-connector_2.10 versi 1.1.0.

Edit: Hal lain yang saya ingin tahu adalah bahwa dua pelaksana diluncurkan (karena saya menggunakan atm pengaturan eksekusi default), tetapi hanya satu pelaksana yang melakukan semua pekerjaan. Mengapa pelaksana kedua tidak melakukan pekerjaan apa pun?

masukkan deskripsi gambar di sini

Sunting 2: Saat menggunakan pipa agregasi berbeda dan memanggil .count() alih-alih saveAsTextFile(..), ada juga 242 tugas yang dibuat. Kali ini akan ada 65.000 dokumen yang dikembalikan. masukkan deskripsi gambar di sini


person j9dy    schedule 04.11.2016    source sumber
comment
Saya akan melihat lebih dalam UI untuk mencoba dan memahami apa saja 242 tugasnya. Dengan 40 dokumen, saya membayangkan semuanya akan muat dalam satu partisi.   -  person Ross    schedule 04.11.2016
comment
@Ross 242 tugas juga dibuat ketika saya menjalankan kueri yang berbeda dan .count() pada aggregatedRdd alih-alih menyimpannya ke hdfs. Kueri yang berbeda mengembalikan beberapa juta dokumen. Statistik koleksi saya adalah: data : 15.01GiB docs : 45141000 chunks : 443. Saya ragu menulisnya ke HDFS adalah masalahnya. Itu hanya satu-satunya tindakan yang dipanggil di aplikasi percikan saya, itulah sebabnya tindakan itu terdaftar sebagai satu-satunya tahapan di UI web. Atau apakah saya salah?   -  person j9dy    schedule 04.11.2016
comment
@Ross Saya merasa pipa agregasi tidak dijalankan. Apakah saya harus menjalankan jalur agregasi secara khusus?   -  person j9dy    schedule 04.11.2016
comment
@Ross Saya baru saja menjalankan agregasi lain hanya dengan ini sebagai saluran pipa: Document.parse("{ $match: {ts: {$gt: ISODate(\"2016-02-22T08:30:26.000Z\"), $lte: ISODate(\"2016-02-22T08:44:35.000Z\")} } }") dan lagi 242 tugas dibuat ketika saya memanggil .count() di rdd. Tahu ada apa? Saya telah menambahkan gambar lain ke postingan asli.   -  person j9dy    schedule 04.11.2016
comment
Saya berharap hasil count() benar, artinya agregasi diteruskan ke MongoDB dengan benar. Saya pikir masalahnya adalah memilih partisi yang lebih baik untuk beban kerja ini: Jika Anda memanggil outputRdd.partitions.size berapa nilainya? 242?   -  person Ross    schedule 04.11.2016
comment
Mari kita melanjutkan diskusi ini di chat.   -  person Ross    schedule 04.11.2016
comment
@Ross Saya kembali dari liburan dan menambahkan komentar di obrolan. Terima kasih telah mencoba membantu saya.   -  person j9dy    schedule 11.11.2016


Jawaban (1)


Tingginya jumlah tugas disebabkan oleh strategi partisi Mongo Spark default. Ini mengabaikan pipa agregasi saat menghitung partisi, karena dua alasan utama:

  1. Ini mengurangi biaya penghitungan partisi
  2. Memastikan perilaku yang sama untuk partisi yang di-shard dan yang tidak di-shard

Namun, seperti yang Anda temukan, mereka dapat menghasilkan partisi kosong yang dalam kasus Anda mahal.

Pilihan untuk memperbaikinya bisa berupa:

  1. Ubah strategi partisi

    Untuk memilih partisi alternatif untuk mengurangi jumlah partisi. Misalnya PaginateByCount akan membagi database menjadi beberapa partisi.

    Buat partisi Anda sendiri - cukup terapkan sifat tersebut dan Anda akan dapat menerapkan pipa agregasi dan mempartisi hasilnya. Lihat HalfwayPartitioner dan pengujian partisi khusus sebagai contoh.

  2. Pra-agregat hasilnya ke dalam koleksi menggunakan $out dan baca dari sana.

  3. Gunakan coalesce(N) untuk menggabungkan partisi dan mengurangi jumlah partisi.
  4. Tingkatkan konfigurasi spark.mongodb.input.partitionerOptions.partitionSizeMB untuk menghasilkan lebih sedikit partisi.

Pemartisi khusus seharusnya menghasilkan solusi terbaik, tetapi ada cara untuk memanfaatkan pemartisi default yang tersedia dengan lebih baik.

Jika menurut Anda harus ada partisi default yang menggunakan pipa agregasi untuk menghitung partisi, silakan tambahkan tiket ke MongoDB Proyek Spark Jira.

person Ross    schedule 11.11.2016
comment
Bisakah saya menggunakan MongoShardedPartitioner untuk koleksi dengan sharding hash? Dokumentasinya mengatakan shardkey - The field should be indexed and contain unique values. Dalam kasus saya, saya memiliki kunci sharding gabungan dari bidang saya log_file_name:day_of_timestamp:hour_of_timestamp yang menghasilkan penyimpanan data yang relevan secara berdekatan - setidaknya saya berharap demikian. Namun nilai pra-hashing tidaklah unik. Apakah dokumentasinya berbicara tentang nilai hash? Saya juga punya pertanyaan tindak lanjut kecil tentang cara menggunakan MongoSpark untuk beberapa pertanyaan dalam obrolan -- jika Anda keberatan melihatnya. - person j9dy; 14.11.2016