Class: RailwayIpc::Rabbitmq::Adapter

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/railway_ipc/rabbitmq/adapter.rb

Defined Under Namespace

Classes: TimeoutError

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(exchange_name:, amqp_url: ENV['RAILWAY_RABBITMQ_CONNECTION_URL'], queue_name: '', options: {}) ⇒ Adapter

Returns a new instance of Adapter.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 19

# Stores the exchange and queue names and builds (but does not start) the
# Bunny connection from the parsed AMQP URL.
#
# @param exchange_name [String] name later used by #create_exchange
# @param amqp_url [String] AMQP URL; read from RAILWAY_RABBITMQ_CONNECTION_URL when omitted
# @param queue_name [String] name later used by #create_queue
# @param options [Hash] extra connection options; these win over the defaults below
def initialize(exchange_name:, amqp_url: ENV['RAILWAY_RABBITMQ_CONNECTION_URL'], queue_name: '', options: {})
  @exchange_name = exchange_name
  @queue_name = queue_name

  parsed = AMQ::Settings.parse_amqp_url(amqp_url)
  defaults = {
    host: parsed[:host],
    user: parsed[:user],
    pass: parsed[:pass],
    port: parsed[:port],
    # Fall back to the root vhost when the URL does not carry one.
    vhost: parsed[:vhost] || '/',
    automatic_recovery: false,
    logger: RailwayIpc.logger
  }

  @connection = Bunny.new(defaults.merge(options))
end

Instance Attribute Details

#channel ⇒ Object (readonly)

Returns the value of attribute channel.



8
9
10
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 8

# Reader for @channel, which #connect assigns from the connection.
def channel
  @channel
end

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



8
9
10
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 8

# Reader for @connection, the Bunny connection built in #initialize.
def connection
  @connection
end

#exchange ⇒ Object (readonly)

Returns the value of attribute exchange.



8
9
10
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 8

# Reader for @exchange, which #create_exchange assigns.
def exchange
  @exchange
end

#exchange_name ⇒ Object (readonly)

Returns the value of attribute exchange_name.



8
9
10
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 8

# Reader for @exchange_name, the name given to #initialize.
def exchange_name
  @exchange_name
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



8
9
10
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 8

# Reader for @queue, which #create_queue assigns.
def queue
  @queue
end

#queue_name ⇒ Object (readonly)

Returns the value of attribute queue_name.



8
9
10
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 8

# Reader for @queue_name, the name given to #initialize ('' by default).
def queue_name
  @queue_name
end

Instance Method Details

#bind_queue_to_exchange ⇒ Object



90
91
92
93
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 90

# Binds the current queue to the current exchange.
#
# @return [self] the adapter, so calls can be chained
def bind_queue_to_exchange
  tap { queue.bind(exchange) }
end

#check_for_message(timeout: 10, time_elapsed: 0, &block) ⇒ Object

Raises:

  • (TimeoutError)



49
50
51
52
53
54
55
56
57
58
59
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 49

# Polls the queue until a message is found or the timeout is reached,
# sleeping one second after every empty poll.
#
# @param timeout [Integer] number of empty polls (one-second sleeps) allowed before giving up
# @param time_elapsed [Integer] polls already counted against +timeout+
# @yield the elements of the popped result, splatted into the block
# @return [Object] the block's return value; without a block, the popped
#   elements as an array
# @raise [TimeoutError] when +time_elapsed+ reaches +timeout+ without a message
def check_for_message(timeout: 10, time_elapsed: 0, &block)
  # The popped result is splatted into the handler, so the default handler
  # must accept any number of arguments. A one-parameter lambda would raise
  # ArgumentError as soon as a multi-element result arrived.
  block ||= ->(*message) { message }

  until time_elapsed >= timeout
    result = queue.pop
    # A poll counts as empty when the result holds no truthy element.
    return block.call(*result) if result.compact.any?

    sleep 1
    time_elapsed += 1
  end

  raise TimeoutError
end

#connect ⇒ Object



61
62
63
64
65
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 61

# Starts the connection and stores a channel obtained from it in @channel.
#
# @return [self] the adapter, so calls can be chained
def connect
  session = connection
  session.start
  @channel = session.channel
  self
end

#create_exchange(strategy: :fanout, options: { durable: true }) ⇒ Object



73
74
75
76
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 73

# Builds a Bunny::Exchange named #exchange_name and stores it in @exchange.
#
# NOTE(review): the channel is requested from the connection here rather than
# reusing the one stored by #connect — confirm a separate channel is intended.
#
# @param strategy [Symbol] exchange type passed to Bunny::Exchange
# @param options [Hash] exchange options passed to Bunny::Exchange
# @return [self] the adapter, so calls can be chained
def create_exchange(strategy: :fanout, options: { durable: true })
  exchange_channel = connection.channel
  @exchange = Bunny::Exchange.new(exchange_channel, strategy, exchange_name, options)
  self
end

#create_queue(options = { durable: true }) ⇒ Object



85
86
87
88
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 85

# Asks the stored channel for a queue named #queue_name and stores it in @queue.
#
# @param options [Hash] queue options passed to the channel
# @return [self] the adapter, so calls can be chained
def create_queue(options={ durable: true })
  declared_queue = @channel.queue(queue_name, options)
  @queue = declared_queue
  self
end

#delete_exchange ⇒ Object



78
79
80
81
82
83
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 78

# Deletes the exchange if one has been created; otherwise does nothing.
#
# @return [self] the adapter, so calls can be chained
def delete_exchange
  declared = exchange
  return self unless declared

  declared.delete
  self
end

#delete_queue ⇒ Object



95
96
97
98
99
100
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 95

# Deletes the queue if one has been created; otherwise does nothing.
#
# @return [self] the adapter, so calls can be chained
def delete_queue
  # rubocop:disable Style/SafeNavigation
  tap { queue.delete if queue }
  # rubocop:enable Style/SafeNavigation
end

#disconnect ⇒ Object



67
68
69
70
71
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 67

# Closes the channel (when one was opened) and then the connection.
#
# The channel is only assigned by #connect, so it is guarded the same way
# #delete_exchange, #delete_queue and #publish guard their collaborators;
# this keeps teardown from raising NoMethodError when #connect never ran.
#
# @return [self] the adapter, so calls can be chained
def disconnect
  # rubocop:disable Style/SafeNavigation
  channel.close if channel
  # rubocop:enable Style/SafeNavigation
  connection.close
  self
end

#publish(message, options = {}) ⇒ Object



35
36
37
38
39
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 35

# Publishes a message to the exchange, or does nothing when no exchange has
# been created.
#
# @param message [Object] payload handed to the exchange
# @param options [Hash] publish options handed to the exchange
# @return [Object, nil] whatever the exchange's publish returns, or nil when
#   there is no exchange
def publish(message, options={})
  target = exchange
  return unless target

  target.publish(message, options)
end

#reply(message, from) ⇒ Object



41
42
43
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 41

# Publishes a message on the channel's default exchange, using +from+ as the
# routing key.
#
# @param message [Object] payload to publish
# @param from [Object] value passed as the :routing_key option
# @return [Object] whatever the default exchange's publish returns
def reply(message, from)
  default_exchange = channel.default_exchange
  default_exchange.publish(message, routing_key: from)
end

#subscribe(&block) ⇒ Object



45
46
47
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 45

# Subscribes to the queue, forwarding the given block as the handler.
#
# @yield whatever the queue's subscribe yields
# @return [Object] whatever the queue's subscribe returns
def subscribe(&block)
  source_queue = queue
  source_queue.subscribe(&block)
end