В настоящее время я обрабатываю поток файлов построчно, пропуская его через поток преобразования, который генерирует 'line'
события. Я хотел бы иметь возможность, обнаружив, что текущая строка соответствует некоторым критериям, приостановить поток входного файла, начать обработку нового потока и, когда это будет завершено, возобновить обработку исходного потока построчно. Я сократил это до минимального примера ниже:
test.coffee:
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'line', (line) ->
process.stdout.write "-->"
if line.match /line\.match/g
process.stdout.write line
console.error "PAUSE"
inStream.pause()
fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
console.error "UNPAUSE"
inStream.resume()
else
process.stdout.write line
test-transform.coffee:
Transform = require('stream').Transform
module.exports =
class TestTransform extends Transform
constructor: ->
Transform.call @, readableObjectMode: true
@buffer = ""
pushLines: ->
newlineIndex = @buffer.indexOf "\n"
while newlineIndex isnt -1
@push @buffer.substr(0, newlineIndex + 1)
@emit 'line', @buffer.substr(0, newlineIndex + 1)
@buffer = @buffer.substr(newlineIndex + 1)
newlineIndex = @buffer.indexOf "\n"
_transform: (chunk, enc, cb) ->
@buffer = @buffer + chunk.toString()
@pushLines()
cb?()
_flush: (cb) ->
@pushLines()
@buffer += "\n" # ending newline
@push @buffer
@emit 'line', @buffer # push last line
@buffer = ""
cb?()
(Не беспокойтесь о потоке Transform, это просто пример.) В любом случае результат coffee test.coffee
выглядит так:
-->fs = require 'fs'
-->
-->TestTransform = require './test-transform'
-->
-->inStream = new TestTransform
-->
-->fs.createReadStream("./test.coffee").pipe(inStream)
-->
-->inStream.on 'line', (line) ->
--> process.stdout.write "-->"
--> if line.match /line\.match/g
PAUSE
--> process.stdout.write line
--> console.error "PAUSE"
--> inStream.pause()
--> fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
--> console.error "UNPAUSE"
--> inStream.unpause()
--> else
--> process.stdout.write line
-->
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'line', (line) ->
process.stdout.write "-->"
if line.match /line\.match/g
process.stdout.write line
console.error "PAUSE"
inStream.pause()
fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
console.error "UNPAUSE"
inStream.unpause()
else
process.stdout.write line
Итак, очевидно, что конвейер не приостанавливается, он просто продолжается до завершения (даже если PAUSE
запускается так, как ожидалось), и поскольку "UNPAUSE"
также никогда не записывается, обратный вызов 'end'
никогда не запускается. Переключение потока на паузу / возобновление в readStream из потока преобразования, похоже, тоже не работает. Из этого поведения я предполагаю, что потоки узлов каким-то образом не соблюдают паузу / возобновление изнутри обратного вызова события.
Также может быть другой способ сделать это без вызова паузы / возобновления; если есть способ дождаться конца потока и приостановить текущий поток выполнения, это будет эффективно делать то, что я пытаюсь сделать.
concat-stream
. В противном случае вам понадобится абстракция поверх потоков. Одним из интересных способов могло бы быть использование чего-то вроде Dust.js, которое может чередовать потоки изначально. - person Interrobang   schedule 18.04.2015