Уменьшить список слов, подсчитать кортежи до совокупного ключа

Я пытаюсь взять пример количества слов Spark и агрегировать количество слов по какому-либо другому значению (например, слова и количество слов по человеку, где человек "VI" или "MO" в приведенном ниже случае)

У меня есть rdd, который представляет собой список кортежей, значения которых являются списками кортежей:

from operator import add
reduced_tokens = tokenized.reduceByKey(add)
reduced_tokens.take(2)

Что дает мне:

[(u'VI', [(u'word1', 1), (u'word2', 1), (u'word3', 1)]),
 (u'MO',
  [(u'word4', 1),
   (u'word4', 1),
   (u'word5', 1),
   (u'word8', 1),
   (u'word10', 1),
   (u'word1', 1),
   (u'word4', 1),
   (u'word6', 1),
   (u'word9', 1),
   ...
 )]

Я хочу что-то вроде:

[
 ('VI', 
    [(u'word1', 1), (u'word2', 1), (u'word3', 1)],
 ('MO', 
    [(u'word4', 58), (u'word8', 2), (u'word9', 23) ...)
]

Подобно примеру подсчета слов здесь, Я хотел бы иметь возможность отфильтровывать слова с количеством ниже некоторого порога для какого-либо человека. Спасибо!


person scmz    schedule 29.09.2017    source источник


Ответы (2)


Ключи, которые вы пытаетесь сократить, представляют собой пары (name, word), а не просто имена. Итак, вам нужно сделать .map шаг, чтобы исправить ваши данные:

def key_by_name_word(record):
  name, (word, count) = record
  return (name, word), count

tokenized_by_name_word = tokenized.map(key_by_name_word)
counts_by_name_word = tokenized_by_name_word.reduce(add)

Это должно дать вам

[
  (('VI', 'word1'), 1),
  (('VI', 'word2'), 1),
  (('VI', 'word3'), 1),
  (('MO', 'word4'), 58),
  ...
]

Чтобы получить его в том же формате, который вы упомянули, вы можете сделать:

def key_by_name(record):
  # this is the inverse of key_by_name_word
  (name, word), count = record
  return name, (word, count)

output = counts_by_name_word.map(key_by_name).reduceByKey(add)

Но на самом деле может быть проще работать с данными в плоском формате, в котором находится counts_by_name_word.

person Kerrick Staley    schedule 29.09.2017
comment
Мои данные были структурированы немного по-другому, но это помогло мне понять, как это исправить. Мои исходные данные выглядели как [Row(key=u'VI', item=u'word1 word2 word3'), ...], и я создал функцию, которая токенизировала элемент и вернула [((name, token), 1) for token in tokens]. Оттуда я применил функцию к своим данным с помощью flatMap, чтобы получить предложенную вами структуру. - person scmz; 04.10.2017

Для полноты, вот как я решил каждую часть вопроса:

Задание 1. Совокупное количество слов по некоторому ключу

import re

def restructure_data(name_and_freetext):
    name = name_and_freetext[0]
    tokens = re.sub('[&|/|\d{4}|\.|\,|\:|\-|\(|\)|\+|\$|\!]', ' ', name_and_freetext[1]).split()
    return [((name, token), 1) for token in tokens]

filtered_data = data.filter((data.flag==1)).select('name', 'item')
tokenized = filtered_data.rdd.flatMap(restructure_data)

Задание 2. Отфильтруйте слова, количество которых ниже определенного порога:

from operator import add

# keep words which have counts >= 5
counts_by_state_word = tokenized.reduceByKey(add).filter(lambda x: x[1] >= 5)

# map filtered word counts into a list by key so we can sort them
restruct = counts_by_name_word.map(lambda x: (x[0][0], [(x[0][1], x[1])]))

Бонус: сортируйте слова от наиболее часто встречающихся к наименее часто встречающимся

# sort the word counts from most frequent to least frequent words
output = restruct.reduceByKey(add).map(lambda x: (x[0], sorted(x[1], key=lambda y: y[1], reverse=True))).collect()
person scmz    schedule 04.10.2017