Module: ActionSubscriber::DSL

Included in:
Base
Defined in:
lib/action_subscriber/dsl.rb

Instance Method Summary collapse

Instance Method Details

#_run_action_at_least_once_with_filters(env, action) ⇒ Object



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/action_subscriber/dsl.rb', line 133

def _run_action_at_least_once_with_filters(env, action)
  processed_acknowledgement = false
  rejected_message = false

  _run_action_with_filters(env, action)

  processed_acknowledgement = env.acknowledge
rescue
  ::ActionSubscriber::MessageRetry.redeliver_message_with_backoff(env)
  processed_acknowledgement = env.acknowledge

  raise
ensure
  rejected_message = env.reject if !processed_acknowledgement

  if !rejected_message && !processed_acknowledgement
    $stdout << <<-UNREJECTABLE
      CANNOT ACKNOWLEDGE OR REJECT THE MESSAGE

      This is a exceptional state for ActionSubscriber to enter and puts the current
      Process in the position of "I can't get new work from RabbitMQ, but also
      can't acknowledge or reject the work that I currently have" ... While rare
      this state can happen.

      Instead of continuing to try to process the message ActionSubscriber is
      sending a Kill signal to the current running process to gracefully shutdown
      so that the RabbitMQ server will purge any outstanding acknowledgements. If
      you are running a process monitoring tool (like Upstart) the Subscriber
      process will be restarted and be able to take on new work.

      ** Running a process monitoring tool like Upstart is recommended for this reason **
    UNREJECTABLE

    Process.kill(:TERM, Process.pid)
  end
end

#_run_action_at_most_once_with_filters(env, action) ⇒ Object



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/action_subscriber/dsl.rb', line 102

def _run_action_at_most_once_with_filters(env, action)
  processed_acknowledgement = false
  rejected_message = false
  processed_acknowledgement = env.acknowledge

  _run_action_with_filters(env, action)
ensure
  rejected_message = env.reject if !processed_acknowledgement

  if !rejected_message && !processed_acknowledgement
    $stdout << <<-UNREJECTABLE
      CANNOT ACKNOWLEDGE OR REJECT THE MESSAGE

      This is a exceptional state for ActionSubscriber to enter and puts the current
      Process in the position of "I can't get new work from RabbitMQ, but also
      can't acknowledge or reject the work that I currently have" ... While rare
      this state can happen.

      Instead of continuing to try to process the message ActionSubscriber is
      sending a Kill signal to the current running process to gracefully shutdown
      so that the RabbitMQ server will purge any outstanding acknowledgements. If
      you are running a process monitoring tool (like Upstart) the Subscriber
      process will be restarted and be able to take on new work.

      ** Running a process monitoring tool like Upstart is recommended for this reason **
    UNREJECTABLE

    Process.kill(:TERM, Process.pid)
  end
end

#_run_action_with_filters(env, action) ⇒ Object



92
93
94
95
96
97
98
99
100
# File 'lib/action_subscriber/dsl.rb', line 92

def _run_action_with_filters(env, action)
  subscriber_instance = self.new(env)
  final_block = Proc.new { subscriber_instance.public_send(action) }

  first_proc = around_filters.reverse.reduce(final_block) do |block, filter|
    Proc.new { subscriber_instance.send(filter, &block) }
  end
  first_proc.call
end

#acknowledge_messages?Boolean

Returns:

  • (Boolean)


21
22
23
# File 'lib/action_subscriber/dsl.rb', line 21

def acknowledge_messages?
  !!@_acknowledge_messages
end

#around_filter(filter_method) ⇒ Object



25
26
27
# File 'lib/action_subscriber/dsl.rb', line 25

def around_filter(filter_method)
  around_filters << filter_method
end

#around_filtersObject



29
30
31
# File 'lib/action_subscriber/dsl.rb', line 29

def around_filters
  @_around_filters ||= []
end

#at_least_once!Object



3
4
5
6
# File 'lib/action_subscriber/dsl.rb', line 3

def at_least_once!
  @_acknowledge_messages = true
  @_at_least_once = true
end

#at_least_once?Boolean

Returns:

  • (Boolean)


8
9
10
# File 'lib/action_subscriber/dsl.rb', line 8

def at_least_once?
  !!@_at_least_once
end

#at_most_once!Object



12
13
14
15
# File 'lib/action_subscriber/dsl.rb', line 12

def at_most_once!
  @_acknowledge_messages = true
  @_at_most_once = true
end

#at_most_once?Boolean

Returns:

  • (Boolean)


17
18
19
# File 'lib/action_subscriber/dsl.rb', line 17

def at_most_once?
  !!@_at_most_once
end

#exchange_names(*names) ⇒ Object Also known as: exchange

Explicitly set the name of the exchange



35
36
37
38
39
40
41
42
43
44
# File 'lib/action_subscriber/dsl.rb', line 35

def exchange_names(*names)
  @_exchange_names ||= []
  @_exchange_names += names.flatten.map(&:to_s)

  if @_exchange_names.empty?
    return [ ::ActionSubscriber.config.default_exchange ]
  else
    return @_exchange_names.compact.uniq
  end
end

#manual_acknowledgement!Object



47
48
49
50
# File 'lib/action_subscriber/dsl.rb', line 47

def manual_acknowledgement!
  @_acknowledge_messages = true
  @_manual_acknowedgement = true
end

#manual_acknowledgement?Boolean

Returns:

  • (Boolean)


52
53
54
# File 'lib/action_subscriber/dsl.rb', line 52

def manual_acknowledgement?
  !!@_manual_acknowedgement
end

#no_acknowledgement!Object



56
57
58
# File 'lib/action_subscriber/dsl.rb', line 56

def no_acknowledgement!
  @_acknowledge_messages = false
end

#queue_for(method, queue_name) ⇒ Object

Explicitly set the name of a queue for the given method route

Ex.

queue_for :created, "derp.derp"
queue_for :updated, "foo.bar"


66
67
68
69
# File 'lib/action_subscriber/dsl.rb', line 66

def queue_for(method, queue_name)
  @_queue_names ||= {}
  @_queue_names[method] = queue_name
end

#queue_namesObject



71
72
73
# File 'lib/action_subscriber/dsl.rb', line 71

def queue_names
  @_queue_names ||= {}
end

#remote_application_name(name = nil) ⇒ Object Also known as: publisher



75
76
77
78
# File 'lib/action_subscriber/dsl.rb', line 75

def remote_application_name(name = nil)
  @_remote_application_name = name if name
  @_remote_application_name
end

#routing_key_for(method, routing_key_name) ⇒ Object

Explicitly set the whole routing key to use for a given method route.



83
84
85
86
# File 'lib/action_subscriber/dsl.rb', line 83

def routing_key_for(method, routing_key_name)
  @_routing_key_names ||= {}
  @_routing_key_names[method] = routing_key_name
end

#routing_key_namesObject



88
89
90
# File 'lib/action_subscriber/dsl.rb', line 88

def routing_key_names
  @_routing_key_names ||= {}
end

#run_action_with_filters(env, action) ⇒ Object



170
171
172
173
174
175
176
177
178
179
# File 'lib/action_subscriber/dsl.rb', line 170

def run_action_with_filters(env, action)
  case
  when at_least_once?
    _run_action_at_least_once_with_filters(env, action)
  when at_most_once?
    _run_action_at_most_once_with_filters(env, action)
  else
    _run_action_with_filters(env, action)
  end
end