Class: ActiveMessaging::MessageReceiver

Inherits:
Object
  • Object
show all
Includes:
Celluloid
Defined in:
lib/activemessaging/threaded_poller.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(poller, connection, pause = 1) ⇒ MessageReceiver

Returns a new instance of MessageReceiver.



156
157
158
159
160
161
162
163
164
# File 'lib/activemessaging/threaded_poller.rb', line 156

# Builds a receiver bound to a poller and a broker connection.
#
# @param poller [Object] the poller this receiver reports to; its
#   +connection+ value is only used to build the error message below
# @param connection [Object] the connection messages are received from
# @param pause [Numeric] delay passed to the retry timer when a poll
#   comes back empty (defaults to 1)
# @raise [RuntimeError] when +connection+ is nil or false
def initialize(poller, connection, pause = 1)
  logger.debug("MessageReceiver initialize: poller:#{poller}, connection:#{connection}, pause:#{pause}")

  # A receiver without a connection can never poll, so fail fast.
  unless connection
    raise RuntimeError, "No connection found for '#{poller.connection}'"
  end

  self.poller, self.connection, self.pause = poller, connection, pause
end

Instance Attribute Details

#connection ⇒ Object

Returns the value of attribute connection.



154
155
156
# File 'lib/activemessaging/threaded_poller.rb', line 154

# @return [Object] the value currently held in +@connection+
def connection; @connection; end

#pause ⇒ Object

Returns the value of attribute pause.



154
155
156
# File 'lib/activemessaging/threaded_poller.rb', line 154

# @return [Object] the value currently held in +@pause+
def pause; @pause; end

#poller ⇒ Object

Returns the value of attribute poller.



154
155
156
# File 'lib/activemessaging/threaded_poller.rb', line 154

# @return [Object] the value currently held in +@poller+
def poller; @poller; end

Instance Method Details

#inspect ⇒ Object



187
188
189
# File 'lib/activemessaging/threaded_poller.rb', line 187

# Wraps this receiver's string form in a "#<MessageReceiver ...>" tag.
#
# @return [String]
def inspect
  # Interpolation already calls #to_s on the receiver.
  "#<MessageReceiver #{self}>"
end

#logger ⇒ Object



195
# File 'lib/activemessaging/threaded_poller.rb', line 195

# Convenience accessor that forwards to the module-level logger.
#
# @return [Object] whatever +::ActiveMessaging.logger+ returns
def logger
  ::ActiveMessaging.logger
end

#receive(worker) ⇒ Object



166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/activemessaging/threaded_poller.rb', line 166

# Polls the connection once on behalf of +worker+.
#
# Does nothing when the poller reports it is not running. Otherwise a
# single receive is attempted using +worker.options+:
#
# * a message was returned   -> it is handed to +poller.dispatch!+
# * no message, poller gone  -> this receiver terminates itself
# * no message, poller fine  -> another attempt is scheduled after
#   +pause+ via the +after+ timer
#
# Terminating and scheduling a retry are mutually exclusive: a receiver
# that has just asked to terminate must not also queue another poll.
#
# NOTE(review): +after+ and +terminate+ are presumably supplied by the
# included Celluloid module -- confirm against the Celluloid version in use.
#
# @param worker [#options] worker whose options are passed to the
#   connection's +receive+ and which is handed to +dispatch!+
# @return [Object, nil] the result of the branch taken; nil when the
#   poller is not running
def receive(worker)
  return unless poller.running

  # logger.debug("***** MessageReceiver calling receive")
  message = connection.receive(worker.options)
  # logger.debug("***** MessageReceiver receive returned")

  if message
    logger.debug("ActiveMessaging::MessageReceiver.receive: message:'#{message.inspect}'")
    poller.dispatch!(message, worker)
  elsif !poller || !poller.alive? || !poller.running
    # The poller may have stopped or died while the receive call above
    # was in progress, so its state is checked again here.
    logger.debug("ActiveMessaging::MessageReceiver.receive: terminate")
    terminate
  else
    logger.debug("ActiveMessaging::MessageReceiver.receive: no message for worker #{worker.object_id}, retry in #{pause} sec")
    after(pause) { receive(worker) }
  end
end

#to_s ⇒ Object



191
192
193
# File 'lib/activemessaging/threaded_poller.rb', line 191

# Builds an identifier of the form "<pid>-<thread object id>:<object id>".
#
# The string is computed on the first call and cached in +@str+, so the
# pid and thread id are those observed by that first call.
#
# @return [String]
def to_s
  @str ||= format('%s-%s:%s', Process.pid, Thread.current.object_id, object_id)
end