Saya telah membuat aliran data yang mengambil masukan dari penyimpanan data dan melakukan transformasi untuk mengubahnya menjadi BigQuery TableRow. Saya melampirkan stempel waktu dengan setiap elemen dalam transformasi. Kemudian jendela satu hari diterapkan ke PCollection. Output berjendela ditulis ke partisi di tabel BigQuery menggunakan BigQueryIO Apache Beam.
Sebelum menulis ke BigQuery, BigQuery menggunakan perombakan melalui kunci acak sebagai langkah perantara untuk menghindari penggabungan.
Perilaku saluran pipa adalah:
1. For 2.8 million entities in the input: Total vCPU time- 5.148 vCPU hr Time to complete job- 53 min 9 sec Current workers- 27 Target workers- 27 Job ID: 2018-04-04_04_20_34-1951473901769814139 2. For 7 million entites in the input: Total vCPU time- 247.772 vCPU hr Time to complete the job- 3 hr 45 min Current workers- 69 Target workers- 1000 Job ID: 2018-04-02_21_59_47-8636729278179820259
Saya tidak mengerti mengapa dibutuhkan begitu banyak waktu untuk menyelesaikan pekerjaan dan jam CPU untuk kasus kedua.
Pipa aliran data pada tingkat tinggi adalah:
// Read from datastore
PCollection<Entity> entities =
pipeline.apply("ReadFromDatastore",
DatastoreIO.v1().read().withProjectId(options.getProject())
.withQuery(query).withNamespace(options.getNamespace()));
// Apply processing to convert it to BigQuery TableRow
PCollection<TableRow> tableRow =
entities.apply("ConvertToTableRow", ParDo.of(new ProcessEntityFn()));
// Apply timestamp to TableRow element, and then apply windowing of one day on that
PCollection<TableRow> tableRowWindowTemp =
tableRow.apply("tableAddTimestamp", ParDo.of(new ApplyTimestampFn())).apply(
"tableApplyWindow",
Window.<TableRow> into(CalendarWindows.days(1).withTimeZone(
DateTimeZone.forID(options.getTimeZone()))));
//Apply reshuffle with random key for avoiding fusion
PCollection<TableRow> ismTableRowWindow =
tableRowWindow.apply("ReshuffleViaRandomKey",
Reshuffle.<TableRow> viaRandomKey());
// Write windowed output to BigQuery partitions
tableRowWindow.apply(
"WriteTableToBQ",
BigQueryIO
.writeTableRows()
.withSchema(BigqueryHelper.getSchema())
.to(TableRefPartition.perDay(options.getProject(),
options.getBigQueryDataset(), options.getTableName()))
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));