Class: ClawDruid

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/claw_druid.rb

Constant Summary

# Page size used in pagingSpec when the caller passes no :threshold to
# #initialize; drops to 5 while DEBUG is set.
THRESHOLD = ENV["DEBUG"] ? 5 : 30

# Comparison operators mapped to (what look like) Druid having-spec type names.
# NOTE(review): not referenced by any method visible here — presumably used by
# the private where_chain/having_chain helpers; confirm.
OPERATIONS = { "<" => "lessThan", ">" => "greaterThan", "=" => "equalTo" }

# JavaScript aggregate bodies keyed by aggregate name. #meta_method replaces
# the literal COLUMN token with the column expression.
FnAggregates = {
  "min" => "return Math.min(current, (COLUMN));",
  "max" => "return Math.max(current, (COLUMN));",
  "sum" => "return current + (COLUMN);"
}

# Druid queryType values.
TopN               = "topN"
GroupBy            = "groupBy"
TimeSeries         = "timeseries"
TimeBoundary       = "timeBoundary"
SegmentMetaData    = "segmentMetadata"
DataSourceMetaData = "dataSourceMetadata"

# Request keys kept for each query type; #query slices the accumulated params
# down to this list before posting.
Permit_Properties = {
  TopN               => %i[queryType dataSource intervals granularity filter aggregations postAggregations dimension threshold metric context],
  GroupBy            => %i[queryType dataSource dimensions limitSpec having granularity filter aggregations postAggregations intervals context],
  TimeSeries         => %i[queryType dataSource descending intervals granularity filter aggregations postAggregations context],
  TimeBoundary       => %i[queryType dataSource bound filter context],
  SegmentMetaData    => %i[queryType dataSource intervals toInclude merge context analysisTypes lenientAggregatorMerge],
  DataSourceMetaData => %i[queryType dataSource context]
}

Instance Method Summary

Constructor Details

#initialize(params = {}) ⇒ ClawDruid

Returns a new instance of ClawDruid.



39
40
41
42
43
44
45
46
# File 'lib/claw_druid.rb', line 39

# Creates a query builder.
#
# @param params [Hash] :url — endpoint the HTTP helpers talk to;
#   :source — dataSource name; :threshold — page size used in pagingSpec
#   (defaults to THRESHOLD)
def initialize(params = {})
  @url       = params[:url]
  @threshold = params[:threshold] || THRESHOLD
  @params    = {
    dataSource:  params[:source],
    granularity: "all",
    queryType:   "select"
  }

  # Paging identifiers cache, keyed by the #hash of a query's params. Each
  # value maps a page number to the pagingIdentifiers hash for that page,
  # e.g. { "publisher_daily_report_2017-02-02T00:00:00.000Z_..." => 29 }.
  @paging_identifiers = {}
end

Instance Method Details

#count(*columns) ⇒ Object



120
121
122
123
124
125
126
127
128
129
# File 'lib/claw_druid.rb', line 120

# Adds count aggregations.
#
# With no arguments, adds a plain row count named "count". Otherwise adds one
# cardinality aggregation per column, named "count(<column>)".
#
# A builder still on the initial "select" query type is switched to
# timeseries, because Druid select queries carry no aggregations.
#
# @param columns [Array<String, Symbol>] columns whose distinct values to count
# @return [self]
def count(*columns)
  # #initialize always seeds queryType with "select", so the former `||=`
  # never applied the timeseries default.
  @params[:queryType] = TimeSeries if @params[:queryType].nil? || @params[:queryType] == "select"
  @params[:aggregations] ||= []
  if columns.empty?
    @params[:aggregations] << { type: "count", name: "count" }
  else
    @params[:aggregations] += columns.map { |column| { type: "cardinality", name: "count(#{column})", fields: [column] } }
  end
  self
end

#delete ⇒ Object



295
296
297
298
299
# File 'lib/claw_druid.rb', line 295

# Sends an HTTP DELETE to the configured endpoint.
#
# Prints the status code when DEBUG is set.
#
# @return [String] the raw response body
def delete
  response  = HTTParty.delete(@url)
  debugging = ENV["DEBUG"]
  puts response.code if debugging
  response.body
end

#each(&block) ⇒ Object



281
282
283
# File 'lib/claw_druid.rb', line 281

# Runs the query and yields every row produced by #to_a.
#
# Without a block, returns an Enumerator over those rows.
def each(&block)
  rows = to_a
  rows.each(&block)
end

#get ⇒ Object



289
290
291
292
293
# File 'lib/claw_druid.rb', line 289

# Issues an HTTP GET against the configured endpoint.
#
# Echoes the status code when DEBUG is set.
#
# @return [String] the raw response body
def get
  response = HTTParty.get(@url)
  puts(response.code) if ENV["DEBUG"]
  response.body
end

#group(*dimensions) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/claw_druid.rb', line 48

# Turns the builder into a groupBy query over the given dimensions.
#
# Accepts group(:a, :b) as well as group([:a, :b]). The subset chosen by
# Array#except is handed to select_lookup. All dimensions are appended to
# :dimensions as stripped strings, and any previously selected :metrics are
# dropped.
#
# NOTE(review): Array#except with a block is a project extension not visible
# here; confirm which elements it keeps.
#
# @return [self]
def group(*dimensions)
  list = (dimensions.count == 1 && dimensions[0].is_a?(Array)) ? dimensions[0] : dimensions

  @params[:queryType] = GroupBy

  select_lookup(list.except { |dimension| dimension.is_a? Hash })

  unless list.empty?
    names = list.map { |dimension| dimension.to_s.strip }
    @params[:dimensions] = (@params[:dimensions] || []) + names
  end

  @params.delete(:metrics)
  self
end

#having(*conditions) ⇒ Object



215
216
217
218
219
220
221
222
223
# File 'lib/claw_druid.rb', line 215

# Sets the "having" clause of the query from a SQL-like condition string.
#
# Each " ?" placeholder is replaced, in order, by the matching extra argument,
# e.g. having("sum(a) > ?", 10). The string is parsed by having_chain. A blank
# result leaves the clause untouched. Calling with no arguments is a no-op.
#
# @param conditions [Array] condition string followed by placeholder values
# @return [self]
def having(*conditions)
  clause, *bindings = conditions
  return self if clause.nil?

  # Non-destructive substitution: the former gsub! modified the caller's
  # string in place and raised FrozenError on frozen literals.
  clause = clause.gsub(" ?").with_index { |_match, index| " #{bindings[index]}" }

  havings = having_chain(clause)
  @params[:having] = havings unless havings.blank?

  self
end

#limit(limit_count) ⇒ Object



183
184
185
186
187
188
# File 'lib/claw_druid.rb', line 183

# Caps the number of rows returned via the limitSpec.
#
# Ordering columns already stored in limitSpec are kept. Only the type
# (if missing) and the limit are written.
#
# @param limit_count [Integer]
# @return [self]
def limit(limit_count)
  spec = (@params[:limitSpec] ||= {})
  spec[:type] ||= "default"
  spec[:limit] = limit_count
  self
end

#map(&block) ⇒ Object



285
286
287
# File 'lib/claw_druid.rb', line 285

# Runs the query and maps each row returned by #to_a through the block.
def map(&block)
  rows = to_a
  rows.map(&block)
end

#max_time ⇒ Object



251
252
253
254
255
# File 'lib/claw_druid.rb', line 251

# Switches the builder to a timeBoundary query with bound "maxTime".
#
# @return [self]
def max_time
  @params.update(queryType: TimeBoundary, bound: "maxTime")
  self
end

#meta_method(method, columns) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/claw_druid.rb', line 88

# Adds min/max/sum aggregations over the given columns.
#
# A plain column becomes a "double<Method>" aggregator. A column expression
# containing an arithmetic operator surrounded by spaces (e.g. "a + b") becomes
# a JavaScript aggregator over the referenced fields.
#
# @param method [String, Symbol] "min", "max" or "sum" (any letter case)
# @param columns [Array, Hash] column names, a single array of them, or a Hash
#   of column => output name
# @return [self]
# @raise [ArgumentError] for an aggregate with no entry in FnAggregates
def meta_method(method, columns)
  columns = columns[0] if columns.count == 1 && columns[0].is_a?(Array)

  # FnAggregates is keyed in lower case, but #select also matches SUM/MAX/MIN.
  kind = method.to_s.downcase
  raise ArgumentError, "unsupported aggregate: #{method}" unless FnAggregates.key?(kind)

  # #initialize seeds queryType with "select" (which takes no aggregations),
  # so a plain `||=` never applied the timeseries default.
  @params[:queryType] = TimeSeries if @params[:queryType].nil? || @params[:queryType] == "select"
  @params[:aggregations] ||= []

  # fnCombine/fnReset must match the aggregate: summing partial minima, or
  # seeding a min/max with 0, gives wrong results.
  combine, reset =
    case kind
    when "min" then ["Math.min(partialA, partialB)", "Number.POSITIVE_INFINITY"]
    when "max" then ["Math.max(partialA, partialB)", "Number.NEGATIVE_INFINITY"]
    else            ["partialA + partialB", "0"]
    end

  additions = columns.map do |column, naming|
    naming   ||= "#{method}(#{column})"
    expression = column.to_s
    if expression[/( [\+\-\*\/] )/]
      fields = expression.split(/ [\+\-\*\/] /)
      {
        type:         "javascript",
        name:         naming,
        fieldNames:   fields,
        fnAggregate:  "function(current, #{fields.join(', ')}) { #{FnAggregates[kind].gsub('COLUMN', expression)} }",
        fnCombine:    "function(partialA, partialB) { return #{combine}; }",
        fnReset:      "function()                   { return #{reset}; }"
      }
    else
      { type: "double#{method.capitalize}", name: naming, fieldName: column }
    end
  end

  @params[:aggregations] += additions
  @params[:aggregations].uniq!
  self
end

#min_time ⇒ Object



257
258
259
260
261
# File 'lib/claw_druid.rb', line 257

# Switches the builder to a timeBoundary query with bound "minTime".
#
# @return [self]
def min_time
  @params.update(queryType: TimeBoundary, bound: "minTime")
  self
end

#order(*columns) ⇒ Object



161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/claw_druid.rb', line 161

# Orders the results by the given columns.
#
# Accepts order("a", "b"), order(["a", "b"]) or order(a: :desc, b: :asc). A
# direction containing "desc" sorts descending; anything else sorts ascending.
#
# For non-groupBy queries the columns are also appended to :metric, and
# :descending is set when any column sorts descending. The limitSpec is
# rebuilt with the new columns. A limit set earlier via #limit is kept;
# otherwise it defaults to 500000.
#
# @return [self]
def order(*columns)
  columns = columns[0] if columns[0].is_a?(Hash) || columns[0].is_a?(Array)

  if @params[:queryType] != GroupBy
    @params[:metric]   ||= []
    @params[:metric]    += columns.map { |column, _direction| column }
    @params[:descending] = columns.any? { |_column, direction| direction.to_s[/desc/] }
  end

  # Rebuilding the spec used to reset any earlier #limit to the default.
  previous_limit = @params.dig(:limitSpec, :limit)
  @params[:limitSpec] = {
    type: "default",
    limit: previous_limit || 500_000,
    columns: columns.map { |column, direction|
      {
        dimension: column.to_s,
        direction: direction.to_s[/desc/] ? "descending" : "ascending",
        dimensionOrder: "lexicographic"
      }
    }
  }
  self
end

#page(page_count) ⇒ Object



197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/claw_druid.rb', line 197

# Positions the builder on page `page_count` of a paged query.
#
# Page 1 starts from empty paging identifiers. For later pages, every earlier
# page whose identifiers are not cached yet is fetched in turn: query(..., n)
# records page n's identifiers in @paging_identifiers. The identifiers of
# page `page_count - 1` are then installed as the pagingSpec.
#
# @param page_count [Integer] 1-based page number; values below 1 are ignored
# @return [self]
def page(page_count)
  if page_count == 1
    @params[:pagingSpec] = { pagingIdentifiers: {}, threshold: @threshold }
  elsif page_count > 1
    # Leave a pagingSpec from an earlier #page call out of the cache key;
    # otherwise repeated calls never find the identifiers already fetched.
    base        = @params.reject { |key, _| key == :pagingSpec }
    identifiers = (@paging_identifiers[base.hash] ||= { 0 => {} })

    if identifiers[page_count - 1].nil?
      (1...page_count).each do |current_page|
        next unless identifiers[current_page].nil?

        spec = { pagingIdentifiers: identifiers[current_page - 1], threshold: @threshold }
        query(base.merge(pagingSpec: spec), current_page)
      end
    end

    @params[:pagingSpec] = { pagingIdentifiers: identifiers[page_count - 1], threshold: @threshold }
  end
  self
end

#query(params = @params, page_count = nil) ⇒ Object



225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/claw_druid.rb', line 225

# Posts the query to the endpoint as JSON and returns the raw response body.
#
# Params are reduced to the keys listed in Permit_Properties for their
# queryType. Query types without an entry (e.g. "select") are sent
# unfiltered. When page_count is given and the response carries paging
# identifiers, they are cached for #page under that page number.
#
# @param params [Hash] query parameters (defaults to the builder's params)
# @param page_count [Integer, nil] page whose identifiers should be recorded
# @return [String] raw response body
def query(params = @params, page_count = nil)
  allowed = Permit_Properties[params[:queryType]]
  # slice(*nil) is slice() => {}, which used to post an empty body for select.
  payload = allowed ? params.slice(*allowed) : params
  ap payload if ENV['DEBUG']
  puts payload.to_json if ENV['DEBUG']
  result = HTTParty.post(@url, body: payload.to_json, headers: { 'Content-Type' => 'application/json' })
  puts result.code if ENV['DEBUG']

  # Cheap substring check on the raw body before parsing it. The former
  # result["pagingIdentifiers"] went through the parsed response, and a
  # parsed Array raises TypeError on a String index.
  if page_count && result.body.to_s.include?("pagingIdentifiers")
    identifiers = JSON.parse(result.body).dig(0, "result", "pagingIdentifiers")
    if identifiers
      # Key on the caller's params (not the whitelisted subset) minus
      # pagingSpec, so #page can find the entry from the builder params.
      current = params.reject { |key, _| key == :pagingSpec }.hash
      @paging_identifiers[current] ||= {}
      # Identifiers look like { "publisher_daily_report_2017-03-01T00:00:00.000Z_..." => -10 };
      # each offset is advanced by one (presumably so the next request starts
      # past the last row already returned).
      @paging_identifiers[current][page_count] = identifiers.transform_values { |value| value + 1 }
    end
  end

  result.body
end

#segment_meta ⇒ Object



268
269
270
271
# File 'lib/claw_druid.rb', line 268

# Switches the builder to a segmentMetadata query.
#
# For this query type, #to_a returns the "columns" entry of the first result.
#
# @return [self]
def segment_meta
  tap { @params[:queryType] = SegmentMetaData }
end

#select(*columns) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/claw_druid.rb', line 64

# Adds columns / expressions to the query.
#
# Accepts select("a", "b"), select(["a", "b"]) or a single comma-separated
# string such as select("sum(a) as x, b"). Returns immediately when there are
# no columns or all of them are blank. Otherwise:
# * entries matching an aggregate name followed by an arithmetic operator are
#   passed to post_chain, and the results stored as :postAggregations;
# * entries matching sum(...)/max(...)/min(...)/count(...) are passed to
#   method_column;
# * the subset selected by the `is_a? Hash` test is passed to select_lookup;
# * every entry is appended to :metrics as a stripped string.
#
# NOTE(review): Array#except taking a block is not core Ruby. It is presumably
# a project extension that keeps the elements for which the block is truthy
# (that is how its results are used below); confirm.
# NOTE(review): Hash lookups and aggregate expressions are also appended to
# :metrics via to_s — verify this is intended.
#
# @return [self]
def select(*columns)
  # Split the columns like ['sum(column_a) as sum_a, column_b']
  # ("\, " is simply ", " — the backslash escape has no effect in Ruby.)
  columns = columns[0].split("\, ") if columns.count == 1 && columns[0].is_a?(String) && columns[0]["\, "]
  columns = columns[0]              if columns.count == 1 && columns[0].is_a?(Array)

  # Nothing to do for no arguments (all? on [] is true) or only blank entries.
  return self if columns.all?{|column| column.blank? }

  # Add the 'i' to regex to be case-insensitive, because sum, max and min could be written SUM, MAX and MIN
  post_columns = columns.except{|column| column[/(sum|max|min|count).+[\+\-\*\/]/i] }
  @params[:postAggregations] = post_columns.map{|post_column| post_chain(post_column) } unless post_columns.blank?

  # Aggregate calls such as "sum(a)" are handled by the private method_column helper.
  method_columns = columns.except{|column| column.is_a?(String) && column[/(sum|max|min|count)\(.+\)/i] }
  method_columns.each{|column| method_column(column) }

  lookup_columns = columns.except{|column| column.is_a? Hash }
  select_lookup(lookup_columns)
  
  if columns && columns.count > 0
    @params[:metrics]    ||= []
    @params[:metrics]     += columns.map(&:to_s).map(&:strip)
  end
  self
end

#source_meta ⇒ Object



263
264
265
266
# File 'lib/claw_druid.rb', line 263

# Switches the builder to a dataSourceMetadata query.
#
# @return [self]
def source_meta
  tap { @params[:queryType] = DataSourceMetaData }
end

#time_boundary ⇒ Object



246
247
248
249
# File 'lib/claw_druid.rb', line 246

# Switches the builder to a timeBoundary query.
#
# No bound is set here; see #max_time / #min_time for bounded variants.
#
# @return [self]
def time_boundary
  tap { @params[:queryType] = TimeBoundary }
end

#to_a ⇒ Object



277
278
279
# File 'lib/claw_druid.rb', line 277

# Runs the query and extracts the rows from the first result.
#
# For segmentMetadata queries this is the "columns" entry. Otherwise it is the
# "events" list nested under "result".
def to_a
  segment_query = @params[:queryType] == SegmentMetaData
  first_result  = JSON.parse(query)[0]
  segment_query ? first_result["columns"] : first_result["result"]["events"]
end

#to_s ⇒ Object



273
274
275
# File 'lib/claw_druid.rb', line 273

# String form of the builder: executes the query and returns the raw
# response body produced by #query.
def to_s
  response_body = query
  response_body
end

#top(top_count) ⇒ Object



190
191
192
193
194
195
# File 'lib/claw_druid.rb', line 190

# Turns the builder into a topN query returning `top_count` rows.
#
# If an ordering column exists in the limitSpec (see #order), the first one
# becomes the topN metric. A descending column is used as a plain metric name;
# an ascending one is wrapped in an "inverted" metric spec. The limitSpec is
# always removed, since topN does not take it. When no single :dimension is
# set, the first of the :dimensions collected by #group is used.
#
# @param top_count [Integer] topN threshold
# @return [self]
def top(top_count)
  @params[:queryType] = TopN
  @params[:threshold] = top_count

  # topN accepts a single :dimension (see Permit_Properties), while #group
  # collects :dimensions.
  @params[:dimension] ||= @params[:dimensions].first if @params[:dimensions].is_a?(Array)

  limit_spec = @params.delete(:limitSpec)
  # A limitSpec built only by #limit has no :columns; this used to raise
  # NoMethodError on nil[0].
  first_column = limit_spec.is_a?(Hash) ? Array(limit_spec[:columns]).first : nil
  if first_column
    name, direction = first_column.is_a?(Hash) ? first_column.values_at(:dimension, :direction) : [first_column, nil]
    # The #order column hash is not a topN metric spec: descending order is
    # topN's default (plain metric name); ascending needs an "inverted" spec.
    @params[:metric] = direction == "ascending" ? { type: "inverted", metric: name } : name
  end
  self
end

#where(*conditions) ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/claw_druid.rb', line 131

# Adds filter conditions, AND-ed onto any existing filter.
#
# Accepts either:
# * a Hash — :begin_date / :end_date set the query interval; every other
#   non-blank entry becomes a "selector" filter (single value) or an "in"
#   filter (several values); or
# * a SQL-like String whose " ?" placeholders are filled, in order, from the
#   remaining arguments and then parsed by where_chain.
# Any other argument adds nothing. The caller's Hash and String are left
# untouched.
#
# @return [self]
def where(*conditions)
  if conditions[0].is_a?(Hash)
    # Work on a copy: delete/delete_if used to strip keys from the caller's hash.
    criteria   = conditions[0].dup
    begin_date = criteria.delete(:begin_date)
    end_date   = criteria.delete(:end_date)
    # Only (re)set the interval when a date was given, so a date-less call
    # such as where(country: "US") does not overwrite it with "/".
    @params[:intervals] = ["#{begin_date}/#{end_date}"] if begin_date || end_date

    conditions = criteria.reject { |_column, values| values.blank? }.map { |column, values|
      if !values.is_a?(Array)
        { type: "selector", dimension: column, value: values }
      elsif values.count == 1
        { type: "selector", dimension: column, value: values[0] }
      else
        { type: "in", dimension: column, values: values }
      end
    }
  elsif conditions[0].is_a?(String)
    clause, *bindings = conditions
    # Non-destructive substitution: gsub! modified the caller's string and
    # raised FrozenError on frozen literals.
    clause = clause.gsub(" ?").with_index { |_match, index| " #{bindings[index]}" }
    conditions = [where_chain(clause)]
  else
    conditions = nil
  end

  unless conditions.blank?
    @params[:filter]          ||= { type: "and", fields: [] }
    @params[:filter][:fields]  += conditions
  end
  self
end