Class: LogStash::Outputs::SumoLogic::Sender

Inherits:
Object
  • Object
show all
Includes:
Common
Defined in:
lib/logstash/outputs/sumologic/sender.rb

Constant Summary collapse

STOP_TAG =
"PLUGIN STOPPED"

Constants included from Common

Common::CARBON2, Common::DEFAULT_LOG_FORMAT, Common::DEFLATE, Common::GRAPHITE, Common::GZIP, Common::LOG_TO_CONSOLE, Common::METRICS_NAME_PLACEHOLDER, Common::STATS_TAG

Instance Method Summary collapse

Methods included from Common

#log_dbg, #log_err, #log_info, #log_warn, #set_logger

Constructor Details

#initialize(client, queue, stats, config) ⇒ Sender

Returns a new instance of Sender.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/logstash/outputs/sumologic/sender.rb', line 18

# Sets up a sender bound to the given HTTP client, message queue and
# stats collector.
#
# Missing "sender_max", "sleep_before_requeue" and "stats_enabled" entries
# are filled in with defaults (1, 30 and false) directly in +config+, so
# the HeaderBuilder and Compressor constructed below receive the same hash
# with those defaults applied.
#
# @param client the HTTP client object (stored; not used in this method)
# @param queue  the message queue object (stored; not used in this method)
# @param stats  the statistics object (stored; not used in this method)
# @param config hash-like plugin configuration, read and updated via []
def initialize(client, queue, stats, config)
  @client, @queue, @stats = client, queue, stats
  @stopping = Concurrent::AtomicBoolean.new(false)
  @url = config["url"]

  # Default to one sender, and never go below one.
  config["sender_max"] ||= 1
  @sender_max = config["sender_max"] < 1 ? 1 : config["sender_max"]

  config["sleep_before_requeue"] ||= 30
  @sleep_before_requeue = config["sleep_before_requeue"]

  config["stats_enabled"] ||= false
  @stats_enabled = config["stats_enabled"]

  # Pre-fill the token queue with one entry per allowed sender.
  @tokens = SizedQueue.new(@sender_max)
  @sender_max.times do |slot|
    @tokens.push(slot)
  end

  @header_builder = LogStash::Outputs::SumoLogic::HeaderBuilder.new(config)
  @headers = @header_builder.build
  @stats_headers = @header_builder.build_stats
  @compressor = LogStash::Outputs::SumoLogic::Compressor.new(config)
end

Instance Method Details

#connect ⇒ Object

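Sends an HTTP GET to the configured URL and returns true when the server answers with status 200; otherwise logs the failure and returns false.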



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/logstash/outputs/sumologic/sender.rb', line 63

# Probes the configured endpoint (@url) with a single HTTP GET.
#
# The outcome is logged through log_dbg (accepted) or log_err (rejected or
# failed). Any StandardError raised while parsing the URL, building the
# request or talking to the server is logged and reported as a failed
# connection rather than propagated. Signals and interpreter-exit
# exceptions are deliberately not intercepted.
#
# @return [Boolean] true when the server responds with status 200,
#   false for any other status or when the request could not be made
def connect()
  uri = URI.parse(@url)
  http = Net::HTTP.new(uri.host, uri.port)
  # Decide on TLS from the parsed scheme rather than the raw string prefix.
  http.use_ssl = uri.scheme == "https"
  res = http.request(Net::HTTP::Get.new(uri.request_uri))

  if res.code.to_i == 200
    log_dbg(
      "Server accepted the request",
      :url => @url
    )
    true
  else
    log_err(
      "Server rejected the request",
      :url => @url,
      :code => res.code
    )
    false
  end
rescue StandardError => ex
  # Covers malformed URLs as well as socket/TLS/timeout failures.
  log_err(
    "Cannot connect to given url",
    :url => @url,
    :exception => ex
  )
  false
end

#start ⇒ Object

Clears the stopping flag and starts the background thread that takes messages off the queue and sends them.



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/logstash/outputs/sumologic/sender.rb', line 38

# Clears the stopping flag and launches the background sender thread.
#
# The thread repeatedly takes one item from @queue and hands it to
# send_request until the stopping flag is set. It then sends whatever
# @queue.drain returns, and finally polls once per second for as long as
# @tokens holds fewer than @sender_max entries.
# NOTE(review): presumably send_request borrows a token per in-flight
# request, so a full token queue means nothing is outstanding - confirm.
#
# @return [Thread] the newly started sender thread (also kept in @sender_t)
def start()
  @stopping.make_false
  @sender_t = Thread.new do
    until @stopping.true?
      send_request(@queue.deq)
    end

    # Flush anything still queued at the moment the stop was observed.
    @queue.drain.each { |pending| send_request(pending) }

    log_info "waiting messages sent out..."
    sleep 1 while @tokens.size < @sender_max
  end
end

#stop ⇒ Object

Sets the stopping flag, enqueues the STOP_TAG marker and waits for the sender thread to finish.



55
56
57
58
59
60
61
# File 'lib/logstash/outputs/sumologic/sender.rb', line 55

# Requests shutdown of the sender thread and waits for it to exit.
#
# Sets the stopping flag, then enqueues STOP_TAG. Presumably the tag is
# there so that a sender thread waiting in @queue.deq receives an item and
# re-checks the flag (confirm against the queue implementation). Finally
# joins the thread created by start().
#
# Safe to call when start() has never run: in that case there is no thread
# to join and the method simply returns after logging.
def stop()
  log_info "shutting down sender..."
  @stopping.make_true
  @queue.enq(STOP_TAG)
  # @sender_t is only assigned by start(); guard against stop-before-start.
  @sender_t.join if @sender_t
  log_info "sender is fully shutted down"
end