Class: TorqueBox::Messaging::MessageProcessor

Inherits:
Object
  • Object
show all
Includes:
ProcessorMiddleware::DefaultMiddleware
Defined in:
lib/torquebox/messaging/message_processor.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from ProcessorMiddleware::DefaultMiddleware

default, #middleware

Constructor Details

#initializeMessageProcessor

Returns a new instance of MessageProcessor.



27
28
29
30
# File 'lib/torquebox/messaging/message_processor.rb', line 27

def initialize
  @message = nil
  @proxy = nil
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(method, *args, &block) ⇒ Object



36
37
38
# File 'lib/torquebox/messaging/message_processor.rb', line 36

def method_missing(method, *args, &block)
  @proxy.send( method, *args, &block )
end

Instance Attribute Details

#messageObject

Returns the value of attribute message.



25
26
27
# File 'lib/torquebox/messaging/message_processor.rb', line 25

def message
  @message
end

Class Method Details

.listArray<TorqueBox::Messaging::MessageProcessorProxy>

List all available message processors for current application.

Returns:



69
70
71
72
73
74
75
76
77
# File 'lib/torquebox/messaging/message_processor.rb', line 69

def list
  processors = []

  TorqueBox::MSC.get_services(/^#{messaging_service_name.canonical_name}\.\".*\"$/) do |service|
    processors << MessageProcessorProxy.new(service.value)
  end

  processors
end

.lookup(destination_name, class_name) ⇒ Object

Lookup a message processor by its destination and class name.

Parameters:

  • The (String)

    destination name (queue, topic) to which a message processor is bound.

  • The (String)

    class name of the message processor implementation.



86
87
88
89
90
91
92
93
94
95
96
# File 'lib/torquebox/messaging/message_processor.rb', line 86

def lookup(destination_name, class_name)
  sn = messaging_service_name.append("#{destination_name}.#{class_name}")

  # Try to find a message procesor for specified parameters
  group = TorqueBox::ServiceRegistry::lookup(sn)

  return MessageProcessorProxy.new(group) if group

  # Ooops, no processor is found. Most probably wrong data.
  return nil
end

Instance Method Details

#initialize_proxy(group) ⇒ Object



32
33
34
# File 'lib/torquebox/messaging/message_processor.rb', line 32

def initialize_proxy(group)
  @proxy = MessageProcessorProxy.new(group)
end

#on_error(error) ⇒ Object



44
45
46
# File 'lib/torquebox/messaging/message_processor.rb', line 44

def on_error(error)
  raise error
end

#on_message(body) ⇒ Object



40
41
42
# File 'lib/torquebox/messaging/message_processor.rb', line 40

def on_message(body)
  throw "Your subclass must implement on_message(body)"
end

#process!(message) ⇒ Object



48
49
50
51
52
53
54
55
56
# File 'lib/torquebox/messaging/message_processor.rb', line 48

def process!(message)
  @message = message
  begin
    value = on_message(message.decode)
    reply(value) if synchronous?
  rescue Exception => e
    on_error( e ) 
  end 
end

#reply(value) ⇒ Object



58
59
60
# File 'lib/torquebox/messaging/message_processor.rb', line 58

def reply(value)
  TorqueBox::Messaging::Queue.new(@message.jms_message.jms_destination.queue_name).publish(value, :correlation_id => @message.jms_message.jms_message_id)
end