Class: Strum::Esb::Message

Inherits:
Object
  • Object
show all
Defined in:
lib/strum/esb/message.rb

Overview

Strum Message

Direct Known Subclasses

Action, Event, Info, Notice

Constant Summary collapse

FROZEN_EXCHANGE_OPTIONS =
{ durable: true }.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(exchange:, headers:, payload:, exchange_options: {}, **args) ⇒ Message

Returns a new instance of Message.



28
29
30
31
32
33
34
35
36
# File 'lib/strum/esb/message.rb', line 28

# Stores the message parts and resolves the exchange settings and channel
# source used later by #publish.
#
# @param exchange [Object] stored as given; later passed as the first argument
#   to `headers` on the channel (presumably the exchange name — confirm)
# @param headers [Object] stored as given; becomes the `:headers` publication property
# @param payload [Object] stored as given; handed to the hooks and the serializer
# @param exchange_options [Hash] per-message overrides merged over the Sneakers defaults
# @param args [Hash] any extra keywords, kept as a hash,
#   e.g. { content_type: "application/x-protobuf", message_class: Messages::ActionGetUser }
def initialize(exchange:, headers:, payload:, exchange_options: {}, **args)
  @exchange, @headers, @payload, @args = exchange, headers, payload, args

  # e.g. Sneakers::CONFIG[:exchange_options] = { type: "headers", durable: true, auto_delete: false, arguments: {} }
  defaults = Sneakers::CONFIG[:exchange_options]
  # Precedence, lowest to highest: Sneakers defaults, caller overrides,
  # FROZEN_EXCHANGE_OPTIONS (merged last so the caller cannot override them).
  with_overrides = defaults.merge(exchange_options)
  @exchange_options = with_overrides.merge(FROZEN_EXCHANGE_OPTIONS)

  @rabbit_channel = Strum::Esb.config.rabbit_channel_pool
end

Instance Attribute Details

#args ⇒ Object (readonly)

Returns the value of attribute args.



21
22
23
# File 'lib/strum/esb/message.rb', line 21

# Extra keyword arguments collected by the constructor's `**args`.
# Note that #prepare_publication_options merges the publication properties
# (`:headers`, `:content_type`) into this same hash.
#
# @return [Hash] the value stored in `@args`
def args
  @args
end

#exchange ⇒ Object (readonly)

Returns the value of attribute exchange.



21
22
23
# File 'lib/strum/esb/message.rb', line 21

# The `exchange:` value given to the constructor. #publish passes it as the
# first argument to `headers` on the channel.
# NOTE(review): presumably the exchange name as a String — confirm against callers.
#
# @return [Object] the value stored in `@exchange`
def exchange
  @exchange
end

#exchange_options ⇒ Object (readonly)

Returns the value of attribute exchange_options.



21
22
23
# File 'lib/strum/esb/message.rb', line 21

# Effective exchange options computed in the constructor:
# `Sneakers::CONFIG[:exchange_options]`, overridden by the caller's
# `exchange_options:`, overridden in turn by `FROZEN_EXCHANGE_OPTIONS`.
#
# @return [Hash] the value stored in `@exchange_options`
def exchange_options
  @exchange_options
end

#headers ⇒ Object (readonly)

Returns the value of attribute headers.



21
22
23
# File 'lib/strum/esb/message.rb', line 21

# The `headers:` value given to the constructor. It is used as the initial
# `:headers` entry of the publication properties.
# NOTE(review): expected to be a Hash; #extend_headers replaces a non-Hash
# value with an empty Hash.
#
# @return [Object] the value stored in `@headers`
def headers
  @headers
end

#payload ⇒ Object (readonly)

Returns the value of attribute payload.



21
22
23
# File 'lib/strum/esb/message.rb', line 21

# The `payload:` value given to the constructor. It is passed to every
# before-publish hook and to the configured serializer.
#
# @return [Object] the value stored in `@payload`
def payload
  @payload
end

#rabbit_channel ⇒ Object (readonly)

Returns the value of attribute rabbit_channel.



21
22
23
# File 'lib/strum/esb/message.rb', line 21

# The object read from `Strum::Esb.config.rabbit_channel_pool` at construction.
# #publish calls `with` on it to obtain a channel.
# NOTE(review): despite the name this looks like a pool of channels rather
# than a single channel — confirm.
#
# @return [Object] the value stored in `@rabbit_channel`
def rabbit_channel
  @rabbit_channel
end

Class Method Details

.publish(exchange:, headers:, payload:, exchange_options: {}, **args) ⇒ Object



10
11
12
13
14
15
16
17
18
# File 'lib/strum/esb/message.rb', line 10

# Convenience entry point: builds a message from the given keywords and
# publishes it in a single call.
#
# @param exchange [Object] forwarded unchanged to the constructor
# @param headers [Object] forwarded unchanged to the constructor
# @param payload [Object] forwarded unchanged to the constructor
# @param exchange_options [Hash] forwarded unchanged to the constructor
# @param args [Hash] any extra keywords, forwarded unchanged to the constructor
# @return [Object] whatever the new instance's `publish` returns
def publish(exchange:, headers:, payload:, exchange_options: {}, **args)
  message = new(exchange: exchange, headers: headers, payload: payload,
                exchange_options: exchange_options, **args)
  message.publish
end

Instance Method Details

#extend_headers(properties) ⇒ Object



57
58
59
60
61
# File 'lib/strum/esb/message.rb', line 57

# Adds the current thread's pipeline markers to the publication headers.
#
# `properties[:headers]` is replaced by a copy (or by an empty Hash when it is
# not a Hash) before any key is added. The incoming hash is normally the very
# object the caller passed as `headers:`; writing into it directly would raise
# on a frozen hash and would leak one publish's pipeline values into a hash
# reused by later publishes.
#
# Existing truthy "pipeline" / "pipeline-id" header values are kept; a key is
# only added when the matching `Thread.current` entry is set.
#
# @param properties [Hash] publication properties; its `:headers` entry is reassigned
# @return [Object, nil] incidental value of the last assignment; callers in
#   this class ignore it
def extend_headers(properties)
  current = properties[:headers]
  properties[:headers] = current.is_a?(Hash) ? current.dup : {}
  properties[:headers]["pipeline"] ||= Thread.current[:pipeline] if Thread.current[:pipeline]
  properties[:headers]["pipeline-id"] ||= Thread.current[:pipeline_id] if Thread.current[:pipeline_id]
end

#prepare_publication_options ⇒ Object



46
47
48
49
50
51
52
53
54
55
# File 'lib/strum/esb/message.rb', line 46

# Builds the publication properties, lets the configured hooks and
# #extend_headers adjust them, then serializes the payload.
#
# Side effect: the final properties are merged into `args` in place, so the
# serializer sees them and they remain visible through the `args` reader.
#
# @return [Array] the serializer's result followed by the properties hash;
#   expected shape is [serialized_payload, valid, properties]
def prepare_publication_options
  properties = { headers: headers }
  properties[:content_type] = args.fetch(:content_type, "application/json")

  config = Strum::Esb.config
  config.before_publish_hooks.each do |hook|
    hook.call(payload, properties)
  end
  extend_headers(properties)
  args.merge!(properties)

  serialized = Strum::Esb.config.serializer.serialize(payload, args: args)
  [*serialized, properties]
end

#publish ⇒ Object



38
39
40
41
42
43
44
# File 'lib/strum/esb/message.rb', line 38

# Serializes the payload and publishes it to the headers exchange.
#
# The publication data is prepared first, so an invalid payload returns early
# without taking a channel or declaring the exchange. The exchange is then
# obtained and published on inside a single `with` block, so the channel is
# still held while the publish happens instead of being used after it has
# been handed back.
# NOTE(review): assumes `rabbit_channel` is a pool whose `with` yields a
# channel for the duration of the block and returns the block's value — confirm.
#
# @return [Object, nil] the result of the exchange's `publish`, or nil when
#   the serializer reports the payload as invalid
def publish
  body, valid_payload, properties = prepare_publication_options
  return unless valid_payload

  rabbit_channel.with do |channel|
    channel.headers(exchange, exchange_options).publish(body, **properties)
  end
end