Module: QueueBus::Publishing
- Included in:
- QueueBus
- Defined in:
- lib/queue_bus/publishing.rb
Overview
The publishing mixin provides the main interactions that users will use to interact with the queue bus. This module is not interacted with directly and instead is included inte the ‘QueueBus` module.
Instance Method Summary collapse
- #delayed_enqueue_to(epoch_seconds, queue_name, klass, hash) ⇒ Object
- #enqueue_to(queue_name, class_name, hash) ⇒ Object
- #generate_uuid ⇒ Object
- #heartbeat! ⇒ Object
- #publish(event_type, attributes = {}) ⇒ Object
- #publish_at(timestamp_or_epoch, event_type, attributes = {}) ⇒ Object
- #publish_metadata(event_type, attributes = {}) ⇒ Object
- #with_global_attributes(attributes) ⇒ Object
Instance Method Details
#delayed_enqueue_to(epoch_seconds, queue_name, klass, hash) ⇒ Object
84 85 86 |
# File 'lib/queue_bus/publishing.rb', line 84 def delayed_enqueue_to(epoch_seconds, queue_name, klass, hash) ::QueueBus.adapter.enqueue_at(epoch_seconds, queue_name, klass, ::QueueBus::Util.encode(hash || {})) end |
#enqueue_to(queue_name, class_name, hash) ⇒ Object
78 79 80 81 82 |
# File 'lib/queue_bus/publishing.rb', line 78 def enqueue_to(queue_name, class_name, hash) class_name = class_name.name if class_name.is_a?(Class) hash = hash.merge('bus_class_proxy' => class_name.to_s) ::QueueBus.adapter.enqueue(queue_name, ::QueueBus::Worker, ::QueueBus::Util.encode(hash || {})) end |
#generate_uuid ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/queue_bus/publishing.rb', line 45 def generate_uuid require 'securerandom' unless defined?(SecureRandom) SecureRandom.uuid rescue Exception => e # secure random not there # big random number a few times n_bytes = [42].pack('i').size n_bits = n_bytes * 8 max = 2**(n_bits - 2) - 1 "#{rand(max)}-#{rand(max)}-#{rand(max)}" end |
#heartbeat! ⇒ Object
88 89 90 |
# File 'lib/queue_bus/publishing.rb', line 88 def heartbeat! ::QueueBus.adapter.setup_heartbeat!(incoming_queue) end |
#publish(event_type, attributes = {}) ⇒ Object
57 58 59 60 61 62 63 64 65 66 |
# File 'lib/queue_bus/publishing.rb', line 57 def publish(event_type, attributes = {}) to_publish = (event_type, attributes) ::QueueBus.log_application("Event published: #{event_type} #{to_publish.inspect}") if local_mode ::QueueBus::Local.publish(to_publish) # TODO: use different adapters else enqueue_to(::QueueBus.incoming_queue, ::QueueBus::Driver, to_publish) end end |
#publish_at(timestamp_or_epoch, event_type, attributes = {}) ⇒ Object
68 69 70 71 72 73 74 75 76 |
# File 'lib/queue_bus/publishing.rb', line 68 def publish_at(, event_type, attributes = {}) to_publish = (event_type, attributes) to_publish['bus_delayed_until'] ||= .to_i to_publish.delete('bus_published_at') unless attributes['bus_published_at'] # will be put on when it actually does it to_publish['bus_class_proxy'] = ::QueueBus::Publisher.name.to_s ::QueueBus.log_application("Event published:#{event_type} #{to_publish.inspect} publish_at: #{.to_i}") delayed_enqueue_to(.to_i, incoming_queue, ::QueueBus::Worker, to_publish) end |
#publish_metadata(event_type, attributes = {}) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/queue_bus/publishing.rb', line 28 def (event_type, attributes = {}) # TODO: "bus_app_key" => application.app_key ? bus_attr = { 'bus_published_at' => Time.now.to_i, 'bus_event_type' => event_type } bus_attr['bus_id'] = "#{Time.now.to_i}-#{generate_uuid}" bus_attr['bus_app_hostname'] = ::QueueBus.hostname bus_attr['bus_context'] = ::QueueBus.context unless ::QueueBus.context.nil? if defined?(I18n) && I18n.respond_to?(:locale) && I18n.locale bus_attr['bus_locale'] = I18n.locale.to_s end if defined?(Time) && Time.respond_to?(:zone) && Time.zone bus_attr['bus_timezone'] = Time.zone.name end out = bus_attr.merge(attributes || {}) ::QueueBus.before_publish_callback(out) out end |
#with_global_attributes(attributes) ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/queue_bus/publishing.rb', line 8 def with_global_attributes(attributes) original_timezone = false original_locale = false if attributes['bus_locale'] && defined?(I18n) && I18n.respond_to?(:locale=) original_locale = I18n.locale if I18n.respond_to?(:locale) I18n.locale = attributes['bus_locale'] end if attributes['bus_timezone'] && defined?(Time) && Time.respond_to?(:zone=) original_timezone = Time.zone if Time.respond_to?(:zone) Time.zone = attributes['bus_timezone'] end yield ensure I18n.locale = original_locale unless original_locale == false Time.zone = original_timezone unless original_timezone == false end |