Class: EventStoreClient::HTTP::Commands::PersistentSubscriptions::Read

Inherits:
Command
  • Object
show all
Includes:
Configuration
Defined in:
lib/event_store_client/adapters/http/commands/persistent_subscriptions/read.rb

Instance Method Summary collapse

Methods included from Configuration

#config

Methods inherited from Command

inherited

Instance Method Details

#call(stream_name, subscription_name, options: {}) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/event_store_client/adapters/http/commands/persistent_subscriptions/read.rb', line 11

def call(stream_name, subscription_name, options: {})
  count = options[:count] || 20
  long_poll = options[:long_poll].to_i
  headers = long_poll.positive? ? { 'ES-LongPoll' => long_poll.to_s } : {}
  headers['Content-Type'] = 'application/vnd.eventstore.competingatom+json'
  headers['Accept'] = 'application/vnd.eventstore.competingatom+json'
  headers['ES-ResolveLinktos'] = (options[:resolve_links] || true).to_s

  response = connection.call(
    :get,
    "/subscriptions/#{stream_name}/#{subscription_name}/#{count}",
    headers: headers
  )

  return { events: [] } if response.body.nil? || response.body.empty?

  body = JSON.parse(response.body)

  ack_info = body['links'].find { |link| link['relation'] == 'ackAll' }
  return { events: [] } unless ack_info

  skip_decryption = options[:skip_decryption] || false
  body['entries'].map do |entry|
    yield deserialize_event(entry, skip_decryption: skip_decryption)
  end

  Ack.new(connection).call(ack_info['uri'])
  Success()
end