Class: PgEventstore::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/pg_eventstore/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Client

Returns a new instance of Client.

Parameters:



14
15
16
# File 'lib/pg_eventstore/client.rb', line 14

def initialize(config)
  @config = config
end

Instance Attribute Details

#config=(value) ⇒ PgEventstore::Config



10
11
12
# File 'lib/pg_eventstore/client.rb', line 10

def config
  @config
end

Instance Method Details

#append_to_stream(stream, events_or_event, options: {}, middlewares: nil) ⇒ PgEventstore::Event+

Append the event or multiple events to the stream. This operation is atomic, meaning that no other event can be appended by parallel process between the given events.

Parameters:

Options Hash (options:):

  • :expected_revision (Integer)

    provide your own revision number

  • :expected_revision (Symbol)

    provide one of next values: :any, :no_stream or :stream_exists

Returns:

Raises:



28
29
30
31
32
33
34
35
36
37
38
# File 'lib/pg_eventstore/client.rb', line 28

def append_to_stream(stream, events_or_event, options: {}, middlewares: nil)
  result =
    Commands::Append.new(
      Queries.new(
        partitions: partition_queries,
        events: event_queries(middlewares(middlewares)),
        transactions: transaction_queries
      )
    ).call(stream, *events_or_event, options: options)
  events_or_event.is_a?(Array) ? result : result.first
end

Links event from one stream into another stream. You can later access it by providing :resolve_link_tos option when reading from a stream. Only existing events can be linked.

Parameters:

  • stream (PgEventstore::Stream)
  • events_or_event (PgEventstore::Event, Array<PgEventstore::Event>)
  • options (Hash) (defaults to: {})
  • middlewares (Array) (defaults to: [])

    provide a list of middleware names to use. Defaults to empty array, meaning no middlewares will be applied to the “link” event

Options Hash (options:):

  • :expected_revision (Integer)

    provide your own revision number

  • :expected_revision (Symbol)

    provide one of next values: :any, :no_stream or :stream_exists

Returns:

Raises:



155
156
157
158
159
160
161
162
163
164
165
# File 'lib/pg_eventstore/client.rb', line 155

def link_to(stream, events_or_event, options: {}, middlewares: [])
  result =
    Commands::LinkTo.new(
      Queries.new(
        partitions: partition_queries,
        events: event_queries(middlewares(middlewares)),
        transactions: transaction_queries
      )
    ).call(stream, *events_or_event, options: options)
  events_or_event.is_a?(Array) ? result : result.first
end

#multiple(&blk) ⇒ Object

Allows you to make several different commands atomic by wrapping then into a block. Order of events, produced by multiple commands, belonging to different streams - is unbreakable. So, if you append event1 to stream1 and event2 to stream2 using this method, then thet appear in the same order in the “all” stream. Example:

PgEventstore.client.multiple do
  PgEventstore.client.read(...)
  PgEventstore.client.append_to_stream(...)
  PgEventstore.client.append_to_stream(...)
end

Returns:

  • the result of the given block



51
52
53
# File 'lib/pg_eventstore/client.rb', line 51

def multiple(&blk)
  Commands::Multiple.new(Queries.new(transactions: transaction_queries)).call(&blk)
end

#read(stream, options: {}, middlewares: nil) ⇒ Array<PgEventstore::Event>

Read events from the specific stream or from “all” stream.

Parameters:

  • stream (PgEventstore::Stream)
  • options (Hash) (defaults to: {})

    request options

  • middlewares (Array, nil) (defaults to: nil)

    provide a list of middleware names to override a config’s middlewares

Options Hash (options:):

  • :direction (String)

    read direction. Allowed values are “Forwards”, “Backwards”, “asc”, “desc”, :asc, :desc

  • :from_revision (Integer)

    a starting revision number. **Use this option when stream name is a normal stream name**

  • :from_position (Integer)

    a starting global position number. **Use this option when reading from “all” stream**

  • :max_count (Integer)

    max number of events to return in one response. Defaults to config.max_count

  • :resolve_link_tos (Boolean)

    When using projections to create new events you can set whether the generated events are pointers to existing events. Setting this option to true tells PgEventstore to return the original event instead a link event.

  • :filter (Hash)

    provide it to filter events. You can filter by: stream and by event type. Filtering by stream is only available when reading from “all” stream. Examples:

    # Filtering by stream's context. This will return all events which #context is 'User
    PgEventstore.client.read(
      PgEventstore::Stream.all_stream,
      options: { filter: { streams: [{ context: 'User' }] } }
    )
    
    # Filtering by several stream's contexts. This will return all events which #context is either 'User' or
    # 'Profile'
    PgEventstore.client.read(
      PgEventstore::Stream.all_stream,
      options: { filter: { streams: [{ context: 'User' }, { context: 'Profile' }] } }
    )
    
    # Filtering by a mix of specific stream and a context. This will return all events which #context is 'User' or
    # events belonging to the stream with { context: 'Profile', stream_name: 'ProfileFields', stream_id: '123' }
    PgEventstore.client.read(
      PgEventstore::Stream.all_stream,
      options: {
        filter: {
          streams: [
            { context: 'User' },
            { context: 'Profile', stream_name: 'ProfileFields', stream_id: '123' }
          ]
        }
      }
    )
    
    # Filtering a mix of context and event type
    PgEventstore.client.read(
      PgEventstore::Stream.all_stream,
      options: { filter: { streams: [{ context: 'User' }], event_types: ['MyAwesomeEvent'] } }
    )
    
    # Filtering by specific event when reading from the specific stream
    PgEventstore.client.read(stream, options: { filter: { event_types: ['MyAwesomeEvent'] } })
    

Returns:

Raises:



109
110
111
112
113
# File 'lib/pg_eventstore/client.rb', line 109

def read(stream, options: {}, middlewares: nil)
  Commands::Read.
    new(Queries.new(partitions: partition_queries, events: event_queries(middlewares(middlewares)))).
    call(stream, options: { max_count: config.max_count }.merge(options))
end

#read_grouped(stream, options: {}, middlewares: nil) ⇒ Array<PgEventstore::Event>

Takes a stream, determines a list of even types in it and returns most recent(or very first - depending on :direction option) events, one of each type. If :event_types filter is provided - uses it instead of automatic event types lookup logic. The result size is almost always less than or equal to event types list size, so passing :max_count option does not make any effect. In case if event of same type appears in different context/stream name - it will be counted as a different event, thus, may appear several times in the result.

Parameters:

  • stream (PgEventstore::Stream)
  • options (Hash) (defaults to: {})

    request options

  • middlewares (Array, nil) (defaults to: nil)

Returns:

See Also:

  • for the detailed docs


137
138
139
140
141
142
# File 'lib/pg_eventstore/client.rb', line 137

def read_grouped(stream, options: {}, middlewares: nil)
  cmd_class = stream.all_stream? ? Commands::AllStreamReadGrouped : Commands::RegularStreamReadGrouped
  cmd_class.
    new(Queries.new(partitions: partition_queries, events: event_queries(middlewares(middlewares)))).
    call(stream, options: options)
end

#read_paginated(stream, options: {}, middlewares: nil) ⇒ Enumerator

Returns enumerator will yield PgEventstore::Event.

Parameters:

  • stream (PgEventstore::Stream)
  • options (Hash) (defaults to: {})

    request options

  • middlewares (Array, nil) (defaults to: nil)

Returns:

  • (Enumerator)

    enumerator will yield PgEventstore::Event

See Also:

  • for the detailed docs


120
121
122
123
124
125
# File 'lib/pg_eventstore/client.rb', line 120

def read_paginated(stream, options: {}, middlewares: nil)
  cmd_class = stream.system? ? Commands::SystemStreamReadPaginated : Commands::RegularStreamReadPaginated
  cmd_class.
    new(Queries.new(partitions: partition_queries, events: event_queries(middlewares(middlewares)))).
    call(stream, options: { max_count: config.max_count }.merge(options))
end