Class: Fare::TestMode::MessageList

Inherits:
Object
  • Object
show all
Defined in:
lib/fare/test_mode.rb

Instance Method Summary collapse

Constructor Details

#initializeMessageList

Returns a new instance of MessageList.



6
7
8
# File 'lib/fare/test_mode.rb', line 6

def initialize
  @all_messages_published = {}
end

Instance Method Details

#all(subject, action) ⇒ Object



32
33
34
# File 'lib/fare/test_mode.rb', line 32

def all(subject, action)
  @all_messages_published[[subject.to_s, action.to_s]] || []
end

#clearObject



24
25
26
# File 'lib/fare/test_mode.rb', line 24

def clear
  @all_messages_published.clear
end

#get(subject, action) ⇒ Object



28
29
30
# File 'lib/fare/test_mode.rb', line 28

def get(subject, action)
  all(subject, action).first
end

#given_event(event_or_queue_name, event = nil) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
# File 'lib/fare/test_mode.rb', line 46

def given_event(event_or_queue_name, event = nil)
  if event
    queue_name = event_or_queue_name
  else
    event = event_or_queue_name
    queue_name = app_name
  end
  serialized_event = Event.new(event).serialize
  queue = fetch_queue(queue_name.to_s)
  queue.publish(serialized_event)
end

#listObject



40
41
42
43
44
# File 'lib/fare/test_mode.rb', line 40

def list
  @all_messages_published.keys.map { |(subject, action)|
    "* #{subject}##{action}"
  }.join("\n")
end

#queue_adapterObject



10
11
12
# File 'lib/fare/test_mode.rb', line 10

def queue_adapter
  @queue_adapter ||= QueueAdapter.new
end

#register_publish(message) ⇒ Object



18
19
20
21
22
# File 'lib/fare/test_mode.rb', line 18

def register_publish(message)
  event = Event.deserialize(message)
  @all_messages_published[[event.subject.to_s, event.action.to_s]] ||= []
  @all_messages_published[[event.subject.to_s, event.action.to_s]] << event
end

#run(queue_name = app_name, timeout: 2) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/fare/test_mode.rb', line 58

def run(queue_name = app_name, timeout: 2)
  subscriber = Subscriber.new(Fare.configuration, name: queue_name)
  Timeout.timeout(timeout) do
    while message = produce_message(subscriber)
      begin
        subscriber.consume(message)
      rescue
        # manually delete in case of errors, so not to retry it
        message.delete
        raise
      end
    end
  end
end

#sizeObject



36
37
38
# File 'lib/fare/test_mode.rb', line 36

def size
  @all_messages_published.size
end

#topic_adapterObject



14
15
16
# File 'lib/fare/test_mode.rb', line 14

def topic_adapter
  @topic_adapter ||= TopicAdapter.new(self)
end