Sep 21, 2019

Parallelized Batch Processing in Go

Hey there 👋 I'm building CodeTrail, which helps engineering teams document and share knowledge close to the codebase with no friction. If you're still using Notion, Confluence or Google Docs to document your engineering work, give it a try and let me know what you think!

Imagine you're building a worker service that receives a huge list of file metadata to be processed. Each object represents a file you want to transfer from one storage location to another. Since the items don't depend on each other, you decide it makes sense to process subsets of the original list in parallel, decreasing the overall execution time and opening the door for extras like de-duplication: if a file appears more than once, you can detect that it was already handled and skip it.

[Figure: processing items one by one]

All of these ideas are used in large-scale processing systems that benefit from parallelized workloads. An approach like this still requires careful fine-tuning at times: third-party services may react badly to being hit with many concurrent requests per worker, whether through rate-limiting measures kicking in or through plain performance degradation on services that can't scale to meet your expectations.

All things considered, though, tasks like these are a perfect fit for digging deeper into the synchronization, parallelism and concurrency utilities Go provides, mainly the sync package. Let's start by breaking the initial data set into equally-sized batches.

[Figure: the data set split into equally-sized batches]

Since we're going to process a specific data structure, we should define it first! Our FileItem will contain the following fields:

type FileItem struct {
	URL      string `json:"url"`
	FileName string `json:"fileName"`
	Size     int64  `json:"size"`
	MimeType string `json:"mimeType"`
}

Now that we've defined what each item looks like, we need to calculate how many batches we'll process in total.

// This is just an example source of our items, basically
// any slice containing FileItems should suffice here
items := make([]FileItem, 100)

// This is the max. number of items each batch will contain
const maxBatchSize int = 25

// `skip` will be our cursor to remember how
// many items we already processed before
skip := 0

// `batchAmount` is the important metric, it will contain
// the exact number of batches needed to process all items
// depending on the maxBatchSize, where the last batch
// may contain fewer than 25 (maxBatchSize) elements.
// Note that we convert to float64 before dividing, since
// otherwise the integer division would already truncate
// the result and Ceil would have no effect
filesAmount := len(items)
batchAmount := int(math.Ceil(float64(filesAmount) / float64(maxBatchSize)))
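
As a side note: since both values are integers anyway, the same ceiling can be computed without math.Ceil at all, using a common pure-integer idiom:

// Pure-integer alternative: adding maxBatchSize-1 before
// dividing rounds any non-zero remainder up to a full batch
batchAmount := (filesAmount + maxBatchSize - 1) / maxBatchSize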

Now that we know the number of batches to be processed, we can iterate over them and create a subset containing exactly the items we'll work on in each batch.

for i := 0; i < batchAmount; i++ {
  // Bounds are used to access
  // the complete items slice later on
  lowerBound := skip
  upperBound := skip + maxBatchSize

  // In case we're exceeding the total number
  // of files/items, we'll cut down the
  // upper bound to the last element, so we're
  // not overextending the access range
  if upperBound > filesAmount {
    upperBound = filesAmount
  }

  // It's just creating a slice from
  // an existing slice now :)
  batchItems := items[lowerBound:upperBound]

  // Increase `skip` value for next iteration
  skip += maxBatchSize
}

Getting these values exactly right is vital: an off-by-one in the loop condition or the bounds would either skip items, process some of them twice, or panic with an out-of-range slice access. Double-check that the lower and upper bounds, the skip value and the resulting batch slices contain what you expect, for example with the quick check below.
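
If you want to verify this quickly, a temporary log line inside the loop body (assuming the log package is imported) makes the ranges visible:

// Temporary sanity check inside the batch loop: for 100 items
// and a batch size of 25, this should print the ranges
// [0:25], [25:50], [50:75] and [75:100]
log.Printf("batch %d: items[%d:%d], %d elements", i, lowerBound, upperBound, len(batchItems))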

If we put everything we defined above together now, we should get something similar to the following snippet.

package main

import (
	"math"
)

type FileItem struct {
	URL      string `json:"url"`
	FileName string `json:"fileName"`
	Size     int64  `json:"size"`
	MimeType string `json:"mimeType"`
}

func main() {
	items := make([]FileItem, 100)

	const maxBatchSize int = 25
	skip := 0
	filesAmount := len(items)
	batchAmount := int(math.Ceil(float64(filesAmount) / float64(maxBatchSize)))

	for i := 0; i < batchAmount; i++ {
		lowerBound := skip
		upperBound := skip + maxBatchSize

		if upperBound > filesAmount {
			upperBound = filesAmount
		}

		batchItems := items[lowerBound:upperBound]

		// We don't process the batch items yet, so the blank
		// identifier keeps the compiler happy for now
		_ = batchItems

		skip += maxBatchSize
	}
}

This is basically everything needed to set up a really simple batching system. Chances are that libraries for this exact use case already exist, but without generics, building such utilities gets a bit awkward: we'd have to assert types in places where the purpose-built system we just constructed doesn't need to, as the sketch below illustrates.
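
To show what that friction looks like, a hypothetical reusable chunking helper, written without generics, would have to operate on []interface{} instead of []FileItem, something like this:

// Hypothetical generic-free helper: it returns batches of
// interface{} values, so callers must box their items in
// and type-assert each element back out when processing
func chunk(items []interface{}, size int) [][]interface{} {
	var batches [][]interface{}
	for len(items) > 0 {
		end := size
		if end > len(items) {
			end = len(items)
		}
		batches = append(batches, items[:end])
		items = items[end:]
	}
	return batches
}

Every caller would first copy their typed slice into a []interface{} and later assert each element back to FileItem, exactly the overhead our purpose-built loop avoids.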

Now we can get to the fun part! From here on, we'll only edit the inner contents of our batch loop. The first thing we'll add is another loop to iterate over the actual items of each batch.

for idx := range batchItems {
  log.Println(idx)
}

That was pretty simple, right? What we wanted, though, is to parallelize this process, so let's tweak the logic. We need to create a WaitGroup before iterating over our batch items, since we want to process every item inside its own goroutine later on, concurrently and potentially in parallel. For a condensed rundown of WaitGroups in Go, I'll shamelessly plug my recent post about concurrency patterns in Go.

// Everything we need to do is define our
// WaitGroup and increment the counter to
// the number of batch items we're going
// to process
var itemProcessingGroup sync.WaitGroup
itemProcessingGroup.Add(len(batchItems))

for idx := range batchItems {
  log.Println(idx)
}

// After launching our goroutines
// we'll just wait until everything's
// done again and we can continue
itemProcessingGroup.Wait()

The last step is to define the goroutine that will run for each item. Constructing and running goroutines inside for loops is a classic source of bugs at first, so let's see how to go about it properly.

for idx := range batchItems {
  // Pass current node pointer into goroutine func
  // to prevent capturing the loop variable
  go func(item *FileItem, idx int) {
    // Process item...
  }(&batchItems[idx], idx)
}

It can be tempting to let the goroutine's closure capture the loop variable directly, but that would result in the goroutines reading whatever value the variable holds once they actually run. How come? The loop shares a single variable across all iterations, and since goroutines are scheduled asynchronously while the loop keeps advancing, most or all of them will observe a later value, often the final one, instead of the value from "their" iteration. This forces us to pass all scoped data (everything we need from the loop's current iteration) into the goroutine as actual parameters, so each goroutine gets its own copy. In our case, we'll supply a pointer to the current batch item and the current index.

Taken from Go's FAQ: This behavior of the language, not defining a new variable for each iteration, may have been a mistake in retrospect. It may be addressed in a later version but, for compatibility, cannot change in Go version 1.
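
To make the difference concrete, here's a minimal, runnable sketch contrasting both variants; given the shared loop variable described above, the first variant may print duplicated (often only the last) indices:

package main

import (
	"log"
	"sync"
)

func main() {
	items := []string{"a", "b", "c"}

	var wg sync.WaitGroup
	wg.Add(2 * len(items))

	for idx := range items {
		// Captures the shared loop variable: by the time this
		// goroutine runs, idx may already have advanced
		go func() {
			defer wg.Done()
			log.Println("captured:", idx)
		}()

		// Passes the current value as a parameter: each
		// goroutine receives its own copy of idx
		go func(idx int) {
			defer wg.Done()
			log.Println("passed:", idx)
		}(idx)
	}

	wg.Wait()
}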

If we were to run our code now, we'd wait a long time for everything to complete; infinitely long, to be precise. The reason being that we never told our WaitGroup that an item finished processing, so Wait() blocks forever. How do we fix this? Let's check our goroutine code once again:

go func(item *FileItem, idx int) {
  // No matter the outcome, make sure to
  // notify the WaitGroup of the completion
  defer itemProcessingGroup.Done()

  // Process item...
}(&batchItems[idx], idx)

So that's that. From setting up our batching logic to parallelizing the execution flow, we've done all the preparations. This is what our current code could look like:

package main

import (
	"math"
	"sync"
)

type FileItem struct {
	URL      string `json:"url"`
	FileName string `json:"fileName"`
	Size     int64  `json:"size"`
	MimeType string `json:"mimeType"`
}

func main() {
	items := make([]FileItem, 100)

	const maxBatchSize int = 25
	skip := 0
	filesAmount := len(items)
	batchAmount := int(math.Ceil(float64(filesAmount) / float64(maxBatchSize)))

	for i := 0; i < batchAmount; i++ {
		lowerBound := skip
		upperBound := skip + maxBatchSize

		if upperBound > filesAmount {
			upperBound = filesAmount
		}

		batchItems := items[lowerBound:upperBound]

		skip += maxBatchSize

		var itemProcessingGroup sync.WaitGroup
		itemProcessingGroup.Add(len(batchItems))

		for idx := range batchItems {
			go func(currentItem *FileItem, currentIdx int) {
				defer itemProcessingGroup.Done()

				// Process item...
			}(&batchItems[idx], idx)
		}

		itemProcessingGroup.Wait()
	}
}

Did I say we completed all preparations? All except one: good old error handling. This is where it gets a bit more... complicated? At least it might seem that way, but bear with me. Using basic Go primitives, namely channels, we'll set up the logic needed to collect all errors occurring in our goroutines and, after each batch, check whether we reached an error threshold.

In our case, we'll exit the complete batching process once at least one error occurred, but we could just as well differentiate between error types, retry the batch, or tweak a few lines to exit immediately after the first error. All of this is built on two channels: every error we want to handle is sent through the processingErrorChan, and once a batch is done (immediately after itemProcessingGroup.Wait() stops blocking), we send a message to the processingDoneChan to tear down the error-handling setup for this batch.

// We'll add this snippet after increasing the
// skip value, right before creating the WaitGroup
// and iterating over our batch items

// The following slice will carry all errors
// that occur during batch processing
processingErrors := make([]error, 0)

// Create the two unbuffered channels we listed
// in the description above
processingErrorChan := make(chan error)
processingDoneChan := make(chan int)

// Run a goroutine to receive messages from our
// two channels, either errors or a done message
go func() {
  for {
    select {
    // When a new error comes in, append it to
    // the existing error slice
    case err := <-processingErrorChan:
      processingErrors = append(processingErrors, err)

    // When a done message is sent, close down all
    // channels and return, effectively closing the
    // goroutine. Although it's generally not preferable
    // to close channels in the receiving end since we
    // can't control if more messages are sent to a
    // channel, we can be sure that no more errors
    // will be sent through the channel since our
    // WaitGroup stopped blocking prior to sending
    // the done message, which should always signal
    // that no further messages are to be expected.
    case <-processingDoneChan:
      close(processingErrorChan)
      close(processingDoneChan)
      return
    }
  }
}()
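
A quick design note: since every item sends at most one error, we could alternatively use a buffered channel sized to the batch and drop the collector goroutine entirely. A minimal sketch of that variant, reusing batchItems and itemProcessingGroup from above:

// Alternative setup: buffered error channel, no collector.
// Each worker sends at most one error, so sends never block
processingErrorChan := make(chan error, len(batchItems))

// ... launch the item goroutines exactly as before ...

itemProcessingGroup.Wait()

// No goroutine can send anymore, so closing is safe here
close(processingErrorChan)

// Drain every error collected during this batch
processingErrors := make([]error, 0)
for err := range processingErrorChan {
  processingErrors = append(processingErrors, err)
}

Both approaches work; the unbuffered version keeps memory constant regardless of batch size, while the buffered one trades a little memory for fewer moving parts.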

Every time we handle an error inside our goroutine, we should make sure to send it to our newly-added error channel and return from the goroutine, so the deferred Done() call still notifies our WaitGroup (someFailingOperation is a stand-in for any fallible processing step).

go func(item *FileItem, idx int) {
  defer itemProcessingGroup.Done()

  // Process item...

  // If an error occurs, send to channel
  err := someFailingOperation()
  if err != nil {
  	processingErrorChan <- err
  	return
  }
}(&batchItems[idx], idx)

The only thing we're missing now (and this time it really is the last piece, unless you want to extend the logic further): once the WaitGroup stops blocking, we'll send the done message and check whether the goroutines reported any errors we should handle.

// Wait until we processed all items
itemProcessingGroup.Wait()

// Send a simple done message through
// our channel to close down all channels
// used for error communication in this batch
processingDoneChan <- 0

// Check if errors occurred while processing items
if len(processingErrors) > 0 {
  // Handle errors accordingly
}
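
What "handle errors accordingly" means depends on your use case; as one hypothetical strategy (again assuming the log package is imported), we could log every collected error and abort the whole run:

// One possible strategy: log each collected error, then
// stop the entire batching run after the current batch
if len(processingErrors) > 0 {
  for _, err := range processingErrors {
    log.Println("processing error:", err)
  }
  log.Fatalf("aborting: %d item(s) failed in this batch", len(processingErrors))
}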

And that's it! We successfully created fully customizable batching functionality for our purpose. Here's the final code once more:

package main

import (
	"errors"
	"math"
	"sync"
)

type FileItem struct {
	URL      string `json:"url"`
	FileName string `json:"fileName"`
	Size     int64  `json:"size"`
	MimeType string `json:"mimeType"`
}

func main() {
	items := make([]FileItem, 100)

	const maxBatchSize int = 25
	skip := 0
	filesAmount := len(items)
	batchAmount := int(math.Ceil(float64(filesAmount) / float64(maxBatchSize)))

	for i := 0; i < batchAmount; i++ {
		lowerBound := skip
		upperBound := skip + maxBatchSize

		if upperBound > filesAmount {
			upperBound = filesAmount
		}

		batchItems := items[lowerBound:upperBound]

		skip += maxBatchSize

		processingErrorChan := make(chan error)
		processingDoneChan := make(chan int)
		processingErrors := make([]error, 0)

		go func() {
			for {
				select {
				case err := <-processingErrorChan:
					processingErrors = append(processingErrors, err)
				case <-processingDoneChan:
					close(processingErrorChan)
					close(processingDoneChan)
					return
				}
			}
		}()

		var itemProcessingGroup sync.WaitGroup
		itemProcessingGroup.Add(len(batchItems))

		for idx := range batchItems {
			go func(item *FileItem, idx int) {
				defer itemProcessingGroup.Done()

				// Process item... as a stand-in, we call our
				// failing operation and report any error
				if err := someFailingOperation(); err != nil {
					processingErrorChan <- err
					return
				}
			}(&batchItems[idx], idx)
		}

		itemProcessingGroup.Wait()

		processingDoneChan <- 0

		if len(processingErrors) > 0 {
			// ...
		}
	}
}

func someFailingOperation() error {
	return errors.New("something went wrong")
}

If you enjoyed reading this post, feel free to check out my other related posts below and follow me on Twitter to stay on top of future posts!

And as always, if you have questions, suggestions or other feedback, don't hesitate to send a mail 👍

Thanks for reading this post 🙌

Bruno Scheufler
