Class: RubyEventStore::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/ruby_event_store/client.rb

Defined Under Namespace

Classes: Within

Instance Method Summary collapse

Constructor Details

#initialize(repository: InMemoryRepository.new, mapper: Mappers::BatchMapper.new, subscriptions: nil, dispatcher: nil, message_broker: nil, clock: default_clock, correlation_id_generator: default_correlation_id_generator, event_type_resolver: EventTypeResolver.new) ⇒ Client

Returns a new instance of Client.



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/ruby_event_store/client.rb', line 7

# Builds a client from its collaborators.
#
# A mapper for which +batch_mapper?+ is false is wrapped in
# Mappers::BatchMapper. When no +message_broker+ is given, a Broker is
# assembled from +subscriptions+ and +dispatcher+ (each falling back to a
# fresh default instance).
#
# Passing +subscriptions+ or +dispatcher+ is deprecated and emits a warning;
# a second warning is emitted when +message_broker+ is passed as well, since
# the broker then takes precedence and the other two are ignored.
#
# @param repository [Object] event repository (defaults to InMemoryRepository.new)
# @param mapper [Object] mapper, wrapped in a BatchMapper when needed
# @param subscriptions [Object, nil] deprecated, use +message_broker+
# @param dispatcher [Object, nil] deprecated, use +message_broker+
# @param message_broker [Object, nil] broker used instead of building one
# @param clock [Object] stored as @clock
# @param correlation_id_generator [Object] stored as @correlation_id_generator
# @param event_type_resolver [Object] stored as @event_type_resolver
# @return [Client]
def initialize(
  repository: InMemoryRepository.new,
  mapper: Mappers::BatchMapper.new,
  subscriptions: nil,
  dispatcher: nil,
  message_broker: nil,
  clock: default_clock,
  correlation_id_generator: default_correlation_id_generator,
  event_type_resolver: EventTypeResolver.new
)
  @repository = repository
  @mapper =
    if batch_mapper?(mapper)
      mapper
    else
      Mappers::BatchMapper.new(mapper)
    end
  @broker = message_broker
  # Default collaborators are only instantiated when no broker was supplied.
  @broker ||= Broker.new(
    subscriptions: subscriptions || Subscriptions.new,
    dispatcher: dispatcher || Dispatcher.new
  )
  @clock = clock
  @metadata = Concurrent::ThreadLocalVar.new
  @correlation_id_generator = correlation_id_generator
  @event_type_resolver = event_type_resolver

  # Nothing to warn about unless one of the deprecated arguments was used.
  return unless subscriptions || dispatcher

  deprecation_notice = <<~EOW
    Passing subscriptions and dispatcher to #{self.class} has been deprecated.

    Pass it using message_broker argument. For example:

    event_store = RubyEventStore::Client.new(
      message_broker: RubyEventStore::Broker.new(
        subscriptions: RubyEventStore::Subscriptions.new,
        dispatcher: RubyEventStore::Dispatcher.new
      )
    )
  EOW
  warn deprecation_notice

  return unless message_broker

  warn <<~EOW

    Because message_broker has been provided,
    arguments passed by subscriptions or dispatcher will be ignored.
  EOW
end

Instance Method Details

#append(events, stream_name: GLOBAL_STREAM, expected_version: :any) ⇒ self

Persists new event(s) without notifying any subscribed handlers

Parameters:

  • events (Array<Event>, Event)

    event(s)

  • stream_name (String) (defaults to: GLOBAL_STREAM)

    name of the stream for persisting events.

  • expected_version (:any, :auto, :none, Integer) (defaults to: :any)

    controls optimistic locking strategy. / Read more

Returns:

  • (self)


84
85
86
87
88
89
90
91
# File 'lib/ruby_event_store/client.rb', line 84

# Persists new event(s) without notifying any subscribed handlers.
#
# @param events [Array<Event>, Event] event(s) to persist
# @param stream_name [String] name of the stream for persisting events
# @param expected_version [:any, :auto, :none, Integer] optimistic locking strategy
# @return [self]
def append(events, stream_name: GLOBAL_STREAM, expected_version: :any)
  # NOTE(review): the rendered listing showed `transform((events))` — the name of
  # the call wrapping `events` was lost in extraction. It is restored here as
  # enrich_events_metadata; confirm against lib/ruby_event_store/client.rb.
  records = transform(enrich_events_metadata(events))
  append_records_to_stream(records, stream_name: stream_name, expected_version: expected_version)
  self
end

#delete_stream(stream_name) ⇒ self

Deletes a stream. All events from the stream remain intact but they are no longer linked to the stream.

Parameters:

  • stream_name (String)

    name of the stream to be cleared.

Returns:

  • (self)


111
112
113
114
# File 'lib/ruby_event_store/client.rb', line 111

# Deletes the stream with the given name via the repository.
#
# @param stream_name [String] name of the stream to be cleared
# @return [self]
def delete_stream(stream_name)
  target_stream = Stream.new(stream_name)
  repository.delete_stream(target_stream)

  self
end

#deserialize(serializer:, event_type:, event_id:, data:, metadata:, timestamp: nil, valid_at: nil) ⇒ Event

Deserialize event which was serialized for async event handlers Read more

Returns:

  • (Event)

    deserialized event



310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
# File 'lib/ruby_event_store/client.rb', line 310

# Rebuilds an event from the serialized attributes handed to an async handler.
#
# When +timestamp+ is not given, it is read from the metadata loaded through
# +serializer+: the +:timestamp+ key if present, otherwise the "timestamp"
# string key parsed with Time.parse. That extracted value is also the fallback
# for +valid_at+. When +timestamp+ IS given, metadata is not loaded here and
# +valid_at+ falls back to nil.
#
# @param serializer [#load] serializer the event was serialized with
# @param event_type [String]
# @param event_id [String]
# @param data [Object] serialized event data
# @param metadata [Object] serialized event metadata
# @param timestamp [String, nil] explicit timestamp; skips metadata lookup
# @param valid_at [String, nil] explicit validity time
# @return [Event] deserialized event
def deserialize(serializer:, event_type:, event_id:, data:, metadata:, timestamp: nil, valid_at: nil)
  extract_timestamp = lambda { |m| (m[:timestamp] || Time.parse(m.fetch("timestamp"))).iso8601 }

  # Only touch the serialized metadata when the caller did not pass a timestamp.
  fallback_timestamp = extract_timestamp.call(serializer.load(metadata)) unless timestamp

  mapper.records_to_events(
    [
      SerializedRecord.new(
        event_type: event_type,
        event_id: event_id,
        data: data,
        metadata: metadata,
        timestamp: timestamp || fallback_timestamp,
        valid_at: valid_at || fallback_timestamp,
      ).deserialize(serializer),
    ],
  ).first
end

#event_in_stream?(event_id, stream_name) ⇒ Boolean

Checks whether event is linked in given stream

Parameters:

  • event_id (String)
  • stream_name (String)

Returns:

  • (Boolean)

    true if event is linked to given stream, false otherwise



163
164
165
166
# File 'lib/ruby_event_store/client.rb', line 163

# Checks whether the event is linked in the given stream.
#
# For the global stream the question reduces to "does the event exist at all",
# so the repository is asked +has_event?+ instead.
#
# @param event_id [String]
# @param stream_name [String]
# @return [Boolean] true if the event is linked to the given stream
def event_in_stream?(event_id, stream_name)
  stream = Stream.new(stream_name)
  return repository.has_event?(event_id) if stream.global?

  repository.event_in_stream?(event_id, stream)
end

#global_position(event_id) ⇒ Integer

Gets position of the event in global stream

The position is always nonnegative. Global positions may have gaps: there may be an event at position 40 but no event at position 39.

Parameters:

  • event_id (String)

Returns:

  • (Integer)

    nonnegative integer position of event in global stream

Raises:



154
155
156
# File 'lib/ruby_event_store/client.rb', line 154

# Gets the position of the event in the global stream by delegating to the
# repository.
#
# @param event_id [String]
# @return [Integer] position of the event in the global stream
def global_position(event_id)
  repository.global_position(event_id)
end

#inspect ⇒ Object



369
370
371
# File 'lib/ruby_event_store/client.rb', line 369

# Short representation: class name plus the object id in hexadecimal,
# without dumping any instance variables.
#
# @return [String]
def inspect
  hex_id = __id__.to_s(16)
  format("#<%s:0x%s>", self.class, hex_id)
end

#link(event_ids, stream_name:, expected_version: :any) ⇒ self

Links already persisted event(s) to a different stream. Does not notify any subscribed handlers.

Parameters:

  • event_ids (String, Array<String>)

    ids of events

  • stream_name (String)

    name of the stream for persisting events.

  • expected_version (:any, :auto, :none, Integer) (defaults to: :any)

    controls optimistic locking strategy. / Read more

Returns:

  • (self)


100
101
102
103
# File 'lib/ruby_event_store/client.rb', line 100

# Links already persisted event(s) to a different stream.
#
# @param event_ids [String, Array<String>] ids of events
# @param stream_name [String] name of the stream to link the events to
# @param expected_version [:any, :auto, :none, Integer] optimistic locking strategy
# @return [self]
def link(event_ids, stream_name:, expected_version: :any)
  ids = Array(event_ids) # accept a single id as well as a list
  target_stream = Stream.new(stream_name)
  version = ExpectedVersion.new(expected_version)

  repository.link_to_stream(ids, target_stream, version)
  self
end

#metadata ⇒ Hash

Read additional metadata which will be added for published events Read more

Returns:

  • (Hash)


331
332
333
# File 'lib/ruby_event_store/client.rb', line 331

# Reads the additional metadata currently stored in @metadata.
#
# @return [Hash] the stored value, or EMPTY_HASH when nothing has been set
def metadata
  @metadata.value || EMPTY_HASH
end

#overwrite(events_or_event) ⇒ self

Overwrite existing event(s) with the same ID.

Does not notify any subscribed handlers. Does not enrich with additional current metadata. Does not allow changing which streams these events are in. Read more

Examples:

Add data and metadata to existing events


events = event_store.read.limit(10).to_a
events.each do |ev|
  ev.data[:tenant_id] = 1
  ev.metadata[:server_id] = "eu-west-2"
end
event_store.overwrite(events)

Change event type


events = event_store.read.limit(10).each.select{|ev| OldType === ev }.map do |ev|
  NewType.new(
    event_id: ev.event_id,
    data: ev.data,
    metadata: ev.metadata,
  )
end
event_store.overwrite(events)

Parameters:

  • events (Array<Event>, Event)

    event(s) to serialize and overwrite again

Returns:

  • (self)


364
365
366
367
# File 'lib/ruby_event_store/client.rb', line 364

# Overwrites existing event(s) that share the same ID.
#
# @param events_or_event [Array<Event>, Event] event(s) to serialize and overwrite
# @return [self]
def overwrite(events_or_event)
  events = Array(events_or_event) # accept a single event as well as a list
  records = transform(events)
  repository.update_messages(records)

  self
end

#position_in_stream(event_id, stream_name) ⇒ Integer

Gets position of the event in given stream

The position is always nonnegative. Returns nil if the event has no specific position in the stream. Raises an error if the event is not present in the stream.

Parameters:

  • event_id (String)
  • stream_name (String)

Returns:

  • (Integer)

    nonnegative integer position of event in stream

Raises:

  • (EventNotInStream)


141
142
143
# File 'lib/ruby_event_store/client.rb', line 141

# Gets the position of the event in the given stream by delegating to the
# repository with a Stream built from +stream_name+.
#
# @param event_id [String]
# @param stream_name [String]
# @return [Integer] position of the event in the stream
def position_in_stream(event_id, stream_name)
  repository.position_in_stream(event_id, Stream.new(stream_name))
end

#publish(events, topic: nil, stream_name: GLOBAL_STREAM, expected_version: :any) ⇒ self

Persists events and notifies subscribed handlers about them

Parameters:

  • events (Array<Event>, Event)

    event(s)

  • stream_name (String) (defaults to: GLOBAL_STREAM)

    name of the stream for persisting events.

  • expected_version (:any, :auto, :none, Integer) (defaults to: :any)

    controls optimistic locking strategy. / Read more

Returns:

  • (self)


59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/ruby_event_store/client.rb', line 59

def publish(events, topic: nil, stream_name: GLOBAL_STREAM, expected_version: :any)
  enriched_events = (events)
  records = transform(enriched_events)
  append_records_to_stream(records, stream_name: stream_name, expected_version: expected_version)
  enriched_events.zip(records) do |event, record|
    (correlation_id: event..fetch(:correlation_id), causation_id: event.event_id) do
      if broker.public_method(:call).arity == 3
        broker.call(topic || event.event_type, event, record)
      else
        warn <<~EOW
          Message broker shall support topics.
          Topic WILL BE IGNORED in the current broker.
          Modify the broker implementation to pass topic as an argument to broker.call method.
        EOW
        broker.call(event, record)
      end
    end
  end
  self
end

#read ⇒ Specification

Starts building a query specification for reading events. / More info.

Returns:



120
121
122
# File 'lib/ruby_event_store/client.rb', line 120

# Starts building a query specification for reading events, backed by a
# SpecificationReader over this client's repository and mapper.
#
# @return [Specification]
def read
  Specification.new(SpecificationReader.new(repository, mapper))
end

#rescue_from_double_json_serialization! ⇒ Object



48
49
50
51
# File 'lib/ruby_event_store/client.rb', line 48

# Forwards to the repository when it implements
# +rescue_from_double_json_serialization!+; otherwise does nothing.
#
# @return [Object, nil] whatever the repository returns, or nil when unsupported
def rescue_from_double_json_serialization!
  if repository.respond_to?(:rescue_from_double_json_serialization!)
    repository.rescue_from_double_json_serialization!
  end
end

#streams_of(event_id) ⇒ Array<Stream>

Gets list of streams where event is stored or linked

Returns:

  • (Array<Stream>)

    where event is stored or linked



127
128
129
# File 'lib/ruby_event_store/client.rb', line 127

# Gets the list of streams where the event is stored or linked by delegating
# to the repository.
#
# @param event_id [String]
# @return [Array<Stream>]
def streams_of(event_id)
  repository.streams_of(event_id)
end

#subscribe(subscriber, to: ) ⇒ Proc #subscribe(to: , &subscriber) ⇒ Proc

Subscribes a handler (subscriber) that will be invoked for published events of provided type.

Overloads:

  • #subscribe(subscriber, to: ) ⇒ Proc

    Returns - unsubscribe proc. Call to unsubscribe.

    Parameters:

    • to (Array<Class>) (defaults to: )

      types of events to subscribe

    • subscriber (Object, Class)

      handler

    Returns:

    • (Proc)
      • unsubscribe proc. Call to unsubscribe.

    Raises:

  • #subscribe(to: , &subscriber) ⇒ Proc

    Returns - unsubscribe proc. Call to unsubscribe.

    Parameters:

    • to (Array<Class>) (defaults to: )

      types of events to subscribe

    • subscriber (Proc)

      handler

    Returns:

    • (Proc)
      • unsubscribe proc. Call to unsubscribe.

    Raises:

Raises:

  • (ArgumentError)


180
181
182
183
184
# File 'lib/ruby_event_store/client.rb', line 180

# Subscribes a handler that will be invoked for published events of the
# given types. The handler is either the first argument or a block.
#
# @param subscriber [Object, Class, nil] handler (omit when passing a block)
# @param to [Array<Class>] types of events to subscribe to
# @return [Proc] whatever the broker returns from add_subscription
#   (documented as an unsubscribe proc)
# @raise [ArgumentError] when both a subscriber and a block are given
def subscribe(subscriber = nil, to:, &proc)
  if subscriber && proc
    raise ArgumentError, "subscriber must be first argument or block, cannot be both"
  end

  handler = subscriber || proc
  broker.add_subscription(
    handler,
    to.map { |klass| event_type_resolver.call(klass) },
  )
end

#subscribe_to_all_events(subscriber) ⇒ Proc #subscribe_to_all_events(&subscriber) ⇒ Proc

Subscribes a handler (subscriber) that will be invoked for all published events

Overloads:

  • #subscribe_to_all_events(subscriber) ⇒ Proc

    Returns - unsubscribe proc. Call to unsubscribe.

    Parameters:

    • subscriber (Object, Class)

      handler

    Returns:

    • (Proc)
      • unsubscribe proc. Call to unsubscribe.

    Raises:

  • #subscribe_to_all_events(&subscriber) ⇒ Proc

    Returns - unsubscribe proc. Call to unsubscribe.

    Parameters:

    • subscriber (Proc)

      handler

    Returns:

    • (Proc)
      • unsubscribe proc. Call to unsubscribe.

    Raises:

Raises:

  • (ArgumentError)


196
197
198
199
# File 'lib/ruby_event_store/client.rb', line 196

# Subscribes a handler that will be invoked for all published events.
# The handler is either the first argument or a block.
#
# @param subscriber [Object, Class, nil] handler (omit when passing a block)
# @return [Proc] whatever the broker returns from add_global_subscription
#   (documented as an unsubscribe proc)
# @raise [ArgumentError] when both a subscriber and a block are given
def subscribe_to_all_events(subscriber = nil, &proc)
  if subscriber && proc
    raise ArgumentError, "subscriber must be first argument or block, cannot be both"
  end

  handler = subscriber || proc
  broker.add_global_subscription(handler)
end

#subscribers_for(event_class) ⇒ Array<Object, Class>

Get list of handlers subscribed to an event

Parameters:

  • event_class (Class, String)

    type of events to get list of subscribed handlers

Returns:

  • (Array<Object, Class>)


205
206
207
# File 'lib/ruby_event_store/client.rb', line 205

# Gets the list of handlers subscribed to an event type. The given class is
# first translated through event_type_resolver, then looked up in the broker.
#
# @param event_class [Class, String] type of events to look up handlers for
# @return [Array<Object, Class>]
def subscribers_for(event_class)
  broker.all_subscriptions_for(event_type_resolver.call(event_class))
end

#with_metadata(metadata_for_block, &block) ⇒ Object

Set additional metadata for all events published within the provided block Read more

Parameters:

  • metadata (Hash)

    metadata to set for events

  • block (Proc)

    block of code during which the metadata will be added

Returns:

  • (Object)

    last value returned by the provided block



298
299
300
301
302
303
304
# File 'lib/ruby_event_store/client.rb', line 298

# Sets additional metadata for the duration of the provided block.
#
# The given hash is merged over the current metadata while the block runs;
# the previous metadata is put back afterwards, even when the block raises.
#
# @param metadata_for_block [Hash] metadata to set for events
# @param block [Proc] block of code during which the metadata will be added
# @return [Object] last value returned by the provided block (nil without a block)
def with_metadata(metadata_for_block, &block)
  previous_metadata = metadata
  self.metadata = previous_metadata.merge(metadata_for_block)
  block.call if block_given?
ensure
  # Restore unconditionally so nested or failing blocks cannot leak metadata.
  self.metadata = previous_metadata
end

#within(&block) ⇒ Within

Use for starting temporary subscriptions. Read more

Parameters:

  • block (Proc)

    block of code during which the temporary subscriptions will be active

Returns:

  • (Within)

    builder object which collects temporary subscriptions

Raises:

  • (ArgumentError)


287
288
289
290
# File 'lib/ruby_event_store/client.rb', line 287

# Starts a builder for temporary subscriptions that are active while the
# given block runs.
#
# @param block [Proc] block of code during which the temporary subscriptions are active
# @return [Within] builder object which collects temporary subscriptions
# @raise [ArgumentError] when no block is given
def within(&block)
  raise ArgumentError unless block

  Within.new(block, broker, event_type_resolver)
end