RFC: Apache Beam Go SDK design
https://s.apache.org/beam-go-sdk-design-rfc
Henning Korsholm Rohde (herohde@google.com)
Bill Neubauer (wcn@google.com)
Go is a strongly-typed, compiled language with a non-inheritance-based type system. It is different enough from Java and Python that neither of their approaches can be readily re-used, and its natural programming style would make direct reuse of some aspects awkward to Go programmers. The main relevant differences:
The Beam model and Java SDK make extensive use of generics. Generics allow Java pipelines to be mostly type-safe using the capabilities of the language alone, and they are used in method chaining, contexts, KV, reusable generic transforms, etc. Go offers no generics, except for a few built-in datatypes. The situation is comparable to Java 1.4. There’s a default base interface similar to java.lang.Object, but using it would make the Go SDK effectively untyped, require casts everywhere, and fail to take advantage of the type capabilities that do exist.
Despite strong typing, functions and methods cannot be overloaded in Go. This makes a single “apply” method problematic, for example. In contrast, the signature of ParDo in Java isn’t concerned with the types of its arguments, since overloads can handle all the needed cases.
The Go type system uses interfaces and composition instead of inheritance. Especially combined with the lack of generics, this makes some otherwise straightforward things harder to do. We’ll see this in composite transforms.
Go has no equivalent of Java’s Class.forName and cannot serialize types, functions or closures. This means that some of the patterns in Java, such as capturing local variables for side input tags, are not possible. These limitations end up causing a small tax for the user in some cases.
Java uses annotations for flexible DoFn signatures, default coder specification, etc. Go does not offer such functionality, except for string tags on struct fields. Again, this limitation makes some things harder to do and eliminates some design options (such as using parameter annotations to define side input tags).
Go has features such as first-class functions, full type reflection (as opposed to type-erased), multiple return values and more. We should not fail to take advantage of them.
In light of the Go language constraints and opportunities, the following major design points guide much of the rest of the design. We use the phrase “native” types to refer to built-in Go types as opposed to synthetic Beam types, such as KV<int,string>.
User functions that execute at runtime should be natively typed and not require casts. It should also not be possible to output an element of the wrong type. In other words, deferred user functions should look and feel much like normal Go functions and allow users to pick the same types as they normally would. For example, a function to increment an integer should look like:
func (i int) int { return i+1 }
and should not require explicit or implicit casting that may fail at runtime, say:
func (i interface{}) interface{} { return i.(int)+1 }
func (i beam.Obj) beam.Obj { return beam.IntObj(i.AsInt() + 1)}
This turned out to be non-trivial to achieve for KV and side inputs. One cost is that the Go SDK runtime uses reflection-based metaprogramming extensively.
The lack of generics and overloading eliminates the option of a natively-typed apply chain and forces PCollections to be natively untyped (the Beam type is a runtime value) and in turn PTransforms to be weakly typed. The choice of natively-typed user functions also means that they do not conform to a small set of types that could inform the type system. Instead, we went with what we call the “direct” style, where PTransforms are Go functions that are invoked directly and preserve PCollection arity. The fluent style is not commonly used in Go. We use the natively generic slice (array-like) type for PCollections to retain some native type checking at the PCollection level. For example, transforms can have natural types that ensure that the pipeline shape is right:
Flatten: func ([]PCollection) PCollection
Sources and sinks fall out naturally as well:
textio.Read: func (string) PCollection
textio.Write: func (string, PCollection)
This approach avoids the need for PCollectionList, PCollectionTuple, etc. For ParDo, however, we need to introduce ParDo0, ParDo2, ParDo3, etc to indicate the return arity. Losing the arity of the PCollection at compile time was not a preferable design compromise since it would be too easy to write invalid pipelines. We do allow the user to eschew this safety by using ParDoN which returns a slice of PCollection.
The Go SDK performs static type checking of the pipeline during construction to avoid runtime type errors. The Beam type checker ensures that the input/output types of PCollections match up with transforms. It uses a custom representation, FullType, to accurately represent Beam types such as KV<int,string>. The most complex aspect is analyzing the DoFn signatures and validating that they are compatible with the typing context. Several components make this hard, among them support for universal types. For example:
swapKV: func (x X, y Y) (Y, X) { return y, x }
The type checker will infer that a KV<int,string> becomes a KV<string,int>, say, when processed by the swapKV DoFn, treating the special types as type variables. The Go type system prevents type mistakes such as returning (x, y) or (42, x). The limitation is that functions must use the fixed set of universal types and that non-equality type constraints are not possible.
During pipeline construction, binding and substitution logic are used to perform type-checking of the pipeline, resolving universal types into concrete types. If type-checking fails, the pipeline isn’t valid, and the user gets a type-checking error to help them debug.
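Concretely, the beam package exposes a small fixed set of universal types (beam.X, beam.Y, beam.Z, beam.T, and so on) that play the role of type variables. A sketch of swapKV written against them, where s and kvs are placeholders for a Scope and an input PCollection:

```go
// swapKV swaps key and value. beam.X and beam.Y are universal types
// that the type checker binds to the concrete key/value types of the
// input PCollection.
func swapKV(x beam.X, y beam.Y) (beam.Y, beam.X) {
	return y, x
}

// Given kvs with Beam type KV<int,string>, the output PCollection is
// inferred to have type KV<string,int> during construction.
swapped := beam.ParDo(s, swapKV, kvs)
```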
The type checker is intimately tied to the universe of user function signatures and the ability to accurately process them. Most reusable transforms use simulated generic types, such as stats.Count or filter.Dedup. The Go SDK type system in some cases exceeds what the Java type checker can catch, such as using a wrong or non-existent side input tag in a DoFn -- the Java type system or SDK cannot easily catch such a mistake, because the tag being used is part of the closure only and not accessible until runtime.
For error handling, Go commonly uses returned errors (as the last tuple parameter with the special “error” type) and manual propagation. Go has no exceptions, except for a “panic” which is generally reserved for unrecoverable failures.
Runtime user functions should follow normal Go conventions and return an error on failure. For example,
trySideEffectFn: func (string) (string, error)
where the function would return (“”, <some error>) if it failed to perform the side effect. The type checker understands that the (optional) trailing error is not part of the data signature. The runtime checks the error when the function is invoked and fails the bundle if it is not nil.
During pipeline construction, however, transforms should panic at construction time as opposed to returning (PCollection, error). This style is a readability win and allows function chaining. It also works well for static pipelines where bad code should panic. To support programmatic construction of pipelines from user input, we have TryXXX methods that return errors. These APIs don’t allow for function chaining, but for programmatic pipeline construction, that’s OK. In that usage, the plan is being built a node at a time, so chaining wouldn’t be used. Users can mix the TryXXX and XXX semantics freely. Both are in the public API.
Other languages can get this in a single API surface by strategic usage of exception handlers. Since we don’t have exceptions, the bifurcated API surface is necessary to get the differing usability experiences.
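For instance, ParDo has a Try counterpart that reports construction failure as an error; the DoFn (extractFn) and collection (lines) below are placeholders:

```go
// Panicking form, natural for statically-written pipelines:
out := beam.ParDo(s, extractFn, lines)

// Error-returning form, for pipelines built from user input. TryParDo
// returns the outputs as a slice along with any construction error.
outs, err := beam.TryParDo(s, extractFn, lines)
if err != nil {
	return fmt.Errorf("invalid transform: %v", err)
}
```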
The following examples illustrate most aspects of the Go SDK. They offer a concrete context in which to understand the model representation.
The latest version of the code can be found here:
https://github.com/apache/beam/tree/master/sdks/go
All examples are under “examples” and the main package under “pkg/beam”. All other code links in this document point into a snapshot for long-term stability and consistency with the descriptions here.
The below provides an overview of how the Go SDK represents the various objects and concepts in the Beam programming model. The code comments and examples provide further documentation.
Pipeline is still an object. While transforms can still be said to be “applied” to the pipeline, the mechanism is different: each primitive transform takes a pipeline as an argument and adds nodes/edges to the underlying graph (execution plan). This approach avoids the need for intermediate PBegin, PDone, PValue, etc types. The pipeline also holds the current (monitoring) scope for composite transforms to allow them to be Go functions. An advantage here is that transforms have more precise control.
A pipeline runner is simply any Go function that accepts a context and Pipeline for “execution”. Runners can be invoked directly or register themselves to make it easy for users to use them.
We currently do not have an equivalent of PipelineResult. Once the portability job API is stable and an implementation available, we’ll likely introduce a notion of PipelineResult based on that.
PCollection represents a homogeneous collection of data during pipeline construction. It is untyped, but holds the Beam type as a special FullType value. It also holds the coder and other metadata values, similarly to other SDKs.
FullType represents a (possibly generic) Beam type and is essential for type checking. FullType allows us to use Go type reflection while still building composite types like Windowed<KV<string,int>> or KV<X,string>. Each FullType has a root type, which is either a user-defined data type or one of our container types for KV encoding or windowing. Java largely handles this through generics and has little need for a separate representation, since these types all implement the same interface and the code that handles them can be generic.
Coder is a contract for transforming data to/from a byte sequence. In Go, we distinguish between system coders and simpler user-defined custom coders. This distinction is needed to handle the composite types specially, because Beam types like KV<int, string> do not actually exist as runtime values. A benefit is that users who wish to use custom coding do not have to be exposed to lengths and inner/outer contexts.
We assume that all data types have a coder. The user can assign their coder on a per-transform-output basis. If the user doesn’t assign a coder, the SDK assigns a default coding behavior. If the data type cannot be encoded by the default coder, the pipeline construction fails. The behavior of the default coder is similar to the behavior provided by the Go JSON package. The coder package documentation contains precise details.
DoFns and other user functions generally are allowed to take one of three forms balancing convenience and expressibility. The Go SDK can offer more flexibility here, because we implement our own type checker. The forms are:
func incFn(i int) int { return i+1 }
func filterFn(i int, emit func(int)) { if i > 5 { emit(i) }}
The first function returns exactly one output whereas the second emits zero or more outputs. Functions can be defined inline. The first style conveniently allows a vast number of standard library functions to be directly usable as DoFns, such as strings.ToUpper, without wrappers of any kind.
type fn struct {
	Filter string `json:"filter"`
	re *regexp.Regexp
}
func (f *fn) Setup() {
	f.re = regexp.MustCompile(f.Filter)
}
func (f *fn) ProcessElement(word string, emit func(string)) {
	if f.re.MatchString(word) { emit(word) }
}
This DoFn uses the special Setup method to initialize itself with a construction-time “Filter” value. The method names must be exported and are recognized by name, because their signatures vary. For DoFn structs, the possible methods are
Setup, StartBundle, ProcessElement, FinishBundle, Teardown
They have the same interpretation as in other SDKs. Combine functions work similarly, but use different method names.
Functions may also optionally take a context.Context and/or EventTime or return an error. The runtime will provide and handle these parameters accordingly. The signature thus informs the runner as to what the user function actually uses and it may potentially optimize accordingly.
The following are examples of legal DoFn signatures for doFn in beam.ParDo1(p, doFn, words) with a string-typed incoming PCollection:
A DoFn can be a simple function:
func (word string) string
func (word string, emit func(string))
They can return an error if necessary:
func (word string) (string, error)
func (word string, emit func(string)) error
Take in an optional EventTime or context:
func (t EventTime, word string) string
func (ctx context.Context, word string, emit func(string))
Or any combination of the above, such as:
func (ctx context.Context, t EventTime, word string) (string, error)
More formally, this expression describes the signature of an acceptable DoFn.
(Context?, EventTime?, main input, side inputs*, outputs*) -> (EventTime?, output?, error?)
Restrictions:
Side inputs and GroupByKey (GBK) values are re-iterable streams of data. In Java, they are captured by an object with a generic interface. The interface handles the re-iteration contract, and provides convenience methods to extract singleton values. In Go, we capture the underlying types in the type signature of the side input because we have no choice. However, this does afford some elegance and flexibility. For example,
func (word string, size int, sample []string) string
is the type of a DoFn with two side inputs: a singleton ‘size’ and a slice ‘sample’. The runtime performs the necessary conversions. There are no tags; the side inputs are positional. The iteration cases (which are required for GBK values) use functional arguments:
func (word string, sample func (*string) bool) string
func (word string, resample func () func (*string) bool) string
The latter allows the DoFn to iterate multiple times over the side input. The type selected by the user can also serve as a hint to the runner about how to manage the data.
Side outputs are just emitter functions, like the main output. They are also positional:
func (word string, emitSide func(string)) string
func (word string, emitMain, emitSide func(string))
Unlike the main output, side outputs can't be return values of the function, since doing so would be ambiguous with a KV return.
The functional side inputs, the main output, and the side outputs allow an optional EventTime parameter for the timestamp to be accessed or provided. For example,
func (t EventTime, word string) (EventTime, string)
func (t EventTime, word string, emit func (EventTime, string))
Finally, user functions can use universal types in place of any top-level pipeline data type.
The primitive transforms are Go functions in the beam package, with access to the internals of the Pipeline graph. The transforms generally have weak signatures and most validation is left for the type checker.
Impulse is a singleton source used to trigger other (source) transforms:
func Impulse(s Scope) PCollection
It always returns a singleton PCollection<[]byte>.
Create inserts a fixed set of values into the pipeline. The output type of the PCollection is the shared concrete type of the values.
func Create(s Scope, values ...interface{}) PCollection
It works similarly to the other SDKs.
ParDo is a family of functions (ParDo0, ParDo, ParDo2, ParDo3, etc.) for invoking DoFns on an incoming PCollection. The N encodes the number of outputs. The signature for the N=1 case is:
ParDo(s Scope, interface{}, PCollection, ...Option) PCollection
The transform takes a scope, any value as the DoFn, the incoming PCollection, and any number of options for specifying side inputs. It returns a single output PCollection, whose type is inferred from the DoFn type and the side input types.
For example, the following DoFn has one int-typed singleton side input and two string-typed outputs:
func splitFn(word string, avg int, big, small func(string)) {
if len(word) < avg { small(word) } else { big(word) }
}
It could be used as follows:
words := beam.Create(s, "foo", "foobar")
avg := beam.Create(s, 4)
big, small := beam.ParDo2(s, splitFn, words, beam.SideInput{avg})
The two output PCollections would each be of string type and at runtime would be singleton collections containing the values “foobar” and “foo”, respectively. The preservation of native arity and Go native type inference make the ParDo transform applications concise.
GroupByKey has a simple signature:
func GroupByKey(s Scope, a PCollection) PCollection
Like other SDKs, it expects a KV<A,B>, but here it returns a PCollection with a special GBK<A,B> type for the GBK result. The type system and runtime understand these composite types. We use a special GBK type instead of composing KV<A,Iter<B>>, say, to make these composite types easier to handle.
Flatten has the expected signature:
func Flatten(s Scope, cols ...PCollection) PCollection
Combine is similar to ParDo in that it takes a combine user function and possibly side inputs:
Combine(Scope, interface{}, PCollection, ...Option) PCollection
CombinePerKey(Scope, interface{}, PCollection, ...Option) PCollection
The combine is either a global or per-key combine, depending on the variant called.
The simplest combiner form is a binary function, where the accumulator and result types are identical. For example,
func sumFn(x, y int) int { return x + y }
Can be used as:
numbers := beam.Create(s, 1, 2, 3, 4)
sum := beam.Combine(s, sumFn, numbers)
The resulting PCollection is of type int and will be a singleton with the value 10 at runtime. Note that sumFn cannot use simulated generic types, because the + operator is not available for the universal types. To that end, we created a code-generation tool, specialize, to generate type-specialized transforms from a template. It is used in stats.Sum, for example.
Partition is similar to the other SDKs. It partitions a PCollection<A> into N partitions based on a partition function of type A -> int. The signature is:
Partition(Scope, int, interface{}, PCollection) []PCollection
Partition is not a primitive: despite requiring significant metaprogramming, it could be implemented in user code.
The external Go SDK will use Splittable DoFn, once supported by the FnAPI, and has no special support for sources or sinks. Sources currently use Impulse and ParDos. Sinks are just ParDos.
The standard IOs include textio and bigqueryio. The latter leverages Go struct tags to apply metadata to structured data types, which greatly improves the readability of sources/sinks using cloud connectors. This is evident in the tornadoes example, as compared to Java.
The Go SDK is a work in progress and there are numerous TODOs throughout the code. Some of the bigger unsupported pieces are:
The Go direct runner supports batch only. We plan to rely on the upcoming Universal Local Runner (ULR) for streaming and advanced use cases, which is a benefit of targeting the portability framework.