This post is a follow up to Ruby beanstalkd basics, I will try to make the example code little more interesting and useful. I am calling this is a Ruby beanstalkd intermediate write up, it sets up a few workers and distributes and receives results simultaneously. In this example the code resembles real code a bit more (using a queue cache and block passing). If there is enough interest in the Ruby/beanstalkd community, I will follow up with beanstalkd advanced lessons, and go into how we deal with failure cases such as worker dying during jobs, random jobs failing, processing multiple ‘projects’ at one time, using job priority settings, and using TTR/timeouts.
So in this example we are making an estimate of PI. Yes I know that there are far better approximations out there than my simple results, but this was what I came up with for an incredibly simple distributed computing problem. I based my example on the PI Calculation problem from an Introduction to Parallel Computing. The basic idea is that you can calculate pi by guessing random points in a square and then seeing how many points are inside a circle that fits inside the square (PI= 4 * points_in_circle/total_points).
I made a bunch of comments in the code that should help you follow but there are a few key sections worth pointing out.
In the Ruby beanstalkd Basics, both the Server and the Clients only used one queue at a time. Now since we are sending on one queue while also listening on another we need access to both queues at once. We simply have a helper function with a queue_cache to make getting and reusing multiple queues incredibly easy.
def get_queue(queue_name)
@queue_cache ||= {}
if @queue_cache.has_key?(queue_name)
return @queue_cache[queue_name]
else
queue = Beanstalk::Pool.new(["#{SERVER_IP}:#{DEFAULT_PORT}"])
queue.watch(queue_name)
queue.use(queue_name)
queue.ignore('default')
@queue_cache[queue_name] = queue
return queue
end
end
In the basic example each class had a function that got a job and did some work and deleted the job. It is easy to imagine workers that might have many different kinds of work to do on jobs. In every case they are going to grab a job, work on the job, and delete the job. We decided to break that up and make it easy to just pass a work block when workers get a job.
def take_msg(queue)
msg = queue.reserve
#by calling ybody we get the content of the message and convert it from yml
body = msg.ybody
if block_given?
yield(body)
end
msg.delete
end
#call take_msg like so
take_msg(queue) do |body|
#work on body
end
One other thing you should keep a look out for in the code below is checking if a queue has any jobs. Many times workers will check if jobs exist and take them, and if there aren’t any jobs the process is free to do something else. I do this in this example, the server continually checks incoming results to immediately display. If no results have arrived yet, the server continues sending out job requests as fast as it can. This is useful since taking jobs from beanstalkd is a blocking call. They did add support for non-blocking calls in beanstalkd 1.1, but I haven’t started using the newest version yet. I think everything else should be pretty self explanatory, feel free to ask me any questions. To run the code it is the same as before: download beanstalk_intermediate.rb, start beanstalkd, and run the example with ruby.
$ beanstalkd &
$ ruby beanstalk_intermediate.rb
starting distributor
starting client(s)
distributor sending out jobs
.......................................................
.............................................
received all the results our estimate for pi is: 3.142776
| # of workers | time to complete |
| 1 | real 0m7.282s user 0m4.114s sys 0m0.978s |
| 2 | real 0m5.667s user 0m2.736s sys 0m0.670s |
| 3 | real 0m4.999s user 0m2.014s sys 0m0.515s |
| 4 | real 0m4.612s user 0m1.608s sys 0m0.442s |
| 5 | real 0m4.517s user 0m1.474s sys 0m0.416s |
require 'beanstalk-client.rb'
DEFAULT_PORT = 11300
SERVER_IP = '127.0.0.1'
#beanstalk will order the queues based on priority, with the same priority
#it acts FIFO, in a later example we will use the priority
#(higher numbers are higher priority)
DEFAULT_PRIORITY = 65536
#TTR is time for the job to reappear on the queue.
#Assuming a worker died before completing work and never called job.delete
#the same job would return back on the queue (in TTR seconds)
TTR = 3
class BeanBase
#To work with multiple queues you must tell beanstalk which queues
#you plan on writing to (use), and which queues you will reserve jobs from
#(watch). In this case we also want to ignore the default queue
#you need a different queue object for each tube you plan on using or
#you can switch what the tub is watching and using a bunch, we just keep a few
#queues open on the tubes we want.
def get_queue(queue_name)
@queue_cache ||= {}
if @queue_cache.has_key?(queue_name)
return @queue_cache[queue_name]
else
queue = Beanstalk::Pool.new(["#{SERVER_IP}:#{DEFAULT_PORT}"])
queue.watch(queue_name)
queue.use(queue_name)
queue.ignore('default')
@queue_cache[queue_name] = queue
return queue
end
end
#this will take a message off the queue, and process it with the block
def take_msg(queue)
msg = queue.reserve
#by calling ybody we get the content of the message and convert it from yml
body = msg.ybody
if block_given?
yield(body)
end
msg.delete
end
def results_ready?(queue)
queue.peek_ready!=nil
end
end
class BeanDistributor < BeanBase
def initialize(chunks,points_per_chunk)
@chunks = chunks
@points_per_chunk = points_per_chunk
@messages_out = 0
@circle_count = 0
end
def get_incoming_results(queue)
if(results_ready?(queue))
result = nil
take_msg(queue) do |body|
result = body.count
end
@messages_out -= 1
print "." #display that we received another result
@circle_count += result
else
#do nothing
end
end
def start_distributor
request_queue = get_queue('requests')
results_queue = get_queue('results')
#put all the work on the request queue
puts "distributor sending out #{@messages} jobs"
@chunks.times do |num|
msg = BeanRequest.new(1,@points_per_chunk)
#Take our ruby object and convert it to yml and put it on the queue
request_queue.yput(msg,pri=DEFAULT_PRIORITY, delay=0, ttr=TTR)
@messages_out += 1
#if there are results get them if not continue sending out work
get_incoming_results(results_queue)
end
while @messages_out > 0
get_incoming_results(results_queue)
end
npoints = @chunks * @points_per_chunk
pi = 4.0*@circle_count/(npoints)
puts "\nreceived all the results our estimate for pi is: #{pi}"
end
end
class BeanWorker < BeanBase
def initialize()
end
def write_result(queue, result)
msg = BeanResult.new(1,result)
queue.yput(msg,pri=DEFAULT_PRIORITY, delay=0, ttr=TTR)
end
def in_circle
#generate 2 random numbers see if they are in the circle
range = 1000000.0
radius = range / 2
xcord = rand(range) - radius
ycord = rand(range) - radius
if( (xcord**2) + (ycord**2) <= (radius**2) )
return 1
else
return 0
end
end
def start_worker
request_queue = get_queue('requests')
results_queue = get_queue('results')
#get requests and do the work until the worker is killed
while(true)
result = 0
take_msg(request_queue) do |body|
chunks = body.count
chunks.times { result += in_circle}
end
write_result(results_queue,result)
end
end
end
############
# These are just simple message classes that we pass using beanstalks
# to yml and from yml functions.
############
class BeanRequest
attr_accessor :project_id, :count
def initialize(project_id, count=0)
@project_id = project_id
@count = count
end
end
class BeanResult
attr_accessor :project_id, :count
def initialize(project_id, count=0)
@project_id = project_id
@count = count
end
end
#how many different jobs we should do
chunks = 100
#how many points to calculate per chunk
points_per_chunk = 10000
#how many workers should we have
#(normally different machines, in our example fork them off)
workers = 5
# Most of the time you will have two entirely separate classes
# but to make it easy to run this example we will just fork and start our server
# and client separately. We will wait for them to complete and check
# if we received all the messages we expected.
puts "starting distributor"
server_pid = fork {
BeanDistributor.new(chunks,points_per_chunk).start_distributor
}
puts "starting client(s)"
client_pids = []
workers.times do |num|
client_pid = fork {
BeanWorker.new.start_worker
}
client_pids << client_pid
end
Process.wait(server_pid)
#take down the clients
client_pids.each do |pid|
Process.kill("HUP",pid)
end