Class: LogStash::Outputs::ClickHouse

Inherits:
Base
  • Object
show all
Includes:
PluginMixins::HttpClient, Stud::Buffer
Defined in:
lib/logstash/outputs/clickhouse.rb

Instance Method Summary collapse

Instance Method Details

#flush(events, close = false) ⇒ Object



155
156
157
158
159
160
161
162
163
164
165
# File 'lib/logstash/outputs/clickhouse.rb', line 155

def flush(events, close = false)
  documents = ""  #this is the string of hashes that we push to Fusion as documents

  events.each do |event|
    documents << LogStash::Json.dump(mutate(event.to_hash())) << "\n"
  end

  hosts = get_host_addresses()

  make_request(documents, hosts, @http_query, 1, 1, hosts.sample)
end


51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/logstash/outputs/clickhouse.rb', line 51

def print_plugin_info()
  @@plugins = Gem::Specification.find_all { |spec| spec.name =~ /logstash-output-clickhouse/ }
  @plugin_name = @@plugins[0].name
  @plugin_version = @@plugins[0].version
  @logger.info("Running #{@plugin_name} version #{@plugin_version}")

  @logger.info("Initialized clickhouse with settings",
               :flush_size => @flush_size,
               :idle_flush_time => @idle_flush_time,
               :request_tokens => @pool_max,
               :http_hosts => @http_hosts,
               :http_query => @http_query,
               :headers => request_headers)
end

#registerObject



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/logstash/outputs/clickhouse.rb', line 66

def register
  # Handle this deprecated option. TODO: remove the option
  #@ssl_certificate_validation = @verify_ssl if @verify_ssl

  # We count outstanding requests with this queue
  # This queue tracks the requests to create backpressure
  # When this queue is empty no new requests may be sent,
  # tokens must be added back by the client on success
  @request_tokens = SizedQueue.new(@pool_max)
  @pool_max.times { |t| @request_tokens << true }
  @requests = Array.new

  params = { "query" => "INSERT INTO #{table} FORMAT JSONEachRow" }.merge(@extra_params)
  @http_query = "?#{URI.encode_www_form(params)}"

  @hostnames_pool =
    parse_http_hosts(http_hosts,
                     ShortNameResolver.new(ttl: @host_resolve_ttl_sec, logger: @logger))

  buffer_initialize(
    :max_items => @flush_size,
    :max_interval => @idle_flush_time,
    :logger => @logger,
  )

  print_plugin_info()
end