Нарушение потока данных вовремя для завершения задания и общее количество часов ЦП с перестановкой с помощью случайного ключа

Я создал поток данных, который принимает входные данные из хранилища данных и выполняет преобразование, чтобы преобразовать его в BigQuery TableRow. Я прикрепляю метку времени к каждому элементу преобразования. Затем к коллекции PCollection применяется однодневное окно. Оконный вывод записывается в раздел в таблице BigQuery с помощью BigQueryIO от Apache Beam.

Перед записью в BigQuery в качестве промежуточного шага используется перестановка с использованием случайного ключа, чтобы избежать слияния.

Поведение конвейера:

1. For 2.8 million entities in the input: 
Total vCPU time- 5.148 vCPU hr
Time to complete job- 53 min 9 sec
Current workers- 27 
Target workers- 27 
Job ID: 2018-04-04_04_20_34-1951473901769814139

2. For 7 million entites in the input: 
Total vCPU time- 247.772 vCPU hr
Time to complete the job- 3 hr 45 min
Current workers- 69 
Target workers- 1000
Job ID: 2018-04-02_21_59_47-8636729278179820259

Я не мог понять, почему во втором случае требуется столько времени на выполнение задания и часы процессора.

Конвейер потока данных на высоком уровне:

// Read from datastore
PCollection<Entity> entities =
        pipeline.apply("ReadFromDatastore",
                DatastoreIO.v1().read().withProjectId(options.getProject())
                        .withQuery(query).withNamespace(options.getNamespace()));

// Apply processing to convert it to BigQuery TableRow
PCollection<TableRow> tableRow =
        entities.apply("ConvertToTableRow", ParDo.of(new ProcessEntityFn()));

// Apply timestamp to TableRow element, and then apply windowing of one day on that
PCollection<TableRow> tableRowWindowTemp =
        tableRow.apply("tableAddTimestamp", ParDo.of(new ApplyTimestampFn())).apply(
                "tableApplyWindow",
                Window.<TableRow> into(CalendarWindows.days(1).withTimeZone(
                        DateTimeZone.forID(options.getTimeZone()))));

//Apply reshuffle with random key for avoiding fusion
PCollection<TableRow> ismTableRowWindow =
        tableRowWindow.apply("ReshuffleViaRandomKey",
                Reshuffle.<TableRow> viaRandomKey());

// Write windowed output to BigQuery partitions
tableRowWindow.apply(
        "WriteTableToBQ",
        BigQueryIO
                .writeTableRows()
                .withSchema(BigqueryHelper.getSchema())
                .to(TableRefPartition.perDay(options.getProject(),
                        options.getBigQueryDataset(), options.getTableName()))
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

person Ashley Thomas    schedule 05.04.2018    source источник


Ответы (1)


Я видел, что вы разместили аналогичный вопрос здесь, и теперь вы добавили в свой код шаг:

//Apply reshuffle with random key for avoiding fusion
...

Как кто-то уже сказал вам в другом вопросе:

«OOM может указывать на горячую клавишу»

В этом случае похоже, что что-то похожее все еще происходит (у вас есть дополнительная информация о проблемах с горячими клавишами здесь:

Если это так и застрял какой-то рабочий, то количество сущностей по сравнению со временем выполнения задания не обязательно должно соответствовать какой-либо линейности. И потребление vCPU должно быть больше связано с оптимизацией кода, чтобы избежать проблемы с горячими клавишами.

person VictorGGl    schedule 04.05.2018