Skip to content
This repository has been archived by the owner on Feb 1, 2022. It is now read-only.

Commit

Permalink
Merge pull request #53 from Articus/master
Browse files Browse the repository at this point in the history
Custom tube names in Beanstalkd for queues
  • Loading branch information
Jurian Sluiman committed Jul 30, 2015
2 parents 231e031 + 2bc5d3f commit faddf75
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 7 deletions.
7 changes: 7 additions & 0 deletions config/slm_queue_beanstalkd.local.php.dist
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,12 @@ return array(
// 'timeout' => 2
),
),
/**
* Beanstalkd tubes that should be used for queues
*/
'queues' => array(
// 'my-queue' => array('tube' => 'beanstalkd-tube-for-my-queue'),
),

),
);
20 changes: 19 additions & 1 deletion src/SlmQueueBeanstalkd/Factory/BeanstalkdQueueFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace SlmQueueBeanstalkd\Factory;

use SlmQueueBeanstalkd\Options\QueueOptions;
use SlmQueueBeanstalkd\Queue\BeanstalkdQueue;
use Zend\ServiceManager\FactoryInterface;
use Zend\ServiceManager\ServiceLocatorInterface;
Expand All @@ -20,6 +21,23 @@ public function createService(ServiceLocatorInterface $serviceLocator, $name = '
$pheanstalk = $parentLocator->get('SlmQueueBeanstalkd\Service\PheanstalkService');
$jobPluginManager = $parentLocator->get('SlmQueue\Job\JobPluginManager');

return new BeanstalkdQueue($pheanstalk, $requestedName, $jobPluginManager);
$queueOptions = $this->getQueueOptions($parentLocator, $requestedName);

return new BeanstalkdQueue($pheanstalk, $requestedName, $jobPluginManager, $queueOptions);
}

/**
* Returns custom beanstalkd options for specified queue
* @param ServiceLocatorInterface $serviceLocator
* @param string $queueName
* @return QueueOptions
*/
protected function getQueueOptions(ServiceLocatorInterface $serviceLocator, $queueName)
{
$config = $serviceLocator->get('Config');
$queuesOptions = isset($config['slm_queue']['queues'])? $config['slm_queue']['queues'] : array();
$queueOptions = isset($queuesOptions[$queueName])? $queuesOptions[$queueName] : array();

return new QueueOptions($queueOptions);
}
}
1 change: 0 additions & 1 deletion src/SlmQueueBeanstalkd/Options/BeanstalkdOptions.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ class BeanstalkdOptions extends AbstractOptions
*/
protected $connection;


/**
* Set the connection options
*
Expand Down
31 changes: 31 additions & 0 deletions src/SlmQueueBeanstalkd/Options/QueueOptions.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

namespace SlmQueueBeanstalkd\Options;

use Zend\Stdlib\AbstractOptions;

class QueueOptions extends AbstractOptions
{
/**
* @var string
*/
protected $tube = '';

/**
* Get beanstalkd tube name for queue
* @return string
*/
public function getTube()
{
return $this->tube;
}

/**
* Set beanstalkd tube for queue
* @param string $tube
*/
public function setTube($tube)
{
$this->tube = $tube;
}
}
32 changes: 27 additions & 5 deletions src/SlmQueueBeanstalkd/Queue/BeanstalkdQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use SlmQueue\Job\JobInterface;
use SlmQueue\Job\JobPluginManager;
use SlmQueue\Queue\AbstractQueue;
use SlmQueueBeanstalkd\Options\QueueOptions;

/**
* BeanstalkdQueue
Expand All @@ -18,6 +19,10 @@ class BeanstalkdQueue extends AbstractQueue implements BeanstalkdQueueInterface
*/
protected $pheanstalk;

/**
* @var string
*/
protected $tubeName;

/**
* Constructor
Expand All @@ -26,9 +31,17 @@ class BeanstalkdQueue extends AbstractQueue implements BeanstalkdQueueInterface
* @param string $name
* @param JobPluginManager $jobPluginManager
*/
public function __construct(Pheanstalk $pheanstalk, $name, JobPluginManager $jobPluginManager)
{
public function __construct(
Pheanstalk $pheanstalk,
$name,
JobPluginManager $jobPluginManager,
QueueOptions $options = null
) {
$this->pheanstalk = $pheanstalk;
$this->tubeName = $name;
if (($options !== null) && $options->getTube()) {
$this->tubeName = $options->getTube();
}
parent::__construct($name, $jobPluginManager);
}

Expand All @@ -43,7 +56,7 @@ public function __construct(Pheanstalk $pheanstalk, $name, JobPluginManager $job
public function push(JobInterface $job, array $options = array())
{
$identifier = $this->pheanstalk->putInTube(
$this->getName(),
$this->getTubeName(),
$this->serializeJob($job),
isset($options['priority']) ? $options['priority'] : Pheanstalk::DEFAULT_PRIORITY,
isset($options['delay']) ? $options['delay'] : Pheanstalk::DEFAULT_DELAY,
Expand All @@ -64,7 +77,7 @@ public function push(JobInterface $job, array $options = array())
public function pop(array $options = array())
{
$job = $this->pheanstalk->reserveFromTube(
$this->getName(),
$this->getTubeName(),
isset($options['timeout']) ? $options['timeout'] : null
);

Expand Down Expand Up @@ -118,7 +131,16 @@ public function bury(JobInterface $job, array $options = array())
*/
public function kick($max)
{
$this->pheanstalk->useTube($this->getName());
$this->pheanstalk->useTube($this->getTubeName());
return $this->pheanstalk->kick($max);
}

/**
* Get the name of the beanstalkd tube that is used for storing queue
* @return string
*/
public function getTubeName()
{
return $this->tubeName;
}
}
7 changes: 7 additions & 0 deletions tests/SlmQueueBeanstalkdTest/Queue/BeanstalkdQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ public function setUp()
$this->queue = new BeanstalkdQueue($this->pheanstalk, $this->queueName, $this->pluginManager);
}

public function testTubeNameGetter()
{
$tubeName = $this->queueName;
$result = $this->queue->getTubeName();
$this->assertEquals($result, $tubeName);
}

public function testSuccessfulKickWithSelectedTube()
{
$maxKick = 10;
Expand Down
108 changes: 108 additions & 0 deletions tests/SlmQueueBeanstalkdTest/Queue/BeanstalkdQueueTubeTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
<?php

namespace SlmQueueBeanstalkdTest\Queue;

use Pheanstalk\Job as PheanstalkJob;
use PHPUnit_Framework_TestCase as TestCase;
use SlmQueueBeanstalkd\Options\QueueOptions;
use SlmQueueBeanstalkd\Queue\BeanstalkdQueue;
use SlmQueueBeanstalkdTest\Asset\SimpleJob;

/**
* BeanstalkdQueue Test for custom tube name
*/
class BeanstalkdQueueTubeTest extends TestCase
{
/**
* @var string
*/
protected $queueName;
/**
* @var string
*/
protected $tubeName;
/**
* @var BeanstalkdQueue
*/
protected $queue;
/**
* @var \Pheanstalk\Pheanstalk|\PHPUnit_Framework_MockObject_MockObject
*/
protected $pheanstalk;
/**
* @var \SlmQueue\Job\JobPluginManager|\PHPUnit_Framework_MockObject_MockObject
*/
protected $pluginManager;

public function setUp()
{
$this->queueName = 'testQueueName';
$this->tubeName = 'testQueueTubeName';
$this->pheanstalk = $this->getMockBuilder('Pheanstalk\Pheanstalk')
->disableOriginalConstructor()
->getMock();

$this->pluginManager = $this->getMockBuilder('SlmQueue\Job\JobPluginManager')
->disableOriginalConstructor()
->getMock();

$queueOptions = new QueueOptions();
$queueOptions->setTube($this->tubeName);

$this->queue = new BeanstalkdQueue($this->pheanstalk, $this->queueName, $this->pluginManager, $queueOptions);
}

public function testTubeNameGetter()
{
$tubeName = $this->tubeName;
$result = $this->queue->getTubeName();
$this->assertEquals($result, $tubeName);
}

public function testSuccessfulKickWithSelectedTube()
{
$maxKick = 10;
$tubeName = $this->tubeName;
$pheanstalk = $this->pheanstalk;

$pheanstalk->expects($this->once())
->method('useTube')
->with($this->equalTo($tubeName))
->will($this->returnValue($pheanstalk));

$pheanstalk->expects($this->once())
->method('kick')
->with($this->equalTo($maxKick))
->will($this->returnValue($maxKick));

$result = $this->queue->kick($maxKick);
$this->assertEquals($result, $maxKick);
}

public function testPopPreservesMetadata()
{
$pheanstalk = $this->pheanstalk;
$tubeName = $this->tubeName;
$pluginManager = $this->pluginManager;

$job = new SimpleJob;
$job->setMetadata('foo', 'bar');

$pheanstalkJob = new PheanstalkJob(1, $this->queue->serializeJob($job));

$pheanstalk->expects($this->once())
->method('reserveFromTube')
->with($this->equalTo($tubeName))
->will($this->returnValue($pheanstalkJob));

$pluginManager->expects($this->once())
->method('get')
->with(get_class($job))
->will($this->returnValue($job));

$result = $this->queue->pop();

$this->assertEquals($result, $job);
$this->assertEquals('bar', $job->getMetadata('foo'));
}
}

0 comments on commit faddf75

Please sign in to comment.