Class: Peatio::MQ::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/peatio/mq/client.rb

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeClient

Returns a new instance of Client.



24
25
26
27
28
# File 'lib/peatio/mq/client.rb', line 24

def initialize
  Client.connect! unless Peatio::MQ::Client.connection
  @channel = Client.connection.create_channel
  @exchanges = {}
end

Class Attribute Details

.connectionObject

Returns the value of attribute connection.



6
7
8
# File 'lib/peatio/mq/client.rb', line 6

def connection
  @connection
end

Class Method Details

.connect!Object



8
9
10
11
12
13
14
15
16
17
# File 'lib/peatio/mq/client.rb', line 8

def connect!
  options = {
    host:     ENV["RABBITMQ_HOST"] || "0.0.0.0",
    port:     ENV["RABBITMQ_PORT"] || "5672",
    username: ENV["RABBITMQ_USER"],
    password: ENV["RABBITMQ_PASSWORD"],
  }
  @connection = Bunny.new(options)
  @connection.start
end

.disconnectObject



19
20
21
# File 'lib/peatio/mq/client.rb', line 19

def disconnect
  @connection.close
end

Instance Method Details

#exchange(name, type = "topic") ⇒ Object



30
31
32
# File 'lib/peatio/mq/client.rb', line 30

def exchange(name, type="topic")
  @exchanges[name] ||= @channel.exchange(name, type: type)
end

#publish(ex_name, type, id, event, payload) ⇒ Object



34
35
36
37
38
39
# File 'lib/peatio/mq/client.rb', line 34

def publish(ex_name, type, id, event, payload)
  routing_key = [type, id, event].join(".")
  serialized_data = JSON.dump(payload)
  exchange(ex_name).publish(serialized_data, routing_key: routing_key)
  Peatio::Logger.debug { "published event to #{routing_key} " }
end

#subscribe(ex_name, &callback) ⇒ Object



41
42
43
44
45
46
47
48
49
# File 'lib/peatio/mq/client.rb', line 41

def subscribe(ex_name, &callback)
  suffix = "#{Socket.gethostname.split(/-/).last}#{Random.rand(10_000)}"
  queue_name = "ranger.#{suffix}"

  @channel
    .queue(queue_name, durable: false, auto_delete: true)
    .bind(exchange(ex_name), routing_key: "#").subscribe(&callback)
  Peatio::Logger.info "Subscribed to exchange #{ex_name}"
end