Sep 01, 2019

Handling Concurrent Go Workflows

Wrapping your head around writing non-blocking, concurrent code can be hard at times, especially when you can't estimate the cost of building your application one way or the other. Using Go's broad spectrum of possibilities to execute synchronously-written and blocking code concurrently with little additional effort shows one of the reasons for Go's massive adoption in highly-available production environments and distributed systems.

In most cases, you wouldn't ever have to think about explicitly writing concurrent code because the tooling you build your application on will make sure to handle this for you, take net/http as an example: For every request, a goroutine is created under the hood to execute handling logic concurrently in a completely transparent way. That's one of the biggest benefits for the sake of simplicity and makes it playfully easy to read and understand Go code.

Sometimes, though, you'll find a task that requires to set up functions running in parallel, messaging queue to be handled, states to be synchronized and so on. To master these cases, I've collected a set of approaches and examples, let's get started with terminology and definitions first so we're all on the same page.

Goroutines

The essential building block of concurrently-executed code in Go is called a Goroutine. A goroutine is a simple function call prefixed by the statement go and represents a "lightweight thread managed by the Go runtime", which shares its memory with the main application, so you're still able to access variables in scope.

Here's an example:

package main

import "log"

func executeBlockingLogicSynchronously(done chan<- bool) {
  log.Println("Hey there")

  // We'll confirm that the logic is done, see "Channels"
  // for a description
  done <- true
}

func main() {
  // Create a channel to wait until logic is done
  done := make(chan bool)

  // Run the following code concurrently
  go func() {
    // This code will run immediately once this is invoked
    executeBlockingLogicSynchronously(done)
  }()

  // Even though the `executeBlockingLogicSynchronously`
  // function will block inside the goroutine,
  // the following code will be executed immediately
  // after starting the goroutine without blocking.

  // To be sure our logic has completed, we'll
  // wait until a value is sent in the done channel
  <-done
}

It's helpful to know that goroutines are extremely cheap on resources, so it's not a problem at all to have hundreds or thousands of them running at a time if it makes sense for your application.

Now that you know how to run application logic concurrently, you might ask yourself how to wait until a batch of running operations has finished. Since goroutines are non-blocking by nature, how do we wait? The most common primitive for this case is a WaitGroup. Used for situations like having to wait for concurrent workers to finish or multiple requests to be sent and completed, quite similar to JavaScript's Promise.all, WaitGroups expose a method to block until all tasks are done, here's an example:

package main

import (
  "fmt"
  "sync"
  "time"
)

/**
A simple worker function that will sleep for
a second and then decrease the WaitGroup
counter by one (using .Done())
*/
func worker(id int, wg *sync.WaitGroup) {
  fmt.Printf("Worker %d starting\n", id)

  time.Sleep(time.Second)
  fmt.Printf("Worker %d done\n", id)

  // Mark worker as done
  wg.Done()
}

func main() {

  wg := sync.WaitGroup{}

  for i := 1; i <= 5; i++ {
    // Increment counter of WaitGroup by one
    wg.Add(1)

    // Run worker in goroutine
    go worker(i, &wg)
  }

  // Block until count of running workers
  // equals 0 (when all workers executed .Done())
  wg.Wait()
}

Mutexes And Sync Helpers

Please note that the following descriptions and examples are merely for grasping the Go internals, for real-world use cases you should always resort to communicating using channels and synchronize through sequential channel reads or mutexes.

To access and mutate shared memory (variables, slices, etc.) from inside Goroutines, the sync package includes helpers to enforce some guarantees on how memory will be handled in parallelized and concurrent control flows.

For simple operations, the atomic package included in sync offers a variety of helper functions to provide secure memory access, for example using atomic.AddUint64 to increment a counter as seen in example below (extracted from the amazing Go by Example):

package main

import "fmt"
import "time"
import "sync/atomic"

func main() {
  var counter uint64

  // Start 50 Goroutines
  for i := 0; i < 50; i++ {
    go func() {
      // Run forever
      for {
        // Increase counter by 1 and sleep for a second
        atomic.AddUint64(&counter, 1)
        time.Sleep(time.Millisecond)
      }
    }()
  }

  // Wait for a second, then load final counter state
  // securely and print to output
  time.Sleep(time.Second)
  counterFinal := atomic.LoadUint64(&counter)
  fmt.Println("counter:", counterFinal)
}

Mutexes (mutual exclusion locks), on the other hand, are one of the core structures to block code from executing as long as the lock is unavailable. Once you call unlock again, the code flow will continue. This is helpful to control which function may access and mutate a specific data structure. A great usage example can be found here.

Channels

The previously explained techniques of sharing memory and executing code concurrently are one way of building applications in Go. The preferred way of thinking, as the following quote will show, is slightly different:

Share memory by communicating; don't communicate by sharing memory.

This is a core principle every program should follow in Go. But how do we communicate other than by accessing variables in a synchronized way using Mutexes and atomic operations? Channels are the answer! Data can be sent into and received from a channel, and you can use channels to block your application from executing a specific instruction before a certain condition is met.

package main

import "fmt"

func main() {
    // Create a new channel with the type it should
    // accept and return (in our case `string`)
    messages := make(chan string)

    // Send a value into our channel from inside a goroutine
    go func() { messages <- "ping" }()

    // Receive the first message to be sent, block until the
    // message is pushed to the channe
    msg := <-messages
    fmt.Println(msg)
}

You can think of channels as pipes used to communicate throughout your application. One goroutine could send errors that occurred during execution into an error channel which will safely persist the error in a data store to be processed later on. In cases where channels can be used, they should be used over manual synchronization of concurrent flows like Mutexes or atomic operations.


So know that we know how to run concurrent logic and how to communicate with other parts of the application, everything should be set for the next big project. To learn more about concurrency in Go, check out the section on it in Effective Go.

If you have any questions, suggestions or, put simply, any feedback to this post, please let me know on Twitter or by sending a mail 👍