Class: Logdna::Client
- Inherits:
-
Object
- Object
- Logdna::Client
- Defined in:
- lib/logdna/client.rb
Instance Method Summary collapse
-
#buffer(msg, opts) ⇒ Object
this should always be running synchronously within this thread.
- #check_side_buffer ⇒ Object
- #create_flush_task ⇒ Object
- #encode_message(msg) ⇒ Object
- #exitout ⇒ Object
-
#flush ⇒ Object
this should be running synchronously if @buffer_over_limit i.e.
-
#initialize(request, uri, opts) ⇒ Client
constructor
A new instance of Client.
- #message_hash(msg, opts = {}) ⇒ Object
- #process_buffer(buffer_size) ⇒ Object
- #queue_to_buffer(queue = @queue) ⇒ Object
- #write_to_buffer(msg, opts) ⇒ Object
Constructor Details
#initialize(request, uri, opts) ⇒ Client
Returns a new instance of Client.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/logdna/client.rb', line 10 def initialize(request, uri, opts) @uri = uri # NOTE: buffer is in memory @buffer = StringIO.new @messages = [] @buffer_over_limit = false @side_buffer = StringIO.new @side_messages = [] @lock = Mutex.new @task = nil # NOTE: the byte limit only affects the message, not the entire message_hash @actual_byte_limit = opts[:flushbyte] ||= Resources::FLUSH_BYTE_LIMIT @actual_flush_interval = opts[:flushtime] ||= Resources::FLUSH_INTERVAL @@request = request end |
Instance Method Details
#buffer(msg, opts) ⇒ Object
this should always be running synchronously within this thread
86 87 88 89 90 91 |
# File 'lib/logdna/client.rb', line 86 def buffer(msg, opts) buffer_size = write_to_buffer(msg, opts) unless buffer_size.nil? process_buffer(buffer_size) end end |
#check_side_buffer ⇒ Object
74 75 76 77 78 79 80 81 82 |
# File 'lib/logdna/client.rb', line 74 def check_side_buffer return if @side_buffer.size == 0 @buffer.write(@side_buffer.string) @side_buffer.truncate(0) = @side_messages @side_messages = [] .each { || @messages.push() } end |
#create_flush_task ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/logdna/client.rb', line 56 def create_flush_task return @task unless @task.nil? or !@task.running? t = Concurrent::TimerTask.new(execution_interval: @actual_flush_interval, timeout_interval: Resources::TIMER_OUT) do |task| if @messages.any? # keep running if there are queued messages, but don't flush # because the buffer is being flushed due to being over the limit unless @buffer_over_limit flush() end else # no messages means we can kill the task task.kill end end t.execute end |
#encode_message(msg) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/logdna/client.rb', line 31 def (msg) msg = msg.to_s unless msg.instance_of? String begin msg = msg.encode("UTF-8") rescue Encoding::UndefinedConversionError => e # NOTE: should this be raised or handled silently? # raise e end msg end |
#exitout ⇒ Object
157 158 159 160 161 162 163 164 |
# File 'lib/logdna/client.rb', line 157 def exitout() check_side_buffer if @messages.any? flush() end puts "Exiting LogDNA logger: Logging remaining messages" return end |
#flush ⇒ Object
this should be running synchronously if @buffer_over_limit i.e. called from self.buffer else asynchronously through @task
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/logdna/client.rb', line 126 def flush() if defined? @@request and !@@request.nil? = [] @lock.synchronize do = @messages @buffer.truncate(0) @messages = [] end return if .empty? real = { e: 'ls', ls: , }.to_json @@request.body = real @response = Net::HTTP.start(@uri.hostname, @uri.port, use_ssl: @uri.scheme == 'https') do |http| http.request(@@request) end puts "Result: #{@response.body}" unless .empty? # don't kill @task if this was executed from self.buffer # don't kill @task if there are queued messages unless @buffer_over_limit || @messages.any? || @task.nil? @task.shutdown @task.kill end end end |
#message_hash(msg, opts = {}) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/logdna/client.rb', line 43 def (msg, opts={}) obj = { line: msg, app: opts[:app], level: opts[:level], env: opts[:env], meta: opts[:meta], timestamp: Time.now.to_i, } obj.delete(:meta) if obj[:meta].nil? obj end |
#process_buffer(buffer_size) ⇒ Object
114 115 116 117 118 119 120 121 122 |
# File 'lib/logdna/client.rb', line 114 def process_buffer(buffer_size) if buffer_size > @actual_byte_limit @buffer_over_limit = true flush() @buffer_over_limit = false else @task = create_flush_task end end |
#queue_to_buffer(queue = @queue) ⇒ Object
109 110 111 112 |
# File 'lib/logdna/client.rb', line 109 def queue_to_buffer(queue=@queue) next_object = queue.shift write_to_buffer(next_object[:msg], next_object[:opts]) end |
#write_to_buffer(msg, opts) ⇒ Object
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/logdna/client.rb', line 93 def write_to_buffer(msg, opts) return if msg.nil? msg = (msg) if @lock.locked? @side_buffer.write(msg) @side_messages.push((msg, opts)) return end check_side_buffer buffer_size = @buffer.write(msg) @messages.push((msg, opts)) buffer_size end |