Phalcon + Websockets

Hey everybody,

Any live example/tutorial for using websockets + phalcon? Rachet/Phalcon? Anything would be useful! :)

Thanks a lot!



43.8k
edited Oct '14

Here is a a snippet from my upcoming Webird foundation. It is a very heavily modified Vokuro setup. I'll be open sourcing it when every critical structural issue is addressed.

Note: Ratchet doesn't currently support WAMPv2 protocol and the JS libraries that I need to use only support WAMPv2. I think that WAMPv2 offers a far better programming model than raw web sockets and so I am putting this part off until the Thruway project (based on some Ratchet tech) offers WAMPv2 as they have pledged to do.

The Phalcon CLI Task

<?php
namespace Project\Cli\Tasks;

use ZMQ,
    PDO,
    React\ZMQ\Context as ZMQContext,
    React\EventLoop\Factory as EventLoopFactory,
    Ratchet\Server\IoServer,
    Ratchet\Http\HttpServer,
    Ratchet\WebSocket\WsServer,
    Ratchet\Session\SessionProvider,
    Symfony\Component\HttpFoundation\Session\Storage\Handler,
    Webird\Cli\TaskBase,
    Webird\Cli\Chat;

/**
 * Task for websocket
 *
 */
class ServiceTask extends TaskBase
{
    public function mainAction(array $params)
    {
        echo "The default action inside of the ", CURRENT_TASK, " task is not configured\n";
    }



    public function websocketListenAction(array $params)
    {
        // $this->ensureRunningAsWebUser();
        $opts = $params['opts'];
        $config = $this->config;

        $wsPort = (isset($opts['wsport'])) ? $opts['wsport'] : $config->app->wsPort;
        $zmqPort = (isset($opts['zmqport'])) ? $opts['zmqport'] : $config->app->zmqPort;


        $loop = EventLoopFactory::create();
        $chat = new Chat();
        $chat->setDI($this->getDI());


        // Listen for the web server to make a ZeroMQ push after an ajax request
        // $context = new ZMQContext($loop);
        // $pull = $context->getSocket(ZMQ::SOCKET_PULL);
        // $pull->bind("tcp://127.0.0.1:${zmqPort}"); // Binding to 127.0.0.1 means the only client that can connect is itself
        // $pull->on('message', [$chat, 'onUserJoin']);

        $wsServer = new WsServer($chat);


        $ioServer = IoServer::factory(
            new HttpServer($wsServer),
            $wsPort
        );

        echo "websocket listening on port $wsPort in " . ENVIRONMENT . " mode\n";

        $ioServer->run();
    }


}

The Chat application for Ratchet

<?php
namespace Webird\Cli;

use Phalcon\DI\Injectable as DIInjectable,
    Ratchet\MessageComponentInterface,
    Ratchet\ConnectionInterface,
    Webird\DatabaseSessionReader;


/**
 * Basic chat logic for a Ratchet application
 */
class Chat extends DIInjectable implements MessageComponentInterface
{

    protected $clients;

    /**
     * Class constructor
     */
    public function __construct()
    {
        $this->clients = new \SplObjectStorage();
    }

    /**
     * Connection open function
     *
     * @param \Ratchet\ConnectionInterface  $conn
     */
    public function onOpen(ConnectionInterface $conn)
    {
try {
        echo "New connection! ({$conn->resourceId})\n";

        $cookies = $conn->WebSocket->request->getCookies();
        if (! array_key_exists('PHPSESSID', $cookies)) {
            echo "Connection Rejected: Session Cookie was not present.\n";
            return $conn->close();
        }
        $sessionId = $cookies['PHPSESSID'];

        $sessionReader = $this->getDI()->getSessionReader();
        if ($sessionReader->read($sessionId) === false) {
            echo "Connection Rejected: Session could not be found.\n";
            return $conn->close();
        }
        if (($identity = $sessionReader->get('auth-identity')) === false) {
            echo "Connection Rejected: session auth-identity data is not present.\n";
            return $conn->close();
        }
        if (!isset($identity['role'])) {
            echo "Connection Rejected: session user role data is not present.\n";
            return $conn->close();
        }

        $role = $identity['role'];
        $acl = $this->getDI()->getAcl();

        if (!$this->acl->isAllowed($role, 'websocket', 'open')) {
            echo "Connection Rejected: user does not have permission to open a websocket.\n";
            return $conn->close();
        }



} catch (\Exception $e) {
    echo $e->getMessage() . "\n";
}


        // var_export($sessionReader->get('id'));
        // var_export($sess);

        // Store the new connection to send messages to later
        $this->clients->attach($conn, $sessionId);
    }

    /**
     * Receives a message when registered in the websocket server
     *
     * @param \Ratchet\ConnectionInterface  $from
     * @param string                        $msg
    */
    public function onMessage(ConnectionInterface $from, $msg) {
        $numRecv = $this->clients->count() - 1;

        // echo $from->Session->getName() . "\n";

        echo sprintf('Connection %d sending message "%s" to %d other connection%s' . "\n"
            , $from->resourceId, $msg, $numRecv, $numRecv == 1 ? '' : 's');

        foreach ($this->clients as $client) {
            if ($from !== $client) {
                // The sender is not the receiver, send to each client connected
                $client->send($msg);
            }
        }
    }

    /**
     * Handle closing of a connection
     *
     * @param \Ratchet\ConnectionInterface  $conn
    */
    public function onClose(ConnectionInterface $conn) {
        // The connection is closed, remove it, as we can no longer send it messages
        $this->clients->detach($conn);

        echo "Connection {$conn->resourceId} has disconnected\n";
    }

    /**
     * Handles exceptions in the application
     *
     * @param \Ratchet\ConnectionInterface  $from
     * @param \Exception                    $e
    */
    public function onError(ConnectionInterface $conn, \Exception $e) {
        echo "An error has occurred: {$e->getMessage()}\n";

        $conn->close();
    }

}

The read-only Database session reader.

<?php
namespace Webird;

use Phalcon\Db;


/**
 * Read-only session access
 *
 */
class DatabaseSessionReader
{
    private $options;

    private $data;

    /**
     * [email protected]}
     *
     * @param  array                      $options
     * @throws \Phalcon\Session\Exception
     */
    public function __construct($options = null)
    {
        if (!isset($options['db'])) {
            throw new \Exception("The parameter 'db' is required");
        }
        if (!isset($options['unique_id'])) {
            throw new \Exception("The parameter unique_id is required");
        }
        if (!isset($options['db_table'])) {
            throw new \Exception("The parameter 'db_table' is required");
        }
        if (!isset($options['db_id_col'])) {
            throw new \Exception("The parameter 'db_id_col' is required");
        }
        if (!isset($options['db_data_col'])) {
            throw new \Exception("The parameter 'db_data_col' is required");
        }
        if (!isset($options['db_time_col'])) {
            throw new \Exception("The parameter 'db_time_col' is required");
        }

        $this->options = $options;
        $this->data = false;
    }


    protected function getOptions()
    {
        return $this->options;
    }

    /**
     * [email protected]}
     * @param  string $sessionId
     * @return string
     */
    public function read($sessionId)
    {
        $options = $this->getOptions();
        $row = $options['db']->fetchOne(
            sprintf(
                'SELECT %s FROM %s WHERE %s = ?',
                $options['db']->escapeIdentifier($options['db_data_col']),
                $options['db']->escapeIdentifier($options['db_table']),
                $options['db']->escapeIdentifier($options['db_id_col'])
            ),
            Db::FETCH_NUM,
            [$sessionId]
        );

        $this->data = (empty($row[0])) ? false : $this->unserialize_php($row[0]);

        return ($this->data !== false);
    }



    public function has($key)
    {
        if (!is_string($key)) {
            throw new \Exception('The key must be a string');
        }

        if ($this->data == false) {
            return false;
        }

        $uniqueId = $this->getOptions()['unique_id'];
        return (array_key_exists("{$uniqueId}{$key}", $this->data));
    }


    public function get($key)
    {
        if (!$this->has($key)) {
            return false;
        }

        $uniqueId = $this->getOptions()['unique_id'];
        return $this->data["{$uniqueId}{$key}"];
    }



    private function unserialize_php($session_data)
    {
        $return_data = array();
        $offset = 0;
        while ($offset < strlen($session_data)) {
            if (!strstr(substr($session_data, $offset), "|")) {
                throw new \Exception("invalid data, remaining: " . substr($session_data, $offset));
            }
            $pos = strpos($session_data, "|", $offset);
            $num = $pos - $offset;
            $varname = substr($session_data, $offset, $num);
            $offset += $num + 1;
            $data = unserialize(substr($session_data, $offset));
            $return_data[$varname] = $data;
            $offset += strlen(serialize($data));
        }
        return $return_data;
    }

}

Here is the DI to setup the session reader.

$di->set('sessionReader', function() use ($di) {
    $config = $di->get('config');
    $connection = $di->get('db');

    $sessionReader = new DatabaseSessionReader([
        'db'          => $connection,
        'unique_id'    => $config->session->unique_id,
        'db_table'    => $config->session->db_table,
        'db_id_col'   => $config->session->db_id_col,
        'db_data_col' => $config->session->db_data_col,
        'db_time_col' => $config->session->db_time_col,
        'uniqueId'    => $config->session->unique_id
    ]);

    return $sessionReader;
});


43.8k

Addtionally, if you are lucky enough to be able to use an older deprecated JS library that supports WAMPv1 (such as Autobahn v0.8) than a WAMPv1 task would look something like the following:

Note: This particular code is old and hasn't been run in a long time and may not even be entirely functional.

    public function listenAction() {
        $wsPort  = $this->config->app->wsPort;
        $zmqPort = $this->config->app->zmqPort;

        $loop   = EventLoopFactory::create();
        $pusher = new Pusher();

        // Listen for the web server to make a ZeroMQ push after an ajax request
        $context = new ZMQContext($loop);
        $pull = $context->getSocket(ZMQ::SOCKET_PULL);
        $pull->bind("tcp://127.0.0.1:${zmqPort}"); // Binding to 127.0.0.1 means the only client that can connect is itself
        $pull->on('message', [$pusher, 'onBlogEntry']);

        // Set up our WebSocket server for clients wanting real-time updates
        $webSock = new SocketServer($loop);
        $webSock->listen($wsPort, '0.0.0.0'); // Binding to 0.0.0.0 means remotes can connect
        $webServer = new IoServer(
            new HttpServer(
                new WsServer(
                    new WampServer(
                        $pusher
                    )
                )
            ),
            $webSock
        );

        $loop->run();
    }


4.4k

I put that part in the backburner for now.. it's too much hassle, unfortunately :(