An Akka HTTP Client with JSON Parsing

There are many sources of open data on the web, freely accessible via an Application Programming Interface (API). A common interchange format for these APIs is JavaScript Object Notation (JSON), which is human-readable and predictable but not in a convenient form for analysis. The data needs to be parsed from the JSON string and made available as an object we can work with. This blog post considers a simple Akka HTTP client that reads data from the Urban Observatory in Newcastle. If you just want to read the code, see this Gist.

Exploring the API

The Urban Observatory consists of a grid of sensors around the North East, measuring traffic, pollution and weather. The focus of this post is retrieving data from a single sensor, N05171T, a traffic sensor located near the Metro Centre on Hollinside Road. The metadata for this sensor can be found by querying an API endpoint: http://uoweb1.ncl.ac.uk/api/v1/sensor.json?api_key=&sensor_name=N05171T. Note that this requires authentication using an API key, which can be requested using this form.

The result of this query is:

## {
##     "type": "Traffic",
##     "geom": {
##         "type": "LineString",
##         "coordinates": [
##             [
##                 -1.674433347,
##                 54.959041883
##             ],
##             [
##                 -1.673947928,
##                 54.959553799
##             ]
##         ]
##     },
##     "active": "True",
##     "latest": "2017-02-21T08:23:06",
##     "base_height": null,
##     "sensor_height": null,
##     "name": "N05171T",
##     "source": {
##         "web_display_name": "NE Travel Data API (Third Party)",
##         "third_party": true,
##         "db_name": "Scoot Netravel Api",
##         "document": "",
##         "fancy_name": "NE Travel Data API"
##     }
## }
## 

We can see a bit of information about the sensor, including its location, time of latest reading and whether the sensor is active.

In order to retrieve the actual data from the sensor, we query the URL http://uoweb1.ncl.ac.uk/api/v1/sensor/data/raw.json with the following required query parameters:

  • api_key: your API key here
  • sensor_name: N05171T
  • start_time: 20170201
  • end_time: 20170202

This returns the following:

## {
##     "type": "Traffic",
##     "geom": {
##         "type": "LineString",
##         "coordinates": [
##             [
##                 -1.674433347,
##                 54.959041883
##             ],
##             [
##                 -1.673947928,
##                 54.959553799
##             ]
##         ]
##     },
##     "active": "True",
##     "data": {
##         "Congestion": {
##             "data": {
##                 "2017-02-01 01:36:46": 0.0
##             },
##             "meta": {
##                 "name": "Congestion",
##                 "theme": "Traffic",
##                 "units": "%"
##             }
##         },
##         "Traffic Flow": {
##             "data": {
##                 "2017-02-01 01:36:46": 4.0
##             },
##             "meta": {
##                 "name": "Traffic Flow",
##                 "theme": "Traffic",
##                 "units": "Passenger Car Units"
##             }
##         },
##         "Average Speed": {
##             "data": {
##                 "2017-02-01 13:21:46": 35.0
##             },
##             "meta": {
##                 "name": "Average Speed",
##                 "theme": "Traffic",
##                 "units": "KmPH"
##             }
##         }
##     },
##     "latest": "2017-02-21T08:33:06",
##     "base_height": null,
##     "sensor_height": null,
##     "name": "N05171T",
##     "source": {
##         "web_display_name": "NE Travel Data API (Third Party)",
##         "third_party": true,
##         "db_name": "Scoot Netravel Api",
##         "document": "",
##         "fancy_name": "NE Travel Data API"
##     }
## }
## 

All but one of the readings have been stripped from each data object to emphasize the structure of the JSON returned by the API. We can see that sensor N05171T records traffic flow, congestion, and average speed. We can provide a further (optional) field to the Urban Observatory API to limit the results to only one of these. Let’s consider only the average speed, measured in km/h. This is an unusual unit for the UK, where road speed is measured in miles per hour.
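To make the optional field concrete, here is a minimal sketch of how the full request URL could be assembled using only the Java standard library. The parameter name variable and its value average speed are the ones used later in this post; the helper buildUrl and the placeholder key YOUR_API_KEY are made up for illustration.

```scala
import java.net.URLEncoder

object BuildUrl {
  // Base URL of the raw data endpoint, as given in the post
  val endpoint = "http://uoweb1.ncl.ac.uk/api/v1/sensor/data/raw.json"

  // Hypothetical helper: joins URL-encoded key=value pairs onto the endpoint
  def buildUrl(params: Seq[(String, String)]): String = {
    val encoded = params.map { case (k, v) =>
      URLEncoder.encode(k, "UTF-8") + "=" + URLEncoder.encode(v, "UTF-8")
    }
    endpoint + "?" + encoded.mkString("&")
  }

  def main(args: Array[String]): Unit = {
    val url = buildUrl(Seq(
      "api_key" -> "YOUR_API_KEY",      // placeholder, not a real key
      "sensor_name" -> "N05171T",
      "start_time" -> "20170201",
      "end_time" -> "20170202",
      "variable" -> "average speed"     // optional field restricting the result
    ))
    println(url)
  }
}
```

URLEncoder encodes the space in average speed as a plus sign, so the query string ends in variable=average+speed.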

Parsing the JSON in Scala

There are many JSON parsing libraries in Scala, including my favourite, Circe, a Typelevel project which provides generic parsers for case classes without additional boilerplate. However, Spray JSON and Akka HTTP work well together, so that is what we will be using today. In order to complete this tutorial, you will need the Akka HTTP and Spray JSON dependencies in your build.sbt file.

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream" % "2.4.17",
  "com.typesafe.akka" %% "akka-http" % "10.0.3",
  // keep the Akka HTTP modules on the same version
  "com.typesafe.akka" %% "akka-http-spray-json" % "10.0.3")

First, we describe the data we are interested in as a collection of case classes representing the JSON data:

case class Sensor(name: String, data: SensorData)
case class SensorData(averageSpeed: AverageSpeed)
case class AverageSpeed(meta: Meta, data: Map[String, Double])
case class Meta(units: String, theme: String, name: String)

We extract the sensor name and the associated data, without bothering with the additional top-level fields. The sensor data field contains only the average speed, because we append &variable=average speed to the end of the HTTP GET request.
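As a rough illustration of how these case classes mirror the nesting of the JSON, the sketch below builds the single average speed reading from the sample response by hand and converts it to miles per hour. The helper readingsInMph and the object name are invented for this example, and it assumes the KmPH unit reported by the API means kilometres per hour.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

case class Meta(units: String, theme: String, name: String)
case class AverageSpeed(meta: Meta, data: Map[String, Double])
case class SensorData(averageSpeed: AverageSpeed)
case class Sensor(name: String, data: SensorData)

object DomainModelExample {
  // Timestamp layout used by the keys in the sample response
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // 1 km/h is approximately 0.621371 mph
  val KmhToMph = 0.621371

  // Hypothetical helper: time-ordered (timestamp, speed in mph) pairs
  def readingsInMph(sensor: Sensor): Seq[(LocalDateTime, Double)] =
    sensor.data.averageSpeed.data.toSeq
      .sortBy(_._1) // this timestamp layout sorts correctly as a string
      .map { case (t, kmh) => (LocalDateTime.parse(t, fmt), kmh * KmhToMph) }

  // The single average speed reading from the sample response
  val sample = Sensor(
    "N05171T",
    SensorData(AverageSpeed(
      Meta("KmPH", "Traffic", "Average Speed"),
      Map("2017-02-01 13:21:46" -> 35.0))))

  def main(args: Array[String]): Unit =
    readingsInMph(sample).foreach(println)
}
```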

Now that we have a domain model for the sensor data, we must provide a way for Spray JSON to parse the JSON into the case classes:

import spray.json._
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport

trait Protocols extends SprayJsonSupport with DefaultJsonProtocol {
  implicit val metaFormat: RootJsonFormat[Meta] =
    jsonFormat(Meta.apply, "units", "theme", "name")
  implicit val averagespeedFormat: RootJsonFormat[AverageSpeed] =
    jsonFormat(AverageSpeed.apply, "meta", "data")
  implicit val sensorDataFormat: RootJsonFormat[SensorData] =
    jsonFormat(SensorData.apply, "Average Speed")
  implicit val sensorFormat: RootJsonFormat[Sensor] =
    jsonFormat(Sensor.apply, "name", "data")
}

This trait can be mixed in when the time comes to parse the JSON data. Let’s first test the JSON parsing by directly reading in the JSON we get when running the request in a web browser:

object TestJson extends App with Protocols {
  val json_string = scala.io.Source.fromFile("data/traffic_sensor.json").getLines.mkString

  json_string.
    parseJson.
    convertTo[List[Sensor]].
    foreach(println)
}

There’s a bit going on here. First, we read a file which contains the JSON string; this could equally have been pasted directly into Scala. Then the string is parsed and converted to a list of sensors. This is possible because the TestJson object has the Protocols trait we defined earlier mixed in using with, which brings the implicit formats into scope.
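For comparison, here is a minimal sketch of the pasted-in variant, with a trimmed copy of the sample response as an inline string instead of a file. It assumes the response is a JSON array of sensor objects, as the convertTo[List[Sensor]] call implies, so the sample object is wrapped in square brackets. The object name InlineJsonExample and the helper parse are made up for this example, and the formats are repeated so that the snippet stands alone.

```scala
import spray.json._

case class Meta(units: String, theme: String, name: String)
case class AverageSpeed(meta: Meta, data: Map[String, Double])
case class SensorData(averageSpeed: AverageSpeed)
case class Sensor(name: String, data: SensorData)

object InlineJsonExample extends DefaultJsonProtocol {
  // Formats are declared innermost first, so each one is initialised
  // before the format that depends on it
  implicit val metaFormat: RootJsonFormat[Meta] =
    jsonFormat(Meta.apply, "units", "theme", "name")
  implicit val averageSpeedFormat: RootJsonFormat[AverageSpeed] =
    jsonFormat(AverageSpeed.apply, "meta", "data")
  implicit val sensorDataFormat: RootJsonFormat[SensorData] =
    jsonFormat(SensorData.apply, "Average Speed")
  implicit val sensorFormat: RootJsonFormat[Sensor] =
    jsonFormat(Sensor.apply, "name", "data")

  // Assumption: the API returns an array of sensors; fields we do not
  // model (geom, source, ...) are omitted here for brevity
  val jsonString: String =
    """[{
      |  "name": "N05171T",
      |  "data": {
      |    "Average Speed": {
      |      "data": {"2017-02-01 13:21:46": 35.0},
      |      "meta": {"name": "Average Speed", "theme": "Traffic", "units": "KmPH"}
      |    }
      |  }
      |}]""".stripMargin

  // Hypothetical helper wrapping the two calls used in the post
  def parse(json: String): List[Sensor] =
    json.parseJson.convertTo[List[Sensor]]

  def main(args: Array[String]): Unit =
    parse(jsonString).foreach(println)
}
```

Fields present in the JSON but absent from the case classes are simply ignored by the generated formats, which is why only name and data need to be modelled.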

Making an API Request using Akka HTTP

In order to make an API request using Akka HTTP, we utilise the high-level client API based on Scala futures:

import akka.actor.ActorSystem
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.model._
import akka.http.scaladsl.Http
import HttpMethods._
import akka.stream.ActorMaterializer
import Uri.Query
import scala.concurrent.Future

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher

val uri = Uri("http://uoweb1.ncl.ac.uk/api/v1/sensor/data/raw.json")
val api_key = "YOUR_API_KEY" // replace with your own API key

val query: Query = Query("api_key" -> api_key,
  "sensor_name" -> "N05171T",
  "start_time" -> "20170201",
  "end_time" -> "20170202",
  "variable" -> "average speed")

val res: Future[HttpResponse] = 
  Http().singleRequest(HttpRequest(GET, uri = uri.withQuery(query)))

We have created a Query object, which is just a sequence of (String, String) pairs, written using the nicer arrow syntax. The request is made with singleRequest, which returns a Future containing the HttpResponse. The HttpResponse object contains the status (200 OK, 404 Not Found, etc.) and, importantly, the body content, called the HttpEntity. In this case the HttpEntity is simply the JSON. In order to verify that we are able to make a request to the Urban Observatory, we can match on the result of the future using andThen:

import scala.util.{Failure, Success}

res andThen {
  case Success(response) => println(response)
  case Failure(ex) => println(ex)
} onComplete {
  _ => system.terminate()
}

andThen expects a partial function from a Try, an algebraic data type (ADT) which can be either Success or Failure. When the future completes, we shut down the actor system required by Akka HTTP. When you run this minimal example, you should receive a response similar to:

HttpResponse(200 OK,List(Date: Tue, 21 Feb 2017 09:27:24 GMT, Server: Apache/2.4.7 (Ubuntu),
Vary: Cookie, X-Frame-Options: SAMEORIGIN),HttpEntity.Chunked(application/json),
HttpProtocol(HTTP/1.1))

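The server has returned 200 OK, some headers and some response data. In order to access the response data, we must convert it to a string. First we access the entity field of the HttpResponse, then read its dataBytes, a stream of ByteString chunks. Because the entity is chunked, the body may arrive in several pieces, so we concatenate the chunks before decoding the result to a String:

import akka.util.ByteString

// `response` is the HttpResponse obtained once the future `res` completes
val resp = response.
  entity.
  dataBytes.
  fold(ByteString.empty)(_ ++ _).
  map(_.utf8String)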

This is an Akka Streams Source containing the response body from the server. In order to parse it into the sensor data, we map over the stream using the JSON parsing calls we have previously seen:

resp.
  map(_.parseJson.convertTo[List[Sensor]]).
  runForeach(println)

This should print the parsed data to the console. There are a variety of other Akka Sinks which can be used to consume the HttpEntity; they can be found in the Akka docs overview of built-in stages.
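As an alternative to handling the stream by hand, Akka HTTP can unmarshal the entity directly using the SprayJsonSupport trait that Protocols already mixes in. The sketch below is one way this might look, not the approach taken in this post: the helper buildRequest, the object name and the placeholder key are invented for illustration, it does not check the response status before unmarshalling, and it again assumes the response is a JSON array of sensors.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.Uri.Query
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import spray.json._
import scala.concurrent.Future
import scala.util.{Failure, Success}

case class Meta(units: String, theme: String, name: String)
case class AverageSpeed(meta: Meta, data: Map[String, Double])
case class SensorData(averageSpeed: AverageSpeed)
case class Sensor(name: String, data: SensorData)

trait Protocols extends SprayJsonSupport with DefaultJsonProtocol {
  implicit val metaFormat: RootJsonFormat[Meta] =
    jsonFormat(Meta.apply, "units", "theme", "name")
  implicit val averageSpeedFormat: RootJsonFormat[AverageSpeed] =
    jsonFormat(AverageSpeed.apply, "meta", "data")
  implicit val sensorDataFormat: RootJsonFormat[SensorData] =
    jsonFormat(SensorData.apply, "Average Speed")
  implicit val sensorFormat: RootJsonFormat[Sensor] =
    jsonFormat(Sensor.apply, "name", "data")
}

object UnmarshalExample extends Protocols {
  // Hypothetical helper: the same GET request as in the post
  def buildRequest(apiKey: String): HttpRequest = {
    val query = Query(
      "api_key" -> apiKey,
      "sensor_name" -> "N05171T",
      "start_time" -> "20170201",
      "end_time" -> "20170202",
      "variable" -> "average speed")
    HttpRequest(HttpMethods.GET,
      uri = Uri("http://uoweb1.ncl.ac.uk/api/v1/sensor/data/raw.json").withQuery(query))
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem()
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Unmarshal collects the whole entity and applies the spray-json formats
    val sensors: Future[List[Sensor]] =
      Http().singleRequest(buildRequest("YOUR_API_KEY")) // placeholder key
        .flatMap(response => Unmarshal(response.entity).to[List[Sensor]])

    sensors.andThen {
      case Success(s) => s.foreach(println)
      case Failure(ex) => println(ex)
    }.onComplete(_ => system.terminate())
  }
}
```

The trade-off is that the whole body is held in memory before parsing, which is fine for a single day of readings from one sensor but may not suit very large responses.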

A complete working example is available in this Gist.