ruby websocket_celluloid_server.rb

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ruby websocket_celluloid_server.rb相关的知识,希望对你有一定的参考价值。

require 'json'
require 'active_support/core_ext/numeric/bytes'
require 'delegate'
require 'forwardable'
require 'time'
require 'celluloid/io'
require 'websocket/driver'
require 'webmachine'
require 'webmachine/adapters/rack'
require 'celluloid/autostart'

module Wamp
  # Protocol and transport constants shared by the handler, the static
  # resource and the server (each of them does `include Consts`).
  module Consts
    # Pre-serialized ABORT frame: [3 (ABORT), details, reason URI].
    ABORT_MSG   = '[3,{},"wamp.error.system_shutdown"]'.freeze
    # Reason URI sent when this side initiates a GOODBYE.
    GOODBYE_MSG = 'wamp.error.system_shutdown'.freeze
    # Reason URI sent when answering the peer's GOODBYE.
    GOODBYE_REPLY_MSG = 'wamp.error.goodbye_and_out'.freeze

    # WebSocket sub-protocols offered to clients.
    WEBSOCKET_PROTOS = ['wamp.2.json'.freeze].freeze
    # The only realm accepted in HELLO; also the required URI prefix.
    REALM = 'com.example'.freeze
    # Roles announced in WELCOME when a CALL callback is configured.
    ROLES = {roles: {broker: {}.freeze, dealer: {}.freeze}.freeze}.freeze
    TIMEOUT = 5

    # Dotted URI made of components of at least two characters.
    # Anchored with \A and \z (not ^ and $) so the WHOLE string must match;
    # line anchors would accept any multi-line string that merely contains
    # one valid line.
    URI_MATCH = %r{\A(([0-9a-z_-]{2,}\.)|\.)*([0-9a-z_-]{2,})?\z}i.freeze

    # Exclusive upper bound for randomly generated ids.
    TWO_POW_53 = 2**53
    # 1_048_576 bytes; spelled out so the constant does not depend on the
    # ActiveSupport Numeric#megabyte core extension.
    ONE_MEGABYTE = 1024 * 1024
    EMPTY_DICT = {}.freeze
    EMPTY_LIST = [].freeze

    HTTP_11 = 'HTTP/1.1'.freeze
    CRLF = "\r\n".freeze
    RACK_URL_SCHEME = 'rack.url_scheme'.freeze
    HTTP = 'http'.freeze

    # Number of bytes requested per socket read.
    BUFFER_SIZE = 16_384
  end
end

module Wamp
  # WAMP message-type code with name lookups.
  #
  # An instance wraps an Integer code through Delegator (see #__getobj__),
  # and adds the message name (#reason), its symbol (#symbolize) and one
  # predicate per known type (#hello?, #welcome?, ...).
  class Status < ::Delegator
    # Code to message-name map.
    #
    # @return [Hash<Integer => String>]
    REASON = {
      1  => 'HELLO',
      2  => 'WELCOME',
      3  => 'ABORT',
      4  => 'CHALLENGE',
      5  => 'AUTHENTICATE',
      6  => 'GOODBYE',
      7  => 'HEARTBEAT',
      8  => 'ERROR',
      16 => 'PUBLISH',
      17 => 'PUBLISHED',
      32 => 'SUBSCRIBE',
      33 => 'SUBSCRIBED',
      34 => 'UNSUBSCRIBE',
      35 => 'UNSUBSCRIBED',
      36 => 'EVENT',
      48 => 'CALL',
      49 => 'CANCEL',
      50 => 'RESULT',
      64 => 'REGISTER',
      65 => 'REGISTERED',
      66 => 'UNREGISTER',
      67 => 'UNREGISTERED',
      68 => 'INVOCATION',
      69 => 'INTERRUPT',
      70 => 'YIELD'
    }.each_value(&:freeze).freeze

    class << self
      # Coerces given value to Status.
      #
      # @example
      #
      #   Status.coerce("hello")      # => Status.new(1)
      #   Status.coerce(:hello)       # => Status.new(1)
      #   Status.coerce(1.0)          # => Status.new(1)
      #   Status.coerce(true)         # => raises ArgumentError
      #   Status.coerce(Float::NAN)   # => raises ArgumentError
      #
      # @raise [ArgumentError] if coercion is impossible (unsupported class,
      #   unknown name, or a number that has no Integer value)
      # @param [String, Symbol, Numeric] object
      # @return [Status]
      def coerce(object)
        code = case object
               when String  then SYMBOL_CODES[symbolize(object)]
               when Symbol  then SYMBOL_CODES[object]
               when Numeric then integer_code(object)
               end

        return new(code) if code

        fail ArgumentError, "Can't coerce #{object.class}(#{object}) to #{self}"
      end
      alias_method :[], :coerce

    private

      # Symbolizes given string
      #
      # @example
      #
      #   symbolize "HELLO" # => :hello
      #
      # @param [#to_s] str
      # @return [Symbol]
      def symbolize(str)
        str.to_s.downcase.to_sym
      end

      # Truncates a number to an Integer code.
      #
      # NaN, Infinity and Complex values raise RangeError (FloatDomainError
      # is a RangeError) from #to_i; they are reported as "not coercible" so
      # that #coerce raises the ArgumentError it documents.
      #
      # @param [Numeric] number
      # @return [Integer, nil] nil when the number has no Integer value
      def integer_code(number)
        number.to_i
      rescue ::RangeError
        nil
      end
    end

    # Code to Symbol map
    #
    # @example Usage
    #
    #   SYMBOLS[1] # => :hello
    #   SYMBOLS[2] # => :welcome
    #
    # @return [Hash<Integer => Symbol>]
    SYMBOLS = Hash[REASON.map { |k, v| [k, symbolize(v)] }].freeze

    # Reversed {SYMBOLS} map.
    #
    # @example Usage
    #
    #   SYMBOL_CODES[:hello]    # => 1
    #   SYMBOL_CODES[:welcome]  # => 2
    #
    # @return [Hash<Symbol => Integer>]
    SYMBOL_CODES = Hash[SYMBOLS.map { |k, v| [v, k] }].freeze

    # Message-type code
    #
    # @return [Integer]
    attr_reader :code

    if RUBY_VERSION < '1.9.0'
      # @param [#to_i] code
      def initialize(code)
        super __setobj__ code
      end
    end

    # Message name
    #
    # @return [nil] unless code is well-known (see REASON)
    # @return [String]
    def reason
      REASON[code]
    end

    # Symbolized {#reason}
    #
    # @return [nil] unless code is well-known (see REASON)
    # @return [Symbol]
    def symbolize
      SYMBOLS[code]
    end

    # Printable version of the status ("<code> <reason>"), surrounded by
    # quote marks, with special characters escaped.
    #
    # (see String#inspect)
    def inspect
      "#{code} #{reason}".inspect
    end

    # One predicate per known message type, e.g. #hello? is true when the
    # wrapped code is 1.
    SYMBOLS.each do |number, name|
      define_method(:"#{name}?") { number == code }
    end

    # Stores the wrapped value, converted with #to_i (called by Delegator).
    #
    # @param [#to_i] obj
    def __setobj__(obj)
      @code = obj.to_i
    end

    # @return [Integer] the object method calls are delegated to
    def __getobj__
      @code
    end
  end
end

module Wamp
  # Webmachine resource that serves the demo HTML page. The page loads the
  # Wampy client, connects to '/stream' in realm REALM and subscribes to
  # "<REALM>.time_update", writing each received value into the
  # #current-time element as an ISO timestamp.
  class StaticResource < Webmachine::Resource
    include Consts
    # Uses the modification time of this source file as the resource's
    # last-modified time.
    #
    # @return [Time]
    def last_modified
      File.mtime(__FILE__)
    end

    # Renders the HTML document. REALM is interpolated into the inline
    # script; everything else in the heredoc is static markup.
    #
    # @return [String]
    def to_html
<<-HTML
<!doctype html>
<html lang="en">
<head>
  <meta charset="utf-8">
  <title>Reel WebSockets time server example</title>
  <style>
    body {
      font-family: "HelveticaNeue-Light", "Helvetica Neue Light", "Helvetica Neue", Helvetica, Arial, "Lucida Grande", sans-serif;
      font-weight: 300;
      text-align: center;
    }

    #content {
      width: 800px;
      margin: 0 auto;
      background: #EEEEEE;
      padding: 1em;
    }
  </style>
  <!-- github isn't a cdn, don't use this in production code -->
  <script src="https://raw.githubusercontent.com/KSDaemon/wampy.js/dev/build/wampy-all.min.js"></script>
  <script>
  var ws = new Wampy('/stream', {
    realm: '#{REALM}',
    onConnect: function() {
      ws.subscribe('#{REALM}.time_update', function(data) {
        document.getElementById('current-time').innerHTML = new Date(data * 1000.0).toISOString();
      });
    }
  });
  </script>
</head>
<body>
  <div id="content">
    <h1>Time Server Example</h1>
    <div>The time is now: <span id="current-time">...</span></div>
  </div>
</body>
</html>
HTML
    end
  end
end

module Wamp
  # Actor that drives one client connection.
  #
  # Plain HTTP requests are answered with the page rendered through the
  # Webmachine Rack adapter; WebSocket upgrades are handled as a WAMP
  # session (HELLO/WELCOME, SUBSCRIBE/UNSUBSCRIBE, HEARTBEAT, GOODBYE and,
  # when a callback was given, CALL). Connection progress is modelled with
  # Celluloid::FSM states; the block attached to a state runs on entry.
  class WebSocketHandler
    extend Forwardable
    include Celluloid::Logger
    include Celluloid
    include Celluloid::Notifications
    include Celluloid::FSM
    include Consts

    # Value passed to the CALL callback: the calling peer's id plus the
    # options, procedure URI, positional and keyword arguments of the CALL.
    Call = Struct.new(:peer_id, :options, :procedure, :arguments, :argumentskw)

    def_delegators :@prng, :rand
    def_delegators :@driver, :parse, :protocol

    execute_block_on_receiver :initialize
    finalizer :disconnect

    default_state :idle

    # Intended transition table.
    # NOTE(review): every state except :idle is declared again further down
    # with an entry block and no `to:` list. If Celluloid::FSM replaces a
    # state when it is declared twice, the limits below are not enforced -
    # confirm before relying on them.
    state :idle,                to: [:send_website, :websocket_start, :disconnect]
    state :send_website,        to: [:disconnect]
    state :websocket_start,     to: [:websocket_connected, :disconnect]
    state :websocket_connected, to: [:wamp_abort, :wamp_welcome, :websocket_close, :disconnect]
    state :wamp_abort,          to: [:websocket_close, :disconnect]
    state :wamp_welcome,        to: [:wamp_connected, :websocket_close, :disconnect]
    state :wamp_connected,      to: [:wamp_goodbye, :wamp_goodbye_reply, :websocket_close, :disconnect]
    state :websocket_close,     to: [:disconnect]
    state :disconnect,          to: [:disconnected]
    state :disconnected

    # Non-WebSocket request: render the page via Rack, write the raw HTTP
    # response to the socket and hang up.
    state :send_website do
      debug state
      begin
        rack_adapter = Webmachine::Adapters::Rack.new(Webmachine.application)
        @driver.env[RACK_URL_SCHEME] = HTTP
        status, headers, body = rack_adapter.call(@driver.env)
        response = "#{HTTP_11} #{status}#{CRLF}"
        headers.each do |k, v|
          response << "#{k}: #{v}#{CRLF}"
        end
        response << CRLF
        @socket.write response << body.join
        transition :disconnect
      rescue IOError, Errno::EPIPE, Errno::ECONNRESET
        transition :disconnect
      end
    end

    # WebSocket upgrade: start the driver (sends the handshake response).
    state :websocket_start do
      debug state
      begin
        @driver.start
        transition :websocket_connected
      rescue IOError, Errno::EPIPE, Errno::ECONNRESET
        transition :disconnect
      end
    end

    # WebSocket is up, WAMP session not yet established: start pinging.
    state :websocket_connected do
      debug state
      async.send_ping
    end

    # Refuse the session with the canned ABORT frame, then close.
    state :wamp_abort do
      debug state
      begin
        @driver.frame(ABORT_MSG)
        transition :websocket_close
      rescue IOError, Errno::EPIPE, Errno::ECONNRESET
        transition :disconnect
      end
    end

    # Accept the session: assign a peer id and send WELCOME with our roles.
    state :wamp_welcome do
      debug state
      begin
        @peer_id = generate_id
        msg = JSON.generate([Status[:welcome], @peer_id, @roles])
        debug msg
        @driver.frame(msg)
        transition :wamp_connected
      rescue IOError, Errno::EPIPE, Errno::ECONNRESET
        transition :disconnect
      end
    end

    # Session established: start the periodic HEARTBEAT.
    state :wamp_connected do
      debug state
      async.heartbeat
    end

    # We initiate the goodbye (protocol violation or shutdown), then close.
    state :wamp_goodbye do
      debug state
      begin
        @driver.frame(JSON.generate([Status[:goodbye], EMPTY_DICT, GOODBYE_MSG]))
        transition :websocket_close
      rescue IOError, Errno::EPIPE, Errno::ECONNRESET
        transition :disconnect
      end
    end

    # The peer initiated the goodbye: acknowledge it, then close.
    state :wamp_goodbye_reply do
      debug state
      begin
        @driver.frame(JSON.generate([Status[:goodbye], EMPTY_DICT, GOODBYE_REPLY_MSG]))
        transition :websocket_close
      rescue IOError, Errno::EPIPE, Errno::ECONNRESET
        transition :disconnect
      end
    end

    # Close the WebSocket through the driver, then drop the socket.
    state :websocket_close do
      debug state
      begin
        @driver.close
        transition :disconnect
      rescue IOError, Errno::EPIPE, Errno::ECONNRESET
        transition :disconnect
      end
    end

    # Close the underlying socket if it is still open.
    state :disconnect do
      debug state
      if @socket
        @socket.close unless @socket.closed?
      end
      transition :disconnected
    end

    # Terminal state: stop the actor. Failures of terminate are ignored on
    # purpose (best effort).
    state :disconnected do
      debug state
      terminate rescue nil
    end

    # @param socket the client socket; written to by the driver and closed
    #   by this handler
    # @param [Hash] options passed through to WebSocket::Driver.server
    # @yield [Call] optional handler for CALL messages; its return value is
    #   sent back as the RESULT payload. Without a block only the broker
    #   role is announced and CALL is treated as a protocol violation.
    def initialize(socket, options = {}, &callback)
      super()
      @out_hb_number, @inc_hb_number, @peer_id = 0, 0, 0
      @latency = 0.0
      # Holds two kinds of entries: topic (String) => [subscription ids]
      # and subscription id (Integer) => notifier subscription object.
      @subscriptions = {}
      @prng = Random.new
      @socket = socket
      @driver = WebSocket::Driver.server(@socket, options)
      @driver.on(:connect) do
        if WebSocket::Driver.websocket? @driver.env
          transition :websocket_start
        else
          transition :send_website
        end
      end
      @driver.on(:open) do
        # No sub-protocol was agreed on, so there is nothing to talk.
        transition :websocket_close unless protocol
      end
      @driver.on(:error) {|e| error e}
      @driver.on(:close) {|e| debug e; disconnect}
      @driver.on(:message) {|e| parse_message(e.data)}
      @callback = callback
      @roles = if @callback
                 ROLES
               else
                 {roles: {broker: EMPTY_DICT}.freeze}.freeze
               end
    end

    # Converts an application value into a WAMP payload array.
    #
    # Arrays (and array-like objects) are returned as they are, scalars are
    # wrapped in a one-element array, hashes become [EMPTY_LIST, hash] and
    # IO-like objects are read into a one-element array.
    #
    # NOTE(review): callers append the returned array to the frame as ONE
    # element (`base << payload`), so the [EMPTY_LIST, hash] form ends up
    # nested inside the arguments list rather than as separate arguments /
    # keyword-arguments fields - confirm the intended wire format.
    #
    # @param message [Object]
    # @return [Array]
    # @raise [ArgumentError] when the value cannot be converted
    def wampify_message(message)
      case message
      when Array then message
      when String, Numeric, TrueClass, FalseClass, NilClass, Symbol then [message]
      when Hash then [EMPTY_LIST, message]
      when IO, StringIO
        message.rewind if message.respond_to?(:rewind)
        [message.read]
      else
        if message.respond_to?(:to_ary)
          message.to_ary
        elsif message.respond_to?(:to_hash)
          [EMPTY_LIST, message.to_hash]
        elsif message.respond_to?(:to_h)
          [EMPTY_LIST, message.to_h]
        elsif message.respond_to?(:to_a)
          message.to_a
        elsif message.respond_to?(:to_io)
          message.to_io.rewind if message.to_io.respond_to?(:rewind)
          [message.to_io.read]
        elsif (io_message = IO.try_convert(message))
          io_message.rewind if io_message.respond_to?(:rewind)
          [io_message.read]
        else
          fail ArgumentError, "Don't know how to publish #{message.class}"
        end
      end
    end

    # @return [Integer] random id in 0...2**53
    def generate_id
      rand(TWO_POW_53)
    end

    # Handles one incoming WebSocket message.
    #
    # The payload must be a JSON array of at least two elements whose first
    # element is the numeric message type. Anything malformed - invalid
    # JSON, a non-String payload, ids that cannot be converted to Integer,
    # a non-finite type code - ends the connection through #disconnect
    # instead of raising out of the actor.
    #
    # @param msg the message data delivered by the driver
    def parse_message(msg)
      debug "parse: #{msg}"
      message = JSON.parse(msg)
      if message.is_a?(Array) && message[0].is_a?(Numeric) && message.length >= 2
        case state
        when :websocket_connected
          # Only HELLO for our realm may open a session.
          if Status[message[0]].hello? && message[1] == REALM
            transition :wamp_welcome
          else
            transition :wamp_abort
          end
        when :wamp_connected
          # Legitimate message handling happens here, the rest is just error management.
          case Status[message[0]].symbolize
          when :hello
            transition :wamp_goodbye
          when :subscribe
            # message[1] is the request id, message[3] is the topic uri
            topic = message[3]
            if realm_uri?(topic)
              # Validate the request id before touching any state.
              request_id = Integer(message[1])
              write(JSON.generate([Status[:subscribed], request_id, subscription_for(topic)]))
            else
              transition :wamp_goodbye
            end
          when :unsubscribe
            # message[2] is the subscription_id handed out on subscribe
            if message[2]
              # message[1] is a randomly generated id from the client
              request_id = Integer(message[1])
              if unsubscribe_handler(Integer(message[2]))
                write(JSON.generate([Status[:unsubscribed], request_id]))
              else
                transition :wamp_goodbye
              end
            else
              transition :wamp_goodbye
            end
          when :heartbeat
            if message[2]
              @inc_hb_number = Integer(message[2])
            else
              transition :wamp_goodbye
            end
          when :goodbye
            case message[2]
            when GOODBYE_REPLY_MSG
              transition :websocket_close
            when GOODBYE_MSG
              transition :wamp_goodbye_reply
            else
              transition :wamp_goodbye
            end
          when :call
            if @callback && realm_uri?(message[3])
              # Reject a malformed request id before running user code.
              request_id = Integer(message[1])
              result = @callback.call(Call.new(@peer_id, message[2], message[3], message[4], message[5]))

              base_res = [Status[:result], request_id, EMPTY_DICT]
              write(JSON.generate(base_res << wampify_message(result)))
            else
              transition :wamp_goodbye
            end
          else
            debug "wamp_connected_status: #{Status[message[0]].symbolize}"
            transition :wamp_goodbye
          end
        else
          debug state
          transition :disconnect
        end
      else
        disconnect
      end
    rescue JSON::ParserError, ArgumentError, TypeError, RangeError
      # TypeError: Integer(nil) / non-String payload; RangeError: NaN or
      # Infinity where an integer is required.
      disconnect
    end

    # Notification callback: forwards a published message as one EVENT
    # frame per subscription id registered for the topic.
    #
    # @param topic [String]
    # @param message [Object] converted with #wampify_message
    def subscribe_handler(topic, message)
      if @subscriptions[topic]
        msg = wampify_message(message)
        @subscriptions[topic].each do |subscriber|
          base_msg = [Status[:event], subscriber, generate_id, EMPTY_DICT]
          write JSON.generate(base_msg << msg)
        end
      end
    end

    # Removes a subscription and its notifier registration.
    #
    # @param subscription_id [Integer]
    # @return [Boolean] false when the id is unknown
    def unsubscribe_handler(subscription_id)
      subscription = @subscriptions.delete(subscription_id)
      return false unless subscription

      topic = subscription.pattern
      ids = @subscriptions[topic]
      if ids
        ids.delete(subscription_id)
        @subscriptions.delete(topic) if ids.empty?
      end
      unsubscribe(subscription)
      true
    end

    # Sends one frame through the driver; a broken connection moves the
    # state machine to :disconnect instead of raising.
    def write(msg = '', type = nil, code = nil)
      debug "write: #{msg} #{type} #{code}"
      @driver.frame(msg, type, code)
    rescue IOError, Errno::EPIPE, Errno::ECONNRESET
      transition :disconnect
    end

    # Ends the connection as politely as the current state allows:
    # GOODBYE for an established session, ABORT for a WebSocket without a
    # session, a plain socket close otherwise. Does nothing when the
    # connection is already being torn down.
    def disconnect
      debug "disconnect"
      case state
      when :wamp_connected
        transition :wamp_goodbye
      when :websocket_connected
        transition :wamp_abort
      when :disconnect, :disconnected
      else
        debug "disconnect: #{state}"
        transition :disconnect
      end
    end

    private

    # True when +uri+ is a String that matches URI_MATCH and lies inside
    # REALM. The String check keeps non-string JSON values (numbers,
    # arrays, objects, null) away from the regexp and #start_with?.
    def realm_uri?(uri)
      return false unless uri.is_a?(String)

      !(uri =~ URI_MATCH).nil? && uri.start_with?(REALM)
    end

    # Returns the subscription id for +topic+, creating the notifier
    # subscription on first use.
    #
    # A repeated SUBSCRIBE for a topic this peer already follows gets the
    # existing id back. Registering a second notifier subscription would
    # run #subscribe_handler once per registration, and each run sends to
    # every id of the topic, multiplying the EVENT frames.
    def subscription_for(topic)
      ids = (@subscriptions[topic] ||= [])
      return ids.first unless ids.empty?

      subscription_id = generate_id
      @subscriptions[subscription_id] = subscribe(topic, :subscribe_handler)
      ids << subscription_id
      subscription_id
    end

    # Sends a HEARTBEAT frame every 15 seconds carrying the last sequence
    # number received and our incremented outgoing sequence number.
    def heartbeat
      every(15) {
        async.write(JSON.generate([Status[:heartbeat], @inc_hb_number, @out_hb_number += 1]))
      }
    end

    # Schedules a WebSocket ping every 5 seconds.
    def send_ping
      every(5) {
        async.ping
      }
    end

    # Sends one ping and records the round-trip time in @latency when the
    # driver invokes the callback.
    def ping
      id = "#{generate_id}"
      info "Sending ping: #{id}"
      start = Time.now.to_f
      @driver.ping(id) do
        @latency = Time.now.to_f - start
        info 'Recieved ping'
        info "Latency to #{@socket.peeraddr}: #{@latency}"
      end
    end
  end
end

# Celluloid::IO actor that accepts TCP connections, hands each one to a
# Wamp::WebSocketHandler and publishes the current time once per second on
# "<REALM>.time_update".
class Server
  include Celluloid::Logger
  include Celluloid::IO
  include Celluloid::Notifications
  include Wamp::Consts

  execute_block_on_receiver :initialize

  finalizer :shutdown
  trap_exit :crash_notifier

  # Address and port used when none are given.
  DEFAULT_HOST = '0.0.0.0'.freeze
  DEFAULT_PORT = 8080

  # Binds the listening socket and starts the accept loop and the time
  # publisher asynchronously.
  #
  # @param host [String] address to bind to
  # @param port [Integer] TCP port to listen on
  # @yield [call] optional handler passed on to every WebSocketHandler
  def initialize(host = DEFAULT_HOST, port = DEFAULT_PORT, &callback)
    @callback = callback
    @server = TCPServer.new(host, port)
    async.time_notifier
    async.run
  end

  # Feeds bytes read from +socket+ into a new handler until the handler
  # dies or reading fails for any reason.
  #
  # @param socket the accepted client socket
  def handle_connection(socket)
    websocket = Wamp::WebSocketHandler.new(socket, max_length: ONE_MEGABYTE, protocols: WEBSOCKET_PROTOS, &@callback)

    while websocket.alive?
      begin
        buffer = socket.readpartial(BUFFER_SIZE)
        break unless websocket.alive?

        websocket.parse(buffer)
      rescue StandardError
        # EOF, connection resets and handler failures all end the loop.
        break
      end
    end
  ensure
    if websocket
      begin
        websocket.disconnect
      rescue StandardError
        nil # best effort: the handler may already be gone
      end
    else
      # The handler was never created, so nobody else will close the socket.
      close_quietly(socket)
    end
  end

  # Exit handler for linked actors: logs which actor died and why.
  def crash_notifier(actor, reason)
    debug "Oh no! #{actor.inspect} has died because of a #{reason.class}"
  end

  # Finalizer: closes the listening socket.
  def shutdown
    @server.close if @server
  end

  private

  # Closes +socket+ if it is still open, ignoring errors from the close.
  def close_quietly(socket)
    socket.close if socket && !socket.closed?
  rescue StandardError
    nil
  end

  # Publishes the current time (seconds since the epoch, as a Float) every
  # second.
  def time_notifier
    every(1.0) {
      publish("#{REALM}.time_update", Time.now.to_f)
    }
  end

  # Accept loop: each connection is handled in its own asynchronous call.
  def run
    loop { async.handle_connection(@server.accept) }
  end
end

# Route every HTTP path to the demo page resource.
Webmachine.application.routes { add_route ['*'], Wamp::StaticResource }

# Start the server and block. The given block is the CALL handler handed to
# each connection; it ignores its argument and returns 'world'.
Server.run { |_call| 'world' }

以上是关于ruby websocket_celluloid_server.rb的主要内容,如果未能解决你的问题,请参考以下文章

Ruby 脚本可以告诉它在哪个目录中吗?

Ruby:无法加载此类文件 - LoadError

Ruby 命名空间

ruby sieve_1_prime_17.rb

ruby sieve_1_prime_13.rb

ruby sieve_1_prime_11.rb