Source: lib/kernel.spec.js

/*
 * BSD 3-Clause License
 *
 * Copyright (c) 2017, Nicolas Riesco and others as credited in the AUTHORS file
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 * this list of conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 * this list of conditions and the following disclaimer in the documentation
 * and/or other materials provided with the distribution.
 *
 * 3. Neither the name of the copyright holder nor the names of its contributors
 * may be used to endorse or promote products derived from this software without
 * specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 */

var assert = require("assert");
var console = require("console");
var crypto = require("crypto");
var fs = require("fs");
var os = require("os");
var path = require("path");
var spawn = require("child_process").spawn;
var uuid = require("uuid");

var jmp = require("jmp");
var zmq = jmp.zmq;

// Setup logging helpers
var LABEL = "JP-COFFEE-KERNEL: TEST";
var log;
var dontLog = function dontLog() {};
var doLog = function doLog() {
    process.stderr.write(LABEL);
    console.error.apply(this, arguments);
};

if (process.env.DEBUG) {
    global.DEBUG = true;

    try {
        doLog = require("debug")(LABEL);
    } catch (err) {}
}

log = global.DEBUG ? doLog : dontLog;

/**
 * @typedef     {Object} Message
 * @description IPython/Jupyter message
 *
 * @property    {Array}   [message.idents]        Identities
 * @property    {Object}  [message.header]        Header
 * @property    {Object}  [message.parent_header] Parent header
 * @property    {Object}  [message.metadata]      Message metadata
 * @property    {Object}  [message.content]       Message content
 */

/**
 * Callback to handle kernel messages
 *
 * @callback  onMessageCallback
 * @param    {string}  socketName Socket name
 * @param    {Message} message    IPython/Jupyter message
 */

/* eslint-disable max-len */
/**
 * @class     Context
 * @classdesc Context shared by tests
 *            (adapted from MessagingTestEngine in IJavascript)
 *            @see {@link https://github.com/n-riesco/ijavascript/tree/9c2cf6a94575b5016b4a6f3cbe380ab1fcfbcf76}
 * @param {string} protocolVersion Messaging protocol version
 */
function Context(protocolVersion) {
/* eslint-disable max-len */

    /**
     * @member          version
     * @member {String} version.protocol Messaging protocol version
     */
    this.version = {
        protocol: protocolVersion || "5.1",
    };

    /**
     * @member          path
     * @member {String} path.node           Path to node
     * @member {String} path.root           Path to package folder
     * @member {String} path.kernel         Path to kernel
     * @member {String} path.connectionFile Path to kernel connection file
     */
    this.path = {};

    /**
     * @member           connection
     * @member {String}  connection.transport    Transport protocol (e.g. "tcp")
     * @member {String}  connection.ip           IP address (e.g. "127.0.0.1")
     * @member {String}  connection.signature_scheme
     *                                           Signature scheme
     *                                           (e.g. "hmac-sha256")
     * @member {String}  connection.key          Hashing key (e.g. uuid.v4())
     * @member {Integer} connection.hb_port      HeartBeat port
     * @member {Integer} connection.shell_port   Shell port
     * @member {Integer} connection.stdin_port   Stdin port
     * @member {Integer} connection.iopub_port   IOPub port
     * @member {Integer} connection.control_port Control port
     */
    this.connection = {};

    /**
     * @member                     socket
     * @member {module:jmp~Socket} socket.control Control socket
     * @member {module:jmp~Socket} socket.hb      HearBeat socket
     * @member {module:jmp~Socket} socket.iopub   IOPub socket
     * @member {module:jmp~Socket} socket.shell   Shell socket
     * @member {module:jmp~Socket} socket.stdin   Stdin socket
     */
    this.socket = {};

    /**
     * @member      {module:child_process~ChildProcess} kernel
     * @description Kernel instance
     */
    this.kernel = null;

    /**
     * @member      {onMessageCallback} [onMessage]
     * @description Callback to handle kernel messages
     */
    this.onMessage = null;
}

/**
 * @method      init
 * @param       {function} done
 * @description Initialise the messaging test engine
 */
Context.prototype.init = function(done) {
    this._setPaths();
    this._initSockets();
    this._createConnectionFile();
    this._initKernel(done);
};

/**
 * @method      dispose
 * @description Dispose the messaging test engine
 */
Context.prototype.dispose = function() {
    this._disposeKernel();
    this._removeConnectionFile();
    this._disposeSockets();
};

/**
 * @method      _setPaths
 * @description Set up paths
 * @private
 */
Context.prototype._setPaths = function() {
    log("Setting paths");

    this.path.node = process.argv[0];
    this.path.root = path.dirname(__dirname);
    this.path.kernel = path.join(this.path.root, "lib", "kernel.js");

    this.path.connectionFile = path.join(
        os.tmpdir(),
        "conn-" + uuid.v4() + ".json"
    );
};

/**
 * @method      _createConnectionFile
 * @description Create connection file
 * @private
 */
Context.prototype._createConnectionFile = function() {
    log("Creating connection file");

    fs.writeFileSync(
        this.path.connectionFile,
        JSON.stringify(this.connection)
    );
};

/**
 * @method      _removeConnectionFile
 * @description Remove connection file
 * @private
 */
Context.prototype._removeConnectionFile = function() {
    log("Removing connection file");

    try {
        fs.unlinkSync(this.path.connectionFile);
    } catch (e) {
        console.error(e.message);
    }
};

/**
 * @method      _initSockets
 * @description Setup ZMQ sockets and message listeners
 * @private
 */
Context.prototype._initSockets = function() {
    log("Setting up ZMQ sockets");

    var transport = "tcp";
    var ip = "127.0.0.1";
    var address = transport + "://" + ip + ":";
    var scheme = "sha256";
    var key = crypto.randomBytes(256).toString("base64");

    this.connection = {
        transport: transport,
        ip: ip,
        signature_scheme: "hmac-" + scheme,
        key: key,
    };

    var socketNames = ["hb", "shell", "stdin", "iopub", "control"];
    var socketTypes = ["req", "dealer", "dealer", "sub", "dealer"];
    for (var i = 0, attempts = 0; i < socketNames.length; attempts++) {
        var socketName = socketNames[i];
        var socketType = socketTypes[i];
        var socket = (socketName === "hb") ?
            new zmq.Socket(socketType) :
            new jmp.Socket(socketType, scheme, key);
        var port = Math.floor(1024 + Math.random() * (65536 - 1024));

        try {
            socket.connect(address + port);
            this.connection[socketName + "_port"] = port;
            this.socket[socketName] = socket;
            i++;
        } catch (e) {
            log(e.stack);
        }

        if (attempts >= 100) {
            throw new Error("can't bind to any local ports");
        }
    }

    this.socket.iopub.subscribe("");

    log("Setting up message listeners");

    Object.getOwnPropertyNames(this.socket).forEach((function(socketName) {
        this.socket[socketName]
            .on("message", this._onMessage.bind(this, socketName));
    }).bind(this));
};

/**
 * @method      _disposeSockets
 * @description Dispose ZMQ sockets
 * @private
 */
Context.prototype._disposeSockets = function() {
    log("Disposing ZMQ sockets");

    this.socket.control.close();
    this.socket.hb.close();
    this.socket.iopub.close();
    this.socket.shell.close();
    this.socket.stdin.close();
};

/**
 * @method      _initKernel
 * @description Setup IJavascript kernel
 * @param       {function} done
 * @private
 */
Context.prototype._initKernel = function(done) {
    log("Initialising a kernel");

    var cmd = this.path.node;
    var args = [
        this.path.kernel,
        "--protocol=" + this.version.protocol,
        this.path.connectionFile,
    ];
    if (global.DEBUG) args.push("--debug");
    var config = {
        stdio: "inherit"
    };
    log("spawn", cmd, args, config);
    this.kernel = spawn(cmd, args, config);

    if (done) {
        this._waitUntilConnect(function() {
            this._waitUntilKernelIsIdle(done);
        }.bind(this));
    }
};

/**
 * @method      _disposeKernel
 * @description Dispose IJavascript kernel
 * @private
 */
Context.prototype._disposeKernel = function() {
    log("Disposing IJavascript kernel");

    if (this.kernel) {
        this.kernel.kill("SIGTERM");
    }
};

/**
 * @method      _waitUntilConnect
 * @description Wait for ZMQ sockets to connect
 * @param       {function} done
 * @private
 */
Context.prototype._waitUntilConnect = function(done) {
    log("Waiting for ZMQ sockets to connect");

    var socketNames = ["hb", "shell", "iopub", "control"];

    var waitGroup = socketNames.length;
    function onConnect() {
        waitGroup--;
        if (waitGroup === 0) {
            for (var i = 0; i < socketNames.length; i++) {
                this.socket[socketNames[i]].unmonitor();
            }
            if (done) done();
        }
    }

    for (var j = 0; j < socketNames.length; j++) {
        this.socket[socketNames[j]].on("connect", onConnect.bind(this));
        this.socket[socketNames[j]].monitor();
    }
};

/**
 * @method      _waitUntilKernelIsIdle
 * @description Wait until kernel is idle
 * @param       {function} done
 * @private
 */
Context.prototype._waitUntilKernelIsIdle = function(done) {
    log("Waiting until kernel is idle");

    var onMessage = this.onMessage;

    var request = this.run("", function(socketName, response) {
        if (response.parent_header.msg_id !== request.header.msg_id) {
            return;
        }

        if (socketName === "iopub") {
            if (response.header.msg_type === "status") {
                if (response.content.execution_state === "idle") {
                    this.onMessage = onMessage;
                    if (done) done();
                }
            }
        }
    });
};

/**
 * @method      _onMessage
 * @description Handle kernel message
 * @param       {string}  socketName Socket name
 * @param       {Message} message    IPython/Jupyter message
 * @private
 */
Context.prototype._onMessage = function(socketName, message) {
    log("Received on " + socketName, message);

    if (this.onMessage) this.onMessage(socketName, message);
};

/**
 * @method      run
 * @description Send kernel an execution request
 *
 * @param       {string}            code        Code to be run by kernel
 * @param       {onMessageCallback} [onMessage] Kernel message handler
 *
 * @returns     {Message}           message     IPython/Jupyter message
 */
Context.prototype.run = function(code, onMessage) {
    log("Running:", code);

    var message = {
        "header": {
            "msg_type": "execute_request"
        },
        "content": {
            "code": code
        }
    };

    if (!(message instanceof jmp.Message)) {
        message = new jmp.Message(message);
    }

    if (!message.header.msg_id) {
        message.header.msg_id = uuid.v4();
    }

    if (!message.header.username) {
        message.header.username = "user";
    }

    if (!message.header.session) {
        message.header.session = uuid.v4();
    }

    if (!message.header.version) {
        message.header.version = this.version.protocol;
    }

    this.onMessage = onMessage;
    this.socket.shell.send(message);

    return message;
};

describe("jp-coffee-kernel", function() {
    var ctx = new Context();

    before(function(done) {
        this.timeout(5000);
        ctx.init(done);
    });

    after(function() {
        ctx.dispose();
    });

    it("can run: console.log 'Hello, World!'", function(done) {
        test("console.log 'Hello, World!'", "Hello, World!\n", done);
    });

    it("uses coffeescript@2", function(done) {
        var code = [
            "outer = ->",
            "  inner = => Array.prototype.slice.call arguments",
            "  inner()",
            "",
            "console.log outer(1, 2)",
        ].join("\n");

        var stdout = "[ 1, 2 ]\n";

        test(code, stdout, done);
    });

    function test(code, stdout, done) {
        var receivedExecuteReply = false;
        var receivedStdout = false;

        var requestMessage = ctx.run(code, function(socketName, message) {
            if (message.parent_header.msg_id !== requestMessage.header.msg_id) {
                return;
            }

            var msg_type = message && message.header && message.header.msg_type;

            if (socketName === "shell") {
                if (msg_type === "execute_reply") {
                    receivedExecuteReply = true;
                }
            } else if (socketName === "iopub") {
                if (msg_type === "stream") {
                    var name = message.content.name;
                    if (name === "stdout") {
                        var text = (message.content.hasOwnProperty("text")) ?
                            message.content.text :
                            message.content.data;
                        assert.equal(text, stdout, "Unexpected stdout");
                        receivedStdout = true;
                    }
                }
            }

            if (receivedExecuteReply && receivedStdout) {
                ctx.onMessage = null;
                done();
            }
        });
    }
});