00
:
00
:
00
:
00
Corso SEO AI - Usa SEOEMAIL al checkout per il 30% di sconto

Canali MPSC

I canali sono un meccanismo fondamentale per la comunicazione tra thread in Rust. Il modulo std::sync::mpsc implementa canali multi-producer, single-consumer (MPSC), dove più thread possono inviare messaggi ma un solo thread li riceve.

Creare un Canale con channel()

La funzione mpsc::channel() restituisce una tupla (Sender<T>, Receiver<T>):

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let messaggio = String::from("Ciao dal thread!");
        tx.send(messaggio).unwrap();
        // println!("{}", messaggio); // ERRORE: messaggio è stato spostato
    });

    let ricevuto = rx.recv().unwrap();
    println!("Ricevuto: {}", ricevuto);
}

Quando si invia un valore con send(), la proprietà viene trasferita al ricevitore. Questo garantisce che il thread mittente non possa più accedere al dato dopo l’invio.

Metodi di Ricezione

Il Receiver offre diversi metodi per ricevere messaggi:

use std::sync::mpsc;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        thread::sleep(Duration::from_millis(500));
        tx.send(42).unwrap();
    });

    // recv() - blocca fino alla ricezione
    // let valore = rx.recv().unwrap();

    // try_recv() - non bloccante, restituisce immediatamente
    match rx.try_recv() {
        Ok(v) => println!("Ricevuto: {}", v),
        Err(mpsc::TryRecvError::Empty) => println!("Nessun messaggio disponibile"),
        Err(mpsc::TryRecvError::Disconnected) => println!("Canale chiuso"),
    }

    // recv_timeout() - blocca per un tempo massimo
    match rx.recv_timeout(Duration::from_secs(2)) {
        Ok(v) => println!("Ricevuto: {}", v),
        Err(mpsc::RecvTimeoutError::Timeout) => println!("Timeout scaduto"),
        Err(mpsc::RecvTimeoutError::Disconnected) => println!("Canale chiuso"),
    }
}

Canali Sincroni con sync_channel()

A differenza di channel() che è illimitato, sync_channel() crea un canale con buffer limitato. Il mittente si blocca quando il buffer è pieno:

use std::sync::mpsc;
use std::thread;

fn main() {
    // Buffer di 2 messaggi
    let (tx, rx) = mpsc::sync_channel(2);

    thread::spawn(move || {
        tx.send(1).unwrap();
        println!("Inviato 1");
        tx.send(2).unwrap();
        println!("Inviato 2");
        tx.send(3).unwrap(); // Si blocca finché il ricevitore non legge
        println!("Inviato 3");
    });

    thread::sleep(std::time::Duration::from_secs(1));

    for val in rx {
        println!("Ricevuto: {}", val);
    }
}

Con sync_channel(0) si crea un canale rendezvous: ogni invio si blocca fino a quando il ricevitore non è pronto.

Produttori Multipli con clone()

L’aspetto “multi-producer” dei canali MPSC si ottiene clonando il Sender:

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    for id in 0..4 {
        let tx_clone = tx.clone();
        thread::spawn(move || {
            let messaggio = format!("Messaggio dal worker {}", id);
            tx_clone.send(messaggio).unwrap();
        });
    }

    // Importante: eliminare il Sender originale
    drop(tx);

    // Iterare tutti i messaggi ricevuti
    for msg in rx {
        println!("{}", msg);
    }

    println!("Tutti i messaggi ricevuti!");
}

E fondamentale fare drop(tx) del sender originale, altrimenti il ciclo for msg in rx non terminerà mai, poiché il canale rimane aperto.

Iterare sul Receiver

Il Receiver implementa il trait Iterator, permettendo di iterare sui messaggi fino alla chiusura del canale:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let messaggi = vec![
            "Primo messaggio",
            "Secondo messaggio",
            "Terzo messaggio",
        ];

        for msg in messaggi {
            tx.send(msg.to_string()).unwrap();
            thread::sleep(Duration::from_millis(200));
        }
        // tx viene droppato qui, chiudendo il canale
    });

    // iter() blocca in attesa del prossimo messaggio
    for messaggio in rx.iter() {
        println!("Ricevuto: {}", messaggio);
    }

    println!("Canale chiuso, iterazione terminata");
}

Pattern: Pipeline di Elaborazione

I canali sono perfetti per creare pipeline dove ogni stadio è un thread:

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx_input, rx_input) = mpsc::channel();
    let (tx_output, rx_output) = mpsc::channel();

    // Stadio 1: genera dati
    thread::spawn(move || {
        for i in 1..=5 {
            tx_input.send(i).unwrap();
        }
    });

    // Stadio 2: elabora dati (raddoppia)
    thread::spawn(move || {
        for valore in rx_input {
            tx_output.send(valore * 2).unwrap();
        }
    });

    // Stadio 3: consuma i risultati
    for risultato in rx_output {
        println!("Risultato: {}", risultato);
    }
}

Pattern: Fan-Out / Fan-In

Distribuire il lavoro tra più worker e raccogliere i risultati:

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx_risultati, rx_risultati) = mpsc::channel();

    let dati = vec![10, 20, 30, 40, 50];

    thread::scope(|s| {
        for &dato in &dati {
            let tx = tx_risultati.clone();
            s.spawn(move || {
                // Simula un'elaborazione costosa
                let risultato = dato * dato;
                tx.send((dato, risultato)).unwrap();
            });
        }

        drop(tx_risultati); // Chiude il sender originale

        // Raccoglie tutti i risultati
        for (input, output) in rx_risultati {
            println!("{} -> {}", input, output);
        }
    });
}

Gestione degli Errori

E importante gestire correttamente gli errori di invio e ricezione:

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel::<String>();

    // Se il ricevitore è stato droppato, send fallisce
    drop(rx);
    match tx.send("test".to_string()) {
        Ok(_) => println!("Inviato con successo"),
        Err(e) => println!("Errore di invio: {}", e),
    }

    // Se il mittente è stato droppato, recv fallisce
    let (tx2, rx2) = mpsc::channel::<i32>();
    drop(tx2);
    match rx2.recv() {
        Ok(v) => println!("Ricevuto: {}", v),
        Err(e) => println!("Errore di ricezione: {}", e),
    }
}

Conclusione

I canali MPSC sono lo strumento principale per la comunicazione tra thread in Rust. Con channel() per canali illimitati e sync_channel() per canali con buffer, è possibile implementare pattern come pipeline, fan-out/fan-in e architetture message-passing. La regola fondamentale da ricordare è che il Sender può essere clonato per avere più produttori, ma esiste un solo Receiver. Quando tutti i sender vengono droppati, il canale si chiude e il ricevitore sa che non arriveranno più messaggi.