00
:
00
:
00
:
00
•Corso SEO AI - Usa SEOEMAIL al checkout per il 30% di sconto

Pattern di Concorrenza

Introduzione ai pattern di concorrenza

Go offre goroutine e channel come primitive di concorrenza, ma la vera potenza sta nei pattern che possiamo costruire con questi strumenti. In questa guida esploreremo i pattern piu comuni e utili per scrivere codice concorrente robusto e performante.

Pipeline

Una pipeline e una serie di stadi collegati da channel, dove ogni stadio riceve dati, li elabora e li passa allo stadio successivo:

package main

import "fmt"

// Stadio 1: genera numeri
func genera(numeri ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range numeri {
            out <- n
        }
        close(out)
    }()
    return out
}

// Stadio 2: eleva al quadrato
func quadrato(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

// Stadio 3: raddoppia
func raddoppia(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * 2
        }
        close(out)
    }()
    return out
}

func main() {
    // Componiamo la pipeline
    risultati := raddoppia(quadrato(genera(1, 2, 3, 4, 5)))

    for r := range risultati {
        fmt.Println(r) // 2, 8, 18, 32, 50
    }
}

Fan-Out / Fan-In

Il pattern fan-out distribuisce il lavoro su piu goroutine. Il pattern fan-in raccoglie i risultati in un singolo channel:

package main

import (
    "fmt"
    "sync"
)

func fanIn(canali ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)

    // Avvia una goroutine per ogni canale di input
    for _, ch := range canali {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                merged <- v
            }
        }(ch)
    }

    // Chiudi il canale quando tutti hanno finito
    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

func elabora(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n // Operazione costosa
        }
        close(out)
    }()
    return out
}

func main() {
    input := genera(1, 2, 3, 4, 5, 6, 7, 8)

    // Fan-out: distribuiamo su 3 worker
    w1 := elabora(input)
    w2 := elabora(input)
    w3 := elabora(input)

    // Fan-in: raccogliamo i risultati
    for r := range fanIn(w1, w2, w3) {
        fmt.Println(r)
    }
}

func genera(numeri ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range numeri {
            out <- n
        }
        close(out)
    }()
    return out
}

Worker Pool

Un worker pool e un pattern dove un numero fisso di goroutine elabora i lavori da una coda condivisa:

package main

import (
    "fmt"
    "sync"
    "time"
)

type Lavoro struct {
    ID    int
    Dati  string
}

type Risultato struct {
    LavoroID int
    Output   string
}

func worker(id int, lavori <-chan Lavoro, risultati chan<- Risultato, wg *sync.WaitGroup) {
    defer wg.Done()
    for l := range lavori {
        // Simula elaborazione
        time.Sleep(100 * time.Millisecond)
        risultati <- Risultato{
            LavoroID: l.ID,
            Output:   fmt.Sprintf("Worker %d ha elaborato: %s", id, l.Dati),
        }
    }
}

func main() {
    const numWorker = 3
    const numLavori = 10

    lavori := make(chan Lavoro, numLavori)
    risultati := make(chan Risultato, numLavori)

    // Avvia i worker
    var wg sync.WaitGroup
    for w := 1; w <= numWorker; w++ {
        wg.Add(1)
        go worker(w, lavori, risultati, &wg)
    }

    // Invia i lavori
    for i := 1; i <= numLavori; i++ {
        lavori <- Lavoro{ID: i, Dati: fmt.Sprintf("dato-%d", i)}
    }
    close(lavori)

    // Chiudi i risultati quando i worker finiscono
    go func() {
        wg.Wait()
        close(risultati)
    }()

    // Raccogli i risultati
    for r := range risultati {
        fmt.Printf("Lavoro #%d: %s\n", r.LavoroID, r.Output)
    }
}

Done Channel

Il pattern done channel permette di segnalare la cancellazione a goroutine in esecuzione:

package main

import (
    "fmt"
    "time"
)

func generaInfinito(done <-chan struct{}) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        i := 0
        for {
            select {
            case <-done:
                fmt.Println("Generatore fermato")
                return
            case out <- i:
                i++
            }
        }
    }()
    return out
}

func main() {
    done := make(chan struct{})
    numeri := generaInfinito(done)

    // Leggi solo 5 numeri
    for i := 0; i < 5; i++ {
        fmt.Println(<-numeri)
    }

    // Segnala la cancellazione
    close(done)
    time.Sleep(100 * time.Millisecond)
}

Context per la cancellazione

Il pacchetto context e il modo idiomatico in Go per gestire la cancellazione e i timeout:

package main

import (
    "context"
    "fmt"
    "time"
)

func operazioneLenta(ctx context.Context, id int) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        select {
        case <-time.After(2 * time.Second):
            out <- fmt.Sprintf("Operazione %d completata", id)
        case <-ctx.Done():
            fmt.Printf("Operazione %d cancellata: %v\n", id, ctx.Err())
        }
    }()
    return out
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()

    risultato := operazioneLenta(ctx, 1)

    select {
    case r := <-risultato:
        fmt.Println(r)
    case <-ctx.Done():
        fmt.Println("Timeout raggiunto:", ctx.Err())
    }
}

Rate Limiting

Il rate limiting controlla la frequenza delle operazioni usando time.Ticker:

package main

import (
    "fmt"
    "time"
)

func main() {
    richieste := make(chan int, 10)
    for i := 1; i <= 10; i++ {
        richieste <- i
    }
    close(richieste)

    // Rate limiter: una richiesta ogni 200ms
    limiter := time.NewTicker(200 * time.Millisecond)
    defer limiter.Stop()

    for req := range richieste {
        <-limiter.C // Attendi il tick
        fmt.Printf("Richiesta %d elaborata alle %s\n",
            req, time.Now().Format("15:04:05.000"))
    }
}

Pattern Semaforo

Un semaforo limita il numero di goroutine concorrenti usando un channel con buffer:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    const maxConcorrenti = 3
    semaforo := make(chan struct{}, maxConcorrenti)
    var wg sync.WaitGroup

    for i := 1; i <= 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()

            semaforo <- struct{}{} // Acquisisci il semaforo
            defer func() { <-semaforo }() // Rilascia il semaforo

            fmt.Printf("Goroutine %d in esecuzione\n", id)
            time.Sleep(500 * time.Millisecond)
            fmt.Printf("Goroutine %d completata\n", id)
        }(i)
    }

    wg.Wait()
    fmt.Println("Tutte le goroutine completate")
}

Conclusione

I pattern di concorrenza in Go permettono di costruire sistemi complessi e performanti a partire da primitive semplici. La pipeline e il fan-in/fan-out sono ideali per l’elaborazione di dati, il worker pool e perfetto per gestire carichi di lavoro variabili, e il semaforo e il rate limiter controllano l’uso delle risorse. Combinando questi pattern con il pacchetto context, possiamo gestire in modo elegante la cancellazione e i timeout in applicazioni concorrenti.