Class: EventStoreClient::Connection
- Inherits:
-
Object
- Object
- EventStoreClient::Connection
- Defined in:
- lib/event_store_client/connection.rb
Instance Method Summary collapse
- #consume_feed(stream, subscription) ⇒ Object
- #delete_stream(stream) ⇒ Object
- #publish(stream:, events:, expected_version: nil) ⇒ Object
- #read(stream, direction: 'forward') ⇒ Object
- #subscribe(stream, name:) ⇒ Object
Instance Method Details
#consume_feed(stream, subscription) ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/event_store_client/connection.rb', line 34 def consume_feed(stream, subscription) response = client.consume_feed(stream, subscription) return [] unless response.body body = JSON.parse(response.body) ack_uri = body['links'].find { |link| link['relation'] == 'ackAll' }. try(:[], 'uri') events = body['entries'].map do |entry| event = EventStoreClient::Event.new( id: entry['eventId'], type: entry['eventType'], data: entry['data'] || '{}', metadata: entry['isMetaData'] ? entry['metaData'] : '{}' ) mapper.deserialize(event) end client.ack(ack_uri) events end |
#delete_stream(stream) ⇒ Object
28 |
# File 'lib/event_store_client/connection.rb', line 28 def delete_stream(stream); end |
#publish(stream:, events:, expected_version: nil) ⇒ Object
5 6 7 8 9 10 11 |
# File 'lib/event_store_client/connection.rb', line 5 def publish(stream:, events:, expected_version: nil) serialized_events = events.map { |event| mapper.serialize(event) } client.append_to_stream( stream, serialized_events, expected_version: expected_version ) serialized_events end |
#read(stream, direction: 'forward') ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/event_store_client/connection.rb', line 13 def read(stream, direction: 'forward') response = client.read(stream, start: 0, direction: direction) return [] unless response.body&.present? JSON.parse(response.body)['entries'].map do |entry| event = EventStoreClient::Event.new( id: entry['eventId'], type: entry['eventType'], data: entry['data'], metadata: entry['metaData'] ) mapper.deserialize(event) end end |
#subscribe(stream, name:) ⇒ Object
30 31 32 |
# File 'lib/event_store_client/connection.rb', line 30 def subscribe(stream, name:) client.subscribe_to_stream(stream, name) end |