'use strict';

const net = require('net');
const tls = require('tls');
const fs = require('fs');
const dgram = require('dgram');
const EventParser = require('../entities/EventParser.js');
const Message = require('js-message');
const Events = require('event-pubsub');

/**
 * Socket server that can listen on a Unix/Windows pipe, a TCP port, a TLS
 * port or a UDP (udp4/udp6) port and re-publishes incoming messages as events.
 *
 * Events published by this class (via the inherited `publish`/`trigger`):
 * `start`, `connect`, `close`, `error`, `data` (raw buffer mode),
 * `socket.disconnected`, and one event per received message type.
 */
class Server extends Events{
    /**
     * @param {string} path   Socket path, or the host/address when `port` is set.
     * @param {Object} config Server configuration. Fields read in this file:
     *                        `unlink`, `rawBuffer`, `encoding`, `tls`,
     *                        `maxConnections`, `readableAll`, `writableAll`.
     * @param {Function} log  Logger; called with a variable number of arguments.
     * @param {number} [port] Port to listen on. When falsy the server listens
     *                        on `path` as a Unix/Windows socket.
     */
    constructor(path,config,log,port){
        super();

        Object.assign(
            this,
            {
                config,
                path,
                port,
                udp4            : false,
                udp6            : false,
                log,
                server          : false,
                sockets         : [],
                emit,
                broadcast,
                // One parser per server so that several servers created with
                // different configs do not overwrite each other's parser.
                eventParser     : new EventParser(config)
            }
        );

        this.on(
            'close',
            serverClosed.bind(this)
        );
    }

    /**
     * Triggers the `start` event.
     *
     * @param {Object} socket Passed through to `start` listeners.
     */
    onStart(socket){
        this.trigger(
            'start',
            socket
        );
    }

    /**
     * Closes the underlying server. Does nothing (besides logging) when the
     * server was never started.
     */
    stop(){
        if(!this.server || typeof this.server.close !== 'function'){
            this.log('Socket Server not started, nothing to stop');
            return;
        }

        this.server.close();
    }

    /**
     * Starts the server. When `config.unlink` is set the socket path is
     * unlinked first and the server is started from the unlink callback
     * (regardless of whether the unlink succeeded).
     */
    start(){
        if(!this.path){
            this.log('Socket Server Path not specified, refusing to start');
            return;
        }

        if(this.config.unlink){
            fs.unlink(
                this.path,
                () => startServer.call(this)
            );
            return;
        }

        startServer.call(this);
    }
}

/**
 * Builds the wire payload shared by `emit` and `broadcast`.
 *
 * In raw buffer mode `type` itself is the payload and is converted to a
 * Buffer using `config.encoding`; otherwise a Message carrying `type` and
 * `data` is serialised by the server's event parser.
 *
 * @param {Server} server
 * @param {*} type
 * @param {*} data
 * @returns {*} Buffer in raw mode, otherwise whatever `eventParser.format` returns.
 */
function formatOutgoing(server, type, data){
    if(server.config.rawBuffer){
        return Buffer.from(type, server.config.encoding);
    }

    const message = new Message;
    message.type = type;
    message.data = data;

    return server.eventParser.format(message);
}

/**
 * Sends an event to a single socket. Installed on each Server instance as
 * `server.emit`, so `this` is the Server.
 *
 * For UDP servers `socket` must carry `address` and `port`; when it does not,
 * the event is redispatched with `broadcast` instead.
 *
 * @param {Object} socket Target socket (or `{address, port}` for UDP).
 * @param {*} type        Event type, or the raw payload in raw buffer mode.
 * @param {*} [data]      Event data.
 */
function emit(socket, type, data){
    this.log('dispatching event to socket', ' : ', type, data);

    if(this.config.rawBuffer){
        this.log(this.config.encoding);
    }

    const message = formatOutgoing(this, type, data);

    if(this.udp4 || this.udp6){
        if(!socket.address || !socket.port){
            this.log('Attempting to emit to a single UDP socket without supplying socket address or port. Redispatching event as broadcast to all connected sockets');
            this.broadcast(type,data);
            return;
        }

        this.server.write(
            message,
            socket
        );
        return;
    }

    socket.write(message);
}

/**
 * Sends an event to every known socket. Installed on each Server instance as
 * `server.broadcast`, so `this` is the Server.
 *
 * @param {*} type   Event type, or the raw payload in raw buffer mode.
 * @param {*} [data] Event data.
 */
function broadcast(type,data){
    this.log('broadcasting event to all known sockets listening to ', this.path,' : ', ((this.port)?this.port:''), type, data);

    const message = formatOutgoing(this, type, data);
    const count = this.sockets.length;

    if(this.udp4 || this.udp6){
        // Index 0 is skipped on purpose: for UDP the first entry pushed by
        // serverCreated is the dgram server socket itself (see startServer).
        for(let i=1; i<count; i++){
            this.server.write(message,this.sockets[i]);
        }
        return;
    }

    for(let i=0; i<count; i++){
        this.sockets[i].write(message);
    }
}

/**
 * `close` handler: removes the first tracked socket that is no longer
 * readable, destroys it and publishes `socket.disconnected`.
 *
 * Only one socket is removed per invocation; the handler runs once per
 * published `close` event.
 */
function serverClosed(){
    for(let i=0, count=this.sockets.length; i<count; i++){
        const socket=this.sockets[i];

        if(socket && socket.readable){
            continue;
        }

        // Guarded: the entry may be falsy, in which case there is no id to report.
        const destroyedSocketId=(socket && socket.id) ? socket.id : false;

        this.log('socket disconnected',String(destroyedSocketId));

        if(socket && socket.destroy){
            socket.destroy();
        }

        this.sockets.splice(i,1);

        this.publish('socket.disconnected', socket, destroyedSocketId);

        return;
    }
}

/**
 * `data` handler. In raw buffer mode publishes a `data` event with the
 * payload as a Buffer. Otherwise appends the chunk to the socket's
 * `ipcBuffer`, and once the buffer ends with the parser delimiter, parses it
 * and publishes one event per message.
 *
 * @param {Object} socket      Socket the listener was registered on.
 * @param {*} data             Received chunk.
 * @param {Object} [UDPSocket] Sender info; used in place of `socket` for UDP.
 */
function gotData(socket,data,UDPSocket){
    const sock=((this.udp4 || this.udp6)? UDPSocket : socket);

    if(this.config.rawBuffer){
        data=Buffer.from(data,this.config.encoding);
        this.publish(
            'data',
            data,
            sock
        );
        return;
    }

    if(!sock.ipcBuffer){
        sock.ipcBuffer='';
    }

    data=(sock.ipcBuffer+=data);

    const delimiter=this.eventParser.delimiter;

    // Wait for more data until the buffered text ends with the delimiter.
    if(data.slice(-1)!==delimiter || data.indexOf(delimiter) === -1){
        this.log('Messages are large, You may want to consider smaller messages.');
        return;
    }

    sock.ipcBuffer='';

    const events=this.eventParser.parse(data);

    while(events.length>0){
        const message=new Message;
        message.load(events.shift());

        // Only set the sock id if it is specified.
        if (message.data && message.data.id){
            sock.id=message.data.id;
        }

        this.log('received event of : ',message.type,message.data);

        this.publish(
            message.type,
            message.data,
            sock
        );
    }
}

/**
 * Socket `close` handler: republishes the close on the server.
 *
 * @param {*} socket Argument received from the socket's `close` event.
 */
function socketClosed(socket){
    this.publish(
        'close',
        socket
    );
}

/**
 * Registers a new socket with the server: tracks it in `this.sockets`,
 * wires its `close`, `error`, `data` and `message` events and publishes
 * `connect`.
 *
 * @param {Object} socket Connection socket, or the dgram server socket for UDP.
 */
function serverCreated(socket) {
    this.sockets.push(socket);

    if(socket.setEncoding){
        socket.setEncoding(this.config.encoding);
    }

    this.log('## socket connection to server detected ##');

    socket.on(
        'close',
        socketClosed.bind(this)
    );

    socket.on(
        'error',
        (err) => {
            this.log('server socket error',err);
            this.publish('error',err);
        }
    );

    socket.on(
        'data',
        gotData.bind(this,socket)
    );

    socket.on(
        'message',
        (msg,rinfo) => {
            if (!rinfo){
                return;
            }

            this.log('Received UDP message from ', rinfo.address, rinfo.port);

            // `rawBuffer` is the flag used everywhere else in this file;
            // `rawSocket` is still honoured for backward compatibility.
            const isRaw=this.config.rawBuffer || this.config.rawSocket;
            const data=isRaw
                ? Buffer.from(msg,this.config.encoding)
                : msg.toString();

            // Route datagrams through the same `data` handler as streams.
            socket.emit('data',data,rinfo);
        }
    );

    this.publish(
        'connect',
        socket
    );
}

/**
 * Creates the underlying net/tls/dgram server, wires its error handling and
 * starts listening on either the socket path or the configured port.
 */
function startServer() {
    this.log(
        'starting server on ',this.path,
        ((this.port)?`:${this.port}`:'')
    );

    const isUDP=Boolean(this.udp4 || this.udp6);

    if(!isUDP){
        if(!this.config.tls){
            this.server=net.createServer(
                serverCreated.bind(this)
            );
        }else{
            startTLSServer.call(this);
        }
    }else{
        this.server=dgram.createSocket(
            ((this.udp4)? 'udp4':'udp6')
        );
        this.server.write=UDPWrite.bind(this);
        this.server.on(
            'listening',
            () => {
                // The dgram socket is registered like a connection so that its
                // `message` events are handled by serverCreated's listeners.
                serverCreated.call(this,this.server);
            }
        );
    }

    this.server.on(
        'error',
        (err) => {
            this.log('server error',err);

            this.publish(
                'error',
                err
            );
        }
    );

    this.server.maxConnections=this.config.maxConnections;

    if(!this.port){
        this.log('starting server as', 'Unix || Windows Socket');
        if (process.platform ==='win32'){
            // Rewrite the path into a Windows named pipe path.
            this.path = this.path.replace(/^\//, '');
            this.path = this.path.replace(/\//g, '-');
            this.path= `\\\\.\\pipe\\${this.path}`;
        }

        this.server.listen({
            path: this.path,
            readableAll: this.config.readableAll,
            writableAll: this.config.writableAll
        }, this.onStart.bind(this));

        return;
    }

    if(!isUDP){
        this.log('starting server as', (this.config.tls?'TLS':'TCP'));
        this.server.listen(
            this.port,
            this.path,
            this.onStart.bind(this)
        );
        return;
    }

    this.log('starting server as',((this.udp4)? 'udp4':'udp6'));

    this.server.bind(
        this.port,
        this.path
    );

    this.onStart(
        {
            address : this.path,
            port    : this.port
        }
    );
}

/**
 * Creates a TLS server from `config.tls`.
 *
 * Reads the key, certificate, optional DH params and optional trusted CA
 * files synchronously and stores their contents back on `config.tls`
 * (`key`, `cert`, `dhparam`, `ca`) before handing it to `tls.createServer`.
 * Falls back to the bundled local certificates when `private`/`public` are
 * not configured.
 */
function startTLSServer(){
    this.log('starting TLS server',this.config.tls);

    const tlsConfig=this.config.tls;

    if(tlsConfig.private){
        tlsConfig.key=fs.readFileSync(tlsConfig.private);
    }else{
        tlsConfig.key=fs.readFileSync(`${__dirname}/../local-node-ipc-certs/private/server.key`);
    }

    if(tlsConfig.public){
        tlsConfig.cert=fs.readFileSync(tlsConfig.public);
    }else{
        tlsConfig.cert=fs.readFileSync(`${__dirname}/../local-node-ipc-certs/server.pub`);
    }

    if(tlsConfig.dhparam){
        tlsConfig.dhparam=fs.readFileSync(tlsConfig.dhparam);
    }

    if(tlsConfig.trustedConnections){
        // A single path is accepted as shorthand for a one-element list.
        if(typeof tlsConfig.trustedConnections === 'string'){
            tlsConfig.trustedConnections=[tlsConfig.trustedConnections];
        }

        tlsConfig.ca=[];
        for(let i=0; i<tlsConfig.trustedConnections.length; i++){
            tlsConfig.ca.push(
                fs.readFileSync(tlsConfig.trustedConnections[i])
            );
        }
    }

    this.server=tls.createServer(
        tlsConfig,
        serverCreated.bind(this)
    );
}

/**
 * `write` implementation installed on the dgram server socket: sends
 * `message` to the given address/port and reports send failures through the
 * log and the `error` event.
 *
 * @param {*} message    Payload; converted with `Buffer.from(message, config.encoding)`.
 * @param {Object} socket Destination; `port` and `address` are read from it.
 */
function UDPWrite(message,socket){
    const data=Buffer.from(message, this.config.encoding);

    this.server.send(
        data,
        0,
        data.length,
        socket.port,
        socket.address,
        // Arrow function keeps `this` bound to the Server inside the callback.
        (err) => {
            if(err){
                this.log('error writing data to socket',err);
                this.publish(
                    'error',
                    err
                );
            }
        }
    );
}

module.exports=Server;