Class: TgMq::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/tg_mq/connection.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(itoken:, otoken:, channel:, logger: TgMq.config.logger) ⇒ Connection

Returns a new instance of Connection.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/tg_mq/connection.rb', line 10

# Builds a connection that listens through one Telegram bot and sends through
# another.
#
# Stores the tokens and the channel, creates a bot client and a shaper for each
# direction, and starts the background sender thread.
#
# @param itoken [Object] token handed to the input (listening) bot client
# @param otoken [Object] token handed to the output (sending) bot client
# @param channel [Object] value later used as +chat_id+ by the sender thread
# @param logger [Object] logger passed through +TgMq.setup_logger+ and tagged
#   with this class
def initialize(itoken:, otoken:, channel:, logger: TgMq.config.logger)
  @logger = TgMq.setup_logger(logger).tagged(self.class)

  @mx = Monitor.new

  # Start from an explicit false so #stopped? and the sender loop never read
  # an unset instance variable. Must be assigned before the sender starts.
  @stopped = false

  @itoken = itoken
  @otoken = otoken
  @channel = channel

  @output_bot = ::Telegram::Bot::Client.new(@otoken, timeout: 5)
  @output_shaper = OutputShaper.new(chunk_size: 4080)

  @input_bot = ::Telegram::Bot::Client.new(@itoken, timeout: 5)
  # NOTE(review): receives the raw +logger+ argument, not the tagged @logger.
  @input_shaper = InputShaper.new(logger: logger)

  # Base pause handed to the sender thread: 60.0 / 25.0 = 2.4.
  # NOTE(review): the previous comment said "Telegram Bot API allows 30 rps,
  # we use only 20", which does not match this value - confirm the intended rate.
  @delay = 60.0 / 25.0
  # NOTE(review): this global is not read anywhere in this class; it is kept
  # only because code outside this file may rely on it.
  $r = 0

  # @squeue = SizedQueue.new(10)
  @threads = []
  @threads << start_sender(@output_shaper, @output_bot, @delay)
end

Instance Attribute Details

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



8
9
10
# File 'lib/tg_mq/connection.rb', line 8

# Reader for the logger stored in +@logger+.
#
# @return [Object] the current value of +@logger+
def logger
  @logger
end

Instance Method Details

#enqueue_message(data, &cb) ⇒ Object



34
35
36
37
# File 'lib/tg_mq/connection.rb', line 34

# Hands +data+ to the output shaper for sending.
#
# Logs the reported size of +data+ at debug level first, then forwards the
# data and the optional block to +@output_shaper.enqueue_data+.
#
# @param data [#size] payload to enqueue
# @param cb [Proc] optional block forwarded unchanged to the shaper
# @return [Object] whatever +@output_shaper.enqueue_data+ returns
def enqueue_message(data, &cb)
  payload_size = data.size
  logger.debug("Enqueue message of #{payload_size} bytes")

  shaper = @output_shaper
  shaper.enqueue_data(data, &cb)
end

#start_sender(output_shaper, bot, delay) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/tg_mq/connection.rb', line 83

# Spawns the background thread that drains +output_shaper+ and posts each frame
# to the channel.
#
# For every dequeued frame the thread sends it as a message, pins the sent
# message, and invokes the frame's callback with the pin response. After each
# attempt the next send is postponed by +delay * 2+ seconds. The loop ends once
# +@stopped+ becomes truthy.
#
# @param output_shaper [#deq] frame source; +deq(timeout: 1)+ yields a frame
#   (responding to +id+, +data+, +callback+) or +nil+
# @param bot [#api] client used for +sendMessage+ and +pinChatMessage+
# @param delay [Numeric] base pause in seconds between sends
# @return [Thread] the sender thread
def start_sender(output_shaper, bot, delay)
  next_send_at = Time.at(0)

  Thread.new(output_shaper, bot) do |shaper, client|
    until @stopped
      next(sleep 1) if Time.now < next_send_at

      frame = shaper.deq(timeout: 1)
      # A stop may have been requested while waiting for a frame.
      break if @stopped
      next(sleep 1) if frame.nil?

      logger.debug "Sending frame [#{frame.id}] of #{frame.data.size} bytes..."

      response = tgretry { client.api.sendMessage(chat_id: @channel, text: frame.data) }

      if response
        sleep 0.1
        pin_response = tgretry { client.api.pinChatMessage(chat_id: @channel, message_id: response.message_id) }

        begin
          frame.callback&.call(pin_response)
        rescue StandardError => e
          # A failing callback must not take the sender thread down.
          logger.warn "Callback for frame [#{frame.id}] failed: #{e.class}: #{e.message}"
        end
      else
        # tgretry returns nil when the API call failed with a non-retryable
        # error; without this guard the nil response would raise and kill
        # the sender thread.
        logger.warn "Frame [#{frame.id}] was not sent; skipping pin and callback"
      end

      next_send_at = Time.now + delay * 2
    end
  end
end

#stop(timeout = 2) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/tg_mq/connection.rb', line 43

# Requests shutdown of the connection and waits for its threads.
#
# Marks the connection as stopped, stops both bot clients, and waits up to
# +timeout+ for the worker threads. Threads still running after that are killed.
# Calling it again after a stop was requested does nothing.
#
# @param timeout [Numeric] value passed to #wait_for_termination
# @return [Boolean, nil] +true+ when the threads finished on their own, +false+
#   when they had to be killed, +nil+ when a stop was already requested
def stop(timeout = 2)
  return if @stopped

  @stopped = true
  [@output_bot, @input_bot].each(&:stop)

  return true if wait_for_termination(timeout)

  @threads.each(&:kill)
  false
end

#stopped? ⇒ Boolean

Returns:

  • (Boolean)


39
40
41
# File 'lib/tg_mq/connection.rb', line 39

# Reports whether a stop has been requested for this connection.
#
# @return [Boolean] +true+ once +@stopped+ is truthy, +false+ otherwise
#   (including when +@stopped+ was never assigned)
def stopped?
  @stopped ? true : false
end

#subscribe(&block) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/tg_mq/connection.rb', line 66

# Listens on the input bot and yields every fully received payload.
#
# For each incoming +Telegram::Bot::Types::Message+ the text is taken from the
# new chat title, the message text, or the pinned message text (first one
# present) and pushed into the input shaper. When the shaper returns data, it
# is Base64-decoded and passed to the block. Updates of other types are
# ignored. Data that is not valid strict Base64 is logged and dropped, so one
# malformed message cannot abort the listen loop.
#
# @yieldparam decoded [String] the Base64-decoded payload
# @return [Object] whatever +@input_bot.listen+ returns
def subscribe(&block)
  @input_bot.listen do |message|
    case message
    when Telegram::Bot::Types::Message
      text = (message.new_chat_title || message.text || message.pinned_message&.text).to_s
      logger.debug "Receive raw: [#{text[0, 15]}...] of #{text.size} bytes"

      if (data = @input_shaper.pushdata(text))
        begin
          decoded = Base64.strict_decode64(data)
        rescue ArgumentError => e
          # strict_decode64 raises ArgumentError on malformed input.
          logger.warn "Drop payload with invalid Base64: #{e.message}"
          decoded = nil
        end

        if decoded
          logger.debug "Receive payload: of #{decoded.size} bytes"
          block.call(decoded)
        end
      end
    end
  end
end

#tgretry ⇒ Object



112
113
114
115
116
117
118
119
120
# File 'lib/tg_mq/connection.rb', line 112

# Runs the given block, retrying it while Telegram answers with error code 429.
#
# A +ResponseError+ with code 429 is retried after a 5-second pause, with no
# upper bound on attempts. Any other +ResponseError+ is logged and swallowed,
# so the method returns +nil+ and callers must handle a +nil+ result.
# Exceptions of other classes propagate to the caller.
#
# @yield the Telegram API call to perform
# @return [Object, nil] the block's value, or +nil+ after a non-429
#   +ResponseError+
def tgretry
  yield
rescue Telegram::Bot::Exceptions::ResponseError => e
  if e.error_code.to_i == 429
    logger.warn 'Retry packet send in 5 seconds...'
    sleep 5
    retry
  end

  # Leave a trace instead of discarding the failure silently.
  logger.warn "Telegram API call failed (code #{e.error_code}): #{e.message}"
  nil
end

#wait_for_termination(timeout = 0) ⇒ Object



58
59
60
61
62
63
64
# File 'lib/tg_mq/connection.rb', line 58

# Waits for every worker thread to finish within the given time budget.
#
# Each thread in +@threads+ is joined in turn with the time the timer reports
# as left; joining stops at the first thread that does not finish.
#
# @param timeout [Numeric] budget handed to +Timeouter.run+
# @return [true, Object] +true+ when every thread joined; otherwise whatever
#   +Timeouter.run+ returns
def wait_for_termination(timeout = 0)
  Timeouter.run(timeout) do |timer|
    everyone_joined = @threads.all? { |worker| worker.join(timer.left) }
    return true if everyone_joined
  end
end