Class: TorqueBox::Messaging::Task

Inherits:
Object
  • Object
show all
Includes:
FutureStatus, ProcessorMiddleware::DefaultMiddleware
Defined in:
lib/torquebox/messaging/task.rb

Class Method Summary collapse

Instance Method Summary collapse

Methods included from ProcessorMiddleware::DefaultMiddleware

default, #middleware

Methods included from FutureStatus

#__future

Class Method Details

.async(method, payload = {}, options = {}) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/torquebox/messaging/task.rb', line 36

def self.async(method, payload = {}, options = {})
  queue = Queue.new( queue_name )
  future = Future.new( queue )
  message = {
    :method => method,
    :payload => payload,
    :future_id => future.correlation_id,
    :future_queue => queue_name
  }
  options[:encoding] = :marshal
  queue.publish( message, options )
  
  future
rescue javax.naming.NameNotFoundException => ex
  raise RuntimeError.new("The queue for #{self.name} is not available. Did you disable it by setting its concurrency to 0?")
end

.queue_name(name = self.name[0...-4]) ⇒ Object



31
32
33
34
# File 'lib/torquebox/messaging/task.rb', line 31

def self.queue_name( name = self.name[0...-4] )
  suffix = org.torquebox.core.util.StringUtils.underscore(name)
  "/queues/torquebox/#{ENV['TORQUEBOX_APP_NAME']}/tasks/#{suffix}"
end

Instance Method Details

#process!(message) ⇒ Object



53
54
55
56
57
58
# File 'lib/torquebox/messaging/task.rb', line 53

def process!(message)
  hash = message.decode
  FutureResponder.new( Queue.new( hash[:future_queue] ), hash[:future_id] ).respond do
    self.send hash[:method].to_sym, hash[:payload]
  end
end