ошибка записи потока PubSub в облачное хранилище с помощью Dataflow

Использование SCIO из spotify для написания задания для Dataflow, следуя 2 примерам например1 и e .g2, чтобы записать PubSub поток в GCS, но получить следующую ошибку для приведенного ниже кода

Ошибка

Exception in thread "main" java.lang.IllegalArgumentException: Write can only be applied to a Bounded PCollection 

Код

object StreamingPubSub {
  def main(cmdlineArgs: Array[String]): Unit = {
// set up example wiring
val (opts, args) = ScioContext.parseArguments[ExampleOptions](cmdlineArgs)
val dataflowUtils = new DataflowExampleUtils(opts)
dataflowUtils.setup()

val sc = ScioContext(opts)


sc.pubsubTopic(opts.getPubsubTopic)
.timestampBy {
    _ => new Instant(System.currentTimeMillis() - (scala.math.random * RAND_RANGE).toLong)
  }
.withFixedWindows((Duration.standardHours(1)))
.groupBy(_ => Unit)
.toWindowed
.toSCollection
.saveAsTextFile(args("output"))


val result = sc.close()

// CTRL-C to cancel the streaming pipeline
    dataflowUtils.waitToFinish(result.internal)
  }
}

Я, возможно, смешиваю концепцию окна с Bounded PCollection, есть ли способ добиться этого или мне нужно применить какое-то преобразование, чтобы это произошло, любой может помочь в этом




Ответы (1)


Я считаю, что SCIO saveAsTextFile внизу использует преобразование Write потока данных, которое поддерживает только ограниченные PCollections. Dataflow пока не предоставляет прямого API для записи неограниченной коллекции PCollection в Google Cloud Storage, хотя мы это исследуем.

Чтобы где-то сохранить неограниченную коллекцию PCollection, рассмотрите, например, BigQuery, Datastore или Bigtable. В API SCIO вы можете использовать, например, saveAsBigQuery.

person Davor Bonaci    schedule 05.10.2016
comment
Спасибо за быстрый ответ - person DAR; 06.10.2016