yield et les générateurs

yield : l’arme fatale pour coder plus léger


1. Pourquoi yield existe-t-il ?

  • Économie de mémoire : au lieu de charger toute une liste en RAM, tu produis les éléments un par un, à la volée (lazy evaluation).
  • Performance sur les flux infinis : tu peux itérer sans connaître la fin (logs, streaming, data science, IA temps réel).
  • Composition élégante : enchaîne plusieurs générateurs sans perdre en lisibilité (pipeline → data-flow).

Exemple 1 : lire un gros fichier ligne par ligne

Sans yield ? Tu obtiens une grosse liste :

def lire_fichier(path):
    with open(path) as f:
        return [ligne.strip() for ligne in f]
    

Avec yield ? Tu produis les lignes à la demande :

def lire_fichier(path):
    with open(path) as f:
        for ligne in f:
            yield ligne.strip()
    

Test :

for ligne in lire_fichier("mon_fichier.txt"):
    print(ligne)
    

yield permet de créer un générateur, donc les lignes sont lues à la volée, très utile si le fichier est gros.
Résultat : même pour un fichier d’1 Go, ta mémoire reste zen 🧘.


Exemple 2 : générer une suite infinie

Un compteur sans fin :

def compteur_infini():
    n = 0
    while True:
        yield n
        n += 1
    

Test :

gen = compteur_infini()
next(gen)  # 0
next(gen)  # 1
next(gen)  # 2
    

Exemple 3 : filtrer en continu (ex : nombres pairs)

def nombres_pairs():
    n = 0
    while True:
        if n % 2 == 0:
            yield n
        n += 1
    

Utilisation :

gen = nombres_pairs()
for _ in range(5):
    print(next(gen))  # 0, 2, 4, 6, 8
    

Exemple 4. Enchaîner avec yield from

def alphabet():
    yield from 'abcde'    # délègue la production des valeurs
    

Utilisation :

for lettre in alphabet():
    print(lettre)
    

Cela affichera : :

a
b
c
d
e
    

Tu composes tes générateurs comme des briques LEGO, idéal pour du traitement de données par étapes (clean → transform → export).


Pourquoi tu dois t’y mettre (dès aujourd’hui)

  • Évite de saturer la mémoire : même avec des giga-octets de données.
  • Écrit du code réactif : tu peux traiter un flux dès le premier élément reçu.
  • Facilite les pipelines : compose plusieurs traitements avec yield from.

Cas d’usage : Prétraitement de données en IA

Tu construis un modèle de machine learning ? Les datasets sont souvent énormes, et tu veux prétraiter les données à la volée sans tout charger.

def charge_et_nettoie_lignes(fichier):
    with open(fichier) as f:
        for ligne in f:
            champs = ligne.strip().split(',')
            if len(champs) == 5 and champs[0] != '':
                yield champs
    

Utilisation dans un pipeline :

for exemple in charge_et_nettoie_lignes("donnees_brutes.csv"):
    x = vecteurize(exemple)
    y = labelise(exemple)
    modele.train(x, y)
    
  • ➡️ Pas de fichier en mémoire : parfait pour du streaming d’apprentissage en ligne.
  • ➡️ Combine avec TensorFlow ou PyTorch via des wrappers custom.

Et si tu veux aller encore plus loin, ajoute une fonction yield from pour combiner plusieurs étapes (tokenisation, normalisation, vectorisation…)


Cas d’usage : API qui stream des données

Tu construis une API REST qui doit envoyer une grosse réponse (logs, export CSV, etc.) ? Utilise yield pour streamer la réponse par morceaux, et garder la latence basse.

Exemple avec Flask :

from flask import Flask, Response

app = Flask(__name__)

def genere_gros_csv():
    yield "id,nom,score\n"
    for i in range(1_000_000):
        yield f"{i},nom_{i},{i*0.5}\n"

@app.route('/export')
def export_csv():
    return Response(genere_gros_csv(), mimetype='text/csv')

# Lance avec : app.run()
    

➡️ Ton API envoie les lignes du CSV **au fur et à mesure**. Le client commence à recevoir les données avant même que tout soit généré.

  • Avantage : pas besoin de stocker le CSV complet en mémoire.
  • Parfait pour : des exports massifs, des flux temps réel (logs, metrics), ou du streaming vidéo/audio chunké.

Tu peux aussi combiner ça avec des yield from pour chaîner plusieurs étapes de prétraitement avant d’envoyer les données 🚀.

Comprendre les générateurs Python en 5 minutes

1. Qu’est-ce qu’un générateur ?

Un générateur est un objet itérable qui produit ses valeurs à la demande plutôt que de les stocker toutes en mémoire. C’est une fonction ou une expression qui implémente le protocole d’itération : __iter__() et __next__() **sans** que tu aies à l’écrire toi-même (Python s’en charge dès qu’il voit un yield).


2. Pourquoi c’est stratégique ?

Besoin métier Comment le générateur y répond
Flux massif ou infini (logs, stream Kafka, data IoT) Produit élément par élément → RAM sous contrôle
Réactivité (API chunked, websockets) Pas d’attente que tout soit prêt → latence minime
Chaînage de traitements (ETL, ML pipeline) Assemble des briques “lego” → code clair, testable

Deux façons de créer un générateur

1. Avec yield

def lire_lignes(path):
    with open(path) as f:
        for ligne in f:
            yield ligne.strip()
    

2. Avec une expression génératrice

gen = (x * x for x in range(5))
    

4. Cycle de vie interne (savoir ce qui se passe vraiment)

  1. Initialisation : l’objet générateur est créé mais rien ne s’exécute.
  2. next() ou boucle for : exécution jusqu’au premier yield, valeur renvoyée.
  3. Suspension : l’état local est gelé (pile, variables locales).
  4. Reprise : au next suivant, on repart après le dernier yield.
  5. Fin : Python lève StopIteration à la fin de la fonction ou sur return.


Communication bidirectionnelle send, throw, close

def accumulate():
    total = 0
    while True:
        n = yield total
        total += n
    

Avec send(), tu peux envoyer une valeur dans le générateur :

gen = accumulate()
next(gen)   # 0
gen.send(5) # 5
gen.send(10) # 15
    
  • g.send(5) injecte une valeur au point d’arrêt.
  • g.throw(ValueError) propage une exception dans le générateur.
  • g.close() lève GeneratorExit pour libérer les ressources proprement.

Composabilité extrême : yield from

def flatten(listes):
    for sous_liste in listes:
        yield from sous_liste
    

Résultat : un pipeline lisible, sans boucle imbriquée ni complexité parasite.


Bonnes pratiques

  • Un générateur s'épuise : pas de seconde passe !
  • Capture les exceptions avec GeneratorExit si tu gères des ressources.
  • Combine avec itertools pour des traitements puissants.

Ce qu’il faut retenir

Les générateurs te permettent de scaler ton code sans exploser la mémoire. Parfaits pour le Big Data, les APIs chunked, ou l’IA temps réel.

Essaie-les maintenant : tu verras la différence dès tes premiers tests !


Exemple plus complexe : Générateur Fibonacci (infinie)

def fibonacci():
    a, b = 0, 1
    while True:
        yield a
        a, b = b, a + b
    

Lire les 10 premiers nombres de Fibonacci :

fib = fibonacci()
for _ in range(10):
    print(next(fib))
    

Lire jusqu'à une certaine limite :

for nombre in fibonacci():
    if nombre > 100:
        break
    print(nombre)
    

Flux de données : Filtrer des valeurs en continu

def flux_de_nombres():
    n = 0
    while True:
        yield n
        n += 1

def nombres_pairs():
    for n in flux_de_nombres():
        if n % 2 == 0:
            yield n
    

Récupérer les 5 premiers nombres pairs :

pairs = nombres_pairs()
for _ in range(5):
    print(next(pairs))
    

Avantage : Ces générateurs ne créent jamais de grosses listes en mémoire. Ils produisent les valeurs à la volée, parfait pour des flux de données infinis ou très grands.

Cas d’usage : API asynchrone avec FastAPI

Besoin d’un **serveur ultra réactif** qui gère des milliers de connexions (WebSockets, API, streaming) ? Combine **yield + asynchrone** avec FastAPI.

Exemple : streamer un CSV en mode async

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

async def genere_csv_async():
    yield "id,nom,score\n" # génère la ligne d'entête du CSV
    for i in range(1_000_000):  # génère ensuite 1 million de lignes
        await asyncio.sleep(0)  # Laisse la main à l’event loop
        yield f"{i},nom_{i},{i*0.5}\n" # écriture d'une ligne

@app.get("/export_async")
async def export_csv_async():
    return StreamingResponse(genere_csv_async(), media_type="text/csv")   # envoie progressivement les données au client

# Lance avec : uvicorn nom_du_script:app --reload
    
  • ✅ Async-ready : le serveur reste non bloquant même si la génération est lente.
  • ✅ Scalabilité : parfait pour servir plusieurs exports lourds en parallèle, sans exploser le CPU ou la RAM.

💡 Combine avec des connexions **base de données asynchrones** (ex: asyncpg pour PostgreSQL) pour un pipeline complet, scalable et fluide.


Cas d’usage ultime : pipeline IA complet en mode asynchrone

⚙️ Imagine un système où tu ingères des données en continu (ex: capteurs, flux boursier), tu les prétraites, tu fais des prédictions IA, et tu streams ça en direct via **WebSocket** ou **HTTP chunked**. Tout ça sans bloquer ton serveur.

Exemple : ingestion + prétraitement + prédiction + streaming

from fastapi import FastAPI, WebSocket
import asyncio
import random

app = FastAPI()

# 🔹 Étape 1 : Ingestion de données (mockée)
async def source_de_donnees():
    while True:
        await asyncio.sleep(0.5)  # simule une latence d'arrivée des données
        yield random.uniform(10, 100)  # simule une donnée brute

# 🔸 Étape 2 : Prétraitement (normalisation ici)
async def pretraiter(donnees):
    async for valeur in donnees:
        yield (valeur - 10) / 90  # normalisation simple entre 0 et 1

# 🔸 Étape 3 : Prédiction IA (mock)
async def prediction_ia(donnees):
    async for x in donnees:
        # ici tu mets ton modèle : modele.predict(x)
        yield x * random.uniform(0.8, 1.2)  # mock prédiction

# 🔸 Étape 4 : API WebSocket
@app.websocket("/ws/pipeline")
async def pipeline_ws(websocket: WebSocket):
    await websocket.accept()
    flux = prediction_ia(pretraiter(source_de_donnees()))
    async for pred in flux:
        await websocket.send_text(f"Prédiction IA : {pred:.4f}")
    
  • Chaque étape **yield** les résultats au fur et à mesure → pas de saturation mémoire.
  • Tout est **async** → tu peux gérer plusieurs clients WebSocket en parallèle sans souci.
  • Remplace le mock IA par ton vrai modèle (TensorFlow, PyTorch, scikit-learn async-friendly).

➡️ Résultat : un pipeline **ultra fluide**, capable d’ingérer, traiter et prédire en direct, avec une latence minimale et sans jamais bloquer.

Ce programme démontre la puissance combinée de FastAPI, WebSocket et l’asynchrone en Python pour créer un pipeline de traitement de données en temps réel.
Il se divise en quatre étapes :

  • 🔹 Ingestion des données (simulées)
  • 🔸 Prétraitement des données
  • 🔸 Prédictions (IA simulée)
  • 🔸 Diffusion en temps réel via WebSocket

🔹 Étape 1 : Ingestion de données (mockée)

async def source_de_donnees():
    while True:
        await asyncio.sleep(0.5)
        yield random.uniform(10, 100)

Cette fonction simule un capteur en générant toutes les 0,5 secondes une valeur entre 10 et 100.

🔸 Étape 2 : Prétraitement (normalisation)

async def pretraiter(donnees):
    async for valeur in donnees:
        yield (valeur - 10) / 90

Chaque donnée brute est ramenée entre 0 et 1, une étape essentielle avant toute prédiction IA.

🔸 Étape 3 : Prédiction IA (simulée)

async def prediction_ia(donnees):
    async for x in donnees:
        yield x * random.uniform(0.8, 1.2)

Cette étape représente une IA, remplaçable par un modèle réel.

🔸 Étape 4 : API WebSocket pour diffusion des prédictions

@app.websocket("/ws/pipeline")
async def pipeline_ws(websocket: WebSocket):
    await websocket.accept()
    flux = prediction_ia(pretraiter(source_de_donnees()))
    async for pred in flux:
        await websocket.send_text(f"Prédiction IA : {pred:.4f}")

Un client connecté reçoit les prédictions en continu sans latence.

🖥️ Exemple de client WebSocket (JavaScript)

const ws = new WebSocket("ws://localhost:8000/ws/pipeline");
ws.onmessage = (event) => {
    console.log("Message reçu du serveur :", event.data);
};

🚀 Pourquoi ce modèle est excellent ?

  • ✅ Réactivité : communication instantanée
  • ✅ Scalabilité : gestion de connexions multiples sans blocage
  • ✅ Modularité : architecture claire et adaptable