Class: LogStash::Outputs::ClickHouse
- Inherits:
-
Base
- Object
- Base
- LogStash::Outputs::ClickHouse
- Includes:
- PluginMixins::HttpClient, Stud::Buffer
- Defined in:
- lib/logstash/outputs/clickhouse.rb
Instance Method Summary collapse
Instance Method Details
#flush(events, close = false) ⇒ Object
155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/logstash/outputs/clickhouse.rb', line 155 def flush(events, close = false) documents = "" #this is the string of hashes that we push to Fusion as documents events.each do |event| documents << LogStash::Json.dump(mutate(event.to_hash())) << "\n" end hosts = get_host_addresses() make_request(documents, hosts, @http_query, 1, 1, hosts.sample) end |
#print_plugin_info ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/logstash/outputs/clickhouse.rb', line 51 def print_plugin_info() @@plugins = Gem::Specification.find_all { |spec| spec.name =~ /logstash-output-clickhouse/ } @plugin_name = @@plugins[0].name @plugin_version = @@plugins[0].version @logger.info("Running #{@plugin_name} version #{@plugin_version}") @logger.info("Initialized clickhouse with settings", :flush_size => @flush_size, :idle_flush_time => @idle_flush_time, :request_tokens => @pool_max, :http_hosts => @http_hosts, :http_query => @http_query, :headers => request_headers) end |
#register ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/logstash/outputs/clickhouse.rb', line 66 def register # Handle this deprecated option. TODO: remove the option #@ssl_certificate_validation = @verify_ssl if @verify_ssl # We count outstanding requests with this queue # This queue tracks the requests to create backpressure # When this queue is empty no new requests may be sent, # tokens must be added back by the client on success @request_tokens = SizedQueue.new(@pool_max) @pool_max.times { |t| @request_tokens << true } @requests = Array.new params = { "query" => "INSERT INTO #{table} FORMAT JSONEachRow" }.merge(@extra_params) @http_query = "?#{URI.encode_www_form(params)}" @hostnames_pool = parse_http_hosts(http_hosts, ShortNameResolver.new(ttl: @host_resolve_ttl_sec, logger: @logger)) buffer_initialize( :max_items => @flush_size, :max_interval => @idle_flush_time, :logger => @logger, ) print_plugin_info() end |