Потоки

Источник: Lib/asyncio/streams.py


Потоки - это высокоуровневые примитивы async/await-ready для работы с сетевыми соединениями. Потоки позволяют отправлять и получать данные без использования обратных вызовов или низкоуровневых протоколов и транспортов.

Вот пример эхо-клиента TCP, написанного с использованием потоков asyncio:

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

См. также раздел Examples ниже.

Функции потока

Для создания потоков и работы с ними можно использовать следующие функции верхнего уровня asyncio:

coroutine asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None)

Установите сетевое соединение и верните пару объектов (reader, writer).

Возвращаемые объекты reader и writer являются экземплярами классов StreamReader и StreamWriter.

limit определяет предельный размер буфера, используемый возвращаемым экземпляром StreamReader. По умолчанию limit установлен на 64 килобайта.

Остальные аргументы передаются непосредственно в loop.create_connection().

Примечание

Аргумент sock передает право собственности на сокет созданному StreamWriter. Чтобы закрыть сокет, вызовите его метод close().

Изменено в версии 3.7: Добавлен параметр ssl_handshake_timeout.

Изменено в версии 3.8: Добавлены параметры happy_eyeballs_delay и interleave.

Изменено в версии 3.10: Удален параметр loop.

Изменено в версии 3.11: Добавлен параметр ssl_shutdown_timeout.

coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)

Запустите сервер сокетов.

Обратный вызов client_connected_cb вызывается каждый раз, когда устанавливается новое клиентское соединение. В качестве двух аргументов он получает пару (reader, writer), экземпляры классов StreamReader и StreamWriter.

client_connected_cb может быть обычным вызываемым объектом или coroutine function; если это корутинная функция, она будет автоматически запланирована как Task.

limit определяет предельный размер буфера, используемый возвращаемым экземпляром StreamReader. По умолчанию limit установлен на 64 килобайта.

Остальные аргументы передаются непосредственно в loop.create_server().

Примечание

Аргумент sock передает право собственности на сокет созданному серверу. Чтобы закрыть сокет, вызовите метод close() сервера.

Изменено в версии 3.7: Добавлены параметры ssl_handshake_timeout и start_serving.

Изменено в версии 3.10: Удален параметр loop.

Изменено в версии 3.11: Добавлен параметр ssl_shutdown_timeout.

Сокеты Unix

coroutine asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)

Установите соединение с сокетом Unix и верните пару (reader, writer).

Аналогичен open_connection(), но работает с сокетами Unix.

См. также документацию по loop.create_unix_connection().

Примечание

Аргумент sock передает право собственности на сокет созданному StreamWriter. Чтобы закрыть сокет, вызовите его метод close().

Availability: Unix.

Изменено в версии 3.7: Добавлен параметр ssl_handshake_timeout. Параметр path теперь может быть значением path-like object

Изменено в версии 3.10: Удален параметр loop.

Изменено в версии 3.11: Добавлен параметр ssl_shutdown_timeout.

coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)

Запустите сервер сокетов Unix.

Аналогично start_server(), но работает с сокетами Unix.

См. также документацию по loop.create_unix_server().

Примечание

Аргумент sock передает право собственности на сокет созданному серверу. Чтобы закрыть сокет, вызовите метод close() сервера.

Availability: Unix.

Изменено в версии 3.7: Добавлены параметры ssl_handshake_timeout и start_serving. Параметр path теперь может быть path-like object.

Изменено в версии 3.10: Удален параметр loop.

Изменено в версии 3.11: Добавлен параметр ssl_shutdown_timeout.

StreamReader

class asyncio.StreamReader

Представляет объект читателя, который предоставляет API для чтения данных из потока ввода-вывода. Как asynchronous iterable, объект поддерживает оператор async for.

Не рекомендуется инстанцировать объекты StreamReader напрямую; вместо этого используйте open_connection() и start_server().

feed_eof()

Подтвердите EOF.

coroutine read(n=-1)

Считывание до n байт из потока.

Если значение n не указано или установлено в -1, читайте до EOF, затем верните все прочитанные bytes. Если было получено EOF и внутренний буфер пуст, верните пустой объект bytes.

Если n равно 0, немедленно верните пустой объект bytes.

Если n положительно, верните не более n доступных bytes, как только во внутреннем буфере появится хотя бы 1 байт. Если EOF получен до того, как прочитан хоть один байт, возвращается пустой объект bytes.

coroutine readline()

Прочитайте одну строку, где «строка» - это последовательность байтов, заканчивающаяся \n.

Если получен EOF и \n не найден, метод возвращает частично прочитанные данные.

Если получен EOF и внутренний буфер пуст, верните пустой объект bytes.

coroutine readexactly(n)

Прочитайте ровно n байт.

Вызовите IncompleteReadError, если EOF достигнут до того, как n будет прочитано. Используйте атрибут IncompleteReadError.partial, чтобы получить частично прочитанные данные.

coroutine readuntil(separator=b'\n')

Считывайте данные из потока до тех пор, пока не будет найден сепаратор.

В случае успеха данные и разделитель будут удалены из внутреннего буфера (потреблены). Возвращаемые данные будут содержать разделитель в конце.

Если объем считанных данных превышает заданный лимит потока, возникает исключение LimitOverrunError, данные остаются во внутреннем буфере и могут быть считаны снова.

Если EOF будет достигнут до того, как будет найден полный разделитель, то возникнет исключение IncompleteReadError, и внутренний буфер будет сброшен. Атрибут IncompleteReadError.partial может содержать часть разделителя.

Разделитель* также может быть кортежем разделителей. В этом случае возвращаемым значением будет кратчайший возможный разделитель, имеющий в качестве суффикса любой разделитель. Для целей LimitOverrunError кратчайшим возможным разделителем считается тот, который совпал.

Added in version 3.5.2.

Изменено в версии 3.13: Параметр separator теперь может быть tuple разделителей.

at_eof()

Возвращает True, если буфер пуст и был вызван feed_eof().

StreamWriter

class asyncio.StreamWriter

Представляет объект writer, который предоставляет API для записи данных в поток IO.

Не рекомендуется инстанцировать объекты StreamWriter напрямую; вместо этого используйте open_connection() и start_server().

write(data)

Метод пытается немедленно записать данные в базовый сокет. Если это не удается, данные ставятся в очередь во внутренний буфер записи до тех пор, пока их нельзя будет отправить.

Этот метод следует использовать вместе с методом drain():

stream.write(data)
await stream.drain()
writelines(data)

Метод немедленно записывает список (или любое итерируемое число) байтов в базовый сокет. Если это не удается, данные ставятся в очередь во внутреннем буфере записи до тех пор, пока их не удастся отправить.

Этот метод следует использовать вместе с методом drain():

stream.writelines(lines)
await stream.drain()
close()

Метод закрывает поток и базовый сокет.

Этот метод следует использовать, хотя и не обязательно, вместе с методом wait_closed():

stream.close()
await stream.wait_closed()
can_write_eof()

Возвращает True, если базовый транспорт поддерживает метод write_eof(), False в противном случае.

write_eof()

Закройте конец потока записи после того, как буферизованные данные записи будут удалены.

transport

Возвращает базовый транспорт asyncio.

get_extra_info(name, default=None)

Доступ к необязательной информации о транспорте; подробности см. в разделе BaseTransport.get_extra_info().

coroutine drain()

Дождитесь подходящего момента для возобновления записи в поток. Пример:

writer.write(data)
await writer.drain()

Это метод управления потоком, который взаимодействует с базовым буфером записи IO. Когда размер буфера достигает высокой отметки, drain() блокирует его до тех пор, пока размер буфера не уменьшится до низкой отметки и запись не будет возобновлена. Когда ждать нечего, drain() возвращается немедленно.

coroutine start_tls(sslcontext, *, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)

Обновление существующего потокового соединения до TLS.

Параметры:

  • sslcontext: сконфигурированный экземпляр SSLContext.

  • server_hostname: задает или переопределяет имя хоста, по которому будет сопоставляться сертификат целевого сервера.

  • ssl_handshake_timeout - время в секундах, в течение которого необходимо дождаться завершения рукопожатия TLS перед прерыванием соединения. 60.0 секунд, если None (по умолчанию).

  • ssl_shutdown_timeout - время в секундах, в течение которого необходимо дождаться завершения отключения SSL перед прерыванием соединения. 30.0 секунд, если None (по умолчанию).

Added in version 3.11.

Изменено в версии 3.12: Добавлен параметр ssl_shutdown_timeout.

is_closing()

Возвращает True, если поток закрыт или находится в процессе закрытия.

Added in version 3.7.

coroutine wait_closed()

Подождите, пока поток не будет закрыт.

Должен вызываться после close(), чтобы дождаться закрытия базового соединения, гарантируя, что все данные были удалены перед, например, выходом из программы.

Added in version 3.7.

Примеры

Клиент TCP-эхо, использующий потоки

Клиент TCP-эхо, использующий функцию asyncio.open_connection():

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

См.также

В примере TCP echo client protocol используется низкоуровневый метод loop.create_connection().

Эхо-сервер TCP с использованием потоков

TCP-эхо-сервер с помощью функции asyncio.start_server():

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Received {message!r} from {addr!r}")

    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()

    print("Close the connection")
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)

    addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
    print(f'Serving on {addrs}')

    async with server:
        await server.serve_forever()

asyncio.run(main())

См.также

В примере TCP echo server protocol используется метод loop.create_server().

Получение заголовков HTTP

Простой пример запроса HTTP-заголовков URL, переданного в командной строке:

import asyncio
import urllib.parse
import sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        reader, writer = await asyncio.open_connection(
            url.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(
            url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    while True:
        line = await reader.readline()
        if not line:
            break

        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # Ignore the body, close the socket
    writer.close()
    await writer.wait_closed()

url = sys.argv[1]
asyncio.run(print_http_headers(url))

Использование:

python example.py http://example.com/path/page.html

или с помощью HTTPS:

python example.py https://example.com/path/page.html

Зарегистрируйте открытый сокет для ожидания данных с помощью потоков

Корутин, ожидающий, пока сокет получит данные с помощью функции open_connection():

import asyncio
import socket

async def wait_for_data():
    # Get a reference to the current event loop because
    # we want to access low-level APIs.
    loop = asyncio.get_running_loop()

    # Create a pair of connected sockets.
    rsock, wsock = socket.socketpair()

    # Register the open socket to wait for data.
    reader, writer = await asyncio.open_connection(sock=rsock)

    # Simulate the reception of data from the network
    loop.call_soon(wsock.send, 'abc'.encode())

    # Wait for data
    data = await reader.read(100)

    # Got data, we are done: close the socket
    print("Received:", data.decode())
    writer.close()
    await writer.wait_closed()

    # Close the second socket
    wsock.close()

asyncio.run(wait_for_data())

См.также

В примере register an open socket to wait for data using a protocol используется низкоуровневый протокол и метод loop.create_connection().

В примере watch a file descriptor for read events используется низкоуровневый метод loop.add_reader() для просмотра дескриптора файла.