Sending Files with EventMachine

    Devver has to keep each developer's environment synchronized with our servers. To do this, the Devver client sends all of the project files to our servers. We currently have an EventMachine client transfer files over SSL to an EventMachine server. We went through several stages and methods of sending files with EventMachine before finding a good solution. On smaller projects we didn't even realize how bad our performance was; only after bringing up some larger projects did we realize we needed to look more closely at our file transfer performance. Since I couldn't find much about this on the web, I thought sharing some examples of how we set up our EventMachine clients and servers to send files might be useful to someone else out there.

    I got some help from people on the EventMachine mailing list; here is the thread discussing sending large files with EventMachine.

    Since I was already playing around with a few of our options, I decided to compare EventMachine::Connection#send_data, EventMachine::Connection#stream_file_data, an alternative buffer recommended by James Tucker, and the crappy buffer we had been using before we discovered EventMachine's own BufferedTokenizer, and to layer compression on top of the various methods. We had hacked together our buffer tokenizer rather quickly, and it always performed well enough in our initial testing, but these results show why performance tests are worth a little effort. The benchmarks on the various setups are below. All runs were done on localhost, and each time includes the test harness's 0.4 s of start-up sleeps; compression helps much more between remote servers.
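What separates a fast tokenizer from a slow one is mostly how much of the receive buffer it rescans as chunks arrive. Below is a minimal sketch of delimiter framing that only searches bytes it has not searched before. SimpleTokenizer is a hypothetical class, not EventMachine's BufferedTokenizer or Tucker's buffer, though it follows the same idea of returning the complete messages found in each chunk.

```ruby
# Hypothetical sketch of delimiter-based framing that never rescans old bytes.
class SimpleTokenizer
  def initialize(delimiter)
    @delimiter = delimiter
    @buffer = String.new   # unfinished message bytes (binary, like socket data)
    @scanned = 0           # bytes of @buffer already searched without a match
  end

  # Call from receive_data with each chunk; returns the completed messages.
  def extract(data)
    @buffer << data
    messages = []
    start = 0
    # Back up by delimiter.length - 1 so a delimiter split across two reads
    # is still found, while bytes searched on earlier calls are skipped.
    search_from = [@scanned - @delimiter.length + 1, 0].max
    while (idx = @buffer.index(@delimiter, search_from))
      messages << @buffer[start...idx]
      start = idx + @delimiter.length
      search_from = start
    end
    # Drop consumed bytes only when something was consumed, so a large
    # message arriving in many chunks is not copied on every call.
    @buffer = @buffer[start..-1] if start > 0
    @scanned = @buffer.length
    messages
  end
end

tokenizer = SimpleTokenizer.new("|DEFAULTDELIMITED|")
p tokenizer.extract("login|DEFAULT")                       # → []
p tokenizer.extract("DELIMITED|quit|DEFAULTDELIMITED|")    # → ["login", "quit"]
```

The second call shows the delimiter being recognized even though it was split across two reads, which a tokenizer that splits each chunk on its own would miss.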

    Sending log file with compression turned off (5 times)
    OurBadBufferedTokenizer: 10.40 s
    Standard EM BufferedTokenizer: 0.93 s
    Tucker's BufferedTokenizer: 0.92 s
    stream_file_data w/ EM BufferedTokenizer: 0.98 s

    Sending log file with compression turned on (5 times)
    OurBadBufferedTokenizer: 1.02 s
    Standard EM BufferedTokenizer: 0.99 s
    Tucker's BufferedTokenizer: 1.04 s
    stream_file_data w/ EM BufferedTokenizer: N/A (stream_file_data can't be used with on-the-fly compression)
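stream_file_data hands the file to EventMachine, which reads it and writes it to the connection, so the test's Ruby code never gets a chance to deflate the bytes. If you wanted both streaming and compression, one option (a sketch under assumptions, not something benchmarked here) is to read the file in chunks and run them through a streaming Zlib::Deflate, passing each compressed piece to send_data. The method name each_deflated_chunk and the 16 KB chunk size are hypothetical.

```ruby
require 'zlib'
require 'stringio'

CHUNK_SIZE = 16 * 1024  # hypothetical read size

# Read `io` in chunks and deflate incrementally, yielding compressed pieces
# as zlib produces them (it may hold back output until it has enough input).
def each_deflated_chunk(io, chunk_size = CHUNK_SIZE)
  deflater = Zlib::Deflate.new(Zlib::BEST_SPEED)
  while (chunk = io.read(chunk_size))
    piece = deflater.deflate(chunk)
    yield piece unless piece.empty?
  end
  yield deflater.finish   # flush whatever zlib is still buffering
ensure
  deflater&.close
end

# Usage: collect the pieces (a connection would pass each one to send_data).
text = "some log line\n" * 5_000
pieces = []
each_deflated_chunk(StringIO.new(text)) { |piece| pieces << piece }
puts Zlib::Inflate.inflate(pieces.join) == text   # prints true
```

Because the pieces concatenate into a single zlib stream, the receiver can still inflate the reassembled message in one call, as the test server does.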

    Sending compressed Mp3 file with compression turned off (5 times)
    OurBadBufferedTokenizer: 18.55 s
    Standard EM BufferedTokenizer: 1.09 s
    Tucker's BufferedTokenizer: 1.10 s
    stream_file_data w/ EM BufferedTokenizer: 1.22 s

    Adding compression to already compressed files like mp3s doesn't change the times significantly. The next run is longer, to show how the times vary over a larger test. I also tested on full projects, and the relative differences seemed to hold.
    Sending compressed Mp3 file with compression turned off (25 times)
    OurBadBufferedTokenizer: N/A takes too long
    Standard EM BufferedTokenizer: 5.70 s
    Tucker's BufferedTokenizer: 4.38 s
    stream_file_data w/ EM BufferedTokenizer: 4.82 s
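The compression results follow from how zlib treats different inputs: repetitive text like a log file shrinks dramatically, while already-compressed data like an mp3 does not shrink at all, so compressing it only costs CPU. A quick way to check a given file type is sketched below with Ruby's standard Zlib at the same Zlib::BEST_SPEED level the test client uses. The random bytes are an assumed stand-in for an mp3, and compression_ratio is a hypothetical helper.

```ruby
require 'zlib'

# Compress with the test client's setting and return compressed size as a
# fraction of the original (after checking the data survives a round trip).
def compression_ratio(data)
  deflated = Zlib::Deflate.deflate(data, Zlib::BEST_SPEED)
  raise "round trip failed" unless Zlib::Inflate.inflate(deflated) == data
  deflated.bytesize.to_f / data.bytesize
end

# Repetitive text, standing in for a log file.
log_like = "Processing ItemsController#index (for 127.0.0.1)\n" * 2_000
# Random bytes, standing in for already-compressed data such as an mp3.
mp3_like = Random.new(42).bytes(100_000)

puts format("log-like: %.3f", compression_ratio(log_like))
puts format("mp3-like: %.3f", compression_ratio(mp3_like))
```

A ratio near 1.0 means compression buys nothing for that kind of file; on localhost even a small ratio saves little, since bandwidth is not the bottleneck there.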

    Below are the little tests and examples I was working with. You won't have the same files on your system, or Tucker's buffer, so I packed everything up as a zip. To try everything out, download the EventMachine sending files tests zip, extract it, and run 'ruby em_send_file_test.rb'. Any thoughts or feedback are welcome; I am still learning the ins and outs of EventMachine, so feel free to send me any tips.

    dir = File.expand_path(File.dirname(__FILE__))
    unless($LOAD_PATH.member?(dir))
      $LOAD_PATH.unshift(dir)
    end
    
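    require 'test/unit'
    require 'eventmachine'
    require 'socket'    # Socket.unpack_sockaddr_in in EmServerExample#post_init
    require 'zlib'
    require 'yaml'
    # require 'ruby-debug'    # only needed for interactive debugging
    require 'buffered_tokenizer_pastie'    # Tucker's tokenizer, included in the zip
    require 'benchmark'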
    
    Thread.abort_on_exception = true
    
    SERVER_PORT = 7999
    SERVER_IP = '127.0.0.1'
    TOKEN = "|DEFAULTDELIMITED|"
    # Try different types of files: results vary a lot between
    # text (a log file) and already-compressed data (an mp3)
    FILE_NAME = '~/development.log'
    #FILE_NAME = '~/Blue.mp3'
    COMPRESS = false
    #COMPRESS = true
    
    TIMES = 5
    
    class EmClientExample < EventMachine::Connection
    
      def unbind
        puts "client connection has terminated"
      end
    
      def process(data)
        puts "client got data: #{data}"
        send_files() if data=="success"
        send(prepare("some_msg")) if data=="filesDone"
        send(prepare("quit")) if data=="ack"
        if(data=="goodbye")
          puts "Client successfully sent all data shutting down!!!!"
          EventMachine::stop_event_loop
        end
      end
    
      def send_files()
        puts "sending files"
        @files = Array.new(TIMES,[FILE_NAME, Time.now.to_s])
        send_files_loop
      end
    
      def send_files_loop
        if @files && @files.length > 0
          file = @files.shift
          EM.next_tick do
            send_file(file[0],file[1])
            send_files_loop
          end
        else
          puts "done syncing files"
          send(prepare("files_completed"))
        end
      end
    
      def send_file(path,mtime)
        puts "Syncing "+path
        contents = File.binread(File.expand_path(path))   # binary-safe read, so mp3s compare correctly
        contents = Zlib::Deflate.deflate(contents,Zlib::BEST_SPEED) if COMPRESS
        send(prepare("send_file #{path}, #{mtime}, content:#{contents}"))
      end
    
      # Note: this shadows Kernel#send; a distinct name such as send_msg would be safer
      def send(str)
        #puts "sending: #{str}"
        send_data str
      end
    
      def prepare(str)
        str+TOKEN
      end
    
      def self.push_start()
        EventMachine.connect(SERVER_IP,SERVER_PORT,self) do |c|
          c.send_files()
        end
      end
    
    end
    
    class EmClientExampleBadBuffer < EmClientExample
    
      attr_accessor :buffer
    
      def initialize(*args)
        super
        @buffer = DataBuffer.new
      end
    
      def receive_data(data)
        @buffer.append(data)
        while(command = @buffer.grab)
          process(command)
        end
      end
    
      def prepare(str)
        @buffer.prepare(str)
      end
    
    end
    
    class EmClientExampleBuffToken < EmClientExample
    
      def initialize(*args)
        super
        @recv_buffer = BufferedTokenizer.new(TOKEN)
      end
    
      def receive_data(data)
        @recv_buffer.extract(data).each do |m|
          process(m)
        end
      end
    
    end
    
    class EmClientExampleStreamBuffToken < EmClientExample
    
      def initialize(*args)
        super
        @recv_buffer = BufferedTokenizer.new(TOKEN)
      end
    
      def send_files_loop
        if @files && @files.length > 0
          file = @files.shift
          EM.next_tick do
            send_file(file[0],file[1])
          end
        else
          puts "done syncing files"
          send(prepare("files_completed"))
        end
      end
    
      def send_file(path,mtime)
        puts "Syncing "+path
        send("send_file #{path}, #{mtime}, content:")
    
        EM::Deferrable.future( stream_file_data(File.expand_path(path)) ) {
          send(prepare(""))
          send_files_loop
        }
      end
    
      def receive_data(data)
        @recv_buffer.extract(data).each do |m|
          process(m)
        end
      end
    
    end
    
    class EmClientExamplePastie < EmClientExample
    
      def initialize(*args)
        super
        @recv_buffer = BufferedTokenizerPastie.new(TOKEN)
      end
    
      def receive_data(data)
        @recv_buffer.extract(data).each do |m|
          process(m)
        end
      end
    
    end
    
    class EmServerExample < EventMachine::Connection
    
      def post_init
        if(@signature)
          client = Socket.unpack_sockaddr_in(get_peername)
          puts "Received a new connection from #{client.last}:#{client.first}"
        end
      end
    
      def unbind
        puts "server connection has terminated\n"
      end
    
      def process(data)
        #puts "server: #{data[0..15]}"
        send(prepare("success")) if data=="login"
        send(prepare("filesDone")) if data=="files_completed"
        send(prepare("ack")) if data=="some_msg"
        if data.match(/^send_file/)
          #puts data[0..40]
          puts "received file"
          start = data.index(", content:") + ", content:".length
          # everything after the first ", content:" is the (possibly deflated) file
          contents = data[start..-1]
          contents = Zlib::Inflate.inflate(contents) if COMPRESS
          file_contents = File.binread(File.expand_path(FILE_NAME))
          if contents != file_contents
            puts "file was corrupted"
            puts "received length: #{contents.length} file length: #{file_contents.length}"
            #File.open(File.expand_path("~/copy.file"),"w") do |f|
            #  f << contents
            #end
          end
        end
        if data=="quit"
          send(prepare("goodbye"))
          close_connection_after_writing
        end
      end
    
      def prepare(str)
        str+TOKEN
      end
    
      def send(msg)
        #puts "server sent: #{msg}"
        send_data msg
      end
    
    end
    
    class EmServerExampleBadBuffer < EmServerExample
    
      def initialize(*args)
        super
        @buffer = DataBuffer.new
      end
    
      def receive_data(data)
        @buffer.append(data)
        while(command = @buffer.grab)
          process(command)
        end
      end
    
      def prepare(str)
        @buffer.prepare(str)
      end
    
    end
    
    class EmServerExampleBuffToken < EmServerExample
    
      def initialize(*args)
        super
        @recv_buffer = BufferedTokenizer.new(TOKEN)
      end
    
      def receive_data(data)
        @recv_buffer.extract(data).each do |m|
          process(m)
        end
      end
    
    end
    
    class EmServerExamplePastie < EmServerExample
    
      def initialize(*args)
        super
        @recv_buffer = BufferedTokenizerPastie.new(TOKEN)
      end
    
      def receive_data(data)
        @recv_buffer.extract(data).each do |m|
          process(m)
        end
      end
    
    end
    
    # Our original, hastily written buffer. Every call to grab re-splits the
    # whole unprocessed buffer, so a large file arriving in many small chunks
    # is rescanned over and over, which is most likely why it benchmarks so badly.
    class DataBuffer
      FRONT_DELIMITER = "0x5b".hex.chr # '['
      BACK_DELIMITER = "0x5d".hex.chr  # ']'
      DELIMITER = "|#{FRONT_DELIMITER}#{FRONT_DELIMITER}#{FRONT_DELIMITER}GT_DELIM#{BACK_DELIMITER}#{BACK_DELIMITER}#{BACK_DELIMITER}#{BACK_DELIMITER}|"
      DELIM_ESCAPE = /#{Regexp.escape(DELIMITER)}/
      DELIM_ESCAPE_END = /#{Regexp.escape(DELIMITER)}\Z/

      def initialize
        @unprocessed = ""
        @commands = []
      end

      def grab
        new_messages = @unprocessed.split(DELIM_ESCAPE)
        while new_messages.length > 1
          @commands << new_messages.shift
        end
        msg_length = new_messages.length
        if msg_length > 0
          if msg_length == 1 && (@unprocessed=~DELIM_ESCAPE_END)
            @commands.push(new_messages.shift)
            @unprocessed = ""
          else
            # keep only the unfinished last message in the buffer
            while(cut=@unprocessed.index(DELIM_ESCAPE))
              @unprocessed = (@unprocessed[cut..@unprocessed.length]).sub(DELIMITER,"")
            end
          end
        end
        if @commands.length > 0
          return @commands.shift
        else
          return nil
        end
      end

      def prepare(str)
        str.to_s+DELIMITER
      end

      def append(data)
        @unprocessed = @unprocessed + data
      end

    end
    
    class EmSendFileTest < Test::Unit::TestCase

      def test_placeholder
        assert true
      end

      def start_server(server_type)
        server_pid = fork {
          EventMachine::run do
            EventMachine::start_server SERVER_IP, SERVER_PORT, server_type
            puts "Server now accepting requests..."
          end
        }
        server_pid
      end

      def start_client(client_type)
        client_pid = fork {
          EventMachine::run { client_type.push_start() }
        }
        client_pid
      end

      def run_against_server_client(client_example, server_example)
        assert_nothing_raised do
          # Note: the measured time includes the two 0.2 s sleeps below
          puts Benchmark.realtime {
            server_pid = start_server(server_example)
            # make sure the server is up before the client connects
            sleep(0.2)
            client_pid = start_client(client_example)
            sleep(0.2)

            Process.wait(client_pid)
            puts "client finished"

            # I don't know of a clean way to stop the forked server's
            # event loop from here, so just kill the process
            Process.kill('KILL',server_pid)
            Process.waitall
          }
          puts "##############################################################"
        end
      end

      def test_em_send_files_with_em_buffered_tokenizer
        puts "send files test with em buffered tokenizer"
        client_example = EmClientExampleBuffToken
        server_example = EmServerExampleBuffToken
        run_against_server_client(client_example, server_example)
      end

      def test_em_stream_files_with_em_buffered_tokenizer
        puts "stream_file_data test with em buffered tokenizer"
        if COMPRESS
          puts "stream_file_data can't be used with on-the-fly compression"
        else
          client_example = EmClientExampleStreamBuffToken
          server_example = EmServerExampleBuffToken
          run_against_server_client(client_example, server_example)
        end
      end

      def test_em_send_files_with_bad_tokenizer
        puts "send files test with our bad buffered tokenizer"
        client_example = EmClientExampleBadBuffer
        server_example = EmServerExampleBadBuffer
        run_against_server_client(client_example, server_example)
      end

      def test_em_send_files_with_pastie_tokenizer
        puts "send files test with Tucker's pastied tokenizer"
        client_example = EmClientExamplePastie
        server_example = EmServerExamplePastie
        run_against_server_client(client_example, server_example)
      end

    end
    

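For reference, each file travels as a single message of the form "send_file PATH, MTIME, content:BYTES" followed by the token, and the server keeps everything after the first ", content:". Here is a standalone sketch of both sides using only Ruby's standard Zlib; frame_file_message, file_contents_from and FRAME_TOKEN are hypothetical names that mirror send_file, prepare and process in the test. The .b calls keep everything binary so mp3 bytes compare correctly on Ruby 1.9 and later.

```ruby
require 'zlib'

# Same value as TOKEN in the test script (renamed to avoid clashing with it).
FRAME_TOKEN = "|DEFAULTDELIMITED|"
HEADER_END = ", content:"

# Client side: one framed message, built the way send_file and prepare build it.
def frame_file_message(path, mtime, contents, compress)
  contents = Zlib::Deflate.deflate(contents, Zlib::BEST_SPEED) if compress
  "send_file #{path}, #{mtime}#{HEADER_END}".b + contents.b + FRAME_TOKEN
end

# Server side: `message` has already had FRAME_TOKEN stripped by the tokenizer.
# Everything after the first ", content:" is the (possibly deflated) file.
def file_contents_from(message, compress)
  start = message.index(HEADER_END) + HEADER_END.length
  contents = message[start..-1]
  compress ? Zlib::Inflate.inflate(contents) : contents
end

original = "line one\nline two\n".b
framed = frame_file_message("~/development.log", Time.now.to_s, original, true)
puts file_contents_from(framed.chomp(FRAME_TOKEN), true) == original   # prints true
```

One caveat of delimiter-based framing: if a file (or its deflated form) happens to contain |DEFAULTDELIMITED|, the tokenizer will cut the message there. Prefixing each message with its length avoids that.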

    Posted on October 8th, 2008 by Dan in Uncategorized.