AsyncAPI streaming microservices with golang
Building event driven APIs described with AsyncAPI is easy.

Watch the video


If you want to learn more about AsyncAPI, head over to AsyncAPI.com and find out more about the standard and the power it provides us.

Moving forward, I will assume that you already know what AsyncAPI is and want to use it in your golang applications.

If you want to check out the source of this article it can be found on GitHub.

1. Getting started

We’re going to create a microservice that exposes a single streaming API via AsyncAPI.

Our service will broadcast a message containing a random word every second to any subscribers of its channel (one-to-many).

Image of AsyncAPI studio with the contract we're using in this tutorial loaded.
AsyncAPI Studio is a nice tool for visualizing our contract.

View the contract source on GitHub

The service offers a single subscription channel only, defined as topic/random-word.

Every second, the service broadcasts a random word to all subscribers of that channel.

Let’s look a the definition of the ‘RandomWord’ message broadcast.

  messages:
    RandomWord:
      description: |
        A random word for you to enjoy. There is no way to know which word it will be.        
      payload:
        allOf:
          - $ref: '#/components/schemas/TransportResponse'
          - type: object
            properties:
              payload:
                type: string
                description: A random english word.
                examples:
                    - pizza
                    - motorcycle
                    - guitars

We can see that it’s composed of two elements, properties defined by TransportResponse and a payload property, consisting of a string, which is our random word.

This tutorial uses Plank to provide all the socket, message broker, boilerplate, and glue code you typically need to implement AsyncAPI services.

Plank is a part of Transport, which operates as an asynchronous application framework for go.

Transport will wrap any response emitted by a service with these properties and make the service response available via the ‘payload’ value. The ‘payload’ can be an object, a primitive, or a string.


2. Creating our Random Word Service

The output of our Random Word Service will be just a string, so the payload of our API response is that random word that our service emits.

Let’s start by importing Transport into a new project.

go get github.com/vmware/transport-go

Next, let’s create the directory in which our AsyncAPI enabled services will live.

mkdir services

Now we can create our new Random Word Service, create a new file named ‘word_service.go

package services

import (
    "math/rand"
    "reflect"
    "github.com/google/uuid"
    "github.com/robfig/cron/v3"
    "github.com/vmware/transport-go/model"
    "github.com/vmware/transport-go/service"
)

const (
    RandomWordChannel = "random-word" // matches asyncapi destination channel.
)

// RandomWordService will broadcast a random word on the "simple-stream" channel, every one second.
type RandomWordService struct {
    words         []string                  // list of random words.
    transportCore service.FabricServiceCore // reference to transport services we will need later on.
    readyChan     chan bool                 // once we're ready, let plank know via this channel.
    cronJob       *cron.Cron                // cronjob that runs every 1s.
}

// NewRandomWordService will return a new instance of RandomWordService
func NewRandomWordService() *RandomWordService {
    return &RandomWordService{}
}
The RandomWordService struct defines a string slice named words, our random word broadcast source. It also defines a pointer to transportCore, which provides access to Transport context and other useful features.

The service also defines a readyChan, which lets Plank know that the service has loaded up our random words and is ready to go.

The last property is a pointer to a cron job that will allow us to execute something over and over, forever.


3. Generating random words

Before broadcasting random words, we need to add logic that generates a pool of random words from which to pick.

// Init will fire when our service is being registered by Plank. 
func (rws *RandomWordService) Init(core service.FabricServiceCore) error {

    // capture a reference to transport core services.
    rws.transportCore = core
    return nil
}

// OnServiceReady fires once Plank has all services loaded and ready to run. 
func (rws *RandomWordService) OnServiceReady() chan bool {
    rws.readyChan = make(chan bool, 1)

    // fetch a list of random words (which runs asynchronously), so it immediately returns.
    rws.fetchRandomWords()

    return rws.readyChan
}

Init and OnServiceReady are lifecycle hooks. They fire after Plank loads the service (Init), and Once Plank is ready to run the service (OnServiceReady).

OnServiceReady Returns a boolean chan that Plank will listen for a signal on before completing activation. We capture a pointer to it named readyChan


3.1 Using another API to generate a list of random words.

We call ‘fetchRandomWords’, which makes an API call using a built-in REST Service provided by Transport.

// fetchRandomWords will call a public REST endpoint that very kindly returns random words.
func (rws *RandomWordService) fetchRandomWords() {

    restRequest := &service.RestServiceRequest{
        Uri:          "https://random-word-api.herokuapp.com/word?number=500",
        Method:       "GET",
        ResponseType: reflect.TypeOf(rws.words),
    }

    // Transport provides a REST Service that makes this API call and provides handlers for the result.
    rws.transportCore.RestServiceRequest(restRequest,
        rws.handleWordFetchSuccess, // handle a successful API call.
        rws.handleWordFetchFailure) // handle a failed API call.
}

It will call a public API (https://random-word-api.herokuapp.com) that will generate random words for us.

We ask the API for 500 random words via the REST Service and then handle the success or failure of that API call by providing success and failure functions as handlers.


3.2 Handling a successful random word API call

// handleWordFetchSuccess will parse a successful incoming word response from our source API.
func (rws *RandomWordService) handleWordFetchSuccess(response *model.Response) {

    // set the word list to the response returned by the REST API Call.
    rws.words = response.Payload.([]string)

    // send a signal down our ready channel, so Plank knows to continue.
    rws.readyChan <- true
}

In our success handler handleWordFetchSuccess, we cast the response of the API call into a ‘[]string’ slice. We send a bool down our readyChan to alert Plank that we’re ready to go.

3.3 Handling a failed random word API call

If our failure handler handleWordFetchFailure activates, we make up some random words of our own and proceed anyway. The list isn’t very long, but it still fulfills the contract.

// handleWordFetchFailure will parse a failed random word API request.
func (rws *RandomWordService) handleWordFetchFailure(response *model.Response) {

    // now we have no data, so make something up using some hard coded values.
    rws.words = []string{"magnum", "fox", "kitty", "cotton", "ember"}

    // we have a back up data-set loaded.
    rws.readyChan <- true
}

4. Adding the cron job

Next, we add code that sets up a repeating cron job. It will send a broadcast message every second to all channel subscribers containing a random word using a simple function that picks it from our service’s list in memory.

// fireRandomWords will create a cron job that repeats every minute, that sends a message to all subscribers
// every minute. We then capture a pointer to that cronjob on our RandomWordService.
func (rws *RandomWordService) fireRandomWords() {

    // function to fire every second.
    var fireMessage = func() {
        id := uuid.New()

        // send a message containing a random word.
        rws.transportCore.SendResponse(&model.Request{Id: &id}, rws.getRandomWord())
    }
    rws.cronJob = cron.New()
    rws.cronJob.AddFunc("@every 1s", fireMessage)
    rws.cronJob.Start()
}

// getRandomWord will return a random word from our in memory list.
func (rws *RandomWordService) getRandomWord() string {
    return rws.words[rand.Intn(len(rws.words)-1)]
}

We need to update both our REST Service API handleWordFetchSuccess and handleWordFetchFailure handlers to call this new method.

It will ensure the service broadcasts an actual list of random words regardless of success or failure to obtain that data.

func (rws *RandomWordService) handleWordFetchFailure(response *model.Response) {
    ... 
    // start random word cron job.
    rws.fireRandomWords()
    ...
}

func (rws *RandomWordService) handleWordFetchSuccess(response *model.Response) {
    ... 
    // start random word cron job.
    rws.fireRandomWords()
    ...
}

5. Add in remaining lifecycle methods

The last step of building our streaming service is to add in a few more Plank lifecycle methods. First, we want to stop our cron job cleanly when we’re shutting down Plank, so we use OnServerShutdown.

// OnServerShutdown will stop the cronjob firing cleanly when Plank shuts down.
func (rws *RandomWordService) OnServerShutdown() {
    rws.cronJob.Stop()
}

We don’t need to implement the second and third methods, as we don’t need them for this tutorial. However, we still need to add them to ensure the contract Plank requires is fulfilled.

// GetRESTBridgeConfig is not used by this service.
func (rws *RandomWordService) GetRESTBridgeConfig() []*service.RESTBridgeConfig { return nil }

// HandleServiceRequest is not used by this servuce.
func (rws *RandomWordService) HandleServiceRequest(r *model.Request, c service.FabricServiceCore){}

Our streaming service is complete; now it’s time to serve it via Plank.


6. Creating the server to run the service

Let’s create a new directory named ‘server’ and add ‘server.go’ to it.

mkdir server

The ‘main’ function first creates a new instance of Plank that we name ‘platformServer’ using a default configuration.

package main

import (
    "os"
    "github.com/daveshanley/asyncapi-tutorials/streaming/services"
    "github.com/vmware/transport-go/plank/pkg/server"
    "github.com/vmware/transport-go/plank/utils"
)

// main will create a new instance of plank using a default configuration.
func main() {

    // create a default server configuration.
    serverConfig, err := server.CreateServerConfig()
    if err != nil {
        utils.Log.Fatalln(err)
        return
    }

    // create a new platform server from our configuration.
    platformServer := server.NewPlatformServer(serverConfig)

    // register our RandomWordService with our platform server.
    if err = platformServer.RegisterService(
            services.NewRandomWordService(), 
            services.RandomWordChannel); err != nil {
        utils.Log.Fatalln(err)
        return
    }

    // register a system channel with the platform, so we can catch interrupts and shut down cleanly.
    syschan := make(chan os.Signal, 1)

    // start plank and start streaming random words to everyone.
    platformServer.StartServer(syschan)
}

Next, we create a new instance of RandomWordService and register with platformServer This will run the Init and OnServiceReady methods mentioned earlier.

The last step is to capture any operating system interrupt commands (like Ctrl-C) to shut down the platform cleanly and pass them to the platformServer pointer.


7. Boot the server

go run server/server.go
Image of a console window, showing the Plank boot screen with Plank running the new service.
Random Word Service is up and running

You should see the Plank boot screen, which tells you that the platform is up and running on localhost on port 30080. The ‘Fabric endpoint’ is a WebSocket endpoint that is open and listening for STOMP connections.

How do we consume it however?


8. Create a client to connect and listen to the word stream

Let’s write a client to connect to our new local broker, subscribe to our new service broadcasting on ‘/topic/random-word’, listen for ten random words and then disconnect.

Let’s create a new file called ‘client.go

package main

import (
    "encoding/json"
    "sync"
    "github.com/vmware/transport-go/bridge"
    "github.com/vmware/transport-go/bus"
    "github.com/vmware/transport-go/model"
    "github.com/vmware/transport-go/plank/utils"
)

func main() {

    // create a message broker connector config and connect to
    // localhost over WebSocket on port 30080.
    config := &bridge.BrokerConnectorConfig{
        Username:   "guest",            // not required for demo, but our API requires it.
        Password:   "guest",            // ^^ same.
        ServerAddr: "localhost:30080",  // our local plank instance, running RandomWordService
        UseWS:      true,               // connect over websockets
        WebSocketConfig: &bridge.WebSocketConfig{   // configure websocket
            WSPath: "/ws",                          // websocket endpoint
            UseTLS: false,                          // this isn't required locally
        }}

First, we create a new message broker configuration that defines ‘localhost:30080’ as our server address and that we want to use WebSockets. We set the WebSocket path to ‘/ws’, which is the default for Plank.

    // get a pointer to transport
    b := bus.GetBus()

    // get a pointer to transport's channel manager
    cm := b.GetChannelManager()   

Next, we grab a couple of pointers to the event bus and bus channel manager provided by Transport.

    // connect to localhost:30080
    c, err := b.ConnectBroker(config)
    if err != nil {
        utils.Log.Fatalf("unable to connect to %s, error: %v", config.ServerAddr, err.Error())
    }  

Once we have those pointers, we can connect to our Plank server running locally on port 30080. Now, let’s create a channel on our application event bus called ‘my-local-word-stream,’ and then we create a stream handler that captures all messages on that channel.

    // create a local channel on the bus that we want to listen to in our application.
    myLocalChan := "my-local-word-stream"
    cm.CreateChannel(myLocalChan)

Next, we map our application channel ‘my-local-word-stream’ to the AsyncAPI channel defined by our service ‘/topic/random-word’.

Transport defines ‘Galactic Channels’ as event bus channels mapped to an AsyncAPI channel or Message Broker destination.

We can listen to a channel on our event bus by using the ListenStream method. It will return a stream handler that we will define as handler.

    // listen to stream of messages coming in on channel, a handler is returned
    // that allows you to add in lambdas that handle your success messages, and your errors.
    handler, _ := b.ListenStream(myLocalChan)

    // mark our local 'my-local-word-stream' myLocalChan as 'galactic' and map it to our connection and
    // the /topic/random-word service
    err = cm.MarkChannelAsGalactic(myLocalChan, "/topic/random-word", c)
    if err != nil {
        utils.Log.Fatalf("unable to map local channel to broker destination: %e", err)
    }

Our handler will allow us to register functions that capture all messages and errors. We don’t want to stream forever, so let’s put a ceiling of ten on our stream by using a WaitGroup

    // create a wait group that will wait 10 times before completing.
    var wg sync.WaitGroup
    wg.Add(10)

Now we have our stream handler and our wait group defined, we can define functions that handle incoming messages and errors.

If you recall the AsyncAPI contract, All responses that use Plank over AsyncAPI are an object that Transport provides containing a Payload property.

    // start and keep listening
    handler.Handle(
        func(msg *model.Message) {

            var randomWord string
            msg.CastPayloadToType(&randomWord)

We will log it out to the console (because it’s just a string) and then mark our wait group as done, incrementing its internal counter.

            // log it out.
            utils.Log.Infof("Random word: %s", value)

            wg.Done()
        },
        func(err error) {
            utils.Log.Errorf("error received on channel: %e", err)
        })

    // wait for 10 ticks of the stream, then we're done.
    wg.Wait()

After ten messages from our stream, the WaitGroup will complete. We can clean things up by closing our stream handler and marking our application bus channel as local. Our client will unsubscribe automatically from ‘/topic/random-word’ on our broker.

    // close our handler, we're done.
    handler.Close()

    // mark channel as local (unsubscribe from /topic/random-word)
    cm.MarkChannelAsLocal(myLocalChan)

Disconnecting is the last step.

    // disconnect from our broker.
    c.Disconnect()
}

9. Run the client

go run client.go

You should see Ten random words print out to the console after ten seconds.

Image of a console window, showing the the log output of ten random words.
Ten random words arriving as a stream that ticks every second.

And we’re done!


All the code from this tutorial can be found on GitHub

Head over to transport-bus.io if you would like to learn more about Transport as a tool for building full stack asynchronous applications.