Class: ActiveEvent::ReplayServer

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/active_event/replay_server.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#options ⇒ Object



88
89
90
# File 'lib/active_event/replay_server.rb', line 88

# Reader for the configuration stored on this instance.
#
# @return [Object] the current value of @options (nil if it was never assigned);
#   other methods index it with symbols such as :event_connection and
#   :event_exchange, so it is presumably a Hash — confirm with callers
def options
  @options
end

Class Method Details

.start(options, id) ⇒ Object



8
9
10
11
# File 'lib/active_event/replay_server.rb', line 8

# Class-level entry point: stores +options+ on the singleton instance and
# then delegates to the instance-level #start.
#
# @param options [Object] configuration assigned to the instance via +options=+
# @param id [Object] forwarded unchanged to the instance's #start
# @return [Object] whatever the instance's #start returns
def self.start(options, id)
  server = instance
  server.options = options
  server.start(id)
end

.update(id) ⇒ Object



13
14
15
# File 'lib/active_event/replay_server.rb', line 13

# Enqueues +id+ on the singleton instance's queue.
#
# @param id [Object] value appended to the instance's queue
# @return [Object] the queue itself (the result of the push)
def self.update(id)
  instance.queue.push(id)
end

Instance Method Details

#event_channel ⇒ Object



80
81
82
# File 'lib/active_event/replay_server.rb', line 80

# Memoized channel obtained from #event_connection.
#
# The channel is created on first use via +create_channel+ and cached in
# @event_channel for every later call.
#
# @return [Object] the cached channel
def event_channel
  return @event_channel if @event_channel

  @event_channel = event_connection.create_channel
end

#event_connection ⇒ Object



76
77
78
# File 'lib/active_event/replay_server.rb', line 76

# Memoized Bunny client for the event broker.
#
# On first use, the URI components in options[:event_connection] are turned
# into a URL string with URI::Generic.build and handed to Bunny.new. The
# result is cached in @event_server (note: not @event_connection).
#
# @return [Object] the cached object returned by Bunny.new
def event_connection
  @event_server ||= begin
    connection_url = URI::Generic.build(options[:event_connection]).to_s
    Bunny.new(connection_url)
  end
end

#has_next_event? ⇒ Boolean

Returns:

  • (Boolean)


72
73
74
# File 'lib/active_event/replay_server.rb', line 72

# Tells whether the current batch in @events still has entries.
#
# @return [Boolean] true when @events holds at least one element
def has_next_event?
  !@events.empty?
end

#new_id? ⇒ Boolean

Returns:

  • (Boolean)


48
49
50
51
52
53
54
55
56
57
# File 'lib/active_event/replay_server.rb', line 48

# Checks the queue for a restart request.
#
# At most one id is popped per call. If that id is strictly smaller than
# @last_id, @last_id is lowered to it and true is returned; otherwise the
# popped id is discarded.
#
# @return [Boolean] true only when @last_id was lowered by this call
def new_id?
  return false if queue.empty?

  candidate = queue.pop
  return false unless candidate < @last_id

  @last_id = candidate
  true
end

#next_event ⇒ Object



66
67
68
69
70
# File 'lib/active_event/replay_server.rb', line 66

# Removes the first entry from @events and records its id in @last_id.
#
# @return [Object] the event that was removed from the front of @events
def next_event
  @events.shift.tap { |event| @last_id = event.id }
end

#queue ⇒ Object



24
25
26
# File 'lib/active_event/replay_server.rb', line 24

# Memoized Queue cached in @queue; created on first access.
#
# @return [Queue] the same queue instance on every call
def queue
  @queue || (@queue = Queue.new)
end

#republish(event) ⇒ Object



59
60
61
62
63
64
# File 'lib/active_event/replay_server.rb', line 59

# Publishes a stored event on the resend exchange and logs it to stdout.
#
# The payload is the event's data serialized with +to_json+. The message
# type is +event.event+, and the headers carry the event's id and created_at
# plus a +replayed: true+ marker.
#
# @param event [Object] must respond to +event+, +data+, +id+ and +created_at+
# @return [nil] the result of +puts+
def republish(event)
  event_type = event.event
  payload = event.data.to_json
  exchange = resend_exchange
  properties = {
    type: event_type,
    headers: { id: event.id, created_at: event.created_at, replayed: true }
  }
  exchange.publish(payload, properties)
  puts "Republished #{event_type} with #{payload}"
end

#republish_events ⇒ Object



40
41
42
43
44
45
46
# File 'lib/active_event/replay_server.rb', line 40

# Drains the current batch, republishing one event per iteration.
#
# Before each event, #new_id? is consulted; when it reports true the rest of
# the batch is abandoned and the method returns immediately. After each
# publish, Thread.pass yields to other threads.
#
# @return [nil]
def republish_events
  while has_next_event?
    break if new_id?

    republish(next_event)
    Thread.pass
  end
end

#resend_exchange ⇒ Object



84
85
86
# File 'lib/active_event/replay_server.rb', line 84

# Memoized fanout exchange used for replayed messages.
#
# It is declared on #event_channel under the name
# "resend_<options[:event_exchange]>" and cached in @resend_exchange.
#
# @return [Object] the cached exchange returned by +fanout+
def resend_exchange
  @resend_exchange ||= begin
    channel = event_channel
    channel.fanout("resend_#{options[:event_exchange]}")
  end
end

#send_done_message ⇒ Object



28
29
30
# File 'lib/active_event/replay_server.rb', line 28

# Publishes the literal payload 'replay_done' on the resend exchange.
#
# @return [Object] whatever the exchange's +publish+ returns
def send_done_message
  exchange = resend_exchange
  exchange.publish('replay_done')
end

#start(id) ⇒ Object



17
18
19
20
21
22
# File 'lib/active_event/replay_server.rb', line 17

# Runs one complete replay pass on this instance.
#
# Starts the connection returned by #event_connection, stores +id+ in
# @last_id, calls #start_republishing and finally #send_done_message.
#
# @param id [Object] stored in @last_id before republishing begins;
#   presumably the id of the last event the requester already has —
#   confirm with callers
# @return [Object] the result of #send_done_message
def start(id)
  event_connection.start
  # Must be assigned before start_republishing, which reads @last_id.
  @last_id = id
  start_republishing
  send_done_message
end

#start_republishing ⇒ Object



32
33
34
35
36
37
38
# File 'lib/active_event/replay_server.rb', line 32

# Repeatedly loads and republishes events until none remain.
#
# Each pass stores EventRepository.after_id(@last_id) (as an Array) in
# @events. An empty batch ends the loop; otherwise #republish_events handles
# the batch and the query is repeated with the current @last_id.
#
# @return [nil]
def start_republishing
  loop do
    batch = EventRepository.after_id(@last_id).to_a
    @events = batch
    break if batch.empty?

    republish_events
  end
end