Mengeksekusi beberapa kueri SQL di Spark

Saya memiliki kueri Spark SQL dalam file test.sql -

CREATE GLOBAL TEMPORARY VIEW VIEW_1 AS select a,b from abc

CREATE GLOBAL TEMPORARY VIEW VIEW_2 AS select a,b from VIEW_1

select * from VIEW_2

Sekarang, saya memulai spark-shell saya dan mencoba menjalankannya seperti ini -

val sql = scala.io.Source.fromFile("test.sql").mkString
spark.sql(sql).show

Ini gagal dengan kesalahan berikut -

org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input '<' expecting {<EOF>, 'GROUP', 'ORDER', 'HAVING', 'LIMIT', 'OR', 'AND', 'WINDOW', 'UNION', 'EXCEPT', 'MINUS', 'INTERSECT', 'SORT', 'CLUSTER', 'DISTRIBUTE'}(line 1, pos 128)

Saya mencoba menjalankan pertanyaan ini 1 per 1 dalam pernyataan spark.sql yang berbeda dan itu berjalan dengan baik. Masalahnya adalah, saya memiliki 6-7 pertanyaan yang membuat tampilan sementara dan akhirnya saya memerlukan output dari tampilan terakhir saya. Apakah ada cara di mana saya dapat menjalankan SQL ini dalam satu pernyataan spark.sql. Saya telah mengerjakan Postgres SQL (Redshift) dan mampu menjalankan pertanyaan semacam itu. Di spark sql, saya harus memelihara banyak file dalam kasus ini.


person White Shadows    schedule 10.03.2018    source sumber


Jawaban (1)


Masalahnya adalah mkString menggabungkan semua baris dalam satu string, yang tidak dapat diurai dengan benar sebagai kueri SQL yang valid.

Setiap baris dari file skrip harus dieksekusi sebagai kueri terpisah, misalnya:

scala.io.Source.fromFile("test.sql").getLines()
  .filterNot(_.isEmpty)  // filter out empty lines
  .foreach(query =>
    spark.sql(query).show
  )

Memperbarui

Jika kueri dipecah menjadi lebih dari satu baris, kasusnya sedikit lebih rumit.

Kita benar-benar perlu memiliki token yang menandai akhir dari sebuah query. Biarkan itu menjadi karakter titik koma, seperti dalam SQL standar.

Pertama, kami mengumpulkan semua baris yang tidak kosong dari file sumber:

val lines = scala.io.Source.fromFile(sqlFile).getLines().filterNot(_.isEmpty)

Kemudian kami memproses baris-baris yang dikumpulkan, menggabungkan setiap baris baru dengan baris sebelumnya, jika tidak diakhiri dengan titik koma:

val queries = lines.foldLeft(List[String]()) { case(queries, line) =>
  queries match {
    case Nil => List(line) // case for the very first line
    case init :+ last =>
      if (last.endsWith(";")) {
        // if a query ended on a previous line, we simply append the new line to the list of queries
        queries :+ line.trim
      } else {
        // the query is not terminated yet, concatenate the line with the previous one
        val queryWithNextLine = last + " " + line.trim
        init :+ queryWithNextLine
      }
  }
}
person Antot    schedule 10.03.2018
comment
Ini bisa berhasil, tetapi masalahnya adalah saya memiliki pertanyaan besar dan pada dasarnya kueri itu sendiri dibagi menjadi beberapa baris. Jadi jika saya menjalankan hal di atas, itu tidak mengurai kueri dengan benar. Apakah ada cara saya dapat mendefinisikan 1 baris yang diakhiri dengan titik koma. - person White Shadows; 10.03.2018
comment
Saya memperbarui jawabannya dengan kasus pertanyaan yang terbagi menjadi lebih dari satu baris. - person Antot; 11.03.2018
comment
Kesalahan kecil dalam kode. Seharusnya turun; juga. Spark tidak mendukung titik koma. Jawaban lainnya terlihat bagus, terima kasih. - person Prakhar; 11.03.2018
comment
@Antot sangat bagus mengapa Anda tidak mencoba menggunakan file yml yang mendefinisikan kondisi kueri...yang dengannya kita dapat membuat kueri secara dinamis? Apakah itu mungkin ? - person BdEngineer; 31.10.2018