Implementing functional streams with generics in Go

Note 1: this is an experiment with an upcoming change in Go's type system. If you need something like what's described in the post in the real world, use channels, or something like RxGo.

Note 2: I originally started this post in August and almost abandoned it, but figured it's still a useful exploration of an important upcoming feature use channels, or something like RxGo.

Go 1.18 will support generics, and I decided I would give it a shot. I've been playing with the idea of representing streams with channels and higher order functions that operate on those channels, allowing processes to execute on streams of data somewhat-lazily. This is 100% a toy project and a production-ready version of "lazy/generic streams" will likely come in some future release of RxGo.

Still, while working on a solution to get this working with channels, I figured we could also try to get it working in a more traditional representation, at least in functional languages: streams can be represented by a pair - a value and a function to generate the next value.

I started with a question: with generics, is the Go type-system expressive enough to represent such streams. And how bad would it look without Ocaml's or Haskell's type inference capabilities? Turns out we can represent streams, and the limited type-inference makes it a bit annoying, but it isn't too bad!

The basic type definition

First, let's start with the type definition. Like mentioned above, we want to represent a stream of T as a nullable pair of some value T and a function that produces the next value of the stream T. A nil stream represents an empty stream.

Go doesn't directly support pairs, so we're going to use a struct here:

type Stream[T any] struct {
	Value T
	Next  func() *Stream[T]
}

And we'll represent streams as pointers to that struct, which takes care of the nullable part.

It looks like a linked-list, but since we want to be able to represent a potentially infinite linked list, instead of making Next a pointer to Stream, we make it a function that returns the pointer, and lazily execute that function as needed.

Creating streams

First, let's start with some very basic streams: an empty stream, and a singleton stream (that contains a single element). We'll use helper functions to implement those:

func Empty[T any]() *Stream[T] {
	return nil
}

func Singleton[T any](element T) *Stream[T] {
	return &Stream[T]{
		Value: element,
		Next:  Empty[T],
	}
}

A more interesting helper function is a function that takes a slice and returns a stream with the elements of that slice:

func FromSlice[T any](items []T) *Stream[T] {
	if len(items) == 0 {
		return nil
	}
	return &Stream[T]{
		Value: items[0],
		Next: func() *Stream[T] {
			return FromSlice(items[1:])
		},
	}
}

To keep the context around of where we are in the slice as the stream is consumed, we introduce a helper function that takes the index that we want to consume in the slice, and FromSlice invokes that helper function starting at index 0. The stream becomes nil as soon as the index grows too large, indicating that we finished iterating over the slice.

Infinite streams

Using a slice is no fun, we probably want to be able to implement streams that are potentially infinite. For example, a stream of natural numbers could be represented as follows:

func nat(start int) *Stream[int] {
	return &Stream[int]{
		Value: start,
		Next: func() *Stream[int] {
			return nat(start + 1)
		},
	}
}

Here is where the limitation with Go's type inference starts to show: since it can't infer return types, we have to manually specify Stream[int] in multiple places.

Note how we never return nil in the function above, indicating that this stream doesn't really end.

Manipulating streams

Now that we know how to create them, we need to understand how to manipulate them to accomplish something useful. Two useful helper functions, useful for debugging and what not, are Iter and ToSlice: Iter takes a stream and a function, and iterates over the stream, invoking the function for each element, while ToSlice converts a stream to a slice.

Here's Iter:

func Iter[T any](stream *Stream[T], f func(T)) {
	for ; stream != nil; stream = stream.Next() {
		f(stream.Value)
	}
}

And here's ToSlice, built on top of Iter:

func ToSlice[T any](stream *Stream[T]) []T {
	var result []T
	Iter(stream, func(value T) {
		result = append(result, value)
	})
	return result
}

(folks who are paying attention will probably suggest that we use something like Fold instead of Iter to implement ToSlice, we'll get there).

And now that we have Iter and an infinite stream, we should try to use it maybe? :)

func main() {
	Iter(nat(0), func(v int) {
		fmt.Println(v)
	})
}

This is a very fancy way of making an infinite loop :)

Higher-order functions

Higher-order functions make streams more interesting. There are many possible high-order functions, but we'll explore some common names here: Map, Filter and Fold (Fold may also be called Reduce in other contexts). Then we'll do a fun one with FlatMap.

Note: different from actual lists/arrays, with streams one can't implement Map or Filter using Fold, as Fold always consumes the stream.

The interesting thing with streams is that functions such as Filter and Map don't do any computation unless needed: streams are lazy by nature, and actual computations only happen when they're consumed.

Map

Map takes a stream of type T and a function from T to U and returns a stream of U. Here's the implementation of Map:

func Map[T, U any](stream *Stream[T], f func(T) U) *Stream[U] {
	if stream == nil {
		return nil
	}
	return &Stream[U]{
		Value: f(stream.Value),
		Next: func() *Stream[U] {
			return Map(stream.Next(), f)
		},
	}
}

How can we use Map? A simple/stupid example would be to double all numbers from the nat stream to get a stream of even numbers:

func main() {
	evens := Map(nat(0), func(v int) int {
		return v * 2
	})
	Iter(evens, func(v int) {
		fmt.Println(v)
	})
}

Filter

Filter takes a stream of type T and a predicate function from T to bool and returns a new stream of T, containing only the elements of the original stream for which the given predicate returns true. Here's the implementation of filter:

func Filter[T any](stream *Stream[T], f func(T) bool) *Stream[T] {
	for ; stream != nil; stream = stream.Next() {
		if f(stream.Value) {
			return &Stream[T]{
				Value: stream.Value,
				Next: func() *Stream[T] {
					return Filter(stream.Next(), f)
				},
			}
		}
	}
	return stream
}

Since a non-nil Stream is required to have a valid element, Filter isn't totally lazy, as it has to consume the source stream until the predicate p returns true. Here's how we can get a stream of even numbers using Filter instead of Map:

func main() {
	evens := Filter(nat(0), func(v int) bool {
		return v%2 == 0
	})
	Iter(evens, func(v int) {
		fmt.Println(v)
	})
}

Fold

Fold can be used to eagerly fold the elements of a stream into some other value. For example, if you have a finite stream of integers, you could use Fold to find the largest, the smallest or the sum of all elements in the stream.

Since Fold is eager, it can't really operate on infinite streams, as that would loop forever. Let's look at the implementation of Fold:

func Fold[T, U any](stream *Stream[T], init U, f func(U, T) U) U {
	acc := init
	for ; stream != nil; stream = stream.Next() {
		acc = f(acc, stream.Value)
	}
	return acc
}

And here's an implementation of ToSlice that uses Fold instead of Iter:

func ToSlice[T any](stream *Stream[T]) []T {
	return Fold(stream, []T{}, func(acc []T, elm T) []T {
		return append(acc, elm)
	})
}

FlatMap

FlatMap works like Map, but instead of taking a function from T to U, it takes a function from T to Stream[U]. In order to implement FlatMap, we'll first implement another useful function: Append, which takes two streams s1 and s2 and returns a stream that will have all elements from s1, then all elements from s2. Here's the code for both Append and FlatMap:

func Append[T any](stream1 *Stream[T], stream2 *Stream[T]) *Stream[T] {
	if stream1 == nil {
		return stream2
	}
	return &Stream[T]{
		Value: stream1.Value,
		Next: func() *Stream[T] {
			return Append(stream1.Next(), stream2)
		},
	}
}

func FlatMap[T, U any](stream *Stream[T], f func(T) *Stream[U]) *Stream[U] {
	if stream == nil {
		return nil
	}
	return Append(f(stream.Value), FlatMap(stream.Next(), f))
}

One simple-ish example is taking a stream of strings and turning that into a stream of runes:

var s *Stream[string] = ...
FlatMap(s, func(v string) *Stream[rune] {
	return FromSlice([]rune(v))
})

Slicing streams

On top of filtering, mapping, appending and others, one may want to combine multiple streams (using Append declared above), or get some elements of a stream, or drop some items from a stream. For that, let's look at how we'd implement some other helper functions:

Here's the source for Take, TakeWhile and TakeUntil:

func Take[T any](stream *Stream[T], n uint) *Stream[T] {
	if n == 0 || stream == nil {
		return nil
	}
	return &Stream[T]{
		Value: stream.Value,
		Next: func() *Stream[T] {
			return Take(stream.Next(), n-1)
		},
	}
}

func TakeWhile[T any](stream *Stream[T], f func(T) bool) *Stream[T] {
	if stream == nil {
		return nil
	}
	if f(stream.Value) {
		return &Stream[T]{
			Value: stream.Value,
			Next: func() *Stream[T] {
				return TakeWhile(stream.Next(), f)
			},
		}
	}
	return nil
}

func TakeUntil[T any](stream *Stream[T], f func(T) bool) *Stream[T] {
	return TakeWhile(stream, func(v T) bool { return !f(v) })
}

Notice how TakeUntil is simply implemented in terms of TakeWhile. As a matter of fact, Take could also be implemented with TakeWhile and a closure:

func Take[T any](stream *Stream[T], n uint) *Stream[T] {
	return TakeWhile(stream, func(T) bool {
		if n == 0 {
			return false
		}
		n--
		return true
	})
}

Putting it all together in a realistic example

Let's do a very simple REPL style application: it has a shell where we can enter commands, and those commands will have some side-effect. Let's implement a poor man memcached that operates via stdin, and supports three commands set, get and del (which will set a key-value pair, get the value of a key and delete a key, respectively). We want to be able to have two separate layers: one for parsing and another one for executing commands, and we want to be able to send commands from stdin to the execution layer.

This is an extremely simple example that doesn't really need generics (it's stringly-typed :D), but should give an idea of how functional streams can be used.

First, let's introduce a helper function that allow us to generate a lines stream from an io.Reader:

func FromReader(r io.Reader) *Stream[string] {
	return fromReader(bufio.NewReader(r))
}

func fromReader(r *bufio.Reader) *Stream[string] {
	line, isPrefix, err := r.ReadLine()
	if err != nil {
		return nil
	}
	parts := []string{string(line)}
	for isPrefix {
		line, isPrefix, err = r.ReadLine()
		if err != nil {
			return nil
		}
		parts = append(parts, string(line))
	}
	return &Stream[string]{
		Value: strings.Join(parts, ""),
		Next: func() *Stream[string] {
			return fromReader(r)
		},
	}
}

Note: that we're eating errors here, just another demonstration that you shouldn't do this in production, at least not the way it's described in this blog post :)

The code below will also not handle any errors. Don't do this at home.

Now that we have that function, we can create our "database" instance, loop through the input and execute commands:

func ProcessCommands(input io.Reader, output io.Writer) {
	s := stream.FromReader(input)
	stream.Fold(s, NewDB(output), func(db *DB, line string) *DB {
		cmd := strings.Fields(line)
		db.Execute(cmd[0], cmd[1:])
		return db
	})
}

(for runnable code, checkout the GitHub repository)

Another more interesting example

A less realistic, but interesting example is taking a stream of numbers from stdin, parse them, and sum all the prime numbers. This requires using the helper FromReader, Map to parse the number, Filter to discard non-prime numbers and Fold to sum the filtered values. Notice that this is stdin, so values are getting piped through as they are read from stdin.

Here's what that "pipeline" looks like:

func main() {
	stdin := stream.FromReader(os.Stdin)
	numbers := stream.Map(stdin, parseLine)
	primes := stream.Filter(numbers, isPrime)
	sum := stream.Fold(primes, 0, sum)
	fmt.Println(sum)
}

(again, for runnable code, checkout the GitHub repo)

Why not methods?

TL;DR: Go doesn't really support it. It may in the future.

One thing one may notice from the example above is that using functions makes the code quite verbose, we have to introduce variables for intermediary streams (or we could nest function calls). Functional languages get away with that by having some sort of function composition or using pipe operators / threading macros. But in more object-oriented languages, methods are used, which is a better fit for Go. So the code below:

stdin := stream.FromReader(os.Stdin)
numbers := stream.Map(stdin, parseLine)
primes := stream.Filter(numbers, isPrime)
sum := stream.Fold(primes, 0, sum)

Could become something like:

sum := stream.FromReader(os.Stdin).Map(parseLine).Filter(isPrime).Fold(0, sum)

Why can't we do that in Go? The problem is that methods can't really be generic in Go, it's not currently supported by the generics implementation, which means the Map method in the pipeline above cannot be implemented. It's an issues with how methods are used for structural subtyping & interfaces, so it may be complicated to address or not happen at hall. See the issue in the Go issue tracker for more details!

Feedback

Do you have any feedback? Questions? Concerns? Wanna fix a typo? Checkout the source for this post in GitHub (feel free to send a PR), or the discussion in the GitHub repo.