Использование пользовательского объекта Python в Pyspark UDF

При запуске следующего фрагмента кода PySpark:

nlp = NLPFunctions()

def parse_ingredients(ingredient_lines):
    parsed_ingredients = nlp.getingredients_bulk(ingredient_lines)[0]
    return list(chain.from_iterable(parsed_ingredients))


udf_parse_ingredients = UserDefinedFunction(parse_ingredients, ArrayType(StringType()))

Я получаю следующую ошибку: _pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.lock objects

Я предполагаю, что это связано с тем, что PySpark не может сериализовать этот пользовательский класс. Но как мне избежать накладных расходов на создание экземпляра этого дорогостоящего объекта при каждом запуске функции parse_ingredients_line?


person Thomas Nys    schedule 11.10.2017    source источник


Ответы (3)


Допустим, вы хотите использовать класс Identity, определенный следующим образом (identity.py):

class Identity(object):                   
    def __getstate__(self):
        raise NotImplementedError("Not serializable")

    def identity(self, x):
        return x

вы можете, например, использовать вызываемый объект (f.py) и сохранить экземпляр Identity в качестве члена класса:

from identity import Identity

class F(object):                          
    identity = None

    def __call__(self, x):
        if not F.identity:
            F.identity = Identity()
        return F.identity.identity(x)

и используйте их, как показано ниже:

from pyspark.sql.functions import udf
import f

sc.addPyFile("identity.py")
sc.addPyFile("f.py")

f_ = udf(f.F())

spark.range(3).select(f_("id")).show()
+-----+
|F(id)|
+-----+
|    0|
|    1|
|    2|
+-----+

или отдельная функция и закрытие:

from pyspark.sql.functions import udf
import identity

sc.addPyFile("identity.py")

def f(): 
    dict_ = {}                 
    @udf()              
    def f_(x):                 
        if "identity" not in dict_:
            dict_["identity"] = identity.Identity()
        return dict_["identity"].identity(x)
    return f_


spark.range(3).select(f()("id")).show()
+------+
|f_(id)|
+------+
|     0|
|     1|
|     2|
+------+
person zero323    schedule 11.10.2017
comment
Я не совсем понимаю пример. Где вы показываете, что можете сохранять состояние между выполнениями udf? - person Vitaliy; 01.04.2018
comment
@Vitaly Это стандартный код Python - в обоих случаях мы сохраняем интересующий объект во внешней области, поэтому его время жизни не ограничивается самой областью. Вы можете использовать nonlocal вместо изменяемого dict, если хотите. Очевидно, что он не может пережить родительский интерпретатор, над которым у вас нет контроля. В противном случае вы можете легко добавить ведение журнала и использовать отладчик, чтобы увидеть, что инициализация применяется только при первом вызове. - person zero323; 01.04.2018
comment
Это работает очень хорошо! Супер быстро - вот почему мы используем искру :) - person Michael Klear; 23.08.2019
comment
user6910411 - Вы уверены, что ваш код не создает 3 экземпляра класса Identity? Я проверил вашу автономную функцию и пример кода закрытия, и вот что со мной произошло. - person Pawel Batko; 18.11.2019
comment
@PawełBatko Этот код создаст столько экземпляров Identity, сколько интерпретаторов-исполнителей порождает Spark (помните, что здесь нет общей памяти, и каждый поток-исполнитель на самом деле является процессом в PySpark). Таким образом, фактическое число будет зависеть от количества исполнителей, которые используются повторно, причем верхняя граница представляет собой общее количество задач (включая те, которые были перезапущены). Существуют более сложные стратегии, но они выходят за рамки данного конкретного ответа. - person 10465355; 19.11.2019

Я решил это на основе (https://github.com/scikit-learn/scikit-learn/issues/6975), сделав сериализуемыми все зависимости класса NLPFunctions.

person Thomas Nys    schedule 12.10.2017

Изменить: этот ответ неверен. Объект по-прежнему сериализуется, а затем десериализуется при широковещании, поэтому сериализации не избежать. (Советы по правильному использованию больших широковещательных переменных?)


Попробуйте использовать широковещательную переменную.

sc = SparkContext()
nlp_broadcast = sc.broadcast(nlp) # Stores nlp in de-serialized format.

def parse_ingredients(ingredient_lines):
    parsed_ingredients = nlp_broadcast.value.getingredients_bulk(ingredient_lines)[0]
    return list(chain.from_iterable(parsed_ingredients))
person A.M.    schedule 11.10.2017