Class: ClawDruid
Constant Summary collapse
- THRESHOLD =
ENV["DEBUG"] ? 5 : 30
- OPERATIONS =
{ '<' => "lessThan", '>' => 'greaterThan', '=' => 'equalTo' }
- FnAggregates =
{ "min" => "return Math.min(current, (COLUMN));", "max" => "return Math.max(current, (COLUMN));", "sum" => "return current + (COLUMN);" }
- TopN =
"topN"
- GroupBy =
"groupBy"
- TimeSeries =
"timeseries"
- TimeBoundary =
"timeBoundary"
- SegmentMetaData =
"segmentMetadata"
- DataSourceMetaData =
"dataSourceMetadata"
- Permit_Properties =
{ TopN => [:queryType, :dataSource, :intervals, :granularity, :filter, :aggregations, :postAggregations, :dimension, :threshold, :metric, :context], GroupBy => [:queryType, :dataSource, :dimensions, :limitSpec, :having, :granularity, :filter, :aggregations, :postAggregations, :intervals, :context], TimeSeries => [:queryType, :dataSource, :descending, :intervals, :granularity, :filter, :aggregations, :postAggregations, :context], TimeBoundary => [:queryType, :dataSource, :bound, :filter, :context], SegmentMetaData => [:queryType, :dataSource, :intervals, :toInclude, :merge, :context, :analysisTypes, :lenientAggregatorMerge], DataSourceMetaData => [:queryType, :dataSource, :context], }
Instance Method Summary collapse
- #count(*columns) ⇒ Object
- #delete ⇒ Object
- #each(&block) ⇒ Object
- #get ⇒ Object
- #group(*dimensions) ⇒ Object
- #having(*conditions) ⇒ Object
-
#initialize(params = {}) ⇒ ClawDruid
constructor
A new instance of ClawDruid.
- #limit(limit_count) ⇒ Object
- #map(&block) ⇒ Object
- #max_time ⇒ Object
- #meta_method(method, columns) ⇒ Object
- #min_time ⇒ Object
- #order(*columns) ⇒ Object
- #page(page_count) ⇒ Object
- #query(params = @params, page_count = nil) ⇒ Object
- #segment_meta ⇒ Object
- #select(*columns) ⇒ Object
- #source_meta ⇒ Object
- #time_boundary ⇒ Object
- #to_a ⇒ Object
- #to_s ⇒ Object
- #top(top_count) ⇒ Object
- #where(*conditions) ⇒ Object
Constructor Details
#initialize(params = {}) ⇒ ClawDruid
Returns a new instance of ClawDruid.
39 40 41 42 43 44 45 46 |
# File 'lib/claw_druid.rb', line 39
#
# Builds a query object with the default query parameters.
#
# @param params [Hash] options; the keys read here are :url, :source and
#   :threshold (falls back to THRESHOLD when missing or falsy)
def initialize(params = {})
  @url, source, threshold = params.values_at(:url, :source, :threshold)
  @params = { dataSource: source, granularity: "all", queryType: "select" }
  @threshold = threshold || THRESHOLD
  # Paging identifiers of every query. The key is the params.hash of the
  # query, the value is a set of identifiers such as
  # "publisher_daily_report_2017-02-02T00:00:00.000Z_2017-02-04T00:00:00.000Z_2017-03-30T12:10:27.053Z".
  @paging_identifiers = {}
end
Instance Method Details
#count(*columns) ⇒ Object
120 121 122 123 124 125 126 127 128 129 |
# File 'lib/claw_druid.rb', line 120
#
# Appends count aggregations to the query.
#
# With no arguments a plain row count named "count" is added; otherwise one
# "cardinality" aggregation named "count(<column>)" is added per column.
#
# @param columns [Array] columns to count
# @return [ClawDruid] self, for chaining
def count(*columns)
  @params[:queryType] ||= TimeSeries
  @params[:aggregations] ||= []

  if columns.empty?
    @params[:aggregations] << { type: "count", name: "count" }
    return self
  end

  cardinalities = columns.map do |column|
    { type: "cardinality", name: "count(#{column})", fields: [column] }
  end
  @params[:aggregations] += cardinalities
  self
end
#delete ⇒ Object
295 296 297 298 299 |
# File 'lib/claw_druid.rb', line 295
#
# Sends an HTTP DELETE to the configured URL.
#
# The status code is printed when the DEBUG environment variable is set.
#
# @return the body of the HTTParty response
def delete
  HTTParty.delete(@url)
          .tap { |response| puts response.code if ENV["DEBUG"] }
          .body
end
#each(&block) ⇒ Object
281 282 283 |
# File 'lib/claw_druid.rb', line 281
#
# Iterates over the rows returned by #to_a, forwarding the given block.
#
# @return whatever Array#each returns for the materialized rows
def each(&block)
  rows = to_a
  rows.each(&block)
end
#get ⇒ Object
289 290 291 292 293 |
# File 'lib/claw_druid.rb', line 289
#
# Sends an HTTP GET to the configured URL.
#
# The status code is printed when the DEBUG environment variable is set.
#
# @return the body of the HTTParty response
def get
  HTTParty.get(@url)
          .tap { |response| puts response.code if ENV["DEBUG"] }
          .body
end
#group(*dimensions) ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/claw_druid.rb', line 48
#
# Switches the query to a groupBy query and appends grouping dimensions.
#
# A single Array argument is unwrapped. Hash dimensions are handed to
# select_lookup; the dimensions (stringified and stripped) are appended to
# @params[:dimensions], and any previously selected :metrics are dropped.
#
# @param dimensions [Array] dimension names, or Hash entries for lookups
# @return [ClawDruid] self, for chaining
def group(*dimensions)
  dimensions = dimensions[0] if dimensions.count == 1 && dimensions[0].is_a?(Array)

  @params[:queryType] = GroupBy

  # NOTE(review): `except` with a block is not a core Array method; it is
  # presumably a project extension defined elsewhere - confirm its semantics.
  hash_dimensions = dimensions.except { |dimension| dimension.is_a?(Hash) }
  select_lookup(hash_dimensions)

  unless dimensions.nil? || dimensions.count.zero?
    names = dimensions.map { |dimension| dimension.to_s.strip }
    @params[:dimensions] = (@params[:dimensions] || []) + names
  end

  @params.delete(:metrics)
  self
end
#having(*conditions) ⇒ Object
215 216 217 218 219 220 221 222 223 |
# File 'lib/claw_druid.rb', line 215
#
# Sets the having clause of the query from a condition template.
#
# Each " ?" placeholder in the template is replaced, in order, by the
# following arguments, e.g. having('a = ? and b = ?', 1, 2). The bound text is
# parsed by having_chain and stored in @params[:having] unless it is blank.
#
# The template is bound into a new string, so the caller's argument is never
# modified and frozen string literals are accepted.
#
# @param conditions [Array] the template followed by its bind values
# @return [ClawDruid] self, for chaining
def having(*conditions)
  bind_index = 0
  # Non-destructive gsub: the first placeholder takes conditions[1], the
  # second conditions[2], and so on.
  clause = conditions[0].gsub(" ?") { " #{conditions[bind_index += 1]}" }

  havings = having_chain(clause)
  @params[:having] = havings unless havings.blank?

  self
end
#limit(limit_count) ⇒ Object
183 184 185 186 187 188 |
# File 'lib/claw_druid.rb', line 183
#
# Sets the row limit in the query's limitSpec, creating the spec (with type
# "default") when it does not exist yet.
#
# @param limit_count the limit stored under limitSpec[:limit]
# @return [ClawDruid] self, for chaining
def limit(limit_count)
  spec = (@params[:limitSpec] ||= {})
  spec[:type] ||= "default"
  spec[:limit] = limit_count
  self
end
#map(&block) ⇒ Object
285 286 287 |
# File 'lib/claw_druid.rb', line 285
#
# Maps over the rows returned by #to_a, forwarding the given block.
#
# @return whatever Array#map returns for the materialized rows
def map(&block)
  rows = to_a
  rows.map(&block)
end
#max_time ⇒ Object
251 252 253 254 255 |
# File 'lib/claw_druid.rb', line 251
#
# Turns the query into a timeBoundary query bounded to "maxTime".
#
# @return [ClawDruid] self, for chaining
def max_time
  @params.update(queryType: TimeBoundary, bound: "maxTime")
  self
end
#meta_method(method, columns) ⇒ Object
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/claw_druid.rb', line 88

# JavaScript statements that merge two partial aggregates, per aggregate name.
FnCombines = {
  "min" => "return Math.min(partialA, partialB);",
  "max" => "return Math.max(partialA, partialB);",
  "sum" => "return partialA + partialB;"
}.freeze

# JavaScript statements that produce the initial partial aggregate, per
# aggregate name. min/max start from the identity of their operation so the
# first real value always wins.
FnResets = {
  "min" => "return Number.POSITIVE_INFINITY;",
  "max" => "return Number.NEGATIVE_INFINITY;",
  "sum" => "return 0;"
}.freeze

# Appends min/max/sum aggregations for the given columns.
#
# A column containing an arithmetic operator surrounded by spaces (e.g.
# "a + b") becomes a "javascript" aggregation over its operand fields; any
# other column becomes a "double<Method>" aggregation. Duplicates are removed
# from @params[:aggregations] afterwards.
#
# @param method [String, Symbol] aggregate name; must be a key of FnAggregates
# @param columns [Array, Hash] columns, optionally paired with an output name
#   (a single wrapped Array is unwrapped)
# @return [ClawDruid] self, for chaining
def meta_method(method, columns)
  columns = columns[0] if columns.count == 1 && columns[0].is_a?(Array)

  @params[:queryType] ||= TimeSeries
  @params[:aggregations] ||= []

  kind = method.to_s
  aggregations = columns.map do |column, naming|
    naming ||= "#{method}(#{column})"
    fn_aggregate = FnAggregates[kind].gsub("COLUMN", column.to_s)

    if column[/( [\+\-\*\/] )/]
      fields = column.split(/ [\+\-\*\/] /)
      {
        type: "javascript",
        name: naming,
        fieldNames: fields,
        fnAggregate: "function(current, #{fields.join(', ')}) { #{fn_aggregate} }",
        # Partials must be merged with the same operation that built them.
        fnCombine: "function(partialA, partialB) { #{FnCombines[kind]} }",
        fnReset: "function() { #{FnResets[kind]} }"
      }
    else
      { type: "double#{method.capitalize}", name: naming, fieldName: column }
    end
  end

  @params[:aggregations] += aggregations
  @params[:aggregations].uniq!
  self
end
#min_time ⇒ Object
257 258 259 260 261 |
# File 'lib/claw_druid.rb', line 257
#
# Turns the query into a timeBoundary query bounded to "minTime".
#
# @return [ClawDruid] self, for chaining
def min_time
  @params.update(queryType: TimeBoundary, bound: "minTime")
  self
end
#order(*columns) ⇒ Object
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/claw_druid.rb', line 161
#
# Sets the ordering of the query.
#
# Columns may be given as plain names or as a Hash/Array of
# column => direction pairs; a direction containing "desc" (in any letter
# case) sorts descending, anything else ascending.
#
# For non-groupBy queries the columns are also appended to @params[:metric]
# and @params[:descending] is set when any column is descending. The limitSpec
# is rebuilt with the ordering columns; a limit set earlier through #limit is
# kept, otherwise 500000 is used.
#
# @param columns [Array] column names, or one Hash/Array of column/direction pairs
# @return [ClawDruid] self, for chaining
def order(*columns)
  columns = columns[0] if columns[0].is_a?(Hash) || columns[0].is_a?(Array)
  descending = ->(direction) { direction.to_s.match?(/desc/i) }

  if @params[:queryType] != GroupBy
    @params[:metric] ||= []
    @params[:metric] += columns.map { |column, _direction| column }
    @params[:descending] = columns.any? { |_column, direction| descending.call(direction) }
  end

  # Rebuilding the spec must not discard a limit the caller already chose.
  previous_limit = @params[:limitSpec] && @params[:limitSpec][:limit]
  ordering = columns.map do |column, direction|
    {
      dimension: column.to_s,
      direction: descending.call(direction) ? "descending" : "ascending",
      dimensionOrder: "lexicographic"
    }
  end

  @params[:limitSpec] = {
    type: "default",
    limit: previous_limit || 500000,
    columns: ordering
  }
  self
end
#page(page_count) ⇒ Object
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 |
# File 'lib/claw_druid.rb', line 197 def page(page_count) if page_count == 1 @params[:pagingSpec] = {pagingIdentifiers: {}, threshold: @threshold} elsif page_count > 1 current = @params.hash @paging_identifiers[current] ||= {0 => {}} (1..page_count-1).each do |current_page| if begin @paging_identifiers[current][current_page].nil? rescue true end query(@params.merge(pagingSpec: {pagingIdentifiers: @paging_identifiers[current][current_page-1], threshold: @threshold}), current_page) end end if begin @paging_identifiers[current][page_count - 1].nil? rescue true end @params[:pagingSpec] = {pagingIdentifiers: @paging_identifiers[current][page_count - 1], threshold: @threshold} end self end |
#query(params = @params, page_count = nil) ⇒ Object
225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 |
# File 'lib/claw_druid.rb', line 225
#
# Posts the query as JSON to the configured URL and returns the response body.
#
# The params are reduced to the properties listed in Permit_Properties for
# their query type. Query types without an entry there (such as the default
# "select") are sent unfiltered, so that their properties, including
# pagingSpec, reach the server instead of being dropped.
#
# When page_count is given and the response mentions "pagingIdentifiers", the
# identifiers (each advanced by one) are cached in @paging_identifiers under
# the hash of the params without their pagingSpec.
#
# @param params [Hash] query parameters; defaults to the instance's params
# @param page_count [Integer, nil] page the params belong to, when paging
# @return the body of the HTTParty response
def query(params = @params, page_count = nil)
  permitted = Permit_Properties[params[:queryType]]
  # Always work on a copy: pagingSpec is deleted below and params may be the
  # instance's own @params.
  params = permitted ? params.slice(*permitted) : params.dup

  if ENV['DEBUG']
    ap params
    puts params.to_json
  end
  result = HTTParty.post(@url, body: params.to_json, headers: { 'Content-Type' => 'application/json' })
  puts result.code if ENV['DEBUG']

  # Only record identifiers when the response actually carries them.
  if page_count && result["pagingIdentifiers"]
    params.delete(:pagingSpec)
    current = params.hash

    # The pagingIdentifiers look like
    # { "publisher_daily_report_2017-03-01T00:00:00.000Z_2017-03-11T00:00:00.000Z_2017-04-17T21:04:30.804Z" => -10 }
    identifiers = JSON.parse(result.body)[0]["result"]["pagingIdentifiers"]
    @paging_identifiers[current] ||= {}
    @paging_identifiers[current][page_count] = identifiers.transform_values { |value| value + 1 }
  end

  result.body
end
#segment_meta ⇒ Object
268 269 270 271 |
# File 'lib/claw_druid.rb', line 268
#
# Turns the query into a segmentMetadata query.
#
# @return [ClawDruid] self, for chaining
def segment_meta
  @params[:queryType] = SegmentMetaData
  self
end
#select(*columns) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/claw_druid.rb', line 64 def select(*columns) # Split the columns like ['sum(column_a) as sum_a, column_b'] columns = columns[0].split("\, ") if columns.count == 1 && columns[0].is_a?(String) && columns[0]["\, "] columns = columns[0] if columns.count == 1 && columns[0].is_a?(Array) return self if columns.all?{|column| column.blank? } # Add the 'i' to regex to be case-insensitive, cause the sum, max and min could be SUM, MAX and MIN post_columns = columns.except{|column| column[/(sum|max|min|count).+[\+\-\*\/]/i] } @params[:postAggregations] = post_columns.map{|post_column| post_chain(post_column) } unless post_columns.blank? method_columns = columns.except{|column| column.is_a?(String) && column[/(sum|max|min|count)\(.+\)/i] } method_columns.each{|column| method_column(column) } lookup_columns = columns.except{|column| column.is_a? Hash } select_lookup(lookup_columns) if columns && columns.count > 0 @params[:metrics] ||= [] @params[:metrics] += columns.map(&:to_s).map(&:strip) end self end |
#source_meta ⇒ Object
263 264 265 266 |
# File 'lib/claw_druid.rb', line 263
#
# Turns the query into a dataSourceMetadata query.
#
# @return [ClawDruid] self, for chaining
def source_meta
  @params[:queryType] = DataSourceMetaData
  self
end
#time_boundary ⇒ Object
246 247 248 249 |
# File 'lib/claw_druid.rb', line 246
#
# Turns the query into a timeBoundary query.
#
# @return [ClawDruid] self, for chaining
def time_boundary
  @params.store(:queryType, TimeBoundary)
  self
end
#to_a ⇒ Object
277 278 279 |
# File 'lib/claw_druid.rb', line 277
#
# Runs the query and extracts the payload of the first response entry:
# its "columns" for a segmentMetadata query, its "result"/"events" otherwise.
#
# @return the parsed JSON value found at that position
def to_a
  segment_query = @params[:queryType] == SegmentMetaData
  first_entry = JSON.parse(query)[0]

  if segment_query
    first_entry["columns"]
  else
    first_entry["result"]["events"]
  end
end
#to_s ⇒ Object
273 274 275 |
# File 'lib/claw_druid.rb', line 273 def to_s query end |
#top(top_count) ⇒ Object
190 191 192 193 194 195 |
# File 'lib/claw_druid.rb', line 190
#
# Turns the query into a topN query returning top_count entries.
#
# Any limitSpec is removed from the params. When it carried ordering columns,
# the first of them becomes @params[:metric]; a limitSpec without columns
# (e.g. one created by #limit alone) is simply discarded.
#
# @param top_count the value stored under :threshold
# @return [ClawDruid] self, for chaining
def top(top_count)
  @params[:queryType] = TopN
  @params[:threshold] = top_count

  limit_spec = @params.delete(:limitSpec)
  # A spec built by #limit has no :columns entry; indexing it would fail.
  ordering = limit_spec && limit_spec[:columns]
  @params[:metric] = ordering[0] if ordering
  self
end
#where(*conditions) ⇒ Object
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/claw_druid.rb', line 131
#
# Adds filter conditions (and, for the Hash form, the query interval).
#
# Hash form: :begin_date and :end_date set @params[:intervals] when at least
# one of them is present; every other non-blank entry becomes a "selector"
# filter (scalar or one-element Array) or an "in" filter (longer Array).
#
# String form: each " ?" placeholder is replaced, in order, by the following
# arguments and the bound text is parsed by where_chain.
#
# The caller's Hash or String is never modified. Resulting filters are
# appended to the fields of an "and" filter in @params[:filter].
#
# @param conditions [Array] one Hash, or a template String plus bind values
# @return [ClawDruid] self, for chaining
def where(*conditions)
  first = conditions[0]

  if first.is_a?(Hash)
    # Work on a copy so the caller's hash keeps its date keys.
    criteria = first.dup
    begin_date = criteria.delete(:begin_date)
    end_date = criteria.delete(:end_date)
    # Without dates there is no interval to set; keep any earlier one.
    @params[:intervals] = ["#{begin_date}/#{end_date}"] unless begin_date.blank? && end_date.blank?

    present = criteria.reject { |_column, value| value.blank? }
    filters = present.map do |column, values|
      if !values.is_a?(Array)
        { type: "selector", dimension: column, value: values }
      elsif values.count == 1
        { type: "selector", dimension: column, value: values[0] }
      else
        { type: "in", dimension: column, values: values }
      end
    end
  elsif first.is_a?(String)
    bind_index = 0
    # Non-destructive gsub: the first placeholder takes conditions[1], the
    # second conditions[2], and so on.
    clause = first.gsub(" ?") { " #{conditions[bind_index += 1]}" }
    filters = [where_chain(clause)]
  else
    filters = nil
  end

  unless filters.blank?
    @params[:filter] ||= { type: "and", fields: [] }
    @params[:filter][:fields] += filters
  end
  self
end