Method: Franz::Output::HTTP#initialize
- Defined in:
- lib/franz/output/http.rb
#initialize(opts = {}) ⇒ HTTP
Start a new output in the background. We’ll consume from the input queue and ship events via HTTP.
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/franz/output/http.rb', line 21

# Start a new output in the background. We'll consume from the input queue
# and ship events via HTTP.
#
# @param opts [Hash] caller options, deep-merged over the defaults below
# @option opts [Logger] :logger (Logger.new(STDOUT)) stored in @logger
# @option opts [Array] :tags ([]) appended to each event's 'tags' field
# @option opts [#shift] :input ([]) source the consumer thread shifts events from
# @option opts [Object] :statz (Franz::Stats.new) stats collector; a
#   :num_output counter is created on it, starting at 0
# @option opts [Boolean] :foreground when truthy, block on the consumer thread
# @option opts [Hash] :output endpoint settings: :uri (or its alias :server),
#   :flush_size (500), :flush_interval (10) and :ssl (only kept for https URIs)
def initialize opts={}
  # Capture an explicitly supplied :uri *before* merging. The defaults always
  # contain :uri, so after the merge a `delete(:uri) || delete(:server)` chain
  # can never reach :server and the alias would be silently ignored.
  supplied_output = opts[:output] || {}
  explicit_uri = supplied_output[:uri]

  opts = {
    logger: Logger.new(STDOUT),
    tags: [],
    input: [],
    output: {
      uri: 'http://user:pass@localhost:3000/v2/.theon',
      flush_size: 500,
      flush_interval: 10,
      ssl: {
        cert_file: nil,
        key_file: nil,
        ca_file: nil,
        verify_mode: nil
      }
    }
  }.deep_merge!(opts)

  @statz = opts[:statz] || Franz::Stats.new
  @statz.create :num_output, 0

  @logger = opts[:logger]

  @stop = false
  @foreground = opts[:foreground]

  # Endpoint precedence: explicit :uri, then :server, then the merged default.
  # Both keys are removed from the output options either way.
  merged_uri = opts[:output].delete(:uri)
  server = opts[:output].delete(:server)
  @uri = URI(explicit_uri || server || merged_uri)

  # SSL settings are only retained for https endpoints; otherwise @ssl is nil.
  @ssl = if @uri.scheme =~ /https/
    opts[:output].delete :ssl
  end

  open_uri

  @flush_size = opts[:output][:flush_size]
  @flush_interval = opts[:output][:flush_interval]

  @lock = Mutex.new
  @messages = []

  # Timer thread: takes the lock once per flush interval until stopped.
  # NOTE(review): the critical section is a no-op as written; presumably a
  # periodic flush was intended here - confirm against enqueue.
  Thread.new do
    until @stop
      @lock.synchronize do
        true
      end
      sleep @flush_interval
    end
  end

  # Consumer thread: pull events off the input, tag them, serialize to JSON
  # and hand the payload to enqueue under the lock.
  # NOTE(review): if shift returns nil (e.g. an empty Array input), the tag
  # step raises when tags are configured - confirm the input always blocks.
  @thread = Thread.new do
    until @stop
      event = opts[:input].shift

      unless opts[:tags].empty?
        event['tags'] ||= []
        event['tags'] += opts[:tags]
      end

      payload = JSON::generate event
      @lock.synchronize do
        enqueue payload
      end

      log.debug \
        event: 'publish',
        raw: event
    end
  end

  log.info event: 'output started'

  @thread.join if @foreground
end