• Ruby Beanstalkd distributed worker intermediate lessons

    This post is a follow up to Ruby beanstalkd basics, I will try to make the example code little more interesting and useful. I am calling this is a Ruby beanstalkd intermediate write up, it sets up a few workers and distributes and receives results simultaneously. In this example the code resembles real code a bit more (using a queue cache and block passing). If there is enough interest in the Ruby/beanstalkd community, I will follow up with beanstalkd advanced lessons, and go into how we deal with failure cases such as worker dying during jobs, random jobs failing, processing multiple 'projects' at one time, using job priority settings, and using TTR/timeouts.

    So in this example we are making an estimate of PI. Yes I know that there are far better approximations out there than my simple results, but this was what I came up with for an incredibly simple distributed computing problem. I based my example on the PI Calculation problem from an Introduction to Parallel Computing. The basic idea is that you can calculate pi by guessing random points in a square and then seeing how many points are inside a circle that fits inside the square (PI= 4 * points_in_circle/total_points).

    I made a bunch of comments in the code that should help you follow but there are a few key sections worth pointing out.

    In the Ruby beanstalkd Basics, both the Server and the Clients only used one queue at a time. Now since we are sending on one queue while also listening on another we need access to both queues at once. We simply have a helper function with a queue_cache to make getting and reusing multiple queues incredibly easy.

    def get_queue(queue_name)
        @queue_cache ||= {}
        if @queue_cache.has_key?(queue_name)
          return @queue_cache[queue_name]
        else
          queue = Beanstalk::Pool.new(["#{SERVER_IP}:#{DEFAULT_PORT}"])
          queue.watch(queue_name)
          queue.use(queue_name)
          queue.ignore('default')
          @queue_cache[queue_name] = queue
          return queue
        end
      end

    In the basic example each class had a function that got a job and did some work and deleted the job. It is easy to imagine workers that might have many different kinds of work to do on jobs. In every case they are going to grab a job, work on the job, and delete the job. We decided to break that up and make it easy to just pass a work block when workers get a job.

    def take_msg(queue)
        msg = queue.reserve
        #by calling ybody we get the content of the message and convert it from yml
        body = msg.ybody
        if block_given?
          yield(body)
        end
        msg.delete
      end
    
    #call take_msg like so
    take_msg(queue) do |body|
      #work on body
    end

    One other thing you should keep a look out for in the code below is checking if a queue has any jobs. Many times workers will check if jobs exist and take them, and if there aren't any jobs the process is free to do something else. I do this in this example, the server continually checks incoming results to immediately display. If no results have arrived yet, the server continues sending out job requests as fast as it can. This is useful since taking jobs from beanstalkd is a blocking call. They did add support for non-blocking calls in beanstalkd 1.1, but I haven't started using the newest version yet. I think everything else should be pretty self explanatory, feel free to ask me any questions. To run the code it is the same as before: download beanstalk_intermediate.rb, start beanstalkd, and run the example with ruby.

    $ beanstalkd &
    $ ruby beanstalk_intermediate.rb
    starting distributor
    starting client(s)
    distributor sending out jobs
    .......................................................
    .............................................
    received all the results our estimate for pi is: 3.142776

    # of workers time to complete
    1 real 0m7.282s
    user 0m4.114s
    sys 0m0.978s
    2 real 0m5.667s
    user 0m2.736s
    sys 0m0.670s
    3 real 0m4.999s
    user 0m2.014s
    sys 0m0.515s
    4 real 0m4.612s
    user 0m1.608s
    sys 0m0.442s
    5 real 0m4.517s
    user 0m1.474s
    sys 0m0.416s
    require 'beanstalk-client.rb'
    
    DEFAULT_PORT = 11300
    SERVER_IP = '127.0.0.1'
    #beanstalk will order the queues based on priority, with the same priority
    #it acts FIFO, in a later example we will use the priority
    #(higher numbers are higher priority)
    DEFAULT_PRIORITY = 65536
    #TTR is time for the job to reappear on the queue.
    #Assuming a worker died before completing work and never called job.delete
    #the same job would return back on the queue (in TTR seconds)
    TTR = 3
    
    class BeanBase
    
      #To work with multiple queues you must tell beanstalk which queues
      #you plan on writing to (use), and which queues you will reserve jobs from
      #(watch). In this case we also want to ignore the default queue
      #you need a different queue object for each tube you plan on using or
      #you can switch what the tub is watching and using a bunch, we just keep a few
      #queues open on the tubes we want.
      def get_queue(queue_name)
        @queue_cache ||= {}
        if @queue_cache.has_key?(queue_name)
          return @queue_cache[queue_name]
        else
          queue = Beanstalk::Pool.new(["#{SERVER_IP}:#{DEFAULT_PORT}"])
          queue.watch(queue_name)
          queue.use(queue_name)
          queue.ignore('default')
          @queue_cache[queue_name] = queue
          return queue
        end
      end
    
      #this will take a message off the queue, and process it with the block
      def take_msg(queue)
        msg = queue.reserve
        #by calling ybody we get the content of the message and convert it from yml
        body = msg.ybody
        if block_given?
          yield(body)
        end
        msg.delete
      end
    
      def results_ready?(queue)
        queue.peek_ready!=nil
      end
    
    end
    
    class BeanDistributor < BeanBase
    
      def initialize(chunks,points_per_chunk)
        @chunks = chunks
        @points_per_chunk = points_per_chunk
        @messages_out = 0
        @circle_count = 0
      end
    
      def get_incoming_results(queue)
        if(results_ready?(queue))
          result = nil
          take_msg(queue) do |body|
            result = body.count
          end
          @messages_out -= 1
          print "." #display that we received another result
          @circle_count += result
        else
          #do nothing
        end
      end
    
      def start_distributor
        request_queue = get_queue('requests')
        results_queue = get_queue('results')
        #put all the work on the request queue
        puts "distributor sending out #{@messages} jobs"
        @chunks.times do |num|
          msg = BeanRequest.new(1,@points_per_chunk)
          #Take our ruby object and convert it to yml and put it on the queue
          request_queue.yput(msg,pri=DEFAULT_PRIORITY, delay=0, ttr=TTR)
          @messages_out += 1
          #if there are results get them if not continue sending out work
          get_incoming_results(results_queue)
        end
    
        while @messages_out > 0
          get_incoming_results(results_queue)
        end
        npoints = @chunks * @points_per_chunk
        pi = 4.0*@circle_count/(npoints)
        puts "\nreceived all the results our estimate for pi is: #{pi}"
      end
    
    end
    
    class BeanWorker < BeanBase
    
      def initialize()
      end
    
      def write_result(queue, result)
        msg = BeanResult.new(1,result)
        queue.yput(msg,pri=DEFAULT_PRIORITY, delay=0, ttr=TTR)
      end
    
      def in_circle
        #generate 2 random numbers see if they are in the circle
        range = 1000000.0
        radius = range / 2
        xcord = rand(range) - radius
        ycord = rand(range) - radius
        if( (xcord**2) + (ycord**2) <= (radius**2) )
          return 1
        else
          return 0
        end
      end
    
      def start_worker
        request_queue = get_queue('requests')
        results_queue = get_queue('results')
        #get requests and do the work until the worker is killed
        while(true)
          result = 0
          take_msg(request_queue) do |body|
            chunks = body.count
            chunks.times { result += in_circle}
          end
          write_result(results_queue,result)
        end
    
      end
    
    end
    
    ############
    # These are just simple message classes that we pass using beanstalks
    # to yml and from yml functions.
    ############
    class BeanRequest
      attr_accessor :project_id, :count
      def initialize(project_id, count=0)
        @project_id = project_id
        @count = count
      end
    end
    
    class BeanResult
      attr_accessor :project_id, :count
      def initialize(project_id, count=0)
        @project_id = project_id
        @count = count
      end
    end
    
    #how many different jobs we should do
    chunks = 100
    #how many points to calculate per chunk
    points_per_chunk = 10000
    #how many workers should we have
    #(normally different machines, in our example fork them off)
    workers = 5
    
    # Most of the time you will have two entirely separate classes
    # but to make it easy to run this example we will just fork and start our server
    # and client separately. We will wait for them to complete and check
    # if we received all the messages we expected.
    puts "starting distributor"
    server_pid = fork {
      BeanDistributor.new(chunks,points_per_chunk).start_distributor
    }
    
    puts "starting client(s)"
    client_pids = []
    workers.times do |num|
      client_pid = fork {
        BeanWorker.new.start_worker
      }
      client_pids << client_pid
    end
    
    Process.wait(server_pid)
    #take down the clients
    client_pids.each do |pid|
      Process.kill("HUP",pid)
    end

    Got a slow Test::Unit or RSpec suite?
    Devver can run it up to three times faster! Request a beta invite today.

    Posted on November 19th, 2008 by Dan in Development, Hacking, Ruby.