Canali MPSC
I canali sono un meccanismo fondamentale per la comunicazione tra thread in Rust. Il modulo std::sync::mpsc implementa canali multi-producer, single-consumer (MPSC), dove più thread possono inviare messaggi ma un solo thread li riceve.
Creare un Canale con channel()
La funzione mpsc::channel() restituisce una tupla (Sender<T>, Receiver<T>):
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let messaggio = String::from("Ciao dal thread!");
tx.send(messaggio).unwrap();
// println!("{}", messaggio); // ERRORE: messaggio è stato spostato
});
let ricevuto = rx.recv().unwrap();
println!("Ricevuto: {}", ricevuto);
}
Quando si invia un valore con send(), la proprietà viene trasferita al ricevitore. Questo garantisce che il thread mittente non possa più accedere al dato dopo l’invio.
Metodi di Ricezione
Il Receiver offre diversi metodi per ricevere messaggi:
use std::sync::mpsc;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
thread::sleep(Duration::from_millis(500));
tx.send(42).unwrap();
});
// recv() - blocca fino alla ricezione
// let valore = rx.recv().unwrap();
// try_recv() - non bloccante, restituisce immediatamente
match rx.try_recv() {
Ok(v) => println!("Ricevuto: {}", v),
Err(mpsc::TryRecvError::Empty) => println!("Nessun messaggio disponibile"),
Err(mpsc::TryRecvError::Disconnected) => println!("Canale chiuso"),
}
// recv_timeout() - blocca per un tempo massimo
match rx.recv_timeout(Duration::from_secs(2)) {
Ok(v) => println!("Ricevuto: {}", v),
Err(mpsc::RecvTimeoutError::Timeout) => println!("Timeout scaduto"),
Err(mpsc::RecvTimeoutError::Disconnected) => println!("Canale chiuso"),
}
}
Canali Sincroni con sync_channel()
A differenza di channel() che è illimitato, sync_channel() crea un canale con buffer limitato. Il mittente si blocca quando il buffer è pieno:
use std::sync::mpsc;
use std::thread;
fn main() {
// Buffer di 2 messaggi
let (tx, rx) = mpsc::sync_channel(2);
thread::spawn(move || {
tx.send(1).unwrap();
println!("Inviato 1");
tx.send(2).unwrap();
println!("Inviato 2");
tx.send(3).unwrap(); // Si blocca finché il ricevitore non legge
println!("Inviato 3");
});
thread::sleep(std::time::Duration::from_secs(1));
for val in rx {
println!("Ricevuto: {}", val);
}
}
Con sync_channel(0) si crea un canale rendezvous: ogni invio si blocca fino a quando il ricevitore non è pronto.
Produttori Multipli con clone()
L’aspetto “multi-producer” dei canali MPSC si ottiene clonando il Sender:
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
for id in 0..4 {
let tx_clone = tx.clone();
thread::spawn(move || {
let messaggio = format!("Messaggio dal worker {}", id);
tx_clone.send(messaggio).unwrap();
});
}
// Importante: eliminare il Sender originale
drop(tx);
// Iterare tutti i messaggi ricevuti
for msg in rx {
println!("{}", msg);
}
println!("Tutti i messaggi ricevuti!");
}
E fondamentale fare drop(tx) del sender originale, altrimenti il ciclo for msg in rx non terminerà mai, poiché il canale rimane aperto.
Iterare sul Receiver
Il Receiver implementa il trait Iterator, permettendo di iterare sui messaggi fino alla chiusura del canale:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let messaggi = vec![
"Primo messaggio",
"Secondo messaggio",
"Terzo messaggio",
];
for msg in messaggi {
tx.send(msg.to_string()).unwrap();
thread::sleep(Duration::from_millis(200));
}
// tx viene droppato qui, chiudendo il canale
});
// iter() blocca in attesa del prossimo messaggio
for messaggio in rx.iter() {
println!("Ricevuto: {}", messaggio);
}
println!("Canale chiuso, iterazione terminata");
}
Pattern: Pipeline di Elaborazione
I canali sono perfetti per creare pipeline dove ogni stadio è un thread:
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx_input, rx_input) = mpsc::channel();
let (tx_output, rx_output) = mpsc::channel();
// Stadio 1: genera dati
thread::spawn(move || {
for i in 1..=5 {
tx_input.send(i).unwrap();
}
});
// Stadio 2: elabora dati (raddoppia)
thread::spawn(move || {
for valore in rx_input {
tx_output.send(valore * 2).unwrap();
}
});
// Stadio 3: consuma i risultati
for risultato in rx_output {
println!("Risultato: {}", risultato);
}
}
Pattern: Fan-Out / Fan-In
Distribuire il lavoro tra più worker e raccogliere i risultati:
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx_risultati, rx_risultati) = mpsc::channel();
let dati = vec![10, 20, 30, 40, 50];
thread::scope(|s| {
for &dato in &dati {
let tx = tx_risultati.clone();
s.spawn(move || {
// Simula un'elaborazione costosa
let risultato = dato * dato;
tx.send((dato, risultato)).unwrap();
});
}
drop(tx_risultati); // Chiude il sender originale
// Raccoglie tutti i risultati
for (input, output) in rx_risultati {
println!("{} -> {}", input, output);
}
});
}
Gestione degli Errori
E importante gestire correttamente gli errori di invio e ricezione:
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel::<String>();
// Se il ricevitore è stato droppato, send fallisce
drop(rx);
match tx.send("test".to_string()) {
Ok(_) => println!("Inviato con successo"),
Err(e) => println!("Errore di invio: {}", e),
}
// Se il mittente è stato droppato, recv fallisce
let (tx2, rx2) = mpsc::channel::<i32>();
drop(tx2);
match rx2.recv() {
Ok(v) => println!("Ricevuto: {}", v),
Err(e) => println!("Errore di ricezione: {}", e),
}
}
Conclusione
I canali MPSC sono lo strumento principale per la comunicazione tra thread in Rust. Con channel() per canali illimitati e sync_channel() per canali con buffer, è possibile implementare pattern come pipeline, fan-out/fan-in e architetture message-passing. La regola fondamentale da ricordare è che il Sender può essere clonato per avere più produttori, ma esiste un solo Receiver. Quando tutti i sender vengono droppati, il canale si chiude e il ricevitore sa che non arriveranno più messaggi.