Module: QueueBus::Publishing

Included in:
QueueBus
Defined in:
lib/queue_bus/publishing.rb

Instance Method Summary

Instance Method Details

#delayed_enqueue_to(epoch_seconds, queue_name, klass, hash) ⇒ Object



76
77
78
# File 'lib/queue_bus/publishing.rb', line 76

# Hands a job to the configured adapter's +enqueue_at+ so it is scheduled
# rather than queued immediately.
#
# @param epoch_seconds [Integer] forwarded unchanged as the schedule time
# @param queue_name [String] forwarded unchanged as the target queue
# @param klass [Class] worker class forwarded unchanged to the adapter
# @param hash [Hash, nil] payload; nil is encoded as an empty hash
# @return [Object] whatever the adapter's +enqueue_at+ returns
def delayed_enqueue_to(epoch_seconds, queue_name, klass, hash)
  adapter = ::QueueBus.adapter
  encoded_payload = ::QueueBus::Util.encode(hash || {})
  adapter.enqueue_at(epoch_seconds, queue_name, klass, encoded_payload)
end

#enqueue_to(queue_name, class_name, hash) ⇒ Object



70
71
72
73
74
# File 'lib/queue_bus/publishing.rb', line 70

# Enqueues a payload on the configured adapter for ::QueueBus::Worker,
# recording the intended class under the "bus_class_proxy" key.
#
# @param queue_name [String] forwarded unchanged as the target queue
# @param class_name [Class, #to_s] a Class is reduced to its name; anything
#   else is stringified with +to_s+
# @param hash [Hash, nil] payload; nil is treated as an empty hash. The
#   caller's hash is not mutated (a merged copy is encoded).
# @return [Object] whatever the adapter's +enqueue+ returns
def enqueue_to(queue_name, class_name, hash)
  class_name = class_name.name if class_name.is_a?(Class)
  # Default to {} *before* merging: the previous order called nil.merge and
  # raised NoMethodError, leaving the later `hash || {}` guard unreachable.
  payload = (hash || {}).merge("bus_class_proxy" => class_name.to_s)
  ::QueueBus.adapter.enqueue(queue_name, ::QueueBus::Worker, ::QueueBus::Util.encode(payload))
end

#generate_uuid ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/queue_bus/publishing.rb', line 36

# Returns a unique-ish identifier string.
#
# Uses SecureRandom.uuid when available (lazily requiring 'securerandom').
# If loading or calling SecureRandom fails, falls back to three
# pseudo-random integers joined by dashes.
#
# @return [String] a UUID, or "<int>-<int>-<int>" from the fallback
def generate_uuid
  require 'securerandom' unless defined?(SecureRandom)
  SecureRandom.uuid
rescue ScriptError, StandardError
  # ScriptError covers LoadError (library missing) and NotImplementedError;
  # StandardError covers ordinary runtime failures. Deliberately NOT
  # `rescue Exception`, which would also swallow Interrupt, SystemExit and
  # NoMemoryError and hide a shutdown request behind a fake id.
  n_bytes = [42].pack('i').size # byte width of a native C int
  n_bits = n_bytes * 8
  max = 2**(n_bits - 2) - 1
  "#{rand(max)}-#{rand(max)}-#{rand(max)}"
end

#heartbeat! ⇒ Object



80
81
82
# File 'lib/queue_bus/publishing.rb', line 80

# Asks the configured adapter to set up its heartbeat, passing it the value
# of +incoming_queue+.
#
# @return [Object] whatever the adapter's +setup_heartbeat!+ returns
def heartbeat!
  adapter = ::QueueBus.adapter
  adapter.setup_heartbeat!(incoming_queue)
end

#publish(event_type, attributes = {}) ⇒ Object



49
50
51
52
53
54
55
56
57
58
# File 'lib/queue_bus/publishing.rb', line 49

# Publishes an event: builds its attributes with +publish_metadata+, logs
# them, then either delivers locally or enqueues for ::QueueBus::Driver.
#
# @param event_type [Object] event identifier, interpolated into the log line
#   and passed to +publish_metadata+
# @param attributes [Hash] caller-supplied attributes passed to
#   +publish_metadata+
# @return [Object] the result of ::QueueBus::Local.publish when +local_mode+
#   is truthy, otherwise the result of +enqueue_to+
def publish(event_type, attributes = {})
  # The call target was missing here, leaving `(event_type, attributes)`,
  # which is not valid Ruby.
  to_publish = publish_metadata(event_type, attributes)

  ::QueueBus.log_application("Event published: #{event_type} #{to_publish.inspect}")
  if local_mode
    ::QueueBus::Local.publish(to_publish) # TODO: use different adapters
  else
    enqueue_to(::QueueBus.incoming_queue, ::QueueBus::Driver, to_publish)
  end
end

#publish_at(timestamp_or_epoch, event_type, attributes = {}) ⇒ Object



60
61
62
63
64
65
66
67
68
# File 'lib/queue_bus/publishing.rb', line 60

# Schedules an event for later publication via +delayed_enqueue_to+ on the
# incoming queue, proxied through ::QueueBus::Publisher.
#
# @param timestamp_or_epoch [#to_i] when to publish; converted with +to_i+
# @param event_type [Object] event identifier passed to +publish_metadata+
# @param attributes [Hash, nil] caller-supplied attributes; nil is tolerated
# @return [Object] whatever +delayed_enqueue_to+ returns
def publish_at(timestamp_or_epoch, event_type, attributes = {})
  # The call target was missing here, leaving `(event_type, attributes)`,
  # which is not valid Ruby.
  to_publish = publish_metadata(event_type, attributes)
  to_publish["bus_delayed_until"] ||= timestamp_or_epoch.to_i
  # Drop the generated publish time (it will be put on when the event is
  # actually published) unless the caller supplied one explicitly. The nil
  # guard keeps this consistent with publish_metadata, which accepts nil.
  to_publish.delete("bus_published_at") unless attributes && attributes["bus_published_at"]
  to_publish["bus_class_proxy"] = ::QueueBus::Publisher.name.to_s

  ::QueueBus.log_application("Event published:#{event_type} #{to_publish.inspect} publish_at: #{timestamp_or_epoch.to_i}")
  delayed_enqueue_to(timestamp_or_epoch.to_i, incoming_queue, ::QueueBus::Worker, to_publish)
end

#publish_metadata(event_type, attributes = {}) ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
# File 'lib/queue_bus/publishing.rb', line 24

# Builds the full attribute hash for an event.
#
# Starts from bus-level keys ("bus_published_at", "bus_event_type", "bus_id",
# "bus_app_hostname", plus "bus_locale" / "bus_timezone" when I18n.locale /
# Time.zone are available and set), merges the caller's attributes over them
# (caller values win on key collisions), then passes the result to
# ::QueueBus.before_publish_callback.
#
# @param event_type [Object] stored under "bus_event_type"
# @param attributes [Hash, nil] caller-supplied attributes; nil means none
# @return [Hash] the merged attribute hash handed to the callback
def publish_metadata(event_type, attributes = {})
  # TODO: "bus_app_key" => application.app_key ?
  # Sample the clock once so "bus_published_at" and the "bus_id" prefix
  # cannot disagree when the call straddles a second boundary.
  published_at = Time.now.to_i
  bus_attr = { "bus_published_at" => published_at, "bus_event_type" => event_type }
  bus_attr["bus_id"]           = "#{published_at}-#{generate_uuid}"
  bus_attr["bus_app_hostname"] = ::QueueBus.hostname
  bus_attr["bus_locale"]       = I18n.locale.to_s if defined?(I18n) && I18n.respond_to?(:locale) && I18n.locale
  bus_attr["bus_timezone"]     = Time.zone.name   if defined?(Time) && Time.respond_to?(:zone)   && Time.zone
  out = bus_attr.merge(attributes || {})
  ::QueueBus.before_publish_callback(out)
  out
end

#with_global_attributes(attributes) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/queue_bus/publishing.rb', line 4

# Runs the given block with I18n.locale and/or Time.zone temporarily set from
# the "bus_locale" / "bus_timezone" entries of +attributes+, restoring any
# captured previous values afterwards (even if the block raises).
#
# A setting is only touched when its attribute is present and the
# corresponding writer (I18n.locale= / Time.zone=) exists; it is only
# restored when the matching reader existed to capture the previous value.
#
# @param attributes [Hash] looked up with the string keys "bus_locale" and
#   "bus_timezone"
# @yield executes the caller's block with the overrides in place
# @return [Object] the block's return value
def with_global_attributes(attributes)
  # Presence of a key means "a previous value was captured"; the captured
  # value itself may legitimately be nil.
  captured = {}

  if attributes["bus_locale"] && defined?(I18n) && I18n.respond_to?(:locale=)
    captured[:locale] = I18n.locale if I18n.respond_to?(:locale)
    I18n.locale = attributes["bus_locale"]
  end

  if attributes["bus_timezone"] && defined?(Time) && Time.respond_to?(:zone=)
    captured[:zone] = Time.zone if Time.respond_to?(:zone)
    Time.zone = attributes["bus_timezone"]
  end

  yield
ensure
  I18n.locale = captured[:locale] if captured.key?(:locale)
  Time.zone   = captured[:zone]   if captured.key?(:zone)
end