Class: Simultaneous::AsyncClient
- Inherits:
-
Object
- Object
- Simultaneous::AsyncClient
- Defined in:
- lib/simultaneous/async_client.rb
Instance Attribute Summary collapse
-
#domain ⇒ Object
readonly
Returns the value of attribute domain.
Instance Method Summary collapse
- #attempt_reconnect ⇒ Object
- #close ⇒ Object
- #connect ⇒ Object
- #connection(&callback) ⇒ Object
- #event_machine(&block) ⇒ Object
- #handler ⇒ Object
-
#initialize(domain, connection_string, &block) ⇒ AsyncClient
constructor
A new instance of AsyncClient.
- #notify! ⇒ Object
- #on_event(event, &block) ⇒ Object
- #receive(data) ⇒ Object
- #reconnect! ⇒ Object
- #run(command) ⇒ Object
- #send(message) ⇒ Object
- #subscribers ⇒ Object
Constructor Details
#initialize(domain, connection_string, &block) ⇒ AsyncClient
Returns a new instance of AsyncClient.
11 12 13 14 15 16 17 |
# File 'lib/simultaneous/async_client.rb', line 11 def initialize(domain, connection_string, &block) @connection = Simultaneous::Connection.new(connection_string) @domain = domain @callbacks = [] @socket = nil connect end |
Instance Attribute Details
#domain ⇒ Object (readonly)
Returns the value of attribute domain.
9 10 11 |
# File 'lib/simultaneous/async_client.rb', line 9 def domain @domain end |
Instance Method Details
#attempt_reconnect ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/simultaneous/async_client.rb', line 44 def attempt_reconnect @socket = nil @reconnect_timer = EM::PeriodicTimer.new(1) do $stderr.puts "#{Time.now} Attempting reconnect" begin connect EM.cancel_timer(@reconnect_timer) @reconnect_timer = nil $stderr.puts "#{Time.now} Re-connection successful" rescue => e $stderr.puts "#{Time.now} Reconnection failed `#{e}`" end end end |
#close ⇒ Object
59 60 61 |
# File 'lib/simultaneous/async_client.rb', line 59 def close @socket.close_connection_after_writing if @socket end |
#connect ⇒ Object
78 79 80 81 82 83 84 85 86 |
# File 'lib/simultaneous/async_client.rb', line 78 def connect event_machine do # EventMachine.connect(*Simultaneous.parse_connection(@connection_string), handler) do |conn| @connection.async_socket(handler) do |conn| conn.client = self @socket = conn end end end |
#connection(&callback) ⇒ Object
74 75 76 |
# File 'lib/simultaneous/async_client.rb', line 74 def connection(&callback) callback.call(@socket) if @socket end |
#event_machine(&block) ⇒ Object
114 115 116 117 118 119 120 121 |
# File 'lib/simultaneous/async_client.rb', line 114 def event_machine(&block) if EM.reactor_running? block.call else Thread.new { EM.run } EM.next_tick { block.call } end end |
#handler ⇒ Object
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/simultaneous/async_client.rb', line 19 def handler handler = Class.new(EventMachine::Connection) do include EventMachine::Protocols::LineText2 def client=(client); @client = client end def client; @client end def connection_completed; end def receive_line(line) client.receive(line) end def unbind $stderr.puts "#{Time.now} Client Connection closed\n" client.reconnect! end end handler end |
#notify! ⇒ Object
97 98 99 100 101 102 103 104 |
# File 'lib/simultaneous/async_client.rb', line 97 def notify! if @message.valid? and @message.domain == @domain subscribers[@message.event].each do |subscriber| subscriber.call(@message) end end @message = nil end |
#on_event(event, &block) ⇒ Object
110 111 112 |
# File 'lib/simultaneous/async_client.rb', line 110 def on_event(event, &block) subscribers[event.to_s] << block end |
#receive(data) ⇒ Object
88 89 90 91 92 93 94 95 |
# File 'lib/simultaneous/async_client.rb', line 88 def receive(data) if data == "" notify! if @message else @message ||= Simultaneous::BroadcastMessage.new @message << data end end |
#reconnect! ⇒ Object
40 41 42 |
# File 'lib/simultaneous/async_client.rb', line 40 def reconnect! attempt_reconnect end |
#run(command) ⇒ Object
63 64 65 66 |
# File 'lib/simultaneous/async_client.rb', line 63 def run(command) command.domain = self.domain send(command.dump) end |
#send(message) ⇒ Object
68 69 70 71 72 |
# File 'lib/simultaneous/async_client.rb', line 68 def send() connection do |c| c.send_data() end end |
#subscribers ⇒ Object
106 107 108 |
# File 'lib/simultaneous/async_client.rb', line 106 def subscribers @subscribers ||= Hash.new { |hash, key| hash[key] = [] } end |