как приостановить и возобновить поток объекта узла во время обработки его вывода

В настоящее время я обрабатываю поток файлов построчно, пропуская его через поток преобразования, который генерирует 'line' события. Я хотел бы иметь возможность, обнаружив, что текущая строка соответствует некоторым критериям, приостановить поток входного файла, начать обработку нового потока и, когда это будет завершено, возобновить обработку исходного потока построчно. Я сократил это до минимального примера ниже:

test.coffee:

fs = require 'fs'    
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'line', (line) ->
  process.stdout.write "-->"
  if line.match /line\.match/g
    process.stdout.write line
    console.error "PAUSE"
    inStream.pause()
    fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
      console.error "UNPAUSE"
      inStream.resume()
  else
    process.stdout.write line

test-transform.coffee:

Transform = require('stream').Transform

module.exports =
class TestTransform extends Transform
  constructor: ->
    Transform.call @, readableObjectMode: true
    @buffer = ""

  pushLines: ->
    newlineIndex = @buffer.indexOf "\n"
    while newlineIndex isnt -1
      @push @buffer.substr(0, newlineIndex + 1)
      @emit 'line', @buffer.substr(0, newlineIndex + 1)
      @buffer = @buffer.substr(newlineIndex + 1)
      newlineIndex = @buffer.indexOf "\n"

  _transform: (chunk, enc, cb) ->
    @buffer = @buffer + chunk.toString()
    @pushLines()
    cb?()

  _flush: (cb) ->
    @pushLines()
    @buffer += "\n"             # ending newline
    @push @buffer
    @emit 'line', @buffer       # push last line
    @buffer = ""
    cb?()

(Не беспокойтесь о потоке Transform, это просто пример.) В любом случае результат coffee test.coffee выглядит так:

-->fs = require 'fs'
-->
-->TestTransform = require './test-transform'
-->
-->inStream = new TestTransform
-->
-->fs.createReadStream("./test.coffee").pipe(inStream)
-->
-->inStream.on 'line', (line) ->
-->  process.stdout.write "-->"
-->  if line.match /line\.match/g
PAUSE
-->    process.stdout.write line
-->    console.error "PAUSE"
-->    inStream.pause()
-->    fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
-->      console.error "UNPAUSE"
-->      inStream.unpause()
-->  else
-->    process.stdout.write line
-->
fs = require 'fs'

TestTransform = require './test-transform'

inStream = new TestTransform

fs.createReadStream("./test.coffee").pipe(inStream)

inStream.on 'line', (line) ->
  process.stdout.write "-->"
  if line.match /line\.match/g
    process.stdout.write line
    console.error "PAUSE"
    inStream.pause()
    fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
      console.error "UNPAUSE"
      inStream.unpause()
  else
    process.stdout.write line

Итак, очевидно, что конвейер не приостанавливается, он просто продолжается до завершения (даже если PAUSE запускается так, как ожидалось), и поскольку "UNPAUSE" также никогда не записывается, обратный вызов 'end' никогда не запускается. Переключение потока на паузу / возобновление в readStream из потока преобразования, похоже, тоже не работает. Из этого поведения я предполагаю, что потоки узлов каким-то образом не соблюдают паузу / возобновление изнутри обратного вызова события.

Также может быть другой способ сделать это без вызова паузы / возобновления; если есть способ дождаться конца потока и приостановить текущий поток выполнения, это будет эффективно делать то, что я пытаюсь сделать.


person cosmicexplorer    schedule 18.04.2015    source источник
comment
Вам нужно завершить обработку, прежде чем вы снова начнете читать поток? Разве недостаточно запустить новое задание обработки и продолжить чтение из потока? Узел хорош в выполнении задач асинхронно.   -  person Interrobang    schedule 18.04.2015
comment
@Interrobang да, я пытаюсь направить оба входных потока в один и тот же выходной поток, и важно, чтобы второй поток полностью считывался в выход до того, как остальная часть первого будет введена. Я не хочу, чтобы два потока перемежались в выводе.   -  person cosmicexplorer    schedule 18.04.2015
comment
Если достаточно, чтобы они не перемежались, вы можете использовать поток в стиле буфера, например concat-stream. В противном случае вам понадобится абстракция поверх потоков. Одним из интересных способов могло бы быть использование чего-то вроде Dust.js, которое может чередовать потоки изначально.   -  person Interrobang    schedule 18.04.2015
comment
Я думал о чем-то подобном. Поскольку я не ожидаю, что буду иметь дело с потоками длиной в гигабайты, я мог бы передать все это в буфер, а затем обрабатывать его всякий раз, когда выполняется другой поток. Однако я бы предпочел не хранить в памяти весь поток сразу. Я посмотрю на пыль, я такого раньше не видел.   -  person cosmicexplorer    schedule 18.04.2015


Ответы (2)


Если я правильно понял вопрос, вот простое приложение Node, использующее Dust.js, которое решает проблему.

Dust - это шаблонизатор, но одна из его лучших особенностей - это собственное понимание Node Streams. В этом примере используется Dust 2.7.0.

Я использую node-byline в качестве замены вашего потока Transform, но он делает то же самое - читает потоки построчно.

var fs = require('fs'),
    byline = require('byline'),
    dust = require('dustjs-linkedin');

var stream = byline(fs.createReadStream('./test.txt', { encoding: 'utf8' }));

var template = dust.loadSource(dust.compile('{#byline}--> {.|s}{~n}{match}{/byline}'));

dust.stream(template, {
  byline: stream,
  match: function(chunk, context) {
    var currentLine = context.current();

    if(currentLine.match(/line\.match/g)) {
      return fs.createReadStream('./test.txt', 'utf8');
    }
    return chunk;
  }
}).pipe(process.stdout);

Вот результат моей программы:

$ node index.js
--> fs = require 'fs'
--> TestTransform = require './test-transform'
--> inStream = new TestTransform
--> fs.createReadStream("./test.coffee").pipe(inStream)
--> inStream.on 'line', (line) ->
-->   process.stdout.write "-->"
-->   if line.match /line\.match/g
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'line', (line) ->
  process.stdout.write "-->"
  if line.match /line\.match/g
    process.stdout.write line
    console.error "PAUSE"
    inStream.pause()
    fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
      console.error "UNPAUSE"
      inStream.resume()
  else
    process.stdout.write line

-->     process.stdout.write line
-->     console.error "PAUSE"
-->     inStream.pause()
-->     fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
-->       console.error "UNPAUSE"
-->       inStream.resume()
-->   else
-->     process.stdout.write line

Как видите, вывод правильно перемежается. Дайте мне знать, если я смогу подробнее рассказать о том, как работает часть «Пыль».

РЕДАКТИРОВАТЬ: Вот конкретное объяснение шаблона Dust.

{#byline} {! look for the context variable named `byline` !}
{! okay, it's a stream. For each `data` event, output this stuff once !}
-->
{.|s} {! output the current `data`. Use |s to turn off HTML escaping !}
{~n} {! a newline !}
{match} {! look up the variable called `match` !}
{! okay, it's a function. Run it and insert the result !}
{! if the result is a stream, stream it in. !}
{/byline} {! done looping !}
person Interrobang    schedule 18.04.2015
comment
В этом есть смысл! (прочитав синтаксис пыли lol) Я искал решение без внешних зависимостей, но это кажется довольно легким. Почему в функции сопоставления, заданной для dust.stream, выписана строка if line.match /line\.match/g? Похоже, что пыль просто вернет fs.createReadStream вместо самого фрагмента, и эта строка будет потеряна. - person cosmicexplorer; 18.04.2015
comment
Функция сопоставления вызывается один раз для каждой строки с использованием {#match/}. Если текущая строка (context.current()) совпадает, тогда функция передает содержимое test.txt. Если нет, он просто возвращает текущий chunk, что позволяет продолжить поток. - person Interrobang; 18.04.2015
comment
В этом есть смысл. О чем {.|s} часть строки шаблона? Я предполагаю, что это говорит ему читать из атрибута потока (который начинается с s) или просто из любого атрибута, если поток не существует, но это может быть совершенно не так. - person cosmicexplorer; 18.04.2015
comment
{.} означает текущий контекст, а |s означает, что HTML-экранирование не выполняется. - person Interrobang; 19.04.2015
comment
Я обновил ответ, добавив более подробное объяснение шаблона. - person Interrobang; 19.04.2015

На самом деле я тоже нашел отдельный ответ; не так красиво, но тоже работает.

По сути, pause() только приостанавливает вывод из конвейерного потока (в «текущем» режиме); так как я слушал событие 'line', оно не протекало, и поэтому pause, конечно, ничего не сделал. Итак, первое решение заключалось в использовании removeListener вместо pause, что фактически останавливает потоковую передачу. Теперь файл выглядит так:

fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
c = (line) ->
  process.stdout.write "-->"
  if line.match /line\.match/g
    process.stdout.write line
    console.error "PAUSE"
    inStream.removeListener 'line', c
    f = fs.createReadStream("./test.coffee")
    f.on 'end', ->
      console.error "UNPAUSE"
      inStream.on 'line', c
    f.pipe(process.stdout)
  else
    process.stdout.write line
inStream.on 'line', c

И это дает результат, который почти работает:

-->fs = require 'fs'
-->TestTransform = require './test-transform'
-->inStream = new TestTransform
-->fs.createReadStream("./test.coffee").pipe(inStream)
-->c = (line) ->
-->  process.stdout.write "-->"
-->  if line.match /line\.match/g
PAUSE
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
c = (line) ->
  process.stdout.write "-->"
  if line.match /line\.match/g
    process.stdout.write line
    console.error "PAUSE"
    inStream.removeListener 'line', c
    f = fs.createReadStream("./test.coffee")
    f.on 'end', ->
      console.error "UNPAUSE"
      inStream.on 'line', c
    f.pipe(process.stdout)
  else
    process.stdout.write line
inStream.on 'line', c
UNPAUSE

Однако похоже, что исходный читаемый поток просто остановился, когда я удалил слушателя; это имеет какой-то извращенный смысл (я полагаю, что мусор узла собирает свои читаемые потоки, когда все слушатели были удалены). Итак, окончательное рабочее решение, которое я нашел, зависит от трубопроводов. Поскольку поток Transform, который я показал выше, также подталкивает свой вывод построчно к любым 'data' слушателям, pause() можно эффективно использовать здесь для его первоначальной цели, не просто убивая поток. Окончательный результат:

fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'data', (chunk) ->
  line = chunk.toString()
  process.stdout.write "-->#{line}"
  if line.match /line\.match/g
    inStream.pause()
    f = fs.createReadStream("./test.coffee")
    f.on 'end', ->
      inStream.resume()
    f.pipe(process.stdout)

с выходом:

-->fs = require 'fs'
-->TestTransform = require './test-transform'
-->inStream = new TestTransform
-->fs.createReadStream("./test.coffee").pipe(inStream)
-->inStream.on 'data', (chunk) ->
-->  line = chunk.toString()
-->  process.stdout.write "-->#{line}"
-->  if line.match /line\.match/g
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'data', (chunk) ->
  line = chunk.toString()
  process.stdout.write "-->#{line}"
  if line.match /line\.match/g
    inStream.pause()
    f = fs.createReadStream("./test.coffee")
    f.on 'end', ->
      inStream.resume()
    f.pipe(process.stdout)
-->    inStream.pause()
-->    f = fs.createReadStream("./test.coffee")
-->    f.on 'end', ->
-->      inStream.resume()
-->    f.pipe(process.stdout)
-->

что и было предполагаемым результатом.

person cosmicexplorer    schedule 18.04.2015