Я получаю сообщения через pub-sub и хочу загрузить в большой запрос, используя данные сообщения, чтобы определить, в какую таблицу загружать данные.
Я пробовал делать следующее:
Pipeline pipeline = Pipeline.create (опции); String bigQueryTable;
PCollection<String> input = pipeline
.apply(PubsubIO.Read.subscription("projects/my-data-analysis/subscriptions/myDataflowSub"));
input.apply(ParDo.of(new DoFn<String, TableRow>() {
@Override
public void processElement(DoFn<String, TableRow>.ProcessContext c) throws Exception {
JSONObject firstJSONObject = new JSONObject(c.element());
bigQueryTable = firstJSONObject.get("tableName").toString();
TableRow tableRow = convertJsonToTableRow(firstJSONObject);
c.output(tableRow);
}
})).apply(BigQueryIO.Write.to("my-data-analysis:mydataset." + bigQueryTable).withSchema(tableSchema));
есть ли способ сделать это без написания моего собственного DOFN?
Если мне нужно реализовать свой собственный doFn, как реализовать его для загрузки в большой запрос?