Class: TDL::RemoteBroker
- Inherits:
-
Object
- Object
- TDL::RemoteBroker
- Defined in:
- lib/tdl/queue/transport/remote_broker.rb
Instance Method Summary
- #close ⇒ Object
- #closed? ⇒ Boolean
- #initialize(hostname, port, request_queue_name, response_queue_name, request_timeout_millis) ⇒ RemoteBroker (constructor)
  A new instance of RemoteBroker.
- #join ⇒ Object
- #respond_to(request, response) ⇒ Object
- #subscribe(handling_strategy) ⇒ Object
Constructor Details
#initialize(hostname, port, request_queue_name, response_queue_name, request_timeout_millis) ⇒ RemoteBroker
Returns a new instance of RemoteBroker.
7 8 9 10 11 12 13 14 |
# File 'lib/tdl/queue/transport/remote_broker.rb', line 7
#
# Builds the STOMP client, derives the request/response queue destinations,
# and starts a timer whose callback closes the broker if it is still open.
#
# @param hostname [String] broker host handed to Stomp::Client
# @param port [Integer] broker port handed to Stomp::Client
# @param request_queue_name [String] queue name; stored as "/queue/<name>"
# @param response_queue_name [String] queue name; stored as "/queue/<name>"
# @param request_timeout_millis [Integer] timeout value handed to ThreadTimer
#   (presumably milliseconds of inactivity before the callback runs — confirm
#   against ThreadTimer)
def initialize(hostname, port, request_queue_name, response_queue_name, request_timeout_millis)
  # The first two arguments are deliberately empty strings
  # (presumably login/passcode — confirm against the stomp gem).
  @stomp_client = Stomp::Client.new('', '', hostname, port)
  @request_queue = "/queue/#{request_queue_name}"
  @response_queue = "/queue/#{response_queue_name}"
  @serialization_provider = JSONRPCSerializationProvider.new

  # The callback is passed directly; it was previously also assigned to an
  # unused local named `lambda`, which shadowed Kernel#lambda.
  @timer = ThreadTimer.new(request_timeout_millis, -> { close unless closed? })
  @timer.start
end
Instance Method Details
#close ⇒ Object
35 36 37 38 |
# File 'lib/tdl/queue/transport/remote_broker.rb', line 35
#
# Unsubscribes from the request queue and then closes the STOMP client.
#
# If unsubscribing raises, the client is still closed before the error is
# re-raised, so a failed unsubscribe does not leave the connection open.
#
# @return [Object] whatever @stomp_client.close returns
# @raise [StandardError] re-raises any error from unsubscribe after closing
def close
  begin
    @stomp_client.unsubscribe(@request_queue)
  rescue StandardError
    @stomp_client.close
    raise
  end
  @stomp_client.close
end
#closed? ⇒ Boolean
40 41 42 |
# File 'lib/tdl/queue/transport/remote_broker.rb', line 40
#
# Reports whether the underlying STOMP client is closed.
#
# @return [Boolean] the value of @stomp_client.closed?
def closed?
  client = @stomp_client
  client.closed?
end
#join ⇒ Object
31 32 33 |
# File 'lib/tdl/queue/transport/remote_broker.rb', line 31
#
# Delegates to the STOMP client's join.
# NOTE(review): presumably blocks the caller until the client's listener
# finishes — confirm against the stomp gem.
#
# @return [Object] whatever @stomp_client.join returns
def join
  client = @stomp_client
  client.join
end
#respond_to(request, response) ⇒ Object
25 26 27 28 29 |
# File 'lib/tdl/queue/transport/remote_broker.rb', line 25
#
# Serializes the response, publishes it to the response queue, and then
# acknowledges the request it answers.
#
# @param request [Object] the request being answered; must expose the message
#   that is passed to the client's acknowledge call
# @param response [Object] the value handed to the serialization provider
# @return [Object] whatever @stomp_client.acknowledge returns
def respond_to(request, response)
  serialized_response = @serialization_provider.serialize(response)
  @stomp_client.publish(@response_queue, serialized_response)

  # NOTE(review): the accessor name was missing in this listing (`request.`),
  # which is a syntax error. Restored as `original_message` — confirm against
  # the Request class.
  @stomp_client.acknowledge(request.original_message)
end
#subscribe(handling_strategy) ⇒ Object
16 17 18 19 20 21 22 23 |
# File 'lib/tdl/queue/transport/remote_broker.rb', line 16
#
# Subscribes to the request queue and routes each incoming message to the
# given handling strategy. The timer is stopped while a message is being
# handled and started again once handling returns.
#
# @param handling_strategy [#process_next_request_from] receives this broker
#   and the deserialized request for every message
# @return [Object] whatever @stomp_client.subscribe returns
def subscribe(handling_strategy)
  subscription_headers = { ack: 'client-individual', 'activemq.prefetchSize' => 1 }

  @stomp_client.subscribe(@request_queue, subscription_headers) do |frame|
    @timer.stop
    handling_strategy.process_next_request_from(self, @serialization_provider.deserialize(frame))
    @timer.start
  end
end