Module: QueueBus::Publishing

Included in:
QueueBus
Defined in:
lib/queue_bus/publishing.rb

Instance Method Summary collapse

Instance Method Details

#delayed_enqueue_to(epoch_seconds, queue_name, klass, hash) ⇒ Object



75
76
77
# File 'lib/queue_bus/publishing.rb', line 75

def delayed_enqueue_to(epoch_seconds, queue_name, klass, hash)
  ::QueueBus.adapter.enqueue_at(epoch_seconds, queue_name, klass, ::QueueBus::Util.encode(hash || {}))
end

#enqueue_to(queue_name, klass, hash) ⇒ Object



71
72
73
# File 'lib/queue_bus/publishing.rb', line 71

def enqueue_to(queue_name, klass, hash)
  ::QueueBus.adapter.enqueue(queue_name, klass, ::QueueBus::Util.encode(hash || {}))
end

#generate_uuidObject



36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/queue_bus/publishing.rb', line 36

def generate_uuid
  require 'securerandom' unless defined?(SecureRandom)
  return SecureRandom.uuid

  rescue Exception => e
    # secure random not there
    # big random number a few times
    n_bytes = [42].pack('i').size
    n_bits = n_bytes * 8
    max = 2 ** (n_bits - 2) - 1
    return "#{rand(max)}-#{rand(max)}-#{rand(max)}"
end

#heartbeat!Object



79
80
81
# File 'lib/queue_bus/publishing.rb', line 79

def heartbeat!
  ::QueueBus.adapter.setup_heartbeat!(incoming_queue)
end

#publish(event_type, attributes = {}) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
# File 'lib/queue_bus/publishing.rb', line 49

def publish(event_type, attributes = {})
  to_publish = (event_type, attributes)
  to_publish["bus_class_proxy"] = ::QueueBus::Driver.name.to_s

  ::QueueBus.log_application("Event published: #{event_type} #{to_publish.inspect}")
  if local_mode
    ::QueueBus::Local.perform(to_publish)
  else
    enqueue_to(::QueueBus.incoming_queue, ::QueueBus::Worker, to_publish)
  end
end

#publish_at(timestamp_or_epoch, event_type, attributes = {}) ⇒ Object



61
62
63
64
65
66
67
68
69
# File 'lib/queue_bus/publishing.rb', line 61

def publish_at(timestamp_or_epoch, event_type, attributes = {})
  to_publish = (event_type, attributes)
  to_publish["bus_delayed_until"] ||= timestamp_or_epoch.to_i
  to_publish.delete("bus_published_at") unless attributes["bus_published_at"] # will be put on when it actually does it
  to_publish["bus_class_proxy"] = ::QueueBus::Publisher.name.to_s

  ::QueueBus.log_application("Event published:#{event_type} #{to_publish.inspect} publish_at: #{timestamp_or_epoch.to_i}")
  delayed_enqueue_to(timestamp_or_epoch.to_i, incoming_queue, ::QueueBus::Worker, to_publish)
end

#publish_metadata(event_type, attributes = {}) ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
# File 'lib/queue_bus/publishing.rb', line 24

def (event_type, attributes={})
  # TODO: "bus_app_key" => application.app_key ?
  bus_attr = {"bus_published_at" => Time.now.to_i, "bus_event_type" => event_type}
  bus_attr["bus_id"]           = "#{Time.now.to_i}-#{generate_uuid}"
  bus_attr["bus_app_hostname"] = ::QueueBus.hostname
  bus_attr["bus_locale"]       = I18n.locale.to_s if defined?(I18n) && I18n.respond_to?(:locale) && I18n.locale
  bus_attr["bus_timezone"]     = Time.zone.name   if defined?(Time) && Time.respond_to?(:zone)   && Time.zone
  out = bus_attr.merge(attributes || {})
  ::QueueBus.before_publish_callback(out)
  out
end

#with_global_attributes(attributes) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/queue_bus/publishing.rb', line 4

def with_global_attributes(attributes)
  original_timezone = false
  original_locale   = false

  if attributes["bus_locale"] && defined?(I18n) && I18n.respond_to?(:locale=)
    original_locale = I18n.locale if I18n.respond_to?(:locale)
    I18n.locale = attributes["bus_locale"]
  end

  if attributes["bus_timezone"] && defined?(Time) && Time.respond_to?(:zone=)
    original_timezone = Time.zone if Time.respond_to?(:zone)
    Time.zone = attributes["bus_timezone"]
  end

  yield
ensure
  I18n.locale = original_locale   unless original_locale   == false
  Time.zone   = original_timezone unless original_timezone == false
end