Class: Jenode::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/jenode/worker.rb

Constant Summary collapse

COMMANDS =
%w(
  pause continue info test set_queue set_sender set_template set_utm_attributes
  set_dont_touch_template set_dkim_key
)
INFO_RESULT_QUEUE_NAME =
"info_result"
TEST_RESULT_QUEUE_NAME =
"test_result"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_name, ip, number, prefetch) ⇒ Worker

Returns a new instance of Worker.



15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/jenode/worker.rb', line 15

def initialize(queue_name, ip, number, prefetch)
  @ip            = ip
  @queue_name    = queue_name
  @number        = number
  @prefetch      = prefetch
  @worker_thread = Thread.current
  @stop          = false
  @logger        = Logger.new($stdout)
  @logger.formatter = -> (severity, datetime, progname, msg) {
    date_format = datetime.strftime("%Y-%m-%d %H:%M:%S.%L")
    puts "#{ date_format } - #{ ip } - #{ number } #{ msg }\n"
  }
end

Instance Attribute Details

#consumer_queueObject (readonly)

Returns the value of attribute consumer_queue.



10
11
12
# File 'lib/jenode/worker.rb', line 10

def consumer_queue
  @consumer_queue
end

#ipObject (readonly)

Returns the value of attribute ip.



10
11
12
# File 'lib/jenode/worker.rb', line 10

def ip
  @ip
end

#loggerObject (readonly)

Returns the value of attribute logger.



10
11
12
# File 'lib/jenode/worker.rb', line 10

def logger
  @logger
end

#numberObject (readonly)

Returns the value of attribute number.



10
11
12
# File 'lib/jenode/worker.rb', line 10

def number
  @number
end

#prefetchObject (readonly)

Returns the value of attribute prefetch.



10
11
12
# File 'lib/jenode/worker.rb', line 10

def prefetch
  @prefetch
end

#queue_nameObject (readonly)

Returns the value of attribute queue_name.



10
11
12
# File 'lib/jenode/worker.rb', line 10

def queue_name
  @queue_name
end

#stopObject (readonly)

Returns the value of attribute stop.



10
11
12
# File 'lib/jenode/worker.rb', line 10

def stop
  @stop
end

#worker_threadObject (readonly)

Returns the value of attribute worker_thread.



10
11
12
# File 'lib/jenode/worker.rb', line 10

def worker_thread
  @worker_thread
end

Instance Method Details

#channelObject



29
30
31
32
33
34
35
# File 'lib/jenode/worker.rb', line 29

def channel
  Thread.current[:channel] ||= begin
    channel = $queue_connection.create_channel
    channel.prefetch = prefetch
    channel
  end
end

#continue(job) ⇒ Object



60
61
62
63
# File 'lib/jenode/worker.rb', line 60

def continue(job)
  @stop = false
  worker_thread.run
end

#exchangeObject



37
38
39
40
41
# File 'lib/jenode/worker.rb', line 37

def exchange
  Thread.current[:exchange] ||= begin
    channel.default_exchange
  end
end

#info(job) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/jenode/worker.rb', line 43

def info(job)
  body = MultiJson.dump(
    ip: ip,
    number: number,
    queue_name: queue_name,
    stop: stop,
    sender: EmailTask.sender,
    utm_attributes: EmailTask.utm_attributes,
    dont_touch_template: EmailTask.dont_touch_template
  )
  exchange.publish(body, routing_key: INFO_RESULT_QUEUE_NAME)
end

#pause(job) ⇒ Object



56
57
58
# File 'lib/jenode/worker.rb', line 56

def pause(job)
  @stop = true
end

#runObject



147
148
149
150
151
152
153
# File 'lib/jenode/worker.rb', line 147

def run
  logger.info("Run jenode")
  subscribe_on_command_queue
  loop do
    subscribe_on_queue
  end
end

#set_dkim_key(job) ⇒ Object



88
89
90
# File 'lib/jenode/worker.rb', line 88

def set_dkim_key(job)
  EmailTask.dkim_key = OpenSSL::PKey::RSA.new(job.payload['data'])
end

#set_dont_touch_template(job) ⇒ Object



80
81
82
# File 'lib/jenode/worker.rb', line 80

def set_dont_touch_template(job)
  EmailTask.dont_touch_template = job.payload['data']
end

#set_queue(job) ⇒ Object



65
66
67
68
69
70
# File 'lib/jenode/worker.rb', line 65

def set_queue(job)
  new_queue_name = job.payload['data']
  return if queue_name == new_queue_name
  @queue_name = new_queue_name
  consumer_queue.cancel
end

#set_sender(job) ⇒ Object



72
73
74
# File 'lib/jenode/worker.rb', line 72

def set_sender(job)
  EmailTask.sender = job.payload['data']
end

#set_template(job) ⇒ Object



84
85
86
# File 'lib/jenode/worker.rb', line 84

def set_template(job)
  EmailTask.template = ERB.new(job.payload['data'])
end

#set_utm_attributes(job) ⇒ Object



76
77
78
# File 'lib/jenode/worker.rb', line 76

def set_utm_attributes(job)
  EmailTask.utm_attributes = job.payload['data']
end

#subscribe_on_command_queueObject



111
112
113
114
115
116
117
118
119
120
# File 'lib/jenode/worker.rb', line 111

def subscribe_on_command_queue
  command_queue = channel.queue("workers.#{ ip.gsub('.', '_') }.#{ number }", exclusive: true)
  consumer_command_queue = command_queue.build_consumer(block: false) do |, payload|
    logger.info("Recived cmd=#{ payload }")
    job = Jenode::Job.new(payload, )
    cmd = job.payload['cmd']
    send(cmd, job)
  end
  command_queue.subscribe_with(consumer_command_queue)
end

#subscribe_on_queueObject



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/jenode/worker.rb', line 122

def subscribe_on_queue
  queue = channel.queue(queue_name, durable: true)
  @consumer_queue = queue.build_consumer(block: true) do |, payload|
    begin
      logger.info("Recived email_task=#{ payload }")
      job = Job.new(payload, )
      if stop
        logger.info("Stop")
        job.reject
        Thread.stop
        logger.info("Run")
      else
        email_task = EmailTask.new(ip, job)
        email_task.perform
      end
    rescue Exception => e
      logger.info("Error: #{ e.message }")
      logger.info("Error: #{ e.backtrace.join("\n") }")

      job.reject
    end
  end
  queue.subscribe_with(@consumer_queue, manual_ack: true)
end

#test(job) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/jenode/worker.rb', line 92

def test(job)
  begin
    email = EmailTask.generate_email_object(job.payload['data'], ip)
    email.send
  rescue Exception => e
    error_message   = e.message
    error_backtrace = e.backtrace
  end

  body = MultiJson.dump(
    ip: ip,
    number: number,
    job: job.payload,
    error_message: error_message,
    error_backtrace: error_backtrace
  )
  exchange.publish(body, routing_key: TEST_RESULT_QUEUE_NAME)
end