Class: Simultaneous::AsyncClient

Inherits:
Object
  • Object
show all
Defined in:
lib/simultaneous/async_client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(domain, connection_string, &block) ⇒ AsyncClient

Returns a new instance of AsyncClient.



11
12
13
14
15
16
17
# File 'lib/simultaneous/async_client.rb', line 11

# Builds the client and immediately starts connecting.
#
# @param domain [Object] value later compared against each message's domain
# @param connection_string [Object] passed straight to Simultaneous::Connection.new
# @param block [Proc] accepted for interface compatibility; not used by this method
def initialize(domain, connection_string, &block)
  @connection = Simultaneous::Connection.new(connection_string)
  # No socket exists until #connect's callback assigns one.
  @domain, @callbacks, @socket = domain, [], nil
  connect
end

Instance Attribute Details

#domain ⇒ Object (readonly)

Returns the value of attribute domain.



9
10
11
# File 'lib/simultaneous/async_client.rb', line 9

# Read-only accessor for the domain given to the constructor.
#
# @return [Object] the stored domain (nil if it was never set)
def domain; @domain end

Instance Method Details

#attempt_reconnect ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/simultaneous/async_client.rb', line 44

# Drops the current socket and retries #connect once per second until a
# call to #connect returns without raising.
#
# Calling this while a previous retry timer is still pending cancels that
# timer first, so at most one retry timer is ever active.
#
# @return [EM::PeriodicTimer] the timer driving the retries
def attempt_reconnect
  @socket = nil
  # Without this, a second call would overwrite @reconnect_timer and leave
  # the earlier timer firing forever with nothing left to cancel it.
  @reconnect_timer.cancel if @reconnect_timer
  timer = nil
  timer = EM::PeriodicTimer.new(1) do
    $stderr.puts "#{Time.now} Attempting reconnect"
    begin
      connect
      # Cancel the timer this block belongs to, not whatever the ivar
      # happens to reference by now.
      timer.cancel
      @reconnect_timer = nil if @reconnect_timer.equal?(timer)
      $stderr.puts "#{Time.now} Re-connection successful"
    rescue => e
      # Leave the timer running so the next tick retries.
      $stderr.puts "#{Time.now} Reconnection failed `#{e}`"
    end
  end
  @reconnect_timer = timer
end

#close ⇒ Object



59
60
61
# File 'lib/simultaneous/async_client.rb', line 59

# Asks the current socket, if there is one, to close itself via
# close_connection_after_writing. Does nothing when no socket is set.
#
# NOTE(review): the connection handler's unbind calls reconnect! on this
# client, so a deliberate close presumably starts a reconnect cycle --
# confirm whether that is intended.
#
# @return [Object, nil] whatever the socket call returns, or nil without a socket
def close
  return unless @socket
  @socket.close_connection_after_writing
end

#connect ⇒ Object



78
79
80
81
82
83
84
85
86
# File 'lib/simultaneous/async_client.rb', line 78

# Opens an asynchronous socket through @connection, using the class
# returned by #handler, from within #event_machine.
#
# When the socket object is yielded it is told which client owns it and is
# stored in @socket.
#
# @return [Object] whatever #event_machine returns
def connect
  on_connect = proc do |sock|
    sock.client = self
    @socket = sock
  end
  event_machine { @connection.async_socket(handler, &on_connect) }
end

#connection(&callback) ⇒ Object



74
75
76
# File 'lib/simultaneous/async_client.rb', line 74

# Calls +callback+ with the current socket, but only when a socket exists.
#
# @yieldparam socket [Object] the value stored in @socket
# @return [Object, nil] the callback's result, or nil when there is no socket
def connection(&callback)
  return unless @socket
  callback.call(@socket)
end

#event_machine(&block) ⇒ Object



114
115
116
117
118
119
120
121
# File 'lib/simultaneous/async_client.rb', line 114

# Runs +block+ in the context of an EventMachine reactor.
#
# If a reactor is already running the block is called immediately.
# Otherwise EM.run is started on a new thread and the block is queued with
# EM.next_tick.
#
# @return [Object] the block's result when called immediately, otherwise
#   whatever EM.next_tick returns
def event_machine(&block)
  return block.call if EM.reactor_running?

  Thread.new { EM.run }
  EM.next_tick { block.call }
end

#handler ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/simultaneous/async_client.rb', line 19

# Returns the EventMachine connection class used for this client's sockets.
#
# The class subclasses EventMachine::Connection, mixes in LineText2, forwards
# every receive_line call to client.receive and calls client.reconnect! from
# unbind. It is built once and memoized, so repeated reconnect attempts do
# not allocate a new anonymous class each time.
#
# @return [Class] subclass of EventMachine::Connection
def handler
  @handler_class ||= Class.new(EventMachine::Connection) do
    include EventMachine::Protocols::LineText2

    # The owning client; assigned by the client right after the connection
    # object is created.
    attr_accessor :client

    def connection_completed; end

    def receive_line(line)
      # Guard against callbacks arriving before a client has been assigned.
      client.receive(line) if client
    end

    def unbind
      $stderr.puts "#{Time.now} Client Connection closed\n"
      client.reconnect! if client
    end
  end
end

#notify! ⇒ Object



97
98
99
100
101
102
103
104
# File 'lib/simultaneous/async_client.rb', line 97

# Dispatches the buffered message to the subscribers of its event, then
# discards it.
#
# Subscribers are only called when the message reports itself valid and its
# domain equals this client's domain. The buffer is cleared before any
# subscriber runs, so a raising subscriber cannot leave a stale message
# behind for later lines to be appended to.
#
# @return [nil]
def notify!
  message = @message
  @message = nil
  return unless message && message.valid? && message.domain == @domain

  registry = subscribers
  event = message.event
  # key? avoids triggering the hash's default block, which would otherwise
  # store a permanent empty array for every unknown event name received.
  return unless registry.key?(event)

  registry[event].each { |subscriber| subscriber.call(message) }
  nil
end

#on_event(event, &block) ⇒ Object



110
111
112
# File 'lib/simultaneous/async_client.rb', line 110

# Registers +block+ as a subscriber for +event+.
#
# The event name is stored as a String (event.to_s).
#
# @param event [#to_s] event name
# @yieldparam message [Object] the message passed by #notify!
# @return [Array<Proc>] the subscriber list for that event
# @raise [ArgumentError] when no block is given
def on_event(event, &block)
  # A nil entry would only blow up later, when #notify! tries to call it.
  raise ArgumentError, "on_event requires a block" unless block
  subscribers[event.to_s] << block
end

#receive(data) ⇒ Object



88
89
90
91
92
93
94
95
# File 'lib/simultaneous/async_client.rb', line 88

# Consumes one line of input.
#
# A non-empty line is appended to the message being assembled (created on
# demand). An empty line ends the message: if one is being assembled,
# #notify! is called; otherwise nothing happens.
#
# @param data [String] a single line
# @return [Object, nil] result of the append or of #notify!, nil otherwise
def receive(data)
  if data != ""
    @message = Simultaneous::BroadcastMessage.new unless @message
    @message << data
  elsif @message
    notify!
  end
end

#reconnect! ⇒ Object



40
41
42
# File 'lib/simultaneous/async_client.rb', line 40

# Entry point called by the connection handler when the connection is
# unbound; simply delegates to #attempt_reconnect.
#
# @return [Object] whatever #attempt_reconnect returns
def reconnect!; attempt_reconnect end

#run(command) ⇒ Object



63
64
65
66
# File 'lib/simultaneous/async_client.rb', line 63

# Stamps +command+ with this client's domain and sends its dumped form.
#
# @param command [#domain=, #dump] command to send
# @return [Object] whatever #send returns
def run(command)
  command.domain = domain
  payload = command.dump
  send(payload)
end

#send(message) ⇒ Object



68
69
70
71
72
# File 'lib/simultaneous/async_client.rb', line 68

# Writes +message+ to the current socket via send_data.
#
# Goes through #connection, so nothing is written when no socket is set.
# Note that this method shadows Object#send for instances of this class.
#
# @param message [Object] data handed to the socket's send_data
# @return [Object, nil] result of send_data, or nil when there is no socket
def send(message)
  connection { |conn| conn.send_data(message) }
end

#subscribers ⇒ Object



106
107
108
# File 'lib/simultaneous/async_client.rb', line 106

# Lazily-built registry mapping event names to subscriber lists.
#
# Looking up a missing key stores and returns a fresh empty array for it.
#
# @return [Hash{Object => Array}] the registry
def subscribers
  return @subscribers if @subscribers

  @subscribers = Hash.new { |registry, event| registry[event] = [] }
end