clarify connection private/public method and properties

This commit is contained in:
rusher
2018-03-13 15:33:23 +01:00
parent 27c57e4863
commit f77d74aeee
9 changed files with 214 additions and 168 deletions

View File

@ -33,7 +33,7 @@ class Command extends EventEmitter {
} else {
this.emit("error", err);
}
if (err.fatal) this.connEvents.emit("error", err);
if (err.fatal) this.connEvents.emit("server_error", err);
this.emit("end");
this.onPacketReceive = null;
}

View File

@ -14,7 +14,7 @@ const Capabilities = require("../../const/capabilities");
*/
class Handshake extends Command {
constructor(conn) {
super(conn.events);
super(conn._events);
this.conn = conn;
}

View File

@ -16,9 +16,7 @@ class Quit extends Command {
out.startPacket(this);
out.writeInt8(0x01);
out.flushBuffer(true);
if (this.callback) {
this.callback();
}
this.callback();
this.emit("end");
return null;
}

View File

@ -6,6 +6,7 @@ const Net = require("net");
const PacketInputStream = require("./io/packet_input_stream");
const PacketOutputStream = require("./io/packet_output_stream");
const ServerStatus = require("./const/server-status");
const ConnectionInformation = require("./misc/connection-information");
const tls = require("tls");
/*commands*/
@ -16,15 +17,17 @@ const Query = require("./cmd/query");
class Connection {
constructor(options) {
this.events = new EventEmitter();
//public info
this.opts = options;
this.cmd = null;
this.cmdQueue = new Queue();
this.info = { threadId: -1 };
this.out = new PacketOutputStream(this.opts, this.info);
this.info = new ConnectionInformation();
this.addCommand = this._addCommandEnable;
this.addCommand(new Handshake(this));
//internal
this._events = new EventEmitter();
this._cmd = null;
this._cmdQueue = new Queue();
this._out = new PacketOutputStream(this.opts, this.info);
this._addCommand = this._addCommandEnable;
this._addCommand(new Handshake(this));
this._initSocket();
}
@ -50,7 +53,13 @@ class Connection {
if (this._connected) {
callback(null);
} else {
this.events.once("connect", () => callback(null));
this._events.once(
"connect",
function() {
this._connected = true;
callback(null);
}.bind(this)
);
}
}
@ -100,34 +109,6 @@ class Connection {
return this._changeTransaction(options, callback, "ROLLBACK");
}
_changeTransaction(options, callback, cmd) {
let _options, _cb;
if (typeof options === "function") {
_cb = options;
_options = null;
} else {
_options = options;
_cb = callback;
}
//TODO ensure that due to node.js threading system, there can't be race condition on status value
//TODO i.e. if possible race condition, just emit command every time.
if (
!(this.info.status & ServerStatus.STATUS_AUTOCOMMIT) &&
this.info.status & ServerStatus.STATUS_IN_TRANS
) {
const cmd = new Query(this.events, _options, cmd, null, _cb);
return this.addCommand(cmd);
}
if (!_cb) _cb();
return null;
}
/**
* Execute query using binary protocol.
*
@ -167,8 +148,8 @@ class Connection {
_cb = cb;
}
const cmd = new Query(this.events, _options, _sql, _values, _cb);
return this.addCommand(cmd);
const cmd = new Query(this._events, _options, _sql, _values, _cb);
return this._addCommand(cmd);
}
ping(options, callback) {
@ -183,9 +164,17 @@ class Connection {
*/
end(callback) {
this._clearConnectTimeout();
this.addCommand = this._addCommandDisabled;
this._addCommand = this._addCommandDisabled;
this._closing = true;
this._addCommandEnable(new Quit(this.events, callback));
this._addCommandEnable(
new Quit(
this._events,
function() {
this._clear();
if (callback) callback();
}.bind(this)
)
);
}
/**
@ -193,27 +182,27 @@ class Connection {
*/
destroy() {
this._clearConnectTimeout();
this.addCommand = this._addCommandDisabled;
this._addCommand = this._addCommandDisabled;
this._closing = true;
this.cmdQueue.clear();
this._cmdQueue.clear();
if (this.cmd) {
if (this._cmd) {
//socket is closed, but server may still be processing a huge select
//only possibility is to kill process by another thread
//TODO reuse a pool connection to avoid connection creation
const self = this;
const killCon = new Connection(this.opts);
killCon.query("KILL " + this.threadId, () => {
if (self.cmd) {
if (self._cmd) {
const err = Utils.createError(
"Connection destroyed, command was killed",
true,
this.info
);
if (self.cmd.onResult) {
self.cmd.onResult(err);
if (self._cmd.onResult) {
self._cmd.onResult(err);
} else {
self.cmd.emit("error", err);
self._cmd.emit("error", err);
}
}
process.nextTick(() => self._socket.destroy());
@ -222,6 +211,7 @@ class Connection {
} else {
this._socket.destroy();
}
this._clear();
}
pause() {
@ -233,11 +223,11 @@ class Connection {
}
on(eventName, listener) {
this.events.on(eventName, listener);
this._events.on(eventName, listener);
}
once(eventName, listener) {
this.events.once(eventName, listener);
this._events.once(eventName, listener);
}
//*****************************************************************
@ -245,110 +235,48 @@ class Connection {
//*****************************************************************
serverVersion() {
if (!this.info.serverVersion)
throw "cannot know if server information until connection is established";
return this.info.serverVersion;
return this.info.serverVersion();
}
isMariaDB() {
if (!this.info.serverVersion)
throw "cannot know if server is MariaDB until connection is established";
return this.info.serverVersion.mariaDb;
return this.info.isMariaDB();
}
hasMinVersion(major, minor, patch) {
if (!major) major = 0;
if (!minor) minor = 0;
if (!patch) patch = 0;
if (!this.info.serverVersion)
throw "cannot know if server version until connection is established";
let ver = this.info.serverVersion;
return (
ver.major > major ||
(ver.major === major && ver.minor > minor) ||
(ver.major === major && ver.minor === minor && ver.patch >= patch)
);
return this.info.hasMinVersion(major, minor, patch);
}
//*****************************************************************
// internal methods
//*****************************************************************
_onConnect() {
this._clearConnectTimeout();
this._connected = true;
}
_initSocket() {
//TODO handle pipe
if (this.opts.connectTimeout) {
this.timeoutRef = setTimeout(
this._connectTimeoutReached.bind(this),
this.opts.connectTimeout
);
this.events.once("connect", this._onConnect.bind(this));
}
let socket;
this._events.once(
"connect",
function() {
this._connected = true;
this._clearConnectTimeout();
}.bind(this)
);
if (this.opts.socketPath) {
socket = Net.connect(this.opts.socketPath);
this._socket = Net.connect(this.opts.socketPath);
} else {
socket = Net.connect(this.opts.port, this.opts.host);
this._socket = Net.connect(this.opts.port, this.opts.host);
}
let packetInputStream = new PacketInputStream(this);
socket.on("error", this._socketError.bind(this));
socket.on("data", chunk => packetInputStream.onData(chunk));
this.out.setWriter(buffer => this._socket.write(buffer));
this._socket = socket;
}
_socketError(err) {
if (!this._closing) {
this._fatalError(err);
}
}
_dispatchPacket(packet, header) {
if (this.opts.debug && packet && this.cmd) {
console.log(
"<== conn:%d %s (%d,%d)\n%s",
this.info.threadId ? this.info.threadId : -1,
this.cmd.onPacketReceive
? this.cmd.constructor.name + "." + this.cmd.onPacketReceive.name
: this.cmd.constructor.name,
packet.off,
packet.end,
Utils.log(packet.buf, packet.off, packet.end, header)
);
}
if (this.cmd) {
this.cmd.handle(packet, this.out, this.opts, this.info);
return;
}
if (packet && packet.peek() === 0xff) {
//can receive unexpected error packet from server/proxy
//to inform that connection is closed (usually by timeout)
let err = packet.readError(this.info);
if (err.fatal) {
this.events.emit("error", err);
this.close();
}
} else {
let err = Utils.createError(
"receiving packet from server without active commands",
true,
this.info
);
this.events.emit("error", err);
this.close();
}
let packetInputStream = new PacketInputStream(this._dispatchPacket.bind(this));
this._socket.on("error", this._socketError.bind(this));
this._socket.on("data", chunk => packetInputStream.onData(chunk));
this._events.on("server_error", this._fatalError.bind(this));
this._out.setWriter(buffer => this._socket.write(buffer));
}
_createSecureContext(callback) {
@ -367,7 +295,7 @@ class Connection {
let packetInputStream = new PacketInputStream(this);
let events = this.events;
let events = this._events;
secureSocket.on("error", this._socketError.bind(this));
secureSocket.on("data", chunk => packetInputStream.onData(chunk));
secureSocket.on("secureConnect", () => {
@ -375,14 +303,82 @@ class Connection {
callback();
});
this.out.setWriter(buffer => secureSocket.write(buffer));
this._out.setWriter(buffer => secureSocket.write(buffer));
}
_socketError(err) {
if (!this._closing) {
this._fatalError(err);
}
}
_dispatchPacket(packet, header) {
if (this.opts.debug && packet && this._cmd) {
console.log(
"<== conn:%d %s (%d,%d)\n%s",
this.info.threadId ? this.info.threadId : -1,
this._cmd.onPacketReceive
? this._cmd.constructor.name + "." + this._cmd.onPacketReceive.name
: this._cmd.constructor.name,
packet.off,
packet.end,
Utils.log(packet.buf, packet.off, packet.end, header)
);
}
if (this._cmd) {
this._cmd.handle(packet, this._out, this.opts, this.info);
return;
}
if (packet && packet.peek() === 0xff) {
//can receive unexpected error packet from server/proxy
//to inform that connection is closed (usually by timeout)
let err = packet.readError(this.info);
if (err.fatal) {
this._events.emit("error", err);
this.close();
}
} else {
let err = Utils.createError(
"receiving packet from server without active commands",
true,
this.info
);
this._events.emit("error", err);
this.close();
}
}
_changeTransaction(options, callback, sql) {
let _options, _cb;
if (typeof options === "function") {
_cb = options;
_options = null;
} else {
_options = options;
_cb = callback;
}
//TODO ensure that due to node.js threading system, there can't be race condition on status value
//TODO i.e. if possible race condition, just emit command every time.
if (
!(this.info.status & ServerStatus.STATUS_AUTOCOMMIT) &&
this.info.status & ServerStatus.STATUS_IN_TRANS
) {
const cmd = new Query(this._events, _options, sql, null, _cb);
return this._addCommand(cmd);
}
if (_cb) _cb();
return null;
}
_connectTimeoutReached() {
this._clearConnectTimeout();
this._socket.destroy && this._socket.destroy();
const err = Utils.createError("Connection timeout", true, this.info);
this.info = null;
this._fatalError(err);
}
@ -396,12 +392,13 @@ class Connection {
_addCommandEnable(cmd) {
let conn = this;
cmd.once("end", () => process.nextTick(() => conn._nextCmd()));
if (!this.cmd && this.cmdQueue.isEmpty()) {
this.cmd = cmd;
this.cmd.init(this.out, this.opts, this.info);
if (!this._cmd && this._cmdQueue.isEmpty()) {
this._cmd = cmd;
this._cmd.init(this._out, this.opts, this.info);
} else {
this.cmdQueue.push(cmd);
this._cmdQueue.push(cmd);
}
return cmd;
}
_addCommandDisabled(cmd) {
@ -424,22 +421,31 @@ class Connection {
_fatalError(err) {
err.fatal = true;
//prevent any new action
this.addCommand = this._addCommandDisabled;
this.cmdQueue.clear();
this._addCommand = this._addCommandDisabled;
//disabled events
this._socket.destroy();
this._closing = true;
if (this.cmd && this.cmd.onResult) {
this.cmd.onResult(err);
if (this._cmd && this._cmd.onResult) {
this._cmd.onResult(err);
}
this.events.emit("error", err);
this._events.emit("error", err);
this._events.emit("end");
this._clear();
}
_nextCmd() {
if ((this.cmd = this.cmdQueue.shift())) {
this.cmd.init(this.out, this.opts, this.info);
if ((this._cmd = this._cmdQueue.shift())) {
this._cmd.init(this._out, this.opts, this.info);
}
}
_clear() {
this._clearConnectTimeout();
this._cmdQueue.clear();
this._out = null;
this._socket = null;
this._events.removeAllListeners();
}
}
module.exports = Connection;

View File

@ -7,8 +7,8 @@ const Packet = require("./Packet");
* see : https://mariadb.com/kb/en/library/0-packet/
*/
class PacketInputStream {
constructor(conn) {
this.conn = conn;
constructor(dispatchPacket) {
this.dispatchPacket = dispatchPacket;
//in case packet is not complete
this.header = Buffer.allocUnsafe(4);
@ -23,7 +23,7 @@ class PacketInputStream {
}
receivePacket(packet) {
this.conn._dispatchPacket(packet, this.header);
this.dispatchPacket(packet, this.header);
}
resetHeader() {

View File

@ -0,0 +1,37 @@
"use strict";
class ConnectionInformation {
constructor() {
this.threadId = -1;
this.status = null;
}
serverVersion() {
if (!this.serverVersion)
throw "cannot know if server information until connection is established";
return this.serverVersion;
}
isMariaDB() {
if (!this.serverVersion)
throw "cannot know if server is MariaDB until connection is established";
return this.serverVersion.mariaDb;
}
hasMinVersion(major, minor, patch) {
if (!this.serverVersion) throw "cannot know if server version until connection is established";
if (!major) major = 0;
if (!minor) minor = 0;
if (!patch) patch = 0;
let ver = this.serverVersion;
return (
ver.major > major ||
(ver.major === major && ver.minor > minor) ||
(ver.major === major && ver.minor === minor && ver.patch >= patch)
);
}
}
module.exports = ConnectionInformation;

View File

@ -28,19 +28,18 @@ describe("connection event", () => {
conn.on("connect", () => {
eventNumber++;
});
conn.on("error", () => {
eventNumber++;
});
conn.on("end", () => {
eventNumber++;
assert.equal(eventNumber, 3);
done();
});
conn.query("KILL CONNECTION_ID()", err => {
assert.equal(eventNumber, 2);
conn.end(err => {
assert.equal(eventNumber, 2);
done();
});
});
const query = conn.query("KILL CONNECTION_ID()");
query.on("error", () => {});
});
});

View File

@ -9,8 +9,14 @@ describe("connection timeout", () => {
const conn = base.createConnection({ host: "www.google.fr", connectTimeout: 1000 });
conn.on("error", err => {
assert.strictEqual(err.message, "(conn=-1) Connection timeout");
assert.isTrue(Date.now() - initTime >= 1000);
assert.isTrue(Date.now() - initTime < 1050);
assert.isTrue(
Date.now() - initTime >= 1000,
"expected > 1000, but was " + (Date.now() - initTime)
);
assert.isTrue(
Date.now() - initTime < 1050,
"expected < 1050, but was " + (Date.now() - initTime)
);
done();
});
});

View File

@ -22,7 +22,7 @@ describe("test PacketInputStream data", () => {
}
};
let buf = Buffer.from([5, 0, 0, 0, 1, 2, 3, 4, 5]);
let pis = new PacketInputStream(conn);
let pis = new PacketInputStream(conn._dispatchPacket.bind(conn));
pis.onData(buf);
});
@ -33,7 +33,7 @@ describe("test PacketInputStream data", () => {
}
};
let pis = new PacketInputStream(conn);
let pis = new PacketInputStream(conn._dispatchPacket.bind(conn));
pis.onData(Buffer.from([5]));
pis.onData(Buffer.from([0, 0, 0, 1, 2, 3, 4, 5]));
});
@ -45,7 +45,7 @@ describe("test PacketInputStream data", () => {
}
};
let pis = new PacketInputStream(conn);
let pis = new PacketInputStream(conn._dispatchPacket.bind(conn));
pis.onData(Buffer.from([5, 0]));
pis.onData(Buffer.from([0, 0, 1, 2, 3, 4, 5]));
});
@ -57,7 +57,7 @@ describe("test PacketInputStream data", () => {
}
};
let pis = new PacketInputStream(conn);
let pis = new PacketInputStream(conn._dispatchPacket.bind(conn));
pis.onData(Buffer.from([5, 0]));
pis.onData(Buffer.from([0]));
pis.onData(Buffer.from([0, 1, 2, 3, 4, 5]));
@ -70,7 +70,7 @@ describe("test PacketInputStream data", () => {
}
};
let pis = new PacketInputStream(conn);
let pis = new PacketInputStream(conn._dispatchPacket.bind(conn));
pis.onData(Buffer.from([5, 0]));
pis.onData(Buffer.from([0, 0]));
pis.onData(Buffer.from([1, 2, 3, 4, 5]));
@ -83,7 +83,7 @@ describe("test PacketInputStream data", () => {
}
};
let pis = new PacketInputStream(conn);
let pis = new PacketInputStream(conn._dispatchPacket.bind(conn));
pis.onData(Buffer.from([5, 0, 0, 0, 1, 2]));
pis.onData(Buffer.from([3, 4, 5]));
});
@ -103,7 +103,7 @@ describe("test PacketInputStream data", () => {
}
};
let pis = new PacketInputStream(conn);
let pis = new PacketInputStream(conn._dispatchPacket.bind(conn));
pis.onData(Buffer.concat([Buffer.from([0xff, 0xff, 0xff, 0x00]), buf.slice(0, 16777215)]));
pis.onData(Buffer.concat([Buffer.from([0x00, 0x00, 0x40, 0x01]), buf.slice(16777215)]));
assert.ok(beenDispatch);
@ -124,7 +124,7 @@ describe("test PacketInputStream data", () => {
}
};
let pis = new PacketInputStream(conn);
let pis = new PacketInputStream(conn._dispatchPacket.bind(conn));
pis.onData(Buffer.concat([Buffer.from([0xff, 0xff, 0xff, 0x00]), buf.slice(0, 1000000)]));
pis.onData(buf.slice(1000000, 2000000));
pis.onData(buf.slice(2000000, 16777215));