Module: ActionPubsub

Extended by:
ActiveSupport::Autoload
Defined in:
lib/action_pubsub/actors/silent_dead_letter_handler.rb,
lib/action_pubsub.rb,
lib/action_pubsub/event.rb,
lib/action_pubsub/queue.rb,
lib/action_pubsub/actors.rb,
lib/action_pubsub/config.rb,
lib/action_pubsub/errors.rb,
lib/action_pubsub/channel.rb,
lib/action_pubsub/version.rb,
lib/action_pubsub/balancer.rb,
lib/action_pubsub/channels.rb,
lib/action_pubsub/registry.rb,
lib/action_pubsub/exchanges.rb,
lib/action_pubsub/subscriber.rb,
lib/action_pubsub/active_record.rb,
lib/action_pubsub/serialization.rb,
lib/action_pubsub/subscriptions.rb,
lib/action_pubsub/has_subscriptions.rb,
lib/action_pubsub/active_record/events.rb,
lib/action_pubsub/serialization/marshal.rb,
lib/action_pubsub/active_record/on_change.rb,
lib/action_pubsub/active_record/subscriber.rb,
lib/action_pubsub/active_record/publishable.rb,
lib/action_pubsub/active_record/subscription.rb,
lib/action_pubsub/active_record/events/changed.rb,
lib/action_pubsub/active_record/events/created.rb,
lib/action_pubsub/active_record/events/updated.rb,
lib/action_pubsub/active_record/with_connection.rb,
lib/action_pubsub/active_record/events/destroyed.rb

Overview

dead letter routing not working ATM

Defined Under Namespace

Modules: ActiveRecord, Actors, Errors, HasSubscriptions, Serialization Classes: Balancer, Channel, Channels, Config, Event, Exchanges, Queue, Registry, Subscriber, Subscriptions

Constant Summary collapse

VERSION =
"0.2.1"

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.configurationObject Also known as: config

Returns the value of attribute configuration.



126
127
128
# File 'lib/action_pubsub.rb', line 126

def configuration
  @configuration
end

Class Method Details

.channel?(channel_path) ⇒ Boolean

Returns:

  • (Boolean)


40
41
42
# File 'lib/action_pubsub.rb', line 40

def self.channel?(channel_path)
  channels.key?(channel_path)
end

.channelsObject



36
37
38
# File 'lib/action_pubsub.rb', line 36

def self.channels
  @channels ||= ::ActionPubsub::Channels.new
end

.configure(&block) ⇒ Object



32
33
34
# File 'lib/action_pubsub.rb', line 32

def self.configure(&block)
  block.call(config)
end

.deserialize_event(event) ⇒ Object



121
122
123
# File 'lib/action_pubsub.rb', line 121

def self.deserialize_event(event)
  event
end

.destination_tuple_from_path(path_string) ⇒ Object



61
62
63
64
65
66
67
# File 'lib/action_pubsub.rb', line 61

def self.destination_tuple_from_path(path_string)
  segs = path_string.split("/")
  worker_index = segs.pop
  action = segs.pop

  [segs.join("/"), action, worker_index]
end

.destination_tuple_from_sender_path(path_string) ⇒ Object



69
70
71
72
73
# File 'lib/action_pubsub.rb', line 69

def self.destination_tuple_from_sender_path(path_string)
  segs = path_string.split("/")
  action = segs.pop
  [segs.join("/"), action]
end

.disable_all!Object



44
45
46
47
48
49
50
51
# File 'lib/action_pubsub.rb', line 44

def self.disable_all!
  configure do |config|
    config.disabled = true
  end

  subscriptions.all.map{ |_subscription| _subscription << :terminate! }
  self
end

.event_countObject



53
54
55
# File 'lib/action_pubsub.rb', line 53

def self.event_count
  @event_count ||= ::Concurrent::Agent.new(0)
end

.exchangesObject



57
58
59
# File 'lib/action_pubsub.rb', line 57

def self.exchanges
  @exchanges ||= ::ActionPubsub::Exchanges.new
end

.on(*paths, as: nil, &block) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/action_pubsub.rb', line 75

def self.on(*paths, as:nil, &block)
  paths.map do |path|
    target_channel = ::ActionPubsub.channels[path]
    subscription_path = "#{path}:#{as || SecureRandom.uuid}"

    ::ActionPubsub.subscriptions[subscription_path] ||= ::Concurrent::Actor::Utils::AdHoc.spawn(subscription_path) do
      target_channel << :subscribe
      -> message {
        block.call(message)
      }
    end
  end
end

.publish(path, message) ⇒ Object



93
94
95
# File 'lib/action_pubsub.rb', line 93

def self.publish(path, message)
  self[path] << message
end

.publish_event(routing_key, event) ⇒ Object



97
98
99
100
101
102
103
# File 'lib/action_pubsub.rb', line 97

def self.publish_event(routing_key, event)
  #need to loop through exchanges and publish to them
  #maybe there is a better way to do this?
  exchanges[routing_key].keys.each do |queue_name|
    exchanges[routing_key][queue_name] << serialize_event(event)
  end
end

.serialize_event(event) ⇒ Object



105
106
107
# File 'lib/action_pubsub.rb', line 105

def self.serialize_event(event)
  event
end

.silent_dead_letter_handlerObject



113
114
115
# File 'lib/action_pubsub.rb', line 113

def self.silent_dead_letter_handler
  @silent_dead_letter_handler ||= ::ActionPubsub::Actors::SilentDeadLetterHandler.spawn('action_pubsub/silent_dead_letter_handler')
end

.subscription?(path) ⇒ Boolean

Returns:

  • (Boolean)


117
118
119
# File 'lib/action_pubsub.rb', line 117

def self.subscription?(path)
  subscriptions.key?(path)
end

.subscriptionsObject



109
110
111
# File 'lib/action_pubsub.rb', line 109

def self.subscriptions
  @subscriptions ||= ::ActionPubsub::Subscriptions.new
end

.symbolize_routing_key(routing_key) ⇒ Object



89
90
91
# File 'lib/action_pubsub.rb', line 89

def self.symbolize_routing_key(routing_key)
  :"#{routing_key.split('.').join('_')}"
end