Class: Jenode::Worker
- Inherits:
-
Object
- Object
- Jenode::Worker
- Defined in:
- lib/jenode/worker.rb
Constant Summary collapse
- COMMANDS =
%w( pause continue info test set_queue set_sender set_template set_utm_attributes set_dont_touch_template set_dkim_key )- INFO_RESULT_QUEUE_NAME =
"info_result"- TEST_RESULT_QUEUE_NAME =
"test_result"
Instance Attribute Summary collapse
-
#consumer_queue ⇒ Object
readonly
Returns the value of attribute consumer_queue.
-
#ip ⇒ Object
readonly
Returns the value of attribute ip.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#number ⇒ Object
readonly
Returns the value of attribute number.
-
#prefetch ⇒ Object
readonly
Returns the value of attribute prefetch.
-
#queue_name ⇒ Object
readonly
Returns the value of attribute queue_name.
-
#stop ⇒ Object
readonly
Returns the value of attribute stop.
-
#worker_thread ⇒ Object
readonly
Returns the value of attribute worker_thread.
Instance Method Summary collapse
- #channel ⇒ Object
- #continue(job) ⇒ Object
- #exchange ⇒ Object
- #info(job) ⇒ Object
-
#initialize(queue_name, ip, number, prefetch) ⇒ Worker
constructor
A new instance of Worker.
- #pause(job) ⇒ Object
- #run ⇒ Object
- #set_dkim_key(job) ⇒ Object
- #set_dont_touch_template(job) ⇒ Object
- #set_queue(job) ⇒ Object
- #set_sender(job) ⇒ Object
- #set_template(job) ⇒ Object
- #set_utm_attributes(job) ⇒ Object
- #subscribe_on_command_queue ⇒ Object
- #subscribe_on_queue ⇒ Object
- #test(job) ⇒ Object
Constructor Details
#initialize(queue_name, ip, number, prefetch) ⇒ Worker
Returns a new instance of Worker.
15 16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/jenode/worker.rb', line 15 def initialize(queue_name, ip, number, prefetch) @ip = ip @queue_name = queue_name @number = number @prefetch = prefetch @worker_thread = Thread.current @stop = false @logger = Logger.new($stdout) @logger.formatter = -> (severity, datetime, progname, msg) { date_format = datetime.strftime("%Y-%m-%d %H:%M:%S.%L") puts "#{ date_format } - #{ ip } - #{ number } #{ msg }\n" } end |
Instance Attribute Details
#consumer_queue ⇒ Object (readonly)
Returns the value of attribute consumer_queue.
10 11 12 |
# File 'lib/jenode/worker.rb', line 10 def consumer_queue @consumer_queue end |
#ip ⇒ Object (readonly)
Returns the value of attribute ip.
10 11 12 |
# File 'lib/jenode/worker.rb', line 10 def ip @ip end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
10 11 12 |
# File 'lib/jenode/worker.rb', line 10 def logger @logger end |
#number ⇒ Object (readonly)
Returns the value of attribute number.
10 11 12 |
# File 'lib/jenode/worker.rb', line 10 def number @number end |
#prefetch ⇒ Object (readonly)
Returns the value of attribute prefetch.
10 11 12 |
# File 'lib/jenode/worker.rb', line 10 def prefetch @prefetch end |
#queue_name ⇒ Object (readonly)
Returns the value of attribute queue_name.
10 11 12 |
# File 'lib/jenode/worker.rb', line 10 def queue_name @queue_name end |
#stop ⇒ Object (readonly)
Returns the value of attribute stop.
10 11 12 |
# File 'lib/jenode/worker.rb', line 10 def stop @stop end |
#worker_thread ⇒ Object (readonly)
Returns the value of attribute worker_thread.
10 11 12 |
# File 'lib/jenode/worker.rb', line 10 def worker_thread @worker_thread end |
Instance Method Details
#channel ⇒ Object
29 30 31 32 33 34 35 |
# File 'lib/jenode/worker.rb', line 29 def channel Thread.current[:channel] ||= begin channel = $queue_connection.create_channel channel.prefetch = prefetch channel end end |
#continue(job) ⇒ Object
60 61 62 63 |
# File 'lib/jenode/worker.rb', line 60 def continue(job) @stop = false worker_thread.run end |
#exchange ⇒ Object
37 38 39 40 41 |
# File 'lib/jenode/worker.rb', line 37 def exchange Thread.current[:exchange] ||= begin channel.default_exchange end end |
#info(job) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/jenode/worker.rb', line 43 def info(job) body = MultiJson.dump( ip: ip, number: number, queue_name: queue_name, stop: stop, sender: EmailTask.sender, utm_attributes: EmailTask.utm_attributes, dont_touch_template: EmailTask.dont_touch_template ) exchange.publish(body, routing_key: INFO_RESULT_QUEUE_NAME) end |
#pause(job) ⇒ Object
56 57 58 |
# File 'lib/jenode/worker.rb', line 56 def pause(job) @stop = true end |
#run ⇒ Object
147 148 149 150 151 152 153 |
# File 'lib/jenode/worker.rb', line 147 def run logger.info("Run jenode") subscribe_on_command_queue loop do subscribe_on_queue end end |
#set_dkim_key(job) ⇒ Object
88 89 90 |
# File 'lib/jenode/worker.rb', line 88 def set_dkim_key(job) EmailTask.dkim_key = OpenSSL::PKey::RSA.new(job.payload['data']) end |
#set_dont_touch_template(job) ⇒ Object
80 81 82 |
# File 'lib/jenode/worker.rb', line 80 def set_dont_touch_template(job) EmailTask.dont_touch_template = job.payload['data'] end |
#set_queue(job) ⇒ Object
65 66 67 68 69 70 |
# File 'lib/jenode/worker.rb', line 65 def set_queue(job) new_queue_name = job.payload['data'] return if queue_name == new_queue_name @queue_name = new_queue_name consumer_queue.cancel end |
#set_sender(job) ⇒ Object
72 73 74 |
# File 'lib/jenode/worker.rb', line 72 def set_sender(job) EmailTask.sender = job.payload['data'] end |
#set_template(job) ⇒ Object
84 85 86 |
# File 'lib/jenode/worker.rb', line 84 def set_template(job) EmailTask.template = ERB.new(job.payload['data']) end |
#set_utm_attributes(job) ⇒ Object
76 77 78 |
# File 'lib/jenode/worker.rb', line 76 def set_utm_attributes(job) EmailTask.utm_attributes = job.payload['data'] end |
#subscribe_on_command_queue ⇒ Object
111 112 113 114 115 116 117 118 119 120 |
# File 'lib/jenode/worker.rb', line 111 def subscribe_on_command_queue command_queue = channel.queue("workers.#{ ip.gsub('.', '_') }.#{ number }", exclusive: true) consumer_command_queue = command_queue.build_consumer(block: false) do |, payload| logger.info("Recived cmd=#{ payload }") job = Jenode::Job.new(payload, ) cmd = job.payload['cmd'] send(cmd, job) end command_queue.subscribe_with(consumer_command_queue) end |
#subscribe_on_queue ⇒ Object
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib/jenode/worker.rb', line 122 def subscribe_on_queue queue = channel.queue(queue_name, durable: true) @consumer_queue = queue.build_consumer(block: true) do |, payload| begin logger.info("Recived email_task=#{ payload }") job = Job.new(payload, ) if stop logger.info("Stop") job.reject Thread.stop logger.info("Run") else email_task = EmailTask.new(ip, job) email_task.perform end rescue Exception => e logger.info("Error: #{ e.message }") logger.info("Error: #{ e.backtrace.join("\n") }") job.reject end end queue.subscribe_with(@consumer_queue, manual_ack: true) end |
#test(job) ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/jenode/worker.rb', line 92 def test(job) begin email = EmailTask.generate_email_object(job.payload['data'], ip) email.send rescue Exception => e = e. error_backtrace = e.backtrace end body = MultiJson.dump( ip: ip, number: number, job: job.payload, error_message: , error_backtrace: error_backtrace ) exchange.publish(body, routing_key: TEST_RESULT_QUEUE_NAME) end |