Class: Peatio::Upstream::Base

Inherits:
Object
  • Object
show all
Defined in:
lib/peatio/upstream/base.rb

Constant Summary collapse

DEFAULT_DELAY =
1
WEBSOCKET_CONNECTION_RETRY_DELAY =
2

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Base

Returns a new instance of Base.



11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/peatio/upstream/base.rb', line 11

# Builds an upstream from its configuration and registers the default
# public-trade callback.
#
# @param config [#[]] upstream settings; the constructor reads the "rest",
#   "source", "target" and "amqp" string keys plus the :faraday_adapter
#   symbol key, and keeps the whole object in @config
def initialize(config)
  # Values taken straight from the configuration.
  @host = config["rest"]
  # NOTE(review): this is the only key looked up as a symbol; the others are
  # strings. Confirm the config object resolves both forms.
  @adapter = config[:faraday_adapter] || :em_synchrony
  @market = config["source"]
  @target = config["target"]
  @peatio_mq = config["amqp"]
  @config = config

  # Runtime state.
  @ws_status = false
  @logger = Peatio::Logger.logger
  @public_trades_cb = []

  # Must run after @public_trades_cb exists: mount appends to it.
  mount
end

Instance Attribute Details

#logger ⇒ Object

Returns the value of attribute logger.



9
10
11
# File 'lib/peatio/upstream/base.rb', line 9

# Reader for the @logger instance variable (assigned in the constructor from
# Peatio::Logger.logger).
#
# @return [Object] the current value of @logger
def logger
  @logger
end

Instance Method Details

#build_error(response) ⇒ Object



109
110
111
112
113
# File 'lib/peatio/upstream/base.rb', line 109

# Turns an HTTP response into an error description.
#
# @param response [#body, #env] response object; `env` must expose `status`
#   and `reason_phrase` for the fallback path
# @return [Object, String] the JSON-decoded body when it parses, otherwise a
#   "Code: ... Message: ..." string built from the response status line
def build_error(response)
  JSON.parse(response.body)
rescue StandardError
  # Best effort: any failure while reading or decoding the body (not only a
  # JSON syntax error) falls back to the status line.
  "Code: #{response.env.status} Message: #{response.env.reason_phrase}"
end

#mount ⇒ Object



24
25
26
# File 'lib/peatio/upstream/base.rb', line 24

# Registers #on_trade as a public-trade callback.
#
# @return [Array] the @public_trades_cb list after the append
def mount
  trade_handler = method(:on_trade)
  @public_trades_cb.push(trade_handler)
end

#notify_public_trade(trade) ⇒ Object



101
102
103
# File 'lib/peatio/upstream/base.rb', line 101

# Passes a trade to every registered public-trade callback, in registration
# order. Nil entries in the list are skipped.
#
# @param trade [Object] the trade handed to each callback
# @return [Array] the @public_trades_cb list
def notify_public_trade(trade)
  @public_trades_cb.each do |callback|
    next if callback.nil?

    callback.call(trade)
  end
end

#on_trade(trade) ⇒ Object



78
79
80
81
82
83
84
85
86
87
# File 'lib/peatio/upstream/base.rb', line 78

# Logs a trade and hands it to the @peatio_mq client twice: first as a
# "public" / "trades" event carrying the trade as received, then through
# `publish` with the #trade_json representation and upstream headers.
#
# @param trade [Object] the trade to publish
# @return [Object] whatever @peatio_mq.publish returns
def on_trade(trade)
  logger.info { "Publishing trade event: #{trade.inspect}" }

  event = {trades: [trade]}
  @peatio_mq.enqueue_event("public", @market, "trades", event)

  # The payload is built after the event above has been enqueued.
  payload = trade_json(trade)
  options = {headers: {type: :upstream, market: @market}}
  @peatio_mq.publish(:trade, payload, options)
end

#subscribe_orderbook(_market, _ws) ⇒ Object



63
64
65
# File 'lib/peatio/upstream/base.rb', line 63

# Orderbook subscription hook; #ws_connect calls it with (@target, @ws) when
# the socket opens. This version ignores both arguments and only calls
# method_not_implemented.
# NOTE(review): method_not_implemented is defined outside this listing;
# presumably it raises so that subclasses must override this hook — confirm.
#
# @param _market [Object] unused here
# @param _ws [Object] unused here
def subscribe_orderbook(_market, _ws)
  method_not_implemented
end

#subscribe_trades(_market, _ws) ⇒ Object



59
60
61
# File 'lib/peatio/upstream/base.rb', line 59

# Trades subscription hook; #ws_connect calls it with (@target, @ws) when the
# socket opens. This version ignores both arguments and only calls
# method_not_implemented.
# NOTE(review): method_not_implemented is defined outside this listing;
# presumably it raises so that subclasses must override this hook — confirm.
#
# @param _market [Object] unused here
# @param _ws [Object] unused here
def subscribe_trades(_market, _ws)
  method_not_implemented
end

#to_s ⇒ Object



105
106
107
# File 'lib/peatio/upstream/base.rb', line 105

# Human-readable description of this upstream: the class name followed by its
# configuration.
#
# The constructor stores the configuration in @config and never assigns
# @opts, so @opts is only used when something else has set it; otherwise
# @config is shown.
# NOTE(review): the configuration is interpolated verbatim — confirm it holds
# nothing sensitive before this string is logged.
#
# @return [String]
def to_s
  "Exchange::#{self.class} config: #{@opts || @config}"
end

#trade_json(trade) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
# File 'lib/peatio/upstream/base.rb', line 89

# Builds the trade payload that #on_trade publishes.
#
# The keys are symbolized on a copy, so the hash passed in is left untouched;
# the same trade object is handed to every callback by #notify_public_trade.
#
# @param trade [Hash] trade with :tid, :price, :amount, :date and :taker_type
#   entries (string or symbol keys); :date is passed to Time.at
# @return [Hash] payload with :id, :price, :amount, :market_id (from @market),
#   :created_at (UTC ISO 8601 string) and :taker_type
def trade_json(trade)
  attrs = trade.deep_symbolize_keys
  {
    id: attrs[:tid],
    price: attrs[:price],
    amount: attrs[:amount],
    market_id: @market,
    created_at: Time.at(attrs[:date]).utc.iso8601,
    taker_type: attrs[:taker_type]
  }
end

#ws_connect ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/peatio/upstream/base.rb', line 28

# Opens the websocket at @ws_url and wires up its event handlers.
#
# * open    - subscribes to trades and orderbook for @target
# * message - forwards the frame to #ws_read_message
# * close   - clears @ws / @ws_status, logs the close code and reason, then
#             reconnects after WEBSOCKET_CONNECTION_RETRY_DELAY seconds;
#             each reconnect installs the same handler, so retries continue
#             for as long as the socket keeps closing
#
# @raise [RuntimeError] when @ws_url is not set
def ws_connect
  # Fail before logging a connection attempt. The message names the class and
  # market because this class defines no `id` to interpolate.
  raise "websocket url missing for #{self.class} (market: #{@market})" unless @ws_url

  logger.info { "Websocket connecting to #{@ws_url}" }
  @ws = Faye::WebSocket::Client.new(@ws_url)

  @ws.on(:open) do |_e|
    subscribe_trades(@target, @ws)
    subscribe_orderbook(@target, @ws)
    logger.info { "Websocket connected" }
  end

  @ws.on(:message) do |msg|
    ws_read_message(msg)
  end

  @ws.on(:close) do |e|
    @ws = nil
    @ws_status = false
    logger.error "Websocket disconnected: #{e.code} Reason: #{e.reason}"
    # Sleep inside a fiber so the delay does not block the close handler.
    Fiber.new do
      EM::Synchrony.sleep(WEBSOCKET_CONNECTION_RETRY_DELAY)
      ws_connect
    end.resume
  end
end

#ws_connect_public ⇒ Object



55
56
57
# File 'lib/peatio/upstream/base.rb', line 55

# Calls #ws_connect and returns its result.
def ws_connect_public
  ws_connect
end

#ws_read_message(msg) ⇒ Object



71
72
73
74
75
76
# File 'lib/peatio/upstream/base.rb', line 71

# Handles one websocket frame: logs it at debug level, decodes it as JSON and
# passes the decoded object to #ws_read_public_message.
#
# A frame that is not valid JSON is logged at error level and dropped, so a
# malformed frame does not raise out of the websocket message callback.
#
# @param msg [#data] websocket message event; `data` holds the raw payload
# @return [Object, nil] the result of #ws_read_public_message, or nil when
#   the payload could not be parsed
def ws_read_message(msg)
  logger.debug { "received websocket message: #{msg.data}" }

  # Only the decode step is guarded; errors raised by the handler still
  # propagate.
  begin
    object = JSON.parse(msg.data)
  rescue JSON::ParserError => e
    logger.error { "failed to parse websocket message: #{e.message}" }
    return
  end

  ws_read_public_message(object)
end

#ws_read_public_message(msg) ⇒ Object



67
68
69
# File 'lib/peatio/upstream/base.rb', line 67

# Logs a public message at info level and does nothing else with it.
#
# @param msg [Object] the message to log; #ws_read_message passes the
#   JSON-decoded websocket payload here
def ws_read_public_message(msg)
  logger.info { "received public message: #{msg}" }
end