Class: Franz::Output::HTTP
- Inherits:
-
Object
- Object
- Franz::Output::HTTP
- Defined in:
- lib/franz/output/http.rb
Overview
HTTP output for Franz.
Instance Method Summary collapse
-
#initialize(opts = {}) ⇒ HTTP
constructor
Start a new output in the background.
-
#join ⇒ Object
Join the Output thread.
-
#stop ⇒ Object
Stop the Output thread.
Constructor Details
#initialize(opts = {}) ⇒ HTTP
Start a new output in the background. We’ll consume from the input queue and ship events via HTTP.
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/franz/output/http.rb', line 21 def initialize opts={} opts = { logger: Logger.new(STDOUT), tags: [], input: [], output: { uri: 'http://user:pass@localhost:3000/v2/.theon', flush_size: 500, flush_interval: 10, ssl: { cert_file: nil, key_file: nil, ca_file: nil, verify_mode: nil } } }.deep_merge!(opts) @statz = opts[:statz] || Franz::Stats.new @statz.create :num_output, 0 @logger = opts[:logger] @stop = false @foreground = opts[:foreground] uri = opts[:output].delete(:uri) || opts[:output].delete(:server) @uri = URI(uri) @ssl = if @uri.scheme =~ /https/ opts[:output].delete :ssl end open_uri @flush_size = opts[:output][:flush_size] @flush_interval = opts[:output][:flush_interval] @lock = Mutex.new @messages = [] Thread.new do until @stop @lock.synchronize do true end sleep @flush_interval end end @thread = Thread.new do until @stop event = opts[:input].shift unless opts[:tags].empty? event['tags'] ||= [] event['tags'] += opts[:tags] end payload = JSON::generate event @lock.synchronize do enqueue payload end log.debug \ event: 'publish', raw: event end end log.info event: 'output started' @thread.join if @foreground end |
Instance Method Details
#join ⇒ Object
Join the Output thread. Effectively only once.
95 96 97 98 99 |
# File 'lib/franz/output/http.rb', line 95 def join return if @foreground @foreground = true @thread.join end |
#stop ⇒ Object
Stop the Output thread. Effectively only once.
103 104 105 106 107 108 |
# File 'lib/franz/output/http.rb', line 103 def stop return if @foreground @foreground = true @thread.kill log.info event: 'output stopped' end |