Class: TxCatcher::Catcher

Inherits:
Object
Defined in:
lib/txcatcher/catcher.rb

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(name:, socket_prefix: "ipc:///tmp/", init_threads: true) ⇒ Catcher

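Returns a new instance of Catcher. For each of the 'rawtx' and 'hashblock' channels, the constructor starts one queue-processing thread (#listen_to_action_queues) and one ZeroMQ listener thread (#listen_to_zeromq_channels). The listing below does not reference init_threads, so the threads are started regardless of its value.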



# File 'lib/txcatcher/catcher.rb', line 8

def initialize(name:, socket_prefix: "ipc:///tmp/", init_threads: true)
  @socket_prefix  = socket_prefix
  @name           = name
  @queue          = {}
  @sockets        = {}
  @zeromq_threads = []
  @queue_threads  = []

  ['rawtx', 'hashblock'].each do |channel|
    @queue_threads  << Thread.new { listen_to_action_queues(channel)   }
    @zeromq_threads << Thread.new { listen_to_zeromq_channels(channel) }
  end
end
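
To illustrate how the constructor arguments are used later on: #listen_to_zeromq_channels builds each endpoint from socket_prefix, name and the channel. The sketch below reproduces only that string interpolation. The helper catcher_addresses and the node name "bitcoind" are made up for the example and are not part of TxCatcher.

```ruby
# Hypothetical helper (not part of TxCatcher): mirrors the interpolation
# "#{@socket_prefix}#{@name}.#{channel}" used in #listen_to_zeromq_channels.
CHANNELS = ['rawtx', 'hashblock'].freeze

def catcher_addresses(name:, socket_prefix: "ipc:///tmp/")
  CHANNELS.map { |channel| [channel, "#{socket_prefix}#{name}.#{channel}"] }.to_h
end

# "bitcoind" is an assumed node name, chosen only for illustration.
addresses = catcher_addresses(name: "bitcoind")
puts addresses["rawtx"]      # ipc:///tmp/bitcoind.rawtx
puts addresses["hashblock"]  # ipc:///tmp/bitcoind.hashblock
```

With the default prefix, each channel therefore maps to its own IPC endpoint under /tmp/.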

Instance Attribute Details

#break_all_loops ⇒ Object

Returns the value of attribute break_all_loops.



# File 'lib/txcatcher/catcher.rb', line 5

def break_all_loops
  @break_all_loops
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



# File 'lib/txcatcher/catcher.rb', line 6

def name
  @name
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



# File 'lib/txcatcher/catcher.rb', line 6

def queue
  @queue
end

#queue_threads ⇒ Object (readonly)

Returns the value of attribute queue_threads.



# File 'lib/txcatcher/catcher.rb', line 6

def queue_threads
  @queue_threads
end

#sockets ⇒ Object (readonly)

Returns the value of attribute sockets.



# File 'lib/txcatcher/catcher.rb', line 6

def sockets
  @sockets
end

#zeromq_threads ⇒ Object (readonly)

Returns the value of attribute zeromq_threads.



# File 'lib/txcatcher/catcher.rb', line 6

def zeromq_threads
  @zeromq_threads
end

Instance Method Details

#close_all_connections ⇒ Object



# File 'lib/txcatcher/catcher.rb', line 22

def close_all_connections
  @break_all_loops = true
  (@zeromq_threads + @queue_threads).each { |t| t.kill }
  @sockets.each { |k,v| v[:object].close }
end

#listen_to_action_queues(channel) ⇒ Object

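Processes the actions queued after a ZeroMQ message has been parsed; typically these write data to the DB through the models. Each queued item is a callable that the loop pops and invokes. Sequel::ValidationFailed is logged as a warning and any other exception as an error, so a failing action does not stop the loop. This loop is started before we begin listening for any messages from ZeroMQ.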



# File 'lib/txcatcher/catcher.rb', line 31

def listen_to_action_queues(channel)
  @queue[channel] = Queue.new
  until @break_all_loops
    LOGGER.report "in #{channel} queue: #{@queue[channel].size}" if Config["logger"]["log_queue_info"]
    if @queue[channel].empty?
      sleep 1
    else
      begin
        @queue[channel].pop.call
      rescue Sequel::ValidationFailed => e
        LOGGER.report e, :warn, timestamp: true
      rescue Exception => e
        LOGGER.report e, :error, timestamp: true
      end
    end
  end
end
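
The loop above follows a "queue of callables" pattern: producers push lambdas, and a single worker thread pops and calls them, logging failures instead of crashing. Below is a minimal standalone sketch of that pattern using only Ruby's built-in Thread and Queue. The variable names, the shortened sleep interval and the results array (a stand-in for LOGGER.report and DB writes) are assumptions made for the demo, and it rescues StandardError rather than Exception.

```ruby
# Minimal sketch of the "queue of callables" pattern; names are illustrative.
queue   = Queue.new
results = []
stop    = false

worker = Thread.new do
  until stop
    if queue.empty?
      sleep 0.01          # the original sleeps 1 second; shortened for the demo
    else
      begin
        queue.pop.call    # each queued item is a lambda/proc
      rescue StandardError => e
        results << "error: #{e.message}"   # stand-in for LOGGER.report
      end
    end
  end
end

# Producers enqueue work as lambdas; the second one fails on purpose.
queue << -> { results << "saved tx" }
queue << -> { raise ArgumentError, "invalid record" }
queue << -> { results << "saved block" }

sleep 0.01 until results.size == 3   # wait until all three items were handled
stop = true                          # plays the role of @break_all_loops
worker.join

p results
# ["saved tx", "error: invalid record", "saved block"]
```

Because a single worker drains the queue in FIFO order, the failing action is reported between the two successful ones and the later item still runs.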

#listen_to_zeromq_channels(channel) ⇒ Object

Starts receiving messages from ZeroMQ on the given channel. For every received message we call a handler method that parses it (each ZeroMQ channel has its own handler, named handle_<channel>) and then adds follow-up tasks, such as writing to the DB, to the queue. The queue itself is processed by the thread running #listen_to_action_queues.



# File 'lib/txcatcher/catcher.rb', line 54

def listen_to_zeromq_channels(channel)
  address = "#{@socket_prefix}#{@name}.#{channel}"
  LOGGER.report "Start listening on #{@name} #{channel}... (#{address})"
  context = ZMQ::Context.new
  socket  = context.socket(ZMQ::SUB)
  socket.setsockopt(ZMQ::SUBSCRIBE, channel)
  socket.connect(address)
  @sockets[channel] = { object: socket }
  until @break_all_loops do
    message = []
    socket.recv_strings(message)
    if message[1]
      message_hex = hexlify(message[1]).downcase
      @sockets[channel][:last_message] = message_hex
      send("handle_#{channel}", "#{message_hex}")
    end
  end
end
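
The per-message step (hex-encode the payload frame, then dispatch to handle_<channel> via send) can be tried without a ZeroMQ socket. The sketch below is an illustration under assumptions: MiniCatcher, the hexlify implementation (String#unpack1 with 'H*') and the handler bodies are hypothetical, since the real hexlify and handle_* methods are not shown on this page.

```ruby
# Sketch only: MiniCatcher, hexlify and the handler bodies are assumptions,
# not the TxCatcher implementation.
class MiniCatcher
  attr_reader :queue

  def initialize
    @queue = { 'rawtx' => Queue.new, 'hashblock' => Queue.new }
  end

  # Mirrors the body of the receive loop for one already-received message.
  def dispatch(channel, message)
    return unless message[1]                 # frame 1 carries the payload
    message_hex = hexlify(message[1]).downcase
    send("handle_#{channel}", message_hex)   # e.g. handle_rawtx
    message_hex
  end

  private

  # Assumed helper: binary string -> hex string.
  def hexlify(bytes)
    bytes.unpack1('H*')
  end

  # Hypothetical handlers: enqueue a callable instead of touching a DB.
  def handle_rawtx(hex)
    @queue['rawtx'] << -> { "would store tx #{hex}" }
  end

  def handle_hashblock(hex)
    @queue['hashblock'] << -> { "would store block #{hex}" }
  end
end

catcher = MiniCatcher.new
hex = catcher.dispatch('rawtx', ['rawtx', "\xDE\xAD\xBE\xEF".b])
puts hex                              # deadbeef
puts catcher.queue['rawtx'].pop.call  # would store tx deadbeef
```

Dispatching through send keeps the receive loop identical for every channel; supporting another channel then only requires a matching handle_<channel> method.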