| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495 | var parse = require('url').parsevar events = require('events')var https = require('https')var http = require('http')var util = require('util')var httpsOptions = [  'pfx', 'key', 'passphrase', 'cert', 'ca', 'ciphers',  'rejectUnauthorized', 'secureProtocol', 'servername', 'checkServerIdentity']var bom = [239, 187, 191]var colon = 58var space = 32var lineFeed = 10var carriageReturn = 13// Beyond 256KB we could not observe any gain in performancevar maxBufferAheadAllocation = 1024 * 256// Headers matching the pattern should be removed when redirecting to different originvar reUnsafeHeader = /^(cookie|authorization)$/ifunction hasBom (buf) {  return bom.every(function (charCode, index) {    return buf[index] === charCode  })}/** * Creates a new EventSource object * * @param {String} url the URL to which to connect * @param {Object} [eventSourceInitDict] extra init params. See README for details. * @api public **/function EventSource (url, eventSourceInitDict) {  var readyState = EventSource.CONNECTING  var headers = eventSourceInitDict && eventSourceInitDict.headers  var hasNewOrigin = false  Object.defineProperty(this, 'readyState', {    get: function () {      return readyState    }  })  Object.defineProperty(this, 'url', {    get: function () {      return url    }  })  var self = this  self.reconnectInterval = 1000  self.connectionInProgress = false  function onConnectionClosed (message) {    if (readyState === EventSource.CLOSED) return    readyState = EventSource.CONNECTING    _emit('error', new Event('error', {message: message}))    // The url may have been changed by a temporary redirect. If that's the case,    // revert it now, and flag that we are no longer pointing to a new origin    if (reconnectUrl) {      url = reconnectUrl      reconnectUrl = null      hasNewOrigin = false    }    setTimeout(function () {      if (readyState !== EventSource.CONNECTING || self.connectionInProgress) {        return      }      self.connectionInProgress = true      connect()    }, self.reconnectInterval)  }  var req  var lastEventId = ''  if (headers && headers['Last-Event-ID']) {    lastEventId = headers['Last-Event-ID']    delete headers['Last-Event-ID']  }  var discardTrailingNewline = false  var data = ''  var eventName = ''  var reconnectUrl = null  function connect () {    var options = parse(url)    var isSecure = options.protocol === 'https:'    options.headers = { 'Cache-Control': 'no-cache', 'Accept': 'text/event-stream' }    if (lastEventId) options.headers['Last-Event-ID'] = lastEventId    if (headers) {      var reqHeaders = hasNewOrigin ? removeUnsafeHeaders(headers) : headers      for (var i in reqHeaders) {        var header = reqHeaders[i]        if (header) {          options.headers[i] = header        }      }    }    // Legacy: this should be specified as `eventSourceInitDict.https.rejectUnauthorized`,    // but for now exists as a backwards-compatibility layer    options.rejectUnauthorized = !(eventSourceInitDict && !eventSourceInitDict.rejectUnauthorized)    if (eventSourceInitDict && eventSourceInitDict.createConnection !== undefined) {      options.createConnection = eventSourceInitDict.createConnection    }    // If specify http proxy, make the request to sent to the proxy server,    // and include the original url in path and Host headers    var useProxy = eventSourceInitDict && eventSourceInitDict.proxy    if (useProxy) {      var proxy = parse(eventSourceInitDict.proxy)      isSecure = proxy.protocol === 'https:'      options.protocol = isSecure ? 'https:' : 'http:'      options.path = url      options.headers.Host = options.host      options.hostname = proxy.hostname      options.host = proxy.host      options.port = proxy.port    }    // If https options are specified, merge them into the request options    if (eventSourceInitDict && eventSourceInitDict.https) {      for (var optName in eventSourceInitDict.https) {        if (httpsOptions.indexOf(optName) === -1) {          continue        }        var option = eventSourceInitDict.https[optName]        if (option !== undefined) {          options[optName] = option        }      }    }    // Pass this on to the XHR    if (eventSourceInitDict && eventSourceInitDict.withCredentials !== undefined) {      options.withCredentials = eventSourceInitDict.withCredentials    }    req = (isSecure ? https : http).request(options, function (res) {      self.connectionInProgress = false      // Handle HTTP errors      if (res.statusCode === 500 || res.statusCode === 502 || res.statusCode === 503 || res.statusCode === 504) {        _emit('error', new Event('error', {status: res.statusCode, message: res.statusMessage}))        onConnectionClosed()        return      }      // Handle HTTP redirects      if (res.statusCode === 301 || res.statusCode === 302 || res.statusCode === 307) {        var location = res.headers.location        if (!location) {          // Server sent redirect response without Location header.          _emit('error', new Event('error', {status: res.statusCode, message: res.statusMessage}))          return        }        var prevOrigin = new URL(url).origin        var nextOrigin = new URL(location).origin        hasNewOrigin = prevOrigin !== nextOrigin        if (res.statusCode === 307) reconnectUrl = url        url = location        process.nextTick(connect)        return      }      if (res.statusCode !== 200) {        _emit('error', new Event('error', {status: res.statusCode, message: res.statusMessage}))        return self.close()      }      readyState = EventSource.OPEN      res.on('close', function () {        res.removeAllListeners('close')        res.removeAllListeners('end')        onConnectionClosed()      })      res.on('end', function () {        res.removeAllListeners('close')        res.removeAllListeners('end')        onConnectionClosed()      })      _emit('open', new Event('open'))      // text/event-stream parser adapted from webkit's      // Source/WebCore/page/EventSource.cpp      var buf      var newBuffer      var startingPos = 0      var startingFieldLength = -1      var newBufferSize = 0      var bytesUsed = 0      res.on('data', function (chunk) {        if (!buf) {          buf = chunk          if (hasBom(buf)) {            buf = buf.slice(bom.length)          }          bytesUsed = buf.length        } else {          if (chunk.length > buf.length - bytesUsed) {            newBufferSize = (buf.length * 2) + chunk.length            if (newBufferSize > maxBufferAheadAllocation) {              newBufferSize = buf.length + chunk.length + maxBufferAheadAllocation            }            newBuffer = Buffer.alloc(newBufferSize)            buf.copy(newBuffer, 0, 0, bytesUsed)            buf = newBuffer          }          chunk.copy(buf, bytesUsed)          bytesUsed += chunk.length        }        var pos = 0        var length = bytesUsed        while (pos < length) {          if (discardTrailingNewline) {            if (buf[pos] === lineFeed) {              ++pos            }            discardTrailingNewline = false          }          var lineLength = -1          var fieldLength = startingFieldLength          var c          for (var i = startingPos; lineLength < 0 && i < length; ++i) {            c = buf[i]            if (c === colon) {              if (fieldLength < 0) {                fieldLength = i - pos              }            } else if (c === carriageReturn) {              discardTrailingNewline = true              lineLength = i - pos            } else if (c === lineFeed) {              lineLength = i - pos            }          }          if (lineLength < 0) {            startingPos = length - pos            startingFieldLength = fieldLength            break          } else {            startingPos = 0            startingFieldLength = -1          }          parseEventStreamLine(buf, pos, fieldLength, lineLength)          pos += lineLength + 1        }        if (pos === length) {          buf = void 0          bytesUsed = 0        } else if (pos > 0) {          buf = buf.slice(pos, bytesUsed)          bytesUsed = buf.length        }      })    })    req.on('error', function (err) {      self.connectionInProgress = false      onConnectionClosed(err.message)    })    if (req.setNoDelay) req.setNoDelay(true)    req.end()  }  connect()  function _emit () {    if (self.listeners(arguments[0]).length > 0) {      self.emit.apply(self, arguments)    }  }  this._close = function () {    if (readyState === EventSource.CLOSED) return    readyState = EventSource.CLOSED    if (req.abort) req.abort()    if (req.xhr && req.xhr.abort) req.xhr.abort()  }  function parseEventStreamLine (buf, pos, fieldLength, lineLength) {    if (lineLength === 0) {      if (data.length > 0) {        var type = eventName || 'message'        _emit(type, new MessageEvent(type, {          data: data.slice(0, -1), // remove trailing newline          lastEventId: lastEventId,          origin: new URL(url).origin        }))        data = ''      }      eventName = void 0    } else if (fieldLength > 0) {      var noValue = fieldLength < 0      var step = 0      var field = buf.slice(pos, pos + (noValue ? lineLength : fieldLength)).toString()      if (noValue) {        step = lineLength      } else if (buf[pos + fieldLength + 1] !== space) {        step = fieldLength + 1      } else {        step = fieldLength + 2      }      pos += step      var valueLength = lineLength - step      var value = buf.slice(pos, pos + valueLength).toString()      if (field === 'data') {        data += value + '\n'      } else if (field === 'event') {        eventName = value      } else if (field === 'id') {        lastEventId = value      } else if (field === 'retry') {        var retry = parseInt(value, 10)        if (!Number.isNaN(retry)) {          self.reconnectInterval = retry        }      }    }  }}module.exports = EventSourceutil.inherits(EventSource, events.EventEmitter)EventSource.prototype.constructor = EventSource; // make stacktraces readable['open', 'error', 'message'].forEach(function (method) {  Object.defineProperty(EventSource.prototype, 'on' + method, {    /**     * Returns the current listener     *     * @return {Mixed} the set function or undefined     * @api private     */    get: function get () {      var listener = this.listeners(method)[0]      return listener ? (listener._listener ? listener._listener : listener) : undefined    },    /**     * Start listening for events     *     * @param {Function} listener the listener     * @return {Mixed} the set function or undefined     * @api private     */    set: function set (listener) {      this.removeAllListeners(method)      this.addEventListener(method, listener)    }  })})/** * Ready states */Object.defineProperty(EventSource, 'CONNECTING', {enumerable: true, value: 0})Object.defineProperty(EventSource, 'OPEN', {enumerable: true, value: 1})Object.defineProperty(EventSource, 'CLOSED', {enumerable: true, value: 2})EventSource.prototype.CONNECTING = 0EventSource.prototype.OPEN = 1EventSource.prototype.CLOSED = 2/** * Closes the connection, if one is made, and sets the readyState attribute to 2 (closed) * * @see https://developer.mozilla.org/en-US/docs/Web/API/EventSource/close * @api public */EventSource.prototype.close = function () {  this._close()}/** * Emulates the W3C Browser based WebSocket interface using addEventListener. * * @param {String} type A string representing the event type to listen out for * @param {Function} listener callback * @see https://developer.mozilla.org/en/DOM/element.addEventListener * @see http://dev.w3.org/html5/websockets/#the-websocket-interface * @api public */EventSource.prototype.addEventListener = function addEventListener (type, listener) {  if (typeof listener === 'function') {    // store a reference so we can return the original function again    listener._listener = listener    this.on(type, listener)  }}/** * Emulates the W3C Browser based WebSocket interface using dispatchEvent. * * @param {Event} event An event to be dispatched * @see https://developer.mozilla.org/en-US/docs/Web/API/EventTarget/dispatchEvent * @api public */EventSource.prototype.dispatchEvent = function dispatchEvent (event) {  if (!event.type) {    throw new Error('UNSPECIFIED_EVENT_TYPE_ERR')  }  // if event is instance of an CustomEvent (or has 'details' property),  // send the detail object as the payload for the event  this.emit(event.type, event.detail)}/** * Emulates the W3C Browser based WebSocket interface using removeEventListener. * * @param {String} type A string representing the event type to remove * @param {Function} listener callback * @see https://developer.mozilla.org/en/DOM/element.removeEventListener * @see http://dev.w3.org/html5/websockets/#the-websocket-interface * @api public */EventSource.prototype.removeEventListener = function removeEventListener (type, listener) {  if (typeof listener === 'function') {    listener._listener = undefined    this.removeListener(type, listener)  }}/** * W3C Event * * @see http://www.w3.org/TR/DOM-Level-3-Events/#interface-Event * @api private */function Event (type, optionalProperties) {  Object.defineProperty(this, 'type', { writable: false, value: type, enumerable: true })  if (optionalProperties) {    for (var f in optionalProperties) {      if (optionalProperties.hasOwnProperty(f)) {        Object.defineProperty(this, f, { writable: false, value: optionalProperties[f], enumerable: true })      }    }  }}/** * W3C MessageEvent * * @see http://www.w3.org/TR/webmessaging/#event-definitions * @api private */function MessageEvent (type, eventInitDict) {  Object.defineProperty(this, 'type', { writable: false, value: type, enumerable: true })  for (var f in eventInitDict) {    if (eventInitDict.hasOwnProperty(f)) {      Object.defineProperty(this, f, { writable: false, value: eventInitDict[f], enumerable: true })    }  }}/** * Returns a new object of headers that does not include any authorization and cookie headers * * @param {Object} headers An object of headers ({[headerName]: headerValue}) * @return {Object} a new object of headers * @api private */function removeUnsafeHeaders (headers) {  var safe = {}  for (var key in headers) {    if (reUnsafeHeader.test(key)) {      continue    }    safe[key] = headers[key]  }  return safe}
 |