Class: MTProto::AsyncClient
- Inherits:
-
Object
- Object
- MTProto::AsyncClient
- Defined in:
- lib/mtproto/async_client.rb
Instance Attribute Summary collapse
-
#client ⇒ Object
readonly
Returns the value of attribute client.
Instance Method Summary collapse
- #call(body, metadata: {}) ⇒ Object
-
#initialize(client) ⇒ AsyncClient
constructor
A new instance of AsyncClient.
- #shutdown ⇒ Object
- #use(middleware_class, **options) ⇒ Object
Constructor Details
#initialize(client) ⇒ AsyncClient
Returns a new instance of AsyncClient.
13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/mtproto/async_client.rb', line 13
#
# Builds a new AsyncClient around +client+ and sets up its bookkeeping state.
#
# @param client [Object] the underlying client; stored as-is and exposed via #client
def initialize(client)
  @client = client

  # Lifecycle state: not running and no receiver thread at construction time.
  @running = Concurrent::AtomicBoolean.new(false)
  @receiver_thread = nil

  # Per-request bookkeeping; both start out empty.
  @pending_requests = Concurrent::Hash.new
  @timeout_threads = Concurrent::Hash.new

  @queue = Async::RequestQueue.new

  # The middleware list must exist before #use appends to it.
  @middleware_classes = []
  use(Async::Middleware::FloodWait)
end
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
11 12 13 |
# File 'lib/mtproto/async_client.rb', line 11
#
# Reader for the wrapped client.
#
# @return [Object] the value of the +@client+ instance variable
def client
  @client
end
Instance Method Details
#call(body, metadata: {}) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/mtproto/async_client.rb', line 30
#
# Wraps +body+ in an Async::Request and runs it through the request queue,
# handing every send attempt to the middleware stack.
#
# @param body [Object] payload passed to Async::Request and to +send_request+
# @param metadata [Hash] extra data attached to the request; defaults to an empty hash
# @return [Object] whatever +@queue.call+ returns for the given block
def call(body, metadata: {})
  ensure_receiver_running!

  # Forward the caller's metadata explicitly, mirroring +body: body+.
  request = Async::Request.new(body: body, metadata: metadata)

  @queue.call(key: request.key) do
    attempt = lambda do |retries = 0|
      future = send_request(request.body)

      # Handed to the middleware stack so it can re-send the same request
      # with the retry counter incremented.
      retry_fn = -> { attempt.call(retries + 1) }

      build_middleware_stack.call(request, future, retry_fn, retries)
    end

    attempt.call(0)
  end
end
#shutdown ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/mtproto/async_client.rb', line 50
#
# Stops the client: clears the running flag, disconnects the underlying
# client if it reports being connected, stops the receiver and timeout
# threads, rejects every unresolved pending request, and shuts the queue down.
#
# Teardown runs even when +@client.disconnect!+ raises, so threads are not
# left alive and pending futures are not left unresolved. The original
# exception still propagates afterwards.
#
# @return [Object] the result of +@queue.shutdown+
# @raise [Exception] whatever +@client.disconnect!+ raised, after cleanup
def shutdown
  @running.make_false

  queue_result = nil
  begin
    @client.disconnect! if @client.connected?
  ensure
    if @receiver_thread
      # Wait up to 2 seconds for the receiver to finish, then force-stop it.
      @receiver_thread.join(2)
      @receiver_thread.kill if @receiver_thread.alive?
      @receiver_thread = nil
    end

    @timeout_threads.each_value(&:kill)
    @timeout_threads.clear

    @pending_requests.each_value do |future|
      next if future.resolved?

      future.reject(MTProto::Transport::ConnectionError.new('Client shutting down'))
    end
    @pending_requests.clear

    # An ensure clause's value is discarded, so capture it for the return below.
    queue_result = @queue.shutdown
  end

  queue_result
end
#use(middleware_class, **options) ⇒ Object
25 26 27 28 |
# File 'lib/mtproto/async_client.rb', line 25
#
# Registers a middleware class together with its options.
#
# @param middleware_class [Class] middleware to append to the list
# @param options [Hash] keyword options stored alongside the class
# @return [self] the receiver, so calls can be chained
def use(middleware_class, **options)
  # Keep the options with the class; each entry is a [class, options] pair.
  @middleware_classes << [middleware_class, options]
  self
end