Коннектор MongoDB Spark — агрегация медленная

Я запускаю один и тот же конвейер агрегации с приложением Spark и на консоли Mongos. На консоли данные извлекаются в мгновение ока, и для получения всех ожидаемых данных требуется только второе использование «it». Однако, согласно веб-интерфейсу Spark, приложение Spark занимает почти две минуты.

введите здесь описание изображения

Как видите, для получения результата запускается 242 задачи. Я не уверен, почему запускается такое большое количество задач, когда агрегация MongoDB возвращает только 40 документов. Похоже, что есть высокие накладные расходы.

Запрос, который я запускаю на консоли Mongos:

db.data.aggregate([
   {
      $match:{
         signals:{
            $elemMatch:{
               signal:"SomeSignal",
               value:{
                  $gt:0,
                  $lte:100
               }
            }
         }
      }
   },
   {
      $group:{
         _id:"$root_document",
         firstTimestamp:{
            $min:"$ts"
         },
         lastTimestamp:{
            $max:"$ts"
         },
         count:{
            $sum:1
         }
      }
   }
])

Код приложения Spark

    JavaMongoRDD<Document> rdd = MongoSpark.load(sc);

    JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(Arrays.asList(
            Document.parse(
                    "{ $match: { signals: { $elemMatch: { signal: \"SomeSignal\", value: { $gt: 0, $lte: 100 } } } } }"),
            Document.parse(
                    "{ $group : { _id : \"$root_document\", firstTimestamp: { $min: \"$ts\"}, lastTimestamp: { $max: \"$ts\"} , count: { $sum: 1 } } }")));

    JavaRDD<String> outputRdd = aggregatedRdd.map(new Function<Document, String>() {
        @Override
        public String call(Document arg0) throws Exception {
            String output = String.format("%s;%s;%s;%s", arg0.get("_id").toString(),
                    arg0.get("firstTimestamp").toString(), arg0.get("lastTimestamp").toString(),
                    arg0.get("count").toString());
            return output;
        }
    });

    outputRdd.saveAsTextFile("/user/spark/output");

После этого я использую hdfs dfs -getmerge /user/spark/output/ output.csv и сравниваю результаты.

Почему агрегация такая медленная? Разве вызов withPipeline не предназначен для уменьшения объема данных, необходимых для передачи в Spark? Похоже, что он не выполняет ту же агрегацию, что и консоль Mongos. На консоли Mongos это молниеносно. Я использую Spark 1.6.1 и mongo-spark-connector_2.10 версии 1.1.0.

Редактировать: Еще одна вещь, о которой мне интересно, это то, что запускаются два исполнителя (потому что я использую atm с настройками выполнения по умолчанию), но только один исполнитель выполняет всю работу. Почему второй исполнитель не выполняет никакой работы?

введите здесь описание изображения

Редактировать 2: при использовании другого конвейера агрегации и вызове .count() вместо saveAsTextFile(..) также создается 242 задачи. На этот раз будет возвращено 65 000 документов. введите здесь описание изображения


person j9dy    schedule 04.11.2016    source источник
comment
Я бы больше изучил пользовательский интерфейс, чтобы попытаться понять, что такое 242 задачи. Я думаю, что с 40 документами все они поместятся в один раздел.   -  person Ross    schedule 04.11.2016
comment
Задачи @Ross 242 также создаются, когда я запускаю другой запрос и .count() на aggregatedRdd вместо сохранения в hdfs. Другой запрос возвращает несколько миллионов документов. Статистика моей коллекции: data : 15.01GiB docs : 45141000 chunks : 443. Я сомневаюсь, что запись в HDFS является проблемой. Это единственное действие, которое вызывается в моем искровом приложении, поэтому оно указано как единственный этап в веб-интерфейсе. Или я ошибаюсь?   -  person j9dy    schedule 04.11.2016
comment
@Ross Мне почему-то кажется, что конвейер агрегации не выполняется. Должен ли я специально выполнять конвейер агрегации?   -  person j9dy    schedule 04.11.2016
comment
@Ross Я только что запустил еще одну агрегацию, используя в качестве конвейера только это: Document.parse("{ $match: {ts: {$gt: ISODate(\"2016-02-22T08:30:26.000Z\"), $lte: ISODate(\"2016-02-22T08:44:35.000Z\")} } }") и снова создается 242 задачи, когда я вызываю .count() на rdd. Есть идеи, что случилось? Я добавил еще одно изображение к исходному сообщению.   -  person j9dy    schedule 04.11.2016
comment
Я ожидаю, что результат count() будет правильным, что означает, что агрегация правильно передается в MongoDB. Я думаю, что проблема будет заключаться в выборе лучшего разделителя для этой рабочей нагрузки: если вы вызовете outputRdd.partitions.size, каково значение? 242?   -  person Ross    schedule 04.11.2016
comment
Давайте продолжим обсуждение в чате.   -  person Ross    schedule 04.11.2016
comment
@Ross Я вернулся из отпуска и добавил комментарии в чат. Спасибо за попытку помочь мне.   -  person j9dy    schedule 11.11.2016


Ответы (1)


Большое количество задач вызвано стандартной стратегией секционирования Mongo Spark. Он игнорирует конвейер агрегации при расчете разделов по двум основным причинам:

  1. Снижает стоимость расчета разделов
  2. Обеспечивает одинаковое поведение для сегментированных и не сегментированных разделителей.

Однако, как вы обнаружили, они могут генерировать пустые разделы, что в вашем случае дорого обходится.

Варианты исправления могут быть следующими:

  1. Изменить стратегию разбиения

    Для выбора альтернативного разделителя, чтобы уменьшить количество разделов. Например, PaginateByCount разделит базу данных на заданное количество разделов.

    Создайте свой собственный разделитель — просто реализуйте трейт, и вы сможете применить конвейер агрегации и разделить результаты. См. HalfwayPartitioner и тест пользовательского разделителя для примера.

  2. Предварительно объедините результаты в коллекцию с помощью $out и прочитайте оттуда.

  3. Используйте coalesce(N), чтобы объединить разделы вместе и уменьшить количество разделов.
  4. Увеличьте конфигурацию spark.mongodb.input.partitionerOptions.partitionSizeMB, чтобы создать меньше разделов.

Пользовательский разделитель должен дать наилучшее решение, но есть способы лучше использовать доступные по умолчанию разделители.

Если вы считаете, что должен быть разделитель по умолчанию, который использует конвейер агрегации для расчета разделов, добавьте билет в MongoDB Проект Spark Jira.

person Ross    schedule 11.11.2016
comment
Могу ли я использовать MongoShardedPartitioner для коллекции с хешированным сегментированием? В документации сказано shardkey - The field should be indexed and contain unique values. В моем случае у меня есть комбинированный ключ сегментирования из моих полей log_file_name:day_of_timestamp:hour_of_timestamp, что приводит к сохранению соответствующих данных близко друг к другу — по крайней мере, я надеюсь, что это так. Но предварительно хешированные значения не уникальны. В документации говорится о хешированном значении? Также у меня был небольшой дополнительный вопрос о том, как использовать MongoSpark для нескольких запросов в чате — если вы не возражаете, взгляните на него. - person j9dy; 14.11.2016