Class: Qwirk::Adapter::InMemory::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/qwirk/adapter/in_memory/worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, marshaler, queue) ⇒ Worker

Returns a new instance of Worker.



8
9
10
11
12
# File 'lib/qwirk/adapter/in_memory/worker.rb', line 8

# Builds a worker bound to an in-memory queue.
#
# @param name [Object] label interpolated into #to_s
# @param marshaler [#unmarshal] used by #send_response to unmarshal replies
# @param queue [#read, #stopped?, #interrupt_read] queue this worker reads from
def initialize(name, marshaler, queue)
  @name      = name
  @marshaler = marshaler
  @queue     = queue
  # Start from an explicit false so #stopped returns a boolean rather than
  # nil before #stop has been called.
  @stopped   = false
end

Instance Attribute Details

#stopped ⇒ Object (readonly)

Returns the value of attribute stopped.



6
7
8
# File 'lib/qwirk/adapter/in_memory/worker.rb', line 6

# Reader for the @stopped flag, which #stop sets to true.
#
# @return [Object] the current value of @stopped
def stopped
  @stopped
end

Instance Method Details

#acknowledge_message(message) ⇒ Object



19
20
# File 'lib/qwirk/adapter/in_memory/worker.rb', line 19

# No-op: does nothing with the message and returns nil.
#
# @param message [Object] ignored
# @return [nil]
def acknowledge_message(message)
  nil
end

#handle_failure(message, exception, fail_queue_name) ⇒ Object



37
38
39
40
# File 'lib/qwirk/adapter/in_memory/worker.rb', line 37

# Logs a warning that a failed message is being dropped, including the
# exception that caused the failure so the reason is not lost.
#
# @param message [Object] the message that failed; interpolated into the log line
# @param exception [Object] the failure cause; its #inspect is appended to the log line
# @param fail_queue_name [Object] unused by this implementation
# @return [Object] whatever Qwirk.logger.warn returns
def handle_failure(message, exception, fail_queue_name)
  # TODO: Mode for persisting to flat file?
  # The original text is kept as a prefix so existing log searches still match;
  # #inspect is used because it is safe for nil and non-Exception values.
  Qwirk.logger.warn("Dropping message that failed: #{message} (#{exception.inspect})")
end

#message_to_object(msg) ⇒ Object



32
33
34
35
# File 'lib/qwirk/adapter/in_memory/worker.rb', line 32

# Returns the given message unchanged.
#
# @param msg [Object] message as read from the queue
# @return [Object] the same object that was passed in
def message_to_object(msg)
  # No conversion needed here: the publisher already unmarshaled the object.
  msg
end

#ready_to_stop? ⇒ Boolean

If the worker_config has been commanded to stop, workers will continue processing messages until this returns true.

Returns:

  • (Boolean)


50
51
52
# File 'lib/qwirk/adapter/in_memory/worker.rb', line 50

# Reports whether this worker may stop, by delegating to the queue.
#
# @return [Object] the result of @queue.stopped? (documented as Boolean)
def ready_to_stop?
  @queue.stopped?
end

#receive_message ⇒ Object



14
15
16
17
# File 'lib/qwirk/adapter/in_memory/worker.rb', line 14

# Reads from the queue, passing this worker as the reader, and remembers the
# second element of the result in @reply_queue.
#
# @return [Object] the first element of what @queue.read returned
def receive_message
  payload, reply_queue = @queue.read(self)
  @reply_queue = reply_queue
  payload
end

#send_exception(original_message, e) ⇒ Object



27
28
29
30
# File 'lib/qwirk/adapter/in_memory/worker.rb', line 27

# Wraps the exception in a Qwirk::RemoteException and sends it as the response
# to the original message.
#
# @param original_message [Object] the message being answered
# @param e [Object] the exception to wrap
# @return [Object] whatever do_send_response returns
def send_exception(original_message, e)
  # TODO: I think exceptions should be recreated fully so no need for marshal/unmarshal?
  remote_error = Qwirk::RemoteException.new(e)
  do_send_response(original_message, remote_error)
end

#send_response(original_message, marshaled_object) ⇒ Object



22
23
24
25
# File 'lib/qwirk/adapter/in_memory/worker.rb', line 22

# Unmarshals the response and sends it as the reply to the original message.
#
# @param original_message [Object] the message being answered
# @param marshaled_object [Object] response in marshaled form
# @return [Object] whatever do_send_response returns
def send_response(original_message, marshaled_object)
  # Unmarshal first so workers see consistent messages regardless of the adapter.
  response = @marshaler.unmarshal(marshaled_object)
  do_send_response(original_message, response)
end

#stop ⇒ Object



42
43
44
45
46
47
# File 'lib/qwirk/adapter/in_memory/worker.rb', line 42

# Marks the worker as stopped and interrupts the queue read. Calls after the
# first one do nothing.
#
# @return [Object, nil] the result of @queue.interrupt_read, or nil when the
#   worker was already stopped
def stop
  unless @stopped
    @stopped = true
    Qwirk.logger.debug { "Stopping #{self}" }
    @queue.interrupt_read
  end
end

#to_s ⇒ Object



54
55
56
# File 'lib/qwirk/adapter/in_memory/worker.rb', line 54

# Human-readable label: the worker name followed by the adapter tag.
#
# @return [String]
def to_s
  format('%s (InMemory)', @name)
end