Go, for Distributed Systems
Russ Cox
I gave variants of this talk three times in 2013, once at SOSP's Programming Languages and Operating Systems (PLOS) workshop, once at MIT Lincoln Lab's annual Software Engineering Symposium, and once at Twitter's Cambridge, Massachusetts office.
The talk assumes an audience familiar with the basic problems of building distributed systems. It presents Go's approach to solving some of those problems.
Go is an open source programming language that makes it easy to build simple, reliable, and efficient software.
Design began in late 2007.
Became open source in November 2009.
Developed entirely in the open; very active community.
Language stable as of Go 1, early 2012.
Started as an answer to software problems at Google:
A simple but powerful and fun language.
For more background on design:
Engineering
Interfaces
Concurrency
package main

import "fmt"

func main() {
	fmt.Printf("hello, world\n")
}
import "fmt" guaranteed to read exactly one file.
$ go get github.com/golang/glog

package main

import (
	"flag"
	"github.com/golang/glog"
)

func main() {
	flag.Set("logtostderr", "true")
	glog.Infof("hello, world")
}
Still guaranteed to read exactly one file.
Import name space is decentralized.
$ gofmt -r 'glog.Infof -> glog.Errorf' hello1.go
package main

import (
	"flag"
	"github.com/golang/glog"
)

func main() {
	flag.Set("logtostderr", "true")
	glog.Errorf("hello, world")
}
$
In C and C++, too much programming and API design is about memory management.
Go has garbage collection, only.
Fundamental for interfaces: memory management details do not bifurcate otherwise-similar APIs.
Fundamental for concurrency: too hard to track ownership otherwise.
Of course, adds cost, latency, complexity in run time system.
Experience with Java: Uncontrollable cost, too much tuning.
Go lets you limit allocation by controlling memory layout.
Examples:
type Ring struct {
	R, W int
	Data [512]byte
}

type Point struct {
	X, Y int
}

type Rectangle struct {
	Min, Max Point
}
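As a rough illustration of what controlling memory layout means, the sketch below reuses the Point and Rectangle types from the slide. The helper rectangleIsFlat is a hypothetical name introduced here; it checks that a Rectangle stores both Points inline, so creating one Rectangle does not require a separate allocation per Point. The same program also takes an interior pointer to a field.

```go
package main

import (
	"fmt"
	"unsafe"
)

type Point struct{ X, Y int }

type Rectangle struct{ Min, Max Point }

// rectangleIsFlat (hypothetical helper) reports whether a Rectangle holds
// its two Points inline, occupying one contiguous block of memory.
func rectangleIsFlat() bool {
	var r Rectangle
	return unsafe.Sizeof(r) == 2*unsafe.Sizeof(r.Min)
}

func main() {
	r := Rectangle{Min: Point{1, 2}, Max: Point{3, 4}}
	p := &r.Max // interior pointer into r; no separate Point is allocated
	p.X = 30    // modifies r in place
	fmt.Println(rectangleIsFlat(), r.Max.X)
}
```

In a language where every object is a separate heap reference, the same Rectangle would typically be three objects instead of one.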
Garbage collector implementation remains an active area of work and research.
Design decision: Interior pointers are allowed, as are foreign pointers.
Current design: parallel mark-and-sweep.
With care to use memory wisely, works well in production.
An interface defines a set of methods.
package io

type Writer interface {
	Write(data []byte) (n int, err error)
}
A type implements the interface by implementing the methods.
package bytes

type Buffer struct {
	...
}

func (b *Buffer) Write(data []byte) (n int, err error) {
	...
}
An implementation of an interface can be assigned to a variable of that interface type.
package fmt

func Fprintf(w io.Writer, format string, args ...interface{})

package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
)

func main() {
	b := new(bytes.Buffer)
	var w io.Writer
	w = b
	fmt.Fprintf(w, "hello, %s\n", "world")
	os.Stdout.Write(b.Bytes())
}
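The same rule applies to types defined outside the standard library. The following is a minimal sketch, assuming a hypothetical countingWriter type that is not part of the talk: it satisfies io.Writer only by having a Write method with the right signature, with no declaration that it implements the interface.

```go
package main

import (
	"fmt"
	"io"
)

// countingWriter is a hypothetical type that counts the bytes written
// to it and discards the data.
type countingWriter struct{ n int }

// Write gives *countingWriter the method set of io.Writer.
func (c *countingWriter) Write(data []byte) (int, error) {
	c.n += len(data)
	return len(data), nil
}

// countBytes formats a greeting into a countingWriter through the
// io.Writer interface and returns how many bytes were written.
func countBytes(name string) int {
	c := new(countingWriter)
	var w io.Writer = c
	fmt.Fprintf(w, "hello, %s\n", name)
	return c.n
}

func main() {
	fmt.Println(countBytes("world"))
}
```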
Reader is the obvious counterpart.
package io

type Reader interface {
	Read(data []byte) (n int, err error)
}

func Copy(dst Writer, src Reader) (n int64, err error)

package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
)

func main() {
	b := new(bytes.Buffer)
	fmt.Fprintf(b, "hello, %s\n", "world")
	io.Copy(os.Stdout, b)
}
Reader and Writer turn out to be very useful.
Adapters
package io

func MultiWriter(writers ...Writer) Writer

MultiWriter creates a writer that duplicates its writes to all the provided writers, similar to the Unix tee(1) command.
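A short sketch of the adapter in use. The helper name tee is hypothetical; it writes a message once through io.MultiWriter and returns what each of two in-memory buffers received.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// tee (hypothetical helper) writes msg once and returns the contents
// of both destination buffers.
func tee(msg string) (string, string) {
	var a, b bytes.Buffer
	w := io.MultiWriter(&a, &b)
	io.WriteString(w, msg)
	return a.String(), b.String()
}

func main() {
	x, y := tee("hello, world\n")
	fmt.Print(x, y)
}
```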
Chaining
package gzip // compress/gzip

func NewWriter(w io.Writer) *Writer
Also: buffered writers, encrypted writers, limited writers, HTTP responses.
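Chaining might look like the sketch below, which stacks a buffered writer on a gzip writer on an in-memory buffer and then decompresses the result to check it. The roundTrip helper is a hypothetical name for this example; io.ReadAll assumes Go 1.16 or later.

```go
package main

import (
	"bufio"
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// roundTrip (hypothetical helper) writes s through bufio -> gzip -> buffer,
// then reads it back through a gzip reader.
func roundTrip(s string) (string, error) {
	var compressed bytes.Buffer
	zw := gzip.NewWriter(&compressed) // gzip writer wraps the buffer
	bw := bufio.NewWriter(zw)         // buffered writer wraps the gzip writer
	if _, err := io.WriteString(bw, s); err != nil {
		return "", err
	}
	if err := bw.Flush(); err != nil { // push buffered bytes into gzip
		return "", err
	}
	if err := zw.Close(); err != nil { // finish the gzip stream
		return "", err
	}
	zr, err := gzip.NewReader(&compressed)
	if err != nil {
		return "", err
	}
	out, err := io.ReadAll(zr)
	return string(out), err
}

func main() {
	s, err := roundTrip("hello, world\n")
	fmt.Printf("%q %v\n", s, err)
}
```

Each layer knows only that the layer beneath it is an io.Writer, so the order and number of layers can change without touching any of them.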
Networking:
package net

type Conn interface {
	Read(b []byte) (n int, err error)
	Write(b []byte) (n int, err error)
	Close() error

	LocalAddr() Addr
	RemoteAddr() Addr

	SetDeadline(t time.Time) error
	SetReadDeadline(t time.Time) error
	SetWriteDeadline(t time.Time) error
}

func Dial(network, address string) (Conn, error)
Networking example:
package main

import (
	"bufio"
	"io"
	"log"
	"net"
	"os"
	"os/exec"
)

func main() {
	if len(os.Args) > 1 && os.Args[1] == "serve" {
		serve()
	}
	finger()
}

func finger() {
	c, err := net.Dial("tcp", "localhost:finger")
	if err != nil {
		log.Fatal(err)
	}
	io.WriteString(c, "rsc\n")
	io.Copy(os.Stdout, c)
}

func serve() {
	l, err := net.Listen("tcp", "localhost:finger")
	if err != nil {
		log.Fatal(err)
	}
	for {
		c, err := l.Accept()
		if err != nil {
			log.Fatal(err)
		}
		go serveConn(c)
	}
}

func serveConn(c net.Conn) {
	defer c.Close()
	b := bufio.NewReader(c)
	l, err := b.ReadString('\n')
	if err != nil {
		return
	}
	cmd := exec.Command("finger", l[:len(l)-1])
	cmd.Stdout = c
	cmd.Stderr = c
	cmd.Run()
}
Networking client as adapter function:
package smtp

func NewClient(conn net.Conn, host string) (*Client, error)
Other implementations of net.Conn: testing, SSL, ...
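For the testing case, the standard library's net.Pipe returns two connected in-memory net.Conn values. The sketch below assumes a hypothetical one-line protocol handler, greet, and exercises it without opening a socket; greet and ask are names invented for this example.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"net"
)

// greet is a hypothetical handler: it reads one line from the
// connection and replies with a greeting.
func greet(c net.Conn) {
	defer c.Close()
	line, err := bufio.NewReader(c).ReadString('\n')
	if err != nil {
		return
	}
	fmt.Fprintf(c, "hello, %s", line)
}

// ask runs greet over an in-memory connection pair and returns the reply.
func ask(name string) (string, error) {
	client, server := net.Pipe()
	go greet(server)
	if _, err := io.WriteString(client, name+"\n"); err != nil {
		return "", err
	}
	reply, err := io.ReadAll(client) // ends when greet closes its side
	return string(reply), err
}

func main() {
	reply, err := ask("rsc")
	fmt.Printf("%q %v\n", reply, err)
}
```

Because greet accepts any net.Conn, the same function could serve a TCP connection returned by Accept.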
Key advantages:
The source of all generality in the Go language.
Concurrency is about dealing with lots of things at once.
Parallelism is about doing lots of things at once.
Concurrency is about structure, parallelism is about execution.
Concurrency provides a way to structure a solution to solve a problem that may be parallelizable (or not).
Concurrent: mouse, keyboard, display, and disk drivers in operating system.
Parallel: vector dot product, matrix multiply.
Concurrency can enable parallelism but is useful on its own: modern programs must deal with many things at once.
Go provides two important concepts:
A goroutine is a thread of control within the program, with its own local variables and stack. Cheap, easy to create.
A channel carries typed messages between goroutines.
package main

import "fmt"

func main() {
	c := make(chan string)
	go func() {
		c <- "Hello"
		c <- "World"
	}()
	fmt.Println(<-c, <-c)
}
Channels adopted from Hoare's Communicating Sequential Processes.
Go enables simple, safe concurrent programming.
It doesn't forbid bad programming.
Caveat: not purely memory safe; sharing is legal.
Passing a pointer over a channel is idiomatic.
Experience shows this is practical.
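A minimal sketch of the pointer-passing idiom, assuming a hypothetical Job type: the pointer is sent to a worker goroutine and sent back, and only the goroutine currently holding it touches the data. That discipline is a convention between the two sides; the language does not enforce it.

```go
package main

import "fmt"

// Job is a hypothetical unit of work shared by pointer.
type Job struct {
	Input  int
	Output int
}

// square hands a *Job to a worker goroutine and waits for it to come back.
func square(n int) int {
	in := make(chan *Job)
	out := make(chan *Job)
	go func() {
		for j := range in {
			j.Output = j.Input * j.Input // worker owns j here
			out <- j                     // hand the pointer back
		}
	}()
	in <- &Job{Input: n} // sender gives up the pointer
	j := <-out           // and gets it back once the work is done
	close(in)            // lets the worker goroutine exit
	return j.Output
}

func main() {
	fmt.Println(square(7))
}
```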
Sequential network address resolution, given a work list:
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func lookup() {
	for _, w := range worklist {
		w.addrs, w.err = LookupHost(w.host)
	}
}

func main() {
	rand.Seed(time.Now().UnixNano())

	t0 := time.Now()
	lookup()

	fmt.Printf("\n")
	for _, w := range worklist {
		if w.err != nil {
			fmt.Printf("%s: error: %v\n", w.host, w.err)
			continue
		}
		fmt.Printf("%s: %v\n", w.host, w.addrs)
	}
	fmt.Printf("total lookup time: %.3f seconds\n", time.Since(t0).Seconds())
}

var worklist = []*Work{
	{host: "fast.com"},
	{host: "slow.com"},
	{host: "fast.missing.com"},
	{host: "slow.missing.com"},
}

type Work struct {
	host  string
	addrs []string
	err   error
}

// LookupHost simulates a DNS lookup and reports how long it took.
func LookupHost(name string) (addrs []string, err error) {
	t0 := time.Now()
	defer func() {
		fmt.Printf("lookup %s: %.3f seconds\n", name, time.Since(t0).Seconds())
	}()
	h := hosts[name]
	if h == nil {
		h = failure
	}
	return h(name)
}

type resolver func(string) ([]string, error)

var hosts = map[string]resolver{
	"fast.com":         delay(10*time.Millisecond, fixedAddrs("10.0.0.1")),
	"slow.com":         delay(2*time.Second, fixedAddrs("10.0.0.4")),
	"fast.missing.com": delay(10*time.Millisecond, failure),
	"slow.missing.com": delay(2*time.Second, failure),
}

func fixedAddrs(addrs ...string) resolver {
	return func(string) ([]string, error) {
		return addrs, nil
	}
}

// delay wraps f so that it sleeps for a random time between d/2 and d first.
func delay(d time.Duration, f resolver) resolver {
	return func(name string) ([]string, error) {
		time.Sleep(d/2 + time.Duration(rand.Int63n(int64(d/2))))
		return f(name)
	}
}

func failure(name string) ([]string, error) {
	return nil, fmt.Errorf("unknown host %v", name)
}
Parallel network address resolution, given a work list:
func lookup() {
	done := make(chan bool, len(worklist))

	for _, w := range worklist {
		go func(w *Work) {
			w.addrs, w.err = LookupHost(w.host)
			done <- true
		}(w)
	}

	for i := 0; i < len(worklist); i++ {
		<-done
	}
}

// The rest of the program (main, worklist, Work, LookupHost, and the
// simulated resolvers) is identical to the sequential version above.
Aside: can abstract this pattern.
func lookup() {
	var group sync.WaitGroup

	for _, w := range worklist {
		group.Add(1)
		go func(w *Work) {
			w.addrs, w.err = LookupHost(w.host)
			group.Done()
		}(w)
	}

	group.Wait()
}

// The rest of the program is identical to the sequential version above,
// except that it also imports "sync".
Aside: can abstract this pattern further (hypothetical):
var par ParallelDo

for _, w := range worklist {
	w := w // copy iteration variable
	par.Do(func() {
		w.addrs, w.err = net.LookupHost(w.host)
	})
}

par.Wait()
But it's still useful to be able to construct alternate patterns.
Bounded parallelism:
func lookup() {
	const max = 2

	done := make(chan bool, len(worklist))
	limit := make(chan bool, max)

	for _, w := range worklist {
		go func(w *Work) {
			limit <- true
			w.addrs, w.err = LookupHost(w.host)
			<-limit
			done <- true
		}(w)
	}

	for i := 0; i < len(worklist); i++ {
		<-done
	}
}

// The rest of the program (main, worklist, Work, LookupHost, and the
// simulated resolvers) is identical to the sequential version above.
Bounded parallelism, 2:
func lookup() {
	const max = 2

	n := 0
	done := make(chan bool, max)

	for _, w := range worklist {
		if n++; n > max {
			<-done
			n--
		}
		go func(w *Work) {
			w.addrs, w.err = LookupHost(w.host)
			done <- true
		}(w)
	}

	for ; n > 0; n-- {
		<-done
	}
}

// The rest of the program (main, worklist, Work, LookupHost, and the
// simulated resolvers) is identical to the sequential version above.
Aside: can abstract (still hypothetical):
par.Limit(10)

for _, w := range work {
	w := w // copy iteration variable
	par.Do(func() {
		w.addrs, w.err = net.LookupHost(w.host)
	})
}

par.Wait()
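The slides leave ParallelDo hypothetical. One possible implementation, offered only as a sketch, combines a sync.WaitGroup with a buffered channel used as a counting semaphore, the same two tools used in the listings above. The method names follow the slides; the fields and the squares helper are assumptions made for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// ParallelDo is a sketch of the hypothetical helper from the slides;
// it is not part of the standard library.
type ParallelDo struct {
	wg    sync.WaitGroup
	limit chan struct{} // nil means no bound
}

// Limit bounds how many functions run at once. Call it before the first Do.
func (p *ParallelDo) Limit(n int) {
	p.limit = make(chan struct{}, n)
}

// Do runs f in a new goroutine, first waiting for a slot if a limit is set.
func (p *ParallelDo) Do(f func()) {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		if p.limit != nil {
			p.limit <- struct{}{}        // acquire a slot
			defer func() { <-p.limit }() // release it when f returns
		}
		f()
	}()
}

// Wait blocks until every function passed to Do has returned.
func (p *ParallelDo) Wait() { p.wg.Wait() }

// squares fills out[i] with i*i, running at most two functions at a time.
func squares(n int) []int {
	out := make([]int, n)
	var par ParallelDo
	par.Limit(2)
	for i := range out {
		i := i // copy iteration variable
		par.Do(func() { out[i] = i * i })
	}
	par.Wait()
	return out
}

func main() {
	fmt.Println(squares(5))
}
```

Leaving Limit uncalled gives the unbounded behavior of the earlier par.Do example.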
Example: replicated storage with read and write quorums.
const (
	F           = 2
	N           = 5 // >= 2F + 1
	ReadQuorum  = F + 1
	WriteQuorum = N - F
)
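These constants are chosen so that ReadQuorum + WriteQuorum = N + 1, which means every read quorum shares at least one server with every write quorum, and both quorums can still be reached with F servers down. The sketch below checks that arithmetic; minOverlap is a hypothetical helper, not part of the talk's code.

```go
package main

import "fmt"

const (
	F           = 2
	N           = 5 // >= 2F + 1
	ReadQuorum  = F + 1
	WriteQuorum = N - F
)

// minOverlap returns the smallest possible number of servers common to
// a read set of size r and a write set of size w drawn from n servers.
func minOverlap(n, r, w int) int {
	if r+w <= n {
		return 0 // the two sets can be disjoint
	}
	return r + w - n
}

func main() {
	// With N = 5, ReadQuorum = 3, WriteQuorum = 3: 3 + 3 - 5 = 1.
	fmt.Println(minOverlap(N, ReadQuorum, WriteQuorum))
}
```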
Replicated write, returning after enough writes have succeeded.
package main

import (
	"fmt"
	"math"
	"math/rand"
	"sync"
	"time"
)

const (
	F           = 2
	N           = 5
	ReadQuorum  = F + 1
	WriteQuorum = N - F
)

var delay = false

type Server struct {
	mu   sync.Mutex
	data map[string]*Data
}

type Data struct {
	Key   string
	Value string
	Time  time.Time
}

// Delay simulates a slow server when delays are enabled.
func (srv *Server) Delay() {
	if !delay {
		return
	}
	time.Sleep(time.Duration(math.Abs(rand.NormFloat64()*1e9 + 0.1e9)))
}

func (srv *Server) Write(req *Data) {
	t0 := time.Now()
	defer func() {
		if delay {
			fmt.Printf("write took %.3f seconds\n", time.Since(t0).Seconds())
		}
	}()

	srv.mu.Lock()
	defer srv.mu.Unlock()
	srv.Delay()
	if srv.data == nil {
		srv.data = make(map[string]*Data)
	}
	// Keep the value with the newest timestamp.
	if d := srv.data[req.Key]; d == nil || d.Time.Before(req.Time) {
		srv.data[req.Key] = req
	}
}

func (srv *Server) Read(key string) *Data {
	t0 := time.Now()
	defer func() {
		fmt.Printf("read took %.3f seconds\n", time.Since(t0).Seconds())
	}()

	srv.mu.Lock()
	defer srv.mu.Unlock()
	srv.Delay()
	return srv.data[key]
}

// better returns whichever of x and y has the newer timestamp.
func better(x, y *Data) *Data {
	if x == nil {
		return y
	}
	if y == nil || y.Time.Before(x.Time) {
		return x
	}
	return y
}

func Write(req *Data) {
	t0 := time.Now()
	done := make(chan bool, len(servers))

	for _, srv := range servers {
		go func(srv *Server) {
			srv.Write(req)
			done <- true
		}(srv)
	}

	for n := 0; n < WriteQuorum; n++ {
		<-done
	}

	if delay {
		fmt.Printf("write committed at %.3f seconds\n", time.Since(t0).Seconds())
	}
	for n := WriteQuorum; n < N; n++ {
		<-done
	}
	if delay {
		fmt.Printf("all replicas written at %.3f seconds\n", time.Since(t0).Seconds())
	}
}

func Read(key string) {
	t0 := time.Now()
	replies := make(chan *Data, len(servers))

	for _, srv := range servers {
		go func(srv *Server) {
			replies <- srv.Read(key)
		}(srv)
	}

	var d *Data
	for n := 0; n < ReadQuorum; n++ {
		d = better(d, <-replies)
	}

	if delay {
		fmt.Printf("read committed at %.3f seconds\n", time.Since(t0).Seconds())
	}
	for n := ReadQuorum; n < N; n++ {
		<-replies
	}
	if delay {
		fmt.Printf("all replicas read at %.3f seconds\n", time.Since(t0).Seconds())
	}
}

var servers []*Server

func main() {
	servers = make([]*Server, N)
	for i := range servers {
		servers[i] = new(Server)
	}

	rand.Seed(time.Now().UnixNano())

	delay = false
	Write(&Data{"hello", "there", time.Now()})
	time.Sleep(1 * time.Millisecond)

	delay = true
	Write(&Data{"hello", "world", time.Now()})

	// Read("hello")
}
Replicated read, returning after enough reads have been gathered.
func Read(key string) {
	t0 := time.Now()
	replies := make(chan *Data, len(servers))

	for _, srv := range servers {
		go func(srv *Server) {
			replies <- srv.Read(key)
		}(srv)
	}

	var d *Data
	for n := 0; n < ReadQuorum; n++ {
		d = better(d, <-replies)
	}

	if delay {
		fmt.Printf("read committed at %.3f seconds\n", time.Since(t0).Seconds())
	}
	for n := ReadQuorum; n < N; n++ {
		<-replies
	}
	if delay {
		fmt.Printf("all replicas read at %.3f seconds\n", time.Since(t0).Seconds())
	}
}

func main() {
	servers = make([]*Server, N)
	for i := range servers {
		servers[i] = new(Server)
	}

	rand.Seed(time.Now().UnixNano())

	delay = false
	Write(&Data{"hello", "there", time.Now()})
	time.Sleep(1 * time.Millisecond)
	Write(&Data{"hello", "world", time.Now()})

	delay = true
	Read("hello")
}

// The imports, constants, Server, Data, better, and Write are identical
// to the replicated-write listing above.
Select allows choosing between multiple channel operations.
Example, chat program:
for {
	select {
	case event := <-ui:
		// process user interface event
	case msg := <-server:
		// process server message
	case t := <-tick:
		// time has elapsed
	}
}
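A runnable variant of the same shape, as a sketch: it waits on two message channels and a timeout created with time.After, and reports which case fired. The firstOf and demo helpers are hypothetical names, and the channels stand in for the ui and server channels of the chat example.

```go
package main

import (
	"fmt"
	"time"
)

// firstOf waits for a message on either channel and gives up after d.
func firstOf(ui, server <-chan string, d time.Duration) string {
	select {
	case event := <-ui:
		return "ui: " + event
	case msg := <-server:
		return "server: " + msg
	case <-time.After(d):
		return "timeout"
	}
}

// demo calls firstOf twice: once with a server message already queued,
// and once with both channels empty so that only the timeout can fire.
func demo() []string {
	ui := make(chan string, 1)
	server := make(chan string, 1)
	server <- "welcome"
	first := firstOf(ui, server, time.Second)
	second := firstOf(ui, server, 10*time.Millisecond)
	return []string{first, second}
}

func main() {
	for _, s := range demo() {
		fmt.Println(s)
	}
}
```

When more than one case is ready, select picks among them at random, so code should not depend on the order in which the cases are listed.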
Most important:
vitess/vtocc, MySQL query balancer
groupcache
https://go.dev/
rsc@golang.org
Videos:
Questions?