Class: Fake::Kafka

Inherits:
Object
  • Object
show all
Defined in:
lib/fake/kafka.rb,
lib/fake/kafka/version.rb

Defined Under Namespace

Classes: Batch, Consumer, Message, Producer

Constant Summary collapse

VERSION =
"0.0.2"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*options) ⇒ Kafka

Returns a new instance of Kafka.



11
12
13
14
# File 'lib/fake/kafka.rb', line 11

def initialize(*options)
  @messages = []
  @paused_partitions = {}
end

Instance Attribute Details

#messagesObject (readonly)

Returns the value of attribute messages.



9
10
11
# File 'lib/fake/kafka.rb', line 9

def messages
  @messages
end

#paused_partitionsObject (readonly)

Returns the value of attribute paused_partitions.



9
10
11
# File 'lib/fake/kafka.rb', line 9

def paused_partitions
  @paused_partitions
end

Instance Method Details

#async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, max_retries: -1,, retry_backoff: 0, **options) ⇒ Object

github.com/zendesk/ruby-kafka/blob/v1.0.0/lib/kafka/client.rb#L307 rubocop:disable Lint/UnusedMethodArgument, Metric/ParameterLists, Layout/LineLength



55
56
57
# File 'lib/fake/kafka.rb', line 55

def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, max_retries: -1, retry_backoff: 0, **options)
  producer(**options)
end

#consumer(*options) ⇒ Object



29
30
31
# File 'lib/fake/kafka.rb', line 29

def consumer(*options)
  Consumer.new(self)
end

#deliver_message(value, topic:, key: nil) ⇒ Object



21
22
23
# File 'lib/fake/kafka.rb', line 21

def deliver_message(value, topic:, key: nil)
  @messages << Message.new(value, key, topic, 0, 0)
end

#messages_in(topic) ⇒ Object



25
26
27
# File 'lib/fake/kafka.rb', line 25

def messages_in(topic)
  messages.select {|message| message.topic == topic }
end

#paused?(topic, partition) ⇒ Boolean

Returns:

  • (Boolean)


16
17
18
19
# File 'lib/fake/kafka.rb', line 16

def paused?(topic, partition)
  @paused_partitions[topic] ||= {}
  !!@paused_partitions[topic][partition]
end

#producer(compression_codec: nil, compression_threshold: 1, ack_timeout: 5, required_acks: :all, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000, max_buffer_bytesize: 10_000_000, idempotent: false, transactional: false, transactional_id: nil, transactional_timeout: 60) ⇒ Object

github.com/zendesk/ruby-kafka/blob/v1.0.0/lib/kafka/client.rb#L248-L261 rubocop:disable Lint/UnusedMethodArgument, Metric/ParameterLists, Layout/LineLength



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/fake/kafka.rb', line 35

def producer(
  compression_codec: nil,
  compression_threshold: 1,
  ack_timeout: 5,
  required_acks: :all,
  max_retries: 2,
  retry_backoff: 1,
  max_buffer_size: 1000,
  max_buffer_bytesize: 10_000_000,
  idempotent: false,
  transactional: false,
  transactional_id: nil,
  transactional_timeout: 60
)
  Producer.new(self)
end

#reset!Object

Used to clean in-memory data Useful between test runs



62
63
64
65
# File 'lib/fake/kafka.rb', line 62

def reset!
  @messages = []
  @paused_partitions = {}
end