• Ruby Beanstalkd distributed worker basics

    by Dan

    At Devver we have a lot of jobs to do quickly, so we distribute our work out to a group of EC2 workers. We have tried and used a number of queuing solutions with Ruby, but in the end beanstalkd seemed to be the best solution for us at the time.

    I have only seen a few posts about the basics of using beanstalkd with Ruby, so I decided to write two posts that evolve a simple Ruby beanstalkd example into a more complicated one. This way, people new to beanstalkd can see how easy it is to get up and running with distributed processing using Ruby and beanstalkd, and people doing more advanced work can see some examples of how we are working with it here at Devver. It would also be great for more experienced beanstalkd warriors to share their thoughts, as there aren't many examples out in the wild. The lack of examples makes beanstalkd harder to learn and makes it difficult to decide what the best practices are when working with beanstalkd queues.

    I have also shared two scripts we have found useful while working with beanstalkd. The first, beanstalk_monitor.rb, lets you see usage statistics for all queues or monitor a single queue you are interested in. The second, beanstalk_killer.rb, is useful if you want to work on how your code will react to beanstalkd getting backed up or stalling (in beanstalkd speak, "putting on the brakes"). Pulling a simple example out of our code was a little harder than I thought, and the example itself doesn't do anything useful, but it should still show the basics of distributing jobs with beanstalkd.

    For those new to beanstalk, there are a few things you will need to know: how to get a queue object, how to put objects on the queue, how to take objects off the queue, and how to control which queue you are working with. For a higher-level overview or more detailed information, I recommend checking out the beanstalkd FAQ. The full example code is below, but taking a look at the basic snippets first might help.

    #to work with beanstalk you need to get a client connection
    queue = Beanstalk::Pool.new(["#{SERVER_IP}:#{DEFAULT_PORT}"])
    #by default you will be working on the 'default' tube or queue
    #if we wanted to work on a different queue we could change tubes, like so
    queue.watch('test_queue')
    queue.use('test_queue')
    queue.ignore('default')
    #to put a simple string on a queue
    queue.put('hello queue world')
    #to receive a simple string
    job = queue.reserve
    puts job.body #prints 'hello queue world'
    #if you don't delete the job before its time to run (TTR) expires, beanstalkd
    #assumes the worker failed and the job will show back up on the queue again
    job.delete
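
    The reserve and delete behavior described in the comments above can be sketched without a running server. The toy model below is only an illustration: ToyTube and ToyJob are hypothetical names, not part of beanstalk-client, and the clock is passed in by hand so the demo stays deterministic. It assumes beanstalkd's documented rules that lower priority numbers are served first and that a reserved job which is not deleted within its time to run goes back on the queue.

```ruby
# Toy in-memory model of one beanstalkd tube. This is NOT the beanstalk-client
# API; ToyTube and ToyJob are hypothetical names used only for illustration.
ToyJob = Struct.new(:id, :body, :pri, :ttr, :deadline)

class ToyTube
  def initialize
    @ready = []     # jobs waiting to be reserved
    @reserved = {}  # id => job currently held by a worker
    @next_id = 0
  end

  # Add a job. Lower pri values are served first.
  def put(body, pri = 65536, ttr = 3)
    @next_id += 1
    @ready << ToyJob.new(@next_id, body, pri, ttr, nil)
    @next_id
  end

  # Hand out the most urgent ready job (FIFO among equal priorities) and
  # start its time-to-run clock. `now` is a plain number of seconds.
  def reserve(now)
    expire(now)
    job = @ready.min_by { |j| [j.pri, j.id] }
    return nil unless job
    @ready.delete(job)
    job.deadline = now + job.ttr
    @reserved[job.id] = job
    job
  end

  # Deleting tells the tube the work is finished.
  def delete(job)
    @reserved.delete(job.id)
  end

  private

  # Any reserved job whose deadline has passed goes back to the ready list.
  def expire(now)
    late = @reserved.values.select { |j| j.deadline <= now }
    late.each do |j|
      @reserved.delete(j.id)
      j.deadline = nil
      @ready << j
    end
  end
end

tube = ToyTube.new
tube.put('low urgency', 100)
tube.put('urgent', 1)

first = tube.reserve(0)    # the priority 1 job wins over priority 100
tube.delete(first)         # finished, so it never comes back
second = tube.reserve(0)   # reserved but never deleted
again = tube.reserve(10)   # time to run has passed, so the job is handed out again
puts first.body
puts again.body
```

    Here `again` is the same job as `second`: it was reserved at time 0 with a 3 second time to run and was never deleted, so the toy tube released it for another worker.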
    

    How to run this example (on OS X, with macports installed)

    > sudo port install beanstalkd
    > sudo gem install beanstalk-client
    > beanstalkd
    > ruby beanstalk_tester.rb

    Download: beanstalk_tester.rb

    require 'rubygems'
    require 'beanstalk-client'
    
    DEFAULT_PORT = 11300
    SERVER_IP = '127.0.0.1'
    #beanstalk orders the jobs in a queue by priority; jobs with the same
    #priority are handled FIFO. In a later example we will use the priority
    #(lower numbers are higher priority, 0 is the most urgent)
    DEFAULT_PRIORITY = 65536
    #TTR (time to run) is how long, in seconds, a worker may hold a reserved job.
    #If a worker dies before completing the work and never calls job.delete,
    #the job is put back on the queue once the TTR expires
    TTR = 3
    
    class BeanBase
    
      #To work with multiple queues you must tell beanstalk which queues
      #you plan on writing to (use), and which queues you will reserve jobs from
      #(watch). In this case we also want to ignore the default queue
      def get_queue(queue_name)
        queue = Beanstalk::Pool.new(["#{SERVER_IP}:#{DEFAULT_PORT}"])
        queue.watch(queue_name)
        queue.use(queue_name)
        queue.ignore('default')
        queue
      end
    
    end
    
    class BeanDistributor < BeanBase
    
      def initialize(amount)
        @messages = amount
      end
    
      def start_distributor
        #put all the work on the request queue
        bean_queue = get_queue('requests')
        @messages.times do |num|
          msg = BeanRequest.new(1,num)
          #Take our ruby object and convert it to yml and put it on the queue
          #(the arguments after the message are priority, delay, and ttr)
          bean_queue.yput(msg, DEFAULT_PRIORITY, 0, TTR)
        end
    
        puts "distributor now getting results"
        #get all the results from the results queue
        bean_queue = get_queue('results')
        @messages.times do |num|
          result = take_msg(bean_queue)
          puts "result: #{result}"
        end
    
      end
    
      #this will take a message off the queue, process it and return the result
      def take_msg(queue)
        msg = queue.reserve
        #by calling ybody we get the content of the message and convert it from yml
        count = msg.ybody.count
        msg.delete
        return count
      end
    
    end
    
    class BeanWorker < BeanBase
    
      def initialize(amount)
        @messages = amount
        @received_msgs = 0
      end
    
      def start_worker
        results = []
        #get and process all the requests, on the requests queue
        bean_queue = get_queue('requests')
        @messages.times do |num|
          result = take_msg(bean_queue)
          results << result
          @received_msgs += 1
        end
    
        #return all of the results, by placing them on the separate results queue
        bean_queue = get_queue('results')
        results.each do |result|
          msg = BeanResult.new(1,result)
          bean_queue.yput(msg, DEFAULT_PRIORITY, 0, TTR)
        end
    
        #this is just to pass information out of the forked process
        #we return the number of messages we received as our exit status
        exit @received_msgs
      end
    
      #this will take a message off the queue, process it and return the result
      def take_msg(queue)
        msg = queue.reserve
        #by calling ybody we get the content of the message and convert it from yml
        count = msg.ybody.count
        result = count*count
        msg.delete
        return result
      end
    
    end
    
    ############
    # These are just simple message classes that we pass using beanstalk's
    # to-yml and from-yml functions (yput and ybody).
    ############
    class BeanRequest
      attr_accessor :project_id, :count
      def initialize(project_id, count=0)
        @project_id = project_id
        @count = count
      end
    end
    
    class BeanResult
      attr_accessor :project_id, :count
      def initialize(project_id, count=0)
        @project_id = project_id
        @count = count
      end
    end
    
    #write X messages on the queue
    numb = 10
    
    recv_count = 0
    
    # Most of the time you will have two entirely separate classes
    # but to make it easy to run this example we will just fork and start our server
    # and client separately. We will wait for them to complete and check
    # if we received all the messages we expected.
    puts "starting distributor"
    server_pid = fork {
      BeanDistributor.new(numb).start_distributor
    }
    
    puts "starting client"
    client_pid = fork {
      BeanWorker.new(numb).start_worker
    }
    
    Process.wait(client_pid)
    recv_count = $?.exitstatus
    puts "client finished received #{recv_count} msgs"
    if(numb==recv_count)
      puts "received the expected number of messages"
    else
      puts "error didn't receive the correct number of messages"
    end
    
    Process.wait(server_pid)
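
    The comments in the listing describe yput and ybody as converting messages to and from yml. To show what that round trip looks like on its own, here is a minimal sketch using only Ruby's standard yaml library. The helper names are hypothetical and are not part of beanstalk-client, and plain hashes with string keys stand in for BeanRequest and BeanResult, since recent Ruby versions restrict which classes YAML.load will instantiate by default.

```ruby
require 'yaml'

# Hypothetical helpers, not part of beanstalk-client. Plain hashes with string
# keys are used because they load without any extra permitted classes.

# Producer side: turn a request into the string that would go on the queue.
def encode_request(project_id, count)
  YAML.dump({ 'project_id' => project_id, 'count' => count })
end

# Worker side: parse the string, square the count, and encode the result.
def square_request(wire)
  request = YAML.load(wire)
  YAML.dump({ 'project_id' => request['project_id'],
              'count' => request['count'] * request['count'] })
end

wire = encode_request(1, 7)
puts wire                  # the YAML text a job body would carry
puts square_request(wire)  # the YAML text of the squared result
```

    The job body that travels through the queue is just this text, which is why the distributor and the worker only need to agree on the field names.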
    
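    The example hands the worker's message count back to the parent through the exit status of the forked process. The sketch below isolates that mechanism. It assumes a POSIX system where fork is available, and the helper names are hypothetical. Exit statuses are limited to one byte (0 to 255), which is fine for the 10 messages used here but wraps around for larger counts, so the sketch also shows a pipe as one possible alternative.

```ruby
# Hypothetical helpers for illustration; they assume a POSIX system with fork.

# Pass a small number back through the child's exit status.
# exit! is used so the child skips any at_exit handlers copied from the parent.
def count_via_exit_status(n)
  pid = fork { exit!(n) }
  Process.wait(pid)
  $?.exitstatus
end

# Pass an arbitrary number back through a pipe instead.
def count_via_pipe(n)
  reader, writer = IO.pipe
  pid = fork do
    reader.close
    writer.puts n
    writer.close   # closing flushes the buffered line to the parent
    exit!(0)
  end
  writer.close     # the parent only reads
  value = reader.read.to_i
  reader.close
  Process.wait(pid)
  value
end

puts count_via_exit_status(10)   # small counts survive the trip
puts count_via_exit_status(300)  # does not fit in one byte, so it wraps
puts count_via_pipe(300)         # the pipe carries the full value
```

    For a quick test script the exit status is the least code, while a pipe (or a results queue, as the example already uses for the job output) is the safer choice once counts can exceed 255.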


    Posted on October 28th, 2008 by Dan in Development, Devver, Hacking, Tips & Tricks.