ดำเนินการคำสั่ง SQL หลายรายการบน Spark

ฉันมีแบบสอบถาม Spark SQL ในไฟล์ test.sql -

CREATE GLOBAL TEMPORARY VIEW VIEW_1 AS select a,b from abc

CREATE GLOBAL TEMPORARY VIEW VIEW_2 AS select a,b from VIEW_1

select * from VIEW_2

ตอนนี้ ฉันเริ่ม spark-shell ของฉันแล้วลองดำเนินการแบบนี้ -

val sql = scala.io.Source.fromFile("test.sql").mkString
spark.sql(sql).show

สิ่งนี้ล้มเหลวโดยมีข้อผิดพลาดดังต่อไปนี้ -

org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input '<' expecting {<EOF>, 'GROUP', 'ORDER', 'HAVING', 'LIMIT', 'OR', 'AND', 'WINDOW', 'UNION', 'EXCEPT', 'MINUS', 'INTERSECT', 'SORT', 'CLUSTER', 'DISTRIBUTE'}(line 1, pos 128)

ฉันพยายามดำเนินการค้นหาเหล่านี้แบบ 1 ต่อ 1 ในคำสั่ง spark.sql ที่แตกต่างกัน และมันก็ทำงานได้ดี ปัญหาคือ ฉันมีคำถาม 6-7 รายการที่สร้างมุมมองชั่วคราว และสุดท้าย ฉันต้องการผลลัพธ์จากการดูครั้งล่าสุด มีวิธีที่ฉันสามารถเรียกใช้ SQL เหล่านี้ในคำสั่ง spark.sql เดียวได้หรือไม่ ฉันเคยทำงานกับ Postgres SQL (Redshift) และสามารถดำเนินการค้นหาประเภทนี้ได้ ใน spark sql ฉันจะต้องรักษาไฟล์จำนวนมากในกรณีนี้


person White Shadows    schedule 10.03.2018    source แหล่งที่มา


คำตอบ (1)


ปัญหาคือ mkString เชื่อมต่อบรรทัดทั้งหมดในสตริงเดียว ซึ่งไม่สามารถแยกวิเคราะห์เป็นแบบสอบถาม SQL ที่ถูกต้องได้

แต่ละบรรทัดจากไฟล์สคริปต์ควรดำเนินการเป็นการสืบค้นแยกต่างหาก ตัวอย่างเช่น:

scala.io.Source.fromFile("test.sql").getLines()
  .filterNot(_.isEmpty)  // filter out empty lines
  .foreach(query =>
    spark.sql(query).show
  )

อัปเดต

ถ้าแบบสอบถามถูกแยกออกเป็นมากกว่าหนึ่งบรรทัด กรณีจะซับซ้อนกว่านี้เล็กน้อย

เราจำเป็นต้องมีโทเค็นที่เป็นจุดสิ้นสุดของแบบสอบถามอย่างแน่นอน ปล่อยให้มันเป็นอักขระเซมิโคลอน เช่นเดียวกับใน SQL มาตรฐาน

ขั้นแรก เรารวบรวมบรรทัดที่ไม่ว่างเปล่าทั้งหมดจากไฟล์ต้นฉบับ:

val lines = scala.io.Source.fromFile(sqlFile).getLines().filterNot(_.isEmpty)

จากนั้นเราจะประมวลผลบรรทัดที่รวบรวม โดยเชื่อมแต่ละบรรทัดใหม่เข้ากับบรรทัดก่อนหน้า หากไม่ได้ลงท้ายด้วยเครื่องหมายอัฒภาค:

val queries = lines.foldLeft(List[String]()) { case(queries, line) =>
  queries match {
    case Nil => List(line) // case for the very first line
    case init :+ last =>
      if (last.endsWith(";")) {
        // if a query ended on a previous line, we simply append the new line to the list of queries
        queries :+ line.trim
      } else {
        // the query is not terminated yet, concatenate the line with the previous one
        val queryWithNextLine = last + " " + line.trim
        init :+ queryWithNextLine
      }
  }
}
person Antot    schedule 10.03.2018
comment
สิ่งนี้สามารถใช้งานได้ แต่ปัญหาคือฉันมีคำถามจำนวนมาก และโดยพื้นฐานแล้วแบบสอบถามนั้นถูกแบ่งออกเป็นบรรทัด ดังนั้นหากฉันดำเนินการข้างต้น มันจะแยกวิเคราะห์แบบสอบถามไม่ถูกต้อง มีวิธีใดที่ฉันสามารถกำหนด 1 บรรทัดที่ลงท้ายด้วยเครื่องหมายอัฒภาค - person White Shadows; 10.03.2018
comment
ฉันอัปเดตคำตอบด้วยกรณีของข้อความค้นหาที่แยกมากกว่าหนึ่งบรรทัด - person Antot; 11.03.2018
comment
ข้อผิดพลาดเล็กน้อยในรหัส มันควรจะลดลง; ด้วย. Spark ไม่รองรับอัฒภาค คำตอบที่เหลือดูดีขอบคุณ - person Prakhar; 11.03.2018
comment
@Antot ดีจริงๆ ทำไมคุณไม่ลองใช้ไฟล์ yml เพื่อกำหนดเงื่อนไขการสืบค้น...ซึ่งเราสามารถสร้างแบบสอบถามแบบไดนามิกได้ เป็นไปได้ไหม? - person BdEngineer; 31.10.2018