Class: Qwirk::Adapter::InMemory::Publisher

Inherits:
Object
Defined in:
lib/qwirk/adapter/in_memory/publisher.rb

Defined Under Namespace

Classes: MyTaskConsumer, MyTaskProducer

Constructor Details

#initialize(adapter_factory, queue_name, topic_name, options, response_options) ⇒ Publisher

Returns a new instance of Publisher.



# File 'lib/qwirk/adapter/in_memory/publisher.rb', line 6

def initialize(adapter_factory, queue_name, topic_name, options, response_options)
  @queue_name, @topic_name, @options, @response_options = queue_name, topic_name, options, response_options
  @queue = Factory.get_publisher_queue(queue_name, topic_name)
end

Instance Method Details

#create_fail_producer_consumer_pair(task_id, marshaler) ⇒ Object



# File 'lib/qwirk/adapter/in_memory/publisher.rb', line 43

def create_fail_producer_consumer_pair(task_id, marshaler)
  consumer_queue          = Queue.new("#{@queue}Fail:#{task_id}")
  # TODO: Unlimited queue or some form of exception on maximum
  consumer_queue.max_size = -1
  producer = MyTaskProducer.new(@queue, consumer_queue, marshaler, {})
  consumer  = MyTaskConsumer.new(@queue, consumer_queue)
  return producer, consumer
end

#create_producer_consumer_pair(task_id, marshaler) ⇒ Object

See Qwirk::Publisher#create_producer_consumer_pair for the requirements for this method.



# File 'lib/qwirk/adapter/in_memory/publisher.rb', line 35

def create_producer_consumer_pair(task_id, marshaler)
  consumer_queue          = Queue.new("#{@queue}:#{task_id}")
  consumer_queue.max_size = @response_options[:queue_max_size] || 100
  producer                = MyTaskProducer.new(@queue, consumer_queue, marshaler, @response_options)
  consumer                = MyTaskConsumer.new(@queue, consumer_queue)
  return producer, consumer
end
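
The `|| 100` fallback and the two-value `return` in the listing above can be tried in isolation. The following is a minimal sketch, assuming plain hashes in place of `@response_options`; `max_size_for` and `build_pair` are hypothetical helpers written for illustration and are not part of Qwirk.

```ruby
# Hypothetical helper mirroring the `options[:queue_max_size] || 100` fallback.
# Assumes response_options is a Hash; calling [] on nil would raise NoMethodError.
def max_size_for(response_options)
  response_options[:queue_max_size] || 100
end

# Hypothetical stand-in for a pair-returning method: `return a, b` hands back
# an Array that the caller can destructure into two variables.
def build_pair(name)
  return "producer:#{name}", "consumer:#{name}"
end

default_size  = max_size_for({})                       # key missing, fallback applies
explicit_size = max_size_for({ queue_max_size: 25 })   # key present, value wins
producer, consumer = build_pair('task-1')
```

Because `||` treats `nil` and `false` alike, the fallback covers a missing key but not a missing options hash, so callers of a method written this way would need to pass a Hash.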

#default_marshal_sym ⇒ Object



# File 'lib/qwirk/adapter/in_memory/publisher.rb', line 11

def default_marshal_sym
  :none
end

#publish(marshaled_object, marshaler, task_id, props) ⇒ Object

Publish the given object and return the reply_queue as the adapter_info.



# File 'lib/qwirk/adapter/in_memory/publisher.rb', line 16

def publish(marshaled_object, marshaler, task_id, props)
  # Since we're in-memory, we'll just unmarshal the object so there is less info to carry around
  object = marshaler.unmarshal(marshaled_object)
  reply_queue = nil
  if @response_options
    reply_queue = ReplyQueue.new("#{@queue}:#{object.to_s}")
  end
  @queue.write([object, reply_queue])
  # Return the object to get sent to with_response below.
  return reply_queue
end
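
The flow in `publish` (unmarshal, optionally create a reply queue, write the pair to the queue, return the reply queue) can be sketched without the library. This is an illustration under stated assumptions: `IdentityMarshaler` and `SketchPublisher` are hypothetical names, and Ruby's core `Queue` stands in for the adapter's own `Queue` and `ReplyQueue` classes, whose behavior is not shown in this page.

```ruby
# Hypothetical marshaler: unmarshal hands the object back unchanged.
class IdentityMarshaler
  def unmarshal(obj)
    obj
  end
end

# Hypothetical stand-in for the in-memory publisher.
class SketchPublisher
  attr_reader :queue

  def initialize(response_options)
    @response_options = response_options
    @queue = Queue.new
  end

  def publish(marshaled_object, marshaler)
    object = marshaler.unmarshal(marshaled_object)
    # A reply queue is only created when response options were supplied.
    reply_queue = @response_options ? Queue.new : nil
    @queue << [object, reply_queue]
    reply_queue
  end
end

# Without response options, no reply queue is created.
one_way = SketchPublisher.new(nil)
one_way_reply = one_way.publish('hello', IdentityMarshaler.new)

# With response options (an empty Hash is truthy in Ruby), the same reply
# queue is both returned to the caller and queued alongside the object.
two_way = SketchPublisher.new({})
two_way_reply = two_way.publish('hello', IdentityMarshaler.new)
object, queued_reply = two_way.queue.pop
```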

#with_response(reply_queue) {|reply_queue| ... } ⇒ Object

See Qwirk::PublishHandle#read_response for the requirements for this method.

Yields:

  • (reply_queue)


# File 'lib/qwirk/adapter/in_memory/publisher.rb', line 29

def with_response(reply_queue, &block)
  raise "Could not find reply_queue for #{@queue}" unless reply_queue
  yield reply_queue
end
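
The guard-then-yield pattern used by `with_response` can be exercised on its own. A minimal sketch, assuming a hypothetical free-standing `with_reply` method and Ruby's core `Queue` as the reply queue; it is not the adapter's actual call path.

```ruby
# Hypothetical free-standing version of the guard-then-yield pattern:
# fail fast when no reply queue exists, otherwise hand it to the block.
def with_reply(reply_queue)
  raise 'Could not find reply_queue' unless reply_queue
  yield reply_queue
end

reply = Queue.new
reply << 'pong'

# The method returns whatever the block returns.
result = with_reply(reply) { |q| q.pop }

# A nil reply queue raises a RuntimeError before the block runs.
error_message =
  begin
    with_reply(nil) { |q| q.pop }
  rescue RuntimeError => e
    e.message
  end
```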