Class: EventQ::QueueWorker

Inherits:
Object
  • Object
show all
Defined in:
lib/eventq/queue_worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ QueueWorker

Returns a new instance of QueueWorker.



10
11
12
13
14
15
# File 'lib/eventq/queue_worker.rb', line 10

# Builds a worker in the stopped state with a fresh status tracker.
def initialize
  # Status object exposed through #worker_status.
  @worker_status = EventQ::WorkerStatus.new
  @is_running = false
  # Baseline timestamp that #gc_flush measures elapsed time against.
  @last_gc_flush = Time.now
  # Threshold, in seconds, compared against elapsed time in #gc_flush;
  # #configure replaces it when :gc_flush_interval is supplied.
  @gc_flush_interval = 10
end

Instance Attribute Details

#is_running ⇒ Object

Returns the value of attribute is_running.



7
8
9
# File 'lib/eventq/queue_worker.rb', line 7

# Reader for the @is_running flag; #running? returns the same value.
def is_running
  @is_running
end

#worker_adapter ⇒ Object (readonly)

Returns the value of attribute worker_adapter.



8
9
10
# File 'lib/eventq/queue_worker.rb', line 8

# Reader for the adapter that #start stores from options[:worker_adapter].
def worker_adapter
  @worker_adapter
end

#worker_status ⇒ Object (readonly)

Returns the value of attribute worker_status.



8
9
10
# File 'lib/eventq/queue_worker.rb', line 8

# Reader for the EventQ::WorkerStatus instance created in #initialize.
def worker_status
  @worker_status
end

Instance Method Details

#call_on_error_block(error:, message: nil) ⇒ Object



259
260
261
# File 'lib/eventq/queue_worker.rb', line 259

# Forwards an error, plus the optional item it relates to, to call_block under
# the :on_error_block key.
# NOTE(review): call_block is defined outside this view; presumably it runs the
# block stored by #on_error — confirm.
#
# @param error [Exception] the error being reported
# @param message [Object, nil] what the error relates to; callers in this class
#   pass either the queue message or the exception's message string
def call_on_error_block(error:, message: nil)
  call_block(:on_error_block, error, message)
end

#call_on_retry_block(message) ⇒ Object



267
268
269
# File 'lib/eventq/queue_worker.rb', line 267

# Forwards +message+ to call_block under the :on_retry_block key.
# NOTE(review): call_block is defined outside this view; presumably it runs the
# block stored by #on_retry — confirm.
def call_on_retry_block(message)
  call_block(:on_retry_block, message)
end

#call_on_retry_exceeded_block(message) ⇒ Object



263
264
265
# File 'lib/eventq/queue_worker.rb', line 263

# Forwards +message+ to call_block under the :on_retry_exceeded_block key.
# NOTE(review): call_block is defined outside this view; presumably it runs the
# block stored by #on_retry_exceeded — confirm.
def call_on_retry_exceeded_block(message)
  call_block(:on_retry_exceeded_block, message)
end

#configure(queue, options = {}) ⇒ Object



207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/eventq/queue_worker.rb', line 207

# Applies worker-level settings from +options+ and logs the resulting values.
#
# Keys read from +options+:
#   :thread_count      - used only when greater than zero, otherwise 1
#   :sleep             - deprecated; triggers a warning and is otherwise ignored
#   :fork_count        - defaults to 0
#   :gc_flush_interval - keeps the current value when absent
#   :block_process     - defaults to true
#
# @param queue [Object] not read by this method
# @param options [Hash] settings as listed above
# @return [Object] the result of the final logger call
def configure(queue, options = {})
  # One thread unless a positive :thread_count is supplied.
  @thread_count = 1
  @thread_count = options[:thread_count] if options.key?(:thread_count) && options[:thread_count] > 0

  # The sleep value is always 0; passing :sleep only produces a deprecation warning.
  @sleep = 0
  EventQ.logger.warn("[#{self.class}] - :sleep is deprecated.") if options.key?(:sleep)

  @fork_count = options.key?(:fork_count) ? options[:fork_count] : 0

  @gc_flush_interval = options[:gc_flush_interval] if options.key?(:gc_flush_interval)

  # Blocking the starting process is the default. An application that has to run
  # other work alongside the worker (for example a web service in the same
  # process) can pass block_process: false.
  @block_process = options.key?(:block_process) ? options[:block_process] : true

  summary = [
    "Process Count: #{@fork_count}",
    "Thread Count: #{@thread_count}",
    "Interval Sleep: #{@sleep}",
    "GC Flush Interval: #{@gc_flush_interval}",
    "Block process: #{@block_process}"
  ].join(' | ')
  EventQ.logger.info("[#{self.class}] - Configuring. #{summary}")
end

#deserialize_message(payload) ⇒ Object



186
187
188
189
# File 'lib/eventq/queue_worker.rb', line 186

# Decodes +payload+ with the provider selected by
# EventQ::Configuration.serialization_provider.
# NOTE(review): @serialization_provider_manager is not assigned in the
# #initialize shown for this class — confirm where it is set before relying
# on this method.
#
# @param payload [Object] serialized data handed to the provider
# @return [Object] whatever the provider's #deserialize returns
def deserialize_message(payload)
  @serialization_provider_manager
    .get_provider(EventQ::Configuration.serialization_provider)
    .deserialize(payload)
end

#gc_flush ⇒ Object



196
197
198
199
200
201
# File 'lib/eventq/queue_worker.rb', line 196

# Runs a garbage collection once more than @gc_flush_interval seconds have
# passed since the last recorded flush, then records the new flush time.
#
# @return [Time, nil] the new flush timestamp, or nil when no flush was due
def gc_flush
  return unless (Time.now - last_gc_flush) > @gc_flush_interval

  GC.start
  @last_gc_flush = Time.now
end

#last_gc_flush ⇒ Object



203
204
205
# File 'lib/eventq/queue_worker.rb', line 203

# Reader for the time of the most recent flush recorded by #gc_flush
# (initially the time the worker was constructed).
def last_gc_flush
  @last_gc_flush
end

#on_error(&block) ⇒ Object



255
256
257
# File 'lib/eventq/queue_worker.rb', line 255

# Stores the given block in @on_error_block and returns it.
# NOTE(review): presumably this is the block reached through
# #call_on_error_block; the dispatching call_block helper is not visible here.
def on_error(&block)
  @on_error_block = block
end

#on_retry(&block) ⇒ Object



251
252
253
# File 'lib/eventq/queue_worker.rb', line 251

# Stores the given block in @on_retry_block and returns it.
# NOTE(review): presumably this is the block reached through
# #call_on_retry_block; the dispatching call_block helper is not visible here.
def on_retry(&block)
  @on_retry_block = block
end

#on_retry_exceeded(&block) ⇒ Object



247
248
249
# File 'lib/eventq/queue_worker.rb', line 247

# Stores the given block in @on_retry_exceeded_block and returns it.
# NOTE(review): presumably this is the block reached through
# #call_on_retry_exceeded_block; the dispatching call_block helper is not
# visible here.
def on_retry_exceeded(&block)
  @on_retry_exceeded_block = block
end

#process_message(block, message, retry_attempts, acceptance_args) ⇒ Symbol, MessageArgs

Method to be called by an adapter. This defines the common logic for processing a message.

Parameters:

  • acceptance_args (Array)

    list of arguments that would be used to accept a message by an adapter.

Returns:

  • (Symbol, MessageArgs)

    :accepted, :duplicate, :reject



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/eventq/queue_worker.rb', line 114

# Shared message-handling routine intended to be invoked by an adapter.
#
# Builds the MessageArgs for the message, skips it when the NonceManager does
# not allow its id, and otherwise runs +block+ with the message content. The
# message is acknowledged through the adapter unless the block flagged it as
# aborted or an error was raised.
#
# @param block [#call] handler invoked with (message.content, message_args)
# @param message [Object] queue message exposing type, context, content_type,
#   id, created and content
# @param retry_attempts [Integer] attempt counter recorded in the MessageArgs
# @param acceptance_args [Array] arguments splatted into the adapter's
#   acknowledge_message call
# @return [Array(Symbol, MessageArgs)] :accepted, :duplicate or :reject,
#   paired with the MessageArgs built for this message
def process_message(block, message, retry_attempts, acceptance_args)
  message_args = EventQ::MessageArgs.new(
    type: message.type,
    retry_attempts: retry_attempts,
    context: message.context,
    content_type: message.content_type,
    id: message.id,
    sent: message.created
  )

  EventQ.logger.debug("[#{self.class}] - Message received. Id: #{message.id}. Retry Attempts: #{retry_attempts}")

  unless EventQ::NonceManager.is_allowed?(message.id)
    EventQ.logger.warn("[#{self.class}] - Duplicate Message received. Id: #{message.id}. Ignoring message.")
    return [:duplicate, message_args]
  end

  # Becomes true only when the handler ran, did not abort, and the
  # acknowledgement (including its log line) completed without raising.
  succeeded = false

  begin
    block.call(message.content, message_args)

    if message_args.abort == true
      EventQ.logger.debug("[#{self.class}] - Message aborted. Id: #{message.id}.")
    else
      worker_adapter.acknowledge_message(*acceptance_args)
      EventQ.logger.debug("[#{self.class}] - Message acknowledged. Id: #{message.id}.")
      succeeded = true
    end
  rescue => e
    EventQ.logger.error do
      "[#{self.class}] - Unhandled error while attempting to process a queue message. Id: #{message.id}. " \
      "Error: #{e.message} #{e.backtrace.join("\n")}"
    end

    call_on_error_block(error: e, message: message)
  end

  if succeeded
    EventQ::NonceManager.complete(message.id)
    [:accepted, message_args]
  else
    # Aborted or errored messages are marked as failed with the NonceManager.
    EventQ::NonceManager.failed(message.id)
    [:reject, message_args]
  end
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


182
183
184
# File 'lib/eventq/queue_worker.rb', line 182

# Predicate form of the @is_running flag.
#
# @return [Boolean] the current value of @is_running
def running?
  @is_running
end

#serialize_message(msg) ⇒ Object



191
192
193
194
# File 'lib/eventq/queue_worker.rb', line 191

# Encodes +msg+ with the provider selected by
# EventQ::Configuration.serialization_provider.
# NOTE(review): @serialization_provider_manager is not assigned in the
# #initialize shown for this class — confirm where it is set before relying
# on this method.
#
# @param msg [Object] message handed to the provider
# @return [Object] whatever the provider's #serialize returns
def serialize_message(msg)
  @serialization_provider_manager
    .get_provider(EventQ::Configuration.serialization_provider)
    .serialize(msg)
end

#start(queue, options = {}, &block) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/eventq/queue_worker.rb', line 17

# Validates the options, wires up the adapter and starts consuming +queue+.
#
# @param queue [Object] queue definition; passed to EventQ.create_queue_name,
#   #configure and #start_worker
# @param options [Hash] must contain :worker_adapter and :client; the remaining
#   keys are forwarded to #configure and to the adapter's #configure
# @param block [Proc] handler forwarded to #start_worker
# @raise [RuntimeError] when a mandatory option is missing or the worker is
#   already running
# @return [true]
def start(queue, options = {}, &block)
  EventQ.logger.info("[#{self.class}] - Preparing to start listening for messages.")

  # Make sure mandatory options are specified
  mandatory = [:worker_adapter, :client]
  missing = mandatory - options.keys
  raise "[#{self.class}] - Missing options. #{missing} must be specified." unless missing.empty?

  # Reject a second start before any state is touched, so a failed call cannot
  # swap the adapter (or its context) out from under a worker that is running.
  raise "[#{self.class}] - Worker is already running." if running?

  @worker_adapter = options[:worker_adapter]
  worker_adapter.context = self

  configure(queue, options)
  worker_adapter.configure(options)

  queue_name = EventQ.create_queue_name(queue)
  EventQ.logger.info("[#{self.class}] - Listening for messages on queue: #{queue_name}")

  # Allow the worker to be started on a thread or on the main process.
  # Using the thread won't block the parent process, whereas starting on the main process will.
  if @block_process
    start_worker(block, options, queue)
  else
    Thread.new { start_worker(block, options, queue) }
  end
  # In blocking mode this line is only reached after start_worker has returned.
  @is_running = true
end

#start_process(options, queue, block) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/eventq/queue_worker.rb', line 66

# Runs the worker inside the current process: installs signal handlers, lets
# the adapter prepare itself, launches the worker threads and (normally) waits
# for them to finish.
#
# @param options [Hash] start options; :wait and :fork_count are read here and
#   the whole hash is forwarded to the adapter and to #start_thread
# @param queue [Object] queue definition forwarded to #start_thread
# @param block [Proc] message handler forwarded to #start_thread
def start_process(options, queue, block)
  # On INT or TERM, stop the worker and then exit the process.
  %w'INT TERM'.each do |sig|
    Signal.trap(sig) {
      stop
      exit
    }
  end

  # need to set it again since we might be in a fork.
  @is_running = true
  # track_process / track_thread are defined outside this view.
  tracker = track_process(Process.pid)

  # Execute any specific adapter worker logic before the threads are launched.
  # This could range from setting instance variables, extra options, etc.
  worker_adapter.pre_process(self, options)

  if @thread_count > 0
    @thread_count.times do
      thr = Thread.new do
        start_thread(queue, options, block)
      end

      # Allow the thread to kill the parent process if an error occurs
      thr.abort_on_exception = true
      track_thread(tracker, thr)
    end
  else
    # No worker threads requested: run the iteration on the calling thread.
    start_thread(queue, options, block)
  end

  # Only on the main process should you be able to not wait on a thread, otherwise
  # any forked process will just immediately quit
  # NOTE(review): this reads options[:fork_count], not @fork_count, so
  # `wait: false` only skips the join when :fork_count is passed explicitly
  # as 0 — confirm that is intended.
  unless options[:wait] == false && options[:fork_count] == 0
    worker_status.threads.each { |thr| thr.thread.join }
  end
end

#start_thread(queue, options, block) ⇒ Object



103
104
105
106
107
108
109
# File 'lib/eventq/queue_worker.rb', line 103

# Delegates one worker thread's processing to the adapter. Any exception is
# logged, reported through #call_on_error_block and re-raised as
# Exceptions::WorkerThreadError with the original message and backtrace.
#
# @param queue [Object] forwarded to the adapter
# @param options [Hash] forwarded to the adapter
# @param block [Proc] message handler forwarded to the adapter
# @raise [Exceptions::WorkerThreadError] when the adapter raises anything
# @return [Object] the adapter's return value
def start_thread(queue, options, block)
  begin
    worker_adapter.thread_process_iteration(queue, options, block)
  rescue Exception => failure # rubocop:disable Lint/RescueException
    EventQ.logger.error(failure)
    call_on_error_block(error: failure, message: failure.message)
    raise Exceptions::WorkerThreadError, failure.message, failure.backtrace
  end
end

#start_worker(block, options, queue) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/eventq/queue_worker.rb', line 46

# Starts processing either directly in this process or in @fork_count forked
# children, in which case it waits for every child to exit.
#
# @param block [Proc] message handler forwarded to #start_process
# @param options [Hash] forwarded to #start_process
# @param queue [Object] forwarded to #start_process
# @return [Object] #start_process's result without forking, otherwise the
#   result of Process.waitall
def start_worker(block, options, queue)
  # Without forks everything runs in this process, and start_process takes
  # care of tracking the process and its threads.
  return start_process(options, queue, block) unless @fork_count > 0

  @fork_count.times do
    child_pid = fork { start_process(options, queue, block) }

    # Register the child here, after fork returns: each fork gets its own copy
    # of @worker_status, so anything tracked inside the child is not visible to
    # the parent.
    track_process(child_pid)
  end

  Process.waitall
end

#stop ⇒ Object



169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/eventq/queue_worker.rb', line 169

# Marks the worker as stopped and sends TERM to every tracked process other
# than the current one.
#
# @return [Object] the pid collection that was iterated
def stop
  EventQ.logger.info("[#{self.class}] - Stopping.")
  @is_running = false

  # Forked workers have to be told to stop as well.
  worker_status.pids.each do |pid|
    next if Process.pid == pid

    begin
      Process.kill('TERM', pid)
    rescue Errno::ESRCH
      # The process is already gone; carry on with the remaining pids.
    end
  end
end