Class: TgMq::Connection
- Inherits:
-
Object
- Object
- TgMq::Connection
- Defined in:
- lib/tg_mq/connection.rb
Instance Attribute Summary collapse
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
Instance Method Summary collapse
- #enqueue_message(data, &cb) ⇒ Object
-
#initialize(itoken:, otoken:, channel:, logger: TgMq.config.logger) ⇒ Connection
constructor
A new instance of Connection.
- #start_sender(output_shaper, bot, delay) ⇒ Object
- #stop(timeout = 2) ⇒ Object
- #stopped? ⇒ Boolean
- #subscribe(&block) ⇒ Object
- #tgretry ⇒ Object
- #wait_for_termination(timeout = 0) ⇒ Object
Constructor Details
#initialize(itoken:, otoken:, channel:, logger: TgMq.config.logger) ⇒ Connection
Returns a new instance of Connection.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/tg_mq/connection.rb', line 10 def initialize(itoken:, otoken:, channel:, logger: TgMq.config.logger) @logger = TgMq.setup_logger(logger).tagged(self.class) @mx = Monitor.new @itoken = itoken @otoken = otoken @channel = channel @output_bot = ::Telegram::Bot::Client.new(@otoken, timeout: 5) @output_shaper = OutputShaper.new(chunk_size: 4080) @input_bot = ::Telegram::Bot::Client.new(@itoken, timeout: 5) @input_shaper = InputShaper.new(logger: logger) # Telegram Bot api alows 30 rps, we use only 20 @delay = 60.0 / 25.0 $r = 0 # @squeue = SizedQueue.new(10) @threads = [] @threads << start_sender(@output_shaper, @output_bot, @delay) end |
Instance Attribute Details
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
8 9 10 |
# File 'lib/tg_mq/connection.rb', line 8 def logger @logger end |
Instance Method Details
#enqueue_message(data, &cb) ⇒ Object
34 35 36 37 |
# File 'lib/tg_mq/connection.rb', line 34 def (data, &cb) logger.debug("Enqueue message of #{data.size} bytes") @output_shaper.enqueue_data(data, &cb) end |
#start_sender(output_shaper, bot, delay) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/tg_mq/connection.rb', line 83 def start_sender(output_shaper, bot, delay) next_send_at = Time.at(0) Thread.new(output_shaper, bot) do |c, b| until @stopped break if @stopped next(sleep 1) if Time.now < next_send_at frame = c.deq(timeout: 1) break if @stopped next(sleep 1) if frame.nil? logger.debug "Sending frame [#{frame.id}] of #{frame.data.size} bytes..." response = tgretry { b.api.sendMessage(chat_id: @channel, text: frame.data) } sleep 0.1 pin_response = tgretry { b.api.pinChatMessage(chat_id: @channel, message_id: response.) } begin frame.callback&.call(pin_response) rescue StandardError nil end next_send_at = Time.now + delay * 2 end end end |
#stop(timeout = 2) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/tg_mq/connection.rb', line 43 def stop(timeout = 2) return if @stopped @stopped = true @output_bot.stop @input_bot.stop if wait_for_termination(timeout) true else @threads.each(&:kill) false end end |
#stopped? ⇒ Boolean
39 40 41 |
# File 'lib/tg_mq/connection.rb', line 39 def stopped? @stopped end |
#subscribe(&block) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/tg_mq/connection.rb', line 66 def subscribe(&block) @input_bot.listen do || case when Telegram::Bot::Types::Message logger.debug(123_123) text = (&.new_chat_title || &.text || &.&.text).to_s logger.debug "Receive raw: [#{text.first(15)}...] of #{text.size} bytes" if (data = @input_shaper.pushdata(text)) decoded = Base64.strict_decode64(data) logger.debug "Receive payload: of #{decoded.size} bytes" block.call(decoded) end end end end |
#tgretry ⇒ Object
112 113 114 115 116 117 118 119 120 |
# File 'lib/tg_mq/connection.rb', line 112 def tgretry yield rescue Telegram::Bot::Exceptions::ResponseError => e if e.error_code.to_i == 429 logger.warn 'Retry packet send in 5 seconds...' sleep 5 retry end end |
#wait_for_termination(timeout = 0) ⇒ Object
58 59 60 61 62 63 64 |
# File 'lib/tg_mq/connection.rb', line 58 def wait_for_termination(timeout = 0) Timeouter.run(timeout) do |t| return true if @threads.all? do |thread| thread.join(t.left) end end end |