Module: QueueBus::Publishing

Included in:
QueueBus
Defined in:
lib/queue_bus/publishing.rb

Overview

The publishing mixin provides the main methods that users call to interact with the queue bus. This module is not used directly; instead, it is included into the QueueBus module.

Instance Method Summary collapse

Instance Method Details

#delayed_enqueue_to(epoch_seconds, queue_name, klass, hash) ⇒ Object



83
84
85
# File 'lib/queue_bus/publishing.rb', line 83

# Schedules a job through the configured adapter's +enqueue_at+.
#
# @param epoch_seconds [Integer] passed to the adapter as the run time
# @param queue_name [String] queue the job is placed on
# @param klass [Class] worker class handed to the adapter
# @param hash [Hash, nil] job payload; nil is treated as an empty hash
# @return [Object] whatever the adapter's +enqueue_at+ returns
def delayed_enqueue_to(epoch_seconds, queue_name, klass, hash)
  adapter = ::QueueBus.adapter
  payload = ::QueueBus::Util.encode(hash || {})
  adapter.enqueue_at(epoch_seconds, queue_name, klass, payload)
end

#enqueue_to(queue_name, class_name, hash) ⇒ Object



77
78
79
80
81
# File 'lib/queue_bus/publishing.rb', line 77

# Enqueues a payload for ::QueueBus::Worker on the given queue, recording the
# target class name under the 'bus_class_proxy' key.
#
# @param queue_name [String] queue the job is placed on
# @param class_name [Class, #to_s] target class (or its name) stored in the payload
# @param hash [Hash, nil] job payload; nil is treated as an empty hash
# @return [Object] whatever the adapter's +enqueue+ returns
def enqueue_to(queue_name, class_name, hash)
  class_name = class_name.name if class_name.is_a?(Class)
  # Default a nil payload *before* merging; the previous code merged first, so
  # a nil hash raised NoMethodError and its later `|| {}` guard never applied.
  payload = (hash || {}).merge('bus_class_proxy' => class_name.to_s)
  ::QueueBus.adapter.enqueue(queue_name, ::QueueBus::Worker, ::QueueBus::Util.encode(payload))
end

#generate_uuid ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
# File 'lib/queue_bus/publishing.rb', line 44

# Returns a unique identifier string.
#
# Uses SecureRandom.uuid when available; if loading or calling SecureRandom
# fails, falls back to three random integers joined by dashes.
#
# @return [String]
def generate_uuid
  # Deliberate lazy load: only require the library if it is not loaded yet.
  require 'securerandom' unless defined?(SecureRandom)
  SecureRandom.uuid
rescue LoadError, StandardError
  # LoadError is not a StandardError, so it is listed explicitly. Rescuing
  # Exception here would also swallow signals, SystemExit and NoMemoryError.
  #
  # secure random not there
  # big random number a few times
  n_bytes = [42].pack('i').size
  n_bits = n_bytes * 8
  max = 2**(n_bits - 2) - 1
  "#{rand(max)}-#{rand(max)}-#{rand(max)}"
end

#heartbeat! ⇒ Object



87
88
89
# File 'lib/queue_bus/publishing.rb', line 87

# Asks the configured adapter to set up its heartbeat for the incoming queue.
#
# @return [Object] whatever the adapter's +setup_heartbeat!+ returns
def heartbeat!
  adapter = ::QueueBus.adapter
  adapter.setup_heartbeat!(incoming_queue)
end

#publish(event_type, attributes = {}) ⇒ Object



56
57
58
59
60
61
62
63
64
65
# File 'lib/queue_bus/publishing.rb', line 56

# Publishes an event: builds the payload with +publish_metadata+, logs it, and
# then either hands it to ::QueueBus::Local (when +local_mode+ is truthy) or
# enqueues it on the incoming queue for ::QueueBus::Driver.
#
# @param event_type [String] name of the event being published
# @param attributes [Hash] caller-supplied event attributes
# @return [Object] result of the local publish or of +enqueue_to+
def publish(event_type, attributes = {})
  to_publish = publish_metadata(event_type, attributes)

  ::QueueBus.log_application("Event published: #{event_type} #{to_publish.inspect}")
  if local_mode
    ::QueueBus::Local.publish(to_publish) # TODO: use different adapters
  else
    enqueue_to(::QueueBus.incoming_queue, ::QueueBus::Driver, to_publish)
  end
end

#publish_at(timestamp_or_epoch, event_type, attributes = {}) ⇒ Object



67
68
69
70
71
72
73
74
75
# File 'lib/queue_bus/publishing.rb', line 67

# Schedules an event to be published at a later time.
#
# Builds the payload with +publish_metadata+, records the delay under
# 'bus_delayed_until' (unless the caller already supplied one), marks
# ::QueueBus::Publisher as the proxy class, logs, and enqueues a delayed
# ::QueueBus::Worker job on the incoming queue.
#
# @param timestamp_or_epoch [#to_i] time (or epoch seconds) to publish at
# @param event_type [String] name of the event being published
# @param attributes [Hash] caller-supplied event attributes
# @return [Object] whatever +delayed_enqueue_to+ returns
def publish_at(timestamp_or_epoch, event_type, attributes = {})
  publish_epoch = timestamp_or_epoch.to_i

  to_publish = publish_metadata(event_type, attributes)
  to_publish['bus_delayed_until'] ||= publish_epoch
  to_publish.delete('bus_published_at') unless attributes['bus_published_at'] # will be put on when it actually does it
  to_publish['bus_class_proxy'] = ::QueueBus::Publisher.name.to_s

  ::QueueBus.log_application("Event published:#{event_type} #{to_publish.inspect} publish_at: #{publish_epoch}")
  delayed_enqueue_to(publish_epoch, incoming_queue, ::QueueBus::Worker, to_publish)
end

#publish_metadata(event_type, attributes = {}) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/queue_bus/publishing.rb', line 28

# Builds the payload for an event: bus bookkeeping keys merged with the
# caller's attributes, with the caller's values winning on key collisions.
#
# Always sets 'bus_published_at', 'bus_event_type', 'bus_id' and
# 'bus_app_hostname'. Adds 'bus_locale' / 'bus_timezone' only when I18n.locale /
# Time.zone are available and set. The merged hash is passed to
# ::QueueBus.before_publish_callback before being returned.
#
# @param event_type [String] name of the event
# @param attributes [Hash, nil] caller-supplied attributes; nil is treated as empty
# @return [Hash] the merged payload
def publish_metadata(event_type, attributes = {})
  # TODO: "bus_app_key" => application.app_key ?
  # Read the clock once so 'bus_published_at' and the 'bus_id' prefix agree.
  published_at = Time.now.to_i
  bus_attr = { 'bus_published_at' => published_at, 'bus_event_type' => event_type }
  bus_attr['bus_id']           = "#{published_at}-#{generate_uuid}"
  bus_attr['bus_app_hostname'] = ::QueueBus.hostname
  if defined?(I18n) && I18n.respond_to?(:locale) && I18n.locale
    bus_attr['bus_locale']       = I18n.locale.to_s
  end
  # Time.zone is not part of core Ruby, hence the respond_to? check.
  if defined?(Time) && Time.respond_to?(:zone) && Time.zone
    bus_attr['bus_timezone']     = Time.zone.name
  end
  out = bus_attr.merge(attributes || {})
  ::QueueBus.before_publish_callback(out)
  out
end

#with_global_attributes(attributes) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/queue_bus/publishing.rb', line 8

# Runs the given block with I18n.locale and/or Time.zone temporarily switched
# to the values found under 'bus_locale' / 'bus_timezone' in +attributes+,
# restoring the previous values afterwards (even if the block raises).
#
# A setting is only touched when its key is present and the corresponding
# setter exists.
#
# @param attributes [Hash] event attributes, read by string key
# @yield the block to run under the temporary settings
# @return [Object] the block's return value
def with_global_attributes(attributes)
  # `false` means "nothing to restore"; it is used as the sentinel rather than
  # nil so that a nil previous value is still put back.
  previous_locale = false
  previous_zone   = false

  requested_locale = attributes['bus_locale']
  if requested_locale && defined?(I18n) && I18n.respond_to?(:locale=)
    previous_locale = I18n.locale if I18n.respond_to?(:locale)
    I18n.locale = requested_locale
  end

  requested_zone = attributes['bus_timezone']
  if requested_zone && defined?(Time) && Time.respond_to?(:zone=)
    previous_zone = Time.zone if Time.respond_to?(:zone)
    Time.zone = requested_zone
  end

  yield
ensure
  I18n.locale = previous_locale unless previous_locale == false
  Time.zone   = previous_zone   unless previous_zone == false
end