Module: QueueBus::Publishing
- Included in:
- QueueBus
- Defined in:
- lib/queue_bus/publishing.rb
Instance Method Summary collapse
- #delayed_enqueue_to(epoch_seconds, queue_name, klass, hash) ⇒ Object
- #enqueue_to(queue_name, class_name, hash) ⇒ Object
- #generate_uuid ⇒ Object
- #heartbeat! ⇒ Object
- #publish(event_type, attributes = {}) ⇒ Object
- #publish_at(timestamp_or_epoch, event_type, attributes = {}) ⇒ Object
- #publish_metadata(event_type, attributes = {}) ⇒ Object
- #with_global_attributes(attributes) ⇒ Object
Instance Method Details
#delayed_enqueue_to(epoch_seconds, queue_name, klass, hash) ⇒ Object
76 77 78 |
# File 'lib/queue_bus/publishing.rb', line 76 def delayed_enqueue_to(epoch_seconds, queue_name, klass, hash) ::QueueBus.adapter.enqueue_at(epoch_seconds, queue_name, klass, ::QueueBus::Util.encode(hash || {})) end |
#enqueue_to(queue_name, class_name, hash) ⇒ Object
70 71 72 73 74 |
# File 'lib/queue_bus/publishing.rb', line 70 def enqueue_to(queue_name, class_name, hash) class_name = class_name.name if class_name.is_a?(Class) hash = hash.merge("bus_class_proxy" => class_name.to_s) ::QueueBus.adapter.enqueue(queue_name, ::QueueBus::Worker, ::QueueBus::Util.encode(hash || {})) end |
#generate_uuid ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/queue_bus/publishing.rb', line 36 def generate_uuid require 'securerandom' unless defined?(SecureRandom) return SecureRandom.uuid rescue Exception => e # secure random not there # big random number a few times n_bytes = [42].pack('i').size n_bits = n_bytes * 8 max = 2 ** (n_bits - 2) - 1 return "#{rand(max)}-#{rand(max)}-#{rand(max)}" end |
#heartbeat! ⇒ Object
80 81 82 |
# File 'lib/queue_bus/publishing.rb', line 80 def heartbeat! ::QueueBus.adapter.setup_heartbeat!(incoming_queue) end |
#publish(event_type, attributes = {}) ⇒ Object
49 50 51 52 53 54 55 56 57 58 |
# File 'lib/queue_bus/publishing.rb', line 49 def publish(event_type, attributes = {}) to_publish = (event_type, attributes) ::QueueBus.log_application("Event published: #{event_type} #{to_publish.inspect}") if local_mode ::QueueBus::Local.publish(to_publish) # TODO: use different adapters else enqueue_to(::QueueBus.incoming_queue, ::QueueBus::Driver, to_publish) end end |
#publish_at(timestamp_or_epoch, event_type, attributes = {}) ⇒ Object
60 61 62 63 64 65 66 67 68 |
# File 'lib/queue_bus/publishing.rb', line 60 def publish_at(, event_type, attributes = {}) to_publish = (event_type, attributes) to_publish["bus_delayed_until"] ||= .to_i to_publish.delete("bus_published_at") unless attributes["bus_published_at"] # will be put on when it actually does it to_publish["bus_class_proxy"] = ::QueueBus::Publisher.name.to_s ::QueueBus.log_application("Event published:#{event_type} #{to_publish.inspect} publish_at: #{.to_i}") delayed_enqueue_to(.to_i, incoming_queue, ::QueueBus::Worker, to_publish) end |
#publish_metadata(event_type, attributes = {}) ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/queue_bus/publishing.rb', line 24 def (event_type, attributes={}) # TODO: "bus_app_key" => application.app_key ? bus_attr = {"bus_published_at" => Time.now.to_i, "bus_event_type" => event_type} bus_attr["bus_id"] = "#{Time.now.to_i}-#{generate_uuid}" bus_attr["bus_app_hostname"] = ::QueueBus.hostname bus_attr["bus_locale"] = I18n.locale.to_s if defined?(I18n) && I18n.respond_to?(:locale) && I18n.locale bus_attr["bus_timezone"] = Time.zone.name if defined?(Time) && Time.respond_to?(:zone) && Time.zone out = bus_attr.merge(attributes || {}) ::QueueBus.before_publish_callback(out) out end |
#with_global_attributes(attributes) ⇒ Object
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
# File 'lib/queue_bus/publishing.rb', line 4 def with_global_attributes(attributes) original_timezone = false original_locale = false if attributes["bus_locale"] && defined?(I18n) && I18n.respond_to?(:locale=) original_locale = I18n.locale if I18n.respond_to?(:locale) I18n.locale = attributes["bus_locale"] end if attributes["bus_timezone"] && defined?(Time) && Time.respond_to?(:zone=) original_timezone = Time.zone if Time.respond_to?(:zone) Time.zone = attributes["bus_timezone"] end yield ensure I18n.locale = original_locale unless original_locale == false Time.zone = original_timezone unless original_timezone == false end |