Class: Pwwka::Transmitter
- Inherits:
-
Object
- Object
- Pwwka::Transmitter
- Extended by:
- Logging
- Includes:
- Logging
- Defined in:
- lib/pwwka/transmitter.rb
Overview
Constant Summary collapse
- DEFAULT_DELAY_BY_MS =
5000
Constants included from Logging
Instance Attribute Summary collapse
-
#caller_manages_connector ⇒ Object
readonly
Returns the value of attribute caller_manages_connector.
-
#channel_connector ⇒ Object
readonly
Returns the value of attribute channel_connector.
Class Method Summary collapse
-
.send_message!(payload, routing_key, on_error: :raise, delayed: false, delay_by: nil, type: nil, message_id: :auto_generate, headers: nil, channel_connector: nil) ⇒ Object
Send an important message that must go through.
-
.send_message_async(payload, routing_key, delay_by_ms: 0, type: nil, message_id: :auto_generate, headers: nil) ⇒ Object
Enqueue the message with the configured background processor.
-
.send_message_safely(payload, routing_key, delayed: false, delay_by: nil, message_id: :auto_generate) ⇒ Object
deprecated
Deprecated.
This is ignoring a message. ::send_message supports this explicitly.
Instance Method Summary collapse
-
#initialize(channel_connector: nil) ⇒ Transmitter
constructor
A new instance of Transmitter.
- #send_delayed_message!(payload, routing_key, delay_by = DEFAULT_DELAY_BY_MS, type: nil, headers: nil, message_id: :auto_generate) ⇒ Object
- #send_message!(payload, routing_key, type: nil, headers: nil, message_id: :auto_generate) ⇒ Object
Methods included from Logging
Constructor Details
#initialize(channel_connector: nil) ⇒ Transmitter
Returns a new instance of Transmitter.
29 30 31 32 33 34 35 36 37 |
# File 'lib/pwwka/transmitter.rb', line 29 def initialize(channel_connector: nil) if channel_connector @caller_manages_connector = true @channel_connector = channel_connector else @caller_manages_connector = false @channel_connector = ChannelConnector.new(connection_name: "p: #{Pwwka.configuration.app_id} #{Pwwka.configuration.process_name}".strip) end end |
Instance Attribute Details
#caller_manages_connector ⇒ Object (readonly)
Returns the value of attribute caller_manages_connector.
26 27 28 |
# File 'lib/pwwka/transmitter.rb', line 26 def caller_manages_connector @caller_manages_connector end |
#channel_connector ⇒ Object (readonly)
Returns the value of attribute channel_connector.
27 28 29 |
# File 'lib/pwwka/transmitter.rb', line 27 def channel_connector @channel_connector end |
Class Method Details
.send_message!(payload, routing_key, on_error: :raise, delayed: false, delay_by: nil, type: nil, message_id: :auto_generate, headers: nil, channel_connector: nil) ⇒ Object
Send an important message that must go through. This method allows any raised exception to pass through.
- payload
-
Hash of what you’d like to include in your message
- routing_key
-
String routing key for the message
- delayed
-
Boolean send this message later
- delay_by
-
Integer milliseconds to delay the message
- type
-
A string describing the type. This + your configured app_id should be unique to your entire ecosystem.
- message_id
-
If specified (which generally you should not do), sets the id of the message. If omitted, a GUID is used.
- headers
-
A hash of arbitrary headers to include in the AMQP attributes
- on_error
-
What is the behavior of
-
:ignore (aka as send_message_safely)
-
:raise
-
:resque – use Resque to try to send the message later
-
:retry_async – use the configured background job processor to retry sending the message later
Returns true
Raises any exception generated by the innerworkings of this library.
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/pwwka/transmitter.rb', line 58 def self.(payload, routing_key, on_error: :raise, delayed: false, delay_by: nil, type: nil, message_id: :auto_generate, headers: nil, channel_connector: nil ) if delayed new(channel_connector: channel_connector).(*[payload, routing_key, delay_by].compact, type: type, headers: headers, message_id: ) else new(channel_connector: channel_connector).(payload, routing_key, type: type, headers: headers, message_id: ) end logf "AFTER Transmitting Message on %{routing_key} -> %{payload}",routing_key: routing_key, payload: payload true rescue => e logf "ERROR Transmitting Message on %{routing_key} -> %{payload} : %{error}", routing_key: routing_key, payload: payload, error: e, at: :error case on_error when :raise raise e when :resque, :retry_async begin (payload, routing_key, delay_by_ms: delayed ? delay_by || DEFAULT_DELAY_BY_MS : 0) rescue => exception warn(exception.) raise e end else # ignore end false end |
.send_message_async(payload, routing_key, delay_by_ms: 0, type: nil, message_id: :auto_generate, headers: nil) ⇒ Object
Enqueue the message with the configured background processor.
- :delay_by_ms
-
Integer milliseconds to delay the message. Default is 0.
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/pwwka/transmitter.rb', line 98 def self.(payload, routing_key, delay_by_ms: 0, type: nil, message_id: :auto_generate, headers: nil) background_job_processor = Pwwka.configuration.background_job_processor job = Pwwka.configuration.async_job_klass if background_job_processor == :resque resque_args = [job, payload, routing_key] unless type == nil && == :auto_generate && headers == nil # NOTE: (jdlubrano) # Why can't we pass these options all of the time? Well, if a user # of pwwka has configured their own async_job_klass that only has an # arity of 2 (i.e. payload and routing key), then passing these options # as an additional argument would break the user's application. In # order to maintain compatibility with preceding versions of Pwwka, # we need to ensure that the same arguments passed into this method # result in compatible calls to enqueue any Resque jobs. resque_args << { type: type, message_id: , headers: headers } end if delay_by_ms.zero? Resque.enqueue(*resque_args) else Resque.enqueue_in(delay_by_ms/1000, *resque_args) end elsif background_job_processor == :sidekiq = { delay_by_ms: delay_by_ms, type: type, message_id: , headers: headers } job.perform_async(payload, routing_key, ) end end |
.send_message_safely(payload, routing_key, delayed: false, delay_by: nil, message_id: :auto_generate) ⇒ Object
This is ignoring a message. ::send_message supports this explicitly.
Send a less important message that doesn’t have to go through. This eats any ‘StandardError` and logs it, returning false rather than blowing up.
- payload
-
Hash of what you’d like to include in your message
- routing_key
-
String routing key for the message
- delayed
-
Boolean send this message later
- delay_by
-
Integer milliseconds to delay the message
Returns true if the message was sent, false otherwise
142 143 144 |
# File 'lib/pwwka/transmitter.rb', line 142 def self.(payload, routing_key, delayed: false, delay_by: nil, message_id: :auto_generate) (payload, routing_key, delayed: delayed, delay_by: delay_by, on_error: :ignore) end |
Instance Method Details
#send_delayed_message!(payload, routing_key, delay_by = DEFAULT_DELAY_BY_MS, type: nil, headers: nil, message_id: :auto_generate) ⇒ Object
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 |
# File 'lib/pwwka/transmitter.rb', line 165 def (payload, routing_key, delay_by = DEFAULT_DELAY_BY_MS, type: nil, headers: nil, message_id: :auto_generate) channel_connector.raise_if_delayed_not_allowed = Pwwka::PublishOptions.new( routing_key: routing_key, message_id: , type: type, headers: headers, expiration: delay_by ) logf "START Transmitting Delayed Message on id[%{id}] %{routing_key} -> %{payload}", id: ., routing_key: routing_key, payload: payload channel_connector.create_delayed_queue channel_connector.delayed_exchange.publish(payload.to_json,.to_h) # if it gets this far it has succeeded logf "END Transmitting Delayed Message on id[%{id}] %{routing_key} -> %{payload}", id: ., routing_key: routing_key, payload: payload true ensure unless caller_manages_connector channel_connector.connection_close end end |
#send_message!(payload, routing_key, type: nil, headers: nil, message_id: :auto_generate) ⇒ Object
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/pwwka/transmitter.rb', line 146 def (payload, routing_key, type: nil, headers: nil, message_id: :auto_generate) = Pwwka::PublishOptions.new( routing_key: routing_key, message_id: , type: type, headers: headers ) logf "START Transmitting Message on id[%{id}] %{routing_key} -> %{payload}", id: ., routing_key: routing_key, payload: payload channel_connector.topic_exchange.publish(payload.to_json, .to_h) # if it gets this far it has succeeded logf "END Transmitting Message on id[%{id}] %{routing_key} -> %{payload}", id: ., routing_key: routing_key, payload: payload true ensure unless caller_manages_connector channel_connector.connection_close end end |