Class: ActiveProjection::EventClient
- Inherits:
-
Object
- Object
- ActiveProjection::EventClient
- Includes:
- Singleton
- Defined in:
- lib/active_projection/event_client.rb
Instance Attribute Summary collapse
Class Method Summary collapse
Instance Method Summary collapse
- #build_event(type, data) ⇒ Object
- #event_channel ⇒ Object
- #event_connection ⇒ Object
- #event_exchange ⇒ Object
- #event_queue ⇒ Object
- #event_received(properties, body) ⇒ Object
- #listen_for_events ⇒ Object
- #listen_for_replayed_events ⇒ Object
- #min_last_id ⇒ Object
- #replay_done?(body) ⇒ Boolean
- #replay_queue ⇒ Object
- #request_missing_events ⇒ Object
- #resend_exchange ⇒ Object
- #resend_request_exchange ⇒ Object
- #send_browser_notification(id) ⇒ Object
- #send_request_for(id) ⇒ Object
- #server_side_events_exchange ⇒ Object
- #start ⇒ Object
- #subscribe_to(queue, &block) ⇒ Object
- #sync_projections ⇒ Object
Instance Attribute Details
#options ⇒ Object
115 116 117 |
# File 'lib/active_projection/event_client.rb', line 115 def options @options end |
Class Method Details
.start(options) ⇒ Object
7 8 9 10 |
# File 'lib/active_projection/event_client.rb', line 7 def self.start(options) instance.options = options instance.start end |
Instance Method Details
#build_event(type, data) ⇒ Object
71 72 73 |
# File 'lib/active_projection/event_client.rb', line 71 def build_event(type, data) Object.const_get(type).new(data.deep_symbolize_keys) end |
#event_channel ⇒ Object
95 96 97 |
# File 'lib/active_projection/event_client.rb', line 95 def event_channel @event_channel ||= event_connection.create_channel end |
#event_connection ⇒ Object
91 92 93 |
# File 'lib/active_projection/event_client.rb', line 91 def event_connection @event_server ||= Bunny.new URI::Generic.build(options[:event_connection]).to_s end |
#event_exchange ⇒ Object
99 100 101 |
# File 'lib/active_projection/event_client.rb', line 99 def event_exchange @event_exchange ||= event_channel.fanout options[:event_exchange] end |
#event_queue ⇒ Object
79 80 81 |
# File 'lib/active_projection/event_client.rb', line 79 def event_queue @event_queue ||= event_channel.queue('', auto_delete: true).bind(event_exchange) end |
#event_received(properties, body) ⇒ Object
65 66 67 68 69 |
# File 'lib/active_projection/event_client.rb', line 65 def event_received(properties, body) RELOADER.execute_if_updated LOGGER.debug "Received #{properties.type} with #{body}" ProjectionTypeRegistry.process properties.headers.deep_symbolize_keys!, build_event(properties.type, JSON.parse(body)) end |
#listen_for_events ⇒ Object
30 31 32 33 34 35 |
# File 'lib/active_projection/event_client.rb', line 30 def listen_for_events subscribe_to event_queue do |delivery_info, properties, body| event_received properties, body send_browser_notification properties.headers[:id] end end |
#listen_for_replayed_events ⇒ Object
42 43 44 45 46 |
# File 'lib/active_projection/event_client.rb', line 42 def listen_for_replayed_events subscribe_to replay_queue do |delivery_info, properties, body| event_received properties, body unless replay_done? body end end |
#min_last_id ⇒ Object
83 84 85 |
# File 'lib/active_projection/event_client.rb', line 83 def min_last_id ProjectionRepository.last_ids.min || 0 end |
#replay_done?(body) ⇒ Boolean
56 57 58 59 60 61 62 63 |
# File 'lib/active_projection/event_client.rb', line 56 def replay_done?(body) if 'replay_done' == body LOGGER.debug 'Projections should be up to date now' replay_queue.unbind(resend_exchange) return true end false end |
#replay_queue ⇒ Object
75 76 77 |
# File 'lib/active_projection/event_client.rb', line 75 def replay_queue @replay_queue ||= event_channel.queue('', auto_delete: true).bind(resend_exchange) end |
#request_missing_events ⇒ Object
37 38 39 40 |
# File 'lib/active_projection/event_client.rb', line 37 def request_missing_events listen_for_replayed_events send_request_for min_last_id end |
#resend_exchange ⇒ Object
103 104 105 |
# File 'lib/active_projection/event_client.rb', line 103 def resend_exchange @resend_exchange ||= event_channel.fanout "resend_#{options[:event_exchange]}" end |
#resend_request_exchange ⇒ Object
107 108 109 |
# File 'lib/active_projection/event_client.rb', line 107 def resend_request_exchange @resend_request_exchange ||= event_channel.direct "resend_request_#{options[:event_exchange]}" end |
#send_browser_notification(id) ⇒ Object
48 49 50 |
# File 'lib/active_projection/event_client.rb', line 48 def send_browser_notification(id) server_side_events_exchange.publish id.to_s end |
#send_request_for(id) ⇒ Object
52 53 54 |
# File 'lib/active_projection/event_client.rb', line 52 def send_request_for(id) resend_request_exchange.publish id.to_s, routing_key: 'resend_request' end |
#server_side_events_exchange ⇒ Object
111 112 113 |
# File 'lib/active_projection/event_client.rb', line 111 def server_side_events_exchange @server_side_events_exchange ||= event_channel.fanout "server_side_#{options[:event_exchange]}" end |
#start ⇒ Object
12 13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/active_projection/event_client.rb', line 12 def start event_connection.start sync_projections listen_for_events request_missing_events event_channel.work_pool.join rescue Interrupt LOGGER.info 'Catching Interrupt' rescue Exception => e LOGGER.error e.message LOGGER.error e.backtrace.join("\n") raise e end |
#subscribe_to(queue, &block) ⇒ Object
87 88 89 |
# File 'lib/active_projection/event_client.rb', line 87 def subscribe_to(queue, &block) queue.subscribe(&block) end |
#sync_projections ⇒ Object
26 27 28 |
# File 'lib/active_projection/event_client.rb', line 26 def sync_projections ProjectionTypeRegistry.sync_projections end |