Ketidaknormalan aliran data dalam waktu untuk menyelesaikan pekerjaan dan total jam CPU dengan perombakan melalui kunci acak

Saya telah membuat aliran data yang mengambil masukan dari penyimpanan data dan melakukan transformasi untuk mengubahnya menjadi BigQuery TableRow. Saya melampirkan stempel waktu dengan setiap elemen dalam transformasi. Kemudian jendela satu hari diterapkan ke PCollection. Output berjendela ditulis ke partisi di tabel BigQuery menggunakan BigQueryIO Apache Beam.

Sebelum menulis ke BigQuery, BigQuery menggunakan perombakan melalui kunci acak sebagai langkah perantara untuk menghindari penggabungan.

Perilaku saluran pipa adalah:

1. For 2.8 million entities in the input: 
Total vCPU time- 5.148 vCPU hr
Time to complete job- 53 min 9 sec
Current workers- 27 
Target workers- 27 
Job ID: 2018-04-04_04_20_34-1951473901769814139

2. For 7 million entites in the input: 
Total vCPU time- 247.772 vCPU hr
Time to complete the job- 3 hr 45 min
Current workers- 69 
Target workers- 1000
Job ID: 2018-04-02_21_59_47-8636729278179820259

Saya tidak mengerti mengapa dibutuhkan begitu banyak waktu untuk menyelesaikan pekerjaan dan jam CPU untuk kasus kedua.

Pipa aliran data pada tingkat tinggi adalah:

// Read from datastore
PCollection<Entity> entities =
        pipeline.apply("ReadFromDatastore",
                DatastoreIO.v1().read().withProjectId(options.getProject())
                        .withQuery(query).withNamespace(options.getNamespace()));

// Apply processing to convert it to BigQuery TableRow
PCollection<TableRow> tableRow =
        entities.apply("ConvertToTableRow", ParDo.of(new ProcessEntityFn()));

// Apply timestamp to TableRow element, and then apply windowing of one day on that
PCollection<TableRow> tableRowWindowTemp =
        tableRow.apply("tableAddTimestamp", ParDo.of(new ApplyTimestampFn())).apply(
                "tableApplyWindow",
                Window.<TableRow> into(CalendarWindows.days(1).withTimeZone(
                        DateTimeZone.forID(options.getTimeZone()))));

//Apply reshuffle with random key for avoiding fusion
PCollection<TableRow> ismTableRowWindow =
        tableRowWindow.apply("ReshuffleViaRandomKey",
                Reshuffle.<TableRow> viaRandomKey());

// Write windowed output to BigQuery partitions
tableRowWindow.apply(
        "WriteTableToBQ",
        BigQueryIO
                .writeTableRows()
                .withSchema(BigqueryHelper.getSchema())
                .to(TableRefPartition.perDay(options.getProject(),
                        options.getBigQueryDataset(), options.getTableName()))
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

person Ashley Thomas    schedule 05.04.2018    source sumber


Jawaban (1)


Saya melihat Anda memposting pertanyaan serupa di sini, dan sekarang Anda telah menambahkan langkah ke kode Anda:

//Apply reshuffle with random key for avoiding fusion
...

Seperti yang sudah dikatakan seseorang kepada Anda di pertanyaan lain:

"OOM mungkin merupakan gejala hot key"

Jadi dalam kasus ini sepertinya hal serupa masih terjadi (Anda memiliki informasi lebih lanjut tentang masalah Hot Key di sini:

Jika hal ini terjadi dan ada beberapa pekerja yang terjebak, maka jumlah entitas vs. waktu untuk menyelesaikan pekerjaan tidak harus mengikuti linearitas apa pun. Dan konsumsi vCPU seharusnya lebih merupakan masalah pengoptimalan kode untuk menghindari masalah hot key.

person VictorGGl    schedule 04.05.2018