Class: TDAnalytics::BatchConsumer

Inherits:
Object
  • Object
show all
Defined in:
lib/thinkingdata-ruby/batch_consumer.rb

Overview

BatchConsumer 批量同步的发送数据. 有数据时,首先会加入本地缓冲区,当条数到达上限后会发起上报

Constant Summary collapse

DEFAULT_LENGTH =

默认缓冲区大小

20
MAX_LENGTH =
2000

Instance Method Summary collapse

Constructor Details

#initialize(server_url, app_id, max_buffer_length = DEFAULT_LENGTH) ⇒ BatchConsumer



12
13
14
15
16
17
18
19
# File 'lib/thinkingdata-ruby/batch_consumer.rb', line 12

# Creates a consumer that buffers messages locally and uploads them in batches.
#
# @param server_url [String] URL of the TA server; its path is replaced with '/sync_server'
# @param app_id [String] application id, sent in the 'appid' header of every upload
# @param max_buffer_length [Integer] number of buffered messages that triggers an
#   upload; values outside 1..MAX_LENGTH are clamped into that range
def initialize(server_url, app_id, max_buffer_length = DEFAULT_LENGTH)
  @server_uri = URI.parse(server_url)
  @server_uri.path = '/sync_server'
  @app_id = app_id
  # Compression is on by default; _set_compress can turn it off.
  @compress = true
  # A limit below 1 cannot be used as a batch size, so enforce a floor of 1
  # in addition to the MAX_LENGTH ceiling.
  @max_length = [[max_buffer_length, MAX_LENGTH].min, 1].max
  @buffers = []
end

Instance Method Details

#_set_compress(compress) ⇒ Object



21
22
23
# File 'lib/thinkingdata-ruby/batch_consumer.rb', line 21

# Switches payload compression on or off for subsequent uploads.
#
# @param compress [Boolean] truthy to gzip the JSON payload, falsy to send it as-is
# @return [Object] the value that was assigned
def _set_compress(compress)
  @compress = compress
end

#add(message) ⇒ Object



25
26
27
28
# File 'lib/thinkingdata-ruby/batch_consumer.rb', line 25

# Queues a message and uploads the buffer once it has reached its limit.
#
# @param message [Object] the record to append to the local buffer
# @return [Object, nil] the result of flush when an upload was triggered, otherwise nil
def add(message)
  @buffers.push(message)
  flush unless @buffers.size < @max_length
end

#close ⇒ Object



30
31
32
# File 'lib/thinkingdata-ruby/batch_consumer.rb', line 30

# Shuts the consumer down by uploading whatever is still buffered.
#
# @return [Object] whatever flush returns
def close
  flush
end

#flush ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/thinkingdata-ruby/batch_consumer.rb', line 34

# Uploads every buffered message to the TA server and empties the buffer.
#
# The buffer is sent in batches of at most @max_length messages. Each batch is
# removed from the buffer as soon as the server has acknowledged it, so when a
# later batch fails only the unsent messages stay queued and a subsequent flush
# does not upload the acknowledged ones a second time.
#
# @return [Array] the (empty) buffer
# @raise [ConnectionError] if the HTTP request itself fails
# @raise [ServerError] if the server answers with a non-200 status, a body that
#   is not a JSON object, or a result code other than 0
def flush
  # A non-positive limit would never drain the buffer; always send at least one.
  batch_size = [@max_length, 1].max

  until @buffers.empty?
    batch = @buffers.first(batch_size)
    _post_batch(batch)
    # Only reached when the server accepted the batch.
    @buffers.shift(batch.length)
  end

  @buffers = []
end

# Uploads a single batch and validates the server's reply.
#
# @param batch [Array] the messages to send in one request
# @return [nil]
# @raise [ConnectionError] if the request could not be performed
# @raise [ServerError] if the reply does not report success
def _post_batch(batch)
  headers = {
    'Content-Type' => 'application/plaintext',
    'appid' => @app_id,
    'compress' => @compress ? 'gzip' : 'none',
    'TA-Integration-Type' => 'Ruby',
    'TA-Integration-Version' => TDAnalytics::VERSION,
    # Number of messages in this request (not the whole buffer), as a String
    # because HTTP header values are text.
    'TA-Integration-Count' => batch.length.to_s,
    # NOTE(review): this key uses an underscore unlike the other
    # TA-Integration-* headers; left unchanged in case the server expects it.
    'TA_Integration-Extra' => 'batch'
  }
  request = CaseSensitivePost.new(@server_uri.request_uri, headers)
  request.body = _encode_batch(batch)

  begin
    response_code, response_body = _request(@server_uri, request)
  rescue => e
    raise ConnectionError, "Could not connect to TA server, with error \"#{e.message}\"."
  end

  # A non-200 reply leaves result empty, which is reported as a failure below.
  result = {}
  if response_code.to_i == 200
    begin
      result = JSON.parse(response_body.to_s)
    rescue JSON::JSONError
      raise ServerError, "Could not interpret TA server response: '#{response_body}'"
    end
  end

  # Valid JSON that is not an object (e.g. null) is treated as a failure too.
  return if result.is_a?(Hash) && result['code'] == 0

  raise ServerError, "Could not write to TA, server responded with #{response_code} returning: '#{response_body}'"
end

# Serializes a batch to JSON, gzip-compressing it when compression is enabled.
#
# @param batch [Array] the messages to serialize
# @return [String] the request body
def _encode_batch(batch)
  json = batch.to_json
  return json unless @compress

  # StringIO's first argument is the backing string, not a mode; use a
  # mutable binary string as the target for the compressed bytes.
  sink = StringIO.new(String.new)
  gzip = Zlib::GzipWriter.new(sink)
  gzip.write(json)
  gzip.close
  sink.string
end