Class: Stomper::Receivers::Threaded

Inherits:
Object
  • Object
show all
Defined in:
lib/stomper/receivers/threaded.rb

Overview

Basic threaded receiver

Defined Under Namespace

Classes: StopReceiver

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection) ⇒ Threaded

Creates a new threaded receiver for the supplied Connection. Invoking #start on this receiver will create a new thread that will continually call receive on the connection. Stopping this receiver with #stop will terminate the thread.

Parameters:



21
22
23
24
25
26
# File 'lib/stomper/receivers/threaded.rb', line 21

def initialize(connection)
  @connection = connection
  @running = false
  @run_mutex = ::Mutex.new
  @run_thread = nil
end

Instance Attribute Details

#runningtrue, false (readonly) Also known as: running?

Returns true if the receiver is currently running, false otherwise. If the polling thread is terminated due to a raised exception, this attribute will be false.

Returns:

  • (true, false)


12
13
14
# File 'lib/stomper/receivers/threaded.rb', line 12

def running
  @running
end

Instance Method Details

#startself

Starts the receiver by creating a new thread to continually poll the connection for new Stomp frames. If an error is raised while calling Connection#receive, the polling thread will terminate, and #running? will return false.

Returns:

  • (self)


33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/stomper/receivers/threaded.rb', line 33

def start
  is_starting = @run_mutex.synchronize { @running = true unless @running }
  if is_starting
    @run_thread = Thread.new do
      begin
        until @connection.receive.nil?
        end
      rescue ::Stomper::Receivers::Threaded::StopReceiver
      rescue Exception => ex
        @running = false
        raise ex
      end
      @running = false
    end
  end
  self
end

#stopself

Stops the receiver by shutting down the polling thread. If an error was raised within the thread, this method will generally re-raise the error. The one exception to this behavior is if the error raised was an instance of IOError and a call to Connection#connected? returns false, in which case the error is ignored. The reason for this is that performing a read operation on a closed stream will raise an IOError. It is likely that when shutting down a connection and its receiver, the polling thread may be blocked on reading from the stream and raise such an error.

Returns:

  • (self)

Raises:

  • (Exception)


61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/stomper/receivers/threaded.rb', line 61

def stop
  stopped = @run_mutex.synchronize { @run_thread.nil? }
  unless stopped
    @run_thread.raise(::Stomper::Receivers::Threaded::StopReceiver.new)
    begin
      @run_thread.join
    rescue ::IOError, ::SystemCallError
      raise if @connection.connected?
    rescue ::Stomper::Receivers::Threaded::StopReceiver => ex
    end
    @run_thread = nil
  end
  self
end