Module: Strum::Esb::Handler

Defined in:
lib/strum/esb/handler.rb

Overview

Sneakers helper for Strum

Defined Under Namespace

Modules: ClassMethods

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(base) ⇒ Object



10
11
12
13
14
15
16
# File 'lib/strum/esb/handler.rb', line 10

# Hook invoked when this module is mixed into +base+.
#
# Mixes Sneakers::Worker into the including class, extends it with
# ClassMethods, and then calls +use_application_config+ on it.
#
# @param base [Module] the class or module that included this module
# @return [Object] whatever +base.use_application_config+ returns
def self.included(base)
  base.include(Sneakers::Worker)
  base.extend(ClassMethods)
  base.use_application_config
end

Instance Method Details

#after_headers_hook(bind) ⇒ Object



159
# File 'lib/strum/esb/handler.rb', line 159

# Extension point called by +work_with_params+ right after the routing
# headers have been read. The default implementation does nothing.
#
# @param bind [Binding] binding of the calling +work_with_params+ invocation
# @return [nil]
def after_headers_hook(bind)
  # Intentionally empty: including classes may override this.
end

#header(metadata, key) ⇒ Object



87
88
89
# File 'lib/strum/esb/handler.rb', line 87

# Reads one message header and normalises it for use in handler method names.
#
# The value is converted to a string, every character outside +a-z+, +A-Z+
# and +0-9+ is replaced with an underscore, and the result is downcased.
#
# @param metadata [#[]] message properties; only +metadata[:headers]+ is read
# @param key [Object] header name to look up inside the headers collection
# @return [String, nil] the normalised value, or +nil+ when there are no
#   headers or the header is absent
def header(metadata, key)
  headers = metadata[:headers]
  value = headers && headers[key]
  value&.to_s&.gsub(/[^a-zA-Z0-9]/, "_")&.downcase
end

#work_with_params(deserialized_msg, delivery_info, metadata) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/strum/esb/handler.rb', line 91

# Routes an incoming message to a handler method selected from its headers,
# then acknowledges or rejects the message.
#
# Steps, in order:
# 1. run every +Strum::Esb.config.before_handler_hooks+ entry;
# 2. read the routing headers and publish +pipeline+ / +pipeline_id+ in
#    +Thread.current+;
# 3. call +after_headers_hook+ with this method's binding;
# 4. pick the first candidate method this object responds to, with
#    +"handler"+ as the final fallback;
# 5. check the content type and deserialize the payload;
# 6. call the handler, passing the header values its parameter list names;
# 7. run every +Strum::Esb.config.after_handler_hooks+ entry (always).
#
# @param deserialized_msg [Object] message body; handed to the configured
#   serializer's +deserialize+
# @param delivery_info [Object] forwarded unchanged to the hooks
# @param metadata [#[]] message properties; +[:headers]+ and
#   +[:content_type]+ are read
# @return [Object] the result of +ack!+ when the handler ran without raising,
#   otherwise the result of +reject!+
def work_with_params(deserialized_msg, delivery_info, metadata)
  Strum::Esb.config.before_handler_hooks.each { |hook| hook.call(deserialized_msg, delivery_info, metadata) }

  action = header(metadata, "action")
  resource = header(metadata, "resource")
  event = header(metadata, "event")
  state = header(metadata, "state")
  info = header(metadata, "info")
  notice = header(metadata, "notice")
  pipeline = Thread.current[:pipeline] = header(metadata, "pipeline")
  pipeline_id = Thread.current[:pipeline_id] = header(metadata, "pipeline-id")

  # The hook is given this method's binding, so the locals above must keep
  # their names and must be assigned before this call.
  after_headers_hook(binding)

  methods_names = if action
                    action_handler_methods(action, resource)
                  elsif event
                    event_handler_methods(resource, event, state)
                  elsif info
                    info_handler_methods(info)
                  elsif notice
                    notice_handler_methods(resource, notice)
                  end

  # `true` is the positional include_all flag: private handlers are eligible too.
  method_name = [*methods_names, "handler"].find { |n| respond_to?(n, true) }

  unless method_name
    logger.error "Handler not found. Message rejected #{metadata[:headers]} with payload #{deserialized_msg}"
    return reject!
  end

  content_type = metadata[:content_type] || "application/json"

  unless self.class.content_type_enabled?(content_type)
    logger.error "Content type #{content_type} is disabled by handler. Message rejected #{metadata[:headers]} with payload #{deserialized_msg}"
    return reject!
  end

  args = { queue_method_name: method_name, grpc_service: self.class.protobuf_service, content_type: content_type }

  payload, valid_payload = Strum::Esb.config.serializer.deserialize(deserialized_msg, args: args)

  unless valid_payload
    logger.error "Payload was unable to be parsed with #{content_type} content type. Message rejected #{metadata[:headers]} with payload #{payload}"
    return reject!
  end

  error = nil

  # Header values a handler may ask for by naming them in its parameter list.
  # Built after `after_headers_hook` so changes made through the binding are seen.
  # NOTE(review): `notice` is parsed above but is not offered here, matching the
  # previous behaviour - confirm whether that is intentional.
  available_params = {
    action: action,
    resource: resource,
    event: event,
    state: state,
    info: info,
    pipeline: pipeline,
    pipeline_id: pipeline_id
  }
  method_params = method(method_name).parameters.map { |_kind, param_name| param_name }
  handler_params = available_params.slice(*method_params)

  logger.info("Handler #{method_name} found. Payload: #{payload}")
  if handler_params.empty?
    send(method_name, payload)
  else
    # Splatting satisfies keyword parameters on Ruby 3; a handler without
    # keyword parameters still receives the values as one positional hash.
    send(method_name, payload, **handler_params)
  end
  logger.info("Handler #{method_name} executed")
  ack!
rescue StandardError => e
  error = e
  logger.error e
  reject!
ensure
  # `payload` and `error` are nil here when the method exits before they are assigned.
  Strum::Esb.config.after_handler_hooks.each { |hook| hook.call(deserialized_msg, delivery_info, metadata, payload, error) }
end