!!! Listing 1: Überschreiben der Funktion _read !!! _read bitte kursiv var stream = require('stream'), util = require('util'); var Readable = stream.Readable; var MyStream = function (options) { Readable.call(this, options); // ... }; util.inherits(MyStream, Readable); MyStream.prototype._read = function (size) { // ... }; !!! Listing 2: Primzahlen berechnen var stream = require('stream'), util = require('util'); var Readable = stream.Readable; var isPrime = function (n) { for (var i = 2; i <= Math.sqrt(n); i++) { if (n % i === 0) { return false; } } return true; }; var Primes = function (options) { Readable.call(this, options); this._index = 2; }; util.inherits(Primes, Readable); Primes.prototype._read = function () { var foundPrime = false; do { if (isPrime(this._index)) { foundPrime = true; this.push(new Buffer('' + this._index, 'utf8')); } this._index++; } while(!foundPrime); }; !!! Listing 3: Ende des Datenstroms signalisieren Primes.prototype._read = function () { var foundPrime = false; do { if (this._index > this._max) { this.push(null); return; } if (isPrime(this._index)) { foundPrime = true; this.push(new Buffer('' + this._index, 'utf8')); } this._index++; } while(!foundPrime); }; !!! Listing 4: Listing 3 ohne Schleife Primes.prototype._read = function () { if (this._index > this._max) { this.push(null); return; } if (isPrime(this._index)) { this.push(new Buffer('' + this._index, 'utf8')); } this._index++; }; !!! Listing 5: Leere Zeichenkette statt Schleife Primes.prototype._read = function () { if (this._index > this._max) { this.push(null); return; } if (isPrime(this._index)) { this.push(new Buffer('' + this._index, 'utf8')); } else { this.push(''); } this._index++; }; !!! Listing 6: Ausnahme von der Regel: read-Aufruf innerhalb von _read !!! read und _read bitte kursiv Primes.prototype._read = function () { var that = this; if (this._index > this._max) { this.push(null); return; } if (isPrime(this._index)) { this.push(new Buffer('' + this._index, 'utf8')); } else { this.push(''); process.nextTick(function () { this.read(0); }); } this._index++; }; !!! Listing 7: Umschalten in den Object-Mode var Primes = function (max, options) { options = options || {}; options.objectMode = true; Readable.call(this, options); this._max = max; this._index = 2; }; !!! Listing 8: Daten direkt weitergeben Primes.prototype._read = function () { var that = this; if (this._index > this._max) { this.push(null); return; } if (isPrime(this._index)) { this.push(this._index); } else { this.push(''); process.nextTick(function () { that.read(0); }); } this._index++; }; !!! Listing 9: Vollständige Implementierung des Primzahlen-Stream var stream = require('stream'), util = require('util'); var Readable = stream.Readable; var isPrime = function (n) { for (var i = 2; i <= Math.sqrt(n); i++) { if (n % i === 0) { return false; } } return true; }; var Primes = function (max, options) { options = options || {}; options.objectMode = true; Readable.call(this, options); this._max = max; this._index = 2; }; util.inherits(Primes, Readable); Primes.prototype._read = function () { var foundPrime = false, that = this; do { if (this._index > this._max) { this.push(null); return; } if (isPrime(this._index)) { foundPrime = true; this.push(this._index); } this._index++; } while (!foundPrime); }; !!! Listing 10: Überschreiben der write-Funktion !!! write bitte kursiv var stream = require('stream'), util = require('util'); var Writable = stream.Writable; var MyStream = function (options) { Writable.call(this, options); // ... }; util.inherits(MyStream, Writable); MyStream.prototype._write = function (chunk, encoding, callback) { // ... }; !!! Listing 11: Ausgabe auf der Konsole var stream = require('stream'), util = require('util'); var Writable = stream.Writable; var Logger = function (options) { Writable.call(this, options); this._index = 0; }; util.inherits(Logger, Writable); Logger.prototype._write = function (chunk, encoding, callbacj) { var message = { index: this._index++, timestamp: new Date().getTime(), message: chunk.toString('utf8') }; console.log(message); callback(); }; !!! Listing 12: Aktivierung des Object-Mode var Logger = function (options) { options = options || {}; options.objectMode = true; Writable.call(this, options); this._index = 0; }; !!! Listing 13: Optionale Metadaten übergeben Logger.prototype._write = function (chunk, encoding, callback) { var message = { index: this._index++, timestamp: new Date().getTime(), level: chunk.level, message: chunk.message, metadata: chunk.metadata || {} }; console.log(message); callback(); }; !!! Listing 14: Objekt statt Zeichenkette übergeben logger.write({ level: 'info', message: 'Started application...', metadata: { host: 'localhost', port: 3000 } }); !!! Listing 15: Eigene Duplex-Klasse ableiten var stream = require('stream'), util = require('util'); var Duplex = stream.Duplex; var MyStream = function (options) { Duplex.call(this, options); // ... }; util.inherits(MyStream, Duplex); MyStream.prototype._read = function (size) { // ... }; MyStream.prototype._write = function (chunk, encoding, callback) { // ... }; !!! Listing 16: _transform statt _read und _write !!! bitte _transform, _read und _write kursiv var stream = require('stream'), util = require('util'); var Transform = stream.Transform; var MyStream = function (options) { Transform.call(this, options); // ... }; util.inherits(MyStream, Transform); MyStream.prototype._transform = function (chunk, encoding, callback) { // ... }; !!! Listing 17: Einfacher Transform-Stream var stream = require('stream'), util = require('util'); var Transform = stream.Transform; var ToUppercase = function (options) { Transform.call(this, options); }; util.inherits(ToUppercase, Transform); ToUppercase.prototype._transform = function (chunk, encoding, callback) { var text = chunk.toString('utf8'); this.push(text.toUpperCase(), 'utf8'); callback(); }; !!! Listing 18: Verwendung von ToUppercase !!! bitte ToUppercase kursiv var toUppercase = new ToUppercase(); toUppercase.on('data', function (data) { console.log(data.toString('utf8')); }); toUppercase.write('http://www.thenativeweb.io'); !!! Listing 19: Umkehrung der übergebenen Daten var stream = require('stream'), util = require('util'); var Transform = stream.Transform; var Reverse = function (options) { Transform.call(this, options); this._reversedText = ''; }; util.inherits(Reverse, Transform); Reverse.prototype._transform = function (chunk, encoding, callback) { var text = chunk.toString('utf8'); this._reversedText = text.split('').reverse().join('') + this._reversedText; callback(); }; Reverse.prototype._flush = function (callback) { this.push(this._reversedText, 'utf8'); callback(); }; !!! Listing 20: Daten analysieren und unverändert durchreichen var stream = require('stream'), util = require('util'); var Transform = stream.Transform; var GetLength = function (options) { Transform.call(this, options); this._length = 0; }; util.inherits(GetLength, Transform); GetLength.prototype._transform = function (chunk, encoding, callback) { this._length += chunk.length; this.push(chunk); callback(); }; GetLength.prototype._flush = function (callback) { this.emit('length', this._length); callback(); }; !!! Listing 21: Verwendung von GetLength !!! GetLength bitte kursiv var getLength = new GetLength(); getLength .on('data', function (data) { console.log(data.toString('utf8')); }) .on('length', function (length) { console.log(util.format('Length: %s bytes', this._length)); }); getLength.write('http://www.thenativeweb.io'); getLength.end();