Class: NatsWork::Client

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/natswork/client.rb,
lib/natswork/client/version.rb

Constant Summary collapse

VERSION =
'0.0.1'

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeClient

Returns a new instance of Client.



23
24
25
26
27
28
29
30
31
32
33
# File 'lib/natswork/client.rb', line 23

def initialize
  @configuration = Configuration.instance
  @connection_pool = nil
  @jetstream_manager = nil
  @mutex = Mutex.new
  @result_store = {}
  @result_callbacks = {}
  @scheduled_jobs = []
  @result_expiration_times = {}
  start_result_expiration_thread
end

Instance Attribute Details

#configurationObject (readonly)

Returns the value of attribute configuration.



21
22
23
# File 'lib/natswork/client.rb', line 21

def configuration
  @configuration
end

#connection_poolObject (readonly)

Returns the value of attribute connection_pool.



21
22
23
# File 'lib/natswork/client.rb', line 21

def connection_pool
  @connection_pool
end

#jetstream_managerObject (readonly)

Returns the value of attribute jetstream_manager.



21
22
23
# File 'lib/natswork/client.rb', line 21

def jetstream_manager
  @jetstream_manager
end

Class Method Details

.batch(jobs) ⇒ Object



85
86
87
# File 'lib/natswork/client.rb', line 85

def self.batch(jobs)
  instance.batch(jobs)
end

.perform_async(options) ⇒ Object



73
74
75
# File 'lib/natswork/client.rb', line 73

def self.perform_async(options)
  instance.push(options)
end

.perform_at(time, options) ⇒ Object



81
82
83
# File 'lib/natswork/client.rb', line 81

def self.perform_at(time, options)
  instance.perform_at(time, options)
end

.perform_in(delay, options) ⇒ Object



77
78
79
# File 'lib/natswork/client.rb', line 77

def self.perform_in(delay, options)
  instance.perform_in(delay, options)
end

.perform_sync(options) ⇒ Object



69
70
71
# File 'lib/natswork/client.rb', line 69

def self.perform_sync(options)
  instance.perform_sync(options)
end

.push(options) ⇒ Object

Main API for job dispatching



65
66
67
# File 'lib/natswork/client.rb', line 65

def self.push(options)
  instance.push(options)
end

.startObject

Connection management



56
57
58
# File 'lib/natswork/client.rb', line 56

def self.start
  instance.start_connection
end

.stopObject



60
61
62
# File 'lib/natswork/client.rb', line 60

def self.stop
  instance.stop_connection
end

Instance Method Details

#batch(jobs) ⇒ Object



180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
# File 'lib/natswork/client.rb', line 180

def batch(jobs)
  ensure_connected!

  batch_id = SecureRandom.uuid
  job_ids = []

  connection_pool.with_connection do |connection|
    jobs.each do |job_options|
      # Merge batch metadata into existing metadata
       = (job_options[:metadata] || {}).merge(
        'batch_id' => batch_id,
        'batch_size' => jobs.size
      )

      message = build_message(job_options)
      message..merge!()
      message.validate!

      subject = subject_for_queue(message.queue)

      if configuration.use_jetstream
        js = connection.jetstream
        js.publish(subject, message.to_json)
      else
        connection.publish(subject, message.to_json)
      end

      job_ids << message.job_id
    end
  end

  { batch_id: batch_id, job_ids: job_ids }
end

#cancel_job(job_id) ⇒ Object



234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
# File 'lib/natswork/client.rb', line 234

def cancel_job(job_id)
  # Publish cancellation message
  connection_pool.with_connection do |connection|
    cancel_subject = "#{configuration.namespace}.control.cancel"

    cancel_message = {
      type: 'job.cancel',
      job_id: job_id,
      timestamp: Time.now.iso8601
    }

    connection.publish(cancel_subject, cancel_message.to_json)
  end
  true
end

#configure {|@configuration| ... } ⇒ Object

Yields:



35
36
37
38
# File 'lib/natswork/client.rb', line 35

def configure
  yield @configuration if block_given?
  reset_connection!
end

#job_status(job_id) ⇒ Object

Job status and management



215
216
217
218
219
# File 'lib/natswork/client.rb', line 215

def job_status(job_id)
  @mutex.synchronize do
    @result_store[job_id]
  end
end

#on_result(job_id, &block) ⇒ Object



250
251
252
253
254
255
# File 'lib/natswork/client.rb', line 250

def on_result(job_id, &block)
  @mutex.synchronize do
    @result_callbacks[job_id] ||= []
    @result_callbacks[job_id] << block
  end
end

#perform_at(time, options) ⇒ Object



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/natswork/client.rb', line 157

def perform_at(time, options)
  ensure_connected!

  message = build_message(options)
  message.['scheduled_at'] = time.iso8601
  message.['scheduled_for'] = time.to_i
  message.validate!

  # Store scheduled job for later processing
  @mutex.synchronize do
    @scheduled_jobs << {
      message: message,
      scheduled_for: time,
      status: 'pending'
    }
  end

  # Start scheduler thread if not running
  ensure_scheduler_running

  message.job_id
end

#perform_in(delay, options) ⇒ Object



153
154
155
# File 'lib/natswork/client.rb', line 153

def perform_in(delay, options)
  perform_at(Time.now + delay, options)
end

#perform_sync(options) ⇒ Object



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/natswork/client.rb', line 112

def perform_sync(options)
  ensure_connected!

  message = build_message(options)
  message.validate!

  result = nil
  error = nil

  connection_pool.with_connection do |connection|
    subject = subject_for_queue(message.queue)
    timeout_seconds = options[:timeout] || configuration.sync_timeout || 5

    # Use NATS request-reply pattern
    response = connection.connection.request(subject, message.to_json, timeout: timeout_seconds)

    raise TimeoutError, "Job #{message.job_id} timed out after #{timeout_seconds} seconds" unless response

    reply_data = JSON.parse(response.data, symbolize_names: true)

    if reply_data[:success]
      result = reply_data[:result]
    else
      error = reply_data[:error] || 'Job execution failed'
      raise JobError.new(
        error,
        job_class: message.job_class,
        job_id: message.job_id,
        original_error: reply_data
      )
    end

    result
  rescue NATS::IO::Timeout
    raise TimeoutError, "Job #{message.job_id} timed out after #{timeout_seconds} seconds"
  rescue JSON::ParserError => e
    raise JobError.new("Failed to parse response: #{e.message}", job_class: message.job_class,
                                                                 job_id: message.job_id)
  end
end

#push(options) ⇒ Object

Instance methods



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/natswork/client.rb', line 90

def push(options)
  ensure_connected!

  message = build_message(options)
  message.validate!

  connection_pool.with_connection do |connection|
    subject = subject_for_queue(message.queue)

    # For JetStream persistence
    if configuration.use_jetstream
      js = connection.jetstream
      js.publish(subject, message.to_json)
    else
      # For Core NATS
      connection.publish(subject, message.to_json)
    end
  end

  message.job_id
end

#reset_connection!Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/natswork/client.rb', line 40

def reset_connection!
  shutdown_existing_connections
  @connection_pool = ConnectionPool.new(
    size: @configuration.pool_size || 5,
    timeout: @configuration.pool_timeout || 5,
    connection_options: {
      servers: @configuration.servers || ['nats://localhost:4222'],
      max_reconnect_attempts: @configuration.max_reconnect_attempts || 10,
      reconnect_time_wait: @configuration.reconnect_time_wait || 2
    }
  )
  # JetStreamManager will be created lazily when needed
  ensure_streams_exist
end

#shutdownObject



257
258
259
260
261
# File 'lib/natswork/client.rb', line 257

def shutdown
  @mutex.synchronize do
    shutdown_existing_connections
  end
end

#start_connectionObject



263
264
265
# File 'lib/natswork/client.rb', line 263

def start_connection
  ensure_connected!
end

#stop_connectionObject



267
268
269
# File 'lib/natswork/client.rb', line 267

def stop_connection
  shutdown
end

#store_result(job_id, result, ttl = 3600) ⇒ Object



221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/natswork/client.rb', line 221

def store_result(job_id, result, ttl = 3600)
  @mutex.synchronize do
    @result_store[job_id] = result
    @result_expiration_times[job_id] = Time.now + ttl

    # Trigger callbacks if any
    if @result_callbacks[job_id]
      @result_callbacks[job_id].each { |callback| callback.call(result) }
      @result_callbacks.delete(job_id)
    end
  end
end