Class: Google::Gax::Task

Inherits:
Object
  • Object
show all
Defined in:
lib/google/gax/bundling.rb

Overview

Coordinates the execution of a single bundle.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(api_call, bundle_id, bundled_field, bundling_request, subresponse_field: nil) ⇒ Task

Returns a new instance of Task.

Parameters:

  • api_call (Proc)

    used to make an api call when the task is run.

  • bundle_id (String)

    the id of this bundle.

  • bundled_field (String)

    the field used to create the bundled request.

  • bundling_request (Object)

    the request to pass as the arg to the api_call.

  • subresponse_field (String) (defaults to: nil)

    optional field used to demultiplex responses.



100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/google/gax/bundling.rb', line 100

# Builds a task that starts out with no pending elements.
#
# @param api_call [Proc] used to make an api call when the task is run.
# @param bundle_id [String] the id of this bundle.
# @param bundled_field [String] the field used to create the
#     bundled request.
# @param bundling_request [Object] the request to pass as the arg to
#     the api_call.
# @param subresponse_field [String] optional field used to demultiplex
#     responses.
def initialize(api_call, bundle_id, bundled_field, bundling_request,
               subresponse_field: nil)
  # Collaborators and configuration handed in by the caller.
  @api_call = api_call
  @bundle_id = bundle_id
  @bundled_field = bundled_field
  @bundling_request = bundling_request
  @subresponse_field = subresponse_field

  # Queued element batches and the event created for each of them.
  @inputs = []
  @events = []
end

Instance Attribute Details

#bundle_id ⇒ String (readonly)

Returns the id of this bundle.

Returns:

  • (String)

    the id of this bundle.



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
# File 'lib/google/gax/bundling.rb', line 88

class Task
  attr_reader :bundle_id, :bundled_field,
              :subresponse_field

  # @param api_call [Proc] used to make an api call when the task is run.
  # @param bundle_id [String] the id of this bundle.
  # @param bundled_field [String] the field used to create the
  #     bundled request.
  # @param bundling_request [Object] the request to pass as the arg to
  #     the api_call.
  # @param subresponse_field [String] optional field used to demultiplex
  #     responses.
  def initialize(api_call,
                 bundle_id,
                 bundled_field,
                 bundling_request,
                 subresponse_field: nil)
    @api_call = api_call
    @bundle_id = bundle_id
    @bundled_field = bundled_field
    @bundling_request = bundling_request
    @subresponse_field = subresponse_field
    # @inputs[i] is the batch of elements whose outcome is reported through
    # @events[i]; both arrays are pushed to, deleted from and cleared
    # together so that the positions always line up.
    @inputs = []
    @events = []
  end

  # The number of bundled elements in the repeated field.
  # @return [Numeric]
  def element_count
    @inputs.reduce(0) { |acc, elem| acc + elem.count }
  end

  # The size of the request in bytes of the bundled field elements.
  #
  # Each element is measured as the bytesize of its +to_s+ form.
  # @return [Numeric]
  def request_bytesize
    @inputs.reduce(0) do |sum, elts|
      sum + elts.reduce(0) do |inner_sum, elt|
        inner_sum + elt.to_s.bytesize
      end
    end
  end

  # Call the task's api_call.
  #
  # Does nothing when no elements are pending. Otherwise the bundled field
  # of the bundling request is emptied and refilled with every pending
  # element, the api_call is invoked with that request, and the outcome is
  # stored on each pending event.
  def run
    return if @inputs.empty?
    request = @bundling_request
    request[@bundled_field].clear
    request[@bundled_field].concat(@inputs.flatten)
    if @subresponse_field.nil?
      run_with_no_subresponse(request)
    else
      run_with_subresponses(request)
    end
  end

  # Helper for #run to run the api call with no subresponses.
  #
  # Every pending event receives the whole response, or the GaxError raised
  # by the api call. Exceptions other than GaxError propagate to the caller.
  # The pending inputs and events are cleared in every case.
  #
  # @param request [Object] the request to pass as the arg to
  #     the api_call.
  def run_with_no_subresponse(request)
    response = @api_call.call(request)
    @events.each do |event|
      event.result = response
    end
  rescue GaxError => err
    @events.each do |event|
      event.result = err
    end
  ensure
    @inputs.clear
    @events.clear
  end

  # Helper for #run to run the api call with subresponses.
  #
  # When the response holds exactly one subresponse per bundled element,
  # each event receives a copy of the response whose subresponse field
  # contains only the slice that belongs to its own elements. Otherwise
  # every event receives the unmodified response. A GaxError raised by the
  # api call is stored on every event instead; other exceptions propagate.
  # The pending inputs and events are cleared in every case.
  #
  # @param request [Object] the request to pass as the arg to
  #     the api_call.
  def run_with_subresponses(request)
    response = @api_call.call(request)
    # Use one spelling of the field name for every access below.
    field = @subresponse_field.to_s
    all_subresponses = response[field]
    if all_subresponses.count != element_count
      # TODO: Implement a logging class to handle this.
      # warn DEMUX_WARNING
      @events.each do |event|
        event.result = response
      end
    else
      start = 0
      @inputs.zip(@events).each do |elts, event|
        response_copy = response.dup
        # A shallow dup can hand back the very same subresponse container
        # (a Hash does). Clearing it would then wipe all_subresponses and
        # every other event's slice, so give this copy its own container.
        if response_copy[field].equal?(all_subresponses)
          response_copy[field] = all_subresponses.dup
        end
        subresponses = all_subresponses[start, elts.count]
        response_copy[field].clear
        response_copy[field].concat(subresponses)
        start += elts.count
        event.result = response_copy
      end
    end
  rescue GaxError => err
    @events.each do |event|
      event.result = err
    end
  ensure
    @inputs.clear
    @events.clear
  end

  # This adds elements to the tasks.
  #
  # @param elts [Array<Object>] an array of elements that can be appended
  #     to the tasks bundle_field.
  # @return [Event] an Event that can be used to wait on the response.
  def extend(elts)
    # Always store a private Array, whatever was passed in.
    elts = [*elts]
    @inputs.push(elts)
    event = event_for(elts)
    @events.push(event)
    event
  end

  # Creates an Event that is set when the bundle with elts is sent.
  #
  # @param elts [Array<Object>] an array of elements that can be appended
  #     to the tasks bundle_field.
  # @return [Event] an Event that can be used to wait on the response.
  def event_for(elts)
    event = Event.new
    event.canceller = canceller_for(elts, event)
    event
  end

  # Creates a cancellation proc that removes elts.
  #
  # The returned proc returns true if the elements and their event were
  # removed from @inputs and @events, and false (removing nothing) when
  # they are no longer pending.
  #
  # @param elts [Array<Object>] the batch stored in @inputs for this event.
  # @param event [Event] the Event stored in @events for this batch.
  # @return [Proc] the canceller that when called removes the elts
  #     and events.
  def canceller_for(elts, event)
    proc do
      # Locate the pair by object identity at one shared position. Searching
      # @inputs by value could match an equal batch that belongs to another
      # event and leave the two arrays out of step.
      index = @events.index { |candidate| candidate.equal?(event) }
      if index && @inputs[index].equal?(elts)
        @events.delete_at(index)
        @inputs.delete_at(index)
        true
      else
        false
      end
    end
  end

  private :run_with_no_subresponse,
          :run_with_subresponses,
          :event_for,
          :canceller_for
end

#bundled_field ⇒ String (readonly)

Returns the field used to create the bundled request.

Returns:

  • (String)

    the field used to create the bundled request.



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
# File 'lib/google/gax/bundling.rb', line 88

class Task
  attr_reader :bundle_id, :bundled_field,
              :subresponse_field

  # @param api_call [Proc] used to make an api call when the task is run.
  # @param bundle_id [String] the id of this bundle.
  # @param bundled_field [String] the field used to create the
  #     bundled request.
  # @param bundling_request [Object] the request to pass as the arg to
  #     the api_call.
  # @param subresponse_field [String] optional field used to demultiplex
  #     responses.
  def initialize(api_call,
                 bundle_id,
                 bundled_field,
                 bundling_request,
                 subresponse_field: nil)
    @api_call = api_call
    @bundle_id = bundle_id
    @bundled_field = bundled_field
    @bundling_request = bundling_request
    @subresponse_field = subresponse_field
    # @inputs[i] is the batch of elements whose outcome is reported through
    # @events[i]; both arrays are pushed to, deleted from and cleared
    # together so that the positions always line up.
    @inputs = []
    @events = []
  end

  # The number of bundled elements in the repeated field.
  # @return [Numeric]
  def element_count
    @inputs.reduce(0) { |acc, elem| acc + elem.count }
  end

  # The size of the request in bytes of the bundled field elements.
  #
  # Each element is measured as the bytesize of its +to_s+ form.
  # @return [Numeric]
  def request_bytesize
    @inputs.reduce(0) do |sum, elts|
      sum + elts.reduce(0) do |inner_sum, elt|
        inner_sum + elt.to_s.bytesize
      end
    end
  end

  # Call the task's api_call.
  #
  # Does nothing when no elements are pending. Otherwise the bundled field
  # of the bundling request is emptied and refilled with every pending
  # element, the api_call is invoked with that request, and the outcome is
  # stored on each pending event.
  def run
    return if @inputs.empty?
    request = @bundling_request
    request[@bundled_field].clear
    request[@bundled_field].concat(@inputs.flatten)
    if @subresponse_field.nil?
      run_with_no_subresponse(request)
    else
      run_with_subresponses(request)
    end
  end

  # Helper for #run to run the api call with no subresponses.
  #
  # Every pending event receives the whole response, or the GaxError raised
  # by the api call. Exceptions other than GaxError propagate to the caller.
  # The pending inputs and events are cleared in every case.
  #
  # @param request [Object] the request to pass as the arg to
  #     the api_call.
  def run_with_no_subresponse(request)
    response = @api_call.call(request)
    @events.each do |event|
      event.result = response
    end
  rescue GaxError => err
    @events.each do |event|
      event.result = err
    end
  ensure
    @inputs.clear
    @events.clear
  end

  # Helper for #run to run the api call with subresponses.
  #
  # When the response holds exactly one subresponse per bundled element,
  # each event receives a copy of the response whose subresponse field
  # contains only the slice that belongs to its own elements. Otherwise
  # every event receives the unmodified response. A GaxError raised by the
  # api call is stored on every event instead; other exceptions propagate.
  # The pending inputs and events are cleared in every case.
  #
  # @param request [Object] the request to pass as the arg to
  #     the api_call.
  def run_with_subresponses(request)
    response = @api_call.call(request)
    # Use one spelling of the field name for every access below.
    field = @subresponse_field.to_s
    all_subresponses = response[field]
    if all_subresponses.count != element_count
      # TODO: Implement a logging class to handle this.
      # warn DEMUX_WARNING
      @events.each do |event|
        event.result = response
      end
    else
      start = 0
      @inputs.zip(@events).each do |elts, event|
        response_copy = response.dup
        # A shallow dup can hand back the very same subresponse container
        # (a Hash does). Clearing it would then wipe all_subresponses and
        # every other event's slice, so give this copy its own container.
        if response_copy[field].equal?(all_subresponses)
          response_copy[field] = all_subresponses.dup
        end
        subresponses = all_subresponses[start, elts.count]
        response_copy[field].clear
        response_copy[field].concat(subresponses)
        start += elts.count
        event.result = response_copy
      end
    end
  rescue GaxError => err
    @events.each do |event|
      event.result = err
    end
  ensure
    @inputs.clear
    @events.clear
  end

  # This adds elements to the tasks.
  #
  # @param elts [Array<Object>] an array of elements that can be appended
  #     to the tasks bundle_field.
  # @return [Event] an Event that can be used to wait on the response.
  def extend(elts)
    # Always store a private Array, whatever was passed in.
    elts = [*elts]
    @inputs.push(elts)
    event = event_for(elts)
    @events.push(event)
    event
  end

  # Creates an Event that is set when the bundle with elts is sent.
  #
  # @param elts [Array<Object>] an array of elements that can be appended
  #     to the tasks bundle_field.
  # @return [Event] an Event that can be used to wait on the response.
  def event_for(elts)
    event = Event.new
    event.canceller = canceller_for(elts, event)
    event
  end

  # Creates a cancellation proc that removes elts.
  #
  # The returned proc returns true if the elements and their event were
  # removed from @inputs and @events, and false (removing nothing) when
  # they are no longer pending.
  #
  # @param elts [Array<Object>] the batch stored in @inputs for this event.
  # @param event [Event] the Event stored in @events for this batch.
  # @return [Proc] the canceller that when called removes the elts
  #     and events.
  def canceller_for(elts, event)
    proc do
      # Locate the pair by object identity at one shared position. Searching
      # @inputs by value could match an equal batch that belongs to another
      # event and leave the two arrays out of step.
      index = @events.index { |candidate| candidate.equal?(event) }
      if index && @inputs[index].equal?(elts)
        @events.delete_at(index)
        @inputs.delete_at(index)
        true
      else
        false
      end
    end
  end

  private :run_with_no_subresponse,
          :run_with_subresponses,
          :event_for,
          :canceller_for
end

#subresponse_field ⇒ String (readonly)

Returns the optional field used to demultiplex responses.

Returns:

  • (String)

    the optional field used to demultiplex responses.



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
# File 'lib/google/gax/bundling.rb', line 88

class Task
  attr_reader :bundle_id, :bundled_field,
              :subresponse_field

  # @param api_call [Proc] used to make an api call when the task is run.
  # @param bundle_id [String] the id of this bundle.
  # @param bundled_field [String] the field used to create the
  #     bundled request.
  # @param bundling_request [Object] the request to pass as the arg to
  #     the api_call.
  # @param subresponse_field [String] optional field used to demultiplex
  #     responses.
  def initialize(api_call,
                 bundle_id,
                 bundled_field,
                 bundling_request,
                 subresponse_field: nil)
    @api_call = api_call
    @bundle_id = bundle_id
    @bundled_field = bundled_field
    @bundling_request = bundling_request
    @subresponse_field = subresponse_field
    # @inputs[i] is the batch of elements whose outcome is reported through
    # @events[i]; both arrays are pushed to, deleted from and cleared
    # together so that the positions always line up.
    @inputs = []
    @events = []
  end

  # The number of bundled elements in the repeated field.
  # @return [Numeric]
  def element_count
    @inputs.reduce(0) { |acc, elem| acc + elem.count }
  end

  # The size of the request in bytes of the bundled field elements.
  #
  # Each element is measured as the bytesize of its +to_s+ form.
  # @return [Numeric]
  def request_bytesize
    @inputs.reduce(0) do |sum, elts|
      sum + elts.reduce(0) do |inner_sum, elt|
        inner_sum + elt.to_s.bytesize
      end
    end
  end

  # Call the task's api_call.
  #
  # Does nothing when no elements are pending. Otherwise the bundled field
  # of the bundling request is emptied and refilled with every pending
  # element, the api_call is invoked with that request, and the outcome is
  # stored on each pending event.
  def run
    return if @inputs.empty?
    request = @bundling_request
    request[@bundled_field].clear
    request[@bundled_field].concat(@inputs.flatten)
    if @subresponse_field.nil?
      run_with_no_subresponse(request)
    else
      run_with_subresponses(request)
    end
  end

  # Helper for #run to run the api call with no subresponses.
  #
  # Every pending event receives the whole response, or the GaxError raised
  # by the api call. Exceptions other than GaxError propagate to the caller.
  # The pending inputs and events are cleared in every case.
  #
  # @param request [Object] the request to pass as the arg to
  #     the api_call.
  def run_with_no_subresponse(request)
    response = @api_call.call(request)
    @events.each do |event|
      event.result = response
    end
  rescue GaxError => err
    @events.each do |event|
      event.result = err
    end
  ensure
    @inputs.clear
    @events.clear
  end

  # Helper for #run to run the api call with subresponses.
  #
  # When the response holds exactly one subresponse per bundled element,
  # each event receives a copy of the response whose subresponse field
  # contains only the slice that belongs to its own elements. Otherwise
  # every event receives the unmodified response. A GaxError raised by the
  # api call is stored on every event instead; other exceptions propagate.
  # The pending inputs and events are cleared in every case.
  #
  # @param request [Object] the request to pass as the arg to
  #     the api_call.
  def run_with_subresponses(request)
    response = @api_call.call(request)
    # Use one spelling of the field name for every access below.
    field = @subresponse_field.to_s
    all_subresponses = response[field]
    if all_subresponses.count != element_count
      # TODO: Implement a logging class to handle this.
      # warn DEMUX_WARNING
      @events.each do |event|
        event.result = response
      end
    else
      start = 0
      @inputs.zip(@events).each do |elts, event|
        response_copy = response.dup
        # A shallow dup can hand back the very same subresponse container
        # (a Hash does). Clearing it would then wipe all_subresponses and
        # every other event's slice, so give this copy its own container.
        if response_copy[field].equal?(all_subresponses)
          response_copy[field] = all_subresponses.dup
        end
        subresponses = all_subresponses[start, elts.count]
        response_copy[field].clear
        response_copy[field].concat(subresponses)
        start += elts.count
        event.result = response_copy
      end
    end
  rescue GaxError => err
    @events.each do |event|
      event.result = err
    end
  ensure
    @inputs.clear
    @events.clear
  end

  # This adds elements to the tasks.
  #
  # @param elts [Array<Object>] an array of elements that can be appended
  #     to the tasks bundle_field.
  # @return [Event] an Event that can be used to wait on the response.
  def extend(elts)
    # Always store a private Array, whatever was passed in.
    elts = [*elts]
    @inputs.push(elts)
    event = event_for(elts)
    @events.push(event)
    event
  end

  # Creates an Event that is set when the bundle with elts is sent.
  #
  # @param elts [Array<Object>] an array of elements that can be appended
  #     to the tasks bundle_field.
  # @return [Event] an Event that can be used to wait on the response.
  def event_for(elts)
    event = Event.new
    event.canceller = canceller_for(elts, event)
    event
  end

  # Creates a cancellation proc that removes elts.
  #
  # The returned proc returns true if the elements and their event were
  # removed from @inputs and @events, and false (removing nothing) when
  # they are no longer pending.
  #
  # @param elts [Array<Object>] the batch stored in @inputs for this event.
  # @param event [Event] the Event stored in @events for this batch.
  # @return [Proc] the canceller that when called removes the elts
  #     and events.
  def canceller_for(elts, event)
    proc do
      # Locate the pair by object identity at one shared position. Searching
      # @inputs by value could match an equal batch that belongs to another
      # event and leave the two arrays out of step.
      index = @events.index { |candidate| candidate.equal?(event) }
      if index && @inputs[index].equal?(elts)
        @events.delete_at(index)
        @inputs.delete_at(index)
        true
      else
        false
      end
    end
  end

  private :run_with_no_subresponse,
          :run_with_subresponses,
          :event_for,
          :canceller_for
end

Instance Method Details

#element_count ⇒ Numeric

The number of bundled elements in the repeated field.

Returns:

  • (Numeric)


116
117
118
# File 'lib/google/gax/bundling.rb', line 116

# Counts the bundled elements across every pending batch.
#
# @return [Numeric] the sum of the element counts of all batches held
#     in @inputs; 0 when nothing is pending.
def element_count
  @inputs.map(&:count).reduce(0, :+)
end

#extend(elts) ⇒ Event

This adds elements to the tasks.

Parameters:

  • elts (Array<Object>)

    an array of elements that can be appended to the tasks bundle_field.

Returns:

  • (Event)

    an Event that can be used to wait on the response.



204
205
206
207
208
209
210
# File 'lib/google/gax/bundling.rb', line 204

# Queues a batch of elements on the task.
#
# The argument is splatted into a fresh Array before it is stored, so a
# single element, nil or an existing array are all accepted.
#
# @param elts [Array<Object>] an array of elements that can be appended
#     to the tasks bundle_field.
# @return [Event] an Event that can be used to wait on the response.
def extend(elts)
  batch = [*elts]
  @inputs.push(batch)
  # The event is created after the batch is queued and is stored last.
  event_for(batch).tap { |event| @events.push(event) }
end

#request_bytesize ⇒ Numeric

The size of the request in bytes of the bundled field elements.

Returns:

  • (Numeric)


122
123
124
125
126
127
128
# File 'lib/google/gax/bundling.rb', line 122

# Measures the pending elements in bytes.
#
# Each element contributes the bytesize of its +to_s+ representation.
#
# @return [Numeric] the total over every element of every batch in
#     @inputs; 0 when nothing is pending.
def request_bytesize
  sizes = @inputs.flat_map do |batch|
    batch.map { |elt| elt.to_s.bytesize }
  end
  sizes.reduce(0, :+)
end

#run ⇒ Object

Call the task’s api_call.

The task’s api_call is invoked with the bundling request, whose bundled field has been filled with every pending element.



133
134
135
136
137
138
139
140
141
142
143
# File 'lib/google/gax/bundling.rb', line 133

# Call the task's api_call.
#
# Returns immediately when no elements are pending. Otherwise the bundled
# field of the bundling request is emptied, refilled with every pending
# element, and the request is handed to the helper that matches whether a
# subresponse field is configured.
def run
  return if @inputs.empty?

  bundled = @bundling_request[@bundled_field]
  bundled.clear
  bundled.concat(@inputs.flatten)

  return run_with_no_subresponse(@bundling_request) if @subresponse_field.nil?
  run_with_subresponses(@bundling_request)
end