Class: Deepstream::Client

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Celluloid, Celluloid::Internals::Logger
Defined in:
lib/deepstream/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(url, options = {}) ⇒ Client

Returns a new instance of Client.



25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/deepstream/client.rb', line 25

# Builds a client for the given endpoint and immediately calls #connect.
#
# @param url [Object] endpoint; passed through Helpers.url before being stored
# @param options [Hash] overrides merged on top of Helpers.default_options;
#   a truthy :verbose switches the Celluloid logger to LOG_LEVEL::INFO,
#   otherwise it is set to LOG_LEVEL::OFF
# @return [Client]
def initialize(url, options = {})
  @url = Helpers.url(url)
  @error_handler = ErrorHandler.new(self)
  @record_handler = RecordHandler.new(self)
  @event_handler = EventHandler.new(self)
  @options = Helpers.default_options.merge!(options)
  @message_buffer = []
  # NOTE(review): the "hearbeat" spelling is kept on purpose; code outside this
  # method may read the ivar under this exact name - confirm before renaming.
  @last_hearbeat = nil
  # Chained assignment so that all three flags really start out as `false`.
  # The parallel form `a, b, c = false` only sets `a`; `b` and `c` become nil.
  @challenge_denied = @login_requested = @deliberate_close = false
  @failed_reconnect_attempts = 0
  @state = CONNECTION_STATE::CLOSED
  Celluloid.logger.level = @options[:verbose] ? LOG_LEVEL::INFO : LOG_LEVEL::OFF
  connect
end

Instance Attribute Details

#options ⇒ Object (readonly)

Returns the value of attribute options.



13
14
15
# File 'lib/deepstream/client.rb', line 13

# Read-only accessor for the option hash stored in @options.
#
# @return [Object] the current value of @options
def options
  @options
end

#state ⇒ Object (readonly)

Returns the value of attribute state.



13
14
15
# File 'lib/deepstream/client.rb', line 13

# Read-only accessor for the connection state stored in @state.
#
# @return [Object] the current value of @state (one of the CONNECTION_STATE
#   constants wherever this file assigns it)
def state
  @state
end

Instance Method Details

#close ⇒ Object



88
89
90
91
92
93
94
95
96
# File 'lib/deepstream/client.rb', line 88

# Deliberately shuts the connection down. Does nothing (and returns nil)
# when the client is not connected.
#
# The state is switched to CLOSED and the deliberate-close flag is raised
# before the connection itself is closed and terminated. Any StandardError
# raised along the way is handed to #on_exception.
#
# @return [Object, nil] result of terminating the connection, the result of
#   #on_exception on failure, or nil when not connected
def close
  if connected?
    @state = CONNECTION_STATE::CLOSED
    @deliberate_close = true
    @connection.close
    @connection.terminate
  end
rescue => e
  on_exception(e)
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


98
99
100
# File 'lib/deepstream/client.rb', line 98

# Whether the client is in any state other than CONNECTION_STATE::CLOSED.
# Note that this is true for every non-closed state, not only for a fully
# opened connection (see #logged_in? for that).
#
# @return [Boolean]
def connected?
  @state != CONNECTION_STATE::CLOSED
end

#inspect ⇒ Object



110
111
112
# File 'lib/deepstream/client.rb', line 110

def inspect
  "#{self.class} #{@url} | connection state: #{@state}"
end

#logged_in? ⇒ Boolean

Returns:

  • (Boolean)


106
107
108
# File 'lib/deepstream/client.rb', line 106

# Whether the client's state is exactly CONNECTION_STATE::OPEN.
#
# @return [Boolean]
def logged_in?
  @state == CONNECTION_STATE::OPEN
end

#login(credentials = @options[:credentials]) ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/deepstream/client.rb', line 71

# Requests authentication with the given credentials.
#
# The credentials are stored in @options[:credentials]. Depending on the
# current situation the method then:
# * reports an error via #on_error if the connection challenge was denied,
# * starts an asynchronous #connect if the client is not connected, or
# * sends the AUTH request right away if the state is AUTHENTICATING
#   (and clears the pending-login flag, since the request has gone out).
# In every other state the login stays flagged as requested and nothing is sent.
#
# Any StandardError is forwarded to #on_exception.
#
# @param credentials [Object] serialised with #to_json when sent;
#   defaults to the credentials already present in the options
# @return [self] always, so calls can be chained
def login(credentials = @options[:credentials])
  @login_requested = true
  @options[:credentials] = credentials
  if @challenge_denied
    on_error("this client's connection was closed")
  elsif !connected?
    async.connect
  elsif @state == CONNECTION_STATE::AUTHENTICATING
    @login_requested = false
    send_message(TOPIC::AUTH, ACTION::REQUEST, @options[:credentials].to_json)
  end
  self
rescue => e
  on_exception(e)
  self
end

#on_close(code, reason) ⇒ Object



63
64
65
66
67
68
69
# File 'lib/deepstream/client.rb', line 63

# Callback for a closed websocket: logs the close, marks the client as
# CLOSED and, unless the close was requested through the deliberate-close
# flag, calls #reconnect.
#
# Any StandardError is forwarded to #on_exception.
#
# @param code [Object] close code, logged via #inspect
# @param reason [Object] close reason, logged via #inspect
# @return [Object, nil] result of #reconnect, or nil after a deliberate close
def on_close(code, reason)
  notice = "Websocket connection closed: code - #{code.inspect}, reason - #{reason.inspect}"
  info(notice)
  @state = CONNECTION_STATE::CLOSED
  return if @deliberate_close

  reconnect
rescue => e
  on_exception(e)
end

#on_message(data) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/deepstream/client.rb', line 47

# Callback for incoming data: wraps it in a Message, logs it and routes it
# by topic to the matching handler.
#
# RPC messages and unrecognised topics raise UnknownTopic; that error, like
# any other StandardError raised while handling, is passed to #on_exception.
#
# @param data [Object] raw payload handed to Message.new
# @return [Object] whatever the selected handler (or #on_exception) returns
def on_message(data)
  incoming = Message.new(data)
  info("Incoming message: #{incoming.inspect}")
  case incoming.topic
  when TOPIC::AUTH
    authentication_message(incoming)
  when TOPIC::CONNECTION
    connection_message(incoming)
  when TOPIC::EVENT
    @event_handler.on_message(incoming)
  when TOPIC::ERROR
    @error_handler.on_error(incoming)
  when TOPIC::RECORD
    @record_handler.on_message(incoming)
  when TOPIC::RPC
    raise UnknownTopic, 'RPC is currently not implemented.'
  else
    raise UnknownTopic, incoming
  end
rescue => e
  on_exception(e)
end

#on_open ⇒ Object



40
41
42
43
44
45
# File 'lib/deepstream/client.rb', line 40

# Callback for an opened websocket: logs the event, moves the client to
# AWAITING_CONNECTION and resets the connection bookkeeping.
#
# @return [Integer] 0, the reset failed-reconnect counter
def on_open
  info("Websocket connection opened")
  @state = CONNECTION_STATE::AWAITING_CONNECTION
  # Chained assignment so both flags become `false`. The parallel form
  # `a, b = false` would set only `a` and leave `b` as nil.
  @connection_requested = @deliberate_close = false
  @failed_reconnect_attempts = 0
end

#reconnecting? ⇒ Boolean

Returns:

  • (Boolean)


102
103
104
# File 'lib/deepstream/client.rb', line 102

# Whether the client's state is exactly CONNECTION_STATE::RECONNECTING.
#
# @return [Boolean]
def reconnecting?
  @state == CONNECTION_STATE::RECONNECTING
end

#send_message(*args) ⇒ Object



114
115
116
117
118
119
120
121
122
123
# File 'lib/deepstream/client.rb', line 114

# Builds a Message from the arguments and writes it to the connection.
#
# A message that needs authentication is not sent while the client is not
# logged in; it is handed to #unable_to_send_message instead. The same
# happens when the write fails with Errno::EPIPE. Every other StandardError
# is passed to #on_exception.
#
# @param args [Array] forwarded unchanged to Message.parse
# @return [Object] result of the connection write, of
#   #unable_to_send_message, or of #on_exception
def send_message(*args)
  outgoing = Message.parse(*args)
  if logged_in? || !outgoing.needs_authentication?
    info("Sending message: #{outgoing.inspect}")
    @connection.text(outgoing.to_s)
  else
    unable_to_send_message(outgoing)
  end
rescue Errno::EPIPE
  unable_to_send_message(outgoing)
rescue => e
  on_exception(e)
end