Class: Bricolage::SQSDataSource

Inherits:
DataSource
  • Object
show all
Defined in:
lib/bricolage/sqsdatasource.rb

Defined Under Namespace

Classes: DeleteMessageBuffer, MessageHandler

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(region:, url:, access_key_id: nil, secret_access_key: nil, visibility_timeout:, max_number_of_messages: 10, wait_time_seconds: 20, noop: false) ⇒ SQSDataSource

Returns a new instance of SQSDataSource.



13
14
15
16
17
18
19
20
21
22
23
# File 'lib/bricolage/sqsdatasource.rb', line 13

def initialize(region:, url:, access_key_id: nil, secret_access_key: nil,
    visibility_timeout:, max_number_of_messages: 10, wait_time_seconds: 20, noop: false)
  @region = region
  @url = url
  @access_key_id = access_key_id
  @secret_access_key = secret_access_key
  @visibility_timeout = visibility_timeout
  @max_number_of_messages = max_number_of_messages
  @wait_time_seconds = wait_time_seconds
  @noop = noop
end

Instance Attribute Details

#access_key_idObject (readonly)

Returns the value of attribute access_key_id.



27
28
29
# File 'lib/bricolage/sqsdatasource.rb', line 27

def access_key_id
  @access_key_id
end

#max_number_of_messagesObject (readonly)

Returns the value of attribute max_number_of_messages.



31
32
33
# File 'lib/bricolage/sqsdatasource.rb', line 31

def max_number_of_messages
  @max_number_of_messages
end

#regionObject (readonly)

Returns the value of attribute region.



25
26
27
# File 'lib/bricolage/sqsdatasource.rb', line 25

def region
  @region
end

#secret_access_keyObject (readonly)

Returns the value of attribute secret_access_key.



28
29
30
# File 'lib/bricolage/sqsdatasource.rb', line 28

def secret_access_key
  @secret_access_key
end

#urlObject (readonly)

Returns the value of attribute url.



26
27
28
# File 'lib/bricolage/sqsdatasource.rb', line 26

def url
  @url
end

#visibility_timeoutObject (readonly)

Returns the value of attribute visibility_timeout.



30
31
32
# File 'lib/bricolage/sqsdatasource.rb', line 30

def visibility_timeout
  @visibility_timeout
end

#wait_time_secondsObject (readonly)

Returns the value of attribute wait_time_seconds.



32
33
34
# File 'lib/bricolage/sqsdatasource.rb', line 32

def wait_time_seconds
  @wait_time_seconds
end

Class Method Details

.new_mock(**args) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/bricolage/sqsmock.rb', line 6

def SQSDataSource.new_mock(**args)
  SQSDataSource.new(
    region: 'ap-northeast-1',
    url: 'http://sqs/000000000000/queue-name',
    access_key_id: 'access_key_id_1',
    secret_access_key: 'secret_access_key_1',
    visibility_timeout: 30
  ).tap {|ds|
    logger = NullLogger.new
    #logger = Bricolage::Logger.default
    ds.__send__(:initialize_base, 'name', nil, logger)
    ds.instance_variable_set(:@client, SQSMock::Client.new(**args))
  }
end

Instance Method Details

#clientObject



34
35
36
37
38
39
# File 'lib/bricolage/sqsdatasource.rb', line 34

def client
  @client ||= begin
    c = @noop ? DummySQSClient.new : Aws::SQS::Client.new(region: @region, access_key_id: @access_key_id, secret_access_key: @secret_access_key)
    SQSClientWrapper.new(c, logger: logger)
  end
end

#delete_message(msg) ⇒ Object



184
185
186
187
188
189
# File 'lib/bricolage/sqsdatasource.rb', line 184

def delete_message(msg)
  client.delete_message(
    queue_url: @url,
    receipt_handle: msg.receipt_handle
  )
end

#delete_message_async(msg) ⇒ Object



191
192
193
# File 'lib/bricolage/sqsdatasource.rb', line 191

def delete_message_async(msg)
  delete_message_buffer.put(msg)
end

#handle_messages(handler:, message_class:) ⇒ Object

High-Level Polling Interface



45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/bricolage/sqsdatasource.rb', line 45

def handle_messages(handler:, message_class:)
  trap_signals
  polling_loop do
    result = poll or next true
    msgs = message_class.for_sqs_result(result)
    msgs.each do |msg|
      handler.handle(msg)
    end
    handler.after_message_batch
    break if terminating?
    msgs.empty?
  end
end

#initiate_terminateObject



70
71
72
73
# File 'lib/bricolage/sqsdatasource.rb', line 70

def initiate_terminate
  # No I/O allowed in this method
  @terminating = true
end

#pollObject



100
101
102
103
104
105
106
107
108
# File 'lib/bricolage/sqsdatasource.rb', line 100

def poll
  result = receive_messages()
  unless result and result.successful?
    logger.error "ReceiveMessage failed: #{result ? result.error.message : '(result=nil)'}"
    return nil
  end
  logger.info "receive #{result.messages.size} messages"
  result
end

#process_async_delete(now = Time.now) ⇒ Object



195
196
197
# File 'lib/bricolage/sqsdatasource.rb', line 195

def process_async_delete(now = Time.now)
  delete_message_buffer.flush(now)
end

#process_async_delete_forceObject



199
200
201
# File 'lib/bricolage/sqsdatasource.rb', line 199

def process_async_delete_force
  delete_message_buffer.flush_force
end

#put(msg) ⇒ Object



161
162
163
# File 'lib/bricolage/sqsdatasource.rb', line 161

def put(msg)
  send_message(msg)
end

#receive_messagesObject

API-Level Interface



151
152
153
154
155
156
157
158
159
# File 'lib/bricolage/sqsdatasource.rb', line 151

def receive_messages
  result = client.receive_message(
    queue_url: @url,
    max_number_of_messages: @max_number_of_messages,
    visibility_timeout: @visibility_timeout,
    wait_time_seconds: @wait_time_seconds
  )
  result
end

#send_event(name, source: SQSMessage::SQS_EVENT_SOURCE, time: Time.now, **attrs) ⇒ Object



177
178
179
180
181
182
# File 'lib/bricolage/sqsdatasource.rb', line 177

def send_event(name, source: SQSMessage::SQS_EVENT_SOURCE, time: Time.now, **attrs)
  attrs['eventName'] = name
  attrs['eventSource'] = source
  attrs['eventTime'] = time.iso8601
  send_object(attrs)
end

#send_message(msg) ⇒ Object



165
166
167
168
169
170
171
# File 'lib/bricolage/sqsdatasource.rb', line 165

def send_message(msg)
  client.send_message(
    queue_url: @url,
    message_body: { 'Records' => [msg.body] }.to_json,
    delay_seconds: msg.delay_seconds
  )
end

#send_object(obj) ⇒ Object



173
174
175
# File 'lib/bricolage/sqsdatasource.rb', line 173

def send_object(obj)
  send_message(ObjectMessage.new(obj))
end

#terminating?Boolean

Returns:

  • (Boolean)


75
76
77
# File 'lib/bricolage/sqsdatasource.rb', line 75

def terminating?
  @terminating
end