| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 | 'use strict';var util = require('util');var EventEmitter = require('events').EventEmitter;function Hose(socket, options, filter) {  EventEmitter.call(this);  if (typeof options === 'function') {    filter = options;    options = {};  }  this.socket = socket;  this.options = options;  this.filter = filter;  this.buffer = null;  var self = this;  this.listeners = {    error: function(err) {      return self.onError(err);    },    data: function(chunk) {      return self.onData(chunk);    },    end: function() {      return self.onEnd();    }  };  this.socket.on('error', this.listeners.error);  this.socket.on('data', this.listeners.data);  this.socket.on('end', this.listeners.end);}util.inherits(Hose, EventEmitter);module.exports = Hose;Hose.create = function create(socket, options, filter) {  return new Hose(socket, options, filter);};Hose.prototype.detach = function detach() {  // Stop the flow  this.socket.pause();  this.socket.removeListener('error', this.listeners.error);  this.socket.removeListener('data', this.listeners.data);  this.socket.removeListener('end', this.listeners.end);};Hose.prototype.reemit = function reemit() {  var buffer = this.buffer;  this.buffer = null;  // Modern age  if (this.socket.unshift) {    this.socket.unshift(buffer);    if (this.socket.listeners('data').length > 0)      this.socket.resume();    return;  }  // Rusty node v0.8  if (this.socket.ondata)    this.socket.ondata(buffer, 0, buffer.length);  this.socket.emit('data', buffer);  this.socket.resume();};Hose.prototype.onError = function onError(err) {  this.detach();  this.emit('error', err);};Hose.prototype.onData = function onData(chunk) {  if (this.buffer)    this.buffer = Buffer.concat([ this.buffer, chunk ]);  else    this.buffer = chunk;  var self = this;  this.filter(this.buffer, function(err, protocol) {    if (err)      return self.onError(err);    // No protocol selected yet    if (!protocol)      return;    self.detach();    self.emit('select', protocol, self.socket);    self.reemit();  });};Hose.prototype.onEnd = function onEnd() {  this.detach();  this.emit('error', new Error('Not enough data to recognize protocol'));};
 |