Samstag, Februar 22, 2020

Erstellen eines Node.js-Streams aus zwei Pipe-Streams

Möchte ich kombinieren zwei Node.js Datenströme in einem durch die Rohrleitung, wenn möglich. Ich bin mit Verwandeln streams.

In anderen Worten, ich möchte meine Bibliothek zurück myStream für Menschen zu verwenden. Zum Beispiel könnten Sie schreiben:

process.stdin.pipe(myStream).pipe(process.stdout);

Intern und ich bin mit third-party vendorStream das bedeutet einige Arbeit, gesteckt in meine eigene Logik enthalten in myInternalStream. Also, was ist oben übersetzen würde:

process.stdin.pipe(vendorStream).pipe(myInternalStream).pipe(process.stdout);

Kann ich etwas tun? Ich habe versucht var myStream = vendorStream.pipe(myInternalStream) aber, dass offensichtlich nicht funktioniert.

Machen eine Analogie mit bash, sagen wir, ich möchte ein Programm schreiben, dass prüft, ob der Brief h ist in der letzten Zeile von einigen stream ( tail -n 1 | grep h ) habe, kann ich ein shell-script:

# myscript.sh
tail -n 1 | grep h

Und wenn die Leute dann tun:

$ printf "abc\ndef\nghi" | . myscript.sh

Funktioniert es einfach.

Dies ist, was ich habe, so weit:

//Combine a pipe of two streams into one stream

var util = require('util')
  , Transform = require('stream').Transform;

var chunks1 = [];
var stream1 = new Transform();
var soFar = '';
stream1._transform = function(chunk, encoding, done) {
  chunks1.push(chunk.toString());
  var pieces = (soFar + chunk).split('\n');
  soFar = pieces.pop();
  for (var i = 0; i < pieces.length; i++) {
    var piece = pieces[i];
    this.push(piece);
  }
  return done();
};

var chunks2 = [];
var count = 0;
var stream2 = new Transform();
stream2._transform = function(chunk, encoding, done) {
  chunks2.push(chunk.toString());
  count = count + 1;
  this.push(count + ' ' + chunk.toString() + '\n');
  done();
};

var stdin = process.stdin;
var stdout = process.stdout;

process.on('exit', function () {
    console.error('chunks1: ' + JSON.stringify(chunks1));
    console.error('chunks2: ' + JSON.stringify(chunks2));
});
process.stdout.on('error', process.exit);


//stdin.pipe(stream1).pipe(stream2).pipe(stdout);

//$ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node streams-combine.js
//Outputs:
//1 abc
//2 def
//3 ghi
//chunks1: ["abc\nd","ef\nghi\n"]
//chunks2: ["abc","def","ghi"]

//Best working solution I could find
var stream3 = function(src) {
  return src.pipe(stream1).pipe(stream2);
};
stream3(stdin).pipe(stdout);

//$ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node streams-combine.js
//Outputs:
//1 abc
//2 def
//3 ghi
//chunks1: ["abc\nd","ef\nghi\n"]
//chunks2: ["abc","def","ghi"]

Ist, ist das überhaupt möglich? Lassen Sie mich wissen, wenn das, was ich versuche zu tun, ist nicht klar.

Dank!

InformationsquelleAutor der Frage Nicolas Hery | 2013-07-04

2 Kommentare

  1. 25

    Können Sie beobachten, für etwas zu sein, an dem stream, und dann unpipe es und zu leiten, um die streams, die Sie interessiert:

    var PassThrough = require('stream').PassThrough;
    
    var stream3 = new PassThrough();
    
    //When a source stream is piped to us, undo that pipe, and save
    //off the source stream piped into our internally managed streams.
    stream3.on('pipe', function(source) {
      source.unpipe(this);
      this.transformStream = source.pipe(stream1).pipe(stream2);
    });
    
    //When we're piped to another stream, instead pipe our internal
    //transform stream to that destination.
    stream3.pipe = function(destination, options) {
      return this.transformStream.pipe(destination, options);
    };
    
    stdin.pipe(stream3).pipe(stdout);

    Können Sie extrahieren Sie diese Funktionalität in Ihre eigenen constructable stream-Klasse:

    var util = require('util');
    var PassThrough = require('stream').PassThrough;
    
    var StreamCombiner = function() {
      this.streams = Array.prototype.slice.apply(arguments);
    
      this.on('pipe', function(source) {
        source.unpipe(this);
        for(i in this.streams) {
          source = source.pipe(this.streams[i]);
        }
        this.transformStream = source;
      });
    };
    
    util.inherits(StreamCombiner, PassThrough);
    
    StreamCombiner.prototype.pipe = function(dest, options) {
      return this.transformStream.pipe(dest, options);
    };
    
    var stream3 = new StreamCombiner(stream1, stream2);
    stdin.pipe(stream3).pipe(stdout);

    InformationsquelleAutor der Antwort Michelle Tilley

  2. 2

    Eine option ist vielleicht multipipe können Sie die Kette mehrere Transformationen zusammen, verpackt als ein einzelner Strom-Transformation:

    //my-stream.js
    var multipipe = require('multipipe');
    
    module.exports = function createMyStream() {
      return multipipe(vendorStream, myInternalStream);
    };

    Dann können Sie tun:

    var createMyStream = require('./my-stream');
    
    var myStream = createMyStream();
    
    process.stdin.pipe(myStream).pipe(process.stdout);

    InformationsquelleAutor der Antwort Jonas Berlin

Kostenlose Online-Tests