Apache Spark on Kubernetes series:
- Introduction to Spark on Kubernetes
- Scaling Spark made simple on Kubernetes
- The anatomy of Spark applications on Kubernetes
- Monitoring Apache Spark with Prometheus
- Spark History Server on Kubernetes
- Spark scheduling on Kubernetes demystified
- Spark Streaming Checkpointing on Kubernetes
- Deep dive into monitoring Spark and Zeppelin with Prometheus
- Apache Spark application resilience on Kubernetes

Apache Zeppelin on Kubernetes series:
- Running Zeppelin Spark notebooks on Kubernetes
- Running Zeppelin Spark notebooks on Kubernetes - deep dive

Apache Kafka on Kubernetes series:
- Kafka on Kubernetes - using etcd
This is the third post in the Spark on Kubernetes series - if you missed the first two, check them out in the series list above.
Our PaaS, Pipeline, deploys cloud native applications to Kubernetes using a CI/CD workflow. As a complementary feature, these applications are monitored, with out-of-the-box dashboards and alerts preconfigured based on application language, type, cluster size and predefined SLAs (these values can be changed). Centralized log collection, tracing and visualization are also part of the platform.
Note: The Pipeline CI/CD module mentioned in this post is outdated and not available anymore. You can integrate Pipeline to your CI/CD solution using the Pipeline API. Contact us for details.
Pipeline monitors multiple federated clusters with Prometheus - the secure way
To monitor these applications, and Spark in this particular example, we needed a robust, open source monitoring system, and we chose Prometheus. Prometheus is an open source monitoring and alerting system that was open sourced in 2012 and has since become the industry standard for monitoring. It is also an official Cloud Native Computing Foundation project, so most Kubernetes-related components use, or will use, Prometheus for monitoring and alerting.
Prometheus uses a pull model over HTTP to scrape data from applications. For batch jobs it also supports a push model, but enabling this feature requires an additional component called the pushgateway. Think of this gateway as rudimentary storage that holds application metrics until Prometheus scrapes them. Once Prometheus has collected the application metrics, all that's left is to choose a visualization tool that works well alongside Prometheus.
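As a minimal sketch of the push model, the following Scala snippet uses the Prometheus Java client (the simpleclient and simpleclient_pushgateway artifacts are assumed to be on the classpath; the metric, job name and gateway address are illustrative):

import io.prometheus.client.{CollectorRegistry, Gauge}
import io.prometheus.client.exporter.PushGateway

object PushModelExample {
  def main(args: Array[String]): Unit = {
    // Metrics are collected locally in a registry...
    val registry = new CollectorRegistry()
    val duration = Gauge.build()
      .name("batch_job_duration_seconds")
      .help("Duration of the last batch job in seconds.")
      .register(registry)
    duration.set(42.0)

    // ...then pushed to the pushgateway, which Prometheus scrapes on its regular schedule
    new PushGateway("127.0.0.1:9091").pushAdd(registry, "sample_batch_job")
  }
}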
The ultimate tool for that job is Grafana, an open source metric analytics and visualization tool with a nice UI and a rich set of default features.
If you search for monitoring Spark with Prometheus on the internet, all you'll find is an old blog post from 2015 in which someone used a Graphite sink to get metrics out of Spark and then mapped them to the Prometheus format. We weren't interested in that approach, so, to enable Spark monitoring via Prometheus, a couple of changes had to be made in the Spark code base.
Spark only supports a handful of sinks out of the box (Graphite, CSV, Ganglia), and Prometheus isn't one of them, so we introduced a new Prometheus sink of our own (PR - with related Apache ticket SPARK-22343). Spark uses a push model to send metrics data, so a Prometheus pushgateway is required. The metrics data published by Spark is based on Dropwizard, so its format is not supported natively by Prometheus; every metric must be converted using DropwizardExports before being pushed to the pushgateway.
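The conversion itself boils down to registering a DropwizardExports collector over Spark's Dropwizard registry. A hedged sketch, assuming the simpleclient_dropwizard artifact is available and using an illustrative registry and job name:

import com.codahale.metrics.MetricRegistry
import io.prometheus.client.CollectorRegistry
import io.prometheus.client.dropwizard.DropwizardExports
import io.prometheus.client.exporter.PushGateway

object DropwizardToPrometheus {
  def main(args: Array[String]): Unit = {
    // Stand-in for the Dropwizard registry that Spark's MetricsSystem populates
    val sparkMetrics = new MetricRegistry()
    sparkMetrics.counter("records_processed").inc(100)

    // DropwizardExports translates Dropwizard metrics into Prometheus samples
    val pushRegistry = new CollectorRegistry()
    new DropwizardExports(sparkMetrics).register(pushRegistry)

    // Push the converted metrics to the pushgateway under an illustrative job name
    new PushGateway("127.0.0.1:9091").pushAdd(pushRegistry, "spark-job")
  }
}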
Initially, we submitted these PRs to the Spark on K8S fork, as this is what’s deployed on our PaaS. However, based on the suggestions of the fork’s maintainers, we extracted all Kubernetes-specific features and re-submitted the PR in the upstream Apache repo as well, since this feature is also useful to that community.
The latest version of the pushgateway rejects any message that contains a timestamp (see the related PR), so we had to opt for an earlier version that still supports them. We're planning to introduce our own version of the pushgateway, the same way Weaveworks did theirs (see it on GitHub), in order to overcome these issues and to flesh out a few advanced scenarios.
The reason the timestamp is essential is as follows: we wanted metrics data that can be represented in histograms, so that historical metrics data could be used for smart scaling of Kubernetes clusters in order to meet the different SLAs defined in our Pipeline spotguides. The Prometheus protocol supports timestamps by default, but if metrics come from the pushgateway and contain no timestamp, Prometheus adds a default one: the time at which the data was scraped from the pushgateway. Of course, we were not satisfied with that approach.
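For illustration, in the Prometheus text exposition format a sample may carry an explicit timestamp (milliseconds since the Unix epoch) after its value; the metric name and label below are made up:

# value 123456 reported at an explicit timestamp instead of the scrape time
spark_driver_memory_consumption{role="driver"} 123456 1514764800000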
The default Prometheus pushgateway API does not support metric timestamps, so we enhanced the API to enrich metrics data with them.
// Enhanced client call: the timestamp is forwarded along with the pushed metrics
public void pushAdd(CollectorRegistry registry,
                    String job, String timestamp) throws IOException {
  doRequest(registry, job, null, "POST", timestamp);
}
Because Spark does not include metric timestamps, we inject them into every metric before reporting them to the pushgateway.
pushGateway.pushAdd(pushRegistry, job, groupingKey.asJava,
  s"${System.currentTimeMillis}")
Inside the Spark codebase there's a file, metrics.properties.template, in which the user can fine-tune which metrics to collect and which sinks process them.
# Enable Prometheus for all instances by class name
*.sink.prometheus.class=org.apache.spark.metrics.sink.PrometheusSink
# Prometheus pushgateway address
*.sink.prometheus.pushgateway-address-protocol=<prometheus pushgateway protocol> - defaults to http
*.sink.prometheus.pushgateway-address=<prometheus pushgateway address> - defaults to 127.0.0.1:9091
*.sink.prometheus.unit=<unit> - defaults to seconds (TimeUnit.SECONDS)
*.sink.prometheus.period=<period> - defaults to 10
# Enable JVM metrics source for all instances by class name
*.source.jvm.class=org.apache.spark.metrics.source.JvmSource
Spark only provides the metrics.properties.template file to help enable metrics. A proper metrics.properties file has to be created, and during application submission the following configuration value has to be set to the path of metrics.properties. This file must be reachable by every Spark component.
--conf spark.metrics.conf=<path_to_the_file>/metrics.properties
Also, by default Spark injects spark.app.id into the metrics, so the data can be differentiated per application. Unfortunately, clustering data by this field is hard, because it's a random string generated on the fly. Spark provides a way of changing this behavior via the spark.metrics.namespace configuration property (for further details, please check the official Spark documentation). To further sort metrics, Spark names a few metrics sources (e.g. Executor, Driver) but not the Shuffle service, so we created another PR for that:
} else if (instance == "shuffleService") {
  // Name shuffle service metrics under the configured metrics namespace
  MetricRegistry.name(metricsNamespace.getOrElse(""), defaultName)
}
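With both spark.metrics.conf and spark.metrics.namespace in place, a submission might look like the following sketch (the path, namespace value and placeholders are illustrative):

spark-submit \
  --conf spark.metrics.conf=/opt/spark/conf/metrics.properties \
  --conf spark.metrics.namespace=prometheus_sample \
  --class <main_class> <application_jar>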
To make it even easier to slice and dice Spark metrics in Prometheus, we group them by the following keys (metricsnamespace/role/id), where:
- metricsnamespace: the value passed in via the spark.metrics.namespace configuration property
- role: the Spark component the metrics originate from (driver/executor/shuffle)
- id: optional, set only for metrics coming from executors; the identifier of the executor
The following table illustrates this grouping with a simple example:
Metrics | App id | MetricsNamespace | Role | Id | Prometheus Grouping Key |
---|---|---|---|---|---|
spark-prometheus_sample-memory_consumption | job1 | prometheus_sample | driver | - | spark-prometheus_sample/prometheus_sample/driver/memory_consumption |
spark-prometheus_sample-memory_consumption | job1 | prometheus_sample | executor | 1 | spark-prometheus_sample/prometheus_sample/executor/1/memory_consumption |
spark-job2-cpu_usage | job2 | job2 | driver | - | spark-job2/job2/driver/cpu_usage |
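A minimal sketch of how such a grouping key could be assembled; the function, parameter and label names below are illustrative, not the sink's actual code:

// Illustrative only: builds a pushgateway grouping key following the metricsnamespace/role/id scheme
def groupingKey(metricsNamespace: String, role: String, executorId: Option[String]): Map[String, String] = {
  val base = Map("metricsnamespace" -> metricsNamespace, "role" -> role)
  // The id segment is only present for metrics coming from executors
  executorId.fold(base)(id => base + ("id" -> id))
}

// groupingKey("prometheus_sample", "executor", Some("1"))
// => Map(metricsnamespace -> prometheus_sample, role -> executor, id -> 1)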
The full implementation of our Prometheus sink can be found here.
Finally, here is the architecture that allows Pipeline to monitor the Spark cluster.
We’ll follow up on this post with additional information on how we use metrics to scale our clusters or enforce custom SLAs.