la programmation asynchrone avec Python

La programmation asynchrone est l'un des aspcts qui évolue le plus dans le langage python.
Il s'agit d'une fonctionnalité très importante qui permet de faire gagner de la performance sans pour autant devoir trop complexifier les algorithmes.
Python 3.4 avait apporté un nouveau module nommé asyncio qui a permis de mettre en place une meilleurs évolution que celles qui existaient déjà.
Python 3.5 a transformé cet essai en implémentant dans le coeur du langage ces principes par l'apport de deux mots clés async et avait.


asyncio est le module standard en Python pour la programmation asynchrone. Il permet d’écrire du code concurrent sans threads ni processus, en utilisant une boucle d’événements et des coroutines.


1. Boucle d’événements (event loop)

  • C’est le cœur d’asyncio.
  • Elle attend que des événements soient prêts (ex : fin d’une requête réseau, expiration d’un délai).
  • Elle réveille les coroutines en attente quand c’est le bon moment.

2. Coroutines (async def)

  • Fonctions spéciales qui peuvent suspendre leur exécution (avec await) et reprendre plus tard.
  • Permettent de gérer plusieurs tâches en parallèle sans bloquer le programme.

3. await

  • Suspend l’exécution d’une coroutine jusqu’à ce qu’une autre coroutine ou une opération asynchrone soit terminée.
  • Permet à la boucle d’événements de faire autre chose pendant ce temps.

🏗️ Architecture type asyncio

  1. Tu définis des coroutines avec async def.
  2. À l’intérieur, tu utilises await pour attendre des opérations non bloquantes.
  3. La boucle d’événements (via asyncio.run()) orchestre tout ça.

🎯 Exemple basique avec asyncio

import asyncio

async def dire_bonjour():
    print("Salut")
    await asyncio.sleep(1) # Non bloquant, attend 1 seconde
    print("Comment ça va ?")

asyncio.run(dire_bonjour())

Sortie :

Salut
(1 seconde d’attente)
Comment ça va ?

4. asyncio.gather

  • C’est une fonction d’asyncio qui permet de lancer plusieurs coroutines en parallèle et d'attendre qu'elles se terminent toutes.
  • Imagine plusieurs tâches à effectuer (appels API, lectures de fichiers, etc.) : au lieu de les exécuter les unes après les autres, on peux les lancer simultanément et récupérer leurs résultats en une seule attente.

🚀 Exemple avec plusieurs coroutines en parallèle

import asyncio

async def tache(nom, duree):
    print(f"{nom} commence")
    await asyncio.sleep(duree)
    print(f"{nom} finit après {duree} secondes")

async def main():
    await asyncio.gather(
        tache("Tâche 1", 2),
        tache("Tâche 2", 3)
    )

asyncio.run(main())

Sortie :

Tâche 1 commence
Tâche 2 commence
Tâche 1 finit après 2 secondes
Tâche 2 finit après 3 secondes

Les deux tâches commencent ensemble, mais finissent à des moments différents, sans attente bloquante.

⚠️ Attention aux exceptions

  1. Par défaut, si une coroutine échoue (lève une exception), asyncio.gather annule les autres tâches et relance l’exception.
  2. Il faut passer l’option return_exceptions=True pour qu’il renvoie les exceptions dans les résultats plutôt que d’arrêter le tout :
résultats = await asyncio.gather(
    tâche(1),
    tâche_qui_echoue(),
    return_exceptions=True
)

🚀 Pourquoi c’est puissant ?

  1. ✅ Idéal quand tu veux **attendre plusieurs actions simultanées** (ex. télécharger plusieurs fichiers, interroger plusieurs services).
  2. ✅ Beaucoup plus efficace que des appels séquentiels : tu économises le temps d’attente cumulé.

💥 asyncio.gather avec gestion des exceptions

import asyncio
async def tâche_succès(num):
    print(f"✅ Tâche {num} commence")
    await asyncio.sleep(1)
    print(f"✅ Tâche {num} terminée")
    return f"Résultat {num}"
async def tâche_erreur():
    print("❌ Tâche qui va échouer commence")
    await asyncio.sleep(0.5)
    raise ValueError("Une erreur s’est produite dans la tâche_erreur")
async def main():
    try:
        résultats = await asyncio.gather(
            tâche_succès(1),
            tâche_erreur(),
            tâche_succès(2),
            return_exceptions=True # 👈 clé pour récupérer les exceptions au lieu de planter
        )
        print("\n--- Résultats récupérés ---")
        for i, r in enumerate(résultats):
            if isinstance(r, Exception):
                print(f"Résultat {i} : ⚠️ Exception : {r}")
            else:
                print(f"Résultat {i} : ✅ {r}")
    except Exception as e:
        print(f"🚨 Une exception non gérée a été levée : {e}")

asyncio.run(main())

🔎 Que se passe-t-il ? :

  1. Les 3 tâches sont lancées en parallèle.
  2. L’une d’elles (tâche_erreur) échoue volontairement.
  3. Grâce à return_exceptions=True, l’exception est capturée comme un résultat dans la liste au lieu d’arrêter tout le programme.
  4. Tu peux ensuite parcourir les résultats et tester s’ils sont des exceptions ou des valeurs normales.

📌 Pourquoi c’est capital ?

  1. Sans return_exceptions=True, asyncio.gather annulerait toutes les tâches dès qu’une exception survient,
    ce qui peut être destructeur si tu attends plusieurs résultats indépendants.
  2. Avec return_exceptions=True, tu maîtrises ton programme : tu sais quelles tâches ont échoué, lesquelles ont réussi, et tu peux réagir intelligemment (relancer, alerter, ignorer…).

🚨 Exemple : annuler toutes les tâches si l’une échoue

Ici, on n’utilise pas return_exceptions=True : on laisse l’exception se propager, et on intercepte l’erreur pour annuler les autres tâches en cours.

import asyncio

async def tâche_lente(num):
    print(f"🟢 Tâche {num} commence")
    try:
        await asyncio.sleep(5)
        print(f"✅ Tâche {num} terminée")
    except asyncio.CancelledError:
        print(f"🚫 Tâche {num} annulée")
        raise

async def tâche_erreur():
    print("❌ Tâche qui va échouer commence")
    await asyncio.sleep(1)
    raise RuntimeError("Erreur fatale dans la tâche_erreur")

async def main():
    # Créer les coroutines sous forme de Tasks pour pouvoir les annuler
    task1 = asyncio.create_task(tâche_lente(1))
    task2 = asyncio.create_task(tâche_erreur())
    task3 = asyncio.create_task(tâche_lente(2))

    tasks = [task1, task2, task3]

    try:
        await asyncio.gather(*tasks)
    except Exception as e:
        print(f"🚨 Exception capturée : {e}")
        print("🛑 Annulation des autres tâches en cours...")
        for t in tasks:
             if not t.done():
                 t.cancel()
        # Optionnel : attendre que toutes les tâches annulées terminent proprement
        await asyncio.gather(*tasks, return_exceptions=True)
        print("✅ Annulation terminée")

asyncio.run(main())
✅ Ce qu’il faut comprendre :
  1. On crée explicitement des Task avec asyncio.create_task() pour avoir un handle qu’on peut annuler.
  2. Si tache_erreur échoue, asyncio.gather propage l’exception.
  3. Dans le except, on annule toutes les autres tâches non terminées avec t.cancel().
  4. On attend ensuite la fin de toutes les tâches (y compris celles annulées) pour éviter des warnings comme “Task was destroyed but it is pending”.
  5. Dans chaque tâche lente, on intercepte asyncio.CancelledError pour afficher un message clair lorsqu’elle est annulée.

🔥 Exemples de programmes asynchrones

1. Téléchargement de plusieurs pages web en parallèle (web scraping)

Imagine que tu dois télécharger plusieurs pages d’un site web. Avec du code synchrone, tu dois attendre chaque téléchargement l’un après l’autre. Avec l’asynchrone, tu peux les lancer en parallèle.

import asyncio
import aiohttp

async def telecharger_page(session, url):
    async with session.get(url) as response:
        contenu = await response.text()
        print(f"Téléchargé {url} ({len(contenu)} caractères)")

async def main():
    urls = [
        'https://example.com',
        'https://www.python.org',
        'https://www.asyncio.org'
    ]
    async with aiohttp.ClientSession() as session:
        taches = [telecharger_page(session, url) for url in urls]
        await asyncio.gather(*taches)

asyncio.run(main())

Ce que ça fait : Lance plusieurs requêtes HTTP en même temps sans bloquer.

2. Serveur TCP asynchrone (chat server)

Créer un petit serveur TCP capable de gérer plusieurs clients en parallèle :

import asyncio

clients = []

async def gerer_client(reader, writer):
    addr = writer.get_extra_info('peername')
    print(f"Client connecté: {addr}")
    clients.append(writer)

    while True:
        data = await reader.readline()
        if not data:
            break
        message = data.decode()
        print(f"Reçu de {addr}: {message.strip()}")

        for client in clients:
            client.write(data)
            await client.drain()

    print(f"Client déconnecté: {addr}")
    clients.remove(writer)
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(gerer_client, '127.0.0.1', 8888)
    async with server:
        await server.serve_forever()

asyncio.run(main())

3. Simulation d'un système de capteurs (lectures périodiques non bloquantes)

Simuler plusieurs capteurs qui envoient des données à intervalles différents :

import asyncio
import random

async def capteur(nom, intervalle):
    while True:
        valeur = random.randint(0, 100)
        print(f"[{nom}] Valeur mesurée: {valeur}")
        await asyncio.sleep(intervalle)

async def main():
    await asyncio.gather(
        capteur("Température", 2),
        capteur("Humidité", 3),
        capteur("Pression", 5)
    )

asyncio.run(main())

Ce que ça fait : Simule trois capteurs qui envoient des mesures en parallèle sans se bloquer mutuellement.

🔥 Améliorations : gestion des erreurs et limitation des connexions

1. Gérer les erreurs réseau (timeouts, refus de connexion)

Avec aiohttp, tu peux capturer des exceptions comme aiohttp.ClientError ou asyncio.TimeoutError pour gérer les erreurs réseau :

import asyncio
import aiohttp

async def telecharger_page(session, url):
    try:
        async with session.get(url, timeout=10) as response:
            contenu = await response.text()
            print(f"Téléchargé {url} ({len(contenu)} caractères)")
    except aiohttp.ClientError as e:
        print(f"Erreur réseau avec {url}: {e}")
    except asyncio.TimeoutError:
        print(f"Timeout dépassé pour {url}")

async def main():
    urls = [
        'https://example.com',
        'https://www.python.org',
        'https://site-qui-existe-pas.com'
    ]
    async with aiohttp.ClientSession() as session:
        taches = [telecharger_page(session, url) for url in urls]
        await asyncio.gather(*taches)

asyncio.run(main())

2. Limiter le nombre de connexions simultanées (sémaphore)

Utiliser un sémaphore (asyncio.Semaphore) pour éviter de lancer trop de connexions en parallèle :

import asyncio
import aiohttp

semaphore = asyncio.Semaphore(2) # Limite à 2 connexions simultanées

async def telecharger_page(session, url):
    async with semaphore: # Attend si le quota est atteint
        try:
            async with session.get(url, timeout=10) as response:
                contenu = await response.text()
                print(f"Téléchargé {url} ({len(contenu)} caractères)")
        except aiohttp.ClientError as e:
            print(f"Erreur réseau avec {url}: {e}")
        except asyncio.TimeoutError:
            print(f"Timeout dépassé pour {url}")

async def main():
    urls = [
        'https://example.com',
        'https://www.python.org',
        'https://www.asyncio.org',
        'https://www.wikipedia.org'
    ]
    async with aiohttp.ClientSession() as session:
        taches = [telecharger_page(session, url) for url in urls]
        await asyncio.gather(*taches)

asyncio.run(main())

Ce que ça fait : Il y a au maximum 2 connexions simultanées. Les autres attendent leur tour.



Cas d’usage : IA temps réel via WebSocket

Imagine un modèle IA qui prédit en continu (prix boursiers, anomalies, recommandations…).
Avec FastAPI et WebSocket, tu streams ces prédictions en direct vers le client 👇.

Exemple : prédictions en continu (mockées)

from fastapi import FastAPI, WebSocket
import asyncio
import random

app = FastAPI()

@app.websocket("/ws/predictions")
# Crée une route WebSocket accessible via l’URL /ws/predictions. async def predictions_ws(websocket: WebSocket): await websocket.accept() # Accepte la demande de connexion entrante du client. Sans cet appel, la connexion ne sera pas établie while True: prediction = random.uniform(0, 1) # ⚠️ Mock d'une prédiction await websocket.send_text(f"Prediction: {prediction:.4f}") await asyncio.sleep(1) # Simule une latence entre prédictions
  • ➡️ Le client reçoit les prédictions sans attendre que tout soit calculé.
  • ➡️ Ultra fluide : le modèle continue de produire, même si le client n’a pas fini de recevoir les prédictions précédentes.

💡 Remplace la partie random.uniform() par un vrai appel à ton modèle (TensorFlow, PyTorch, scikit-learn). Combine avec un générateur si tu veux traiter les données d’entrée à la volée avant prédiction.


🛠️ Quand utiliser asyncio ?

  • Serveurs réseau (HTTP, WebSocket, TCP...)
  • Scraping web avec des connexions multiples
  • Systèmes temps réel (capteurs, robots...)
  • Automatisation de tâches qui nécessitent d’attendre des ressources externes

🆚 Pourquoi pas des threads/processus ?

  • Les threads/processus consomment plus de mémoire et sont plus lents à créer.
  • L’asynchrone est plus léger car tout tourne dans un seul thread.
  • Mais : asyncio est parfait pour les tâches IO. Pour du calcul intensif, mieux vaut utiliser des processus.

🔍 Conclusion

L’asynchrone en Python avec asyncio est un super outil pour gérer plusieurs tâches IO en parallèle sans bloquer ton programme. Grâce aux coroutines, à la boucle d’événements et à await, tu peux écrire du code concurrent de manière fluide et efficace, sans les lourdeurs des threads ou processus.

Voici ce qu’on retient :

  • async/await rend ton code lisible et non bloquant.
  • Les générateurs asynchrones et async for facilitent la gestion de flux de données non bloquants.
  • aiohttp permet de faire du scraping web asynchrone en toute simplicité.
  • La gestion fine des erreurs réseau et la limitation des connexions simultanées assurent robustesse et respect des ressources.