Class: Bitflyer::Realtime::WebSocketClient

Inherits:
Object
  • Object
show all
Defined in:
lib/bitflyer/realtime/websocket.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host:, debug: false) ⇒ WebSocketClient



12
13
14
15
16
17
18
19
# File 'lib/bitflyer/realtime/websocket.rb', line 12

def initialize(host:, debug: false)
  @host = host
  @debug = debug
  @error = nil
  @channel_names = []
  @channel_callbacks = {}
  connect
end

Instance Attribute Details

#channel_callbacksObject

Returns the value of attribute channel_callbacks.



9
10
11
# File 'lib/bitflyer/realtime/websocket.rb', line 9

def channel_callbacks
  @channel_callbacks
end

#channel_nameObject

Returns the value of attribute channel_name.



9
10
11
# File 'lib/bitflyer/realtime/websocket.rb', line 9

def channel_name
  @channel_name
end

#errorObject

Returns the value of attribute error.



9
10
11
# File 'lib/bitflyer/realtime/websocket.rb', line 9

def error
  @error
end

#last_ping_atObject

Returns the value of attribute last_ping_at.



9
10
11
# File 'lib/bitflyer/realtime/websocket.rb', line 9

def last_ping_at
  @last_ping_at
end

#last_pong_atObject

Returns the value of attribute last_pong_at.



9
10
11
# File 'lib/bitflyer/realtime/websocket.rb', line 9

def last_pong_at
  @last_pong_at
end

#ping_intervalObject

Returns the value of attribute ping_interval.



9
10
11
# File 'lib/bitflyer/realtime/websocket.rb', line 9

def ping_interval
  @ping_interval
end

#ping_timeoutObject

Returns the value of attribute ping_timeout.



9
10
11
# File 'lib/bitflyer/realtime/websocket.rb', line 9

def ping_timeout
  @ping_timeout
end

#websocket_clientObject

Returns the value of attribute websocket_client.



9
10
11
# File 'lib/bitflyer/realtime/websocket.rb', line 9

def websocket_client
  @websocket_client
end

Instance Method Details

#connectObject



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/bitflyer/realtime/websocket.rb', line 28

def connect
  @websocket_client = WebSocket::Client::Simple.connect "#{@host}/socket.io/?transport=websocket"
  this = self

  Thread.new do
    loop do
      sleep 1
      if @websocket_client&.open?
        send_ping
        wait_pong
      end
    end
  end

  Thread.new do
    loop do
      sleep 1
      next unless @error

      reconnect
    end
  end

  @websocket_client.on(:message) { |payload| this.handle_message(payload: payload) }
  @websocket_client.on(:error) { |error| this.handle_error(error: error) }
end

#debug_log(message) ⇒ Object



138
139
140
141
142
# File 'lib/bitflyer/realtime/websocket.rb', line 138

def debug_log(message)
  return unless @debug

  p message
end

#disconnectObject



126
127
128
129
# File 'lib/bitflyer/realtime/websocket.rb', line 126

def disconnect
  debug_log 'Disconnecting from server...'
  @error = true
end

#emit_message(json:) ⇒ Object



131
132
133
134
135
136
# File 'lib/bitflyer/realtime/websocket.rb', line 131

def emit_message(json:)
  channel_name, *messages = JSON.parse json
  return unless channel_name

  messages.each { |message| @channel_callbacks[channel_name.to_sym]&.call(message) }
end

#handle_error(error:) ⇒ Object



86
87
88
89
90
91
# File 'lib/bitflyer/realtime/websocket.rb', line 86

def handle_error(error:)
  debug_log error
  return unless error.is_a? Errno::ECONNRESET

  reconnect
end

#handle_message(payload:) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/bitflyer/realtime/websocket.rb', line 93

def handle_message(payload:)
  debug_log payload.data
  return unless payload.data =~ /^\d+/

  code, body = payload.data.scan(/^(\d+)(.*)$/)[0]

  case code.to_i
  when 0 then setup_by_response(json: body)
  when 3 then receive_pong
  when 41 then disconnect
  when 42 then emit_message(json: body)
  end
rescue StandardError => e
  puts e
  puts e.backtrace.join("\n")
end

#receive_pongObject



121
122
123
124
# File 'lib/bitflyer/realtime/websocket.rb', line 121

def receive_pong
  debug_log 'Received pong'
  @last_pong_at = Time.now.to_i
end

#reconnectObject



72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/bitflyer/realtime/websocket.rb', line 72

def reconnect
  return unless @error

  debug_log 'Reconnecting...'

  @error = nil
  @websocket_client.close if @websocket_client.open?
  connect
  @channel_names.each do |channel_name|
    debug_log "42#{{ subscribe: channel_name }.to_json}"
    websocket_client.send "42#{['subscribe', channel_name].to_json}"
  end
end

#send_pingObject



55
56
57
58
59
60
61
62
# File 'lib/bitflyer/realtime/websocket.rb', line 55

def send_ping
  return unless @last_ping_at && @ping_interval
  return unless Time.now.to_i - @last_ping_at > @ping_interval / 1000

  debug_log 'Sent ping'
  @websocket_client.send '2'
  @last_ping_at = Time.now.to_i
end

#setup_by_response(json:) ⇒ Object



110
111
112
113
114
115
116
117
118
119
# File 'lib/bitflyer/realtime/websocket.rb', line 110

def setup_by_response(json:)
  body = JSON.parse json
  @ping_interval = body['pingInterval'].to_i || 25_000
  @ping_timeout  = body['pingTimeout'].to_i || 60_000
  @last_ping_at = Time.now.to_i
  @last_pong_at = Time.now.to_i
  channel_callbacks.each do |channel_name, _|
    websocket_client.send "42#{['subscribe', channel_name].to_json}"
  end
end

#subscribe(channel_name:, &block) ⇒ Object



21
22
23
24
25
26
# File 'lib/bitflyer/realtime/websocket.rb', line 21

def subscribe(channel_name:, &block)
  debug_log "Subscribe #{channel_name}"
  @channel_names = (@channel_names + [channel_name]).uniq
  @channel_callbacks[channel_name] = block
  websocket_client.send "42#{['subscribe', channel_name].to_json}"
end

#wait_pongObject



64
65
66
67
68
69
70
# File 'lib/bitflyer/realtime/websocket.rb', line 64

def wait_pong
  return unless @last_pong_at && @ping_timeout
  return unless Time.now.to_i - @last_pong_at > (@ping_interval + @ping_timeout) / 1000

  debug_log 'Timed out waiting pong'
  @websocket_client.close
end