загрузка в большой запрос с использованием данных сообщения - поток данных google

Я получаю сообщения через pub-sub и хочу загрузить в большой запрос, используя данные сообщения, чтобы определить, в какую таблицу загружать данные.

Я пробовал делать следующее:

Pipeline pipeline = Pipeline.create (опции); String bigQueryTable;

PCollection<String> input = pipeline
        .apply(PubsubIO.Read.subscription("projects/my-data-analysis/subscriptions/myDataflowSub"));

input.apply(ParDo.of(new DoFn<String, TableRow>() {
    @Override
    public void processElement(DoFn<String, TableRow>.ProcessContext c) throws Exception {
        JSONObject firstJSONObject = new JSONObject(c.element());
         bigQueryTable = firstJSONObject.get("tableName").toString();

         TableRow tableRow = convertJsonToTableRow(firstJSONObject);  
        c.output(tableRow);

    }

})).apply(BigQueryIO.Write.to("my-data-analysis:mydataset." + bigQueryTable).withSchema(tableSchema));

есть ли способ сделать это без написания моего собственного DOFN?

Если мне нужно реализовать свой собственный doFn, как реализовать его для загрузки в большой запрос?


person dina    schedule 03.11.2016    source источник
comment
Возможный дубликат Запись разных значений в разные таблицы BigQuery в Apache Луч   -  person jkff    schedule 02.08.2017


Ответы (1)


Прямо сейчас это невозможно, но есть различные обходные пути, охватывающие некоторые потенциальные варианты использования. См. Связанные вопросы:

Имя динамической таблицы при записи в BQ из конвейеры потока данных

Указание имени динамически сгенерированной таблицы на основе содержимого строки

person jkff    schedule 03.11.2016