Class: LogStash::Outputs::SumoLogic::Sender
- Inherits:
-
Object
- Object
- LogStash::Outputs::SumoLogic::Sender
- Includes:
- Common
- Defined in:
- lib/logstash/outputs/sumologic/sender.rb
Constant Summary collapse
- STOP_TAG =
"PLUGIN STOPPED"
Constants included from Common
Common::CARBON2, Common::DEFAULT_LOG_FORMAT, Common::DEFLATE, Common::GRAPHITE, Common::GZIP, Common::LOG_TO_CONSOLE, Common::METRICS_NAME_PLACEHOLDER, Common::STATS_TAG
Instance Method Summary collapse
-
#connect ⇒ Object
Sends a GET request to the configured URL and returns true only when the server answers with status 200.
-
#initialize(client, queue, stats, config) ⇒ Sender
constructor
A new instance of Sender.
-
#start ⇒ Object
Starts the background sender thread, which takes content off the queue and sends it.
-
#stop ⇒ Object
Signals the sender thread to stop and waits for it to finish.
Methods included from Common
#log_dbg, #log_err, #log_info, #log_warn, #set_logger
Constructor Details
#initialize(client, queue, stats, config) ⇒ Sender
Returns a new instance of Sender.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/logstash/outputs/sumologic/sender.rb', line 18
#
# Builds a sender around the given client, queue and stats objects, taking
# its settings from +config+.
#
# Entries missing from +config+ ("sender_max", "sleep_before_requeue",
# "stats_enabled") are written back into it with the defaults 1, 30 and false.
#
# @param client [Object] kept in @client
# @param queue [Object] kept in @queue
# @param stats [Object] kept in @stats
# @param config [Hash] settings; "url" is read here, and the whole hash is
#   handed on to HeaderBuilder and Compressor
def initialize(client, queue, stats, config)
  @client = client
  @queue = queue
  @stats = stats
  @stopping = Concurrent::AtomicBoolean.new(false)

  @url = config["url"]

  # Default to one sender, and never run with fewer than one.
  requested_senders = (config["sender_max"] ||= 1)
  @sender_max = requested_senders < 1 ? 1 : requested_senders

  @sleep_before_requeue = (config["sleep_before_requeue"] ||= 30)
  @stats_enabled = (config["stats_enabled"] ||= false)

  # Pre-fill the token queue with one token (0...@sender_max) per sender.
  @tokens = SizedQueue.new(@sender_max)
  @sender_max.times { |token| @tokens.push(token) }

  @header_builder = LogStash::Outputs::SumoLogic::HeaderBuilder.new(config)
  @headers = @header_builder.build
  @stats_headers = @header_builder.build_stats

  @compressor = LogStash::Outputs::SumoLogic::Compressor.new(config)
end
Instance Method Details
#connect ⇒ Object
Sends a GET request to the configured URL and returns true only when the server answers with status 200.
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/logstash/outputs/sumologic/sender.rb', line 63
#
# Probes the configured endpoint (@url) with a single HTTP GET.
#
# Every failure is logged through log_err and reported as +false+; that
# includes a nil or malformed @url, which is why URL parsing happens inside
# the rescued region.
#
# @return [Boolean] true when the server answers with status 200, false when
#   it answers with any other status or when a StandardError is raised while
#   parsing the URL or performing the request
def connect
  uri = URI.parse(@url)
  http = Net::HTTP.new(uri.host, uri.port)
  http.use_ssl = @url.downcase.start_with?("https")
  request = Net::HTTP::Get.new(uri.request_uri)

  res = http.request(request)
  if res.code.to_i == 200
    log_dbg(
      "Server accepted the request",
      :url => @url
    )
    true
  else
    log_err(
      "Server rejected the request",
      :url => @url,
      :code => res.code
    )
    false
  end
rescue StandardError => ex
  # StandardError rather than Exception: interrupts, SystemExit and
  # out-of-memory conditions must propagate instead of being logged away.
  log_err(
    "Cannot connect to given url",
    :url => @url,
    :exception => ex
  )
  false
end
#start ⇒ Object
Starts the background sender thread, which takes content off the queue and sends it.
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/logstash/outputs/sumologic/sender.rb', line 38
#
# Clears the stopping flag and spawns the sender thread, kept in @sender_t.
#
# The thread passes each item taken from @queue to send_request until the
# stopping flag is set, then sends whatever @queue.drain returns, and finally
# polls once per second until @tokens holds @sender_max entries again.
# NOTE(review): presumably a full token queue means no request is still in
# flight — confirm against send_request.
#
# @return [Thread] the newly created sender thread
def start
  @stopping.make_false
  @sender_t = Thread.new do
    send_request(@queue.deq) while @stopping.false?

    # Flush what is still queued; each (not map) because only the side
    # effect matters and no result array is needed.
    @queue.drain.each { |content| send_request(content) }

    log_info "waiting messages sent out..."
    sleep 1 while @tokens.size < @sender_max
  end
end
#stop ⇒ Object
Signals the sender thread to stop and waits for it to finish.
55 56 57 58 59 60 61 |
# File 'lib/logstash/outputs/sumologic/sender.rb', line 55
#
# Asks the sender thread to shut down and waits until it has finished.
#
# Sets the stopping flag, enqueues STOP_TAG, then joins @sender_t. Safe to
# call when start was never invoked: in that case there is no thread to join.
def stop
  log_info "shutting down sender..."
  @stopping.make_true
  # NOTE(review): the marker is likely there to wake a sender waiting in
  # @queue.deq so it can observe the flag — confirm against the queue class.
  @queue.enq(STOP_TAG)
  # @sender_t is only assigned by start; skip the join if it never ran.
  @sender_t.join if @sender_t
  log_info "sender is fully shutted down"
end