Class: ActiveProjection::EventClient

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/active_projection/event_client.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#options ⇒ Object



115
116
117
# File 'lib/active_projection/event_client.rb', line 115

# Returns the value currently stored in @options (nil if nothing has been
# assigned yet).
#
# @return [Object, nil] the stored options
def options
  @options
end

Class Method Details

.start(options) ⇒ Object



7
8
9
10
# File 'lib/active_projection/event_client.rb', line 7

# Stores +options+ on the singleton client and then runs it.
#
# @param options [Object] configuration handed to the instance; other methods
#   read options[:event_connection] and options[:event_exchange] from it
# @return [Object] whatever the instance-level #start returns
def self.start(options)
  client = instance
  client.options = options
  client.start
end

Instance Method Details

#build_event(type, data) ⇒ Object



71
72
73
# File 'lib/active_projection/event_client.rb', line 71

# Instantiates the class named by +type+, passing it +data+ with its keys
# deeply symbolized.
#
# @param type [String, Symbol] constant name resolved via Object.const_get
# @param data [Object] payload; must respond to deep_symbolize_keys
#   (presumably the ActiveSupport Hash extension — confirm)
# @return [Object] a new instance of the resolved class
# @raise [NameError] if +type+ does not name a defined constant
#
# NOTE(review): event_received passes properties.type from the incoming
# message straight into this lookup, so a publisher can name any loadable
# constant. Consider restricting +type+ to known event classes if publishers
# are not fully trusted.
def build_event(type, data)
  Object.const_get(type).new(data.deep_symbolize_keys)
end

#event_channel ⇒ Object



95
96
97
# File 'lib/active_projection/event_client.rb', line 95

# Returns the channel created on event_connection, creating it on first use
# and caching it in @event_channel for later calls.
#
# @return [Object] the result of event_connection.create_channel
def event_channel
  @event_channel ||= event_connection.create_channel
end

#event_connection ⇒ Object



91
92
93
# File 'lib/active_projection/event_client.rb', line 91

# Returns the Bunny connection object, building it on first use from
# options[:event_connection] (converted to a URI string through
# URI::Generic.build) and caching it afterwards.
#
# This method only constructs the object; #start is what calls
# event_connection.start.
#
# NOTE(review): the cache lives in @event_server, not @event_connection.
# The name mismatch is harmless here but easy to trip over when debugging.
#
# @return [Object] the value returned by Bunny.new
def event_connection
  @event_server ||= Bunny.new URI::Generic.build(options[:event_connection]).to_s
end

#event_exchange ⇒ Object



99
100
101
# File 'lib/active_projection/event_client.rb', line 99

# Returns the fanout exchange named by options[:event_exchange], obtained
# from event_channel on first use and cached in @event_exchange.
#
# @return [Object] the result of event_channel.fanout
def event_exchange
  @event_exchange ||= event_channel.fanout options[:event_exchange]
end

#event_queue ⇒ Object



79
80
81
# File 'lib/active_projection/event_client.rb', line 79

# Returns the queue used for live events: declared on event_channel with an
# empty name and auto_delete: true, then bound to event_exchange. The value
# returned by #bind is what gets cached in @event_queue.
#
# NOTE(review): the empty name presumably asks the broker to generate a unique
# queue name — confirm against the Bunny documentation.
#
# @return [Object] the result of binding the queue to event_exchange
def event_queue
  @event_queue ||= event_channel.queue('', auto_delete: true).bind(event_exchange)
end

#event_received(properties, body) ⇒ Object



65
66
67
68
69
# File 'lib/active_projection/event_client.rb', line 65

# Handles one incoming message: triggers the reloader check, logs the message,
# rebuilds the event object and hands it to the projection registry.
#
# @param properties [Object] message metadata; must respond to #type and
#   #headers (headers must respond to deep_symbolize_keys!)
# @param body [String] JSON-encoded event payload
# @return [Object] whatever ProjectionTypeRegistry.process returns
# @raise [JSON::ParserError] if +body+ is not valid JSON
def event_received(properties, body)
  RELOADER.execute_if_updated
  LOGGER.debug "Received #{properties.type} with #{body}"

  # The bang variant is called on the headers object itself, so the symbolized
  # keys are presumably visible to callers that read properties.headers later.
  headers = properties.headers.deep_symbolize_keys!
  payload = JSON.parse(body)
  event = build_event(properties.type, payload)

  ProjectionTypeRegistry.process headers, event
end

#listen_for_events ⇒ Object



30
31
32
33
34
35
# File 'lib/active_projection/event_client.rb', line 30

# Subscribes to the live event queue. Each delivered message is processed via
# event_received and then announced through send_browser_notification.
#
# @return [Object] whatever subscribe_to returns
def listen_for_events
  subscribe_to(event_queue) do |_delivery_info, properties, body|
    # Keep this order: event_received symbolizes the header keys in place
    # (deep_symbolize_keys!), and the :id lookup below uses a symbol key.
    event_received(properties, body)
    send_browser_notification(properties.headers[:id])
  end
end

#listen_for_replayed_events ⇒ Object



42
43
44
45
46
# File 'lib/active_projection/event_client.rb', line 42

# Subscribes to the replay queue. Every delivered message is processed via
# event_received, except the one for which replay_done? returns true.
#
# @return [Object] whatever subscribe_to returns
def listen_for_replayed_events
  subscribe_to(replay_queue) do |_delivery_info, properties, body|
    # replay_done? also unbinds the replay queue when it sees the marker.
    next if replay_done?(body)

    event_received(properties, body)
  end
end

#min_last_id ⇒ Object



83
84
85
# File 'lib/active_projection/event_client.rb', line 83

# Returns the smallest value in ProjectionRepository.last_ids, or 0 when that
# collection has no minimum (for example when it is empty).
#
# @return [Object] the minimum last id, or 0 as the fallback
def min_last_id
  lowest = ProjectionRepository.last_ids.min
  lowest || 0
end

#replay_done?(body) ⇒ Boolean

Returns:

  • (Boolean)


56
57
58
59
60
61
62
63
# File 'lib/active_projection/event_client.rb', line 56

# Tells whether +body+ is the 'replay_done' marker message.
#
# Despite the predicate name this has a side effect: when the marker is seen,
# it logs a debug line and unbinds replay_queue from resend_exchange.
#
# @param body [String] raw message body
# @return [Boolean] true for the marker, false for anything else
def replay_done?(body)
  return false unless body == 'replay_done'

  LOGGER.debug 'Projections should be up to date now'
  replay_queue.unbind(resend_exchange)
  true
end

#replay_queue ⇒ Object



75
76
77
# File 'lib/active_projection/event_client.rb', line 75

# Returns the queue used for replayed events: declared on event_channel with
# an empty name and auto_delete: true, then bound to resend_exchange. The
# value returned by #bind is cached in @replay_queue.
#
# @return [Object] the result of binding the queue to resend_exchange
def replay_queue
  @replay_queue ||= event_channel.queue('', auto_delete: true).bind(resend_exchange)
end

#request_missing_events ⇒ Object



37
38
39
40
# File 'lib/active_projection/event_client.rb', line 37

# Asks for events to be resent, starting from the id given by min_last_id.
#
# @return [Object] whatever send_request_for returns
def request_missing_events
  # Subscribe before sending the request, presumably so that replayed events
  # arriving right after the request are not missed.
  listen_for_replayed_events
  send_request_for min_last_id
end

#resend_exchange ⇒ Object



103
104
105
# File 'lib/active_projection/event_client.rb', line 103

# Returns the fanout exchange named "resend_" followed by
# options[:event_exchange], obtained from event_channel on first use and
# cached in @resend_exchange.
#
# @return [Object] the result of event_channel.fanout
def resend_exchange
  @resend_exchange ||= event_channel.fanout "resend_#{options[:event_exchange]}"
end

#resend_request_exchange ⇒ Object



107
108
109
# File 'lib/active_projection/event_client.rb', line 107

# Returns the direct exchange named "resend_request_" followed by
# options[:event_exchange], obtained from event_channel on first use and
# cached in @resend_request_exchange.
#
# @return [Object] the result of event_channel.direct
def resend_request_exchange
  @resend_request_exchange ||= event_channel.direct "resend_request_#{options[:event_exchange]}"
end

#send_browser_notification(id) ⇒ Object



48
49
50
# File 'lib/active_projection/event_client.rb', line 48

# Publishes +id+, converted to a string, on server_side_events_exchange.
#
# @param id [#to_s] identifier to announce
# @return [Object] whatever the exchange's #publish returns
def send_browser_notification(id)
  payload = id.to_s
  server_side_events_exchange.publish(payload)
end

#send_request_for(id) ⇒ Object



52
53
54
# File 'lib/active_projection/event_client.rb', line 52

# Publishes +id+, converted to a string, on resend_request_exchange using the
# routing key 'resend_request'.
#
# @param id [#to_s] identifier sent as the message payload
# @return [Object] whatever the exchange's #publish returns
def send_request_for(id)
  payload = id.to_s
  resend_request_exchange.publish(payload, routing_key: 'resend_request')
end

#server_side_events_exchange ⇒ Object



111
112
113
# File 'lib/active_projection/event_client.rb', line 111

# Returns the fanout exchange named "server_side_" followed by
# options[:event_exchange], obtained from event_channel on first use and
# cached in @server_side_events_exchange.
#
# @return [Object] the result of event_channel.fanout
def server_side_events_exchange
  @server_side_events_exchange ||= event_channel.fanout "server_side_#{options[:event_exchange]}"
end

#start ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/active_projection/event_client.rb', line 12

# Runs the client: starts the connection, syncs projections, subscribes to
# live events, requests missing events, then joins the channel's work pool.
#
# An Interrupt is logged at info level and swallowed. Any other exception is
# logged (message, then backtrace) and re-raised.
#
# @return [Object] the result of the work pool join, or of LOGGER.info when
#   interrupted
# @raise [Exception] any non-Interrupt exception raised by the steps above
def start
  event_connection.start
  sync_projections
  listen_for_events
  request_missing_events
  event_channel.work_pool.join
rescue Interrupt
  LOGGER.info 'Catching Interrupt'
rescue Exception => error
  # Exception (not just StandardError) is caught on purpose: the error is only
  # logged here and then re-raised unchanged.
  LOGGER.error error.message
  LOGGER.error error.backtrace.join("\n")
  raise
end

#subscribe_to(queue, &block) ⇒ Object



87
88
89
# File 'lib/active_projection/event_client.rb', line 87

# Subscribes +block+ as the consumer of +queue+ by forwarding it to
# queue.subscribe.
#
# @param queue [Object] anything responding to #subscribe
# @param block [Proc] handler forwarded unchanged; listen_for_events and
#   listen_for_replayed_events pass blocks taking (delivery_info, properties, body)
# @return [Object] whatever queue.subscribe returns
def subscribe_to(queue, &block)
  queue.subscribe(&block)
end

#sync_projections ⇒ Object



26
27
28
# File 'lib/active_projection/event_client.rb', line 26

# Delegates to ProjectionTypeRegistry.sync_projections; #start calls this
# before subscribing to any queue.
#
# @return [Object] whatever ProjectionTypeRegistry.sync_projections returns
def sync_projections
  ProjectionTypeRegistry.sync_projections
end