Class: TDL::RemoteBroker
- Inherits:
-
Object
- Object
- TDL::RemoteBroker
- Defined in:
- lib/tdl/queue/transport/remote_broker.rb
Instance Method Summary collapse
- #close ⇒ Object
- #closed? ⇒ Boolean
-
#initialize(hostname, port, unique_id, request_timeout_millis) ⇒ RemoteBroker
constructor
A new instance of RemoteBroker.
- #join ⇒ Object
- #respond_to(request, response) ⇒ Object
- #subscribe(handling_strategy) ⇒ Object
Constructor Details
#initialize(hostname, port, unique_id, request_timeout_millis) ⇒ RemoteBroker
Returns a new instance of RemoteBroker.
7 8 9 10 11 12 13 14 15 |
# File 'lib/tdl/queue/transport/remote_broker.rb', line 7 def initialize(hostname, port, unique_id, request_timeout_millis) @stomp_client = Stomp::Client.new('', '', hostname, port) @unique_id = unique_id @request_queue = "/queue/#{@unique_id}.req" @response_queue = "/queue/#{@unique_id}.resp" @serialization_provider = JSONRPCSerializationProvider.new @timer = ThreadTimer.new(request_timeout_millis, lambda = ->() { close unless closed? }) @timer.start end |
Instance Method Details
#close ⇒ Object
36 37 38 39 |
# File 'lib/tdl/queue/transport/remote_broker.rb', line 36 def close @stomp_client.unsubscribe(@request_queue) @stomp_client.close end |
#closed? ⇒ Boolean
41 42 43 |
# File 'lib/tdl/queue/transport/remote_broker.rb', line 41 def closed? @stomp_client.closed? end |
#join ⇒ Object
32 33 34 |
# File 'lib/tdl/queue/transport/remote_broker.rb', line 32 def join @stomp_client.join end |
#respond_to(request, response) ⇒ Object
26 27 28 29 30 |
# File 'lib/tdl/queue/transport/remote_broker.rb', line 26 def respond_to(request, response) serialized_response = @serialization_provider.serialize(response) @stomp_client.publish(@response_queue, serialized_response) @stomp_client.acknowledge(request.) end |
#subscribe(handling_strategy) ⇒ Object
17 18 19 20 21 22 23 24 |
# File 'lib/tdl/queue/transport/remote_broker.rb', line 17 def subscribe(handling_strategy) @stomp_client.subscribe(@request_queue, {:ack => 'client-individual', 'activemq.prefetchSize' => 1}) do |msg| @timer.stop request = @serialization_provider.deserialize(msg) handling_strategy.process_next_request_from(self, request) @timer.start end end |