We have moved our forum to GitHub Discussions. For questions about Phalcon v3/v4/v5 you can visit here and for Phalcon v6 here.

How to check if element has in Phalcon Beanstalk Queue

Hello everyone,

I want to use the Beanstalk queue structure that Phalcon already makes available. I want to know if a given element already exists in the queue, but I can't check it.

How can I do this check?

You can only peek the next job, there is no way you can retreive the list of queued items, but that's a limitation of Beanstalkd,

You could use jobPeek ($id), but you'll have to keep track of the $ids returned by put()



12.1k
edited Oct '17

I don't know if I've understood what you need but using the incubator extension of Beanstalk, if you use an identificator inside the job body, I think you can write a worker that returns true if it find that id without deleting or burying the job.

$job = $this->reserveFromTube($tube);
if ($job && ($job instanceof Job)) {
$jobBody = $job->getBody();
if($jobBody['id']=='whatyouWant') return true;
}

Hello @ cosmy81,

Could you give me an example of how to use the Incubator Extension? I use this queue in a modular Phalcon application.

I don't know if I've understood what you need but using the incubator extension of Beanstalk, if you use an identificator inside the job body, I think you can write a worker that returns true if it find that id without deleting or burying the job.

$job = $this->reserveFromTube($tube);
if ($job && ($job instanceof Job)) {
$jobBody = $job->getBody();
if($jobBody['id']=='whatyouWant') return true;
}

Im not sure hacking beanstalk will be the most beneficial... you are trying to make a use-case work where the underlying system was not desinged to support it.

Redis has the feature you are looking for out-of-the-box, if there is no environment limitation i'd go for it instead of meddling with beanstalk....



12.1k
edited Oct '17

This is the incubator beanstalk extension: https://github.com/phalcon/incubator/tree/master/Library/Phalcon/Queue/Beanstalk.
What I meant in the previous post is to add a method that works like i wrote and to use it instead of doWork, bacause If you don't delete or bury jobs, they still remain in your queue.

Hello @lajosbencz

I really have this limitation. I can only use Beanstalk for this. Unfortunately.

Im not sure hacking beanstalk will be the most beneficial... you are trying to make a use-case work where the underlying system was not desinged to support it.

Redis has the feature you are looking for out-of-the-box, if there is no environment limitation i'd go for it instead of meddling with beanstalk....

edited Oct '17

You're mixing responsibilities here. MQ broker is responsible for assigning jobs, not your app, as by definition, there might be many apps which makes use of the same MQ broker. So MQ broker is one large pool, while apps are workers (they do the hard work). Only thing you should do is reserve job in beanstakld in order to make sure your app (if many) have the exclusive lock over that certain job.

Hello @stamster,

I did not get it right. Could you explain better?

You're mixing responsibilities here. MQ broker is responsible for assigning jobs, not your app, as by definition, there might be many apps which makes use of the same MQ broker. So MQ broker is one large pool, while apps are workers (they do the hard work). Only thing you should do is reserve job in beanstakld in order to make sure your app (if many) have the exclusive lock over that certain job.

Just to explain, I'm using a specific tube to store these jobs. My jobs consist of a simple array, enclosing keys and values. I would like to check if a previously stored element exists in this queue. I could go through all the elements of it to do the check, but with Beanstalk I can only go through the elements if I delete the job at the end. Unfortunately.

You're mixing responsibilities here. MQ broker is responsible for assigning jobs, not your app, as by definition, there might be many apps which makes use of the same MQ broker. So MQ broker is one large pool, while apps are workers (they do the hard work). Only thing you should do is reserve job in beanstakld in order to make sure your app (if many) have the exclusive lock over that certain job.



12.1k
edited Nov '17

Just to explain, I'm using a specific tube to store these jobs. My jobs consist of a simple array, enclosing keys and values.
I would like to check if a previously stored element exists in this queue. I could go through all the elements of it to do the check, but with Beanstalk I can only go >through the elements if I delete the job at the end. Unfortunately.

That's true, if you use beanstalk in the right way. But with a little bit hacking you can do a work without deleting and burying it. How to do it depends on how you have integrated beanstalk. In my case i have the doWork cicle that launch the workers, and the workers return true or false. Then the doWork cicle decide to delete or bury the job depending on the value returned. So you can write something similar to doWork, without running workers and without deleting jobs, just checkoing parameters in job's bodies. It's not a good practice in my opinion but I think it's the only way to solve your problem.

edited Nov '17

Hello @cosmy81,

Do you have an example to illustrate?

Thank you very much!

Just to explain, I'm using a specific tube to store these jobs. My jobs consist of a simple array, enclosing keys and values.
I would like to check if a previously stored element exists in this queue. I could go through all the elements of it to do the check, but with Beanstalk I can only go >through the elements if I delete the job at the end. Unfortunately.

That's true, if you use beanstalk in the right way. But with a little bit hacking you can do a work without deleting and burying it. How to do it depends on how you have integrated beanstalk. In my case i have the doWork cicle that launch the workers, and the workers return true or false. Then the doWork cicle decide to delete or bury the job depending on the value returned. So you can write something similar to doWork, without running workers and without deleting jobs, just checkoing parameters in job's bodies. It's not a good practice in my opinion but I think it's the only way to solve your problem.



12.1k

Post your queue implementation and we'll see how to manage inside it.

Hello @cosmy81,

I apologize for the delay. Follow the code:

AbstractBeanstalkQueue:

<?php

namespace Queue;

use Phalcon\Queue\Beanstalk as BQueue;

class AbstractBeanstalkQueue extends AbstractQueue
{
    /**
     * Tube, used in Beanstalk service
     * @var string
     */
    protected $tube = 'default';

    /**
     * {@inheritdoc}
     */
    public function __construct($host = '', $port = 0)
    {
        if (true === empty($port)) {
            $port = BQueue::DEFAULT_PORT;
        }

        if (true === empty($host)) {
            $host = BQueue::DEFAULT_HOST;
        }

        $this->queue = new BQueue([
            'host'       => $host,
            'port'       => $port,
            'persistent' => true,
        ]);

        try
        {
            @$this->queue->connect();
        } catch (\Exception $e) {
            $this->queue = null;
        }
        return $this->queue;
    }

    /**
     * {@inheritdoc}
     */
    public function setElementQueue($element, $urgent = false, $tube = '')
    {
        if (true === empty($tube)) {
            $tube = $this->tube;
        }

        $options      = [];
        $jobQueueData = [
            'type'    => $tube,
            'element' => $element,
        ];

        if (false === empty($urgent)) {
            $options = [
                'priority' => 0,
            ];
        }

        // Connect to the queue
        if (
            false === empty($this->queue) and
            false !== $this->queue->connect() and
            false !== $this->queue->choose($tube)
        ) {
            return $this->queue->put($jobQueueData, $options);
        }

        return 0;
    }

    /**
     * {@inheritdoc}
     */
    public function getElementQueue($tube = '')
    {
        if (true === empty($tube)) {
            $tube = $this->tube;
        }

        if (
            false === empty($this->queue) and
            false !== $this->queue->connect() and
            false !== $this->queue->choose($tube) and
            false !== ($queueJob = $this->queue->peekReady())
        ) {
            $jobBody = $queueJob->getBody();

            // Get the task of the attendance type
            if (false === empty($jobBody['type']) and $this->tube === $jobBody['type']) {
                $attendance = $jobBody['element'];
                $queueJob->delete();
                return $attendance;
            }
        }
        return null;
    }

    /**
     * {@inheritdoc}
     */
    public function isEmpty()
    {
        // Connect to the queue
        if (
            false === empty($this->queue) and
            false !== $this->queue->connect() and
            false !== $this->queue->choose($this->tube)
        ) {
            // Get the total jobs in the queue tube
            return !($this->queue->statsTube($this->tube)['total-jobs'] > 0);
        }
        return true;
    }

    /**
     * {@inheritdoc}
     */
    public function count()
    {
        // Connect to the queue
        if (
            false === empty($this->queue) and
            false !== $this->queue->connect() and
            false !== $this->queue->choose($this->tube)
        ) {
            // Get the total jobs in the queue tube
            return $this->queue->statsTube($this->tube)['total-jobs'];
        }
        return 0;
    }
}

AttendanceBeanstalkQueue:

<?php

namespace Queue\BeanstalkQueue;

use Queue\AbstractBeanstalkQueue;

class AttendanceBeanstalkQueue extends AbstractBeanstalkQueue
{
    /**
     * {@inheritdoc}
     */
    protected $tube = 'attendance';
}