Class: Subscriber

Inherits:
Object
  • Object
show all
Defined in:
lib/bunny-pub-sub/subscriber.rb

Defined Under Namespace

Classes: ClientException, ServerException

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(subscriber_config, results_publisher) ⇒ Subscriber

Returns a new instance of Subscriber.



55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/bunny-pub-sub/subscriber.rb', line 55

# Builds a subscriber from a configuration hash and a results publisher.
#
# Construction stops early -- leaving +@subscriber_config+ and
# +@results_publisher+ unset -- when +valid_config?+ or
# +at_least_one_binding_key_exists?+ rejects the config, or when
# +binding_keys_to_array+ yields nil.
#
# Side effects: the caller's hash is modified in place
# (+:BINDING_KEYS+ is overwritten with the result of
# +binding_keys_to_array+), and +results_publisher+ is installed as the
# class-level +publisher+ of both ServerException and ClientException.
#
# @param subscriber_config [Hash] connection/exchange/queue settings;
#   read and written via +[]+ / +[]=+
# @param results_publisher [Object] stored on the instance and on both
#   exception classes
def initialize(subscriber_config, results_publisher)
  return unless valid_config?(subscriber_config)
  return unless at_least_one_binding_key_exists?(subscriber_config)

  # The normalised keys are written back before the nil check, so the
  # caller's hash is updated even when construction is abandoned.
  binding_keys = binding_keys_to_array(subscriber_config)
  subscriber_config[:BINDING_KEYS] = binding_keys
  return if subscriber_config[:BINDING_KEYS].nil?

  [ServerException, ClientException].each do |exception_class|
    exception_class.publisher = results_publisher
  end

  @subscriber_config = subscriber_config
  @results_publisher = results_publisher
end

Instance Attribute Details

#cancel_ok ⇒ Object (readonly)

Returns the value of attribute cancel_ok.



32
33
34
# File 'lib/bunny-pub-sub/subscriber.rb', line 32

# Read-only accessor for +@cancel_ok+.
#
# +@cancel_ok+ is assigned in #cancel_subscriber from the return value of
# +@consumer.cancel+; until that has happened this returns nil.
#
# @return [Object, nil] the stored cancellation result
def cancel_ok
  @cancel_ok
end

Instance Method Details

#cancel_subscriber ⇒ Object

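TODO: Fix this. Probably doesn’t work because `@consumer` is only assigned once the blocking `queue.subscribe(block: true)` call in `#start_subscriber` returns, so it is likely still nil while the subscriber is running (the original sentence is truncated; this is the presumed reason).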



109
110
111
112
113
114
115
116
117
118
# File 'lib/bunny-pub-sub/subscriber.rb', line 109

# Cancels the active consumer (if there is one) and then closes the channel
# and the connection.
#
# The value returned by +@consumer.cancel+ is stored in +@cancel_ok+ and
# printed. A RuntimeError raised while cancelling is printed instead of
# being propagated.
#
# Cleanup is always attempted: the connection is closed even if closing the
# channel raises, and anything that was never set up (nil consumer, channel
# or connection) is skipped rather than raising NoMethodError. An error
# raised by a +close+ call itself still propagates to the caller.
#
# @return [nil]
def cancel_subscriber
  # Nothing to cancel when no consumer was ever assigned.
  if @consumer
    @cancel_ok = @consumer.cancel
    puts 'Consumer cancelled:'
    puts @cancel_ok.inspect
  end
rescue RuntimeError => e
  puts e
ensure
  # Nested ensure: a failing channel close must not leak the connection.
  begin
    @channel&.close
  ensure
    @connection&.close
  end
end

#client_error!(message, status, _headers = {}, _backtrace = []) ⇒ Object



47
48
49
# File 'lib/bunny-pub-sub/subscriber.rb', line 47

# Aborts processing by raising a ClientException built from +message+ and
# +status+.
#
# @param message [Object] passed as the first argument to ClientException.new
# @param status [Object] passed as the second argument to ClientException.new
# @param _headers [Hash] accepted but ignored
# @param _backtrace [Array] accepted but ignored
# @raise [ClientException] always
def client_error!(message, status, _headers = {}, _backtrace = [])
  failure = ClientException.new(message, status)
  raise failure
end

#server_error!(message, status, _headers = {}, _backtrace = []) ⇒ Object



51
52
53
# File 'lib/bunny-pub-sub/subscriber.rb', line 51

# Aborts processing by raising a ServerException built from +message+ and
# +status+.
#
# @param message [Object] passed as the first argument to ServerException.new
# @param status [Object] passed as the second argument to ServerException.new
# @param _headers [Hash] accepted but ignored
# @param _backtrace [Array] accepted but ignored
# @raise [ServerException] always
def server_error!(message, status, _headers = {}, _backtrace = [])
  failure = ServerException.new(message, status)
  raise failure
end

#start_subscriber(callback) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/bunny-pub-sub/subscriber.rb', line 69

# Connects to RabbitMQ, declares the durable topic exchange and durable queue
# named in the subscriber config, binds the queue once per binding key, and
# then consumes messages, handing each delivery to +callback+.
#
# On Interrupt (e.g. CTRL+C) during consumption the channel and connection
# are closed and the process exits with status 0.
#
# @param callback [#call] invoked as
#   +callback.call(self, channel, results_publisher, delivery_info, properties, params)+
#   for every delivery
def start_subscriber(callback)
  settings = @subscriber_config

  @connection = Bunny.new(
    hostname: settings[:RABBITMQ_HOSTNAME],
    username: settings[:RABBITMQ_USERNAME],
    password: settings[:RABBITMQ_PASSWORD]
  )
  # NOTE(review): the meaning of the 6 is not visible from here -- presumably
  # a retry count or timeout; confirm against ServicesManager#start_connection.
  ServicesManager.instance.start_connection(@connection, 6)

  @channel = @connection.create_channel

  # Create or join the topic exchange named by :EXCHANGE_NAME.
  # Durable exchanges survive a broker restart; transient ones do not.
  exchange = @channel.topic(settings[:EXCHANGE_NAME], durable: true)

  # Enable this to stop RabbitMQ handing a worker more than one job while it
  # is still busy with another.
  # @channel.prefetch(1)

  work_queue = @channel.queue(settings[:DURABLE_QUEUE_NAME], durable: true)

  settings[:BINDING_KEYS].each do |binding_key|
    work_queue.bind(exchange, routing_key: binding_key)
  end

  begin
    puts ' [*] Waiting for messages. To exit press CTRL+C'

    # NOTE(review): with block: true this call is expected to block the
    # calling thread, so @consumer would only be assigned once it returns --
    # confirm against the Bunny documentation.
    @consumer = work_queue.subscribe(manual_ack: true, block: true) do |delivery_info, properties, params|
      callback.call(self, @channel, @results_publisher, delivery_info, properties, params)
    end
  rescue Interrupt
    @channel.close
    @connection.close

    exit(0)
  end
end