Pattern di Concorrenza
Introduzione ai pattern di concorrenza
Go offre goroutine e channel come primitive di concorrenza, ma la vera potenza sta nei pattern che possiamo costruire con questi strumenti. In questa guida esploreremo i pattern piu comuni e utili per scrivere codice concorrente robusto e performante.
Pipeline
Una pipeline e una serie di stadi collegati da channel, dove ogni stadio riceve dati, li elabora e li passa allo stadio successivo:
package main
import "fmt"
// Stadio 1: genera numeri
func genera(numeri ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range numeri {
out <- n
}
close(out)
}()
return out
}
// Stadio 2: eleva al quadrato
func quadrato(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
// Stadio 3: raddoppia
func raddoppia(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * 2
}
close(out)
}()
return out
}
func main() {
// Componiamo la pipeline
risultati := raddoppia(quadrato(genera(1, 2, 3, 4, 5)))
for r := range risultati {
fmt.Println(r) // 2, 8, 18, 32, 50
}
}
Fan-Out / Fan-In
Il pattern fan-out distribuisce il lavoro su piu goroutine. Il pattern fan-in raccoglie i risultati in un singolo channel:
package main
import (
"fmt"
"sync"
)
func fanIn(canali ...<-chan int) <-chan int {
var wg sync.WaitGroup
merged := make(chan int)
// Avvia una goroutine per ogni canale di input
for _, ch := range canali {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for v := range c {
merged <- v
}
}(ch)
}
// Chiudi il canale quando tutti hanno finito
go func() {
wg.Wait()
close(merged)
}()
return merged
}
func elabora(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n // Operazione costosa
}
close(out)
}()
return out
}
func main() {
input := genera(1, 2, 3, 4, 5, 6, 7, 8)
// Fan-out: distribuiamo su 3 worker
w1 := elabora(input)
w2 := elabora(input)
w3 := elabora(input)
// Fan-in: raccogliamo i risultati
for r := range fanIn(w1, w2, w3) {
fmt.Println(r)
}
}
func genera(numeri ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range numeri {
out <- n
}
close(out)
}()
return out
}
Worker Pool
Un worker pool e un pattern dove un numero fisso di goroutine elabora i lavori da una coda condivisa:
package main
import (
"fmt"
"sync"
"time"
)
type Lavoro struct {
ID int
Dati string
}
type Risultato struct {
LavoroID int
Output string
}
func worker(id int, lavori <-chan Lavoro, risultati chan<- Risultato, wg *sync.WaitGroup) {
defer wg.Done()
for l := range lavori {
// Simula elaborazione
time.Sleep(100 * time.Millisecond)
risultati <- Risultato{
LavoroID: l.ID,
Output: fmt.Sprintf("Worker %d ha elaborato: %s", id, l.Dati),
}
}
}
func main() {
const numWorker = 3
const numLavori = 10
lavori := make(chan Lavoro, numLavori)
risultati := make(chan Risultato, numLavori)
// Avvia i worker
var wg sync.WaitGroup
for w := 1; w <= numWorker; w++ {
wg.Add(1)
go worker(w, lavori, risultati, &wg)
}
// Invia i lavori
for i := 1; i <= numLavori; i++ {
lavori <- Lavoro{ID: i, Dati: fmt.Sprintf("dato-%d", i)}
}
close(lavori)
// Chiudi i risultati quando i worker finiscono
go func() {
wg.Wait()
close(risultati)
}()
// Raccogli i risultati
for r := range risultati {
fmt.Printf("Lavoro #%d: %s\n", r.LavoroID, r.Output)
}
}
Done Channel
Il pattern done channel permette di segnalare la cancellazione a goroutine in esecuzione:
package main
import (
"fmt"
"time"
)
func generaInfinito(done <-chan struct{}) <-chan int {
out := make(chan int)
go func() {
defer close(out)
i := 0
for {
select {
case <-done:
fmt.Println("Generatore fermato")
return
case out <- i:
i++
}
}
}()
return out
}
func main() {
done := make(chan struct{})
numeri := generaInfinito(done)
// Leggi solo 5 numeri
for i := 0; i < 5; i++ {
fmt.Println(<-numeri)
}
// Segnala la cancellazione
close(done)
time.Sleep(100 * time.Millisecond)
}
Context per la cancellazione
Il pacchetto context e il modo idiomatico in Go per gestire la cancellazione e i timeout:
package main
import (
"context"
"fmt"
"time"
)
func operazioneLenta(ctx context.Context, id int) <-chan string {
out := make(chan string)
go func() {
defer close(out)
select {
case <-time.After(2 * time.Second):
out <- fmt.Sprintf("Operazione %d completata", id)
case <-ctx.Done():
fmt.Printf("Operazione %d cancellata: %v\n", id, ctx.Err())
}
}()
return out
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
risultato := operazioneLenta(ctx, 1)
select {
case r := <-risultato:
fmt.Println(r)
case <-ctx.Done():
fmt.Println("Timeout raggiunto:", ctx.Err())
}
}
Rate Limiting
Il rate limiting controlla la frequenza delle operazioni usando time.Ticker:
package main
import (
"fmt"
"time"
)
func main() {
richieste := make(chan int, 10)
for i := 1; i <= 10; i++ {
richieste <- i
}
close(richieste)
// Rate limiter: una richiesta ogni 200ms
limiter := time.NewTicker(200 * time.Millisecond)
defer limiter.Stop()
for req := range richieste {
<-limiter.C // Attendi il tick
fmt.Printf("Richiesta %d elaborata alle %s\n",
req, time.Now().Format("15:04:05.000"))
}
}
Pattern Semaforo
Un semaforo limita il numero di goroutine concorrenti usando un channel con buffer:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
const maxConcorrenti = 3
semaforo := make(chan struct{}, maxConcorrenti)
var wg sync.WaitGroup
for i := 1; i <= 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
semaforo <- struct{}{} // Acquisisci il semaforo
defer func() { <-semaforo }() // Rilascia il semaforo
fmt.Printf("Goroutine %d in esecuzione\n", id)
time.Sleep(500 * time.Millisecond)
fmt.Printf("Goroutine %d completata\n", id)
}(i)
}
wg.Wait()
fmt.Println("Tutte le goroutine completate")
}
Conclusione
I pattern di concorrenza in Go permettono di costruire sistemi complessi e performanti a partire da primitive semplici. La pipeline e il fan-in/fan-out sono ideali per l’elaborazione di dati, il worker pool e perfetto per gestire carichi di lavoro variabili, e il semaforo e il rate limiter controllano l’uso delle risorse. Combinando questi pattern con il pacchetto context, possiamo gestire in modo elegante la cancellazione e i timeout in applicazioni concorrenti.