Beanstalk: Cannot peek on the queue after choosing tube

In my services.php I have:

$di->set('queue', function () use ($config){
   $queue = new \Phalcon\Queue\Beanstalk([
       $config->queue->host,
       $config->queue->port,
       $config->queue->persistent
   ]);

    return $queue;
});

To put a job onto the queue "email" I:

                $this->queue->choose("email");
                $this->queue->put($to_notify);

I have verified the job is in tube 'email' via Chrome Browser App Beanstalkd Dashboard:

The problem I am having is that the script I have freezes when attempting to reserve a job after choosing the email tube from the queue.

$this->queue->choose('email');

while (true) {
    while ($this->queue->peekReady() !== false) {
        $job = $this->queue->reserve();                   //script freezes here!!!
    }
}
edited Sep '16

For getting jobs from queue and tube you should use watch, not choose method



22.8k

When I use watch it is not getting the job from the watched tube.

For getting jobs from queue and tube you should use watch, not choose method



22.8k
edited Sep '16

I found a solution here https://github.com/phalcon/cphalcon/issues/1882#issuecomment-33081184

After using these two lines next to each other in this order:

    $this->queue->choose('email');
    $this->queue->watch('email');

I was able to:

$this->queue->reserve()

I am using Phalcon 3.0.1

I am using phalcon 3.0.1 too and no problem at all:

<?php
/**
 * Created by PhpStorm.
 * User: Wojtek
 * Date: 2016-08-20
 * Time: 13:54
 */
use Phalcon\Cli\Task;
use Phalcon\Logger\Adapter\File;
use Phalcon\Queue\Beanstalk;
/**
 * Class LogTask
 *
 * @package Suzuki\Cli\Task
 */
class LogTask extends Task
{
    /**
     * @var Beanstalk
     */
    protected $queue;
    /**
     * @var File
     */
    protected $loggerBeforeDispatch;
    /**
     * @var File
     */
    protected $loggerBeforeException;
    /**
     * @var File
     */
    protected $loggerBeforeSendRequest;
    /**
     * @var File
     */
    protected $loggerSqlDebug;
    /**
     * @var File
     */
    protected $loggerPerformance;
    /**
     * @var File
     */
    protected $loggerModel;
    /**
     * @var File
     */
    protected $mailer;
    /**
     * Sets services
     */
    public function initialize()
    {
        $this->queue = $this->di->get('queue');
        $this->loggerBeforeDispatch = $this->di->get('loggerBeforeDispatch');
        $this->loggerBeforeException = $this->di->get('loggerBeforeException');
        $this->loggerBeforeSendRequest = $this->di->get('loggerBeforeSendRequest');
        $this->loggerSqlDebug = $this->di->get('loggerSqlDebug');
        $this->loggerPerformance = $this->di->get('loggerPerformance');
        $this->loggerModel = $this->di->get('loggerModel');
        $this->mailer = $this->di->get('mailer');
    }
    /**
     * Handle task
     */
    public function logAction()
    {
        $this->queue->watch('log');
        while ($this->queue->statsTube('log')["current-jobs-ready"] > 0 && ($job = $this->queue->reserve())) {
            $body = $job->getBody();
            if (isset($body['type'])) {
                switch ($body['type']) {
                    case 'beforeException':
                        $this->loggerBeforeException->error($body['message']);
                        break;
                    case 'beforeDispatch':
                        $this->loggerBeforeDispatch->info($body['message']);
                        break;
                    case 'beforeSendRequest':
                        $this->loggerBeforeSendRequest->info($body['message']);
                        break;
                    case 'beforeQuery':
                        $this->loggerSqlDebug->info($body['message']);
                        break;
                    case 'performance':
                        $this->loggerPerformance->info($body['message']);
                        break;
                    case 'modelException':
                        $this->loggerModel->info($body['message']);
                        break;
                    default:
                        break;
                }
            }
            $job->delete();
        }
    }
}


22.8k

This works:

    $this->queue->watch('email');

    while ($job = $this->queue->reserve()) {
        $message = $job->getBody();
    }

This works:

    $this->queue->choose('email');
    $this->queue->watch('email');

    while ($this->queue->peekReady() !== false) {
        $job = $this->queue->reserve();                  
        $message = $job->getBody();
    }`

As opposed to (doesn't work):

    $this->queue->watch('email');

    while ($this->queue->peekReady() !== false) {
        $job = $this->queue->reserve();                  
        $message = $job->getBody();
    }

and (doesn't work):

    $this->queue->choose('email');

    while ($this->queue->peekReady() !== false) {
        $job = $this->queue->reserve();                  
        $message = $job->getBody();
    }

It appears you cannot use peekReady in a tube besides 'default':

    $this->queue->peekReady();

unless you do this:

    $this->queue->choose('email');
    $this->queue->watch('email');


22.8k

Is this a bug with peekReady() ?

I don't know to be honest. Im using only reserve from getting jobes from tube and no problem for me.



22.8k

Neh botha. Ta mate

I don't know to be honest. Im using only reserve from getting jobes from tube and no problem for me.

Which OS do you use? Beanstalkd version?



22.8k

osx 10.11.6 beanstalkd 1.10

OSX again xD :) Have you tried to run your code on production env (i.e. GNU/Linux)?



22.8k

I haven't tested those cases in Centos no.

What makes you think this is related to OSX?

edited Sep '16

Since we discovered that OSX is case insensitive after all (in a weird way), who knows what to expect with such things as daemons.

Give it a shot and try it on GNU/Linux. If it does work, you know what you have to do... and @Jurigag already mentioned he can run this method w/o issues.

I'm using beanstalkd with put() and reserve() methods only, works like a charm for what it needs to do.



22.8k

As it turned out this had nothing to do with put() and reserve(). @stamster 's I changed the title of the thread after your last comment as I realised it had become misleading.

This thread turned out to be about the behaviour of $this->queue->peekReady() when using beanstalkd tubes. I think @Jurigag knows this (as I do) as he said "I don't know to be honest. Im using only reserve from getting jobes from tube and no problem for me.". Thanks @Jurigag as this is what made me realise it was an issue/feature I originally had with peekReady(). So I could investigate a bit more or ignore and just reserve() after choosing the tube. Also why bother using peekReady()? I still have no real idea why its there. Not when you can just reserve() away like pacman and consume away. Maybe I should ask the Beanstalkd protocol guys. All I know is at time of writing its used in example code in the Phalcon docs.

Just had to scratch that itch though. The Beanstalkd protocol doesn't mention anything about peekReady() and tubes but I now know that it works after choosing and then watching the tube.

Right now I think this about a Beanstalkd thing.