Я запускаю один и тот же конвейер агрегации с приложением Spark и на консоли Mongos. На консоли данные извлекаются в мгновение ока, и для получения всех ожидаемых данных требуется только второе использование «it». Однако, согласно веб-интерфейсу Spark, приложение Spark занимает почти две минуты.
Как видите, для получения результата запускается 242 задачи. Я не уверен, почему запускается такое большое количество задач, когда агрегация MongoDB возвращает только 40 документов. Похоже, что есть высокие накладные расходы.
Запрос, который я запускаю на консоли Mongos:
db.data.aggregate([
{
$match:{
signals:{
$elemMatch:{
signal:"SomeSignal",
value:{
$gt:0,
$lte:100
}
}
}
}
},
{
$group:{
_id:"$root_document",
firstTimestamp:{
$min:"$ts"
},
lastTimestamp:{
$max:"$ts"
},
count:{
$sum:1
}
}
}
])
Код приложения Spark
JavaMongoRDD<Document> rdd = MongoSpark.load(sc);
JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(Arrays.asList(
Document.parse(
"{ $match: { signals: { $elemMatch: { signal: \"SomeSignal\", value: { $gt: 0, $lte: 100 } } } } }"),
Document.parse(
"{ $group : { _id : \"$root_document\", firstTimestamp: { $min: \"$ts\"}, lastTimestamp: { $max: \"$ts\"} , count: { $sum: 1 } } }")));
JavaRDD<String> outputRdd = aggregatedRdd.map(new Function<Document, String>() {
@Override
public String call(Document arg0) throws Exception {
String output = String.format("%s;%s;%s;%s", arg0.get("_id").toString(),
arg0.get("firstTimestamp").toString(), arg0.get("lastTimestamp").toString(),
arg0.get("count").toString());
return output;
}
});
outputRdd.saveAsTextFile("/user/spark/output");
После этого я использую hdfs dfs -getmerge /user/spark/output/ output.csv
и сравниваю результаты.
Почему агрегация такая медленная? Разве вызов withPipeline
не предназначен для уменьшения объема данных, необходимых для передачи в Spark? Похоже, что он не выполняет ту же агрегацию, что и консоль Mongos. На консоли Mongos это молниеносно. Я использую Spark 1.6.1 и mongo-spark-connector_2.10 версии 1.1.0.
Редактировать: Еще одна вещь, о которой мне интересно, это то, что запускаются два исполнителя (потому что я использую atm с настройками выполнения по умолчанию), но только один исполнитель выполняет всю работу. Почему второй исполнитель не выполняет никакой работы?
Редактировать 2: при использовании другого конвейера агрегации и вызове .count()
вместо saveAsTextFile(..)
также создается 242 задачи. На этот раз будет возвращено 65 000 документов.
.count()
наaggregatedRdd
вместо сохранения в hdfs. Другой запрос возвращает несколько миллионов документов. Статистика моей коллекции:data : 15.01GiB docs : 45141000 chunks : 443
. Я сомневаюсь, что запись в HDFS является проблемой. Это единственное действие, которое вызывается в моем искровом приложении, поэтому оно указано как единственный этап в веб-интерфейсе. Или я ошибаюсь? - person j9dy   schedule 04.11.2016Document.parse("{ $match: {ts: {$gt: ISODate(\"2016-02-22T08:30:26.000Z\"), $lte: ISODate(\"2016-02-22T08:44:35.000Z\")} } }")
и снова создается 242 задачи, когда я вызываю.count()
на rdd. Есть идеи, что случилось? Я добавил еще одно изображение к исходному сообщению. - person j9dy   schedule 04.11.2016count()
будет правильным, что означает, что агрегация правильно передается в MongoDB. Я думаю, что проблема будет заключаться в выборе лучшего разделителя для этой рабочей нагрузки: если вы вызоветеoutputRdd.partitions.size
, каково значение? 242? - person Ross   schedule 04.11.2016