Class: TDL::RemoteBroker

Inherits:
Object
  • Object
show all
Defined in:
lib/tdl/queue/transport/remote_broker.rb

Instance Method Summary collapse

Constructor Details

#initialize(hostname, port, request_queue_name, response_queue_name, request_timeout_millis) ⇒ RemoteBroker

Returns a new instance of RemoteBroker.



7
8
9
10
11
12
13
14
# File 'lib/tdl/queue/transport/remote_broker.rb', line 7

def initialize(hostname, port, request_queue_name, response_queue_name, request_timeout_millis)
  @stomp_client = Stomp::Client.new('', '', hostname, port)
  @request_queue = "/queue/#{request_queue_name}"
  @response_queue = "/queue/#{response_queue_name}"
  @serialization_provider = JSONRPCSerializationProvider.new
  @timer = ThreadTimer.new(request_timeout_millis, lambda = ->() { close unless closed? })
  @timer.start
end

Instance Method Details

#closeObject



35
36
37
38
# File 'lib/tdl/queue/transport/remote_broker.rb', line 35

def close
  @stomp_client.unsubscribe(@request_queue)
  @stomp_client.close
end

#closed?Boolean

Returns:

  • (Boolean)


40
41
42
# File 'lib/tdl/queue/transport/remote_broker.rb', line 40

def closed?
  @stomp_client.closed?
end

#joinObject



31
32
33
# File 'lib/tdl/queue/transport/remote_broker.rb', line 31

def join
  @stomp_client.join
end

#respond_to(request, response) ⇒ Object



25
26
27
28
29
# File 'lib/tdl/queue/transport/remote_broker.rb', line 25

def respond_to(request, response)
  serialized_response = @serialization_provider.serialize(response)
  @stomp_client.publish(@response_queue, serialized_response)
  @stomp_client.acknowledge(request.original_message)
end

#subscribe(handling_strategy) ⇒ Object



16
17
18
19
20
21
22
23
# File 'lib/tdl/queue/transport/remote_broker.rb', line 16

def subscribe(handling_strategy)
  @stomp_client.subscribe(@request_queue, {:ack => 'client-individual', 'activemq.prefetchSize' => 1}) do |msg|
    @timer.stop
    request = @serialization_provider.deserialize(msg)
    handling_strategy.process_next_request_from(self, request)
    @timer.start
  end
end