ขณะนี้ฉันกำลังประมวลผลสตรีมไฟล์ทีละบรรทัดโดยเรียกใช้ผ่านสตรีมการแปลงที่ปล่อยเหตุการณ์ 'line'
เมื่อพบว่าบรรทัดปัจจุบันตรงกับเกณฑ์บางอย่าง ให้หยุดสตรีมไฟล์อินพุตชั่วคราว เริ่มประมวลผลสตรีมใหม่ และเมื่อเสร็จสิ้นแล้ว ให้ดำเนินการประมวลผลสตรีมต้นฉบับต่อทีละบรรทัด ฉันได้ย่อมันให้เหลือเพียงตัวอย่างด้านล่าง:
ทดสอบกาแฟ:
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'line', (line) ->
process.stdout.write "-->"
if line.match /line\.match/g
process.stdout.write line
console.error "PAUSE"
inStream.pause()
fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
console.error "UNPAUSE"
inStream.resume()
else
process.stdout.write line
ทดสอบ-transform.coffee:
Transform = require('stream').Transform
module.exports =
class TestTransform extends Transform
constructor: ->
Transform.call @, readableObjectMode: true
@buffer = ""
pushLines: ->
newlineIndex = @buffer.indexOf "\n"
while newlineIndex isnt -1
@push @buffer.substr(0, newlineIndex + 1)
@emit 'line', @buffer.substr(0, newlineIndex + 1)
@buffer = @buffer.substr(newlineIndex + 1)
newlineIndex = @buffer.indexOf "\n"
_transform: (chunk, enc, cb) ->
@buffer = @buffer + chunk.toString()
@pushLines()
cb?()
_flush: (cb) ->
@pushLines()
@buffer += "\n" # ending newline
@push @buffer
@emit 'line', @buffer # push last line
@buffer = ""
cb?()
(อย่ากังวลกับสตรีม Transform มากเกินไป นี่เป็นเพียงตัวอย่าง) อย่างไรก็ตาม ผลลัพธ์ของ coffee test.coffee
จะเป็นดังนี้:
-->fs = require 'fs'
-->
-->TestTransform = require './test-transform'
-->
-->inStream = new TestTransform
-->
-->fs.createReadStream("./test.coffee").pipe(inStream)
-->
-->inStream.on 'line', (line) ->
--> process.stdout.write "-->"
--> if line.match /line\.match/g
PAUSE
--> process.stdout.write line
--> console.error "PAUSE"
--> inStream.pause()
--> fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
--> console.error "UNPAUSE"
--> inStream.unpause()
--> else
--> process.stdout.write line
-->
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'line', (line) ->
process.stdout.write "-->"
if line.match /line\.match/g
process.stdout.write line
console.error "PAUSE"
inStream.pause()
fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
console.error "UNPAUSE"
inStream.unpause()
else
process.stdout.write line
เห็นได้ชัดว่าไปป์ไม่ได้ถูกหยุดชั่วคราว มันแค่ดำเนินต่อไปจนกว่าจะเสร็จสิ้น (แม้ว่า PAUSE
จะทำงานตามที่คาดไว้) และเนื่องจาก "UNPAUSE"
ไม่เคยถูกเขียนออกมาเช่นกัน การเรียกกลับ 'end'
จึงไม่เริ่มทำงาน การสลับสตรีมเพื่อหยุดชั่วคราว/ยกเลิกการหยุดชั่วคราวเป็น readStream จากสตรีมการแปลงดูเหมือนจะไม่ทำงานเช่นกัน ฉันสมมติจากพฤติกรรมนี้ว่าสตรีมโหนดไม่เคารพการหยุดชั่วคราว/ยกเลิกการหยุดชั่วคราวจากภายในการโทรกลับของเหตุการณ์
อาจมีวิธีอื่นในการบรรลุเป้าหมายนี้โดยไม่ต้องเรียกหยุดชั่วคราว/ยกเลิกการหยุดชั่วคราว หากมีวิธีการบางอย่างที่จะรอการสิ้นสุดของสตรีมและหยุดเธรดการดำเนินการปัจจุบันชั่วคราว นั่นจะเป็นสิ่งที่ฉันพยายามทำอย่างมีประสิทธิภาพ
concat-stream
มิฉะนั้นคุณจะต้องมีสิ่งที่เป็นนามธรรมที่ด้านบนของสตรีม วิธีหนึ่งที่น่าสนใจคือการใช้ Dust.js ซึ่งสามารถแทรกสตรีมแบบเนทีฟได้ - person Interrobang   schedule 18.04.2015