Class: Magellan::Worker::Core
- Inherits:
-
Object
- Object
- Magellan::Worker::Core
- Defined in:
- lib/magellan/worker/core.rb
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
Instance Method Summary collapse
-
#initialize ⇒ Core
constructor
A new instance of Core.
- #initialize! ⇒ Object
- #open_channel ⇒ Object
- #parse_body(payload) ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #wait_reconnect ⇒ Object
Constructor Details
#initialize ⇒ Core
Returns a new instance of Core.
14 15 16 17 |
# File 'lib/magellan/worker/core.rb', line 14 def initialize() # ワーカーの設定ファイルを読み込む @config = Magellan::Worker::Config.load_config end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
12 13 14 |
# File 'lib/magellan/worker/core.rb', line 12 def config @config end |
Instance Method Details
#initialize! ⇒ Object
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/magellan/worker/core.rb', line 19 def initialize! # 設定ファイルからRabbitMQへの接続設定を作成 connection_settings = { host: @config[:host], port: @config[:port], vhost: @config[:vhost], user: @config[:rabbitmq_user], pass: @config[:rabbitmq_password], timeout: 0.3, heartbeat_interval: 60, } @conn = Bunny.new(connection_settings) @conn.start open_channel @shutdown = false self end |
#open_channel ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/magellan/worker/core.rb', line 41 def open_channel @channel = @conn.create_channel queue_name = @config[:request_queue] exchange_name = @config[:response_exchange] # no_declareを設定しない場合、キューやエクスチェンジに対してdeclareを実行してしまい、 # Acess Contorolにひっかかりエラーとなります # queueとexchange及びbindはmagellan-conductorによって作成済みの想定です @queue = @channel.queue(queue_name, no_declare: true, durable: true) @exchange = @channel.exchange(exchange_name, no_declare: true, durable: true) @executor = Magellan::Worker::Executor.new(@exchange) end |
#parse_body(payload) ⇒ Object
123 124 125 126 127 128 129 |
# File 'lib/magellan/worker/core.rb', line 123 def parse_body(payload) begin MessagePack.unpack(payload) rescue MessagePack::UnpackError JSON.parse(payload) end end |
#run ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/magellan/worker/core.rb', line 75 def run # reset for re-run @shutdown = false # handle SIGTERM from `docker stop` @original_trap_handler = Signal.trap(:TERM) do shutdown end Magellan.logger.info("====== Magellan Worker start ======") until @shutdown @queue.subscribe(block: true, manual_ack: true) do |delivery_info, properties, payload| reply_to = properties.reply_to # ワーカー実行結果返却時のルーティングキー correlation_id = properties.correlation_id # ワーカー実行結果返却時にどのリクエストに対応するメッセージか判別するための識別子 delivery_tag = delivery_info.delivery_tag # ackを返却時に使用 # ワーカーロジック実行中に落ちたとしてもRabbitMQから再送させないために # メッセージを取得した直後にackを返します # 第1引数: ackを返す対象のメッセージを指定 # 第2引数: ackを返す対象以前のメッセージもackを返すか # true: 指定したメッセージ以前に受信したメッセージ全てが対象 # false: 指定したメッセージのみが対象 @channel.basic_ack(delivery_tag, false) begin = parse_body(payload) @executor.execute(reply_to, correlation_id, delivery_tag, ) rescue Magellan.logger.error("Magellan Worker request execution error: #{$!}\n" + $@.join("\n")) end if @shutdown delivery_info.consumer.cancel end end # AMQPの接続が切れると Queue#subscribe から処理が帰るので再接続を待つ break if @shutdown unless wait_reconnect Magellan.logger.info("magellan-rails: AMQP reconnect timeout") break end end Magellan.logger.info("====== Magellan Worker finished ======") ensure if @original_trap_handler Signal.trap(:TERM, @original_trap_handler) end end |
#shutdown ⇒ Object
56 57 58 59 |
# File 'lib/magellan/worker/core.rb', line 56 def shutdown @channel && @channel.work_pool.shutdown @shutdown = true end |
#wait_reconnect ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/magellan/worker/core.rb', line 61 def wait_reconnect limit = Time.now + @config[:amqp_reconnect_timeout] while Time.now <= limit and not(@shutdown) sleep 1.0 if @channel.connection.status == :open @channel.close rescue nil open_channel Magellan.logger.info("== AMQP connection recovered ==") return true end end return false end |