แผ่นโกง PySpark สำหรับวิศวกรข้อมูลมือใหม่
Buddy เป็นวิศวกรข้อมูลมือใหม่ที่เพิ่งพบกับ Spark ซึ่งเป็นเฟรมเวิร์กการประมวลผลข้อมูลขนาดใหญ่ยอดนิยม
เมื่อพิจารณาถึงข้อเท็จจริงที่ว่า Spark ได้รับการผสานรวมเข้ากับแพลตฟอร์มข้อมูลบนคลาวด์อย่าง Azure, AWS และ GCP Buddy ได้อย่างราบรื่น ตอนนี้ได้ตระหนักถึงความแน่นอนที่มีอยู่แล้ว สิ่งนี้ได้ผลักดัน Buddy ให้เริ่มต้นการเดินทาง Spark ของเขาอย่างรวดเร็ว โดยจัดการกับแบบฝึกหัดที่ไม่สำคัญที่สุดในวงจรชีวิตการประมวลผลข้อมูลขนาดใหญ่ - "การอ่านและการเขียนข้อมูล"
TL;DR
บัดดี้เต็มไปด้วยงานและจิตใจที่ใจร้อนของเขาตัดสินใจอย่างเป็นเอกฉันท์ว่าจะใช้ทางลัดด้วยเอกสารโกงต่อไปนี้โดยใช้ Python
TS; WM
เมื่อมองย้อนกลับไป บัดดี้เห็นว่าจำเป็นต้องทำใจกับจิตใจที่ใจร้อนของเขา ทางลัดได้รับการพิสูจน์แล้วว่ามีประสิทธิภาพ แต่ต้องใช้เวลาจำนวนมากในการแก้ไขข้อผิดพลาดเล็กๆ น้อยๆ และจัดการกับพฤติกรรมที่ไม่ชัดเจน
ถึงเวลาจัดการรายละเอียดแล้ว
การอ่านและเขียนข้อมูลใน Spark ถือเป็นงานเล็กๆ น้อยๆ โดยส่วนใหญ่แล้วจะเป็นขั้นตอนเริ่มต้นของการประมวลผล Big Data ทุกรูปแบบ Buddy ต้องการทราบไวยากรณ์หลักสำหรับการอ่านและเขียนข้อมูลก่อนที่จะพูดถึงข้อมูลเฉพาะ
ไวยากรณ์หลักสำหรับการอ่านข้อมูลใน Apache Spark
DataFrameReader.format(…).option(“key”, “value”).schema(…).load()
DataFrameReaderเป็นรากฐานสำหรับการอ่านข้อมูลใน Spark ซึ่งสามารถเข้าถึงได้ผ่านแอตทริบิวต์ spark.read
- รูปแบบ — ระบุรูปแบบไฟล์ในรูปแบบ CSV, JSON หรือปาร์เก้ ค่าเริ่มต้นคือไม้ปาร์เก้
- ตัวเลือก - ชุดของการกำหนดค่าคีย์-ค่าเพื่อกำหนดพารามิเตอร์วิธีการอ่านข้อมูล
- สคีมา — ตัวเลือกหนึ่งที่ใช้ระบุว่าคุณต้องการอนุมานสคีมาจากแหล่งข้อมูลหรือไม่
โหมดการอ่าน — บ่อยครั้งในขณะที่อ่านข้อมูลจากแหล่งภายนอก เราพบข้อมูลที่เสียหาย โหมดการอ่านจะสั่งให้ Spark จัดการกับข้อมูลที่เสียหายในลักษณะเฉพาะ
โหมดการอ่านทั่วไปมี 3 โหมด และโหมดการอ่านเริ่มต้นคือ อนุญาต
- อนุญาต — ฟิลด์ทั้งหมดถูกตั้งค่าเป็นโมฆะและบันทึกที่เสียหายจะถูกวางไว้ในคอลัมน์สตริงที่เรียกว่า
_corrupt_record
- dropMalformed — ปล่อยแถวทั้งหมดที่มีบันทึกที่เสียหาย
- failedFast - ล้มเหลวเมื่อพบบันทึกที่เสียหาย
ไวยากรณ์หลักสำหรับการเขียนข้อมูลใน Apache Spark
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy( ...).save()
รากฐานสำหรับการเขียนข้อมูลใน Spark คือ DataFrameWriter ซึ่งเข้าถึงได้ต่อ DataFrame โดยใช้แอตทริบิวต์ dataFrame.write
โหมดบันทึก — ระบุสิ่งที่จะเกิดขึ้นหาก Spark พบข้อมูลที่ปลายทางแล้ว
มีโหมดบันทึกทั่วไป 4 โหมดและโหมดเริ่มต้นคือ errorIfExists
- ผนวก - ผนวกข้อมูลที่ส่งออกไปยังไฟล์ที่มีอยู่แล้ว
- เขียนทับ — เขียนทับข้อมูลใดๆ ที่ปลายทางโดยสมบูรณ์
- errorIfExists — Spark ส่งข้อผิดพลาดหากมีข้อมูลอยู่ที่ปลายทางแล้ว
- ละเว้น — หากมีข้อมูลอยู่ ไม่ต้องทำอะไรกับ dataFrame
นั่นเป็นไพรเมอร์ที่ยอดเยี่ยม! ตอนนี้บัดดี้ดูเหมือนจะเข้าใจเหตุผลเบื้องหลังข้อผิดพลาดที่ทรมานเขาแล้ว เขาต้องการขยายความรู้นี้โดยเจาะลึกประเภทไฟล์ที่พบบ่อยบางประเภทและวิธีจัดการกับไฟล์เหล่านั้น
ไฟล์ CSV
จะอ่านจากไฟล์ CSV ได้อย่างไร?
หากต้องการอ่านไฟล์ CSV คุณต้องสร้าง DataFrameReader ก่อน และตั้งค่าตัวเลือกจำนวนหนึ่ง
df=spark.read.format("csv").option("header","true").load(filePath)
ที่นี่เราโหลดไฟล์ CSV และบอก Spark ว่าไฟล์นั้นมีแถวส่วนหัว ขั้นตอนนี้รับประกันว่าจะทริกเกอร์งาน Spark
งานจุดประกาย:บล็อกของการคำนวณแบบขนานที่ดำเนินงานบางอย่าง
งานจะถูกทริกเกอร์ทุกครั้งที่เราจำเป็นต้องสัมผัสข้อมูลทางกายภาพ ในกรณีนี้ DataFrameReaderจะต้องดูที่บรรทัดแรกของไฟล์เพื่อดูว่าเรามีข้อมูลกี่คอลัมน์ในไฟล์
เมื่ออ่านข้อมูล คุณจะต้องคำนึงถึงโอเวอร์เฮดของประเภทข้อมูลเสมอ มีสองวิธีในการจัดการสิ่งนี้ใน Spark, InferSchema หรือสคีมาที่ผู้ใช้กำหนด
การอ่าน CSV โดยใช้ InferSchema
df=spark.read.format("csv").option("inferSchema","true").load(filePath)
ตัวเลือก inferSchema
บอกให้ผู้อ่านอนุมานประเภทข้อมูลจากไฟล์ต้นฉบับ ซึ่งส่งผลให้มีการส่งผ่านไฟล์เพิ่มเติมส่งผลให้งาน Spark สองงานถูกทริกเกอร์ เป็นการดำเนินการที่มีราคาแพงเนื่องจาก Spark ต้องผ่านไฟล์ CSV โดยอัตโนมัติและอนุมานสคีมาสำหรับแต่ละคอลัมน์
การอ่าน CSV โดยใช้ Schema ที่ผู้ใช้กำหนด
ตัวเลือกที่ต้องการในขณะที่อ่านไฟล์ใดๆ ก็คือบังคับใช้สคีมาที่กำหนดเอง ซึ่งช่วยให้มั่นใจว่าประเภทข้อมูลจะสอดคล้องกันและหลีกเลี่ยงพฤติกรรมที่ไม่คาดคิด
ในการทำเช่นนั้น คุณต้องประกาศสคีมาที่จะบังคับใช้ก่อน จากนั้นจึงอ่านข้อมูลโดยตั้งค่าตัวเลือก schema
csvSchema = StructType([StructField(“id",IntegerType(),False)])
df=spark.read.format("csv").schema(csvSchema).load(filePath)
ผลจากการกำหนดสคีมาล่วงหน้าสำหรับข้อมูลของคุณ คุณจึงหลีกเลี่ยงการทริกเกอร์งานใดๆ ได้ Spark ไม่เห็นความจำเป็นที่จะต้องดูไฟล์เนื่องจากเราดูแลสคีมาแล้ว สิ่งนี้เรียกว่า การประเมินแบบ Lazy ซึ่งเป็นเทคนิคการปรับให้เหมาะสมที่สำคัญใน Spark
จะเขียนข้อมูล CSV ได้อย่างไร?
การเขียนข้อมูลใน Spark นั้นค่อนข้างง่าย เนื่องจากเรากำหนดไว้ในไวยากรณ์หลักเพื่อเขียนข้อมูล เราจำเป็นต้องมี dataFrame ที่มีข้อมูลจริงอยู่ในนั้น ซึ่งเราสามารถเข้าถึง DataFrameWriter ได้
df.write.format("csv").mode("overwrite).save(outputPath/file.csv)
ที่นี่เราเขียนเนื้อหาของกรอบข้อมูลลงในไฟล์ CSV การตั้งค่าโหมดเขียนเป็นเขียนทับจะเขียนทับข้อมูลใดๆ ที่มีอยู่แล้วในปลายทางโดยสมบูรณ์
สิ่งที่คุณคาดหวังจากคำสั่งก่อนหน้านี้คือไฟล์ CSV ไฟล์เดียว อย่างไรก็ตาม คุณจะเห็นว่าไฟล์ที่คุณตั้งใจจะเขียนนั้นเป็นโฟลเดอร์ที่มีไฟล์จำนวนมากอยู่ภายใน สิ่งนี้ได้รับการยืนยันเพิ่มเติมโดยการดูเนื้อหาของ outputPath
%fs ls /outputPath/file.csv
นี่เป็นส่วนสำคัญของกลไกแบบกระจาย Spark และสะท้อนถึงจำนวน พาร์ติชัน ใน dataFrame ของเรา ณ เวลาที่เราเขียนออกมา จำนวนไฟล์ที่สร้างขึ้นจะแตกต่างกันหากเราแบ่งพาร์ติชัน dataFrame ใหม่ก่อนที่จะเขียนออกมา
การแบ่งพาร์ติชันหมายถึงการแบ่งชุดข้อมูลขนาดใหญ่ออกเป็นส่วนเล็กๆ (พาร์ติชัน) ใน Spark เป็นหน่วยพื้นฐานของความขนานและช่วยให้คุณสามารถควบคุมตำแหน่งที่จะจัดเก็บข้อมูลในขณะที่คุณเขียนข้อมูลได้
ไฟล์ JSON
จะอ่านจากไฟล์ JSON ได้อย่างไร
การอ่าน JSON ไม่ได้แตกต่างจากการอ่านไฟล์ CSV มากนัก คุณสามารถอ่านโดยใช้ inferSchema
หรือโดยการกำหนดสคีมาของคุณเอง
df=spark.read.format("json").option("inferSchema”,"true").load(filePath)
ที่นี่เราอ่านไฟล์ JSON โดยขอให้ Spark สรุปสคีมา เราต้องการงานเดียวเท่านั้นแม้จะอนุมานสคีมาเพราะไม่มีส่วนหัวใน JSON ชื่อคอลัมน์จะดึงมาจากแอตทริบิวต์ของออบเจ็กต์ JSON
เพื่อรักษาความสอดคล้องกัน เราสามารถกำหนดสคีมาที่จะนำไปใช้กับข้อมูล JSON ที่กำลังอ่านได้เสมอ
jsonSchema = StructType([...])
df=spark.read.format("json").schema(jsonSchema).load(filePath)
โปรดจำไว้ว่าไฟล์ JSON สามารถซ้อนกันได้ และสำหรับไฟล์ขนาดเล็กที่สร้างสคีมาด้วยตนเองอาจไม่คุ้มกับความพยายาม แต่สำหรับไฟล์ขนาดใหญ่ นี่เป็นตัวเลือกที่ดีกว่า เมื่อเทียบกับไฟล์ที่ ยาว และ กระบวนการ สรุปสคีมา >ราคาแพง
จะเขียนลงไฟล์ JSON ได้อย่างไร?
ตามที่คุณคาดหวังการเขียนลงในไฟล์ JSON จะเหมือนกับไฟล์ CSV
df.write.format("json").mode("overwrite).save(outputPath/file.json)
เช่นเดียวกับการเขียนลงใน CSV ชุดข้อมูลจะถูกแบ่งออกเป็นหลายไฟล์ตามจำนวนพาร์ติชันใน dataFrame
ไฟล์ปาร์เก้
Apache Parquet เป็นรูปแบบการจัดเก็บข้อมูลแบบเรียงเป็นแนว ฟรีและเป็นโอเพ่นซอร์สซึ่งให้การบีบอัดข้อมูลที่มีประสิทธิภาพ และมีบทบาทสำคัญในการประมวลผล Spark Big Data
จะอ่านข้อมูลจากไฟล์ Parquet ได้อย่างไร
ต่างจากไฟล์ CSV และ JSON ตรงที่ “ไฟล์” ของ Parquet คือชุดของไฟล์ซึ่งส่วนใหญ่ประกอบด้วยข้อมูลจริงและไฟล์บางไฟล์ที่ประกอบด้วยข้อมูลเมตา
หากต้องการอ่านไฟล์ไม้ปาร์เก้ เราสามารถใช้ไวยากรณ์รูปแบบต่างๆ ดังที่แสดงด้านล่าง ซึ่งทั้งสองรูปแบบมีการดำเนินการเหมือนกัน
#option1df=spark.read.format("parquet).load(parquetDirectory)
#option2df=spark.read.parquet(parquetDirectory)
ดังที่คุณสังเกตเห็นว่าเราไม่จำเป็นต้องระบุสคีมาประเภทใด ชื่อคอลัมน์และประเภทข้อมูลจะถูกจัดเก็บไว้ในไฟล์ปาร์เก้เอง
กระบวนการอนุมานสคีมาไม่แพงเท่ากับ CSV และ JSON เนื่องจากโปรแกรมอ่าน Parquet จำเป็นต้องประมวลผลเฉพาะไฟล์ข้อมูลเมตาขนาดเล็กเท่านั้น เพื่ออนุมานสคีมาโดยปริยาย แทนที่จะอนุมานทั้งไฟล์
จะเขียนข้อมูลลงไฟล์ Parquet ได้อย่างไร?
การเขียนไม้ปาร์เก้เป็นเรื่องง่ายเหมือนกับการอ่าน เพียงระบุตำแหน่งสำหรับไฟล์ที่จะเขียน
df.write.format(“parquet").mode("overwrite").save("outputPath")
ใช้กฎการแบ่งพาร์ติชันเดียวกันกับที่เรากำหนดไว้สำหรับ CSV และ JSON ที่นี่
เดลต้า
บัดดี้ไม่เคยได้ยินเรื่องนี้มาก่อน ดูเหมือนเป็นแนวคิดที่ค่อนข้างใหม่ สมควรได้รับพื้นหลังเล็กน้อย
Delta Lake เป็นโครงการที่ริเริ่มโดย Databricks ซึ่งปัจจุบันเป็นโอเพ่นซอร์ส Delta Lake เป็นเลเยอร์การจัดเก็บข้อมูลแบบโอเพ่นซอร์สที่ช่วยคุณสร้าง Data Lake ที่ประกอบด้วยตารางอย่างน้อย 1 ตารางในรูปแบบ Delta Lake
ซึ่งเป็นรูปแบบเปิดที่ใช้ Parquet ซึ่งนำธุรกรรม ACID มาสู่ Data Lake และคุณสมบัติที่มีประโยชน์อื่นๆ ที่มุ่งปรับปรุงความน่าเชื่อถือ คุณภาพ และประสิทธิภาพของ Data Lake ที่มีอยู่
เพื่อให้เข้าใจวิธีการอ่านจากรูปแบบเดลต้า การสร้างไฟล์เดลต้าก่อนจึงจะเหมาะสม
จะเขียนข้อมูลในรูปแบบเดลต้าได้อย่างไร?
ในการสร้างไฟล์เดลต้า คุณต้องมี dataFrame พร้อมข้อมูลบางส่วนที่จะเขียน เมื่อคุณมีสิ่งนี้แล้ว การสร้างเดลต้าก็ทำได้ง่ายเหมือนกับการเปลี่ยนประเภทไฟล์ขณะเขียน แทนที่จะเป็น parquet
เพียงพูด delta
someDataFrame.write.format(“delta").partitionBy("someColumn").save(path)
จะอ่านข้อมูลจากรูปแบบเดลต้าได้อย่างไร
หากมีไฟล์ Delta อยู่แล้ว คุณสามารถเรียกใช้การสืบค้นได้โดยตรงโดยใช้ Spark SQL บนไดเร็กทอรีของ delta โดยใช้ไวยากรณ์ต่อไปนี้:
SELECT * FROM delta. `/path/to/delta_directory`
ในกรณีส่วนใหญ่ คุณจะต้องการสร้างตารางโดยใช้ไฟล์เดลต้าและดำเนินการกับตารางโดยใช้ SQL สัญกรณ์คือ : CREATE TABLE USING DELTA LOCATION
spark.sql(""" DROP TABLE IF EXISTS delta_table_name""")
spark.sql(""" CREATE TABLE delta_table_name USING DELTA LOCATION '{}' """.format(/path/to/delta_directory))
สิ่งนี้เรียกว่า "ตารางที่ไม่มีการจัดการ" ใน Spark SQL ตอนนี้ทำหน้าที่เป็นอินเทอร์เฟซระหว่าง Spark และข้อมูลในชั้นจัดเก็บข้อมูล การเปลี่ยนแปลงใดๆ ที่ทำกับตารางนี้จะปรากฏในไฟล์และในทางกลับกัน เมื่อสร้างตารางแล้ว คุณจะสามารถสืบค้นได้เหมือนกับตาราง SQL อื่นๆ
นอกเหนือจากการเขียน dataFrame เป็นรูปแบบเดลต้าแล้ว เรายังสามารถดำเนินการแบบกลุ่มอื่นๆ ได้ เช่น ผนวกและผสานบนตารางเดลต้า ซึ่งเป็นการดำเนินการเล็กๆ น้อยๆ บางอย่างในไปป์ไลน์การประมวลผลข้อมูลขนาดใหญ่
บทสรุป
ในบทความนี้ บัดดี้ได้เรียนรู้
- วิธีอ่านและเขียนข้อมูลด้วย Apache Spark
- วิธีจัดการรูปแบบไฟล์เฉพาะของ Big Data เช่น Apache Parquet และรูปแบบ Delta
รายละเอียดควบคู่ไปกับเอกสารโกงช่วยให้บัดดี้หลีกเลี่ยงปัญหาทั้งหมดได้
Spark ทำอะไรได้อีกมากมาย และเรารู้ว่า Buddy จะไม่หยุดอยู่แค่นั้น!
หากคุณต้องการให้บริการโมเดล ML โดยใช้ Spark นี่คือ "บทช่วยสอนปลายด้าน Spark ที่น่าสนใจ" ที่ฉันพบว่าค่อนข้างมีข้อมูลเชิงลึก ยกนิ้วให้ถ้าคุณชอบมันเหมือนกัน!
อ้างอิง
- Databricks — https://databricks.com/spark/getting-started-with-apache-spark
- เอกสาร Spark — https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html
- Spark The Definitive Guide — https://www.oreilly.com/library/view/spark-the-definitive/9781491912201/