แผ่นโกง PySpark สำหรับวิศวกรข้อมูลมือใหม่

Buddy เป็นวิศวกรข้อมูลมือใหม่ที่เพิ่งพบกับ Spark ซึ่งเป็นเฟรมเวิร์กการประมวลผลข้อมูลขนาดใหญ่ยอดนิยม

เมื่อพิจารณาถึงข้อเท็จจริงที่ว่า Spark ได้รับการผสานรวมเข้ากับแพลตฟอร์มข้อมูลบนคลาวด์อย่าง Azure, AWS และ GCP Buddy ได้อย่างราบรื่น ตอนนี้ได้ตระหนักถึงความแน่นอนที่มีอยู่แล้ว สิ่งนี้ได้ผลักดัน Buddy ให้เริ่มต้นการเดินทาง Spark ของเขาอย่างรวดเร็ว โดยจัดการกับแบบฝึกหัดที่ไม่สำคัญที่สุดในวงจรชีวิตการประมวลผลข้อมูลขนาดใหญ่ - "การอ่านและการเขียนข้อมูล"

TL;DR

บัดดี้เต็มไปด้วยงานและจิตใจที่ใจร้อนของเขาตัดสินใจอย่างเป็นเอกฉันท์ว่าจะใช้ทางลัดด้วยเอกสารโกงต่อไปนี้โดยใช้ Python

TS; WM

เมื่อมองย้อนกลับไป บัดดี้เห็นว่าจำเป็นต้องทำใจกับจิตใจที่ใจร้อนของเขา ทางลัดได้รับการพิสูจน์แล้วว่ามีประสิทธิภาพ แต่ต้องใช้เวลาจำนวนมากในการแก้ไขข้อผิดพลาดเล็กๆ น้อยๆ และจัดการกับพฤติกรรมที่ไม่ชัดเจน

ถึงเวลาจัดการรายละเอียดแล้ว

การอ่านและเขียนข้อมูลใน Spark ถือเป็นงานเล็กๆ น้อยๆ โดยส่วนใหญ่แล้วจะเป็นขั้นตอนเริ่มต้นของการประมวลผล Big Data ทุกรูปแบบ Buddy ต้องการทราบไวยากรณ์หลักสำหรับการอ่านและเขียนข้อมูลก่อนที่จะพูดถึงข้อมูลเฉพาะ

ไวยากรณ์หลักสำหรับการอ่านข้อมูลใน Apache Spark

DataFrameReader.format(…).option(“key”, “value”).schema(…).load()

DataFrameReaderเป็นรากฐานสำหรับการอ่านข้อมูลใน Spark ซึ่งสามารถเข้าถึงได้ผ่านแอตทริบิวต์ spark.read

  • รูปแบบ — ระบุรูปแบบไฟล์ในรูปแบบ CSV, JSON หรือปาร์เก้ ค่าเริ่มต้นคือไม้ปาร์เก้
  • ตัวเลือก - ชุดของการกำหนดค่าคีย์-ค่าเพื่อกำหนดพารามิเตอร์วิธีการอ่านข้อมูล
  • สคีมา — ตัวเลือกหนึ่งที่ใช้ระบุว่าคุณต้องการอนุมานสคีมาจากแหล่งข้อมูลหรือไม่

โหมดการอ่าน — บ่อยครั้งในขณะที่อ่านข้อมูลจากแหล่งภายนอก เราพบข้อมูลที่เสียหาย โหมดการอ่านจะสั่งให้ Spark จัดการกับข้อมูลที่เสียหายในลักษณะเฉพาะ

โหมดการอ่านทั่วไปมี 3 โหมด และโหมดการอ่านเริ่มต้นคือ อนุญาต

  • อนุญาต — ฟิลด์ทั้งหมดถูกตั้งค่าเป็นโมฆะและบันทึกที่เสียหายจะถูกวางไว้ในคอลัมน์สตริงที่เรียกว่า _corrupt_record
  • dropMalformed — ปล่อยแถวทั้งหมดที่มีบันทึกที่เสียหาย
  • failedFast - ล้มเหลวเมื่อพบบันทึกที่เสียหาย

ไวยากรณ์หลักสำหรับการเขียนข้อมูลใน Apache Spark

DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy( ...).save()

รากฐานสำหรับการเขียนข้อมูลใน Spark คือ DataFrameWriter ซึ่งเข้าถึงได้ต่อ DataFrame โดยใช้แอตทริบิวต์ dataFrame.write

โหมดบันทึก — ระบุสิ่งที่จะเกิดขึ้นหาก Spark พบข้อมูลที่ปลายทางแล้ว

มีโหมดบันทึกทั่วไป 4 โหมดและโหมดเริ่มต้นคือ errorIfExists

  • ผนวก - ผนวกข้อมูลที่ส่งออกไปยังไฟล์ที่มีอยู่แล้ว
  • เขียนทับ — เขียนทับข้อมูลใดๆ ที่ปลายทางโดยสมบูรณ์
  • errorIfExists — Spark ส่งข้อผิดพลาดหากมีข้อมูลอยู่ที่ปลายทางแล้ว
  • ละเว้น — หากมีข้อมูลอยู่ ไม่ต้องทำอะไรกับ dataFrame

นั่นเป็นไพรเมอร์ที่ยอดเยี่ยม! ตอนนี้บัดดี้ดูเหมือนจะเข้าใจเหตุผลเบื้องหลังข้อผิดพลาดที่ทรมานเขาแล้ว เขาต้องการขยายความรู้นี้โดยเจาะลึกประเภทไฟล์ที่พบบ่อยบางประเภทและวิธีจัดการกับไฟล์เหล่านั้น

ไฟล์ CSV

จะอ่านจากไฟล์ CSV ได้อย่างไร?

หากต้องการอ่านไฟล์ CSV คุณต้องสร้าง DataFrameReader ก่อน และตั้งค่าตัวเลือกจำนวนหนึ่ง

df=spark.read.format("csv").option("header","true").load(filePath)

ที่นี่เราโหลดไฟล์ CSV และบอก Spark ว่าไฟล์นั้นมีแถวส่วนหัว ขั้นตอนนี้รับประกันว่าจะทริกเกอร์งาน Spark

งานจุดประกาย:บล็อกของการคำนวณแบบขนานที่ดำเนินงานบางอย่าง

งานจะถูกทริกเกอร์ทุกครั้งที่เราจำเป็นต้องสัมผัสข้อมูลทางกายภาพ ในกรณีนี้ DataFrameReaderจะต้องดูที่บรรทัดแรกของไฟล์เพื่อดูว่าเรามีข้อมูลกี่คอลัมน์ในไฟล์

เมื่ออ่านข้อมูล คุณจะต้องคำนึงถึงโอเวอร์เฮดของประเภทข้อมูลเสมอ มีสองวิธีในการจัดการสิ่งนี้ใน Spark, InferSchema หรือสคีมาที่ผู้ใช้กำหนด

การอ่าน CSV โดยใช้ InferSchema

df=spark.read.format("csv").option("inferSchema","true").load(filePath)

ตัวเลือก inferSchema บอกให้ผู้อ่านอนุมานประเภทข้อมูลจากไฟล์ต้นฉบับ ซึ่งส่งผลให้มีการส่งผ่านไฟล์เพิ่มเติมส่งผลให้งาน Spark สองงานถูกทริกเกอร์ เป็นการดำเนินการที่มีราคาแพงเนื่องจาก Spark ต้องผ่านไฟล์ CSV โดยอัตโนมัติและอนุมานสคีมาสำหรับแต่ละคอลัมน์

การอ่าน CSV โดยใช้ Schema ที่ผู้ใช้กำหนด

ตัวเลือกที่ต้องการในขณะที่อ่านไฟล์ใดๆ ก็คือบังคับใช้สคีมาที่กำหนดเอง ซึ่งช่วยให้มั่นใจว่าประเภทข้อมูลจะสอดคล้องกันและหลีกเลี่ยงพฤติกรรมที่ไม่คาดคิด

ในการทำเช่นนั้น คุณต้องประกาศสคีมาที่จะบังคับใช้ก่อน จากนั้นจึงอ่านข้อมูลโดยตั้งค่าตัวเลือก schema

csvSchema = StructType([StructField(“id",IntegerType(),False)])
df=spark.read.format("csv").schema(csvSchema).load(filePath)

ผลจากการกำหนดสคีมาล่วงหน้าสำหรับข้อมูลของคุณ คุณจึงหลีกเลี่ยงการทริกเกอร์งานใดๆ ได้ Spark ไม่เห็นความจำเป็นที่จะต้องดูไฟล์เนื่องจากเราดูแลสคีมาแล้ว สิ่งนี้เรียกว่า การประเมินแบบ Lazy ซึ่งเป็นเทคนิคการปรับให้เหมาะสมที่สำคัญใน Spark

จะเขียนข้อมูล CSV ได้อย่างไร?

การเขียนข้อมูลใน Spark นั้นค่อนข้างง่าย เนื่องจากเรากำหนดไว้ในไวยากรณ์หลักเพื่อเขียนข้อมูล เราจำเป็นต้องมี dataFrame ที่มีข้อมูลจริงอยู่ในนั้น ซึ่งเราสามารถเข้าถึง DataFrameWriter ได้

df.write.format("csv").mode("overwrite).save(outputPath/file.csv)

ที่นี่เราเขียนเนื้อหาของกรอบข้อมูลลงในไฟล์ CSV การตั้งค่าโหมดเขียนเป็นเขียนทับจะเขียนทับข้อมูลใดๆ ที่มีอยู่แล้วในปลายทางโดยสมบูรณ์

สิ่งที่คุณคาดหวังจากคำสั่งก่อนหน้านี้คือไฟล์ CSV ไฟล์เดียว อย่างไรก็ตาม คุณจะเห็นว่าไฟล์ที่คุณตั้งใจจะเขียนนั้นเป็นโฟลเดอร์ที่มีไฟล์จำนวนมากอยู่ภายใน สิ่งนี้ได้รับการยืนยันเพิ่มเติมโดยการดูเนื้อหาของ outputPath

%fs ls /outputPath/file.csv

นี่เป็นส่วนสำคัญของกลไกแบบกระจาย Spark และสะท้อนถึงจำนวน พาร์ติชัน ใน dataFrame ของเรา ณ เวลาที่เราเขียนออกมา จำนวนไฟล์ที่สร้างขึ้นจะแตกต่างกันหากเราแบ่งพาร์ติชัน dataFrame ใหม่ก่อนที่จะเขียนออกมา

การแบ่งพาร์ติชันหมายถึงการแบ่งชุดข้อมูลขนาดใหญ่ออกเป็นส่วนเล็กๆ (พาร์ติชัน) ใน Spark เป็นหน่วยพื้นฐานของความขนานและช่วยให้คุณสามารถควบคุมตำแหน่งที่จะจัดเก็บข้อมูลในขณะที่คุณเขียนข้อมูลได้

ไฟล์ JSON

จะอ่านจากไฟล์ JSON ได้อย่างไร

การอ่าน JSON ไม่ได้แตกต่างจากการอ่านไฟล์ CSV มากนัก คุณสามารถอ่านโดยใช้ inferSchema หรือโดยการกำหนดสคีมาของคุณเอง

df=spark.read.format("json").option("inferSchema”,"true").load(filePath)

ที่นี่เราอ่านไฟล์ JSON โดยขอให้ Spark สรุปสคีมา เราต้องการงานเดียวเท่านั้นแม้จะอนุมานสคีมาเพราะไม่มีส่วนหัวใน JSON ชื่อคอลัมน์จะดึงมาจากแอตทริบิวต์ของออบเจ็กต์ JSON

เพื่อรักษาความสอดคล้องกัน เราสามารถกำหนดสคีมาที่จะนำไปใช้กับข้อมูล JSON ที่กำลังอ่านได้เสมอ

jsonSchema = StructType([...])
df=spark.read.format("json").schema(jsonSchema).load(filePath)

โปรดจำไว้ว่าไฟล์ JSON สามารถซ้อนกันได้ และสำหรับไฟล์ขนาดเล็กที่สร้างสคีมาด้วยตนเองอาจไม่คุ้มกับความพยายาม แต่สำหรับไฟล์ขนาดใหญ่ นี่เป็นตัวเลือกที่ดีกว่า เมื่อเทียบกับไฟล์ที่ ยาว และ กระบวนการ สรุปสคีมา >ราคาแพง

จะเขียนลงไฟล์ JSON ได้อย่างไร?

ตามที่คุณคาดหวังการเขียนลงในไฟล์ JSON จะเหมือนกับไฟล์ CSV

df.write.format("json").mode("overwrite).save(outputPath/file.json)

เช่นเดียวกับการเขียนลงใน CSV ชุดข้อมูลจะถูกแบ่งออกเป็นหลายไฟล์ตามจำนวนพาร์ติชันใน dataFrame

ไฟล์ปาร์เก้

Apache Parquet เป็นรูปแบบการจัดเก็บข้อมูลแบบเรียงเป็นแนว ฟรีและเป็นโอเพ่นซอร์สซึ่งให้การบีบอัดข้อมูลที่มีประสิทธิภาพ และมีบทบาทสำคัญในการประมวลผล Spark Big Data

จะอ่านข้อมูลจากไฟล์ Parquet ได้อย่างไร

ต่างจากไฟล์ CSV และ JSON ตรงที่ “ไฟล์” ของ Parquet คือชุดของไฟล์ซึ่งส่วนใหญ่ประกอบด้วยข้อมูลจริงและไฟล์บางไฟล์ที่ประกอบด้วยข้อมูลเมตา

หากต้องการอ่านไฟล์ไม้ปาร์เก้ เราสามารถใช้ไวยากรณ์รูปแบบต่างๆ ดังที่แสดงด้านล่าง ซึ่งทั้งสองรูปแบบมีการดำเนินการเหมือนกัน

#option1
df=spark.read.format("parquet).load(parquetDirectory)
#option2
df=spark.read.parquet(parquetDirectory)

ดังที่คุณสังเกตเห็นว่าเราไม่จำเป็นต้องระบุสคีมาประเภทใด ชื่อคอลัมน์และประเภทข้อมูลจะถูกจัดเก็บไว้ในไฟล์ปาร์เก้เอง

กระบวนการอนุมานสคีมาไม่แพงเท่ากับ CSV และ JSON เนื่องจากโปรแกรมอ่าน Parquet จำเป็นต้องประมวลผลเฉพาะไฟล์ข้อมูลเมตาขนาดเล็กเท่านั้น เพื่ออนุมานสคีมาโดยปริยาย แทนที่จะอนุมานทั้งไฟล์

จะเขียนข้อมูลลงไฟล์ Parquet ได้อย่างไร?

การเขียนไม้ปาร์เก้เป็นเรื่องง่ายเหมือนกับการอ่าน เพียงระบุตำแหน่งสำหรับไฟล์ที่จะเขียน

df.write.format(“parquet").mode("overwrite").save("outputPath")

ใช้กฎการแบ่งพาร์ติชันเดียวกันกับที่เรากำหนดไว้สำหรับ CSV และ JSON ที่นี่

เดลต้า

บัดดี้ไม่เคยได้ยินเรื่องนี้มาก่อน ดูเหมือนเป็นแนวคิดที่ค่อนข้างใหม่ สมควรได้รับพื้นหลังเล็กน้อย

Delta Lake เป็นโครงการที่ริเริ่มโดย Databricks ซึ่งปัจจุบันเป็นโอเพ่นซอร์ส Delta Lake เป็นเลเยอร์การจัดเก็บข้อมูลแบบโอเพ่นซอร์สที่ช่วยคุณสร้าง Data Lake ที่ประกอบด้วยตารางอย่างน้อย 1 ตารางในรูปแบบ Delta Lake

ซึ่งเป็นรูปแบบเปิดที่ใช้ Parquet ซึ่งนำธุรกรรม ACID มาสู่ Data Lake และคุณสมบัติที่มีประโยชน์อื่นๆ ที่มุ่งปรับปรุงความน่าเชื่อถือ คุณภาพ และประสิทธิภาพของ Data Lake ที่มีอยู่

เพื่อให้เข้าใจวิธีการอ่านจากรูปแบบเดลต้า การสร้างไฟล์เดลต้าก่อนจึงจะเหมาะสม

จะเขียนข้อมูลในรูปแบบเดลต้าได้อย่างไร?

ในการสร้างไฟล์เดลต้า คุณต้องมี dataFrame พร้อมข้อมูลบางส่วนที่จะเขียน เมื่อคุณมีสิ่งนี้แล้ว การสร้างเดลต้าก็ทำได้ง่ายเหมือนกับการเปลี่ยนประเภทไฟล์ขณะเขียน แทนที่จะเป็น parquet เพียงพูด delta

someDataFrame.write.format(“delta").partitionBy("someColumn").save(path)

จะอ่านข้อมูลจากรูปแบบเดลต้าได้อย่างไร

หากมีไฟล์ Delta อยู่แล้ว คุณสามารถเรียกใช้การสืบค้นได้โดยตรงโดยใช้ Spark SQL บนไดเร็กทอรีของ delta โดยใช้ไวยากรณ์ต่อไปนี้:

SELECT * FROM delta. `/path/to/delta_directory`

ในกรณีส่วนใหญ่ คุณจะต้องการสร้างตารางโดยใช้ไฟล์เดลต้าและดำเนินการกับตารางโดยใช้ SQL สัญกรณ์คือ : CREATE TABLE USING DELTA LOCATION

spark.sql(""" DROP TABLE IF EXISTS delta_table_name""")
spark.sql(""" CREATE TABLE delta_table_name USING DELTA LOCATION '{}' """.format(/path/to/delta_directory))

สิ่งนี้เรียกว่า "ตารางที่ไม่มีการจัดการ" ใน Spark SQL ตอนนี้ทำหน้าที่เป็นอินเทอร์เฟซระหว่าง Spark และข้อมูลในชั้นจัดเก็บข้อมูล การเปลี่ยนแปลงใดๆ ที่ทำกับตารางนี้จะปรากฏในไฟล์และในทางกลับกัน เมื่อสร้างตารางแล้ว คุณจะสามารถสืบค้นได้เหมือนกับตาราง SQL อื่นๆ

นอกเหนือจากการเขียน dataFrame เป็นรูปแบบเดลต้าแล้ว เรายังสามารถดำเนินการแบบกลุ่มอื่นๆ ได้ เช่น ผนวกและผสานบนตารางเดลต้า ซึ่งเป็นการดำเนินการเล็กๆ น้อยๆ บางอย่างในไปป์ไลน์การประมวลผลข้อมูลขนาดใหญ่

บทสรุป

ในบทความนี้ บัดดี้ได้เรียนรู้

  1. วิธีอ่านและเขียนข้อมูลด้วย Apache Spark
  2. วิธีจัดการรูปแบบไฟล์เฉพาะของ Big Data เช่น Apache Parquet และรูปแบบ Delta

รายละเอียดควบคู่ไปกับเอกสารโกงช่วยให้บัดดี้หลีกเลี่ยงปัญหาทั้งหมดได้

Spark ทำอะไรได้อีกมากมาย และเรารู้ว่า Buddy จะไม่หยุดอยู่แค่นั้น!

หากคุณต้องการให้บริการโมเดล ML โดยใช้ Spark นี่คือ "บทช่วยสอนปลายด้าน Spark ที่น่าสนใจ" ที่ฉันพบว่าค่อนข้างมีข้อมูลเชิงลึก ยกนิ้วให้ถ้าคุณชอบมันเหมือนกัน!

อ้างอิง

  1. Databricks — https://databricks.com/spark/getting-started-with-apache-spark
  2. เอกสาร Spark — https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html
  3. Spark The Definitive Guide — https://www.oreilly.com/library/view/spark-the-definitive/9781491912201/