Method: Mongo::Collection::View::Readable#parallel_scan

Defined in:
lib/mongo/collection/view/readable.rb

#parallel_scan(cursor_count, options = {}) ⇒ Object

Since:

  • 2.0.0



702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
# File 'lib/mongo/collection/view/readable.rb', line 702

# Runs a parallelCollectionScan command against this collection and wraps
# each cursor id the server returns in a driver Cursor backed by a GetMore
# operation.
#
# @param cursor_count [ Integer ] The number of cursors requested from the
#   server (the server may return fewer).
# @param options [ Hash ] Extra keys merged into the operation spec; a
#   caller-supplied :session here overrides any implicitly obtained session.
#
# @return [ Array<Cursor> ] One Cursor per cursor id in the scan result.
#
# @since 2.0.0
def parallel_scan(cursor_count, options = {})
  if options[:session]
    # The session obtained here is discarded when the spec is merged with
    # +options+ below, since +options+ carries its own :session.
    # NOTE(review): this branch looks inverted relative to that intent —
    # an implicit session is created exactly when the caller already
    # supplied one, and no session participates in server selection when
    # none was supplied. Confirm against the driver's sessions spec.
    session = client.get_session(@options)
  else
    session = nil
  end
  # Select a server via the view's read preference; the (possibly nil)
  # session is passed through to selection.
  server = server_selector.select_server(cluster, nil, session)
  # Spec for the initial ParallelScan operation. +options+ is merged last
  # so caller-supplied keys (notably :session) override the defaults
  # assembled here.
  spec = {
    coll_name: collection.name,
    db_name: database.name,
    cursor_count: cursor_count,
    read_concern: read_concern,
    session: session,
  }.update(options)
  # Re-read the session: if +options+ carried one, it replaced the value
  # placed in the spec above.
  session = spec[:session]
  op = Operation::ParallelScan.new(spec)
  # Note that the context object shouldn't be reused for subsequent
  # GetMore operations.
  context = Operation::Context.new(client: client, session: session)
  result = op.execute(server, context: context)
  # Each returned cursor id becomes its own GetMore-backed Cursor.
  result.cursor_ids.map do |cursor_id|
    spec = {
      cursor_id: cursor_id,
      coll_name: collection.name,
      db_name: database.name,
      session: session,
      batch_size: batch_size,
      to_return: 0,
      # max_time_ms is not being passed here, I assume intentionally?
    }
    op = Operation::GetMore.new(spec)
    # NOTE(review): +result+ is reassigned at the bottom of this block, so
    # from the second iteration onward connection_global_id is taken from
    # the previous iteration's GetMore result rather than from the
    # ParallelScan result — confirm this chaining is intended.
    context = Operation::Context.new(
      client: client,
      session: session,
      connection_global_id: result.connection_global_id,
    )
    result = if server.load_balancer?
               # Connection will be checked in when cursor is drained.
               connection = server.pool.check_out(context: context)
               op.execute_with_connection(connection, context: context)
             else
               op.execute(server, context: context)
             end
    Cursor.new(self, result, server, session: session)
  end
end