Module: QueueBus::Publishing
- Included in:
- QueueBus
- Defined in:
- lib/queue_bus/publishing.rb
Overview
The publishing mixin provides the main interactions that users will use to interact with the queue bus. This module is not interacted with directly and instead is included inte the QueueBus module.
Instance Method Summary collapse
- #delayed_enqueue_to(epoch_seconds, queue_name, klass, hash) ⇒ Object
- #enqueue_to(queue_name, class_name, hash) ⇒ Object
- #generate_uuid ⇒ Object
- #heartbeat! ⇒ Object
- #publish(event_type, attributes = {}) ⇒ Object
- #publish_at(timestamp_or_epoch, event_type, attributes = {}) ⇒ Object
- #publish_metadata(event_type, attributes = {}) ⇒ Object
- #with_global_attributes(attributes) ⇒ Object
Instance Method Details
#delayed_enqueue_to(epoch_seconds, queue_name, klass, hash) ⇒ Object
83 84 85 |
# File 'lib/queue_bus/publishing.rb', line 83 def delayed_enqueue_to(epoch_seconds, queue_name, klass, hash) ::QueueBus.adapter.enqueue_at(epoch_seconds, queue_name, klass, ::QueueBus::Util.encode(hash || {})) end |
#enqueue_to(queue_name, class_name, hash) ⇒ Object
77 78 79 80 81 |
# File 'lib/queue_bus/publishing.rb', line 77 def enqueue_to(queue_name, class_name, hash) class_name = class_name.name if class_name.is_a?(Class) hash = hash.merge('bus_class_proxy' => class_name.to_s) ::QueueBus.adapter.enqueue(queue_name, ::QueueBus::Worker, ::QueueBus::Util.encode(hash || {})) end |
#generate_uuid ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/queue_bus/publishing.rb', line 44 def generate_uuid require 'securerandom' unless defined?(SecureRandom) SecureRandom.uuid rescue Exception => e # secure random not there # big random number a few times n_bytes = [42].pack('i').size n_bits = n_bytes * 8 max = 2**(n_bits - 2) - 1 "#{rand(max)}-#{rand(max)}-#{rand(max)}" end |
#heartbeat! ⇒ Object
87 88 89 |
# File 'lib/queue_bus/publishing.rb', line 87 def heartbeat! ::QueueBus.adapter.setup_heartbeat!(incoming_queue) end |
#publish(event_type, attributes = {}) ⇒ Object
56 57 58 59 60 61 62 63 64 65 |
# File 'lib/queue_bus/publishing.rb', line 56 def publish(event_type, attributes = {}) to_publish = (event_type, attributes) ::QueueBus.log_application("Event published: #{event_type} #{to_publish.inspect}") if local_mode ::QueueBus::Local.publish(to_publish) # TODO: use different adapters else enqueue_to(::QueueBus.incoming_queue, ::QueueBus::Driver, to_publish) end end |
#publish_at(timestamp_or_epoch, event_type, attributes = {}) ⇒ Object
67 68 69 70 71 72 73 74 75 |
# File 'lib/queue_bus/publishing.rb', line 67 def publish_at(, event_type, attributes = {}) to_publish = (event_type, attributes) to_publish['bus_delayed_until'] ||= .to_i to_publish.delete('bus_published_at') unless attributes['bus_published_at'] # will be put on when it actually does it to_publish['bus_class_proxy'] = ::QueueBus::Publisher.name.to_s ::QueueBus.log_application("Event published:#{event_type} #{to_publish.inspect} publish_at: #{timestamp_or_epoch.to_i}") delayed_enqueue_to(.to_i, incoming_queue, ::QueueBus::Worker, to_publish) end |
#publish_metadata(event_type, attributes = {}) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/queue_bus/publishing.rb', line 28 def (event_type, attributes = {}) # TODO: "bus_app_key" => application.app_key ? bus_attr = { 'bus_published_at' => Time.now.to_i, 'bus_event_type' => event_type } bus_attr['bus_id'] = "#{Time.now.to_i}-#{generate_uuid}" bus_attr['bus_app_hostname'] = ::QueueBus.hostname if defined?(I18n) && I18n.respond_to?(:locale) && I18n.locale bus_attr['bus_locale'] = I18n.locale.to_s end if defined?(Time) && Time.respond_to?(:zone) && Time.zone bus_attr['bus_timezone'] = Time.zone.name end out = bus_attr.merge(attributes || {}) ::QueueBus.before_publish_callback(out) out end |
#with_global_attributes(attributes) ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/queue_bus/publishing.rb', line 8 def with_global_attributes(attributes) original_timezone = false original_locale = false if attributes['bus_locale'] && defined?(I18n) && I18n.respond_to?(:locale=) original_locale = I18n.locale if I18n.respond_to?(:locale) I18n.locale = attributes['bus_locale'] end if attributes['bus_timezone'] && defined?(Time) && Time.respond_to?(:zone=) original_timezone = Time.zone if Time.respond_to?(:zone) Time.zone = attributes['bus_timezone'] end yield ensure I18n.locale = original_locale unless original_locale == false Time.zone = original_timezone unless original_timezone == false end |