ruby websocket_celluloid_server.rb
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ruby websocket_celluloid_server.rb相关的知识,希望对你有一定的参考价值。
require 'json'
require 'active_support/core_ext/numeric/bytes'
require 'delegate'
require 'forwardable'
require 'time'
require 'celluloid/io'
require 'websocket/driver'
require 'webmachine'
require 'webmachine/adapters/rack'
require 'celluloid/autostart'
module Wamp
  # Protocol, HTTP and buffer-size constants shared by the WAMP server classes.
  module Consts
    # Pre-serialized ABORT message. Left unfrozen, exactly as originally
    # defined: it is passed directly to the WebSocket driver.
    # NOTE(review): confirm the driver never mutates its argument before
    # freezing this string.
    ABORT_MSG = '[3,{},"wamp.error.system_shutdown"]'

    # Reason URI sent in a GOODBYE message.
    GOODBYE_MSG = 'wamp.error.system_shutdown'.freeze

    # Reason URI sent when answering a GOODBYE message.
    GOODBYE_REPLY_MSG = 'wamp.error.goodbye_and_out'.freeze

    # WebSocket sub-protocols offered to clients.
    WEBSOCKET_PROTOS = ['wamp.2.json'.freeze].freeze

    # The only realm served; also used as the required topic/procedure prefix.
    REALM = 'com.example'.freeze

    # Role dictionary advertised in WELCOME.
    ROLES = {roles: {broker: {}.freeze, dealer: {}.freeze}.freeze}.freeze

    TIMEOUT = 5

    # Shape check for URIs. Anchored with \A and \z so the WHOLE string has to
    # match; ^ and $ only anchor to a line, which let input such as
    # "com.example.ok\n<anything>" pass validation.
    URI_MATCH = %r{\A(([0-9a-z_-]{2,}\.)|\.)*([0-9a-z_-]{2,})?\z}i.freeze

    # Exclusive upper bound for randomly generated ids.
    TWO_POW_53 = 2**53

    # 1_048_576 bytes. Same value as ActiveSupport's `1.megabyte`, computed
    # directly so these constants load without the core extension.
    ONE_MEGABYTE = 1024 * 1024

    EMPTY_DICT = {}.freeze
    EMPTY_LIST = [].freeze

    # Pieces used to build the plain HTTP response.
    HTTP_11 = 'HTTP/1.1'.freeze
    CRLF = "\r\n".freeze
    RACK_URL_SCHEME = 'rack.url_scheme'.freeze
    HTTP = 'http'.freeze

    # Maximum number of bytes requested per socket read.
    BUFFER_SIZE = 16_384
  end
end
module Wamp
  # Numeric WAMP message-type code. An instance delegates to the Integer it
  # wraps and adds name lookups plus one predicate (hello?, welcome?, ...)
  # per known message type.
  class Status < ::Delegator
    # Known message-type codes mapped to their upper-case names.
    REASON = {
      1 => 'HELLO',
      2 => 'WELCOME',
      3 => 'ABORT',
      4 => 'CHALLENGE',
      5 => 'AUTHENTICATE',
      6 => 'GOODBYE',
      7 => 'HEARTBEAT',
      8 => 'ERROR',
      16 => 'PUBLISH',
      17 => 'PUBLISHED',
      32 => 'SUBSCRIBE',
      33 => 'SUBSCRIBED',
      34 => 'UNSUBSCRIBE',
      35 => 'UNSUBSCRIBED',
      36 => 'EVENT',
      48 => 'CALL',
      49 => 'CANCEL',
      50 => 'RESULT',
      64 => 'REGISTER',
      65 => 'REGISTERED',
      66 => 'UNREGISTER',
      67 => 'UNREGISTERED',
      68 => 'INVOCATION',
      69 => 'INTERRUPT',
      70 => 'YIELD'
    }.each_value(&:freeze).freeze

    class << self
      # Builds a Status from a message-type name or number.
      #
      # @example
      #
      #   Status.coerce("hello") # => Status.new(1)
      #   Status.coerce(:hello)  # => Status.new(1)
      #   Status.coerce(1.0)     # => Status.new(1)
      #   Status.coerce(true)    # => raises ArgumentError
      #
      # @param [String, Symbol, Numeric] object
      # @raise [ArgumentError] when the value cannot be mapped to a code
      # @return [Status]
      def coerce(object)
        code = case object
               when Numeric then object.to_i
               when Symbol then SYMBOL_CODES[object]
               when String then SYMBOL_CODES[symbolize(object)]
               end
        fail ArgumentError, "Can't coerce #{object.class}(#{object}) to #{self}" unless code
        new(code)
      end
      alias_method :[], :coerce

      private

      # Lower-cases a name and turns it into a Symbol.
      #
      # @example
      #
      #   symbolize "HELLO" # => :hello
      #
      # @param [#to_s] str
      # @return [Symbol]
      def symbolize(str)
        lowered = str.to_s.downcase
        lowered.to_sym
      end
    end

    # Code => Symbol lookup.
    #
    # @example
    #
    #   SYMBOLS[1] # => :hello
    #   SYMBOLS[2] # => :welcome
    #
    # @return [Hash<Integer => Symbol>]
    SYMBOLS = REASON.each_with_object({}) { |(number, name), map|
      map[number] = symbolize(name)
    }.freeze

    # Symbol => code lookup, the inverse of {SYMBOLS}.
    #
    # @example
    #
    #   SYMBOL_CODES[:hello]   # => 1
    #   SYMBOL_CODES[:welcome] # => 2
    #
    # @return [Hash<Symbol => Integer>]
    SYMBOL_CODES = SYMBOLS.invert.freeze

    # The wrapped message-type code.
    #
    # @return [Integer]
    attr_reader :code

    if RUBY_VERSION < '1.9.0'
      # @param [#to_i] code
      def initialize(code)
        super(__setobj__(code))
      end
    end

    # Upper-case name of the message type.
    #
    # @return [String]
    # @return [nil] when the code is not listed in REASON
    def reason
      REASON[code]
    end

    # Symbol form of {#reason}.
    #
    # @return [Symbol]
    # @return [nil] when the code is not listed in REASON
    def symbolize
      SYMBOLS[code]
    end

    # Quoted, escaped "<code> <reason>" string (see String#inspect).
    #
    # @return [String]
    def inspect
      label = "#{code} #{reason}"
      label.inspect
    end

    # One predicate per known message type, e.g. hello? is true when the
    # wrapped code is 1.
    SYMBOLS.each do |number, name|
      define_method(:"#{name}?") { number == code }
    end

    # Stores the delegation target, converted with to_i.
    def __setobj__(obj)
      @code = obj.to_i
    end

    # Returns the delegation target (the integer code).
    def __getobj__
      @code
    end
  end
end
module Wamp
# Webmachine resource that renders the demo HTML page. The page loads the
# Wampy client, connects to '/stream' in REALM and subscribes to the
# "<REALM>.time_update" topic to display the published time.
class StaticResource < Webmachine::Resource
include Consts
# Modification time of this source file (the page markup lives in this file).
# NOTE(review): presumably consumed by Webmachine for Last-Modified /
# conditional requests - confirm against the Webmachine::Resource callbacks.
#
# @return [Time]
def last_modified
File.mtime(__FILE__)
end
# Builds the HTML document. REALM is interpolated into the inline script
# twice: as the Wampy realm and as the prefix of the subscribed topic.
# The heredoc text is emitted verbatim, so its lines must not be re-indented.
# NOTE(review): the client library is loaded from raw.githubusercontent.com;
# the markup itself warns this is not a CDN - verify that browsers will
# actually execute a script served from that host.
#
# @return [String] the complete HTML page
def to_html
<<-HTML
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Reel WebSockets time server example</title>
<style>
body {
font-family: "HelveticaNeue-Light", "Helvetica Neue Light", "Helvetica Neue", Helvetica, Arial, "Lucida Grande", sans-serif;
font-weight: 300;
text-align: center;
}
#content {
width: 800px;
margin: 0 auto;
background: #EEEEEE;
padding: 1em;
}
</style>
<!-- github isn't a cdn, don't use this in production code -->
<script src="https://raw.githubusercontent.com/KSDaemon/wampy.js/dev/build/wampy-all.min.js"></script>
<script>
var ws = new Wampy('/stream', {
realm: '#{REALM}',
onConnect: function() {
ws.subscribe('#{REALM}.time_update', function(data) {
document.getElementById('current-time').innerHTML = new Date(data * 1000.0).toISOString();
});
}
});
</script>
</head>
<body>
<div id="content">
<h1>Time Server Example</h1>
<div>The time is now: <span id="current-time">...</span></div>
</div>
</body>
</html>
HTML
end
end
end
module Wamp
  # Celluloid actor that serves one accepted socket.
  #
  # A plain HTTP request is answered with the Webmachine-rendered page and the
  # socket is closed. A WebSocket upgrade is taken through a WAMP session:
  # HELLO/WELCOME, then SUBSCRIBE, UNSUBSCRIBE, HEARTBEAT, CALL and GOODBYE.
  # Connection progress is tracked with the Celluloid::FSM states below.
  class WebSocketHandler
    extend Forwardable
    include Celluloid::Logger
    include Celluloid
    include Celluloid::Notifications
    include Celluloid::FSM
    include Consts

    # Argument bundle handed to the RPC callback for each CALL message.
    class Call < Struct.new(:peer_id, :options, :procedure, :arguments, :argumentskw) ; end

    def_delegators :@prng, :rand
    def_delegators :@driver, :parse, :protocol

    execute_block_on_receiver :initialize
    finalizer :disconnect

    # Intended transition table.
    # NOTE(review): every state except :idle is declared a second time below
    # with a callback block and no `to:` list; presumably the later
    # declaration replaces the entry made here, leaving those transitions
    # unrestricted - verify against Celluloid::FSM before relying on this
    # table (several transitions in this class are not listed in it).
    default_state :idle
    state :idle, to: [:send_website, :websocket_start, :disconnect]
    state :send_website, to: [:disconnect]
    state :websocket_start, to: [:websocket_connected, :disconnect]
    state :websocket_connected, to: [:wamp_abort, :wamp_welcome, :websocket_close, :disconnect]
    state :wamp_abort, to: [:websocket_close, :disconnect]
    state :wamp_welcome, to: [:wamp_connected, :websocket_close, :disconnect]
    state :wamp_connected, to: [:wamp_goodbye, :wamp_goodbye_reply, :websocket_close, :disconnect]
    state :websocket_close, to: [:disconnect]
    state :disconnect, to: [:disconnected]
    state :disconnected

    # Non-WebSocket request: run it through the Webmachine Rack adapter, write
    # the raw HTTP response to the socket, then disconnect.
    state :send_website do
      debug state
      begin
        rack_adapter = Webmachine::Adapters::Rack.new(Webmachine.application)
        @driver.env[RACK_URL_SCHEME] = HTTP
        status, headers, body = rack_adapter.call(@driver.env)
        response = "#{HTTP_11} #{status}#{CRLF}"
        headers.each do |k, v|
          response << "#{k}: #{v}#{CRLF}"
        end
        response << CRLF
        @socket.write response << body.join
        transition :disconnect
      rescue IOError, Errno::EPIPE, Errno::ECONNRESET
        transition :disconnect
      end
    end

    # WebSocket upgrade requested: start the driver.
    state :websocket_start do
      debug state
      begin
        @driver.start
        transition :websocket_connected
      rescue IOError, Errno::EPIPE, Errno::ECONNRESET
        transition :disconnect
      end
    end

    # WebSocket is up, WAMP session not yet established: start periodic pings.
    state :websocket_connected do
      debug state
      async.send_ping
    end

    # Send the pre-serialized ABORT message, then close the WebSocket.
    state :wamp_abort do
      debug state
      begin
        @driver.frame(ABORT_MSG)
        transition :websocket_close
      rescue IOError, Errno::EPIPE, Errno::ECONNRESET
        transition :disconnect
      end
    end

    # Assign a session id and send WELCOME with the advertised roles.
    state :wamp_welcome do
      debug state
      begin
        @peer_id = generate_id
        msg = JSON.generate([Status[:welcome], @peer_id, @roles])
        debug msg
        @driver.frame(msg)
        transition :wamp_connected
      rescue IOError, Errno::EPIPE, Errno::ECONNRESET
        transition :disconnect
      end
    end

    # WAMP session established: start periodic heartbeats.
    state :wamp_connected do
      debug state
      async.heartbeat
    end

    # Send GOODBYE, then close the WebSocket.
    state :wamp_goodbye do
      debug state
      begin
        @driver.frame(JSON.generate([Status[:goodbye], EMPTY_DICT, GOODBYE_MSG]))
        transition :websocket_close
      rescue IOError, Errno::EPIPE, Errno::ECONNRESET
        transition :disconnect
      end
    end

    # Answer a GOODBYE received from the client, then close the WebSocket.
    state :wamp_goodbye_reply do
      debug state
      begin
        @driver.frame(JSON.generate([Status[:goodbye], EMPTY_DICT, GOODBYE_REPLY_MSG]))
        transition :websocket_close
      rescue IOError, Errno::EPIPE, Errno::ECONNRESET
        transition :disconnect
      end
    end

    # Close the WebSocket through the driver, then disconnect.
    state :websocket_close do
      debug state
      begin
        @driver.close
        transition :disconnect
      rescue IOError, Errno::EPIPE, Errno::ECONNRESET
        transition :disconnect
      end
    end

    # Close the underlying socket if it is still open.
    state :disconnect do
      debug state
      if @socket
        @socket.close unless @socket.closed?
      end
      transition :disconnected
    end

    # Final state: stop this actor; failures while terminating are ignored.
    state :disconnected do
      debug state
      terminate rescue nil
    end

    # @param socket the accepted client socket
    # @param [Hash] options passed through to WebSocket::Driver.server
    # @yield [Call] optional RPC callback; when given, the dealer role is
    #   advertised in addition to the broker role and CALL messages are
    #   answered with the block's return value
    def initialize(socket, options = {}, &callback)
      super()
      @out_hb_number, @inc_hb_number, @peer_id = 0, 0, 0
      @latency = 0.0
      # Holds two kinds of entries: topic (String) => list of subscription
      # ids, and subscription id (Integer) => notification subscriber.
      @subscriptions = {}
      @prng = Random.new
      @socket = socket
      @driver = WebSocket::Driver.server(@socket, options)
      @driver.on(:connect) do
        if WebSocket::Driver.websocket? @driver.env
          transition :websocket_start
        else
          transition :send_website
        end
      end
      @driver.on(:open) do
        # No sub-protocol was negotiated: close straight away.
        unless protocol
          transition :websocket_close
        end
      end
      @driver.on(:error) {|e| error e}
      @driver.on(:close) {|e| debug e; disconnect}
      @driver.on(:message) {|e| parse_message(e.data)}
      @callback = callback
      if @callback
        @roles = ROLES
      else
        @roles = {roles: {broker: EMPTY_DICT}.freeze}.freeze
      end
    end

    # Converts a value into the Array appended to outgoing EVENT and RESULT
    # messages.
    #
    # * Array                  => returned unchanged
    # * String, Numeric, true, false, nil, Symbol => [value]
    # * Hash                   => [EMPTY_LIST, hash]
    # * IO / StringIO          => [contents], rewound first when possible
    # * anything else          => first of to_ary, to_hash, to_h, to_a, to_io,
    #                             IO.try_convert that applies
    #
    # @param message the value to convert
    # @raise [ArgumentError] when none of the conversions apply
    # @return [Array]
    def wampify_message(message)
      case message
      when Array then message
      when String, Numeric, TrueClass, FalseClass, NilClass, Symbol then [message]
      when Hash then [EMPTY_LIST, message]
      when IO, StringIO
        message.rewind if message.respond_to?(:rewind)
        [message.read]
      else
        if message.respond_to?(:to_ary)
          message.to_ary
        elsif message.respond_to?(:to_hash)
          [EMPTY_LIST, message.to_hash]
        elsif message.respond_to?(:to_h)
          [EMPTY_LIST, message.to_h]
        elsif message.respond_to?(:to_a)
          message.to_a
        elsif message.respond_to?(:to_io)
          message.to_io.rewind if message.to_io.respond_to?(:rewind)
          [message.to_io.read]
        elsif (io_message = IO.try_convert(message))
          io_message.rewind if io_message.respond_to?(:rewind)
          [io_message.read]
        else
          fail ArgumentError, "Don't know how to publish #{message.class}"
        end
      end
    end

    # @return [Integer] random id in the range 0...2**53
    def generate_id
      rand(TWO_POW_53)
    end

    # Decodes one incoming WebSocket message and acts on it according to the
    # current state. A payload that is not a JSON array with a numeric type
    # code and at least two elements leads to #disconnect.
    #
    # @param msg the payload delivered by the driver's :message event
    def parse_message(msg)
      debug "parse: #{msg}"
      message = JSON.parse(msg)
      if message.is_a?(Array) && message[0].is_a?(Numeric) && message.length >= 2
        case state
        when :websocket_connected
          # Only HELLO for our realm is acceptable before the session exists.
          if Status[message[0]].hello? && message[1] == REALM
            transition :wamp_welcome
          else
            transition :wamp_abort
          end
        when :wamp_connected
          # Legitimate message handling happens here, the rest is just error management.
          case Status[message[0]].symbolize
          when :hello
            # A second HELLO on an established session ends it.
            transition :wamp_goodbye
          when :subscribe
            # message[3] is the topic uri
            if realm_uri?(message[3])
              # NOTE(review): every SUBSCRIBE registers another notification
              # subscription for the topic, while subscribe_handler sends to
              # every id recorded for that topic - repeated subscriptions to
              # one topic look like they produce duplicate EVENTs; confirm.
              subscription_id = generate_id
              @subscriptions[message[3]] ||= []
              @subscriptions[message[3]] << subscription_id
              @subscriptions[subscription_id] = subscribe(message[3], :subscribe_handler)
              write(JSON.generate([Status[:subscribed], Integer(message[1]), subscription_id]))
            else
              transition :wamp_goodbye
            end
          when :unsubscribe
            # message[2] is the subscription_id from above
            if message[2]
              if unsubscribe_handler(Integer(message[2]))
                # message[1] is a randomly generated id from the client
                write(JSON.generate([Status[:unsubscribed], Integer(message[1])]))
              else
                transition :wamp_goodbye
              end
            else
              transition :wamp_goodbye
            end
          when :heartbeat
            # message[2] is echoed back in the next outgoing heartbeat.
            if message[2]
              @inc_hb_number = Integer(message[2])
            else
              transition :wamp_goodbye
            end
          when :goodbye
            case message[2]
            when GOODBYE_REPLY_MSG
              transition :websocket_close
            when GOODBYE_MSG
              transition :wamp_goodbye_reply
            else
              transition :wamp_goodbye
            end
          when :call
            # message[2] options, message[3] procedure uri,
            # message[4] / message[5] positional / keyword arguments.
            if @callback
              if realm_uri?(message[3])
                result = @callback.call(Call.new(@peer_id, message[2], message[3], message[4], message[5]))
                base_res = [Status[:result], Integer(message[1]), EMPTY_DICT]
                write(JSON.generate(base_res << wampify_message(result)))
              else
                transition :wamp_goodbye
              end
            else
              transition :wamp_goodbye
            end
          else
            debug "wamp_connected_status: #{Status[message[0]].symbolize}"
            transition :wamp_goodbye
          end
        else
          debug state
          transition :disconnect
        end
      else
        disconnect
      end
    rescue JSON::ParserError, ArgumentError, TypeError, FloatDomainError
      # Malformed client input must end the session, not crash the actor:
      # * JSON::ParserError - payload is not valid JSON
      # * ArgumentError     - Integer() got a non-numeric string, or the
      #                       message type could not be coerced
      # * TypeError         - Integer() got nil or an Array/Hash, or the
      #                       payload was not a String
      # * FloatDomainError  - a type code or id parsed as an infinite float
      disconnect
    end

    # Notification callback: forwards a published message to the client as
    # one EVENT per subscription id recorded for the topic.
    #
    # @param topic the topic the message was published on
    # @param message the published payload (converted with #wampify_message)
    def subscribe_handler(topic, message)
      if @subscriptions[topic]
        msg = wampify_message(message)
        @subscriptions[topic].each do |subscriber|
          base_msg = [Status[:event], subscriber, generate_id, EMPTY_DICT]
          write JSON.generate(base_msg << msg)
        end
      end
    end

    # Removes one subscription and, when it was the last one for its topic,
    # the topic entry as well.
    #
    # @param [Integer] subscription_id
    # @return [Boolean] false when the id is unknown
    def unsubscribe_handler(subscription_id)
      if (subscription = @subscriptions.delete(subscription_id))
        @subscriptions[subscription.pattern].delete(subscription_id)
        if @subscriptions[subscription.pattern].length == 0
          @subscriptions.delete(subscription.pattern)
        end
        unsubscribe(subscription)
        true
      else
        false
      end
    end

    # Sends a frame through the driver; a broken connection leads to the
    # :disconnect state instead of an exception.
    def write(msg = '', type = nil, code = nil)
      debug "write: #{msg} #{type} #{code}"
      @driver.frame(msg, type, code)
    rescue IOError, Errno::EPIPE, Errno::ECONNRESET
      transition :disconnect
    end

    # Starts the shutdown path appropriate for the current state. Also
    # registered as the actor finalizer.
    def disconnect
      debug "disconnect"
      case state
      when :wamp_connected
        transition :wamp_goodbye
      when :websocket_connected
        transition :wamp_abort
      when :disconnect, :disconnected
        # Already shutting down; nothing to do.
      else
        debug "disconnect: #{state}"
        transition :disconnect
      end
    end

    private

    # NOTE(review): heartbeat, send_ping and ping are private but invoked
    # through `async`; confirm the Celluloid version in use dispatches
    # asynchronous calls to private methods.

    # True when +uri+ is a String that matches URI_MATCH and starts with
    # REALM. The type check keeps non-String JSON values (numbers, arrays,
    # objects) from reaching `=~`, which is not defined for arbitrary objects
    # on current Rubies.
    def realm_uri?(uri)
      uri.is_a?(String) && uri =~ URI_MATCH && uri.start_with?(REALM)
    end

    # Every 15 seconds sends HEARTBEAT carrying the last incoming sequence
    # number and the next outgoing one.
    def heartbeat
      every(15) {
        async.write(JSON.generate([Status[:heartbeat], @inc_hb_number, @out_hb_number += 1]))
      }
    end

    # Every 5 seconds schedules a WebSocket ping.
    def send_ping
      every(5) {
        async.ping
      }
    end

    # Sends a WebSocket ping with a random payload and records the elapsed
    # time in @latency when the driver invokes the reply block.
    def ping
      id = "#{generate_id}"
      info "Sending ping: #{id}"
      start = Time.now.to_f
      @driver.ping(id) do
        @latency = Time.now.to_f - start
        info 'Recieved ping'
        info "Latency to #{@socket.peeraddr}: #{@latency}"
      end
    end
  end
end
# Celluloid::IO actor that listens for TCP connections, hands each accepted
# socket to a Wamp::WebSocketHandler and publishes the current time once per
# second on the "<REALM>.time_update" topic.
class Server
  include Celluloid::Logger
  include Celluloid::IO
  include Celluloid::Notifications
  include Wamp::Consts

  execute_block_on_receiver :initialize
  finalizer :shutdown
  trap_exit :crash_notifier

  # @param [String] host address to bind to (defaults to all interfaces)
  # @param [Integer] port TCP port to listen on
  # @yield [Wamp::WebSocketHandler::Call] optional RPC callback forwarded to
  #   every connection handler
  def initialize(host = '0.0.0.0', port = 8080, &callback)
    @callback = callback
    @server = TCPServer.new(host, port)
    async.time_notifier
    async.run
  end

  # Creates a handler for the socket and feeds it everything read from the
  # socket until the handler dies or reading fails.
  #
  # @param socket an accepted client socket
  def handle_connection(socket)
    websocket = Wamp::WebSocketHandler.new(socket, max_length: ONE_MEGABYTE, protocols: WEBSOCKET_PROTOS, &@callback)
    while websocket.alive?
      begin
        buffer = socket.readpartial(BUFFER_SIZE)
        # The handler may have terminated while this read was blocked.
        break unless websocket.alive?
        websocket.parse(buffer)
      rescue IOError, Errno::EPIPE, Errno::ECONNRESET
        # Peer went away (EOFError is an IOError): normal end of connection.
        break
      rescue => e
        # Still ends the connection, but no longer silently.
        debug "handle_connection aborted by #{e.class}: #{e.message}"
        break
      end
    end
  ensure
    if websocket
      # Best effort: the handler may already be dead, in which case the call
      # raises and is ignored.
      websocket.disconnect rescue nil
    else
      # Handler construction failed, so nobody else will close the socket.
      socket.close rescue nil
    end
  end

  # trap_exit callback: logs which linked actor died and why.
  def crash_notifier(actor, reason)
    debug "Oh no! #{actor.inspect} has died because of a #{reason.class}"
  end

  # Finalizer: closes the listening socket.
  def shutdown
    @server.close if @server
  end

  private

  # NOTE(review): time_notifier and run are private but invoked through
  # `async`; confirm the Celluloid version in use dispatches asynchronous
  # calls to private methods.

  # Publishes the current epoch time (Float seconds) once per second.
  def time_notifier
    every(1.0) {
      publish("#{REALM}.time_update", Time.now.to_f)
    }
  end

  # Accept loop: each connection is handled asynchronously.
  def run
    loop { async.handle_connection(@server.accept) }
  end
end
# Serve the demo page for every path.
Webmachine.application.routes { add_route(['*'], Wamp::StaticResource) }

# Start the server; the block is the RPC callback and returns 'world' for
# every call it is given.
Server.run { |_call| 'world' }
以上是关于ruby websocket_celluloid_server.rb的主要内容,如果未能解决你的问题,请参考以下文章