¿Estás visitando desde Argentina?
Ingresá a Linware Argentina ⯈
Continuar en Linware Argentina ⯈
×
¿Qué estás buscando?
BUSCAR!
BLOG
3 patrones asíncronos esenciales para crear un servicio de Python
Publicada el 20/01/2023

Un servicio es una aplicación que se ejecuta para siempre, reaccionando a los eventos de manera diferente. Por ejemplo, un sitio web o un servicio web recibe solicitudes HTTP y envía respuestas en varios formatos (HTML, JSON, etc.). Para el proyecto de conectores-python , estamos extrayendo trabajo de Elasticsearch, obteniendo datos de alguna fuente para inyectar en Elasticsearch y volviendo a un estado inactivo.

La estructura de un servicio de este tipo que usa Python moderno se basa en iniciar una tarea global a través de una función asíncrona que se repite para siempre y permanece inactiva cuando no está activando algún trabajo. Es un patrón bastante sencillo y ofrece un modelo simple: cada vez que algo debe suceder en esa función principal, se difiere a otra tarea asíncrona y la vida continúa.

Hay algunas consideraciones para hacer que una aplicación de este tipo funcione bien. Queremos asegurarnos de hacer lo siguiente:

  • Sal de forma limpia y rápida
  • Evita la explosión de tareas
  • Aproveche el uso de su memoria

Proyectos como Trio , AnyIO y, hasta cierto punto, algunas funciones de Curio abordan estos temas de varias maneras, pero esta publicación de blog se enfoca en implementar soluciones usando Python estándar.

[Artículo relacionado: Perf8: Métricas de rendimiento para Python ]

Haz una salida elegante y rápida.

Consideremos el siguiente ejemplo, donde la tarea principal verifica si hay algún trabajo por hacer y luego duerme por un minuto llamando a asyncio.sleep() . Implementa un controlador de señal limpio para hacer algunas limpiezas y alternar el indicador de ejecución para una terminación elegante:

 
import asyncionimport osnimport signalnnnasync def main():n    loop = asyncio.get_running_loop()n    running = Truenn    def shutdown():n        nonlocal runningn        # cleanup workn        running = False  # will end the loopnn    for sig in (signal.SIGINT, signal.SIGTERM):n        loop.add_signal_handler(sig, shutdown)nn    while running:         n        await check_and_execute_work()n        await asyncio.sleep(60)nnasyncio.run(main())Lee mas
 
 

Pero esta implementación hará que su servicio no responda por hasta un minuto cuando maneje las señales de apagado si está esperando a que termine la suspensión. Esto puede ser una molestia menor, pero en algunos entornos de ejecución puede ser un problema mayor. También podría tener llamadas de suspensión simultáneas en otras tareas de su aplicación, lo que ralentizará una terminación limpia porque tendrá que esperar a que finalicen todas las tareas. 

Para evitar el problema, debemos poder cancelar inmediatamente todos los períodos de suspensión activos. Un patrón simple es construir una fábrica asyncio.sleep que realice un seguimiento de todas las tareas de sueño y pueda cancelarlas si es necesario. 

A continuación se muestra una clase que usamos en nuestro proyecto:

 
class CancellableSleeps:n    def __init__(self):n        self._sleeps = set()nn    async def sleep(self, delay, result=None, *, loop=None):n        async def _sleep(delay, result=None, *, loop=None):n            coro = asyncio.sleep(delay, result=result)n            task = asyncio.ensure_future(coro)n            self._sleeps.add(task)n            try:n                return await taskn            except asyncio.CancelledError:n                print("Sleep canceled")n                return resultn            finally:n                self._sleeps.remove(task)nn        await _sleep(delay, result=result, loop=loop)nn    def cancel(self):n        for task in self._sleeps:n            task.cancel()Lee mas
 
 

Esta clase realiza un seguimiento de todas las tareas asyncio.sleep() en ejecución y proporciona una forma de cancelarlas inmediatamente. Para usarlo en el servicio, debe crear una instancia de clase y hacer que todo el código use su método sleep() en lugar de asyncio.sleep() .

A continuación se muestra la versión modificada de la función main() que utiliza la clase CancellableSleeps :

 
async def main():n    sleeps = CancellableSleeps()n    loop = asyncio.get_running_loop()n    running = Truenn    def shutdown():n        nonlocal runningn        # cleanup workn        running = False  # will end the loopn        sleeps.cancel() # cancel all running sleep tasksnn    for sig in (signal.SIGINT, signal.SIGTERM):n        loop.add_signal_handler(sig, shutdown)nn    while running:n        await check_and_execute_work()n        await sleeps.sleep(60)Lee mas
 
 

La función de apagado () alternará la bandera de ejecución y cancelará todas las inactividades. Ese mismo objeto de suspensión podría pasar y ser utilizado por todas las tareas que necesitan ejecutar una tarea de suspensión.

Tenga en cuenta que este patrón de cancelación compartida se implementa en Trio si usa esa biblioteca (consulte la funcionalidad principal de Trio ).

Agrupa tus tareas

Una persona que llama tiene dos estrategias posibles para ejecutar una tarea llamando a una función asíncrona. Puede esperar su finalización con la palabra clave await , lo que hace que la persona que llama bloquee el resultado. La segunda opción es usar una estrategia de disparar y olvidar, donde la persona que llama agrega una tarea en el ciclo de eventos y continúa con su vida sin esperar a que termine la tarea.

El segundo patrón es útil cuando necesita concurrencia dentro de una función asíncrona; por ejemplo, cuando su código necesita enviar actualizaciones a Elasticsearch y sabe que puede hacerlo a través de muchas solicitudes simultáneas.

Una posible función que está haciendo esto podría ser:

 
async def send_data():n    tasks = [] n    while have_data: n        batch = await get_batch_of data()n        t = asyncio.create_task(send_data_to_es(batch))n        tasks.append(t)n        t.add_done_callaback(tasks.remove)nn    # make sure we wait for all tasks to endn    await asyncio.gather(*tasks)
 
 

Este ciclo permite que varias tareas de send_data_to_es() se ejecuten simultáneamente, pero si get_batch_data() es mucho más rápido que el tiempo de ejecución de send_data_to_es() , ¡acabas de crear una bomba de tareas! Las llamadas a Elasticsearch se acumularán y, finalmente, se agotarán. Esto también puede generar problemas de memoria si los lotes usan mucha memoria.

Para evitar este problema, las tareas de send_data_to_es() deben limitarse, y una forma genérica de hacerlo es crear un grupo de tareas con un límite superior de simultaneidad.

A continuación se muestra una clase que usamos para esta limitación:

 
class ConcurrentTasks:n    def __init__(self, max_concurrency=5, n                 results_callback=None):n        self.max_concurrency = max_concurrencyn    	self.tasks = []n        self.results_callback = results_callbackn    	self._task_over = asyncio.Event()nn    def __len__(self):n        return len(self.tasks)nn    def _callback(self, task, result_callback=None):n        self.tasks.remove(task)n    	self._task_over.set()n    	if task.exception():n            raise task.exception()n    	if result_callback is not None:n            result_callback(task.result())n    	# global callbackn    	if self.results_callback is not None:n            self.results_callback(task.result())nn    async def put(self, coroutine, result_callback=None):n    	# If self.tasks has reached its max sizen        # we wait for one task to finishn    	if len(self.tasks) >= self.max_concurrency:n            await self._task_over.wait()n            # rearmn            self._task_over.clear()n    	task = asyncio.create_task(coroutine())n    	self.tasks.append(task)n    	task.add_done_callback(n         functools.partial(self._callback,n                           result_callback=result_callback)n    	)n    	return tasknn    async def join(self):n    	await asyncio.gather(*self.tasks)Lee mas
 
 

Es un poco diferente de una cola porque no hay un orden secuencial para desencadenar la ejecución de tareas. La clase se asegurará de que no más de max_concurrency tareas puedan estar activas simultáneamente.
La clase ofrece dos métodos: put() , que se puede usar para agregar una función asíncrona para su ejecución, y join() , que se puede usar para esperar hasta que finalicen todas las tareas. put() bloqueará hasta que el grupo tenga espacio para una nueva tarea. A continuación se muestra la función send_data() modificada que la usa:

 
import functoolsnnasync def send_data():n    tasks = ConcurrentTasks(5) n    while have_data: n        batch = await get_batch_of data()n        await tasks.put(n            functools.partial(n                send_data_to_es, batch)n            )n        )nn    # make sure we wait for all tasks to endn    await tasks.join()Lee mas
 
 

Un par de bibliotecas proporcionan una función similar: aiometer , asyncio-pool .

Python 3.11, lanzado hace solo unos meses, también introdujo la clase TaskGroup que se puede usar como administrador de contexto para ejecutar tareas en paralelo. De la documentación:

 
async def main():n    async with asyncio.TaskGroup() as tg:n        task1 = tg.create_task(some_coro(...))n        task2 = tg.create_task(another_coro(...))n    print("Both tasks have completed now.")
 
 

Todavía no esta usando Python 3.11 en producción, ya que es bastante reciente, pero pasar a asyncio.TaskGroup en el futuro tendría mucho sentido.

Aproveche el uso de su memoria

La memoria es un recurso crítico para una aplicación de servicio. El servicio no puede crecer indefinidamente en la memoria cuando se trata de una gran cantidad de datos.

En connectors-python, estamos creando un servicio intermedio entre un servicio de terceros que proporciona algunos datos y Elasticsearch. Los datos se masajean en nuestra canalización y se transmiten. Ejecutar muchas tareas simultáneas que contienen datos para realizar este trabajo significa que el almacenamiento dinámico de Python y su tamaño de conjunto residente (RSS) de proceso crecerán.

En el ejemplo anterior, la función send_data() mantendrá en la memoria cinco lotes simultáneos de datos que planea enviar a Elasticsearch. Dependiendo de cómo se implementen get_batch_of_data y send_data_to_es , puede obtener hasta tres copias del mismo lote simultáneamente en el montón a menos que pase los mismos objetos. 

Para evitar la sobrecarga de memoria, debemos tener un límite superior para los datos que contiene el servicio. Python tiene una clase de cola que podemos usar para almacenar y limitar los elementos con los que estamos tratando, pero no tiene en cuenta la memoria: no sabe cuánta memoria usa cada elemento.

Podría usar sys.getsizeof() en objetos simples, pero no funcionará para objetos complejos compuestos por otros objetos. No averiguará recursivamente el tamaño de la memoria de cada objeto vinculado, y el valor que obtendrá estará desactivado.

 
>>> import sysn>>> sys.getsizeof(['A', 'B', []])n80n>>> sys.getsizeof(['A', 'B', ['C', 'D', 'E']])n80
 
 

Para obtener el tamaño real, puede usar la biblioteca Pympler :

 
>>> from pympler import asizeofn>>> asizeof.asizeof(['A', 'B', []])n248n>>> asizeof.asizeof(['A', 'B', ['C', 'D', 'E']])n448
 
 

Llamar a asizeof() tiene un costo de CPU pequeño, pero es extremadamente útil para medir la memoria real que está usando. Basándonos en esta biblioteca, hemos creado una clase MemQueue derivada de asyncio.Queue , que almacenará el tamaño de la memoria de los elementos y le permitirá definir un tamaño máximo en la memoria.
Esta clase obtiene el elemento de tamaño cuando se llama a put o put_nowait y proporciona un mecanismo de bloqueo similar que puede encontrar en asyncio.Queue .

 
class MemQueue(asyncio.Queue):n    def __init__(n        self, maxsize=0, maxmemsize=0, n        refresh_interval=1.0, refresh_timeout=60n    ):n    	super().__init__(maxsize)n    	self.maxmemsize = maxmemsizen    	self.refresh_interval = refresh_intervaln    	self.refresh_timeout = refresh_timeoutnn    async def put(self, item):n    	item_size = get_size(item)nn    	# specific locking code (see original class in GitHub)n         n    	super().put_nowait((item_size, item))nn    def put_nowait(self, item):n        item_size = get_size(item)nn    	if self.full(item_size):n            raise asyncio.QueueFulln    	super().put_nowait((item_size, item))Lee mas
 
 

El mecanismo de bloqueo se basa en la creación de un objeto Future que se pone en cola en un archivo collections.deque , y put permanecerá allí hasta que una llamada get elimine un elemento y desbloquee ese futuro. Puede encontrar la implementación completa de esta clase en el módulo utils.py del proyecto. Además de esto, se generará un tiempo de espera después de un tiempo si una opción de venta no puede tener éxito. Consulte los detalles de implementación en https://github.com/elastic/connectors-python/blob/main/connectors/utils.py .

La clase MemQueue es casi un reemplazo en el lugar para asyncio.Queue . En el siguiente ejemplo, se usa en un patrón consumidor-productor:

 
FINISHED = -1nn# queue of 5 MiB max, and 1000 items maxnq = MemQueue(maxsize=1000, maxmemsize=5*1024*1024)nnasync def pull_data():n    async for item in stream_of_data():n        await q.put(item)  # block until <5MiB or <1k itemsn    await q.put(FINISHED)nnasync def push_data():n    while True:n        Item_size, item = await q.get()n        if item == FINISHED:n            breakn        push_item(item)nn# running the producer and consumernasyncio.run(asyncio.gather(pull_data(), push_data()))Lee mas
 
¿Qué piensas?

Puede encontrar todas estas clases en acción en el proyecto connectors-python. Si los usa o tiene ideas de mejora, inicie una discusión en nuestro rastreador de problemas . Es un proyecto de código abierto y las contribuciones son bienvenidas.

Ir al Blog