Class: Fake::Kafka
- Inherits:
-
Object
- Object
- Fake::Kafka
- Defined in:
- lib/fake/kafka.rb,
lib/fake/kafka/version.rb
Defined Under Namespace
Classes: Batch, Consumer, Message, Producer
Constant Summary collapse
- VERSION =
"0.0.2"
Instance Attribute Summary collapse
-
#messages ⇒ Object
readonly
Returns the value of attribute messages.
-
#paused_partitions ⇒ Object
readonly
Returns the value of attribute paused_partitions.
Instance Method Summary collapse
-
#async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, max_retries: -1, retry_backoff: 0, **options) ⇒ Object
github.com/zendesk/ruby-kafka/blob/v1.0.0/lib/kafka/client.rb#L307 rubocop:disable Lint/UnusedMethodArgument, Metric/ParameterLists, Layout/LineLength.
- #consumer(*options) ⇒ Object
- #deliver_message(value, topic:, key: nil) ⇒ Object
-
#initialize(*options) ⇒ Kafka
constructor
A new instance of Kafka.
- #messages_in(topic) ⇒ Object
- #paused?(topic, partition) ⇒ Boolean
-
#producer(compression_codec: nil, compression_threshold: 1, ack_timeout: 5, required_acks: :all, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000, max_buffer_bytesize: 10_000_000, idempotent: false, transactional: false, transactional_id: nil, transactional_timeout: 60) ⇒ Object
github.com/zendesk/ruby-kafka/blob/v1.0.0/lib/kafka/client.rb#L248-L261 rubocop:disable Lint/UnusedMethodArgument, Metric/ParameterLists, Layout/LineLength.
-
#reset! ⇒ Object
Used to clean in-memory data. Useful between test runs.
Constructor Details
#initialize(*options) ⇒ Kafka
Returns a new instance of Kafka.
11 12 13 14 |
# File 'lib/fake/kafka.rb', line 11 def initialize(*) @messages = [] @paused_partitions = {} end |
Instance Attribute Details
#messages ⇒ Object (readonly)
Returns the value of attribute messages.
9 10 11 |
# File 'lib/fake/kafka.rb', line 9 def messages @messages end |
#paused_partitions ⇒ Object (readonly)
Returns the value of attribute paused_partitions.
9 10 11 |
# File 'lib/fake/kafka.rb', line 9 def paused_partitions @paused_partitions end |
Instance Method Details
#async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, max_retries: -1, retry_backoff: 0, **options) ⇒ Object
github.com/zendesk/ruby-kafka/blob/v1.0.0/lib/kafka/client.rb#L307 rubocop:disable Lint/UnusedMethodArgument, Metric/ParameterLists, Layout/LineLength
55 56 57 |
# File 'lib/fake/kafka.rb', line 55 def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, max_retries: -1, retry_backoff: 0, **) producer(**) end |
#consumer(*options) ⇒ Object
29 30 31 |
# File 'lib/fake/kafka.rb', line 29 def consumer(*) Consumer.new(self) end |
#deliver_message(value, topic:, key: nil) ⇒ Object
21 22 23 |
# File 'lib/fake/kafka.rb', line 21 def deliver_message(value, topic:, key: nil) @messages << Message.new(value, key, topic, 0, 0) end |
#messages_in(topic) ⇒ Object
25 26 27 |
# File 'lib/fake/kafka.rb', line 25 def messages_in(topic) @messages.select {|message| message.topic == topic } end |
#paused?(topic, partition) ⇒ Boolean
16 17 18 19 |
# File 'lib/fake/kafka.rb', line 16 def paused?(topic, partition) @paused_partitions[topic] ||= {} !!@paused_partitions[topic][partition] end |
#producer(compression_codec: nil, compression_threshold: 1, ack_timeout: 5, required_acks: :all, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000, max_buffer_bytesize: 10_000_000, idempotent: false, transactional: false, transactional_id: nil, transactional_timeout: 60) ⇒ Object
github.com/zendesk/ruby-kafka/blob/v1.0.0/lib/kafka/client.rb#L248-L261 rubocop:disable Lint/UnusedMethodArgument, Metric/ParameterLists, Layout/LineLength
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/fake/kafka.rb', line 35 def producer( compression_codec: nil, compression_threshold: 1, ack_timeout: 5, required_acks: :all, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000, max_buffer_bytesize: 10_000_000, idempotent: false, transactional: false, transactional_id: nil, transactional_timeout: 60 ) Producer.new(self) end |
#reset! ⇒ Object
Used to clean in-memory data. Useful between test runs.
62 63 64 65 |
# File 'lib/fake/kafka.rb', line 62 def reset! @messages = [] @paused_partitions = {} end |