23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
# File 'lib/deimos/monkey_patches/ruby_kafka_heartbeat.rb', line 23
# Builds a Consumer and the collaborators it is handed: a ConsumerGroup,
# a Fetcher, an OffsetManager and a Heartbeat, all sharing one
# DecoratingInstrumenter that wraps @instrumenter with the group id.
#
# @param group_id passed to the ConsumerGroup and attached to the
#   decorating instrumenter under the :group_id key.
# @param session_timeout forwarded to both the ConsumerGroup and the Consumer.
# @param offset_commit_interval forwarded to the OffsetManager as
#   commit_interval.
# @param offset_commit_threshold forwarded to the OffsetManager as
#   commit_threshold.
# @param heartbeat_interval forwarded to the Heartbeat as interval.
# @param offset_retention_time forwarded unchanged to the OffsetManager; the
#   ConsumerGroup receives it multiplied by 1_000, or -1 when it is not given.
# @param fetcher_max_queue_size forwarded to the Fetcher as max_queue_size.
# @return the newly constructed Consumer.
def consumer(
  group_id:,
  session_timeout: 30,
  offset_commit_interval: 10,
  offset_commit_threshold: 0,
  heartbeat_interval: 10,
  offset_retention_time: nil,
  fetcher_max_queue_size: 100
)
  primary_cluster = initialize_cluster
  scoped_instrumenter = DecoratingInstrumenter.new(@instrumenter, group_id: group_id)

  # The group gets the retention time scaled by 1_000 (presumably seconds to
  # milliseconds - confirm against ConsumerGroup); -1 stands in when no
  # retention time was supplied.
  scaled_retention = offset_retention_time && offset_retention_time * 1_000
  scaled_retention ||= -1

  consumer_group = ConsumerGroup.new(
    cluster: primary_cluster,
    logger: @logger,
    group_id: group_id,
    session_timeout: session_timeout,
    retention_time: scaled_retention,
    instrumenter: scoped_instrumenter
  )

  # NOTE(review): the fetcher is given the result of a second
  # initialize_cluster call rather than primary_cluster. Presumably this is
  # deliberate so the fetcher works with its own cluster - confirm before
  # "simplifying" it.
  message_fetcher = Fetcher.new(
    cluster: initialize_cluster,
    group: consumer_group,
    logger: @logger,
    instrumenter: scoped_instrumenter,
    max_queue_size: fetcher_max_queue_size
  )

  # The offset manager receives the caller's original (unscaled) retention time.
  offsets = OffsetManager.new(
    cluster: primary_cluster,
    group: consumer_group,
    fetcher: message_fetcher,
    logger: @logger,
    commit_interval: offset_commit_interval,
    commit_threshold: offset_commit_threshold,
    offset_retention_time: offset_retention_time
  )

  Consumer.new(
    cluster: primary_cluster,
    logger: @logger,
    instrumenter: scoped_instrumenter,
    group: consumer_group,
    offset_manager: offsets,
    fetcher: message_fetcher,
    session_timeout: session_timeout,
    heartbeat: Heartbeat.new(
      group: consumer_group,
      interval: heartbeat_interval,
      instrumenter: scoped_instrumenter
    )
  )
end
|