วิธีหยุดชั่วคราวและยกเลิกการหยุดสตรีมออบเจ็กต์โหนดชั่วคราวขณะประมวลผลเอาต์พุต

ขณะนี้ฉันกำลังประมวลผลสตรีมไฟล์ทีละบรรทัดโดยเรียกใช้ผ่านสตรีมการแปลงที่ปล่อยเหตุการณ์ 'line' เมื่อพบว่าบรรทัดปัจจุบันตรงกับเกณฑ์บางอย่าง ให้หยุดสตรีมไฟล์อินพุตชั่วคราว เริ่มประมวลผลสตรีมใหม่ และเมื่อเสร็จสิ้นแล้ว ให้ดำเนินการประมวลผลสตรีมต้นฉบับต่อทีละบรรทัด ฉันได้ย่อมันให้เหลือเพียงตัวอย่างด้านล่าง:

ทดสอบกาแฟ:

fs = require 'fs'    
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'line', (line) ->
  process.stdout.write "-->"
  if line.match /line\.match/g
    process.stdout.write line
    console.error "PAUSE"
    inStream.pause()
    fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
      console.error "UNPAUSE"
      inStream.resume()
  else
    process.stdout.write line

ทดสอบ-transform.coffee:

Transform = require('stream').Transform

module.exports =
class TestTransform extends Transform
  constructor: ->
    Transform.call @, readableObjectMode: true
    @buffer = ""

  pushLines: ->
    newlineIndex = @buffer.indexOf "\n"
    while newlineIndex isnt -1
      @push @buffer.substr(0, newlineIndex + 1)
      @emit 'line', @buffer.substr(0, newlineIndex + 1)
      @buffer = @buffer.substr(newlineIndex + 1)
      newlineIndex = @buffer.indexOf "\n"

  _transform: (chunk, enc, cb) ->
    @buffer = @buffer + chunk.toString()
    @pushLines()
    cb?()

  _flush: (cb) ->
    @pushLines()
    @buffer += "\n"             # ending newline
    @push @buffer
    @emit 'line', @buffer       # push last line
    @buffer = ""
    cb?()

(อย่ากังวลกับสตรีม Transform มากเกินไป นี่เป็นเพียงตัวอย่าง) อย่างไรก็ตาม ผลลัพธ์ของ coffee test.coffee จะเป็นดังนี้:

-->fs = require 'fs'
-->
-->TestTransform = require './test-transform'
-->
-->inStream = new TestTransform
-->
-->fs.createReadStream("./test.coffee").pipe(inStream)
-->
-->inStream.on 'line', (line) ->
-->  process.stdout.write "-->"
-->  if line.match /line\.match/g
PAUSE
-->    process.stdout.write line
-->    console.error "PAUSE"
-->    inStream.pause()
-->    fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
-->      console.error "UNPAUSE"
-->      inStream.unpause()
-->  else
-->    process.stdout.write line
-->
fs = require 'fs'

TestTransform = require './test-transform'

inStream = new TestTransform

fs.createReadStream("./test.coffee").pipe(inStream)

inStream.on 'line', (line) ->
  process.stdout.write "-->"
  if line.match /line\.match/g
    process.stdout.write line
    console.error "PAUSE"
    inStream.pause()
    fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
      console.error "UNPAUSE"
      inStream.unpause()
  else
    process.stdout.write line

เห็นได้ชัดว่าไปป์ไม่ได้ถูกหยุดชั่วคราว มันแค่ดำเนินต่อไปจนกว่าจะเสร็จสิ้น (แม้ว่า PAUSE จะทำงานตามที่คาดไว้) และเนื่องจาก "UNPAUSE" ไม่เคยถูกเขียนออกมาเช่นกัน การเรียกกลับ 'end' จึงไม่เริ่มทำงาน การสลับสตรีมเพื่อหยุดชั่วคราว/ยกเลิกการหยุดชั่วคราวเป็น readStream จากสตรีมการแปลงดูเหมือนจะไม่ทำงานเช่นกัน ฉันสมมติจากพฤติกรรมนี้ว่าสตรีมโหนดไม่เคารพการหยุดชั่วคราว/ยกเลิกการหยุดชั่วคราวจากภายในการโทรกลับของเหตุการณ์

อาจมีวิธีอื่นในการบรรลุเป้าหมายนี้โดยไม่ต้องเรียกหยุดชั่วคราว/ยกเลิกการหยุดชั่วคราว หากมีวิธีการบางอย่างที่จะรอการสิ้นสุดของสตรีมและหยุดเธรดการดำเนินการปัจจุบันชั่วคราว นั่นจะเป็นสิ่งที่ฉันพยายามทำอย่างมีประสิทธิภาพ


person cosmicexplorer    schedule 18.04.2015    source แหล่งที่มา
comment
คุณต้องดำเนินการให้เสร็จสิ้นก่อนที่จะเริ่มอ่านสตรีมอีกครั้งหรือไม่ การเริ่มงานประมวลผลใหม่และอ่านต่อจากสตรีมไม่เพียงพอหรือ โหนดเก่งในการทำสิ่งต่าง ๆ แบบอะซิงโครนัส   -  person Interrobang    schedule 18.04.2015
comment
@Interrobang ใช่ฉันกำลังพยายามไพพ์สตรีมอินพุตทั้งสองไปยังสตรีมเอาต์พุตเดียวกันและสิ่งสำคัญคือต้องอ่านสตรีมที่สองทั้งหมดไปยังเอาต์พุตก่อนที่ส่วนที่เหลือของสตรีมแรกจะถูกป้อน ฉันไม่ต้องการให้กระแสทั้งสองสลับกันในเอาต์พุต   -  person cosmicexplorer    schedule 18.04.2015
comment
หากเพียงพอที่จะไม่ให้พวกมันสลับกัน คุณสามารถใช้สตรีมแบบบัฟเฟอร์เช่น concat-stream มิฉะนั้นคุณจะต้องมีสิ่งที่เป็นนามธรรมที่ด้านบนของสตรีม วิธีหนึ่งที่น่าสนใจคือการใช้ Dust.js ซึ่งสามารถแทรกสตรีมแบบเนทีฟได้   -  person Interrobang    schedule 18.04.2015
comment
ฉันกำลังคิดอะไรแบบนั้น เนื่องจากฉันไม่ได้คาดหวังว่าจะต้องจัดการกับสตรีมที่มีความยาวเป็นกิกะไบต์ ฉันจึงสามารถวางมันทั้งหมดลงในบัฟเฟอร์แล้วประมวลผลทุกครั้งที่สตรีมอื่นเสร็จสิ้น ฉันไม่ต้องการเก็บสตรีมทั้งหมดไว้ในหน่วยความจำพร้อมกัน ผมจะลองดูฝุ่นครับ ผมไม่เคยเห็นมาก่อน   -  person cosmicexplorer    schedule 18.04.2015


คำตอบ (2)


หากฉันเข้าใจคำถามถูกต้อง นี่คือแอป Node ง่ายๆ ที่ใช้ Dust.js ที่ช่วยแก้ปัญหาได้

Dust เป็นกลไกการสร้างเทมเพลต แต่หนึ่งในคุณสมบัติที่ดีที่สุดคือความเข้าใจดั้งเดิมของ Node Streams ตัวอย่างนี้ใช้ Dust 2.7.0

ฉันใช้ node-byline แทนสตรีม Transform ของคุณ แต่มันทำสิ่งเดียวกัน-- อ่านสตรีมทีละบรรทัด

var fs = require('fs'),
    byline = require('byline'),
    dust = require('dustjs-linkedin');

var stream = byline(fs.createReadStream('./test.txt', { encoding: 'utf8' }));

var template = dust.loadSource(dust.compile('{#byline}--> {.|s}{~n}{match}{/byline}'));

dust.stream(template, {
  byline: stream,
  match: function(chunk, context) {
    var currentLine = context.current();

    if(currentLine.match(/line\.match/g)) {
      return fs.createReadStream('./test.txt', 'utf8');
    }
    return chunk;
  }
}).pipe(process.stdout);

นี่คือผลลัพธ์จากโปรแกรมของฉัน:

$ node index.js
--> fs = require 'fs'
--> TestTransform = require './test-transform'
--> inStream = new TestTransform
--> fs.createReadStream("./test.coffee").pipe(inStream)
--> inStream.on 'line', (line) ->
-->   process.stdout.write "-->"
-->   if line.match /line\.match/g
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'line', (line) ->
  process.stdout.write "-->"
  if line.match /line\.match/g
    process.stdout.write line
    console.error "PAUSE"
    inStream.pause()
    fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
      console.error "UNPAUSE"
      inStream.resume()
  else
    process.stdout.write line

-->     process.stdout.write line
-->     console.error "PAUSE"
-->     inStream.pause()
-->     fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
-->       console.error "UNPAUSE"
-->       inStream.resume()
-->   else
-->     process.stdout.write line

อย่างที่คุณเห็น มันมีการแทรกเอาต์พุตอย่างเหมาะสม หากฉันสามารถอธิบายเพิ่มเติมเกี่ยวกับวิธีการทำงานของส่วน Dust ได้ โปรดแจ้งให้เราทราบ

แก้ไข: นี่คือคำอธิบายของเทมเพลต Dust โดยเฉพาะ

{#byline} {! look for the context variable named `byline` !}
{! okay, it's a stream. For each `data` event, output this stuff once !}
-->
{.|s} {! output the current `data`. Use |s to turn off HTML escaping !}
{~n} {! a newline !}
{match} {! look up the variable called `match` !}
{! okay, it's a function. Run it and insert the result !}
{! if the result is a stream, stream it in. !}
{/byline} {! done looping !}
person Interrobang    schedule 18.04.2015
comment
ดูเหมือนว่าจะสมเหตุสมผลแล้ว! (หลังจากอ่านไวยากรณ์ของฝุ่นแล้ว ฮ่าๆ) ฉันกำลังมองหาวิธีแก้ปัญหาที่ไม่มีการพึ่งพาจากภายนอก แต่ดูเหมือนว่าจะค่อนข้างเบา ในฟังก์ชันการจับคู่ที่กำหนดให้กับ dust.stream เหตุใดบรรทัด if line.match /line\.match/g จึงถูกเขียนออกมา ดูเหมือนว่าฝุ่นจะส่งกลับ fs.createReadStream แทนที่จะเป็นตัวก้อน และบรรทัดนั้นก็จะหายไป - person cosmicexplorer; 18.04.2015
comment
ฟังก์ชันการจับคู่จะถูกเรียกหนึ่งครั้งต่อบรรทัดโดยใช้ {#match/} หากบรรทัดปัจจุบัน (context.current()) ตรงกัน ฟังก์ชันจะสตรีมในเนื้อหาของ test.txt ถ้าไม่เช่นนั้น ก็จะส่งกลับค่า chunk ปัจจุบัน ซึ่งช่วยให้สตรีมดำเนินการต่อได้ - person Interrobang; 18.04.2015
comment
นั่นสมเหตุสมผลแล้ว ส่วน {.|s} เกี่ยวกับอะไรในสตริงเทมเพลต ฉันคิดว่านั่นกำลังบอกให้อ่านจากแอตทริบิวต์ stream (ซึ่งขึ้นต้นด้วย s) หรือเพียงแค่แอตทริบิวต์ใด ๆ หากไม่มี stream แต่นั่นอาจอยู่นอกฐานโดยสิ้นเชิง - person cosmicexplorer; 18.04.2015
comment
{.} หมายถึงบริบทปัจจุบัน และ |s หมายถึงไม่ใช้ HTML Escape - person Interrobang; 19.04.2015
comment
ฉันอัปเดตคำตอบพร้อมคำอธิบายเทมเพลตที่ละเอียดยิ่งขึ้น - person Interrobang; 19.04.2015

จริงๆ แล้วฉันก็พบคำตอบแยกต่างหากสำหรับเรื่องนี้เช่นกัน ไม่สวยเท่าแต่ก็ใช้งานได้

โดยพื้นฐานแล้ว pause() หยุดเอาต์พุตชั่วคราวจากสตรีมแบบไปป์เท่านั้น (ในโหมด "ไหล") เนื่องจากฉันกำลังฟังกิจกรรม 'line' จึงไม่ลื่นไหล และแน่นอน pause จึงไม่ทำอะไรเลย ดังนั้นวิธีแก้ปัญหาแรกคือใช้ removeListener แทน pause ซึ่งจะหยุดการสตรีมได้อย่างมีประสิทธิภาพ ตอนนี้ไฟล์ดูเหมือนว่า:

fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
c = (line) ->
  process.stdout.write "-->"
  if line.match /line\.match/g
    process.stdout.write line
    console.error "PAUSE"
    inStream.removeListener 'line', c
    f = fs.createReadStream("./test.coffee")
    f.on 'end', ->
      console.error "UNPAUSE"
      inStream.on 'line', c
    f.pipe(process.stdout)
  else
    process.stdout.write line
inStream.on 'line', c

และสิ่งนี้สร้างผลลัพธ์ที่ เกือบ ใช้งานได้:

-->fs = require 'fs'
-->TestTransform = require './test-transform'
-->inStream = new TestTransform
-->fs.createReadStream("./test.coffee").pipe(inStream)
-->c = (line) ->
-->  process.stdout.write "-->"
-->  if line.match /line\.match/g
PAUSE
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
c = (line) ->
  process.stdout.write "-->"
  if line.match /line\.match/g
    process.stdout.write line
    console.error "PAUSE"
    inStream.removeListener 'line', c
    f = fs.createReadStream("./test.coffee")
    f.on 'end', ->
      console.error "UNPAUSE"
      inStream.on 'line', c
    f.pipe(process.stdout)
  else
    process.stdout.write line
inStream.on 'line', c
UNPAUSE

อย่างไรก็ตาม ดูเหมือนว่าสตรีมที่อ่านได้ดั้งเดิมจะหยุดลงเมื่อฉันลบ Listener ออก สิ่งนี้ทำให้รู้สึกบิดเบี้ยว (ฉันเดาว่าโหนดขยะจะรวบรวมสตรีมที่อ่านได้เมื่อผู้ฟังทั้งหมดถูกลบออก) ดังนั้นวิธีแก้ปัญหาสุดท้ายที่ฉันพบจึงอาศัยการวางท่อแทน เนื่องจากสตรีม Transform ที่ฉันแสดงไว้ด้านบนยังส่งเอาต์พุตทีละบรรทัดไปยังผู้ฟัง 'data' ใดๆ ก็ตาม pause() จึงสามารถนำมาใช้ที่นี่ได้อย่างมีประสิทธิภาพตามวัตถุประสงค์ดั้งเดิม โดยไม่ต้องฆ่าสตรีมเลย ผลลัพธ์สุดท้าย:

fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'data', (chunk) ->
  line = chunk.toString()
  process.stdout.write "-->#{line}"
  if line.match /line\.match/g
    inStream.pause()
    f = fs.createReadStream("./test.coffee")
    f.on 'end', ->
      inStream.resume()
    f.pipe(process.stdout)

มีเอาต์พุต:

-->fs = require 'fs'
-->TestTransform = require './test-transform'
-->inStream = new TestTransform
-->fs.createReadStream("./test.coffee").pipe(inStream)
-->inStream.on 'data', (chunk) ->
-->  line = chunk.toString()
-->  process.stdout.write "-->#{line}"
-->  if line.match /line\.match/g
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'data', (chunk) ->
  line = chunk.toString()
  process.stdout.write "-->#{line}"
  if line.match /line\.match/g
    inStream.pause()
    f = fs.createReadStream("./test.coffee")
    f.on 'end', ->
      inStream.resume()
    f.pipe(process.stdout)
-->    inStream.pause()
-->    f = fs.createReadStream("./test.coffee")
-->    f.on 'end', ->
-->      inStream.resume()
-->    f.pipe(process.stdout)
-->

ซึ่งเป็นผลตามที่ตั้งใจไว้

person cosmicexplorer    schedule 18.04.2015