Чтение большого файла с помощью Akka Streams

Я пробую Akka Streams, и вот короткий фрагмент, который у меня есть:

  override def main(args: Array[String]) {
    val filePath = "/Users/joe/Softwares/data/FoodFacts.csv"//args(0)

    val file = new File(filePath)
    println(file.getAbsolutePath)
    // read 1MB of file as a stream
    val fileSource = SynchronousFileSource(file, 1 * 1024 * 1024)
    val shaFlow = fileSource.map(chunk => {
      println(s"the string obtained is ${chunk.toString}")
    })
    shaFlow.to(Sink.foreach(println(_))).run // fails with a null pointer

    def sha256(s: String) = {
      val  messageDigest = MessageDigest.getInstance("SHA-256")
      messageDigest.digest(s.getBytes("UTF-8"))
    }
  }

Когда я запустил этот фрагмент, я получаю:

Exception in thread "main" java.lang.NullPointerException
    at akka.stream.scaladsl.RunnableGraph.run(Flow.scala:365)
    at com.test.api.consumer.DataScienceBoot$.main(DataScienceBoot.scala:30)
    at com.test.api.consumer.DataScienceBoot.main(DataScienceBoot.scala)

Мне кажется, что это не fileSource просто пустой? Почему это? Любые идеи? FoodFacts.csv имеет размер 40 МБ, и все, что я пытаюсь сделать, это создать поток данных размером 1 МБ!

Даже использование значения ChunkSize по умолчанию, равного 8192, не сработало!


person joesan    schedule 05.02.2016    source источник
comment
Какую версию потоков akka вы используете? Я думаю, что SynchronousFileSource теперь устарел   -  person Jatin    schedule 05.02.2016
comment
Я использую 1.0. Какой из них я должен использовать для чтения огромного файла и передачи фрагментов в виде потока? Любые подсказки?   -  person joesan    schedule 05.02.2016


Ответы (1)


Ну 1.0 устарел. И если можете, используйте 2.x.

Когда я пытаюсь использовать версию 2.0.1, используя FileIO.fromFile(file) вместо SynchronousFileSource, возникает ошибка компиляции с сообщением fails with null pointer. Это было просто потому, что у него не было ActorMaterializer в области видимости. В том числе это заставляет его работать:

object TImpl extends App {
import java.io.File

  implicit val system = ActorSystem("Sys")
  implicit val materializer = ActorMaterializer()

  val file = new File("somefile.csv")
  val fileSource = FileIO.fromFile(file,1 * 1024 * 1024 )
  val shaFlow: Source[String, Future[Long]] = fileSource.map(chunk => {
    s"the string obtained is ${chunk.toString()}"
  })

  shaFlow.runForeach(println(_))    
}

Это работает для файла любого размера. Для получения дополнительной информации о настройке диспетчера см. здесь.

person Jatin    schedule 05.02.2016