Я создал поток данных, который принимает входные данные из хранилища данных и выполняет преобразование, чтобы преобразовать его в BigQuery TableRow. Я прикрепляю метку времени к каждому элементу преобразования. Затем к коллекции PCollection применяется однодневное окно. Оконный вывод записывается в раздел в таблице BigQuery с помощью BigQueryIO от Apache Beam.
Перед записью в BigQuery в качестве промежуточного шага используется перестановка с использованием случайного ключа, чтобы избежать слияния.
Поведение конвейера:
1. For 2.8 million entities in the input: Total vCPU time- 5.148 vCPU hr Time to complete job- 53 min 9 sec Current workers- 27 Target workers- 27 Job ID: 2018-04-04_04_20_34-1951473901769814139 2. For 7 million entites in the input: Total vCPU time- 247.772 vCPU hr Time to complete the job- 3 hr 45 min Current workers- 69 Target workers- 1000 Job ID: 2018-04-02_21_59_47-8636729278179820259
Я не мог понять, почему во втором случае требуется столько времени на выполнение задания и часы процессора.
Конвейер потока данных на высоком уровне:
// Read from datastore
PCollection<Entity> entities =
pipeline.apply("ReadFromDatastore",
DatastoreIO.v1().read().withProjectId(options.getProject())
.withQuery(query).withNamespace(options.getNamespace()));
// Apply processing to convert it to BigQuery TableRow
PCollection<TableRow> tableRow =
entities.apply("ConvertToTableRow", ParDo.of(new ProcessEntityFn()));
// Apply timestamp to TableRow element, and then apply windowing of one day on that
PCollection<TableRow> tableRowWindowTemp =
tableRow.apply("tableAddTimestamp", ParDo.of(new ApplyTimestampFn())).apply(
"tableApplyWindow",
Window.<TableRow> into(CalendarWindows.days(1).withTimeZone(
DateTimeZone.forID(options.getTimeZone()))));
//Apply reshuffle with random key for avoiding fusion
PCollection<TableRow> ismTableRowWindow =
tableRowWindow.apply("ReshuffleViaRandomKey",
Reshuffle.<TableRow> viaRandomKey());
// Write windowed output to BigQuery partitions
tableRowWindow.apply(
"WriteTableToBQ",
BigQueryIO
.writeTableRows()
.withSchema(BigqueryHelper.getSchema())
.to(TableRefPartition.perDay(options.getProject(),
options.getBigQueryDataset(), options.getTableName()))
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));