Class: Mode::Connector::Scheduler

Inherits:
Object
  • Object
show all
Defined in:
lib/mode/connector/scheduler.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(data_sources, options = {}) ⇒ Scheduler

Returns a new instance of Scheduler.



10
11
12
13
14
# File 'lib/mode/connector/scheduler.rb', line 10

# Builds a scheduler wrapper around the given data sources.
#
# @param data_sources [Object] stored as-is and exposed via #data_sources
# @param options [Hash] optional settings
# @option options [Object] :max_jobs value exposed via #max_jobs;
#   falls back to 4 when the key is missing, nil or false
def initialize(data_sources, options = {})
  @data_sources = data_sources
  @scheduler = Rufus::Scheduler.new

  requested_limit = options[:max_jobs]
  @max_jobs = requested_limit || 4
end

Instance Attribute Details

#data_sources ⇒ Object (readonly)

Returns the value of attribute data_sources.



8
9
10
# File 'lib/mode/connector/scheduler.rb', line 8

# Reader for the data sources handed to the constructor.
#
# @return [Object] the current value of @data_sources
def data_sources; @data_sources; end

#max_jobs ⇒ Object (readonly)

Returns the value of attribute max_jobs.



6
7
8
# File 'lib/mode/connector/scheduler.rb', line 6

# Reader for the job limit chosen at construction time.
#
# @return [Object] the current value of @max_jobs
def max_jobs; @max_jobs; end

#scheduler ⇒ Object (readonly)

Returns the value of attribute scheduler.



7
8
9
# File 'lib/mode/connector/scheduler.rb', line 7

# Reader for the underlying scheduler instance.
#
# @return [Object] the current value of @scheduler
def scheduler; @scheduler; end

Instance Method Details

#processors ⇒ Object



32
33
34
# File 'lib/mode/connector/scheduler.rb', line 32

# Looks up the scheduler's jobs that carry the 'processor' tag.
#
# @return [Object] whatever scheduler.jobs returns for that tag filter
def processors
  tag_filter = { tag: 'processor' }
  scheduler.jobs(tag_filter)
end

#start! ⇒ Object



16
17
18
19
20
21
22
# File 'lib/mode/connector/scheduler.rb', line 16

# Touches every data source's connection, registers a recurring '5s'
# job that calls #tick, and then joins the scheduler.
#
# @return [Object] the result of scheduler.join
def start!
  # Make sure this stays outside the scheduler.
  # NOTE(review): the original note presumably refers to this connection
  # pass running before any job is registered -- confirm.
  data_sources.each { |source| source.connection }

  scheduler.every('5s') do
    tick
  end

  scheduler.join
end

#stop! ⇒ Object



24
25
26
27
28
29
30
# File 'lib/mode/connector/scheduler.rb', line 24

# Stops the scheduler from a dedicated thread and blocks until that
# thread has finished.
#
# NOTE(review): presumably the extra thread exists so this can be called
# from a context where stopping directly is not allowed (e.g. a signal
# trap) -- confirm against the callers.
#
# @return [Thread] the already-joined thread that ran scheduler.stop
def stop!
  # Stop polling on a separate thread, then wait for that thread to finish.
  Thread.new { scheduler.stop }.join
end