Class: Bitflyer::Realtime::WebSocketClient

Inherits:
Object
  • Object
show all
Defined in:
lib/bitflyer/realtime/websocket.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host:, key:, secret:, debug: false) ⇒ WebSocketClient

Returns a new instance of WebSocketClient.



13
14
15
16
17
18
19
20
21
22
# File 'lib/bitflyer/realtime/websocket.rb', line 13

def initialize(host:, key:, secret:, debug: false)
  @host = host
  @key = key
  @secret = secret
  @debug = debug
  @channel_names = []
  @channel_callbacks = {}
  connect
  start_monitoring
end

Instance Attribute Details

#channel_callbacksObject

Returns the value of attribute channel_callbacks.



10
11
12
# File 'lib/bitflyer/realtime/websocket.rb', line 10

def channel_callbacks
  @channel_callbacks
end

#channel_namesObject

Returns the value of attribute channel_names.



10
11
12
# File 'lib/bitflyer/realtime/websocket.rb', line 10

def channel_names
  @channel_names
end

#disconnectedObject

Returns the value of attribute disconnected.



10
11
12
# File 'lib/bitflyer/realtime/websocket.rb', line 10

def disconnected
  @disconnected
end

#last_ping_atObject

Returns the value of attribute last_ping_at.



10
11
12
# File 'lib/bitflyer/realtime/websocket.rb', line 10

def last_ping_at
  @last_ping_at
end

#last_pong_atObject

Returns the value of attribute last_pong_at.



10
11
12
# File 'lib/bitflyer/realtime/websocket.rb', line 10

def last_pong_at
  @last_pong_at
end

#ping_intervalObject

Returns the value of attribute ping_interval.



10
11
12
# File 'lib/bitflyer/realtime/websocket.rb', line 10

def ping_interval
  @ping_interval
end

#ping_timeoutObject

Returns the value of attribute ping_timeout.



10
11
12
# File 'lib/bitflyer/realtime/websocket.rb', line 10

def ping_timeout
  @ping_timeout
end

#readyObject

Returns the value of attribute ready.



10
11
12
# File 'lib/bitflyer/realtime/websocket.rb', line 10

def ready
  @ready
end

#websocket_clientObject

Returns the value of attribute websocket_client.



10
11
12
# File 'lib/bitflyer/realtime/websocket.rb', line 10

def websocket_client
  @websocket_client
end

Instance Method Details

#authenticateObject



127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/bitflyer/realtime/websocket.rb', line 127

def authenticate
  debug_log 'Authenticate'
  timestamp = Time.now.to_i
  nonce = Random.new.bytes(16).unpack1('H*')
  signature = OpenSSL::HMAC.hexdigest(OpenSSL::Digest.new('sha256'), @secret, timestamp.to_s + nonce)
  auth_params = {
    api_key: @key,
    timestamp: timestamp,
    nonce: nonce,
    signature: signature
  }
  @websocket_client.send "420#{['auth', auth_params].to_json}"
end

#authenticated(json:) ⇒ Object



141
142
143
144
145
146
147
# File 'lib/bitflyer/realtime/websocket.rb', line 141

def authenticated(json:)
  raise "Authentication failed: #{json}" if json != '[null]'

  debug_log 'Authenticated'
  subscribe_channels
  @ready&.call
end

#connectObject



31
32
33
34
35
36
37
38
39
40
# File 'lib/bitflyer/realtime/websocket.rb', line 31

def connect
  @websocket_client = WebSocket::Client::Simple.connect "#{@host}/socket.io/?transport=websocket"
  this = self
  @websocket_client.on(:message) { |payload| this.handle_message(payload: payload) }
  @websocket_client.on(:error) { |error| this.handle_error(error: error) }
  @websocket_client.on(:close) { |error| this.handle_close(error: error) }
rescue SocketError => e
  puts e
  puts e.backtrace.join("\n")
end

#debug_log(message) ⇒ Object



173
174
175
176
177
# File 'lib/bitflyer/realtime/websocket.rb', line 173

def debug_log(message)
  return unless @debug

  p message
end

#disconnectObject



161
162
163
164
# File 'lib/bitflyer/realtime/websocket.rb', line 161

def disconnect
  debug_log 'Disconnecting from server...'
  @websocket_client.close
end

#emit_message(json:) ⇒ Object



166
167
168
169
170
171
# File 'lib/bitflyer/realtime/websocket.rb', line 166

def emit_message(json:)
  channel_name, *messages = JSON.parse json
  return unless channel_name

  messages.each { |message| @channel_callbacks[channel_name.to_sym]&.call(message) }
end

#handle_close(error:) ⇒ Object



108
109
110
111
# File 'lib/bitflyer/realtime/websocket.rb', line 108

def handle_close(error:)
  debug_log error
  @disconnected&.call(error)
end

#handle_error(error:) ⇒ Object



82
83
84
85
86
87
88
# File 'lib/bitflyer/realtime/websocket.rb', line 82

def handle_error(error:)
  debug_log error
  return unless error.is_a? Errno::ECONNRESET

  @disconnected&.call(error)
  reconnect
end

#handle_message(payload:) ⇒ Object

rubocop:disable Metrics/CyclomaticComplexity



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/bitflyer/realtime/websocket.rb', line 90

def handle_message(payload:) # rubocop:disable Metrics/CyclomaticComplexity
  debug_log payload.data
  return unless payload.data =~ /^\d+/

  code, body = payload.data.scan(/^(\d+)(.*)$/)[0]

  case code.to_i
  when 0 then setup_by_response(json: body)
  when 3 then receive_pong
  when 41 then disconnect
  when 42 then emit_message(json: body)
  when 430 then authenticated(json: body)
  end
rescue StandardError => e
  puts e
  puts e.backtrace.join("\n")
end

#receive_pongObject



156
157
158
159
# File 'lib/bitflyer/realtime/websocket.rb', line 156

def receive_pong
  debug_log 'Received pong'
  @last_pong_at = Time.now.to_i
end

#reconnectObject



73
74
75
76
77
78
79
80
# File 'lib/bitflyer/realtime/websocket.rb', line 73

def reconnect
  return if @websocket_client&.open?

  debug_log 'Reconnecting...'

  @websocket_client.close if @websocket_client.open?
  connect
end

#send_pingObject



56
57
58
59
60
61
62
63
# File 'lib/bitflyer/realtime/websocket.rb', line 56

def send_ping
  return unless @last_ping_at && @ping_interval
  return unless Time.now.to_i - @last_ping_at > @ping_interval / 1000

  debug_log 'Sent ping'
  @websocket_client.send '2'
  @last_ping_at = Time.now.to_i
end

#setup_by_response(json:) ⇒ Object

rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity



113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/bitflyer/realtime/websocket.rb', line 113

def setup_by_response(json:) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
  body = JSON.parse json
  @ping_interval = body['pingInterval']&.to_i || 25_000
  @ping_timeout  = body['pingTimeout']&.to_i || 60_000
  @last_ping_at = Time.now.to_i
  @last_pong_at = Time.now.to_i
  if @key && @secret
    authenticate
  else
    subscribe_channels
    @ready&.call
  end
end

#start_monitoringObject



42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/bitflyer/realtime/websocket.rb', line 42

def start_monitoring
  Thread.new do
    loop do
      sleep 1
      if @websocket_client&.open?
        send_ping
        wait_pong
      else
        reconnect
      end
    end
  end
end

#subscribe(channel_name:, &block) ⇒ Object



24
25
26
27
28
29
# File 'lib/bitflyer/realtime/websocket.rb', line 24

def subscribe(channel_name:, &block)
  debug_log "Subscribe #{channel_name}"
  @channel_names = (@channel_names + [channel_name]).uniq
  @channel_callbacks[channel_name] = block
  @websocket_client.send "42#{['subscribe', channel_name].to_json}"
end

#subscribe_channelsObject



149
150
151
152
153
154
# File 'lib/bitflyer/realtime/websocket.rb', line 149

def subscribe_channels
  @channel_callbacks.each_key do |channel_name|
    debug_log "42#{{ subscribe: channel_name }.to_json}"
    @websocket_client.send "42#{['subscribe', channel_name].to_json}"
  end
end

#wait_pongObject



65
66
67
68
69
70
71
# File 'lib/bitflyer/realtime/websocket.rb', line 65

def wait_pong
  return unless @last_pong_at && @ping_timeout
  return unless Time.now.to_i - @last_pong_at > (@ping_interval + @ping_timeout) / 1000

  debug_log 'Timed out waiting pong'
  @websocket_client.close
end