Class: Pwwka::Transmitter

Inherits:
Object
  • Object
show all
Extended by:
Logging
Includes:
Logging
Defined in:
lib/pwwka/transmitter.rb

Overview

Primary interface for sending messages.

Example:

# Send a message, blowing up if there's any problem
Pwwka::Transmitter.send_message!({ user_id: @user.id }, "users.user.activated")

# Send a message, logging if there's any problem
Pwwka::Transmitter.send_message_safely({ user_id: @user.id }, "users.user.activated")

Constant Summary collapse

DEFAULT_DELAY_BY_MS =
5000

Constants included from Logging

Logging::LEVELS

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Logging

logf, logger

Constructor Details

#initialize(channel_connector: nil) ⇒ Transmitter

Returns a new instance of Transmitter.



29
30
31
32
33
34
35
36
37
# File 'lib/pwwka/transmitter.rb', line 29

def initialize(channel_connector: nil)
  if channel_connector
    @caller_manages_connector = true
    @channel_connector = channel_connector
  else
    @caller_manages_connector = false
    @channel_connector = ChannelConnector.new(connection_name: "p: #{Pwwka.configuration.app_id} #{Pwwka.configuration.process_name}".strip)
  end
end

Instance Attribute Details

#caller_manages_connectorObject (readonly)

Returns the value of attribute caller_manages_connector.



26
27
28
# File 'lib/pwwka/transmitter.rb', line 26

def caller_manages_connector
  @caller_manages_connector
end

#channel_connectorObject (readonly)

Returns the value of attribute channel_connector.



27
28
29
# File 'lib/pwwka/transmitter.rb', line 27

def channel_connector
  @channel_connector
end

Class Method Details

.send_message!(payload, routing_key, on_error: :raise, delayed: false, delay_by: nil, type: nil, message_id: :auto_generate, headers: nil, channel_connector: nil) ⇒ Object

Send an important message that must go through. This method allows any raised exception to pass through.

payload

Hash of what you’d like to include in your message

routing_key

String routing key for the message

delayed

Boolean send this message later

delay_by

Integer milliseconds to delay the message

type

A string describing the type. This + your configured app_id should be unique to your entire ecosystem.

message_id

If specified (which generally you should not do), sets the id of the message. If omitted, a GUID is used.

headers

A hash of arbitrary headers to include in the AMQP attributes

on_error

What is the behavior of

  • :ignore (aka as send_message_safely)

  • :raise

  • :resque – use Resque to try to send the message later

  • :retry_async – use the configured background job processor to retry sending the message later

Returns true

Raises any exception generated by the innerworkings of this library.



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/pwwka/transmitter.rb', line 58

def self.send_message!(payload, routing_key,
                       on_error: :raise,
                       delayed: false,
                       delay_by: nil,
                       type: nil,
                       message_id: :auto_generate,
                       headers: nil,
                       channel_connector: nil
                      )
  if delayed
    new(channel_connector: channel_connector).send_delayed_message!(*[payload, routing_key, delay_by].compact, type: type, headers: headers, message_id: message_id)
  else
    new(channel_connector: channel_connector).send_message!(payload, routing_key, type: type, headers: headers, message_id: message_id)
  end
  logf "AFTER Transmitting Message on %{routing_key} -> %{payload}",routing_key: routing_key, payload: payload
  true
rescue => e

  logf "ERROR Transmitting Message on %{routing_key} -> %{payload} : %{error}", routing_key: routing_key, payload: payload, error: e, at: :error

  case on_error

    when :raise
      raise e

    when :resque, :retry_async
      begin
        send_message_async(payload, routing_key, delay_by_ms: delayed ? delay_by || DEFAULT_DELAY_BY_MS : 0)
      rescue => exception
        warn(exception.message)
        raise e
      end

    else # ignore
  end
  false
end

.send_message_async(payload, routing_key, delay_by_ms: 0, type: nil, message_id: :auto_generate, headers: nil) ⇒ Object

Enqueue the message with the configured background processor.

  • :delay_by_ms

    Integer milliseconds to delay the message. Default is 0.



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/pwwka/transmitter.rb', line 98

def self.send_message_async(payload, routing_key,
                            delay_by_ms: 0,
                            type: nil,
                            message_id: :auto_generate,
                            headers: nil)
  background_job_processor = Pwwka.configuration.background_job_processor
  job = Pwwka.configuration.async_job_klass

  if background_job_processor == :resque
    resque_args = [job, payload, routing_key]

    unless type == nil && message_id == :auto_generate && headers == nil
      # NOTE: (jdlubrano)
      # Why can't we pass these options all of the time?  Well, if a user
      # of pwwka has configured their own async_job_klass that only has an
      # arity of 2 (i.e. payload and routing key), then passing these options
      # as an additional argument would break the user's application.  In
      # order to maintain compatibility with preceding versions of Pwwka,
      # we need to ensure that the same arguments passed into this method
      # result in compatible calls to enqueue any Resque jobs.
      resque_args << { type: type, message_id: message_id, headers: headers }
    end

    if delay_by_ms.zero?
      Resque.enqueue(*resque_args)
    else
      Resque.enqueue_in(delay_by_ms/1000, *resque_args)
    end
  elsif background_job_processor == :sidekiq
    options = { delay_by_ms: delay_by_ms, type: type, message_id: message_id, headers: headers }
    job.perform_async(payload, routing_key, options)
  end
end

.send_message_safely(payload, routing_key, delayed: false, delay_by: nil, message_id: :auto_generate) ⇒ Object

Deprecated.

This is ignoring a message. ::send_message supports this explicitly.

Send a less important message that doesn’t have to go through. This eats any ‘StandardError` and logs it, returning false rather than blowing up.

payload

Hash of what you’d like to include in your message

routing_key

String routing key for the message

delayed

Boolean send this message later

delay_by

Integer milliseconds to delay the message

Returns true if the message was sent, false otherwise



142
143
144
# File 'lib/pwwka/transmitter.rb', line 142

def self.send_message_safely(payload, routing_key, delayed: false, delay_by: nil, message_id: :auto_generate)
  send_message!(payload, routing_key, delayed: delayed, delay_by: delay_by, on_error: :ignore)
end

Instance Method Details

#send_delayed_message!(payload, routing_key, delay_by = DEFAULT_DELAY_BY_MS, type: nil, headers: nil, message_id: :auto_generate) ⇒ Object



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/pwwka/transmitter.rb', line 165

def send_delayed_message!(payload, routing_key, delay_by = DEFAULT_DELAY_BY_MS, type: nil, headers: nil, message_id: :auto_generate)
  channel_connector.raise_if_delayed_not_allowed
  publish_options = Pwwka::PublishOptions.new(
    routing_key: routing_key,
    message_id: message_id,
    type: type,
    headers: headers,
    expiration: delay_by
  )
  logf "START Transmitting Delayed Message on id[%{id}] %{routing_key} -> %{payload}", id: publish_options.message_id, routing_key: routing_key, payload: payload
  channel_connector.create_delayed_queue
  channel_connector.delayed_exchange.publish(payload.to_json,publish_options.to_h)
  # if it gets this far it has succeeded
  logf "END Transmitting Delayed Message on id[%{id}] %{routing_key} -> %{payload}", id: publish_options.message_id, routing_key: routing_key, payload: payload
  true
ensure
  unless caller_manages_connector
    channel_connector.connection_close
  end
end

#send_message!(payload, routing_key, type: nil, headers: nil, message_id: :auto_generate) ⇒ Object



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/pwwka/transmitter.rb', line 146

def send_message!(payload, routing_key, type: nil, headers: nil, message_id: :auto_generate)
  publish_options = Pwwka::PublishOptions.new(
    routing_key: routing_key,
    message_id: message_id,
    type: type,
    headers: headers
  )
  logf "START Transmitting Message on id[%{id}] %{routing_key} -> %{payload}", id: publish_options.message_id, routing_key: routing_key, payload: payload
  channel_connector.topic_exchange.publish(payload.to_json, publish_options.to_h)
  # if it gets this far it has succeeded
  logf "END Transmitting Message on id[%{id}] %{routing_key} -> %{payload}", id: publish_options.message_id, routing_key: routing_key, payload: payload
  true
ensure
  unless caller_manages_connector
    channel_connector.connection_close
  end
end