หลายครั้งในฐานะ Data Scientist เราต้องรับมือกับข้อมูลจำนวนมหาศาล ในกรณีดังกล่าว หลายวิธีใช้ไม่ได้ผลหรือเป็นไปไม่ได้ ข้อมูลจำนวนมหาศาลนั้นดี ดีมาก และเราต้องการใช้ประโยชน์ให้มากที่สุด

ในที่นี้ ฉันอยากจะแนะนำเทคนิค MapReduce ซึ่งเป็นเทคนิคกว้างๆ ที่ใช้ในการจัดการข้อมูลจำนวนมหาศาล มีการใช้งาน MapReduce มากมาย รวมถึง Apache Hadoop ที่มีชื่อเสียง ในที่นี้ ฉันจะไม่พูดถึงการใช้งาน ฉันจะพยายามแนะนำแนวคิดนี้ด้วยวิธีที่เข้าใจง่ายที่สุด และนำเสนอตัวอย่างสำหรับทั้งของเล่นและตัวอย่างในชีวิตจริง

เริ่มต้นด้วยงานที่ตรงไปตรงมา คุณได้รับรายการสตริง และคุณต้องส่งคืนสตริงที่ยาวที่สุด มันค่อนข้างง่ายที่จะทำใน python:

def find_longest_string(list_of_strings):
    longest_string = None
    longest_string_len = 0 
    for s in list_of_strings:
        if len(s) > longest_string_len:
            longest_string_len = len(s)
            longest_string = s
    return longest_string

เราตรวจดูสตริงทีละรายการ คำนวณความยาวและเก็บสตริงที่ยาวที่สุดไว้จนกว่าเราจะเสร็จ

สำหรับรายการขนาดเล็ก มันทำงานได้ค่อนข้างเร็ว:

list_of_strings = ['abc', 'python', 'dima']
%time max_length = print(find_longest_string(list_of_strings))
OUTPUT:
python
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 75.8 µs

แม้แต่รายการที่มีองค์ประกอบมากกว่า 3 รายการก็ยังทำงานได้ดี แต่เราลองใช้องค์ประกอบ 3000 รายการ:

large_list_of_strings = list_of_strings*1000
%time print(find_longest_string(large_list_of_strings))
OUTPUT:
python
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 307 µs

แต่ถ้าเราพยายามหาองค์ประกอบ 300 ล้านองค์ประกอบล่ะ?

large_list_of_strings = list_of_strings*100000000
%time max_length = max(large_list_of_strings, key=len)
OUTPUT:
python
CPU times: user 21.8 s, sys: 0 ns, total: 21.8 s
Wall time: 21.8 s

นี่เป็นปัญหา ในแอปพลิเคชันส่วนใหญ่ เวลาตอบสนอง 20 วินาทีไม่เป็นที่ยอมรับ วิธีหนึ่งในการปรับปรุงเวลาในการคำนวณคือการซื้อ CPU ที่ดีขึ้นและเร็วขึ้นมาก การปรับขนาดระบบของคุณด้วยการแนะนำฮาร์ดแวร์ที่ดีขึ้นและเร็วขึ้นเรียกว่า "Vertical Scaling" แน่นอนว่าสิ่งนี้จะไม่ได้ผลตลอดไป ไม่เพียงแต่การค้นหา CPU ที่ทำงานเร็วขึ้น 10 เท่าไม่ใช่เรื่องง่ายเท่านั้น แต่ข้อมูลของเราอาจจะใหญ่ขึ้นด้วย และเราไม่ต้องการอัปเกรด CPU ของเราทุกครั้งที่โค้ดช้าลง โซลูชันของเราไม่สามารถปรับขนาดได้ แต่เราสามารถทำ "การปรับขนาดแนวนอน" แทนได้ เราจะออกแบบโค้ดของเราเพื่อให้สามารถทำงานแบบขนานได้ และจะเร็วขึ้นมากเมื่อเราเพิ่มโปรเซสเซอร์และ/หรือ CPU มากขึ้น

ในการทำเช่นนั้น เราต้องแบ่งโค้ดของเราออกเป็นส่วนประกอบเล็กๆ และดูว่าเราสามารถดำเนินการคำนวณแบบขนานได้อย่างไร สัญชาตญาณมีดังนี้: 1) แบ่งข้อมูลของเราออกเป็นหลาย ๆ ส่วน 2) เรียกใช้ฟังก์ชัน find_longest_string สำหรับทุก ๆ ส่วนขนาน และ 3) ค้นหาสตริงที่ยาวที่สุดในบรรดาเอาต์พุตของชิ้นส่วนทั้งหมด

โค้ดของเรามีความเฉพาะเจาะจงมากและยากต่อการทำลายและแก้ไข ดังนั้นแทนที่จะใช้ฟังก์ชัน find_longest_string เราจะพัฒนาเฟรมเวิร์กทั่วไปมากขึ้น ซึ่งจะช่วยให้เราทำการคำนวณต่างๆ พร้อมกันกับข้อมูลขนาดใหญ่

สิ่งสำคัญสองประการที่เราทำในโค้ดของเราคือการคำนวณ len ของสตริงและเปรียบเทียบกับสตริงที่ยาวที่สุดจนถึงปัจจุบัน เราจะแบ่งโค้ดของเราออกเป็นสองขั้นตอน: 1) คำนวณ len ของสตริงทั้งหมด และ 2) เลือกค่า max

%%time
# step 1:
list_of_string_lens = [len(s) for s in list_of_strings]
list_of_string_lens = zip(list_of_strings, list_of_string_lens)
#step 2:
max_len = max(list_of_string_lens, key=lambda t: t[1])
print(max_len)
OUTPUT:
('python', 6)
CPU times: user 51.6 s, sys: 804 ms, total: 52.4 s
Wall time: 52.4 s

(ฉันกำลังคำนวณความยาวของสตริงแล้ว zip เข้าด้วยกันเพราะมันเร็วกว่าการทำในบรรทัดเดียวและทำซ้ำรายการสตริงมาก)

ในสถานะนี้ โค้ดจะทำงานช้าลงกว่าเดิมเพราะแทนที่จะส่งผ่านสตริงทั้งหมดของเราเพียงครั้งเดียว เราทำ 2 ครั้ง ขั้นแรกเพื่อคำนวณ len จากนั้นจึงค้นหาค่า max ทำไมมันถึงดีสำหรับเรา? เพราะตอนนี้ "ขั้นตอนที่ 2" ของเราได้รับเป็นอินพุต ไม่ใช่รายการสตริงดั้งเดิม แต่เป็นข้อมูลที่ประมวลผลล่วงหน้าบางส่วน สิ่งนี้ช่วยให้เราสามารถดำเนินการขั้นตอนที่สองโดยใช้เอาต์พุตของ "ขั้นตอนที่สอง" อื่น! เราจะเข้าใจสิ่งนั้นให้ดีขึ้นในอีกสักครู่ แต่ก่อนอื่น เรามาตั้งชื่อขั้นตอนเหล่านั้นกันก่อน เราจะเรียก "ขั้นตอนที่หนึ่ง" ว่า "ตัวทำแผนที่" เพราะมันจะจับคู่ค่าบางอย่างกับค่าอื่น และเราจะเรียก "ขั้นตอนที่สอง" ว่าเป็นตัวลดเพราะมันรับรายการค่าและสร้างค่าเดียว (ในกรณีส่วนใหญ่) . ต่อไปนี้เป็นฟังก์ชันตัวช่วยสองฟังก์ชันสำหรับ mapper และตัวลด:

mapper = len
def reducer(p, c):
    if p[1] > c[1]:
        return p
    return c

ผู้ทำแผนที่เป็นเพียงฟังก์ชัน len ได้รับสตริงและส่งกลับความยาว ตัวลดจะได้รับสองสิ่งอันดับเป็นอินพุตและส่งกลับอันที่มีความยาวมากที่สุด

มาเขียนโค้ดของเราใหม่โดยใช้ map และ reduce มีฟังก์ชันในตัวสำหรับสิ่งนี้ใน python (ใน python 3 เราต้องนำเข้าจาก functools )

%%time
#step 1
mapped = map(mapper, list_of_strings)
mapped = zip(list_of_strings, mapped)
#step 2:
reduced = reduce(reducer, mapped)
print(reduced)
OUTPUT:
('python', 6)
CPU times: user 57.9 s, sys: 0 ns, total: 57.9 s
Wall time: 57.9 s

โค้ดทำสิ่งเดียวกันทุกประการ มันดูสวยงามกว่าเล็กน้อย แต่ยังเป็นแบบทั่วไปมากกว่าและจะช่วยเราทำให้มันขนานกัน ลองดูให้ละเอียดยิ่งขึ้น:

ขั้นตอนที่ 1 แมปรายการสตริงของเราลงในรายการสิ่งอันดับโดยใช้ฟังก์ชันตัวทำแผนที่ (ในที่นี้ฉันใช้ zip อีกครั้งเพื่อหลีกเลี่ยงการทำซ้ำสตริง)

ขั้นตอนที่ 2 ใช้ฟังก์ชันตัวลด ข้ามสิ่งอันดับจากขั้นตอนที่หนึ่งแล้วนำไปใช้ทีละรายการ ผลลัพธ์ที่ได้คือสิ่งอันดับที่มีความยาวสูงสุด

ตอนนี้เรามาแบ่งข้อมูลที่เราป้อนออกเป็นส่วนๆ และทำความเข้าใจว่ามันทำงานอย่างไรก่อนที่เราจะทำการขนานกัน (เราจะใช้ chunkify ที่แบ่งรายการขนาดใหญ่ออกเป็นชิ้นที่มีขนาดเท่ากัน):

data_chunks = chunkify(list_of_strings, number_of_chunks=30)
#step 1:
reduced_all = []
for chunk in data_chunks:
    mapped_chunk = map(mapper, chunk)
    mapped_chunk = zip(chunk, mapped_chunk)
    
    reduced_chunk = reduce(reducer, mapped_chunk)
    reduced_all.append(reduced_chunk)
    
#step 2:
reduced = reduce(reducer, reduced_all)
print(reduced)
OUTPUT:
('python', 6)

ในขั้นตอนที่หนึ่ง เราจะข้ามส่วนต่างๆ ของเราและค้นหาสตริงที่ยาวที่สุดในส่วนนั้นโดยใช้แผนที่แล้วย่อขนาด ในขั้นตอนที่สอง เราจะนำเอาต์พุตของขั้นตอนที่หนึ่งซึ่งเป็นรายการของค่าที่ลดลง และดำเนินการลดขั้นสุดท้ายเพื่อให้ได้สตริงที่ยาวที่สุด เราใช้ number_of_chunks=36 เพราะนี่คือจำนวน CPU ที่ฉันมีในเครื่อง

เราเกือบจะพร้อมที่จะรันโค้ดของเราไปพร้อมๆ กันแล้ว สิ่งเดียวที่เราสามารถทำได้ดีกว่าคือการเพิ่ม reduce ขั้นตอนแรกลงใน mapper เดียว เราทำอย่างนั้นเพราะเราต้องการแบ่งโค้ดของเราออกเป็นสองขั้นตอนง่ายๆ และเนื่องจาก reduce แรกทำงานในส่วนเดียว และเราต้องการทำให้มันขนานกันด้วยเช่นกัน นี่คือลักษณะที่ปรากฏ:

def chunks_mapper(chunk):
    mapped_chunk = map(mapper, chunk) 
    mapped_chunk = zip(chunk, mapped_chunk)
    return reduce(reducer, mapped_chunk)
%%time
data_chunks = chunkify(list_of_strings, number_of_chunks=30)
#step 1:
mapped = map(chunks_mapper, data_chunks)
#step 2:
reduced = reduce(reducer, mapped)
print(reduced)
OUTPUT:
('python', 6)
CPU times: user 58.5 s, sys: 968 ms, total: 59.5 s
Wall time: 59.5 s

ตอนนี้เรามีโค้ดสองขั้นตอนที่ดูดีแล้ว หากเราดำเนินการตามที่เป็นอยู่ เราจะได้เวลาในการคำนวณเท่ากัน แต่ตอนนี้เราสามารถขนานขั้นตอนที่ 1 ได้โดยใช้โมดูล multiprocessing เพียงใช้ฟังก์ชัน pool.map แทนฟังก์ชัน map ปกติ:

from multiprocessing import Pool
pool = Pool(8)
data_chunks = chunkify(large_list_of_strings, number_of_chunks=8)
#step 1:
mapped = pool.map(mapper, data_chunks)
#step 2:
reduced = reduce(reducer, mapped)
print(reduced)
OUTPUT:
('python', 6)
CPU times: user 7.74 s, sys: 1.46 s, total: 9.2 s
Wall time: 10.8 s

เราเห็นว่ามันวิ่งเร็วขึ้น 2 เท่า! มันไม่ใช่การปรับปรุงครั้งใหญ่ แต่ข่าวดีก็คือเราสามารถปรับปรุงได้โดยการเพิ่มจำนวนกระบวนการ! เรายังสามารถทำได้บนเครื่องมากกว่าหนึ่งเครื่อง หากข้อมูลของเรามีขนาดใหญ่มาก เราก็สามารถใช้เครื่องนับหมื่นหรือหลายพันเครื่องเพื่อทำให้เวลาในการคำนวณของเราสั้นลงเท่าที่เราต้องการ (เกือบ)

สถาปัตยกรรมของเราสร้างขึ้นโดยใช้สองฟังก์ชัน: map และ reduce หน่วยคำนวณแต่ละหน่วยจะจับคู่ข้อมูลอินพุตและดำเนินการลดขนาดเริ่มต้น ในที่สุด หน่วยรวมศูนย์บางหน่วยจะดำเนินการลดขั้นสุดท้ายและส่งกลับเอาต์พุต ดูเหมือนว่านี้:

สถาปัตยกรรมนี้มีข้อดีที่สำคัญสองประการ:

  1. สามารถปรับขนาดได้: หากเรามีข้อมูลมากขึ้น สิ่งเดียวที่เราต้องทำคือเพิ่มหน่วยประมวลผลมากขึ้น ไม่จำเป็นต้องเปลี่ยนรหัส!
  2. เป็นเรื่องทั่วไป: สถาปัตยกรรมนี้รองรับงานที่หลากหลาย เราสามารถแทนที่ฟังก์ชัน map และ reduce ด้วยเกือบทุกอย่าง และทำให้คอมพิวเตอร์ทำงานหลายอย่างในลักษณะที่ปรับขนาดได้

สิ่งสำคัญคือต้องทราบว่าในกรณีส่วนใหญ่ ข้อมูลของเราจะใหญ่มากและคงที่ หมายความว่าการแตกออกเป็นชิ้น ๆ ทุกครั้งนั้นไม่มีประสิทธิภาพและซ้ำซ้อนจริงๆ ดังนั้นในการใช้งานส่วนใหญ่ในชีวิตจริง เราจะจัดเก็บข้อมูลของเราเป็นชิ้น (หรือส่วนย่อย) ตั้งแต่เริ่มต้น จากนั้น เราจะสามารถคำนวณแบบต่างๆ ได้โดยใช้เทคนิค MapReduce

ตอนนี้เรามาดูตัวอย่างที่น่าสนใจมากขึ้น: จำนวนคำ!

สมมติว่าเรามีบทความข่าวชุดใหญ่มาก และเราต้องการหาคำที่ใช้ 10 อันดับแรก ไม่รวมคำหยุด เราจะทำยังไง? ก่อนอื่นเรามารับข้อมูลกันก่อน:

from sklearn.datasets import fetch_20newsgroups
data = news.data*10

สำหรับโพสต์นี้ ฉันเพิ่มข้อมูลให้ใหญ่ขึ้น x10 เพื่อให้เราได้เห็นความแตกต่าง

สำหรับแต่ละข้อความในชุดข้อมูล เราต้องการสร้างโทเค็น ทำความสะอาด ลบคำหยุด และสุดท้ายก็นับคำ:

def clean_word(word):
    return re.sub(r'[^\w\s]','',word).lower()
def word_not_in_stopwords(word):
    return word not in ENGLISH_STOP_WORDS and word and word.isalpha()
    
    
def find_top_words(data):
    cnt = Counter()
    for text in data:
        tokens_in_text = text.split()
        tokens_in_text = map(clean_word, tokens_in_text)
        tokens_in_text = filter(word_not_in_stopwords, tokens_in_text)
        cnt.update(tokens_in_text)
        
    return cnt.most_common(10)

มาดูกันว่าถ้าไม่มี MapReduce จะต้องใช้เวลานานเท่าใด:

%time find_top_words(data)
OUTPUT:
[('subject', 122520),
 ('lines', 118240),
 ('organization', 111850),
 ('writes', 78360),
 ('article', 67540),
 ('people', 58320),
 ('dont', 58130),
 ('like', 57570),
 ('just', 55790),
 ('university', 55440)]
CPU times: user 51.7 s, sys: 0 ns, total: 51.7 s
Wall time: 51.7 s

ตอนนี้ เรามาเขียน mapper ,reducer และ chunk_mapper ของเรากัน:

def mapper(text):
    tokens_in_text = text.split()
    tokens_in_text = map(clean_word, tokens_in_text)
    tokens_in_text = filter(word_not_in_stopwords, tokens_in_text)
    return Counter(tokens_in_text)
def reducer(cnt1, cnt2):
    cnt1.update(cnt2)
    return cnt1
def chunk_mapper(chunk):
    mapped = map(mapper, chunk)
    reduced = reduce(reducer, mapped)
    return reduced

mapper รับข้อความ แยกออกเป็นโทเค็น ทำความสะอาดและกรองคำหยุดและไม่ใช่คำ ในที่สุดก็จะนับคำในเอกสารข้อความเดียวนี้ ฟังก์ชัน reducer รับ 2 ตัวนับและรวมเข้าด้วยกัน chunk_mapper ได้รับชิ้นส่วนและทำ MapReduce กับมัน ตอนนี้เรามารันโดยใช้เฟรมเวิร์กที่เราสร้างขึ้นและดู:

%%time
data_chunks = chunkify(data, number_of_chunks=36)
#step 1:
mapped = pool.map(chunk_mapper, data_chunks)
#step 2:
reduced = reduce(reducer, mapped)
print(reduced.most_common(10))
OUTPUT:
[('subject', 122520),
 ('lines', 118240),
 ('organization', 111850),
 ('writes', 78360),
 ('article', 67540),
 ('people', 58320),
 ('dont', 58130),
 ('like', 57570),
 ('just', 55790),
 ('university', 55440)]
CPU times: user 1.52 s, sys: 256 ms, total: 1.77 s
Wall time: 4.67 s

เร็วขึ้น 10 เท่า! ที่นี่ เราสามารถใช้ประโยชน์จากพลังการคำนวณของเราได้จริง เนื่องจากงานมีความซับซ้อนมากขึ้นและต้องใช้มากกว่านั้น

โดยสรุป MapReduce เป็นเทคนิคที่น่าตื่นเต้นและจำเป็นสำหรับการประมวลผลข้อมูลขนาดใหญ่ สามารถจัดการงานได้มากมาย เช่น การนับ การค้นหา การเรียนรู้แบบมีผู้ดูแลและแบบไม่มีผู้ดูแล และอื่นๆ ปัจจุบันมีการนำไปใช้และเครื่องมือมากมายที่สามารถทำให้ชีวิตของเราสะดวกสบายมากขึ้น แต่ฉันคิดว่ามันสำคัญมากที่จะต้องเข้าใจพื้นฐาน