Class: Logdna::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/logdna/client.rb

Instance Method Summary collapse

Constructor Details

#initialize(request, uri, opts) ⇒ Client

Returns a new instance of Client.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/logdna/client.rb', line 10

# Sets up an in-memory buffering client.
#
# @param request [Object] request object; stored in the class variable
#   @@request, so it is shared by every instance of this class
# @param uri [Object] stored as @uri
# @param opts [Hash] options; :flushbyte and :flushtime are read, and when
#   either is missing or falsy the Resources default is written back into opts
def initialize(request, uri, opts)
  @uri = uri

  # Primary buffer (kept in memory) and the message hashes queued with it.
  @buffer = StringIO.new
  @messages = []
  @buffer_over_limit = false

  # Secondary buffer and queue, written to while @lock is held.
  @side_buffer = StringIO.new
  @side_messages = []

  @lock = Mutex.new
  @task = nil

  # The byte limit applies to the message text only, not to the whole
  # message hash.
  byte_limit = (opts[:flushbyte] ||= Resources::FLUSH_BYTE_LIMIT)
  @actual_byte_limit = byte_limit
  flush_interval = (opts[:flushtime] ||= Resources::FLUSH_INTERVAL)
  @actual_flush_interval = flush_interval

  @@request = request
end

Instance Method Details

#buffer(msg, opts) ⇒ Object

This method should always run synchronously on the calling thread.



86
87
88
89
90
91
# File 'lib/logdna/client.rb', line 86

# Writes +msg+ to the buffer and, when write_to_buffer reports a size,
# passes that size on to process_buffer.
#
# Intended to run synchronously on the calling thread.
#
# @param msg [Object] message to log
# @param opts [Hash] per-message options forwarded to write_to_buffer
# @return [Object, nil] nil when write_to_buffer returned nil, otherwise
#   whatever process_buffer returns
def buffer(msg, opts)
  written = write_to_buffer(msg, opts)
  return if written.nil?

  process_buffer(written)
end

#check_side_buffer ⇒ Object



74
75
76
77
78
79
80
81
82
# File 'lib/logdna/client.rb', line 74

# Moves everything accumulated in the side buffer / side queue into the
# primary buffer and message queue.
#
# @return [Array, nil] the drained side messages, or nil when there was
#   nothing to move
def check_side_buffer
  # Check the queue as well as the byte count: a message whose text is empty
  # adds zero bytes to @side_buffer but still has to be merged.
  return if @side_buffer.size == 0 && @side_messages.empty?

  @buffer.write(@side_buffer.string)
  @side_buffer.truncate(0)
  # StringIO#truncate does not move the write position; without the rewind
  # the next write would be padded with NUL bytes up to the old position.
  @side_buffer.rewind

  # Swap the queue out before appending so @side_messages is empty again as
  # early as possible.
  queued_side_messages = @side_messages
  @side_messages = []
  @messages.concat(queued_side_messages)
  queued_side_messages
end

#create_flush_task ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/logdna/client.rb', line 56

# Returns the current timer task when it exists and is running; otherwise
# builds a new Concurrent::TimerTask, starts it and returns it.
#
# On every tick the task kills itself when no messages are queued, and
# otherwise flushes unless an over-limit flush is already in progress.
def create_flush_task
  return @task if !@task.nil? && @task.running?

  timer = Concurrent::TimerTask.new(execution_interval: @actual_flush_interval, timeout_interval: Resources::TIMER_OUT) do |task|
    if @messages.none?
      # nothing queued, so the task is no longer needed
      task.kill
    elsif !@buffer_over_limit
      # skipped while @buffer_over_limit is set: the buffer is already being
      # flushed because it went over the limit
      flush()
    end
  end
  timer.execute
end

#encode_message(msg) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
# File 'lib/logdna/client.rb', line 31

# Converts +msg+ into a String that is always valid UTF-8.
#
# Non-String values go through #to_s. Strings in another encoding are
# transcoded. When transcoding is impossible (for example binary data), the
# raw bytes are reinterpreted as UTF-8 instead of being returned as-is, and
# any bytes that are still invalid are replaced via String#scrub.
#
# @param msg [Object] message to encode
# @return [String] UTF-8 encoded, valid string
def encode_message(msg)
  msg = msg.to_s unless msg.instance_of? String

  encoded =
    begin
      msg.encode("UTF-8")
    rescue EncodingError
      # Covers undefined conversions, invalid byte sequences and missing
      # converters. Binary strings frequently already hold UTF-8 bytes, so
      # relabel a copy rather than discarding the content.
      msg.dup.force_encoding("UTF-8")
    end

  # A string tagged UTF-8 can still contain invalid bytes.
  encoded.valid_encoding? ? encoded : encoded.scrub
end

#exitout ⇒ Object



157
158
159
160
161
162
163
164
# File 'lib/logdna/client.rb', line 157

# Final drain: merges the side buffer, flushes when anything is queued, then
# prints a notice to standard output.
#
# @return [nil]
def exitout()
  check_side_buffer
  flush() if @messages.any?
  puts "Exiting LogDNA logger: Logging remaining messages"
  nil
end

#flush ⇒ Object

This method runs synchronously when @buffer_over_limit is set (i.e. when it is called from self.buffer); otherwise it runs asynchronously through @task.



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/logdna/client.rb', line 126

# Sends all queued messages to @uri in a single HTTP request.
#
# Runs synchronously when reached via process_buffer (@buffer_over_limit is
# set), otherwise from the timer task. Does nothing when no request object
# has been stored in @@request or when no messages are queued.
def flush()
  return unless defined?(@@request) && !@@request.nil?

  request_messages = []
  @lock.synchronize do
    request_messages = @messages
    @buffer.truncate(0)
    # StringIO#truncate keeps the write position, so without a rewind the
    # next write re-grows the buffer to its old size with NUL padding and
    # the buffer keeps growing across flushes.
    @buffer.rewind
    @messages = []
  end
  return if request_messages.empty?

  real = {
    e: 'ls',
    ls: request_messages,
  }.to_json

  @@request.body = real
  @response = Net::HTTP.start(@uri.hostname, @uri.port, use_ssl: @uri.scheme == 'https') do |http|
    http.request(@@request)
  end

  puts "Result: #{@response.body}"

  # don't kill @task if this was executed from self.buffer
  # don't kill @task if there are queued messages
  unless @buffer_over_limit || @messages.any? || @task.nil?
    @task.shutdown
    @task.kill
  end
end

#message_hash(msg, opts = {}) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/logdna/client.rb', line 43

# Builds the hash that represents one log line.
#
# @param msg [Object] value stored under :line
# @param opts [Hash] source of :app, :level, :env and :meta
# @return [Hash] keys :line, :app, :level, :env, :meta (only when
#   opts[:meta] is not nil) and :timestamp (current time in whole seconds)
def message_hash(msg, opts={})
  entry = { line: msg, app: opts[:app], level: opts[:level], env: opts[:env] }

  # :meta is only included when a value was supplied; it is inserted before
  # :timestamp so the key order is the same with and without it.
  meta = opts[:meta]
  entry[:meta] = meta unless meta.nil?

  entry[:timestamp] = Time.now.to_i
  entry
end

#process_buffer(buffer_size) ⇒ Object



114
115
116
117
118
119
120
121
122
# File 'lib/logdna/client.rb', line 114

# Decides how a freshly written message gets flushed.
#
# When +buffer_size+ exceeds @actual_byte_limit the queue is flushed right
# away; otherwise the flush timer task is created (or reused) and stored in
# @task.
#
# @param buffer_size [Integer] size reported by write_to_buffer
# @return [false, Object] false after an immediate flush, otherwise the task
#   returned by create_flush_task
def process_buffer(buffer_size)
  if buffer_size > @actual_byte_limit
    @buffer_over_limit = true
    begin
      flush()
    ensure
      # Reset even when flush raises: the timer task skips flushing while
      # this flag is set, so a stale true would block it.
      @buffer_over_limit = false
    end
    # same result as before, when the flag reset was the last expression
    false
  else
    @task = create_flush_task
  end
end

#queue_to_buffer(queue = @queue) ⇒ Object



109
110
111
112
# File 'lib/logdna/client.rb', line 109

# Takes the next entry off +queue+ and writes it to the buffer.
#
# @param queue [Array<Hash>, nil] entries with :msg and :opts keys; defaults
#   to @queue
# @return [Object, nil] the result of write_to_buffer, or nil when the queue
#   is nil or has no entry left
def queue_to_buffer(queue=@queue)
  # @queue may never have been assigned, in which case the default is nil.
  return if queue.nil?

  next_object = queue.shift
  # shift returns nil on an empty queue
  return if next_object.nil?

  write_to_buffer(next_object[:msg], next_object[:opts])
end

#write_to_buffer(msg, opts) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/logdna/client.rb', line 93

# Encodes +msg+ and queues it.
#
# While @lock is held the message goes to the side buffer / side queue.
# Otherwise any pending side-buffer content is merged first and the message
# is appended to the primary buffer and queue.
#
# @param msg [Object, nil] message; nil is ignored
# @param opts [Hash] options passed to message_hash
# @return [Integer, nil] the value returned by @buffer.write for this
#   message, or nil when msg is nil or the message went to the side buffer
def write_to_buffer(msg, opts)
  return if msg.nil?

  encoded = encode_message(msg)

  unless @lock.locked?
    check_side_buffer
    written = @buffer.write(encoded)
    @messages << message_hash(encoded, opts)
    return written
  end

  # lock is held, so park the message in the side buffer
  @side_buffer.write(encoded)
  @side_messages << message_hash(encoded, opts)
  nil
end