Lembar contekan PySpark untuk Insinyur Data pemula

Buddy adalah Data Engineer pemula yang baru-baru ini menemukan Spark, kerangka pemrosesan data besar yang populer.

Mengingat fakta bahwa Spark terintegrasi secara mulus dengan platform data cloud seperti Azure, AWS, dan GCP, Buddy kini menyadari kepastian eksistensinya. Hal ini mendorong Buddy untuk memulai perjalanan Spark-nya, dengan melakukan hal paling sepele dalam siklus hidup pemrosesan data besar - “Membaca dan Menulis Data”

TL;DR

Dibanjiri dengan pekerjaan, Sobat dan pikirannya yang tidak sabar dengan suara bulat memutuskan untuk mengambil jalan pintas dengan contekan berikut menggunakan Python.

TS; WM

Kalau dipikir-pikir, Buddy menganggap penting untuk berdamai dengan ketidaksabarannya. Jalan pintas ini terbukti efektif, namun banyak waktu yang dihabiskan untuk menyelesaikan kesalahan kecil dan menangani perilaku yang tidak jelas.

Sekarang saatnya untuk menangani detailnya.

Membaca dan menulis data di Spark adalah tugas yang sepele, sering kali ini merupakan awal dari segala bentuk pemrosesan Big data. Sobat ingin mengetahui sintaks inti untuk membaca dan menulis data sebelum melanjutkan ke spesifiknya.

Sintaks inti untuk membaca data di Apache Spark

DataFrameReader.format(…).option(“key”, “value”).schema(…).load()

DataFrameReaderadalah dasar untuk membaca data di Spark, dapat diakses melalui atribut spark.read

  • format — menentukan format file seperti CSV, JSON, atau parket. Standarnya adalah parket.
  • option — sekumpulan konfigurasi nilai kunci untuk membuat parameter cara membaca data
  • skema — skema opsional yang digunakan untuk menentukan apakah Anda ingin menyimpulkan skema dari sumber data.

Mode Baca — Seringkali saat membaca data dari sumber eksternal kita menemukan data yang rusak, mode baca menginstruksikan Spark untuk menangani data yang rusak dengan cara tertentu.

Ada 3 mode baca umum dan mode baca default adalah permisif.

  • permisif — Semua bidang disetel ke nol dan catatan yang rusak ditempatkan di kolom string yang disebut _corrupt_record
  • dropMalformed — Menghapus semua baris yang berisi catatan yang rusak.
  • failFast — Gagal ketika ditemukan catatan yang rusak.

Sintaks inti untuk menulis data di Apache Spark

DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy( ...).save()

Landasan penulisan data di Spark adalah DataFrameWriter,yang diakses per-DataFrame menggunakan atribut dataFrame.write

Mode simpan — menentukan apa yang akan terjadi jika Spark menemukan data yang sudah ada di tujuan.

Ada 4 mode penyimpanan tipikal dan mode defaultnya adalah errorIfExists

  • append — menambahkan data keluaran ke file yang sudah ada
  • overwrite — menimpa seluruh data yang ada di tujuan
  • errorIfExists — Spark memunculkan kesalahan jika data sudah ada di tujuan
  • abaikan - jika data ada, jangan lakukan apa pun dengan dataFrame

Itu primer yang bagus! Sobat sepertinya kini paham alasan dibalik kesalahan yang selama ini menyiksanya. Dia ingin memperluas pengetahuan ini dengan mendalami beberapa jenis file yang sering ditemui dan cara menanganinya.

file CSV

Bagaimana cara membaca dari file CSV?

Untuk membaca file CSV, Anda harus membuat DataFrameReader terlebih dahulu dan menetapkan sejumlah opsi.

df=spark.read.format("csv").option("header","true").load(filePath)

Di sini kita memuat file CSV dan memberi tahu Spark bahwa file tersebut berisi baris header. Langkah ini dijamin akan memicu pekerjaan Spark.

Spark job:blok komputasi paralel yang menjalankan beberapa tugas.

Sebuah pekerjaan dipicu setiap kali kita secara fisik diminta untuk menyentuh data. Dalam hal ini, DataFrameReaderharus mengintip baris pertama file untuk mengetahui berapa banyak kolom data yang kita miliki di file tersebut.

Saat membaca data, Anda selalu perlu mempertimbangkan overhead tipe data. Ada dua cara untuk menangani ini di Spark, InferSchema, atau skema yang ditentukan pengguna.

Membaca CSV menggunakan InferSchema

df=spark.read.format("csv").option("inferSchema","true").load(filePath)

Opsi inferSchema memberitahu pembaca untuk menyimpulkan tipe data dari file sumber. Hal ini mengakibatkan penerusan tambahan pada file sehingga memicu dua pekerjaan Spark. Ini adalah operasi yang mahal karena Spark harus secara otomatis menelusuri file CSV dan menyimpulkan skema untuk setiap kolom.

Membaca CSV menggunakan Skema yang ditentukan pengguna

Opsi yang lebih disukai saat membaca file apa pun adalah menerapkan skema khusus, hal ini memastikan bahwa tipe data konsisten dan menghindari perilaku yang tidak terduga.

Untuk melakukan itu, pertama-tama Anda mendeklarasikan skema yang akan diterapkan, lalu membaca data dengan menyetel opsi schema.

csvSchema = StructType([StructField(“id",IntegerType(),False)])
df=spark.read.format("csv").schema(csvSchema).load(filePath)

Sebagai hasil dari penentuan skema untuk data Anda sebelumnya, Anda terhindar dari memicu pekerjaan apa pun. Spark tidak merasa perlu mengintip ke dalam file karena kami sudah mengurus skemanya. Ini dikenal sebagai evaluasi malasyang merupakan teknik pengoptimalan penting di Spark.

Bagaimana Cara Menulis Data CSV?

Menulis data di Spark cukup sederhana, seperti yang kita definisikan dalam sintaks inti, untuk menulis data, kita memerlukan dataFrame dengan data aktual di dalamnya, yang melaluinya kita dapat mengakses DataFrameWriter.

df.write.format("csv").mode("overwrite).save(outputPath/file.csv)

Disini kita menulis isi data frame ke dalam file CSV. Menyetel mode tulis ke menimpa akan sepenuhnya menimpa semua data yang sudah ada di tujuan.

Apa yang Anda harapkan dari perintah sebelumnya adalah satu keluaran file CSV, namun Anda akan melihat bahwa file yang ingin Anda tulis sebenarnya adalah folder dengan banyak file di dalamnya. Hal ini selanjutnya dikonfirmasi dengan mengintip isi outputPath.

%fs ls /outputPath/file.csv

Ini adalah aspek penting dari mesin terdistribusi Spark dan ini mencerminkan jumlah partisidi dataFrame kami pada saat kami menulisnya. Jumlah file yang dihasilkan akan berbeda jika kita telah mempartisi ulang dataFrame sebelum menulisnya.

Mempartisi berarti membagi kumpulan data besar menjadi bagian-bagian yang lebih kecil (partisi). Di Spark, ini adalah unit dasar paralelisme dan memungkinkan Anda mengontrol di mana data disimpan saat Anda menulisnya.

file JSON

Bagaimana cara membaca dari file JSON?

Membaca JSON tidak jauh berbeda dengan membaca file CSV, Anda dapat membaca menggunakan inferSchema atau dengan menentukan skema Anda sendiri.

df=spark.read.format("json").option("inferSchema”,"true").load(filePath)

Di sini kita membaca file JSON dengan meminta Spark untuk menyimpulkan skema, kita hanya memerlukan satu pekerjaan bahkan saat menyimpulkan skema karena tidak ada header di JSON. Nama kolom diekstraksi dari atribut objek JSON.

Untuk menjaga konsistensi kita selalu dapat menentukan skema yang akan diterapkan pada data JSON yang sedang dibaca.

jsonSchema = StructType([...])
df=spark.read.format("json").schema(jsonSchema).load(filePath)

Ingatlah bahwa file JSON dapat disarangkan dan untuk file kecil, membuat skema secara manual mungkin tidak sepadan dengan usaha yang dilakukan, namun untuk file yang lebih besar, ini adalah pilihan yang lebih baik dibandingkan dengan yang sangat panjang dan proses penyimpulan skema yang mahal.

Bagaimana cara menulis ke file JSON?

Seperti yang Anda harapkan, menulis ke file JSON identik dengan file CSV.

df.write.format("json").mode("overwrite).save(outputPath/file.json)

Sekali lagi, seperti menulis ke CSV, kumpulan data dibagi menjadi banyak file yang mencerminkan jumlah partisi dalam dataFrame.

File parket

Apache Parket adalah format penyimpanan kolom, gratis dan sumber terbuka yang menyediakan kompresi data yang efisien dan memainkan peran penting dalam pemrosesan Spark Big Data.

Bagaimana Cara Membaca data dari file Parket?

Tidak seperti file CSV dan JSON, “file” Parket sebenarnya adalah kumpulan file yang sebagian besar berisi data aktual dan beberapa file yang berisi meta-data.

Untuk membaca file parket kita dapat menggunakan variasi sintaksis seperti yang ditunjukkan di bawah ini yang keduanya melakukan tindakan yang sama.

#option1
df=spark.read.format("parquet).load(parquetDirectory)
#option2
df=spark.read.parquet(parquetDirectory)

Seperti yang Anda perhatikan, kita tidak perlu menentukan skema apa pun, nama kolom dan tipe data disimpan di file parket itu sendiri.

Proses inferensi skema tidak semahal untuk CSV dan JSON, karena pembaca Parket hanya perlu memproses file meta-data berukuran kecil untuk secara implisit menyimpulkan skema, bukan keseluruhan file.

Bagaimana cara menulis data ke file Parket?

Menulis Parket semudah membacanya. Cukup tentukan lokasi file yang akan ditulis.

df.write.format(“parquet").mode("overwrite").save("outputPath")

Aturan partisi yang sama yang kami tetapkan untuk CSV dan JSON juga berlaku di sini.

Delta

Sobat belum pernah mendengar hal ini sebelumnya, sepertinya konsep ini cukup baru; layak mendapat sedikit latar belakang.

Delta Lake adalah proyek yang diprakarsai oleh Databricks, yang sekarang bersifat opensource. Delta Lake adalah lapisan penyimpanan sumber terbuka yang membantu Anda membangun data lake yang terdiri dari satu atau beberapa tabel dalam format Delta Lake.

Ini adalah format terbuka berdasarkan Parket yang membawa transaksi ACID ke dalam data lake dan fitur praktis lainnya yang bertujuan untuk meningkatkan keandalan, kualitas, dan kinerja data lake yang ada.

Untuk memahami cara membaca dari format Delta, sebaiknya buat file delta terlebih dahulu.

Bagaimana cara menulis data ke format Delta?

Untuk membuat file delta, Anda harus memiliki dataFrame dengan beberapa data yang akan ditulis. Setelah Anda memilikinya, membuat delta semudah mengubah jenis file saat melakukan penulisan. Daripada parquet cukup ucapkan delta.

someDataFrame.write.format(“delta").partitionBy("someColumn").save(path)

Bagaimana cara membaca data dari format Delta?

Jika file Delta sudah ada, Anda bisa langsung menjalankan kueri menggunakan Spark SQL pada direktori delta menggunakan sintaks berikut:

SELECT * FROM delta. `/path/to/delta_directory`

Dalam kebanyakan kasus, Anda ingin membuat tabel menggunakan file delta dan mengoperasikannya menggunakan SQL. Notasinya adalah : CREATE TABLE USING DELTA LOCATION

spark.sql(""" DROP TABLE IF EXISTS delta_table_name""")
spark.sql(""" CREATE TABLE delta_table_name USING DELTA LOCATION '{}' """.format(/path/to/delta_directory))

Ini disebut "tabel tidak terkelola" di Spark SQL. Sekarang berfungsi sebagai antarmuka antara Spark dan data di lapisan penyimpanan. Setiap perubahan yang dilakukan pada tabel ini akan tercermin dalam file dan sebaliknya. Setelah tabel dibuat, Anda dapat menanyakannya seperti tabel SQL lainnya.

Selain menulis dataFrame sebagai format delta, kita dapat melakukan operasi batch lainnya seperti Tambahkan dan Gabungkan pada tabel delta, beberapa operasi sepele dalam pipeline pemrosesan data besar.

Kesimpulan

Di artikel ini, Sobat belajar

  1. Cara membaca dan menulis data menggunakan Apache Spark.
  2. Cara menangani format file spesifik Big Data seperti Apache Parket dan format Delta.

Detailnya ditambah dengan contekan telah membantu Sobat menghindari semua masalah.

Spark dapat melakukan lebih banyak hal, dan kami tahu Buddy tidak akan berhenti di situ!

Jika Anda ingin menyajikan model ML menggunakan Spark, berikut adalah "tutorial Spark end-end yang menarik" yang menurut saya cukup mendalam. Beri acungan jempol jika Anda juga menyukainya!

Referensi

  1. Databricks — https://databricks.com/spark/getting-started-with-apache-spark
  2. Dokumen Spark — https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html
  3. Spark Panduan Definitif — https://www.oreilly.com/library/view/spark-the-definitive/9781491912201/