Class: Magellan::Worker::Core
- Inherits:
-
Object
- Object
- Magellan::Worker::Core
- Defined in:
- lib/magellan/worker/core.rb
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
Instance Method Summary collapse
-
#initialize ⇒ Core
constructor
A new instance of Core.
- #initialize! ⇒ Object
- #parse_body(payload) ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
Constructor Details
#initialize ⇒ Core
Returns a new instance of Core.
14 15 16 17 |
# File 'lib/magellan/worker/core.rb', line 14 def initialize() # ワーカーの設定ファイルを読み込む @config = Magellan::Worker::Config.load_config end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
12 13 14 |
# File 'lib/magellan/worker/core.rb', line 12 def config @config end |
Instance Method Details
#initialize! ⇒ Object
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/magellan/worker/core.rb', line 19 def initialize! # 設定ファイルからRabbitMQへの接続設定を作成 connection_settings = { host: @config[:host], port: @config[:port], vhost: @config[:vhost], user: @config[:rabbitmq_user], pass: @config[:rabbitmq_password], timeout: 0.3 } @conn = Bunny.new(connection_settings) @conn.start @channel = @conn.create_channel queue_name = @config[:request_queue] exchange_name = @config[:response_exchange] # no_declareを設定しない場合、キューやエクスチェンジに対してdeclareを実行してしまい、 # Acess Contorolにひっかかりエラーとなります # queueとexchange及びbindはmagellan-conductorによって作成済みの想定です @queue = @channel.queue(queue_name, no_declare: true) @exchange = @channel.exchange(exchange_name, no_declare: true) @executor = Magellan::Worker::Executor.new(@exchange) @shutdown = false self end |
#parse_body(payload) ⇒ Object
95 96 97 98 99 100 101 |
# File 'lib/magellan/worker/core.rb', line 95 def parse_body(payload) begin MessagePack.unpack(payload) rescue MessagePack::UnpackError JSON.parse(payload) end end |
#run ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/magellan/worker/core.rb', line 55 def run # reset for re-run @shutdown = false # handle SIGTERM from `docker stop` @original_trap_handler = Signal.trap(:TERM) do shutdown end Magellan.logger.info("====== Magellan Worker start ======") @queue.subscribe(block: true, manual_ack: true) do |delivery_info, properties, payload| reply_to = properties.reply_to # ワーカー実行結果返却時のルーティングキー correlation_id = properties.correlation_id # ワーカー実行結果返却時にどのリクエストに対応するメッセージか判別するための識別子 delivery_tag = delivery_info.delivery_tag # ackを返却時に使用 # ワーカーロジック実行中に落ちたとしてもRabbitMQから再送させないために # メッセージを取得した直後にackを返します # 第1引数: ackを返す対象のメッセージを指定 # 第2引数: ackを返す対象以前のメッセージもackを返すか # true: 指定したメッセージ以前に受信したメッセージ全てが対象 # false: 指定したメッセージのみが対象 @channel.basic_ack(delivery_tag, false) begin = parse_body(payload) @executor.execute(reply_to, correlation_id, delivery_tag, ) rescue Magellan.logger.error("Magellan Worker request execution error: #{$!}\n" + $@.join("\n")) end if @shutdown delivery_info.consumer.cancel end end Magellan.logger.info("====== Magellan Worker finished ======") ensure if @original_trap_handler Signal.trap(:TERM, @original_trap_handler) end end |
#shutdown ⇒ Object
50 51 52 53 |
# File 'lib/magellan/worker/core.rb', line 50 def shutdown @channel && @channel.work_pool.shutdown @shutdown = true end |