/*
* BSD 3-Clause License
*
* Copyright (c) 2015, Nicolas Riesco and others as credited in the AUTHORS file
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* 3. Neither the name of the copyright holder nor the names of its contributors
* may be used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
*/
var crypto = require("crypto");
var uuid = require("uuid/v4");
var zmq = require("zeromq/v5-compat");
var DEBUG = global.DEBUG || false;
var log;
if (DEBUG) {
var console = require("console");
log = function log() {
process.stderr.write("JMP: ");
console.error.apply(this, arguments);
};
} else {
try {
log = require("debug")("JMP:");
} catch (err) {
log = function noop() {};
}
}
var DELIMITER = "<IDS|MSG>";
/**
* Jupyter message
* @class
* @param [properties] Message properties
* @param {Array} [properties.idents] ZMQ identities
* @param {Object} [properties.header]
* @param {Object} [properties.parent_header]
* @param {Object} [properties.metadata]
* @param {Object} [properties.content]
* @param {Array} [properties.buffers] Unparsed message frames
*/
function Message(properties) {
/**
* ZMQ identities
* @member {Array}
*/
this.idents = properties && properties.idents || [];
/**
* @member {Object}
*/
this.header = properties && properties.header || {};
/**
* @member {Object}
*/
this.parent_header = properties && properties.parent_header || {};
/**
* @member {Object}
*/
this.metadata = properties && properties.metadata || {};
/**
* @member {Object}
*/
this.content = properties && properties.content || {};
/**
* Unparsed JMP message frames (any frames after content)
* @member {Array}
*/
this.buffers = properties && properties.buffers || [];
}
/**
* Send a response over a given socket
*
* @param {module:zmq~Socket} socket Socket over which the response is sent
* @param {String} messageType Jupyter response message type
* @param {Object} [content] Jupyter response content
* @param {Object} [metadata] Jupyter response metadata
* @param {String} [protocolVersion] Jupyter protocol version
* @returns {module:jmp~Message} The response message sent over the given socket
*/
Message.prototype.respond = function(
socket, messageType, content, metadata, protocolVersion
) {
var response = new Message();
response.idents = this.idents;
response.header = {
msg_id: uuid(),
username: this.header.username,
session: this.header.session,
msg_type: messageType,
};
if (this.header && this.header.version) {
response.header.version = this.header.version;
}
if (protocolVersion) {
response.header.version = protocolVersion;
}
response.parent_header = this.header;
response.content = content || {};
response.metadata = metadata || {};
socket.send(response);
return response;
};
/**
* Decode message received over a ZMQ socket
*
* @param {argsArray} messageFrames argsArray of a message listener on a JMP
* socket
* @param {String} [scheme=sha256] Hashing scheme
* @param {String} [key=""] Hashing key
* @returns {?module:jmp~Message} JMP message or `null` if failed to decode
* @protected
*/
Message._decode = function(messageFrames, scheme, key) {
// Workaround for Buffer.toString failure caused by exceeding the maximum
// supported length in V8.
//
// See issue #4266 https://github.com/nodejs/node/issues/4266
// and PR #4394 https://github.com/nodejs/node/pull/4394
try {
return _decode(messageFrames, scheme, key);
} catch (err) {
if (err.message.indexOf("toString") === -1) throw err;
}
return null;
};
function _decode(messageFrames, scheme, key) {
scheme = scheme || "sha256";
key = key || "";
var i = 0;
var idents = [];
for (i = 0; i < messageFrames.length; i++) {
var frame = messageFrames[i];
if (frame.toString() === DELIMITER) {
break;
}
idents.push(frame);
}
if (messageFrames.length - i < 5) {
log("MESSAGE: DECODE: Not enough message frames", messageFrames);
return null;
}
if (messageFrames[i].toString() !== DELIMITER) {
log("MESSAGE: DECODE: Missing delimiter", messageFrames);
return null;
}
if (key) {
var obtainedSignature = messageFrames[i + 1].toString();
var hmac = crypto.createHmac(scheme, key);
hmac.update(messageFrames[i + 2]);
hmac.update(messageFrames[i + 3]);
hmac.update(messageFrames[i + 4]);
hmac.update(messageFrames[i + 5]);
var expectedSignature = hmac.digest("hex");
if (expectedSignature !== obtainedSignature) {
log(
"MESSAGE: DECODE: Incorrect message signature:",
"Obtained = " + obtainedSignature,
"Expected = " + expectedSignature
);
return null;
}
}
var message = new Message({
idents: idents,
header: toJSON(messageFrames[i + 2]),
parent_header: toJSON(messageFrames[i + 3]),
content: toJSON(messageFrames[i + 5]),
metadata: toJSON(messageFrames[i + 4]),
buffers: Array.prototype.slice.apply(messageFrames, [i + 6]),
});
return message;
function toJSON(value) {
return JSON.parse(value.toString());
}
}
/**
* Encode message for transfer over a ZMQ socket
*
* @param {String} [scheme=sha256] Hashing scheme
* @param {String} [key=""] Hashing key
* @returns {Array} Encoded message
* @protected
*/
Message.prototype._encode = function(scheme, key) {
scheme = scheme || "sha256";
key = key || "";
var idents = this.idents;
var header = JSON.stringify(this.header);
var parent_header = JSON.stringify(this.parent_header);
var metadata = JSON.stringify(this.metadata);
var content = JSON.stringify(this.content);
var signature = "";
if (key) {
var hmac = crypto.createHmac(scheme, key);
var encoding = "utf8";
hmac.update(new Buffer(header, encoding));
hmac.update(new Buffer(parent_header, encoding));
hmac.update(new Buffer(metadata, encoding));
hmac.update(new Buffer(content, encoding));
signature = hmac.digest("hex");
}
var response = idents.concat([ // idents
DELIMITER, // delimiter
signature, // HMAC signature
header, // header
parent_header, // parent header
metadata, // metadata
content, // content
]).concat(this.buffers);
return response;
};
/**
* ZMQ socket that parses the Jupyter Messaging Protocol
*/
class Socket extends zmq.Socket {
/**
* Create a JMP socket.
*
* @param {String|Number} socketType ZMQ socket type
* @param {String} [scheme="sha256"] Hashing scheme
* @param {String} [key=""] Hashing key
*/
constructor(socketType, scheme, key) {
super(socketType);
this._jmp = {
scheme: scheme,
key: key,
_listeners: [],
};
}
/**
* Send the given message.
*
* @param {module:jmp~Message|String|Buffer|Array} message
* @param {Number} flags
* @returns {module:jmp~Socket} `this` to allow chaining
*
*/
send(message, flags) {
var p = Object.getPrototypeOf(Socket.prototype);
if (message instanceof Message) {
log("SOCKET: SEND:", message);
return p.send.call(
this, message._encode(this._jmp.scheme, this._jmp.key), flags
);
}
return p.send.apply(this, arguments);
}
/**
* Add listener to the end of the listeners array for the specified event
*
* @param {String} event
* @param {Function} listener
* @returns {module:jmp~Socket} `this` to allow chaining
*/
on(event, listener) {
var p = Object.getPrototypeOf(Socket.prototype);
if (event !== "message") {
return p.on.apply(this, arguments);
}
var _listener = {
unwrapped: listener,
wrapped: (function() {
var message = Message._decode(
arguments, this._jmp.scheme, this._jmp.key
);
if (message) {
listener(message);
}
}).bind(this),
};
this._jmp._listeners.push(_listener);
return p.on.call(this, event, _listener.wrapped);
}
/**
* Add listener to the end of the listeners array for the specified event
*
* @method module:jmp~Socket#addListener
* @param {String} event
* @param {Function} listener
* @returns {module:jmp~Socket} `this` to allow chaining
*/
addListener(event, listener) {
return this.on(event, listener);
}
/**
* Add a one-time listener to the end of the listeners array for the
* specified event
*
* @param {String} event
* @param {Function} listener
* @returns {module:jmp~Socket} `this` to allow chaining
*/
once(event, listener) {
var p = Object.getPrototypeOf(Socket.prototype);
if (event !== "message") {
return p.once.apply(this, arguments);
}
var _listener = {
unwrapped: listener,
wrapped: (function() {
var message = Message._decode(
arguments, this._jmp.scheme, this._jmp.key
);
if (message) {
try {
listener(message);
} catch (error) {
Socket.prototype.removeListener.call(
this, event, listener
);
throw error;
}
}
Socket.prototype.removeListener.call(this, event, listener);
}).bind(this),
};
this._jmp._listeners.push(_listener);
return p.on.call(this, event, _listener.wrapped);
}
/**
* Remove listener from the listeners array for the specified event
*
* @param {String} event
* @param {Function} listener
* @returns {module:jmp~Socket} `this` to allow chaining
*/
removeListener(event, listener) {
var p = Object.getPrototypeOf(Socket.prototype);
if (event !== "message") {
return p.removeListener.apply(this, arguments);
}
var length = this._jmp._listeners.length;
for (var i = 0; i < length; i++) {
var _listener = this._jmp._listeners[i];
if (_listener.unwrapped === listener) {
this._jmp._listeners.splice(i, 1);
return p.removeListener.call(this, event, _listener.wrapped);
}
}
return p.removeListener.apply(this, arguments);
}
/**
* Remove all listeners, or those for the specified event
*
* @param {String} [event]
* @returns {module:jmp~Socket} `this` to allow chaining
*/
removeAllListeners(event) {
var p = Object.getPrototypeOf(Socket.prototype);
if (arguments.length === 0 || event === "message") {
this._jmp._listeners.length = 0;
}
return p.removeAllListeners.apply(this, arguments);
}
}
/**
* @module jmp
*
* @description Module `jmp` provides functionality for creating, parsing and
* replying to messages of the Jupyter protocol. It also provides functionality
* for networking these messages via {@link module:zmq~Socket ZMQ sockets}.
*
*/
module.exports = {
Message: Message,
Socket: Socket,
/**
* ZeroMQ bindings
*/
zmq: zmq,
};