Class: Actions::Candlepin::ListenOnCandlepinEvents
- Inherits:
-
Base
- Object
- Base
- Actions::Candlepin::ListenOnCandlepinEvents
show all
- Defined in:
- app/lib/actions/candlepin/listen_on_candlepin_events.rb
Defined Under Namespace
Classes: RunOnceCoordinatorLock
Constant Summary
collapse
- Connected =
Algebrick.atom
- Reconnect =
Algebrick.type do
fields! message: String
end
- Event =
Algebrick.type do
fields! message_id: String, subject: String, content: String
end
- Fatal =
Algebrick.type do
fields! backtrace: String, message: String, kind: String
end
- Close =
Algebrick.atom
Class Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Class Attribute Details
.triggered_action ⇒ Object
Returns the value of attribute triggered_action.
55
56
57
|
# File 'app/lib/actions/candlepin/listen_on_candlepin_events.rb', line 55
def triggered_action
@triggered_action
end
|
Class Method Details
.ensure_running(world = ForemanTasks.dynflow.world) ⇒ Object
57
58
59
60
61
62
63
64
65
|
# File 'app/lib/actions/candlepin/listen_on_candlepin_events.rb', line 57
def ensure_running(world = ForemanTasks.dynflow.world)
world.coordinator.acquire(RunOnceCoordinatorLock.new(world)) do
unless ForemanTasks::Task::DynflowTask.for_action(self).running.any?
@triggered_action = ForemanTasks.trigger(self)
end
end
rescue Dynflow::Coordinator::LockError
return false
end
|
Instance Method Details
#act_on_event(event) ⇒ Object
180
181
182
183
184
185
186
187
188
189
|
# File 'app/lib/actions/candlepin/listen_on_candlepin_events.rb', line 180
def act_on_event(event)
begin
output[:connection] = "Connected"
on_event(event)
rescue => e
error!(e)
raise e
end
suspend
end
|
#close_service ⇒ Object
226
227
228
|
# File 'app/lib/actions/candlepin/listen_on_candlepin_events.rb', line 226
def close_service
CandlepinListeningService.close
end
|
#close_service_with_error(event) ⇒ Object
108
109
110
111
112
113
114
|
# File 'app/lib/actions/candlepin/listen_on_candlepin_events.rb', line 108
def close_service_with_error(event)
error = Exception.new("#{event.kind}: #{event.message}")
error.set_backtrace([event.backtrace])
error!(error)
ensure
close_service
end
|
191
192
193
194
195
|
# File 'app/lib/actions/candlepin/listen_on_candlepin_events.rb', line 191
def configured?
SETTINGS[:katello].key?(:qpid) &&
SETTINGS[:katello][:qpid].key?(:url) &&
SETTINGS[:katello][:qpid].key?(:subscriptions_queue_address)
end
|
#connect_listening_service(event) ⇒ Object
116
117
118
119
120
121
122
123
|
# File 'app/lib/actions/candlepin/listen_on_candlepin_events.rb', line 116
def connect_listening_service(event)
CandlepinListeningService.instance.close
output[:error] = event.message
suspend do |suspended_action|
CandlepinListeningService.instance.start(SuspendedAction.new(suspended_action))
end
end
|
#error_message(error_id) ⇒ Object
222
223
224
|
# File 'app/lib/actions/candlepin/listen_on_candlepin_events.rb', line 222
def error_message(error_id)
CandlepinListeningService.instance.errors[error_id]
end
|
#finish_service ⇒ Object
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
|
# File 'app/lib/actions/candlepin/listen_on_candlepin_events.rb', line 153
def finish_service
log_prefix = "Finishing #{self.class.name} #{task.id}"
action_logger.info(log_prefix)
suspended_action.ask(Close).wait
if self.class.triggered_action
self.class.triggered_action.finished.wait
else
max_attempts = 10
(1..max_attempts).each do |attempt|
task.reload
if !task.pending? || task.paused?
break
else
action_logger.info("#{log_prefix}: attempt #{attempt}/#{max_attempts}")
if attempt == max_attempts
action_logger.info("#{log_prefix} failed, skipping")
else
sleep 1
end
end
end
end
end
|
#initialize_listening_service(suspended_action) ⇒ Object
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
|
# File 'app/lib/actions/candlepin/listen_on_candlepin_events.rb', line 197
def initialize_listening_service(suspended_action)
if configured?
CandlepinListeningService.initialize(world.logger,
SETTINGS[:katello][:qpid][:url],
SETTINGS[:katello][:qpid][:subscriptions_queue_address])
suspended_action.notify_not_connected("initialized...have not connected yet")
else
action_logger.error("katello has not been configured for qpid.url and qpid.subscriptions_queue_address")
suspended_action.notify_finished
end
rescue => e
Rails.logger.error(e.message)
Rails.logger.error(e.backtrace)
error!(e)
end
|
#initialize_service ⇒ Object
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
|
# File 'app/lib/actions/candlepin/listen_on_candlepin_events.rb', line 133
def initialize_service
output[:messages] = 0
output[:last_message] = nil
suspend do |suspended_action|
begin
initialize_listening_service(SuspendedAction.new(suspended_action))
rescue => e
error!(e)
raise e
end
unless Rails.env.test?
world.before_termination do
finish_service
end
end
end
end
|
#on_event(event) ⇒ Object
213
214
215
216
217
218
219
220
|
# File 'app/lib/actions/candlepin/listen_on_candlepin_events.rb', line 213
def on_event(event)
Actions::Candlepin::ImportPoolHandler.new(Rails.logger).handle(event)
output[:last_message] = "#{event.message_id} - #{event.subject}"
output[:messages] = event.message_id
rescue => e
close_service
error!(e)
end
|
#plan ⇒ Object
68
69
70
71
72
73
74
|
# File 'app/lib/actions/candlepin/listen_on_candlepin_events.rb', line 68
def plan
if already_running?
fail "Action #{self.class.name} is already active"
end
plan_self
end
|
#poll_listening_service(_event) ⇒ Object
125
126
127
128
129
130
131
|
# File 'app/lib/actions/candlepin/listen_on_candlepin_events.rb', line 125
def poll_listening_service(_event)
output[:connection] = "Connected"
output[:error] = nil
suspend do |suspended_action|
CandlepinListeningService.instance.poll_for_messages(SuspendedAction.new(suspended_action))
end
end
|
#run(event = nil) ⇒ Object
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
# File 'app/lib/actions/candlepin/listen_on_candlepin_events.rb', line 76
def run(event = nil)
match(event,
(on nil do
initialize_service
end),
(on Reconnect do
connect_listening_service(event)
end),
(on Connected do
poll_listening_service(event)
end),
(on Event do
act_on_event(event)
end),
(on Close | Dynflow::Action::Cancellable::Cancel do
close_service
end),
(on Fatal do
close_service_with_error(event)
end),
(on Dynflow::Action::Skip do
end))
rescue => e
action_logger.error(e.message)
close_service
error!(e)
end
|