Penggunaan objek Python khusus di Pyspark UDF

Saat menjalankan potongan kode PySpark berikut:

nlp = NLPFunctions()

def parse_ingredients(ingredient_lines):
    parsed_ingredients = nlp.getingredients_bulk(ingredient_lines)[0]
    return list(chain.from_iterable(parsed_ingredients))


udf_parse_ingredients = UserDefinedFunction(parse_ingredients, ArrayType(StringType()))

Saya mendapatkan kesalahan berikut: _pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.lock objects

Saya membayangkan ini karena PySpark tidak dapat membuat serial kelas khusus ini. Tapi bagaimana saya bisa menghindari overhead saat membuat instance objek mahal ini pada setiap menjalankan fungsi parse_ingredients_line?


person Thomas Nys    schedule 11.10.2017    source sumber


Jawaban (3)


Katakanlah Anda ingin menggunakan kelas Identity yang didefinisikan seperti ini (identity.py):

class Identity(object):                   
    def __getstate__(self):
        raise NotImplementedError("Not serializable")

    def identity(self, x):
        return x

misalnya Anda dapat menggunakan objek yang dapat dipanggil (f.py) dan menyimpan instance Identity sebagai anggota kelas:

from identity import Identity

class F(object):                          
    identity = None

    def __call__(self, x):
        if not F.identity:
            F.identity = Identity()
        return F.identity.identity(x)

dan gunakan ini seperti yang ditunjukkan di bawah ini:

from pyspark.sql.functions import udf
import f

sc.addPyFile("identity.py")
sc.addPyFile("f.py")

f_ = udf(f.F())

spark.range(3).select(f_("id")).show()
+-----+
|F(id)|
+-----+
|    0|
|    1|
|    2|
+-----+

atau fungsi dan penutupan mandiri:

from pyspark.sql.functions import udf
import identity

sc.addPyFile("identity.py")

def f(): 
    dict_ = {}                 
    @udf()              
    def f_(x):                 
        if "identity" not in dict_:
            dict_["identity"] = identity.Identity()
        return dict_["identity"].identity(x)
    return f_


spark.range(3).select(f()("id")).show()
+------+
|f_(id)|
+------+
|     0|
|     1|
|     2|
+------+
person zero323    schedule 11.10.2017
comment
Saya kurang paham dengan contohnya. Di mana Anda menunjukkan bahwa Anda dapat mempertahankan status di antara eksekusi udf? - person Vitaliy; 01.04.2018
comment
@Vitaliy Ini adalah kode Python standar - dalam kedua kasus kami menyimpan objek yang diinginkan di cakupan luar sehingga seumur hidup tidak terbatas pada cakupan itu sendiri. Anda dapat menggunakan nonlocal sebagai pengganti dict yang dapat diubah jika Anda mau. Tentu saja ia tidak bisa hidup lebih lama dari penerjemah orang tua, yang tidak dapat Anda kendalikan. Jika tidak, Anda dapat dengan mudah menambahkan logging dan menggunakan debugger untuk melihat bahwa inisialisasi hanya diterapkan pada panggilan pertama. - person zero323; 01.04.2018
comment
Ini bekerja dengan sangat baik!! Super cepat - inilah alasan kami menggunakan percikan :) - person Michael Klear; 23.08.2019
comment
user6910411 - Apakah Anda yakin kode Anda tidak membuat 3 instance kelas Identity? Saya memeriksa fungsi mandiri dan kode contoh penutupan Anda dan itulah yang terjadi pada saya. - person Pawel Batko; 18.11.2019
comment
@ PawełBatko Kode ini akan membuat instance Identity sebanyak banyaknya interpreter eksekutor yang dihasilkan oleh Spark (ingat tidak ada memori bersama di sini, dan setiap thread eksekutor sebenarnya adalah proses di PySpark). Jadi jumlah sebenarnya akan bergantung pada jumlah pelaksana yang digunakan kembali - dengan batas atas adalah jumlah total tugas (termasuk tugas yang dimulai ulang). Terdapat strategi yang lebih canggih, namun strategi ini berada di luar jangkauan jawaban spesifik ini. - person 10465355; 19.11.2019

Saya menyelesaikannya berdasarkan (https://github.com/scikit-learn/scikit-learn/issues/6975) dengan membuat semua dependensi kelas NLPFunctions dapat diserialkan.

person Thomas Nys    schedule 12.10.2017

Sunting: jawaban ini salah. Objek tersebut masih diserialkan dan kemudian di-de-serialisasi ketika disiarkan, sehingga serialisasi tidak dapat dihindari. (Tips untuk menggunakan variabel siaran besar dengan benar?)


Coba gunakan variabel siaran.

sc = SparkContext()
nlp_broadcast = sc.broadcast(nlp) # Stores nlp in de-serialized format.

def parse_ingredients(ingredient_lines):
    parsed_ingredients = nlp_broadcast.value.getingredients_bulk(ingredient_lines)[0]
    return list(chain.from_iterable(parsed_ingredients))
person A.M.    schedule 11.10.2017