Class: Listen::Listener

Inherits:
Object
  • Object
show all
Includes:
Celluloid::FSM, QueueOptimizer
Defined in:
lib/listen/listener.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from QueueOptimizer

#_calculate_add_remove_difference, #_detect_possible_editor_save, #_logical_action_for, #_reinterpret_related_changes, #_smoosh_changes, #_squash_changes

Constructor Details

#initialize(*args) {|modified, added, removed| ... } ⇒ Listener

Initializes the directories listener.

Parameters:

  • directory (String)

    the directories to listen to

  • options (Hash)

    the listen options (see Listen::Listener::Options)

Yields:

  • (modified, added, removed)

    the changed files

Yield Parameters:

  • modified (Array<String>)

    the list of modified files

  • added (Array<String>)

    the list of added files

  • removed (Array<String>)

    the list of removed files



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/listen/listener.rb', line 31

# Builds a listener in the :stopped state.
#
# Accepted argument shapes, as handled below:
#   * any number of directory paths (nested arrays are flattened),
#   * optionally preceded by a TCP target and a :recipient / :broadcaster
#     mode symbol in the first two positions,
#   * optionally followed by a trailing options Hash.
#
# The given block is stored as the change callback.
def initialize(*args, &block)
  trailing_options = args.last.is_a?(Hash) ? args.pop : {}
  @options = _init_options(trailing_options)

  # Logging is configured before anything else so later steps can log
  Celluloid.logger.level = _debug_level
  _log :info, "Celluloid loglevel set to: #{Celluloid.logger.level}"

  # A mode symbol in second position means the first two arguments are
  # (target, mode) rather than directories
  @tcp_mode = nil
  if [:recipient, :broadcaster].include?(args[1])
    target, @tcp_mode = args.shift(2)
    _init_tcp_options(target)
  end

  # Whatever is left is treated as directories; realpath raises for
  # paths that do not exist
  @directories = args.flatten.map { |dir| Pathname.new(dir).realpath }
  @block = block
  @queue = Queue.new
  @registry = Celluloid::Registry.new

  transition :stopped
end

Instance Attribute Details

#block ⇒ Object

Returns the value of attribute block.



14
15
16
# File 'lib/listen/listener.rb', line 14

# Returns the change callback held in @block (initialize stores the block
# it was given there); nil when no block was supplied.
def block
  @block
end

#directories ⇒ Object (readonly)

TODO: deprecate



17
18
19
# File 'lib/listen/listener.rb', line 17

# Returns @directories, which initialize fills with the realpath Pathname
# of every directory argument.
# TODO: deprecate
def directories
  @directories
end

#host ⇒ Object (readonly)

Returns the value of attribute host.



19
20
21
# File 'lib/listen/listener.rb', line 19

# Returns @host. It is assigned in _init_tcp_options ('localhost' by
# default for recipients, or the host part of a "host:port" target).
def host
  @host
end

#options ⇒ Object (readonly)

TODO: deprecate



17
18
19
# File 'lib/listen/listener.rb', line 17

# Returns the @options hash: the defaults from _init_options merged with
# the caller's options, later extended by _reconfigure_silencer.
# TODO: deprecate
def options
  @options
end

#port ⇒ Object (readonly)

Returns the value of attribute port.



19
20
21
# File 'lib/listen/listener.rb', line 19

# Returns @port, the TCP port assigned in _init_tcp_options.
def port
  @port
end

#registry ⇒ Object (readonly)

Returns the value of attribute registry.



18
19
20
# File 'lib/listen/listener.rb', line 18

# Returns @registry, the Celluloid::Registry created in initialize and
# used to look up this listener's actors by name.
def registry
  @registry
end

#supervisor ⇒ Object (readonly)

Returns the value of attribute supervisor.



18
19
20
# File 'lib/listen/listener.rb', line 18

# Returns @supervisor, the supervision group assigned in _init_actors.
def supervisor
  @supervisor
end

#wait_thread ⇒ Object (readonly, private)

Returns the value of attribute wait_thread.



275
276
277
# File 'lib/listen/listener.rb', line 275

# Returns @wait_thread: the Thread created by _start_wait_thread, reset
# to nil by _stop_wait_thread.
def wait_thread
  @wait_thread
end

Instance Method Details

#_adapter_class ⇒ Object (private)



251
252
253
# File 'lib/listen/listener.rb', line 251

# Returns the adapter class chosen by Adapter.select for the current
# options. A truthy result is cached in @adapter_class and reused.
def _adapter_class
  return @adapter_class if @adapter_class

  @adapter_class = Adapter.select(options)
end

#_debug_level ⇒ Object (private)



176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/listen/listener.rb', line 176

# Picks the Logger severity for Celluloid's logger.
#
# BSD-like hosts always get DEBUG. Elsewhere the LISTEN_GEM_DEBUGGING
# environment variable (or, if unset, options[:debug]) decides: a value
# containing "2" means DEBUG, one containing true/yes/1 (any case) means
# INFO, anything else ERROR.
def _debug_level
  # TODO: remove? (since there are BSD warnings anyway)
  return Logger::DEBUG if RbConfig::CONFIG['host_os'] =~ /bsd|dragonfly/

  setting = (ENV['LISTEN_GEM_DEBUGGING'] || options[:debug]).to_s

  # The "2" check must come first: "12" would otherwise match the INFO rule
  if setting =~ /2/
    Logger::DEBUG
  elsif setting =~ /true|yes|1/i
    Logger::INFO
  else
    Logger::ERROR
  end
end

#_init_actors ⇒ Object (private)



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/listen/listener.rb', line 192

# Starts a supervision group bound to this listener's registry and
# registers the actors under fixed names: :silencer, :record, a
# :change_pool pool, optionally :broadcaster (TCP broadcaster mode only)
# and, last of all, :adapter.
def _init_actors
  @supervisor = Celluloid::SupervisionGroup.run!(registry)
  supervisor.add(Silencer, as: :silencer, args: self)
  supervisor.add(Record, as: :record, args: self)
  supervisor.pool(Change, as: :change_pool, args: self)

  if @tcp_mode == :broadcaster
    # Loaded lazily: only broadcaster mode needs it
    require 'listen/tcp/broadcaster'
    supervisor.add(TCP::Broadcaster, as: :broadcaster, args: [@host, @port])

    # TODO: should be auto started, because if it crashes
    # a new instance is spawned by supervisor, but its 'start' isn't
    # called
    registry[:broadcaster].start
  end

  # The adapter is registered after every other actor
  supervisor.add(_adapter_class, as: :adapter, args: self)
end

#_init_options(options = {}) ⇒ Object (private)



168
169
170
171
172
173
174
# File 'lib/listen/listener.rb', line 168

# Returns a new hash of the listener defaults overridden by +options+.
# The argument itself is not modified.
def _init_options(options = {})
  defaults = {
    debug: false,
    latency: nil,
    wait_for_delay: 0.1,
    force_polling: false,
    polling_fallback_message: nil
  }
  defaults.merge(options)
end

#_init_tcp_options(target) ⇒ Object (private)



277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
# File 'lib/listen/listener.rb', line 277

# Derives @host and @port from the TCP target given to the constructor.
#
# For a :recipient, @host defaults to 'localhost' and the :force_tcp
# option is switched on. An Integer target is taken as the port; any
# other target is split on ':' into host and port (port via #to_i).
#
# @param target [Integer, String] a port number or a "host:port" string
# @raise [ArgumentError] when target is nil or false
def _init_tcp_options(target)
  # Handle TCP options here
  require 'listen/tcp'
  fail ArgumentError, 'missing host/port for TCP' unless target

  if @tcp_mode == :recipient
    # Default only; a "host:port" target overrides it below
    @host = 'localhost'
    @options[:force_tcp] = true
  end

  # Integer, not Fixnum: Fixnum was deprecated in Ruby 2.4 and removed in
  # 3.2, where referencing it raises NameError
  if target.is_a? Integer
    @port = target
  else
    @host, port = target.split(':')
    @port = port.to_i
  end
end

#_log(type, message) ⇒ Object (private)



247
248
249
# File 'lib/listen/listener.rb', line 247

# Sends +message+ to Celluloid's logger by calling the logger method
# named by +type+ (e.g. :info).
def _log(type, message)
  logger = Celluloid.logger
  logger.send(type, message)
end

#_process_changes ⇒ Object (private)

for easier testing without sleep loop



256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
# File 'lib/listen/listener.rb', line 256

# Drains every queued change, smooshes them into modified/added/removed
# lists and hands those to the callback block. Does nothing when the
# queue is empty. The queue is still drained when no block is set.
def _process_changes
  return if @queue.empty?

  @last_queue_event_time = nil

  pending = []
  pending << @queue.pop until @queue.empty?

  return if block.nil?

  smooshed = _smoosh_changes(pending)
  lists = [:modified, :added, :removed].map { |kind| smooshed[kind] }

  # TODO: condition not tested, but too complex to test ATM
  block.call(*lists) if lists.any? { |list| !list.empty? }
end

#_reconfigure_silencer(extra_options) ⇒ Object (private)



295
296
297
298
# File 'lib/listen/listener.rb', line 295

# Merges +extra_options+ into @options in place, then registers a
# freshly built Silencer for this listener under :silencer.
def _reconfigure_silencer(extra_options)
  @options.update(extra_options)
  registry[:silencer] = Silencer.new(self)
end

#_silenced?(path, type) ⇒ Boolean (private)

Returns:

  • (Boolean)


238
239
240
# File 'lib/listen/listener.rb', line 238

# Asks the registered silencer whether +path+ of the given +type+ is
# silenced, and returns its answer.
def _silenced?(path, type)
  silencer_actor = sync(:silencer)
  silencer_actor.silenced?(path, type)
end

#_start_adapter ⇒ Object (private)



242
243
244
245
# File 'lib/listen/listener.rb', line 242

# Starts the registered adapter with a plain synchronous call.
def _start_adapter
  # Don't run async, because configuration has to finish first
  adapter = sync(:adapter)
  adapter.start
end

#_start_wait_thread ⇒ Object (private)



300
301
302
# File 'lib/listen/listener.rb', line 300

# Spawns the thread that runs _wait_for_changes, remembers it in
# @wait_thread and returns it.
def _start_wait_thread
  @wait_thread = Thread.new do
    _wait_for_changes
  end
end

#_stop_wait_thread ⇒ Object (private)



308
309
310
311
312
313
314
315
# File 'lib/listen/listener.rb', line 308

# Wakes the wait thread if it is still alive, waits for it to finish
# and clears @wait_thread. Does nothing when there is no wait thread.
def _stop_wait_thread
  thread = wait_thread
  return unless thread

  # wakeup returns the thread itself, so the join can be chained
  thread.wakeup.join if thread.alive?
  @wait_thread = nil
end

#_wait_for_changes ⇒ Object (private)



211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# File 'lib/listen/listener.rb', line 211

# Loop run by the wait thread. It sleeps while the listener is paused or
# the queue is empty, then makes sure at least options[:wait_for_delay]
# seconds have passed since the last queued event before calling
# _process_changes. The loop ends when the state becomes :stopped.
#
# A RuntimeError raised inside the loop (e.g. by the change block) is
# reported through Kernel.warn with its backtrace instead of being
# re-raised; it still ends the loop.
def _wait_for_changes
  latency = options[:wait_for_delay]

  loop do
    break if state == :stopped

    if state == :paused || @queue.empty?
      # Sleep indefinitely; Thread#wakeup from another thread resumes us
      sleep
      break if state == :stopped
    end

    # Assure there's at least latency between callbacks to allow
    # for accumulating changes
    now = Time.now.to_f
    diff = latency + (@last_queue_event_time || now) - now
    if diff > 0
      sleep diff
      next
    end

    _process_changes unless state == :paused
  end
rescue RuntimeError => e
  Kernel.warn "[Listen warning]: Change block raised an exception: #{e}"
  # Array() guards against an exception that carries no backtrace
  Kernel.warn "Backtrace:\n\t#{Array(e.backtrace).join("\n\t")}"
end

#_wakeup_wait_thread ⇒ Object (private)



304
305
306
# File 'lib/listen/listener.rb', line 304

# Wakes the wait thread, provided one exists and is still alive.
# Returns the thread when it was woken, nil otherwise.
def _wakeup_wait_thread
  thread = wait_thread
  return unless thread && thread.alive?

  thread.wakeup
end

#async(type) ⇒ Object



139
140
141
142
# File 'lib/listen/listener.rb', line 139

# Returns the async proxy of the actor registered under +type+, or nil
# when nothing is registered under that name.
def async(type)
  actor = sync(type)
  actor.async if actor
end

#ignore(regexps) ⇒ Object

Add files and dirs to ignore on top of defaults

(@see Listen::Silencer for default ignored files and dirs)



125
126
127
# File 'lib/listen/listener.rb', line 125

# Adds +regexps+ to the ignore patterns: the current options[:ignore]
# value and +regexps+ are paired in a two-element array, stored as the
# new :ignore option, and the silencer is rebuilt.
def ignore(regexps)
  patterns = [options[:ignore], regexps]
  _reconfigure_silencer(ignore: patterns)
end

#ignore!(regexps) ⇒ Object

Replace default ignore patterns with provided regexp



130
131
132
# File 'lib/listen/listener.rb', line 130

# Replaces the ignore patterns: clears the :ignore option, stores
# +regexps+ under :ignore! and rebuilds the silencer.
def ignore!(regexps)
  replacement = { ignore: [], ignore!: regexps }
  _reconfigure_silencer(replacement)
end

#only(regexps) ⇒ Object

Listen only to files and dirs matching regexp



135
136
137
# File 'lib/listen/listener.rb', line 135

# Restricts listening to paths matching +regexps+: stores them as the
# :only option and rebuilds the silencer.
def only(regexps)
  restriction = { only: regexps }
  _reconfigure_silencer(restriction)
end

#pause ⇒ Object

Stops invoking callbacks (messages pile up)



97
98
99
# File 'lib/listen/listener.rb', line 97

# Moves the listener to the :paused state. Queued events are kept but
# callbacks stop being invoked.
def pause
  transition(:paused)
end

#paused=(value) ⇒ Object

TODO: deprecate



114
115
116
# File 'lib/listen/listener.rb', line 114

# Setter form of pause/start: a truthy +value+ moves the listener to
# :paused, a falsy one to :processing.
# TODO: deprecate
def paused=(value)
  target = value ? :paused : :processing
  transition(target)
end

#paused? ⇒ Boolean Also known as: paused

Returns:

  • (Boolean)


106
107
108
# File 'lib/listen/listener.rb', line 106

# True when the current state is :paused.
# NOTE(review): `state` is presumably supplied by Celluloid::FSM - confirm.
def paused?
  state == :paused
end

#processing? ⇒ Boolean Also known as: listen?

processing means callbacks are called

Returns:

  • (Boolean)


102
103
104
# File 'lib/listen/listener.rb', line 102

# True when the current state is :processing, i.e. callbacks are being
# invoked.
# NOTE(review): `state` is presumably supplied by Celluloid::FSM - confirm.
def processing?
  state == :processing
end

#queue(type, change, path, options = {}) ⇒ Object



148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/listen/listener.rb', line 148

# Enqueues one change event and nudges the wait thread.
#
# +type+ must be :dir or :file and +change+ must be a Symbol; otherwise
# a RuntimeError is raised and nothing is queued. In TCP broadcaster
# mode the event is also broadcast asynchronously.
def queue(type, change, path, options = {})
  fail "Invalid type: #{type.inspect}" unless [:dir, :file].include? type
  fail "Invalid change: #{change.inspect}" unless change.is_a?(Symbol)

  event = [type, change, path, options]
  @queue << event
  @last_queue_event_time = Time.now.to_f

  # While paused, events only pile up; the wait thread is left asleep
  _wakeup_wait_thread unless state == :paused

  if @tcp_mode == :broadcaster
    payload = TCP::Message.new(type, change, path, options).payload
    registry[:broadcaster].async.broadcast(payload)
  end
end

#silencer ⇒ Object



162
163
164
# File 'lib/listen/listener.rb', line 162

# Returns whatever is currently registered under :silencer in @registry.
def silencer
  @registry[:silencer]
end

#start ⇒ Object Also known as: unpause

Starts processing events and starts adapters or resumes invoking callbacks if paused



84
85
86
# File 'lib/listen/listener.rb', line 84

# Moves the listener to the :processing state, which starts it or, if it
# was paused, resumes invoking callbacks.
def start
  transition(:processing)
end

#stop ⇒ Object

Stops processing and terminates all actors



92
93
94
# File 'lib/listen/listener.rb', line 92

# Moves the listener to the :stopped state.
def stop
  transition(:stopped)
end

#sync(type) ⇒ Object



144
145
146
# File 'lib/listen/listener.rb', line 144

# Looks up the entry registered under +type+ in @registry and returns it
# as is, for direct (synchronous) calls.
def sync(type)
  @registry[type]
end