unggah ke kueri besar menggunakan data pesan - aliran data google

Saya menerima pesan melalui pub-sub dan ingin mengunggah ke kueri besar menggunakan data pesan untuk menentukan tabel tempat data akan diunggah.

Saya mencoba melakukan hal berikut:

Pipa saluran = Pipeline.create(opsi); String bigQueryTable;

PCollection<String> input = pipeline
        .apply(PubsubIO.Read.subscription("projects/my-data-analysis/subscriptions/myDataflowSub"));

input.apply(ParDo.of(new DoFn<String, TableRow>() {
    @Override
    public void processElement(DoFn<String, TableRow>.ProcessContext c) throws Exception {
        JSONObject firstJSONObject = new JSONObject(c.element());
         bigQueryTable = firstJSONObject.get("tableName").toString();

         TableRow tableRow = convertJsonToTableRow(firstJSONObject);  
        c.output(tableRow);

    }

})).apply(BigQueryIO.Write.to("my-data-analysis:mydataset." + bigQueryTable).withSchema(tableSchema));

apakah ada cara melakukan ini tanpa menulis DOFN saya sendiri?

Jika saya perlu menerapkan doFn saya sendiri, bagaimana cara mengimplementasikannya untuk mengunggah ke kueri besar?


person dina    schedule 03.11.2016    source sumber
comment
Kemungkinan duplikat Menulis nilai berbeda ke tabel BigQuery berbeda di Apache Balok   -  person jkff    schedule 02.08.2017


Jawaban (1)


Saat ini hal ini tidak dapat dilakukan secara langsung, namun ada berbagai solusi yang mencakup beberapa kasus penggunaan potensial. Lihat pertanyaan terkait:

Nama tabel dinamis saat menulis ke BQ dari pipeline aliran data

Menentukan nama tabel yang dihasilkan secara dinamis berdasarkan konten baris

person jkff    schedule 03.11.2016