#!/usr/bin/env node
/*
* BSD 3-Clause License
*
* Copyright (c) 2015, Nicolas Riesco and others as credited in the AUTHORS file
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* 3. Neither the name of the copyright holder nor the names of its contributors
* may be used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
*/
module.exports = Kernel;
var console = require("console");
var fs = require("fs");
var path = require("path");
var uuid = require("uuid");
var Session = require("nel").Session; // Javascript session
var Socket = require("jmp").Socket; // IPython/Jupyter protocol socket
var zmq = require("jmp").zmq; // ZMQ bindings
// Logging helpers.
//
// `log` is what the rest of the file calls. It is a no-op unless debugging is
// enabled (through the DEBUG environment variable or the global DEBUG flag),
// in which case it points at `doLog`.
var dontLog = function dontLog() {};

// Default debug logger: prefixes every entry with "KERNEL: " on stderr.
var doLog = function doLog() {
    process.stderr.write("KERNEL: ");
    console.error.apply(this, arguments);
};

var log = dontLog;

if (process.env.DEBUG) {
    global.DEBUG = true;

    try {
        // Use the `debug` package as the logger when it can be loaded ...
        doLog = require("debug")("KERNEL:");
    } catch (err) {
        // ... otherwise keep the stderr logger defined above (best effort).
    }
}

if (global.DEBUG) {
    log = doLog;
}
/**
* Kernel configuration.
*
* @typedef Config
*
* @property {object} connection Frontend connection file
*
* @property {string} cwd Session current working directory
*
* @property {boolean} debug Enable debug mode
*
* @property {boolean} hideExecutionResult
* Do not show execution results
*
* @property {boolean} hideUndefined Do not show undefined results
*
* @property {object} kernelInfoReply Content of kernel_info_reply message
*
* @property {string} protocolVersion Message protocol version
*
* @property {StartupCB}
* startupCallback Callback invoked at session startup.
* This callback can be used to set up a
* session; e.g. to register require
* extensions.
*
* @property {string} startupScript Path to a script to be run at startup.
* Path to a folder also accepted, in which
* case all the scripts in the folder will
* be run.
*
* @property {?module:nel~Transpiler}
* transpile If defined, this function transpiles the
* request code into Javascript that can be
* run by the Node.js session.
*/
/**
* Callback run at session startup (this callback can be used to set up the
* kernel session; e.g. to register require extensions).
*
* @callback StartupCB
*
* @this Kernel
*/
/**
* @class
* @classdesc Implements a Javascript kernel for IPython/Jupyter.
* @param {Config} config Kernel configuration
*/
function Kernel(config) {
if (config.debug) {
log = doLog;
global.DEBUG = true;
}
/**
* Configuration provided by IPython
* @member {Object}
*/
this.connection = config.connection;
var scheme = this.connection.signature_scheme.slice("hmac-".length);
var key = this.connection.key;
// ZMQ identity
var identity = uuid.v4();
/**
* HeartBeat socket
* @member {module:zmq~Socket}
*/
this.hbSocket = zmq.createSocket("rep");
this.hbSocket.identity = identity;
/**
* IOPub socket
* @member {module:jmp~Socket}
*/
this.iopubSocket = new Socket("pub", scheme, key);
this.iopubSocket.identity = identity;
/**
* Stdin socket
* @member {module:jmp~Socket}
*/
this.stdinSocket = new Socket("router", scheme, key);
this.stdinSocket.identity = identity;
/**
* Shell socket
* @member {module:jmp~Socket}
*/
this.shellSocket = new Socket("router", scheme, key);
this.shellSocket.identity = identity;
/**
* Control socket
* @member {module:jmp~Socket}
*/
this.controlSocket = new Socket("router", scheme, key);
this.controlSocket.identity = identity;
/**
* Flag to hide execution results
* @member {Boolean}
*/
this.hideExecutionResult = config.hideExecutionResult;
/**
* Flag to hide undefined results
* @member {Boolean}
*/
this.hideUndefined = config.hideUndefined;
/**
* Content of kernel_info_reply message
* @member {object}
*/
this.kernelInfoReply = config.kernelInfoReply;
/**
* Javascript session
* @member {module:nel~Session}
*/
this.session = new Session({
cwd: config.cwd,
transpile: config.transpile,
});
/**
* onReply callbacks indexed by input_request.header.msg_id
* @member {object}
*/
this.onReplies = {};
/**
* lastActiveOnReply is the last unused onReply callback
* (workaround for frontends that don't set input_reply.parent_header)
* @member {?function}
*/
this.lastActiveOnReply = null;
/**
* Callback run at session startup (this callback can be used to setup the
* kernel session; e.g. to register a require extensions).
* @member {StartupCB}
*/
this.startupCallback = config.startupCallback;
/**
* Path to a Javascript file to be run on session startup. Path to a folder
* also accepted, in which case all the Javascript files in the folder will
* be run.
* @member {String}
*/
this.startupScript = config.startupScript;
/**
* Number of visible execution requests
* @member {Number}
*/
this.executionCount = 0;
/**
* IPython/Jupyter protocol version
* @member {String}
*/
this.protocolVersion = config.protocolVersion;
var majorVersion = parseInt(this.protocolVersion.split(".")[0]);
/**
* Collection of message handlers that links a message type with the method
* handling the response
* @member {Object.<String, Function>}
* @see {@link module:handler_v4}
* @see {@link module:handler_v5}
*/
this.handlers = (majorVersion <= 4) ?
require("./handlers_v4.js") :
require("./handlers_v5.js");
this._bindSockets();
this._initSession();
}
/**
 * Bind kernel sockets and hook listeners
 *
 * Message listeners are attached to the heartbeat, shell, control and stdin
 * sockets first; then every socket (IOPub included) is bound synchronously to
 * `tcp://<connection.ip>:<port>` using the ports in the connection file.
 *
 * @private
 */
Kernel.prototype._bindSockets = function() {
    var address = "tcp://" + this.connection.ip + ":";

    this.hbSocket.on("message", onHBMessage.bind(this));
    this.shellSocket.on("message", onShellMessage.bind(this));
    this.controlSocket.on("message", onControlMessage.bind(this));
    this.stdinSocket.on("message", onStdinMessage.bind(this));

    this.hbSocket.bindSync(address + this.connection.hb_port);
    this.shellSocket.bindSync(address + this.connection.shell_port);
    this.controlSocket.bindSync(address + this.connection.control_port);
    this.stdinSocket.bindSync(address + this.connection.stdin_port);
    this.iopubSocket.bindSync(address + this.connection.iopub_port);

    // Heartbeat: echo the message back
    function onHBMessage(message) {
        this.hbSocket.send(message);
    }

    // Shell: dispatch to the handler registered for the message type; an
    // exception thrown by a handler is logged rather than propagated
    function onShellMessage(msg) {
        var msg_type = msg.header.msg_type;

        if (this.handlers.hasOwnProperty(msg_type)) {
            try {
                this.handlers[msg_type].call(this, msg);
            } catch (e) {
                log("Exception in %s handler: %s", msg_type, e.stack);
            }
        } else {
            // Ignore unimplemented msg_type requests
            log("SHELL: Unhandled message type:", msg_type);
        }
    }

    // Control: only shutdown requests are handled
    function onControlMessage(msg) {
        var msg_type = msg.header.msg_type;

        if (msg_type === "shutdown_request") {
            this.handlers.shutdown_request.call(this, msg);
        } else {
            // Ignore unimplemented msg_type requests
            log("CONTROL: Unhandled message type:", msg_type);
        }
    }

    // Stdin: route an input_reply to the onReply callback waiting for it
    function onStdinMessage(msg) {
        log("STDIN: RESPONSE:", msg);

        var msg_type = msg.header.msg_type;
        if (msg_type !== "input_reply") {
            // Ignore unimplemented msg_type requests
            log("STDIN: Unhandled message type:", msg_type);
            return;
        }

        var onReply;
        var msg_id = msg.parent_header.msg_id;
        if (msg_id) {
            // Only consider callbacks registered on the map itself, so that an
            // id such as "__proto__" cannot resolve to an inherited property.
            if (Object.prototype.hasOwnProperty.call(this.onReplies, msg_id)) {
                onReply = this.onReplies[msg_id];

                // A callback is consumed by its reply: forget it, so that the
                // map does not grow without bound and a duplicated reply does
                // not invoke it a second time.
                delete this.onReplies[msg_id];
            }
        } else {
            log("STDIN: Frontend did not set parent_header.msg_id");
            onReply = this.lastActiveOnReply;
        }

        if (!onReply) {
            log(
                "STDIN:",
                "Dropping input_reply because of missing onReply callback",
                this.onReplies
            );
            return;
        }

        if (this.lastActiveOnReply === onReply) {
            this.lastActiveOnReply = null;
        }

        onReply({input: msg.content.value});
    }
};
/**
 * Initialise session: invoke the startup callback (when one was configured)
 * with the kernel as `this`, then run the startup scripts.
 *
 * @private
 */
Kernel.prototype._initSession = function() {
    var onStartup = this.startupCallback;

    if (onStartup) {
        onStartup.call(this);
    }

    this._runStartupScripts();
};
/**
 * Run startup scripts
 *
 * `this.startupScript` may name a single file or a folder. For a folder, the
 * entries whose name ends in ".js" (case-insensitive) are run in sorted order.
 * Anything else (no path set, or a path that is neither a file nor a folder)
 * results in no script being run. Scripts that cannot be read are logged and
 * skipped.
 *
 * @private
 */
Kernel.prototype._runStartupScripts = function() {
    var target = this.startupScript;
    var startupScripts = [];

    if (target) {
        var stats = fs.lstatSync(target);

        if (stats.isDirectory()) {
            startupScripts = fs.readdirSync(target)
                .filter(function(entry) {
                    return /\.js$/i.test(entry);
                })
                .sort()
                .map(function(entry) {
                    return path.join(target, entry);
                });
        } else if (stats.isFile()) {
            startupScripts = [target];
        }
    }

    log("startupScript: " + startupScripts);

    // Each script gets its own function scope, because the execution
    // callbacks below refer to `script` after the loop has moved on.
    startupScripts.forEach(function(script) {
        var code;

        try {
            code = fs.readFileSync(script).toString();
        } catch (e) {
            log("startupScript: Cannot read '" + script + "'");
            return;
        }

        this.session.execute(code, {
            onSuccess: function onSuccess() {
                log("startupScript: '" + script + "' run successfuly");
            },
            onError: function onError() {
                log("startupScript: '" + script + "' failed to run");
            }
        });
    }, this);
};
/**
 * Destroy kernel
 *
 * Removes the listeners from every kernel socket (stdin included), kills the
 * session with SIGTERM and, once the session reports back, runs `destroyCB`
 * and closes all the sockets.
 *
 * @param {DestroyCB} [destroyCB] Callback run after the session server has been
 *                                killed and before closing the sockets
 */
Kernel.prototype.destroy = function(destroyCB) {
    log("Destroying kernel");

    // Stop reacting to incoming messages before tearing the session down
    this.controlSocket.removeAllListeners();
    this.shellSocket.removeAllListeners();
    this.stdinSocket.removeAllListeners();
    this.iopubSocket.removeAllListeners();
    this.hbSocket.removeAllListeners();

    this.session.kill("SIGTERM", function(code, signal) {
        // The callback runs first, while the sockets are still open
        if (destroyCB) {
            destroyCB(code, signal);
        }

        this.controlSocket.close();
        this.shellSocket.close();
        this.stdinSocket.close();
        this.iopubSocket.close();
        this.hbSocket.close();
    }.bind(this));
};
/**
* @callback DestroyCB
* @param {?Number} code Exit code from session server if exited normally
* @param {?String} signal Signal passed to kill the session server
* @description Callback run after the session server has been killed and before
* the sockets have been closed
* @see {@link Kernel.destroy}
*/
/**
 * Restart kernel
 *
 * Restarts the session with SIGTERM; once the session reports back, the
 * session is initialised again (startup callback and startup scripts) and
 * `restartCB` is run with the arguments received from the session (presumably
 * the exit code and the signal described by RestartCB -- confirm against the
 * session implementation).
 *
 * @param {RestartCB} [restartCB] Callback run after the session server has been
 *                                restarted
 */
Kernel.prototype.restart = function(restartCB) {
    log("Restarting kernel");

    this.session.restart("SIGTERM", function(code, signal) {
        this._initSession();

        if (restartCB) {
            // Forward what the session reported, as `destroy` does
            restartCB(code, signal);
        }
    }.bind(this));
};
/**
* @callback RestartCB
* @param {?Number} code Exit code from session server if exited normally
* @param {?String} signal Signal passed to kill the session server
* @description Callback run after the session server has been restarted
* @see {@link Kernel.restart}
*/