Class: Deepstream::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/deepstream.rb

Instance Method Summary collapse

Constructor Details

#initialize(address, port = 6021) ⇒ Client

Returns a new instance of Client.



83
84
85
# File 'lib/deepstream.rb', line 83

# Remembers where the server lives and resets all client-side state.
# Nothing is opened here; the method body only assigns instance variables.
#
# @param address [Object] host later handed to TCPSocket.new
# @param port [Object] port later handed to TCPSocket.new (6021 unless given)
def initialize(address, port = 6021)
  @address = address
  @port = port
  @unread_msg = nil       # last message not consumed by a dispatcher branch
  @event_callbacks = {}   # event name => block
  @records = {}           # record/list name => object
end

Instance Method Details

#_connect ⇒ Object



140
141
142
143
144
# File 'lib/deepstream.rb', line 140

# Opens the socket and performs the connection handshake.
#
# @return [Boolean] true when the reply to the request equals ["A", "A"]
def _connect
  _open_socket
  # The flag has to be truthy before the handshake is written: _write calls
  # _connect whenever @connected is falsy, which would recurse endlessly.
  @connected = true
  handshake_ok = _write_and_read(%w[A REQ {}]) { |reply| reply == %w[A A] }
  @connected = handshake_ok
end

#_fire_event_callback(msg) ⇒ Object



174
175
176
# File 'lib/deepstream.rb', line 174

# Runs the callback registered for an incoming event, if there is one.
# The callback is invoked on a new thread with the parsed payload (msg[3]).
#
# @param msg [Array<String>] split message; msg[2] is the event name
# @return [Proc, nil] the registered callback, or nil when none exists
def _fire_event_callback(msg)
  callback = @event_callbacks[msg[2]]
  Thread.start { callback.call(_parse_data(msg[3])) } if callback
  callback
end

#_open_socket ⇒ Object



130
131
132
133
134
135
136
137
138
# File 'lib/deepstream.rb', line 130

# Connects to @address:@port (2 second limit) and starts a background thread
# that reads frames terminated by 30.chr, splits them on 31.chr and hands the
# parts to _process_msg.
#
# When the peer closes the connection, gets returns nil. The reader then stops
# and marks the client as disconnected instead of passing nil on to
# _process_msg, which would raise NoMethodError inside the thread.
#
# @return [Thread] the reader thread
# @raise [StandardError] re-raises any connection error after printing a notice
def _open_socket
  Timeout.timeout(2) { @socket = TCPSocket.new(@address, @port) }
  # Read from the socket opened by this call, so a reader left over from an
  # earlier connection never touches a socket that replaced it.
  socket = @socket
  Thread.start do
    loop do
      frame = socket.gets(30.chr)
      if frame.nil?
        # Only flag the disconnect if this is still the active socket.
        @connected = false if socket.equal?(@socket)
        break
      end
      _process_msg(frame.chomp(30.chr).split(31.chr))
    end
  end
rescue
  print Time.now.to_s[/.+ .+ /], "Can't connect to deepstream server\n"
  raise
end

#_parse_data(payload) ⇒ Object



189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/deepstream.rb', line 189

# Decodes a payload into a Ruby value, based on its first character.
#
#   O<json> -> parsed JSON (objects become OpenStruct)
#   S<text> -> String
#   N<num>  -> Float (via to_f)
#   T / F   -> true / false
#   L / U   -> nil (null / undefined)
#   other   -> the whole payload parsed as JSON
#
# A nil or empty payload yields nil instead of raising.
#
# @param payload [String, nil] raw payload field of a message
# @return [Object, nil] decoded value
# @raise [JSON::ParserError] when a JSON payload is malformed
def _parse_data(payload)
  return nil if payload.nil? || payload.empty?

  case payload[0]
  when 'O' then JSON.parse(payload[1..-1], object_class: OpenStruct)
  when 'S' then payload[1..-1]
  when 'N' then payload[1..-1].to_f
  when 'T' then true
  when 'F' then false
  when 'L', 'U' then nil
  else JSON.parse(payload, object_class: OpenStruct)
  end
end

#_process_msg(msg) ⇒ Object



159
160
161
162
163
164
165
166
167
168
# File 'lib/deepstream.rb', line 159

# Dispatches one split message according to its first two fields.
#
#   E EVT -> fire the event callback
#   R P   -> _patch the record named msg[2]
#   R U   -> _update the record named msg[2]
#   R A   -> when msg[2] is 'D', forget the record named msg[3]
#   E A   -> ignored
#   other -> stored in @unread_msg
#
# Patches and updates for a name that is not in @records are ignored; calling
# a method on nil here would raise and end whatever thread is dispatching.
#
# @param msg [Array<String>] message already split into its fields
def _process_msg(msg)
  case msg[0..1]
  when %w{E EVT} then _fire_event_callback(msg)
  when %w{R P}
    record = @records[msg[2]]
    record._patch(msg[3], msg[4], _parse_data(msg[5])) if record
  when %w{R U}
    record = @records[msg[2]]
    record._update(msg[3], _parse_data(msg[4])) if record
  when %w{R A} then @records.delete(msg[3]) if msg[2] == 'D'
  when %w{E A} then nil
  else @unread_msg = msg
  end
end

#_read ⇒ Object



170
171
172
# File 'lib/deepstream.rb', line 170

# Waits until @unread_msg holds a message, polling every 0.01 seconds, then
# takes it and clears the slot. There is no timeout.
#
# @return [Object] the message that was waiting in @unread_msg
def _read
  pending = nil
  sleep 0.01 until (pending = @unread_msg)
  @unread_msg = nil
  pending
end

#_typed(value) ⇒ Object



178
179
180
181
182
183
184
185
186
187
# File 'lib/deepstream.rb', line 178

# Encodes a Ruby value as a type-prefixed payload string.
#
#   Hash / Array -> "O" + JSON
#   String       -> "S" + text
#   Numeric      -> "N" + number
#   true / false -> "T" / "F"
#   nil          -> "L"
#
# Arrays share the "O" prefix with hashes, matching how _parse_data decodes
# "O" payloads through JSON.parse.
#
# @param value [Object] value to encode
# @return [String, nil] encoded payload; nil for any other type
def _typed(value)
  case value
  when Hash, Array then "O#{value.to_json}"
  when String then "S#{value}"
  when Numeric then "N#{value}"
  when TrueClass then 'T'
  when FalseClass then 'F'
  when NilClass then 'L'
  end
end

#_write(*args) ⇒ Object



152
153
154
155
156
157
# File 'lib/deepstream.rb', line 152

# Sends one message: the arguments joined with 31.chr and terminated by
# 30.chr. Connects first when the client is not marked as connected.
#
# Any StandardError raised while connecting or writing is swallowed; the
# client is marked disconnected instead.
#
# @param args [Array] message fields
# @return [Integer, false] result of the socket write, or false on failure
def _write(*args)
  _connect unless @connected
  frame = args.join(31.chr) + 30.chr
  @socket.write(frame)
rescue StandardError
  @connected = false
end

#_write_and_read(*args) {|_read| ... } ⇒ Object

Yields:



146
147
148
149
150
# File 'lib/deepstream.rb', line 146

# Clears any pending unread message, writes the given fields, and, only when
# a block is supplied, waits for the next unread message and yields it.
#
# @param args [Array] message fields forwarded to _write
# @yieldparam reply [Object] the value returned by _read
# @return [Object, nil] the block's result, or nil when no block is given
def _write_and_read(*args)
  @unread_msg = nil
  _write(*args)
  return unless block_given?

  yield _read
end

#delete(record_name) ⇒ Object



122
123
124
125
126
127
128
# File 'lib/deepstream.rb', line 122

# Sends a delete request for a record. When the name contains a
# "<namespace>/<rest>" part, the full name is first removed from the list
# named after the namespace.
#
# @param record_name [String] name of the record to delete
# @return [Object] result of the underlying _write call
def delete(record_name)
  matching = record_name.match(/(?<namespace>\w+)\/(?<record>.+)/)
  get_list(matching[:namespace]).remove(record_name) if matching
  _write('R', 'D', record_name)
end

#emit(event, value = nil) ⇒ Object



87
88
89
# File 'lib/deepstream.rb', line 87

# Publishes an event with an optional value.
#
# @param event [String] event name
# @param value [Object] data to send; encoded with _typed (nil by default)
# @return [Object] result of the underlying _write call
def emit(event, value = nil)
  payload = _typed(value)
  _write('E', 'EVT', event, payload)
end

#get(record_name) ⇒ Object



96
97
98
# File 'lib/deepstream.rb', line 96

# Shorthand for #get_record without a list.
#
# @param record_name [String] name forwarded unchanged to #get_record
# @return [Object] whatever #get_record returns for that name
def get(record_name)
  get_record(record_name)
end

#get_list(list_name) ⇒ Object



114
115
116
117
118
119
120
# File 'lib/deepstream.rb', line 114

# Returns the list stored under the given name, requesting it first when it
# is not yet in @records.
#
# The reply is taken from _read: field 4 is parsed as the list data and
# field 3 is converted to an integer, then both go to Deepstream::List.new.
#
# @param list_name [String] name of the list
# @return [Deepstream::List] cached or newly built list
def get_list(list_name)
  cached = @records[list_name]
  return cached if cached

  _write_and_read('R', 'CR', list_name)
  reply = _read
  @records[list_name] = Deepstream::List.new(self, list_name, _parse_data(reply[4]), reply[3].to_i)
end

#get_record(record_name, list: nil) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/deepstream.rb', line 100

# Returns the record with the given name, requesting it first when it is not
# yet in @records.
#
# With list: given, the record is stored as "<list>/<record_name>" and that
# full name is added to the list on every call.
#
# @param record_name [String] record name without the list prefix
# @param list [String, nil] optional list the record belongs to
# @return [Deepstream::Record] cached or newly built record
def get_record(record_name, list: nil)
  name = list ? "#{list}/#{record_name}" : record_name

  unless @records[name]
    _write_and_read('R', 'CR', name)
    reply = _read
    @records[name] = Deepstream::Record.new(self, name, _parse_data(reply[4]), reply[3].to_i)
  end

  (@records[list] ||= get_list(list)).add(name) if list

  @records[name]
end

#on(event, &block) ⇒ Object



91
92
93
94
# File 'lib/deepstream.rb', line 91

# Subscribes to an event and registers the block to run when it arrives.
# A later call for the same event replaces the earlier block.
#
# @param event [String] event name
# @param block [Proc] handler stored under the event name
# @return [Proc, nil] the stored block
def on(event, &block)
  _write_and_read('E', 'S', event)
  @event_callbacks.store(event, block)
end