require 'benchmark'
require 'memcache'
require 'beanstalk-client'
require "lib/aws/sqs.rb"
require "lib/aws/sqs/client.rb"
require "lib/aws/sqs/queue.rb"
require 'lib/server_config'
require 'ruby-debug'

# Host used by the "remote" queue variants (RemoteStarlingQueue and
# BeanstalkClientRemote).
REMOTE_SERVER_IP = "ec2-72-44-38-120.compute-1.amazonaws.com"

# For Amazon SQS, implement a server_config that returns your private keys,
# or hardcode the values in the SQS classes below.
# Uncomment one of the `queues = [...]` lines further down to choose which
# queues you wish to test.

# Number of test messages to send for timing.
MESSAGES = 100

# In-process FIFO backed by a plain Array. It is the baseline the other
# queues are compared against.
class MemoryQueue
  def initialize
    @queue = []
  end

  # Appends +value+ to the tail of the queue.
  def put(value)
    @queue.push(value)
  end

  # Removes and returns the head of the queue (nil when the queue is empty).
  def take
    @queue.shift
  end
end

# Queue that talks to a Starling server through the MemCache client.
# Subclasses must define +ip_addr+ and +port+; this class does not define
# them itself, so it cannot be instantiated directly.
class StarlingQueue
  def initialize
    @starling = MemCache.new("#{ip_addr}:#{port}")
    @queue_name = 'queue'
  end

  # Sends +value+ to the server under the fixed queue name.
  def put(value)
    @starling.set(@queue_name, value)
  end

  # Fetches a value from the server under the fixed queue name.
  def take
    @starling.get(@queue_name)
  end
end

class LocalStarlingQueue < StarlingQueue
  # note: you need to manually start starling by executing
  # > sudo starling
  # locally, after sudo gem install starling, and sudo mkdir /var/spool/starling
  def ip_addr
    "127.0.0.1"
  end

  def port
    22122
  end
end

class RemoteStarlingQueue < StarlingQueue
  # note: you need to manually start starling by executing
  # > starling -h -p 8080
  # on an amazon machine
  def ip_addr
    REMOTE_SERVER_IP
  end

  def port
    8080
  end
end

# Amazon SQS queue driven through AWS::SQS::Client, configured from
# ServerConfig (access key, secret key and endpoint).
class SQS1
  def initialize
    config = ServerConfig.instance
    key, secret_key, endpoint = config.AWS_ACCESS_KEY_ID, config.AWS_SECRET_ACCESS_KEY, config.ENDPOINT
    client = AWS::SQS::Client.new(key, secret_key, :endpoint => endpoint)
    queue_name = 'sqs1-queue'
    @queue = client.create_queue(queue_name)
  end

  # Sends the string form of +value+ to the queue.
  def put(value)
    @queue.send_message(value.to_s)
  end

  # Receives one message, deletes it from the queue via its receipt handle,
  # and returns the raw value handed back by +receive_messages+.
  # NOTE(review): the indexing below presumably fails when no message is
  # available; confirm the response shape for an empty queue.
  def take
    message = @queue.receive_messages(1)
    receipt_handle = message[0]["Message"][0]["ReceiptHandle"][0]
    #message["Message"][0]["ReceiptHandle"][0]
    @queue.delete_message(receipt_handle)
    message
  end
end

# Amazon SQS queue driven through the module-level SQS API, configured from
# ServerConfig (access key and secret key only).
class SQS2
  def initialize
    config = ServerConfig.instance
    SQS.access_key_id = config.AWS_ACCESS_KEY_ID
    SQS.secret_access_key = config.AWS_SECRET_ACCESS_KEY
    queue_name = "sqs2-queue"
    @queue = SQS.create_queue queue_name
  end

  # Sends +value+ to the queue.
  def put(value)
    @queue.send_message(value)
  end

  # Receives one message, deletes it, and returns its body.
  def take
    message = @queue.receive_message
    message.delete
    message.body
  end
end

# Queue backed by a beanstalkd server.
class BeanstalkClient
  # note: you need to manually start beanstalkd by executing
  # > beanstalkd
  # of course after installing beanstalkd (http://xph.us/software/beanstalkd/)
  #
  # +address+ is the "host:port" of the beanstalkd instance; it defaults to
  # the local server so BeanstalkClient.new keeps working without arguments.
  def initialize(address = 'localhost:11300')
    @beanstalk = Beanstalk::Pool.new([address])
  end

  # Enqueues the string form of +msg+.
  def put(msg)
    @beanstalk.put(msg.to_s)
  end

  # Reserves the next job, deletes it, and returns its body.
  def take
    job = @beanstalk.reserve
    job.delete
    job.body
  end
end

# Same as BeanstalkClient, but pointed at the beanstalkd running on
# REMOTE_SERVER_IP.
class BeanstalkClientRemote < BeanstalkClient
  # note: you need to manually start beanstalkd on a EC2 instance
  # > beanstalkd
  # of course after installing beanstalkd (http://xph.us/software/beanstalkd/)
  def initialize
    # put amazon ip here (see REMOTE_SERVER_IP)
    super("#{REMOTE_SERVER_IP}:11300")
  end
end

# Note: uncomment whatever set of messaging systems you want to compare.
queues = [MemoryQueue, BeanstalkClient]
#queues = [MemoryQueue, LocalStarlingQueue]
#queues = [MemoryQueue, SQS1] # note: SQS is slow, so lower MESSAGES when testing it
#queues = [MemoryQueue, LocalStarlingQueue, BeanstalkClient]
#queues = [MemoryQueue, BeanstalkClientRemote, RemoteStarlingQueue]
#queues = [MemoryQueue, LocalStarlingQueue, BeanstalkClient, BeanstalkClientRemote, RemoteStarlingQueue]
#queues = [MemoryQueue, LocalStarlingQueue, BeanstalkClient, BeanstalkClientRemote, RemoteStarlingQueue, SQS1]

# Replace each class with a live instance of it.
queues.map! { |queue_class| queue_class.new }

# Arithmetic mean of the numbers in +ary+, as a Float.
def mean(ary)
  ary.inject(0) { |sum, i| sum + i } / ary.length.to_f
end

# Population standard deviation of +ary+ around the supplied +mean+.
def std_dev(ary, mean)
  squared_deviations = ary.inject(0) { |dev, i| dev + (i - mean) ** 2 }
  Math.sqrt(squared_deviations / ary.length.to_f)
end

# Puts MESSAGES integers onto +queue+ and then takes MESSAGES items back off.
def total_time(queue)
  MESSAGES.times { |x| queue.put(x) }
  MESSAGES.times { queue.take }
end

# Per-queue-class statistics: class => [put_mean, put_dev, take_mean, take_dev].
@stats = {}

# Times every individual put and take on +queue+ (MESSAGES of each) and
# records the mean and standard deviation of both under the queue's class.
def operation_stats(queue)
  put_times = (0...MESSAGES).map { |x| Benchmark.realtime { queue.put(x) } }
  take_times = (0...MESSAGES).map { Benchmark.realtime { queue.take } }

  put_mean = mean(put_times)
  put_dev = std_dev(put_times, put_mean)
  take_mean = mean(take_times)
  take_dev = std_dev(take_times, take_mean)

  @stats[queue.class] = [put_mean, put_dev, take_mean, take_dev]
end

# Prints the statistics recorded for +queue+'s class.
def show_stats(queue)
  put_mean, put_dev, take_mean, take_dev = @stats[queue.class]
  printf "mean time for put : %f\n", put_mean
  printf "std dev for put: %f\n", put_dev
  printf "mean time for take: %f\n", take_mean
  printf "std dev for take: %f\n", take_dev
end

# Prints how +queue+'s mean put/take times relate to the MemoryQueue
# baseline, as a ratio (queue mean divided by baseline mean).
def compare_stats(queue)
  baseline = MemoryQueue
  base_stats = @stats[baseline]
  # If the baseline was not part of this run there is nothing to divide by;
  # skip the comparison instead of failing on a nil division.
  return unless base_stats

  put_mean, take_mean = @stats[queue.class].values_at(0, 2)
  base_put_mean, base_take_mean = base_stats.values_at(0, 2)

  puts "put mean is #{put_mean / base_put_mean} times slower than #{baseline}"
  puts "take mean is #{take_mean / base_take_mean} times slower than #{baseline}"
end

# Overall timing: one rehearsal plus one measured run per queue.
Benchmark.bmbm(7) do |x|
  queues.each do |queue|
    x.report("#{queue.class}:") { total_time(queue) }
  end
end

puts "======================="

# Per-operation timing, then a report for each queue.
queues.each { |queue| operation_stats(queue) }

queues.each do |queue|
  puts "\n\n#{queue.class}:::::"
  show_stats(queue)
  compare_stats(queue)
end