Class: TxCatcher::Catcher
- Inherits:
-
Object
- Object
- TxCatcher::Catcher
- Defined in:
- lib/txcatcher/catcher.rb
Instance Attribute Summary collapse
-
#break_all_loops ⇒ Object
Returns the value of attribute break_all_loops.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#queue_threads ⇒ Object
readonly
Returns the value of attribute queue_threads.
-
#sockets ⇒ Object
readonly
Returns the value of attribute sockets.
-
#zeromq_threads ⇒ Object
readonly
Returns the value of attribute zeromq_threads.
Instance Method Summary collapse
- #close_all_connections ⇒ Object
-
#initialize(name:, socket_prefix: "ipc:///tmp/", init_threads: true) ⇒ Catcher
constructor
A new instance of Catcher.
-
#listen_to_action_queues(channel) ⇒ Object
Responsible for actions after the message from ZeroMQ is parsed, typically it’s writing data to DB through the models.
-
#listen_to_zeromq_channels(channel) ⇒ Object
Now we can start receiving messages from ZeroMQ.
Constructor Details
#initialize(name:, socket_prefix: "ipc:///tmp/", init_threads: true) ⇒ Catcher
Returns a new instance of Catcher.
8 9 10 11 12 13 14 15 16 17 18 19 20 |
# File 'lib/txcatcher/catcher.rb', line 8 def initialize(name:, socket_prefix: "ipc:///tmp/", init_threads: true) @socket_prefix = socket_prefix @name = name @queue = {} @sockets = {} @zeromq_threads = [] @queue_threads = [] ['rawtx', 'hashblock'].each do |channel| @queue_threads << Thread.new { listen_to_action_queues(channel) } @zeromq_threads << Thread.new { listen_to_zeromq_channels(channel) } end end |
Instance Attribute Details
#break_all_loops ⇒ Object
Returns the value of attribute break_all_loops.
5 6 7 |
# File 'lib/txcatcher/catcher.rb', line 5 def break_all_loops @break_all_loops end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
6 7 8 |
# File 'lib/txcatcher/catcher.rb', line 6 def name @name end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
6 7 8 |
# File 'lib/txcatcher/catcher.rb', line 6 def queue @queue end |
#queue_threads ⇒ Object (readonly)
Returns the value of attribute queue_threads.
6 7 8 |
# File 'lib/txcatcher/catcher.rb', line 6 def queue_threads @queue_threads end |
#sockets ⇒ Object (readonly)
Returns the value of attribute sockets.
6 7 8 |
# File 'lib/txcatcher/catcher.rb', line 6 def sockets @sockets end |
#zeromq_threads ⇒ Object (readonly)
Returns the value of attribute zeromq_threads.
6 7 8 |
# File 'lib/txcatcher/catcher.rb', line 6 def zeromq_threads @zeromq_threads end |
Instance Method Details
#close_all_connections ⇒ Object
22 23 24 25 26 |
# File 'lib/txcatcher/catcher.rb', line 22 def close_all_connections @break_all_loops = true (@zeromq_threads + @queue_threads).each { |t| t.kill } @sockets.each { |k,v| v[:object].close } end |
#listen_to_action_queues(channel) ⇒ Object
Responsible for actions after the message from ZeroMQ is parsed, typically it’s writing data to DB through the models. We start it before we start listening to any messages from ZeroMQ.
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/txcatcher/catcher.rb', line 31 def listen_to_action_queues(channel) @queue[channel] = Queue.new until @break_all_loops LOGGER.report "in #{channel} queue: #{@queue[channel].size}" if Config["logger"]["log_queue_info"] if @queue[channel].empty? sleep 1 else begin @queue[channel].pop.call rescue Sequel::ValidationFailed => e LOGGER.report e, :warn, timestamp: true rescue Exception => e LOGGER.report e, :error, timestamp: true end end end end |
#listen_to_zeromq_channels(channel) ⇒ Object
Now we can start receiving messages from ZeroMQ. On every received message we call a handler method, which parses it appropriately (each ZeroMQ channel has its own handler method) and then adds additional tasks, such as writing to the DB, in the queue. They queue itself is handled in the thread created above.
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/txcatcher/catcher.rb', line 54 def listen_to_zeromq_channels(channel) address = "#{@socket_prefix}#{@name}.#{channel}" LOGGER.report "Start listening on #{@name} #{channel}... (#{address})" context = ZMQ::Context.new socket = context.socket(ZMQ::SUB) socket.setsockopt(ZMQ::SUBSCRIBE, channel) socket.connect(address) @sockets[channel] = { object: socket } until @break_all_loops do = [] socket.recv_strings() if [1] = hexlify([1]).downcase @sockets[channel][:last_message] = send("handle_#{channel}", "#{message_hex}") end end end |