'use strict';

const net = require('net'),
    tls = require('tls'),
    EventParser = require('../entities/EventParser.js'),
    Message = require('js-message'),
    fs = require('fs'),
    Queue = require('js-queue'),
    Events = require('event-pubsub');

/**
 * Socket client that connects over a Unix socket / Windows named pipe, TCP or TLS
 * and republishes incoming messages as events.
 *
 * The constructor only stores state. `id`, `path` and (for TCP/TLS) `port` are read
 * by `connect()` but are not set here - presumably the caller assigns them on the
 * instance before calling `connect()`; verify against the caller.
 */
class Client extends Events{
    /**
     * @param {Object}   config  configuration object; this module reads `maxRetries`,
     *                           `retry`, `stopRetrying`, `rawBuffer`, `encoding`, `sync`,
     *                           `interface`, `tls` and `id` from it.
     * @param {Function} log     logging function, called with a variable number of arguments.
     */
    constructor(config,log){
        super();
        Object.assign(
            this,
            {
                Client  : Client,
                config  : config,
                queue   : new Queue,
                socket  : false,
                connect : connect,
                emit    : emit,
                log     : log,
                retriesRemaining:config.maxRetries||0,
                explicitlyDisconnected: false,
                // One parser per client. A single module-level parser would be
                // replaced by every newly constructed client, so clients with
                // different configs would end up using each other's settings.
                eventParser: new EventParser(config)
            }
        );
    }
}

/**
 * Sends a message over the client's socket.
 *
 * In `rawBuffer` mode only `type` is sent, encoded with `config.encoding`, and `data`
 * is ignored. Otherwise `type` and `data` are wrapped in a Message and serialised by
 * the client's event parser.
 *
 * When `config.sync` is set the write is added to `this.queue` instead of being written
 * immediately; the 'data' handler installed by `connect()` calls `queue.next()` after
 * each received payload.
 *
 * @param {String} type  event type (or the raw payload in `rawBuffer` mode).
 * @param {*}      data  event payload; unused in `rawBuffer` mode.
 */
function emit(type,data){
    this.log('dispatching event to ', this.id, this.path, ' : ', type, ',', data);

    let message=new Message;
    message.type=type;
    message.data=data;

    if(this.config.rawBuffer){
        message=Buffer.from(type,this.config.encoding);
    }else{
        message=this.eventParser.format(message);
    }

    if(!this.config.sync){
        this.socket.write(message);
        return;
    }

    this.queue.add(
        syncEmit.bind(this,message)
    );
}

/**
 * Queue job used in sync mode: writes an already formatted message to the socket.
 * Invoked with `this` bound to the client.
 *
 * @param {String|Buffer} message  formatted payload to write.
 */
function syncEmit(message){
    this.log('dispatching event to ', this.id, this.path, ' : ', message);
    this.socket.write(message);
}

/**
 * Opens the client's socket and wires up its 'error', 'connect', 'close' and 'data'
 * handlers.
 *
 * Transport selection:
 *  - no `port`: local socket at `path` (rewritten to a `\\.\pipe\` name on win32);
 *  - `port` without `config.tls`: TCP to host `path`;
 *  - `port` with `config.tls`: TLS to host `path`.
 *
 * Does nothing except log when `path` is not set.
 */
function connect(){
    // Keep a stable reference to the client: inside the socket handlers below
    // `this` is the socket, not the client.
    const client=this;
    client.log('requested connection to ', client.id, client.path);
    if(!client.path){
        client.log('\n\n######\nerror: ', client.id ,' client has not specified socket path it wishes to connect to.');
        return;
    }

    const options={};

    if(!client.port){
        client.log('Connecting client on Unix Socket :', client.path);

        options.path=client.path;

        // On Windows turn a filesystem-style path into a named-pipe path,
        // unless the caller already supplied one.
        if (process.platform ==='win32' && !client.path.startsWith('\\\\.\\pipe\\')){
            options.path = options.path.replace(/^\//, '');
            options.path = options.path.replace(/\//g, '-');
            options.path= `\\\\.\\pipe\\${options.path}`;
        }

        client.socket = net.connect(options);
    }else{
        options.host=client.path;
        options.port=client.port;

        // Copy across only the interface settings that are actually set.
        if(client.config.interface.localAddress){
            options.localAddress=client.config.interface.localAddress;
        }

        if(client.config.interface.localPort){
            options.localPort=client.config.interface.localPort;
        }

        if(client.config.interface.family){
            options.family=client.config.interface.family;
        }

        if(client.config.interface.hints){
            options.hints=client.config.interface.hints;
        }

        if(client.config.interface.lookup){
            options.lookup=client.config.interface.lookup;
        }

        if(!client.config.tls){
            client.log('Connecting client via TCP to', options);
            client.socket = net.connect(options);
        }else{
            client.log('Connecting client via TLS to', client.path ,client.port,client.config.tls);

            // NOTE(review): this branch writes key/cert/ca and the connection
            // options back onto client.config.tls, and re-reads the files on every
            // (re)connect. Left as is because callers may read those fields.
            if(client.config.tls.private){
                client.config.tls.key=fs.readFileSync(client.config.tls.private);
            }
            if(client.config.tls.public){
                client.config.tls.cert=fs.readFileSync(client.config.tls.public);
            }
            if(client.config.tls.trustedConnections){
                if(typeof client.config.tls.trustedConnections === 'string'){
                    client.config.tls.trustedConnections=[client.config.tls.trustedConnections];
                }
                client.config.tls.ca=[];
                for(let i=0; i<client.config.tls.trustedConnections.length; i++){
                    client.config.tls.ca.push(
                        fs.readFileSync(client.config.tls.trustedConnections[i])
                    );
                }
            }

            Object.assign(client.config.tls,options);

            client.socket = tls.connect(
                client.config.tls
            );
        }
    }

    client.socket.setEncoding(client.config.encoding);

    client.socket.on(
        'error',
        function(err){
            client.log('\n\n######\nerror: ', err);
            client.publish('error', err);
        }
    );

    client.socket.on(
        'connect',
        function connectionMade(){
            client.publish('connect');
            // Same fallback as the constructor, so an unset maxRetries cannot
            // become `undefined` here and defeat the `< 1` check on close.
            client.retriesRemaining=client.config.maxRetries||0;
            client.log('retrying reset');
        }
    );

    client.socket.on(
        'close',
        function connectionClosed(){
            client.log('connection closed' ,client.id , client.path,
                client.retriesRemaining, 'tries remaining of', client.config.maxRetries
            );

            if(
                client.config.stopRetrying ||
                client.retriesRemaining<1 ||
                client.explicitlyDisconnected
            ){
                client.publish('disconnect');
                client.log(
                    (client.config.id),
                    'exceeded connection rety amount of',
                    client.config.maxRetries,
                    ' or stopRetrying flag set.'
                );

                client.socket.destroy();
                client.publish('destroy');
                // The client reference is deliberately not cleared here: the other
                // handlers on this socket close over it and would throw if any of
                // them ran afterwards.
                return;
            }

            // Schedule a reconnect; an explicit disconnect in the meantime cancels it.
            setTimeout(
                function retryTimeout(){
                    if (client.explicitlyDisconnected) {
                        return;
                    }
                    client.retriesRemaining--;
                    client.connect();
                },
                client.config.retry
            );

            client.publish('disconnect');
        }
    );

    client.socket.on(
        'data',
        function(data) {
            // `this` is the socket here; partial frames are accumulated on it
            // as `ipcBuffer`.
            client.log('## received events ##');
            if(client.config.rawBuffer){
                client.publish(
                    'data',
                    Buffer.from(data,client.config.encoding)
                );
                if(!client.config.sync){
                    return;
                }

                client.queue.next();
                return;
            }

            if(!this.ipcBuffer){
                this.ipcBuffer='';
            }

            data=(this.ipcBuffer+=data);

            // Wait for more data until the buffer ends on a delimiter. endsWith
            // also works for delimiters longer than one character.
            if(!data.endsWith(client.eventParser.delimiter)){
                client.log('Messages are large, You may want to consider smaller messages.');
                return;
            }

            this.ipcBuffer='';

            const events = client.eventParser.parse(data);
            const eCount = events.length;
            for(let i=0; i<eCount; i++){
                let message=new Message;
                message.load(events[i]);

                client.log('detected event', message.type, message.data);
                client.publish(
                    message.type,
                    message.data
                );
            }

            if(!client.config.sync){
                return;
            }

            client.queue.next();
        }
    );
}

module.exports=Client;