Class: SnowplowTracker::Emitter

Inherits:
Object
  • Object
show all
Includes:
Contracts
Defined in:
lib/snowplow-tracker/emitters.rb

Direct Known Subclasses

AsyncEmitter

Constant Summary collapse

@@ConfigHash =
({
  :protocol => Maybe[Or['http', 'https']],
  :port => Maybe[Num],
  :method => Maybe[Or['get', 'post']],
  :buffer_size => Maybe[Num],
  :on_success => Maybe[Func[Num => Any]],
  :on_failure => Maybe[Func[Num, Hash => Any]],
  :thread_count => Maybe[Num]
})
@@StrictConfigHash =
And[@@ConfigHash, lambda { |x|
  x.class == Hash and Set.new(x.keys).subset? Set.new(@@ConfigHash.keys)
}]
@@DefaultConfig =
{
  :protocol => 'http',
  :method => 'get'
}

Instance Method Summary collapse

Constructor Details

#initialize(endpoint, config = {}) ⇒ Emitter

Returns a new instance of Emitter.



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/snowplow-tracker/emitters.rb', line 50

def initialize(endpoint, config={})
  config = @@DefaultConfig.merge(config)
  @lock = Monitor.new
  @collector_uri = as_collector_uri(endpoint, config[:protocol], config[:port], config[:method])
  @buffer = []
  if not config[:buffer_size].nil?
    @buffer_size = config[:buffer_size]
  elsif config[:method] == 'get'
    @buffer_size = 1
  else
    @buffer_size = 10
  end
  @method = config[:method]
  @on_success = config[:on_success]
  @on_failure = config[:on_failure]
  LOGGER.info("#{self.class} initialized with endpoint #{@collector_uri}")

  self
end

Instance Method Details

#flush(async = true) ⇒ Object



98
99
100
101
102
103
104
# File 'lib/snowplow-tracker/emitters.rb', line 98

def flush(async=true)
  @lock.synchronize do
    send_requests(@buffer)
    @buffer = []
  end
  nil
end

#input(payload) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
# File 'lib/snowplow-tracker/emitters.rb', line 83

def input(payload)
  payload.each { |k,v| payload[k] = v.to_s}
  @lock.synchronize do
    @buffer.push(payload)
    if @buffer.size >= @buffer_size
      flush
    end
  end

  nil
end

#is_good_status_code(status_code) ⇒ Object



217
218
219
# File 'lib/snowplow-tracker/emitters.rb', line 217

def is_good_status_code(status_code)
  status_code.to_i >= 200 && status_code.to_i < 400
end

#send_requests(evts) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/snowplow-tracker/emitters.rb', line 109

def send_requests(evts)
  if evts.size < 1
    LOGGER.info("Skipping sending events since buffer is empty")
    return
  end
  LOGGER.info("Attempting to send #{evts.size} request#{evts.size == 1 ? '' : 's'}")

  evts.each do |event|
    event['stm'] = (Time.now.to_f * 1000).to_i.to_s # add the sent timestamp, overwrite if already exists
  end

  if @method == 'post'
    post_succeeded = false
    begin
      request = http_post(SelfDescribingJson.new(
        'iglu:com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-4',
        evts
      ).to_json)
      post_succeeded = is_good_status_code(request.code)
    rescue StandardError => se
      LOGGER.warn(se)
    end
    if post_succeeded
      unless @on_success.nil?
        @on_success.call(evts.size)
      end
    else
      unless @on_failure.nil?
        @on_failure.call(0, evts)
      end
    end

  elsif @method == 'get'
    success_count = 0
    unsent_requests = []
    evts.each do |evt|
      get_succeeded = false
      begin
        request = http_get(evt)
        get_succeeded = is_good_status_code(request.code)
      rescue StandardError => se
        LOGGER.warn(se)
      end
      if get_succeeded
        success_count += 1
      else
        unsent_requests << evt
      end
    end
    if unsent_requests.size == 0
      unless @on_success.nil?
        @on_success.call(success_count)
      end
    else
      unless @on_failure.nil?
        @on_failure.call(success_count, unsent_requests)
      end
    end
  end

  nil
end