หลายครั้งในฐานะ Data Scientist เราต้องรับมือกับข้อมูลจำนวนมหาศาล ในกรณีดังกล่าว หลายวิธีใช้ไม่ได้ผลหรือเป็นไปไม่ได้ ข้อมูลจำนวนมหาศาลนั้นดี ดีมาก และเราต้องการใช้ประโยชน์ให้มากที่สุด
ในที่นี้ ฉันอยากจะแนะนำเทคนิค MapReduce ซึ่งเป็นเทคนิคกว้างๆ ที่ใช้ในการจัดการข้อมูลจำนวนมหาศาล มีการใช้งาน MapReduce มากมาย รวมถึง Apache Hadoop ที่มีชื่อเสียง ในที่นี้ ฉันจะไม่พูดถึงการใช้งาน ฉันจะพยายามแนะนำแนวคิดนี้ด้วยวิธีที่เข้าใจง่ายที่สุด และนำเสนอตัวอย่างสำหรับทั้งของเล่นและตัวอย่างในชีวิตจริง
เริ่มต้นด้วยงานที่ตรงไปตรงมา คุณได้รับรายการสตริง และคุณต้องส่งคืนสตริงที่ยาวที่สุด มันค่อนข้างง่ายที่จะทำใน python:
def find_longest_string(list_of_strings): longest_string = None longest_string_len = 0 for s in list_of_strings: if len(s) > longest_string_len: longest_string_len = len(s) longest_string = s return longest_string
เราตรวจดูสตริงทีละรายการ คำนวณความยาวและเก็บสตริงที่ยาวที่สุดไว้จนกว่าเราจะเสร็จ
สำหรับรายการขนาดเล็ก มันทำงานได้ค่อนข้างเร็ว:
list_of_strings = ['abc', 'python', 'dima'] %time max_length = print(find_longest_string(list_of_strings)) OUTPUT: python CPU times: user 0 ns, sys: 0 ns, total: 0 ns Wall time: 75.8 µs
แม้แต่รายการที่มีองค์ประกอบมากกว่า 3 รายการก็ยังทำงานได้ดี แต่เราลองใช้องค์ประกอบ 3000 รายการ:
large_list_of_strings = list_of_strings*1000 %time print(find_longest_string(large_list_of_strings)) OUTPUT: python CPU times: user 0 ns, sys: 0 ns, total: 0 ns Wall time: 307 µs
แต่ถ้าเราพยายามหาองค์ประกอบ 300 ล้านองค์ประกอบล่ะ?
large_list_of_strings = list_of_strings*100000000 %time max_length = max(large_list_of_strings, key=len) OUTPUT: python CPU times: user 21.8 s, sys: 0 ns, total: 21.8 s Wall time: 21.8 s
นี่เป็นปัญหา ในแอปพลิเคชันส่วนใหญ่ เวลาตอบสนอง 20 วินาทีไม่เป็นที่ยอมรับ วิธีหนึ่งในการปรับปรุงเวลาในการคำนวณคือการซื้อ CPU ที่ดีขึ้นและเร็วขึ้นมาก การปรับขนาดระบบของคุณด้วยการแนะนำฮาร์ดแวร์ที่ดีขึ้นและเร็วขึ้นเรียกว่า "Vertical Scaling" แน่นอนว่าสิ่งนี้จะไม่ได้ผลตลอดไป ไม่เพียงแต่การค้นหา CPU ที่ทำงานเร็วขึ้น 10 เท่าไม่ใช่เรื่องง่ายเท่านั้น แต่ข้อมูลของเราอาจจะใหญ่ขึ้นด้วย และเราไม่ต้องการอัปเกรด CPU ของเราทุกครั้งที่โค้ดช้าลง โซลูชันของเราไม่สามารถปรับขนาดได้ แต่เราสามารถทำ "การปรับขนาดแนวนอน" แทนได้ เราจะออกแบบโค้ดของเราเพื่อให้สามารถทำงานแบบขนานได้ และจะเร็วขึ้นมากเมื่อเราเพิ่มโปรเซสเซอร์และ/หรือ CPU มากขึ้น
ในการทำเช่นนั้น เราต้องแบ่งโค้ดของเราออกเป็นส่วนประกอบเล็กๆ และดูว่าเราสามารถดำเนินการคำนวณแบบขนานได้อย่างไร สัญชาตญาณมีดังนี้: 1) แบ่งข้อมูลของเราออกเป็นหลาย ๆ ส่วน 2) เรียกใช้ฟังก์ชัน find_longest_string
สำหรับทุก ๆ ส่วนขนาน และ 3) ค้นหาสตริงที่ยาวที่สุดในบรรดาเอาต์พุตของชิ้นส่วนทั้งหมด
โค้ดของเรามีความเฉพาะเจาะจงมากและยากต่อการทำลายและแก้ไข ดังนั้นแทนที่จะใช้ฟังก์ชัน find_longest_string
เราจะพัฒนาเฟรมเวิร์กทั่วไปมากขึ้น ซึ่งจะช่วยให้เราทำการคำนวณต่างๆ พร้อมกันกับข้อมูลขนาดใหญ่
สิ่งสำคัญสองประการที่เราทำในโค้ดของเราคือการคำนวณ len
ของสตริงและเปรียบเทียบกับสตริงที่ยาวที่สุดจนถึงปัจจุบัน เราจะแบ่งโค้ดของเราออกเป็นสองขั้นตอน: 1) คำนวณ len
ของสตริงทั้งหมด และ 2) เลือกค่า max
%%time # step 1: list_of_string_lens = [len(s) for s in list_of_strings] list_of_string_lens = zip(list_of_strings, list_of_string_lens) #step 2: max_len = max(list_of_string_lens, key=lambda t: t[1]) print(max_len) OUTPUT: ('python', 6) CPU times: user 51.6 s, sys: 804 ms, total: 52.4 s Wall time: 52.4 s
(ฉันกำลังคำนวณความยาวของสตริงแล้ว zip
เข้าด้วยกันเพราะมันเร็วกว่าการทำในบรรทัดเดียวและทำซ้ำรายการสตริงมาก)
ในสถานะนี้ โค้ดจะทำงานช้าลงกว่าเดิมเพราะแทนที่จะส่งผ่านสตริงทั้งหมดของเราเพียงครั้งเดียว เราทำ 2 ครั้ง ขั้นแรกเพื่อคำนวณ len
จากนั้นจึงค้นหาค่า max
ทำไมมันถึงดีสำหรับเรา? เพราะตอนนี้ "ขั้นตอนที่ 2" ของเราได้รับเป็นอินพุต ไม่ใช่รายการสตริงดั้งเดิม แต่เป็นข้อมูลที่ประมวลผลล่วงหน้าบางส่วน สิ่งนี้ช่วยให้เราสามารถดำเนินการขั้นตอนที่สองโดยใช้เอาต์พุตของ "ขั้นตอนที่สอง" อื่น! เราจะเข้าใจสิ่งนั้นให้ดีขึ้นในอีกสักครู่ แต่ก่อนอื่น เรามาตั้งชื่อขั้นตอนเหล่านั้นกันก่อน เราจะเรียก "ขั้นตอนที่หนึ่ง" ว่า "ตัวทำแผนที่" เพราะมันจะจับคู่ค่าบางอย่างกับค่าอื่น และเราจะเรียก "ขั้นตอนที่สอง" ว่าเป็นตัวลดเพราะมันรับรายการค่าและสร้างค่าเดียว (ในกรณีส่วนใหญ่) . ต่อไปนี้เป็นฟังก์ชันตัวช่วยสองฟังก์ชันสำหรับ mapper และตัวลด:
mapper = len def reducer(p, c): if p[1] > c[1]: return p return c
ผู้ทำแผนที่เป็นเพียงฟังก์ชัน len
ได้รับสตริงและส่งกลับความยาว ตัวลดจะได้รับสองสิ่งอันดับเป็นอินพุตและส่งกลับอันที่มีความยาวมากที่สุด
มาเขียนโค้ดของเราใหม่โดยใช้ map
และ reduce
มีฟังก์ชันในตัวสำหรับสิ่งนี้ใน python (ใน python 3 เราต้องนำเข้าจาก functools
)
%%time #step 1 mapped = map(mapper, list_of_strings) mapped = zip(list_of_strings, mapped) #step 2: reduced = reduce(reducer, mapped) print(reduced) OUTPUT: ('python', 6) CPU times: user 57.9 s, sys: 0 ns, total: 57.9 s Wall time: 57.9 s
โค้ดทำสิ่งเดียวกันทุกประการ มันดูสวยงามกว่าเล็กน้อย แต่ยังเป็นแบบทั่วไปมากกว่าและจะช่วยเราทำให้มันขนานกัน ลองดูให้ละเอียดยิ่งขึ้น:
ขั้นตอนที่ 1 แมปรายการสตริงของเราลงในรายการสิ่งอันดับโดยใช้ฟังก์ชันตัวทำแผนที่ (ในที่นี้ฉันใช้ zip
อีกครั้งเพื่อหลีกเลี่ยงการทำซ้ำสตริง)
ขั้นตอนที่ 2 ใช้ฟังก์ชันตัวลด ข้ามสิ่งอันดับจากขั้นตอนที่หนึ่งแล้วนำไปใช้ทีละรายการ ผลลัพธ์ที่ได้คือสิ่งอันดับที่มีความยาวสูงสุด
ตอนนี้เรามาแบ่งข้อมูลที่เราป้อนออกเป็นส่วนๆ และทำความเข้าใจว่ามันทำงานอย่างไรก่อนที่เราจะทำการขนานกัน (เราจะใช้ chunkify
ที่แบ่งรายการขนาดใหญ่ออกเป็นชิ้นที่มีขนาดเท่ากัน):
data_chunks = chunkify(list_of_strings, number_of_chunks=30) #step 1: reduced_all = [] for chunk in data_chunks: mapped_chunk = map(mapper, chunk) mapped_chunk = zip(chunk, mapped_chunk) reduced_chunk = reduce(reducer, mapped_chunk) reduced_all.append(reduced_chunk) #step 2: reduced = reduce(reducer, reduced_all) print(reduced) OUTPUT: ('python', 6)
ในขั้นตอนที่หนึ่ง เราจะข้ามส่วนต่างๆ ของเราและค้นหาสตริงที่ยาวที่สุดในส่วนนั้นโดยใช้แผนที่แล้วย่อขนาด ในขั้นตอนที่สอง เราจะนำเอาต์พุตของขั้นตอนที่หนึ่งซึ่งเป็นรายการของค่าที่ลดลง และดำเนินการลดขั้นสุดท้ายเพื่อให้ได้สตริงที่ยาวที่สุด เราใช้ number_of_chunks=36
เพราะนี่คือจำนวน CPU ที่ฉันมีในเครื่อง
เราเกือบจะพร้อมที่จะรันโค้ดของเราไปพร้อมๆ กันแล้ว สิ่งเดียวที่เราสามารถทำได้ดีกว่าคือการเพิ่ม reduce
ขั้นตอนแรกลงใน mapper เดียว เราทำอย่างนั้นเพราะเราต้องการแบ่งโค้ดของเราออกเป็นสองขั้นตอนง่ายๆ และเนื่องจาก reduce
แรกทำงานในส่วนเดียว และเราต้องการทำให้มันขนานกันด้วยเช่นกัน นี่คือลักษณะที่ปรากฏ:
def chunks_mapper(chunk): mapped_chunk = map(mapper, chunk) mapped_chunk = zip(chunk, mapped_chunk) return reduce(reducer, mapped_chunk) %%time data_chunks = chunkify(list_of_strings, number_of_chunks=30) #step 1: mapped = map(chunks_mapper, data_chunks) #step 2: reduced = reduce(reducer, mapped) print(reduced) OUTPUT: ('python', 6) CPU times: user 58.5 s, sys: 968 ms, total: 59.5 s Wall time: 59.5 s
ตอนนี้เรามีโค้ดสองขั้นตอนที่ดูดีแล้ว หากเราดำเนินการตามที่เป็นอยู่ เราจะได้เวลาในการคำนวณเท่ากัน แต่ตอนนี้เราสามารถขนานขั้นตอนที่ 1 ได้โดยใช้โมดูล multiprocessing
เพียงใช้ฟังก์ชัน pool.map
แทนฟังก์ชัน map
ปกติ:
from multiprocessing import Pool pool = Pool(8) data_chunks = chunkify(large_list_of_strings, number_of_chunks=8) #step 1: mapped = pool.map(mapper, data_chunks) #step 2: reduced = reduce(reducer, mapped) print(reduced) OUTPUT: ('python', 6) CPU times: user 7.74 s, sys: 1.46 s, total: 9.2 s Wall time: 10.8 s
เราเห็นว่ามันวิ่งเร็วขึ้น 2 เท่า! มันไม่ใช่การปรับปรุงครั้งใหญ่ แต่ข่าวดีก็คือเราสามารถปรับปรุงได้โดยการเพิ่มจำนวนกระบวนการ! เรายังสามารถทำได้บนเครื่องมากกว่าหนึ่งเครื่อง หากข้อมูลของเรามีขนาดใหญ่มาก เราก็สามารถใช้เครื่องนับหมื่นหรือหลายพันเครื่องเพื่อทำให้เวลาในการคำนวณของเราสั้นลงเท่าที่เราต้องการ (เกือบ)
สถาปัตยกรรมของเราสร้างขึ้นโดยใช้สองฟังก์ชัน: map
และ reduce
หน่วยคำนวณแต่ละหน่วยจะจับคู่ข้อมูลอินพุตและดำเนินการลดขนาดเริ่มต้น ในที่สุด หน่วยรวมศูนย์บางหน่วยจะดำเนินการลดขั้นสุดท้ายและส่งกลับเอาต์พุต ดูเหมือนว่านี้:
สถาปัตยกรรมนี้มีข้อดีที่สำคัญสองประการ:
- สามารถปรับขนาดได้: หากเรามีข้อมูลมากขึ้น สิ่งเดียวที่เราต้องทำคือเพิ่มหน่วยประมวลผลมากขึ้น ไม่จำเป็นต้องเปลี่ยนรหัส!
- เป็นเรื่องทั่วไป: สถาปัตยกรรมนี้รองรับงานที่หลากหลาย เราสามารถแทนที่ฟังก์ชัน
map
และreduce
ด้วยเกือบทุกอย่าง และทำให้คอมพิวเตอร์ทำงานหลายอย่างในลักษณะที่ปรับขนาดได้
สิ่งสำคัญคือต้องทราบว่าในกรณีส่วนใหญ่ ข้อมูลของเราจะใหญ่มากและคงที่ หมายความว่าการแตกออกเป็นชิ้น ๆ ทุกครั้งนั้นไม่มีประสิทธิภาพและซ้ำซ้อนจริงๆ ดังนั้นในการใช้งานส่วนใหญ่ในชีวิตจริง เราจะจัดเก็บข้อมูลของเราเป็นชิ้น (หรือส่วนย่อย) ตั้งแต่เริ่มต้น จากนั้น เราจะสามารถคำนวณแบบต่างๆ ได้โดยใช้เทคนิค MapReduce
ตอนนี้เรามาดูตัวอย่างที่น่าสนใจมากขึ้น: จำนวนคำ!
สมมติว่าเรามีบทความข่าวชุดใหญ่มาก และเราต้องการหาคำที่ใช้ 10 อันดับแรก ไม่รวมคำหยุด เราจะทำยังไง? ก่อนอื่นเรามารับข้อมูลกันก่อน:
from sklearn.datasets import fetch_20newsgroups data = news.data*10
สำหรับโพสต์นี้ ฉันเพิ่มข้อมูลให้ใหญ่ขึ้น x10 เพื่อให้เราได้เห็นความแตกต่าง
สำหรับแต่ละข้อความในชุดข้อมูล เราต้องการสร้างโทเค็น ทำความสะอาด ลบคำหยุด และสุดท้ายก็นับคำ:
def clean_word(word): return re.sub(r'[^\w\s]','',word).lower() def word_not_in_stopwords(word): return word not in ENGLISH_STOP_WORDS and word and word.isalpha() def find_top_words(data): cnt = Counter() for text in data: tokens_in_text = text.split() tokens_in_text = map(clean_word, tokens_in_text) tokens_in_text = filter(word_not_in_stopwords, tokens_in_text) cnt.update(tokens_in_text) return cnt.most_common(10)
มาดูกันว่าถ้าไม่มี MapReduce จะต้องใช้เวลานานเท่าใด:
%time find_top_words(data) OUTPUT: [('subject', 122520), ('lines', 118240), ('organization', 111850), ('writes', 78360), ('article', 67540), ('people', 58320), ('dont', 58130), ('like', 57570), ('just', 55790), ('university', 55440)] CPU times: user 51.7 s, sys: 0 ns, total: 51.7 s Wall time: 51.7 s
ตอนนี้ เรามาเขียน mapper
,reducer
และ chunk_mapper
ของเรากัน:
def mapper(text): tokens_in_text = text.split() tokens_in_text = map(clean_word, tokens_in_text) tokens_in_text = filter(word_not_in_stopwords, tokens_in_text) return Counter(tokens_in_text) def reducer(cnt1, cnt2): cnt1.update(cnt2) return cnt1 def chunk_mapper(chunk): mapped = map(mapper, chunk) reduced = reduce(reducer, mapped) return reduced
mapper
รับข้อความ แยกออกเป็นโทเค็น ทำความสะอาดและกรองคำหยุดและไม่ใช่คำ ในที่สุดก็จะนับคำในเอกสารข้อความเดียวนี้ ฟังก์ชัน reducer
รับ 2 ตัวนับและรวมเข้าด้วยกัน chunk_mapper
ได้รับชิ้นส่วนและทำ MapReduce กับมัน ตอนนี้เรามารันโดยใช้เฟรมเวิร์กที่เราสร้างขึ้นและดู:
%%time data_chunks = chunkify(data, number_of_chunks=36) #step 1: mapped = pool.map(chunk_mapper, data_chunks) #step 2: reduced = reduce(reducer, mapped) print(reduced.most_common(10)) OUTPUT: [('subject', 122520), ('lines', 118240), ('organization', 111850), ('writes', 78360), ('article', 67540), ('people', 58320), ('dont', 58130), ('like', 57570), ('just', 55790), ('university', 55440)] CPU times: user 1.52 s, sys: 256 ms, total: 1.77 s Wall time: 4.67 s
เร็วขึ้น 10 เท่า! ที่นี่ เราสามารถใช้ประโยชน์จากพลังการคำนวณของเราได้จริง เนื่องจากงานมีความซับซ้อนมากขึ้นและต้องใช้มากกว่านั้น
โดยสรุป MapReduce เป็นเทคนิคที่น่าตื่นเต้นและจำเป็นสำหรับการประมวลผลข้อมูลขนาดใหญ่ สามารถจัดการงานได้มากมาย เช่น การนับ การค้นหา การเรียนรู้แบบมีผู้ดูแลและแบบไม่มีผู้ดูแล และอื่นๆ ปัจจุบันมีการนำไปใช้และเครื่องมือมากมายที่สามารถทำให้ชีวิตของเราสะดวกสบายมากขึ้น แต่ฉันคิดว่ามันสำคัญมากที่จะต้องเข้าใจพื้นฐาน