Class: WithEvents::Stream

Inherits:
Object
  • Object
show all
Defined in:
lib/with_events/stream.rb

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, klass, options = {}) ⇒ Stream

Returns a new instance of Stream.



7
8
9
10
11
12
13
14
15
16
17
# File 'lib/with_events/stream.rb', line 7

def initialize(name, klass, options = {})
  @name = name
  @klass = klass
  @events = []
  @watchers = {}
  @topic = options[:topic]
  @configuration = {}
  @subscribe = options[:subscribe]

  self.class.streams << self
end

Class Attribute Details

.subscribedObject

Returns the value of attribute subscribed.



49
50
51
# File 'lib/with_events/stream.rb', line 49

def subscribed
  @subscribed
end

Instance Attribute Details

#eventsObject (readonly)

Returns the value of attribute events.



5
6
7
# File 'lib/with_events/stream.rb', line 5

def events
  @events
end

#klassObject (readonly)

Returns the value of attribute klass.



5
6
7
# File 'lib/with_events/stream.rb', line 5

def klass
  @klass
end

#nameObject (readonly)

Returns the value of attribute name.



5
6
7
# File 'lib/with_events/stream.rb', line 5

def name
  @name
end

#subscribeObject (readonly)

Returns the value of attribute subscribe.



5
6
7
# File 'lib/with_events/stream.rb', line 5

def subscribe
  @subscribe
end

#topicObject (readonly)

Returns the value of attribute topic.



5
6
7
# File 'lib/with_events/stream.rb', line 5

def topic
  @topic
end

#watchersObject (readonly)

Returns the value of attribute watchers.



5
6
7
# File 'lib/with_events/stream.rb', line 5

def watchers
  @watchers
end

Class Method Details

.find(name) ⇒ Object



59
60
61
# File 'lib/with_events/stream.rb', line 59

def find(name)
  streams.find { |s| s.name == name }
end

.find_or_initialize(name, klass, options = {}) ⇒ Object



55
56
57
# File 'lib/with_events/stream.rb', line 55

def find_or_initialize(name, klass, options = {})
  find(name) || new(name, klass, options)
end

.streamsObject



51
52
53
# File 'lib/with_events/stream.rb', line 51

def streams
  @streams ||= []
end

.subscribeObject



63
64
65
66
67
68
69
70
71
# File 'lib/with_events/stream.rb', line 63

def subscribe
  return if subscribed || !streams.find { |s| s.topic && s.subscribe }
  self.subscribed = true

  Aws::Topic.new.subscribe(async: true, timeout: 0) do |message, topic|
    selected = stream_events(message, topic)
    selected.each { |event| notify_event(event, message) }.size.positive?
  end
end

Instance Method Details

#configure_all(options = {}) ⇒ Object



29
30
31
# File 'lib/with_events/stream.rb', line 29

def configure_all(options = {})
  @configuration = options
end

#event(name, options = {}) ⇒ Object



19
20
21
22
# File 'lib/with_events/stream.rb', line 19

def event(name, options = {})
  events <<
    Event.new(name, klass, options.merge(configuration).merge(stream: self))
end

#notify(event, resource) ⇒ Object



38
39
40
41
# File 'lib/with_events/stream.rb', line 38

def notify(event, resource)
  notify_sqs(event, resource) if topic
  notify_watchers(event, resource)
end

#notify_watchers(event, resource) ⇒ Object



43
44
45
46
# File 'lib/with_events/stream.rb', line 43

def notify_watchers(event, resource)
  return if watchers[event.name].nil?
  watchers[event.name].each { |watcher| resource.instance_exec(&watcher) }
end

#on(name, &block) ⇒ Object



33
34
35
36
# File 'lib/with_events/stream.rb', line 33

def on(name, &block)
  watchers[name] ||= []
  watchers[name] << block
end

#reset_configure_allObject



24
25
26
27
# File 'lib/with_events/stream.rb', line 24

def reset_configure_all
  @configuration = {}
  self
end