Вопрос о дизайне Spark Streaming

Я новичок в искре. Я хотел выполнить настройку потоковой передачи искры, чтобы получить пары значений ключа из файлов формата ниже:

файл: информация1

введите здесь описание изображения

Примечание. Каждый информационный файл будет содержать около 1000 таких записей. И наша система постоянно генерирует эти информационные файлы. С помощью искровой потоковой передачи я хотел сделать сопоставление номеров строк и информационных файлов и получить совокупный результат.

Можем ли мы внести данные в искровой кластер таких файлов? Меня интересуют только разделители «SF» и «DA», «SF» соответствует исходному файлу, а «DA» соответствует (номер строки, количество).

Поскольку эти входные данные не являются линейным форматом, рекомендуется ли использовать эти файлы для ввода искры или мне нужно выполнить какой-то промежуточный этап, где мне нужно очистить эти файлы для создания новых файлов, которые будут иметь информацию о каждой записи в очереди вместо блоков?

Или мы можем добиться этого в самом Spark? Каким должен быть правильный подход?

Чего я хотел добиться? Я хотел получить информацию об уровне линии. Значит, получить строку (как ключ) и информационные файлы (как значения)

Окончательный результат, который я хотел, выглядит следующим образом: line178 -> (info1, info2, info7.................)

строка 2908 -> (info3, info90, ..., ...,)

Дайте мне знать, если мое объяснение непонятно или я что-то упустил.

Спасибо и с уважением, Винти


person Vibhuti    schedule 01.02.2016    source источник


Ответы (1)


Вы могли бы сделать что-то вроде этого. Наличие вашего потока DStream:

// this gives you DA & FP lines, with the line number as the key
val validLines =  stream.map(_.split(":")).
  filter(line => Seq("DA", "FP").contains(line._1)).
  map(_._2.split(","))
  map(line => (line._1, line._2))

// now you should accumulate values
val state = validLines.updateStateByKey[Seq[String]](updateFunction _)

def updateFunction(newValues: Seq[Seq[String]], runningValues: Option[Seq[String]]): Option[Seq[String]] = {
  // add the new values 
  val newVals = runnigValues match {
    case Some(list) => list :: newValues
    case _ => newValues
  }
  Some(newVals)
}

Это должно накапливать для каждого ключа последовательность со связанными значениями, сохраняя ее в состоянии.

person rhernando    schedule 02.02.2016