Class: Actions::Candlepin::CandlepinListeningService
- Inherits:
-
Object
- Object
- Actions::Candlepin::CandlepinListeningService
- Defined in:
- app/lib/actions/candlepin/candlepin_listening_service.rb
Constant Summary collapse
- RECONNECT_ATTEMPTS =
30
- TIMEOUT =
Qpid::Messaging::Duration::SECOND
- NO_MESSAGE_AVAILABLE_ERROR_TYPE =
'NoMessageAvailable'
Class Attribute Summary collapse
-
.instance ⇒ Object
readonly
Returns the value of attribute instance.
Class Method Summary collapse
- .close ⇒ Object
- .initialize(logger, url, address) ⇒ Object
Instance Method Summary collapse
- #close ⇒ Object
- #create_connection ⇒ Object
- #fetch_message ⇒ Object
-
#initialize(logger, url, address) ⇒ CandlepinListeningService
constructor
A new instance of CandlepinListeningService.
- #poll_for_messages(suspended_action) ⇒ Object
- #retrieve ⇒ Object
- #start(suspended_action) ⇒ Object
Constructor Details
#initialize(logger, url, address) ⇒ CandlepinListeningService
Returns a new instance of CandlepinListeningService.
25 26 27 28 29 30 |
# File 'app/lib/actions/candlepin/candlepin_listening_service.rb', line 25
# Records the connection settings and builds the connection object.
#
# @param logger kept in @logger
# @param url used as the :url option when the connection is built
# @param address kept in @address; #start passes it to create_receiver
def initialize(logger, url, address)
  @url, @address = url, address
  # create_connection reads @url, so it has to run after the assignment above.
  @connection = create_connection
  @logger = logger
end
Class Attribute Details
.instance ⇒ Object (readonly)
Returns the value of attribute instance.
13 14 15 |
# File 'app/lib/actions/candlepin/candlepin_listening_service.rb', line 13 def instance @instance end |
Class Method Details
.close ⇒ Object
19 20 21 22 |
# File 'app/lib/actions/candlepin/candlepin_listening_service.rb', line 19
# Closes the stored instance, if there is one, and forgets it.
#
# The reference is cleared in an +ensure+ clause so that an exception raised
# while closing cannot leave a stale object in @instance; a stale object
# would otherwise keep being returned by the `||=` in .initialize.
#
# @return [nil]
# @raise whatever the stored instance's #close raises
def close
  @instance.close if @instance
  nil
ensure
  @instance = nil
end
.initialize(logger, url, address) ⇒ Object
15 16 17 |
# File 'app/lib/actions/candlepin/candlepin_listening_service.rb', line 15
# Builds the shared instance on first use and returns it; later calls return
# the already-stored object and ignore their arguments.
#
# @param logger forwarded to the constructor
# @param url forwarded to the constructor
# @param address forwarded to the constructor
# @return the object held in @instance
def initialize(logger, url, address)
  return @instance if @instance

  @instance = new(logger, url, address)
end
Instance Method Details
#close ⇒ Object
36 37 38 39 |
# File 'app/lib/actions/candlepin/candlepin_listening_service.rb', line 36
# Stops the polling thread, if one was started, then closes the connection.
#
# @return whatever @connection.close returns
def close
  @thread && @thread.kill
  @connection.close
end
#create_connection ⇒ Object
32 33 34 |
# File 'app/lib/actions/candlepin/candlepin_listening_service.rb', line 32
# Builds a Qpid messaging connection for @url using the 'ssl' transport.
# The connection is only constructed here; #start is what opens it.
#
# @return [Qpid::Messaging::Connection]
def create_connection
  connection_options = {:transport => 'ssl'}
  Qpid::Messaging::Connection.new(:url => @url, :options => connection_options)
end
#fetch_message ⇒ Object
66 67 68 69 70 |
# File 'app/lib/actions/candlepin/candlepin_listening_service.rb', line 66
# Calls #retrieve and reports the outcome as a hash instead of letting a
# connection error propagate.
#
# @return [Hash] {:result => <value from #retrieve>, :error => nil} on
#   success, or {:result => nil, :error => <exception message>} when
#   #retrieve raises Actions::Candlepin::ConnectionError
def fetch_message
  {:result => retrieve, :error => nil}
rescue Actions::Candlepin::ConnectionError => e
  {:result => nil, :error => e.message}
end
#poll_for_messages(suspended_action) ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'app/lib/actions/candlepin/candlepin_listening_service.rb', line 72
# Starts a background thread that repeatedly calls #fetch_message and
# forwards what it gets to +suspended_action+. Any previously started
# polling thread is killed first.
#
# Inside the loop:
# * a fetched message is acknowledged on @session and handed to the action;
# * a connection error is reported via notify_not_connected and ends the loop;
# * any other exception is reported via notify_fatal and re-raised, which
#   terminates the thread.
#
# NOTE(review): the listing this was taken from had several identifiers
# stripped. `message`/`fetch_message` follow from #fetch_message's return
# shape; `notify_message_received` and `message_id` are reconstructed —
# confirm them against the suspended action's API and the Qpid message class.
#
# @param suspended_action object receiving the notify_* callbacks
# @return [Thread] the newly started polling thread
def poll_for_messages(suspended_action)
  @thread.kill if @thread
  @thread = Thread.new do
    loop do
      begin
        message = fetch_message
        if message[:result]
          result = message[:result]
          @session.acknowledge(:message => result)
          suspended_action.notify_message_received(result.message_id, result.subject, result.content)
        elsif message[:error]
          suspended_action.notify_not_connected(message[:error])
          break
        end
        # Pause between polls.
        sleep 1
      rescue => e
        suspended_action.notify_fatal(e)
        raise e
      end
    end
  end
end
#retrieve ⇒ Object
41 42 43 44 45 46 47 48 49 |
# File 'app/lib/actions/candlepin/candlepin_listening_service.rb', line 41
# Fetches the next message from @receiver, waiting up to TIMEOUT.
#
# Errors are classified by class *name* rather than by constant:
# * a name containing "TransportFailure" becomes a ConnectionError;
# * a name containing NO_MESSAGE_AVAILABLE_ERROR_TYPE is swallowed and the
#   method returns nil;
# * anything else is re-raised unchanged.
#
# @return the fetched message, or nil when no message was available
# @raise [Actions::Candlepin::ConnectionError] on a transport failure
def retrieve
  @receiver.fetch(TIMEOUT)
rescue => e
  # Class#name is nil for anonymous classes; to_s keeps the name checks from
  # raising NoMethodError and hiding the original error.
  error_type = e.class.name.to_s
  if error_type.include?("TransportFailure")
    raise ::Actions::Candlepin::ConnectionError, "failed to connect to #{@url}"
  end
  raise e unless error_type.include?(NO_MESSAGE_AVAILABLE_ERROR_TYPE)
end
#start(suspended_action) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'app/lib/actions/candlepin/candlepin_listening_service.rb', line 51
# Opens the connection if it is not open yet, creating the session and the
# receiver for @address, then tells +suspended_action+ whether the
# connection is up.
#
# A ::TransportFailure raised along the way is not propagated; its message
# is reported through notify_not_connected instead.
#
# @param suspended_action object receiving notify_connected /
#   notify_not_connected
def start(suspended_action)
  unless @connection.open?
    @connection.open
    @session = @connection.create_session
    @receiver = @session.create_receiver(@address)
  end

  if @connection.open?
    suspended_action.notify_connected
  else
    suspended_action.notify_not_connected("Not Connected")
  end
rescue ::TransportFailure => e
  suspended_action.notify_not_connected(e.message)
end