Class: Actions::Candlepin::CandlepinListeningService

Inherits:
Object
  • Object
show all
Defined in:
app/lib/actions/candlepin/candlepin_listening_service.rb

Constant Summary collapse

RECONNECT_ATTEMPTS =
30
TIMEOUT =
Qpid::Messaging::Duration::SECOND
NO_MESSAGE_AVAILABLE_ERROR_TYPE =
'NoMessageAvailable'

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(logger, url, address) ⇒ CandlepinListeningService

Returns a new instance of CandlepinListeningService.



25
26
27
28
29
30
# File 'app/lib/actions/candlepin/candlepin_listening_service.rb', line 25

# Stores the collaborators and builds the (not yet opened here) connection object.
#
# @param logger  logger kept in @logger
# @param url     broker URL; read by create_connection, so it is assigned first
# @param address address later handed to the session when creating a receiver
def initialize(logger, url, address)
  @url, @address = url, address
  # create_connection reads @url, which is already set above.
  @connection = create_connection
  @logger = logger
end

Class Attribute Details

.instance ⇒ Object (readonly)

Returns the value of attribute instance.



13
14
15
# File 'app/lib/actions/candlepin/candlepin_listening_service.rb', line 13

# Reader for the class-level @instance variable.
#
# @return whatever is currently stored in @instance (nil when nothing has
#   been assigned or after it has been reset to nil)
def instance
  @instance
end

Class Method Details

.close ⇒ Object



19
20
21
22
# File 'app/lib/actions/candlepin/candlepin_listening_service.rb', line 19

# Closes the object held in @instance (if any) and then forgets it.
#
# The reference is cleared only after the close call returns, so an
# exception raised by that call leaves @instance untouched.
#
# @return [nil]
def close
  current = @instance
  current.close if current
  @instance = nil
end

.initialize(logger, url, address) ⇒ Object



15
16
17
# File 'app/lib/actions/candlepin/candlepin_listening_service.rb', line 15

# Creates the object stored in @instance on first use and returns it;
# later calls return the already-stored object and ignore their arguments.
#
# @param logger  forwarded to new
# @param url     forwarded to new
# @param address forwarded to new
# @return the object stored in @instance
def initialize(logger, url, address)
  return @instance if @instance

  @instance = new(logger, url, address)
end

Instance Method Details

#close ⇒ Object



36
37
38
39
# File 'app/lib/actions/candlepin/candlepin_listening_service.rb', line 36

# Kills the thread held in @thread (when there is one), then closes the
# connection.
#
# @return the value returned by @connection.close
def close
  worker = @thread
  worker.kill if worker
  @connection.close
end

#create_connection ⇒ Object



32
33
34
# File 'app/lib/actions/candlepin/candlepin_listening_service.rb', line 32

# Constructs a Qpid messaging connection for @url, requesting the 'ssl'
# transport.
#
# @return [Qpid::Messaging::Connection] the newly constructed connection
def create_connection
  transport_options = {:transport => 'ssl'}
  Qpid::Messaging::Connection.new(:url => @url, :options => transport_options)
end

#fetch_message ⇒ Object



66
67
68
69
70
# File 'app/lib/actions/candlepin/candlepin_listening_service.rb', line 66

# Wraps retrieve in a result hash so callers can branch on the outcome
# instead of rescuing.
#
# @return [Hash] {:result => <value of retrieve>, :error => nil} on success,
#   or {:result => nil, :error => <message>} when retrieve raises
#   Actions::Candlepin::ConnectionError. Other exceptions propagate.
def fetch_message
  fetched = retrieve
  {:result => fetched, :error => nil}
rescue Actions::Candlepin::ConnectionError => connection_error
  {:result => nil, :error => connection_error.message}
end

#poll_for_messages(suspended_action) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'app/lib/actions/candlepin/candlepin_listening_service.rb', line 72

# Replaces any existing polling thread with a new one that repeatedly calls
# fetch_message and reports each outcome to +suspended_action+.
#
# Per iteration:
# * a result is acknowledged on @session and forwarded through
#   notify_message_received(message_id, subject, content);
# * an error is forwarded through notify_not_connected and ends the loop;
# * otherwise nothing is reported.
# Unless the loop ended, the thread then sleeps one second before the next
# fetch. Any StandardError raised inside an iteration is passed to
# notify_fatal and re-raised inside the thread.
#
# @param suspended_action object receiving the notify_* callbacks
# @return [Thread] the newly started polling thread (also kept in @thread)
def poll_for_messages(suspended_action)
  previous = @thread
  previous.kill if previous

  @thread = Thread.new do
    loop do
      begin
        outcome = fetch_message
        received = outcome[:result]
        failure = received ? nil : outcome[:error]

        if received
          @session.acknowledge(:message => received)
          suspended_action.notify_message_received(received.message_id, received.subject, received.content)
        elsif failure
          suspended_action.notify_not_connected(failure)
          break
        end

        sleep 1
      rescue => fatal
        suspended_action.notify_fatal(fatal)
        raise fatal
      end
    end
  end
end

#retrieve ⇒ Object



41
42
43
44
45
46
47
48
49
# File 'app/lib/actions/candlepin/candlepin_listening_service.rb', line 41

# Fetches the next message from @receiver, waiting up to TIMEOUT.
#
# Errors raised by the fetch are classified by the name of their class:
# * a name containing "TransportFailure" is translated into
#   Actions::Candlepin::ConnectionError;
# * a name containing NO_MESSAGE_AVAILABLE_ERROR_TYPE is treated as
#   "nothing to deliver" and yields nil;
# * anything else is re-raised unchanged.
#
# @return the fetched message, or nil when no message was available
# @raise [Actions::Candlepin::ConnectionError] on a transport failure
# @raise [StandardError] any other error raised by the fetch
def retrieve
  @receiver.fetch(TIMEOUT)
rescue => e
  # Class#name is nil for anonymous classes; coerce to a String so the
  # classification cannot raise NoMethodError and mask the original error.
  error_type = e.class.name.to_s

  if error_type.include?("TransportFailure")
    raise ::Actions::Candlepin::ConnectionError, "failed to connect to #{@url}"
  end
  raise unless error_type.include?(NO_MESSAGE_AVAILABLE_ERROR_TYPE)

  # No message within the timeout is an expected outcome, not an error.
  nil
end

#start(suspended_action) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'app/lib/actions/candlepin/candlepin_listening_service.rb', line 51

# Ensures the connection is open and tells +suspended_action+ the outcome.
#
# When the connection is not open yet it is opened, and a session plus a
# receiver for @address are created and kept in @session / @receiver.
# Afterwards the action receives notify_connected if the connection reports
# open, otherwise notify_not_connected("Not Connected"). A ::TransportFailure
# raised along the way is reported through notify_not_connected with the
# exception's message.
#
# @param suspended_action object receiving the notify_* callbacks
# @return the value returned by whichever notify_* callback was invoked
def start(suspended_action)
  already_open = @connection.open?
  if !already_open
    @connection.open
    @session = @connection.create_session
    @receiver = @session.create_receiver(@address)
  end

  return suspended_action.notify_connected if @connection.open?

  suspended_action.notify_not_connected("Not Connected")
rescue ::TransportFailure => transport_error
  suspended_action.notify_not_connected(transport_error.message)
end