Class: EventStoreClient::Client
- Inherits:
-
Object
- Object
- EventStoreClient::Client
- Defined in:
- lib/event_store_client/client.rb
Constant Summary collapse
- NoCallMethodOnSubscriber =
Class.new(StandardError)
Instance Attribute Summary collapse
-
#connection ⇒ Object
Returns the value of attribute connection.
-
#service_name ⇒ Object
Returns the value of attribute service_name.
Instance Method Summary collapse
- #pool(interval: 5) ⇒ Object
- #publish(stream:, events:, expected_version: nil) ⇒ Object
- #read(stream, direction: 'forward') ⇒ Object
- #stop_pooling ⇒ Object
- #subscribe(subscriber, to: [], pooling: true) ⇒ Object
Instance Attribute Details
#connection ⇒ Object
Returns the value of attribute connection.
62 63 64 |
# File 'lib/event_store_client/client.rb', line 62 def connection @connection end |
#service_name ⇒ Object
Returns the value of attribute service_name.
62 63 64 |
# File 'lib/event_store_client/client.rb', line 62 def service_name @service_name end |
Instance Method Details
#pool(interval: 5) ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/event_store_client/client.rb', line 23 def pool(interval: 5) return if @pooling_started @pooling_started = true thread1 = Thread.new do loop do create_pid_file Thread.handle_interrupt(Interrupt => :never) { begin Thread.handle_interrupt(Interrupt => :immediate) { broker.call(subscriptions) } rescue Exception => e # When the thread had been interrupted or broker.call returned an error sleep(interval) # wait for events to be processed delete_pid_file error_handler.call(e) if error_handler ensure # this code is run always Thread.stop end } end end thread2 = Thread.new do loop { sleep 1; break unless thread1.alive?; thread1.run } end @threads = [thread1, thread2] nil end |
#publish(stream:, events:, expected_version: nil) ⇒ Object
9 10 11 |
# File 'lib/event_store_client/client.rb', line 9 def publish(stream:, events:, expected_version: nil) connection.publish(stream: stream, events: events, expected_version: expected_version) end |
#read(stream, direction: 'forward') ⇒ Object
13 14 15 |
# File 'lib/event_store_client/client.rb', line 13 def read(stream, direction: 'forward') connection.read(stream, direction: direction) end |
#stop_pooling ⇒ Object
53 54 55 56 57 58 59 60 |
# File 'lib/event_store_client/client.rb', line 53 def stop_pooling return if @threads.none? @threads.each do |thread| thread.kill end @pooling_started = false nil end |
#subscribe(subscriber, to: [], pooling: true) ⇒ Object
17 18 19 20 21 |
# File 'lib/event_store_client/client.rb', line 17 def subscribe(subscriber, to: [], pooling: true) raise NoCallMethodOnSubscriber unless subscriber.respond_to?(:call) @subscriptions.create(subscriber, to) pool if pooling end |