package main
import (
"context"
"flag"
"fmt"
"log"
"regexp"
"strings"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)
// Command-line flags controlling where text is read from and where the
// formatted word counts are written.
var (
	// input is the location of the text to count words in; it defaults to
	// a public King Lear sample on GCS.
	input = flag.String("input", "gs://apache-beam-samples/shakespeare/kinglear.txt", "Input file")

	// output is where the formatted counts are written. It has no default;
	// main refuses to run when it is empty.
	output = flag.String("output", "", "Output file (required)")
)

// wordRE matches a run of ASCII letters, optionally followed by an apostrophe
// and a single lowercase letter, so contractions such as "don't" and
// possessives such as "king's" are kept as one word.
var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
// init registers the pipeline's user functions and emitter type with the
// Beam register package before the pipeline is constructed.
func init() {
	// extractFn.ProcessElement takes (context.Context, string, func(string))
	// and returns nothing, hence the 3-input/0-output DoFn registration.
	register.DoFn3x0[context.Context, string, func(string)](&extractFn{})

	// formatFn maps (string, int) to a string; the type arguments are
	// spelled out here rather than inferred.
	register.Function2x1[string, int, string](formatFn)

	// The func(string) emitter used by extractFn.
	register.Emitter1[string]()
}
// extractFn is a DoFn that splits each input line into words.
type extractFn struct{}

// ProcessElement emits every match of wordRE found in line, in left-to-right
// order. ctx is part of the registered DoFn signature and is not used here.
//
// A line that is empty or contains only whitespace cannot match wordRE,
// which requires at least one ASCII letter. Such lines are therefore skipped
// before the regular expression runs. This keeps the output unchanged and
// is also what uses the file's "strings" import, without which the file
// fails to compile.
func (f *extractFn) ProcessElement(ctx context.Context, line string, emit func(string)) {
	if strings.TrimSpace(line) == "" {
		return
	}
	for _, word := range wordRE.FindAllString(line, -1) {
		emit(word)
	}
}
// formatFn renders a word w and its count c as one output line of the form
// "word: count".
func formatFn(w string, c int) string {
	line := fmt.Sprintf("%s: %v", w, c)
	return line
}
// CountWords splits every element of lines into words with extractFn and
// returns stats.Count applied to those words, i.e. each distinct word keyed to
// its number of occurrences. All transforms it adds are nested under a
// "CountWords" sub-scope of s.
func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
	scoped := s.Scope("CountWords")
	words := beam.ParDo(scoped, &extractFn{}, lines)
	counts := stats.Count(scoped, words)
	return counts
}
// main parses flags, builds the word-count pipeline
// (read -> count -> format -> write) and runs it through beamx. It exits via
// log.Fatal if no output is configured or the job fails.
func main() {
	flag.Parse()
	beam.Init()

	// Without an output location there is nowhere to write results.
	if *output == "" {
		log.Fatal("No output provided")
	}

	p, s := beam.NewPipelineWithRoot()

	text := textio.Read(s, *input)
	counts := CountWords(s, text)
	rendered := beam.ParDo(s, formatFn, counts)
	textio.Write(s, *output, rendered)

	ctx := context.Background()
	err := beamx.Run(ctx, p)
	if err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}