Overview

At the beginning of 2017, at Uala we figured out that it was the right moment to implement an analytics platform. We tried different options, searching for alternatives day by day, but nothing really satisfied us: many were highly priced, others required a specific cloud provider in order to work (mainly AWS, where they want you to use S3 for data synchronization, EC2 instances and Redshift), and others simply did not meet our needs.

At that point, there was no alternative left but to… build our own analytics platform.

Data collection

Searching the web for an open-source, adaptable analytics solution, we found several companies that had shared small (and sometimes broken) fragments of their analytics platforms on GitHub: we thought we could borrow some ideas and already-developed parts as a starting point for a broader development.

The first part to focus on was data collection: we put our efforts into creating a system that receives data in a standard, interoperable format (that is one of the main principles of open sourcing things, isn’t it?). Segment was a great inspiration: they have open sourced their analytics clients for most platforms, including Android, iOS and JS.

We forked their repositories, pointed them at our future analytics API endpoints, modified several pieces of ingestion logic, lightened them and repackaged them.

Data ingestion

The server part was the most difficult: in all the solutions we found, the missing piece was a system that ingests the data and saves it to some storage. We were about to give up and develop a custom web server from scratch, but in the end, with a stroke of luck, we found a public repository containing a Go webserver, although a broken one.

So we rolled up our sleeves, debugged the code and corrected the errors, finally managing to make the webserver work. The system operates as follows: the webserver exposes a REST API to clients and pushes the data onto a Kafka topic.

During our analysis of data analytics platforms, we noticed that most solutions use Kafka to manage data transmission, thanks to its ability to handle huge amounts of data reliably; it is coordinated by a Zookeeper cluster.

The result is available at this GitHub repository, along with the Docker image. A simple configuration can look like the following:

EA_DEBUG_MODE=false
EA_KAFKA_BROKERS=kafka-1:9092,kafka-2:9092,kafka-3:9092
EA_KAFKA_TOPIC=events
EA_MESSAGE_WRITER=kafka
EA_PROMETHEUS_ENABLED=false
EA_HEALTHCHECK_ENABLED=false
EA_PPROF_ENABLED=false
EA_QUEUE_BUFFERING_DELAY_MS=100
EA_API_PORT=80
EA_AUTHORIZED_WRITE_KEYS=yoursecretkeyhere
EA_ALLOWED_ORIGINS=/https?://(?:.+\.)?example\.com/g

We set up a 5-node Zookeeper cluster, distributed over 3 datacenters in Europe, to ensure high availability for the data ingestion part.

We chose a Docker image provided by Confluent, publicly available on Docker Hub and frequently updated. The configuration was quite simple: we just set the right environment variables and that’s it.

ZOOKEEPER_CLIENT_PORT='2181' 
ZOOKEEPER_SERVER_ID='1' 
ZOOKEEPER_SERVERS=zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888;zookeeper-4:2888:3888;zookeeper-5:2888:3888 
JMX_PORT='9585'

The ZOOKEEPER_SERVER_ID variable is a progressive positive number, from 1 to 5 across the nodes.

Our Zookeeper cluster is ready, available at:
zookeeper-1
zookeeper-2
zookeeper-3
zookeeper-4
zookeeper-5

After configuring the variables for Zookeeper, it’s time to switch to the Kafka cluster configuration: here too we used the Confluent Docker image. The environment variables we set on the Kafka containers are the following:

KAFKA_ZOOKEEPER_CONNECT=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181,zookeeper-4:2181,zookeeper-5:2181
KAFKA_BROKER_ID='1'
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-1:9092
HOSTNAME=kafka-1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR='1'
JMX_PORT='9585'
KAFKA_LOG4J_ROOT_LOGLEVEL=WARN
KAFKA_LOG4J_LOGGERS="kafka=WARN,kafka.producer.async.DefaultEventHandler=WARN,kafka.request.logger=WARN,kafka.controller=WARN,state.change.logger=INFO,kafka.server.epoch=WARN"

It is not a complex configuration; however, it also requires that the hostname inside the container is set to the HOSTNAME env variable, otherwise the server won’t start.

Ok, the Kafka cluster is now available at:
kafka-1
kafka-2
kafka-3

And finally events are now stored! At least, until they hit Kafka’s retention limit.

Storage

Kafka isn’t designed to store data forever; it’s a pub-sub broker. It’s also difficult to query data in it (KSQL can help here) and it certainly is not optimized for that purpose. So it is definitely fundamental to persist the data in a real DB.

After a lot of research and analysis of numerous solutions like Hive and Hadoop, with all their pros and cons, the answer came from thinking differently: Yandex had developed a proprietary storage solution to satisfy the needs of its analytics platform, Yandex Metrica (the nerd brother of Google Analytics). That solution, ClickHouse, is by far the best among the non-relational databases we evaluated and, given the performance it achieved, it was open sourced in 2016. It’s a columnar database, blazing fast. Seriously, blazing fast.

A multi-node setup requires Zookeeper in order to synchronize and maintain shards and replicas: thus, the cluster created earlier can be used for the ClickHouse setup too. For our scope, we designed a structure of 3 shards, each of them with 1 replica, so:
clickhouse-1 clickhouse-1-replica
clickhouse-2 clickhouse-2-replica
clickhouse-3 clickhouse-3-replica

There are no particular configurations via env variables here, but we need to define the cluster in /etc/clickhouse-server/config.xml.

<?xml version="1.0"?>
<yandex>
  ...
  <remote_servers incl="clickhouse_remote_servers" >
    <event_cluster>
      <shard>
        <replica>
          <host>clickhouse-1</host>
          <port>9000</port>
        </replica>
        <replica>
          <host>clickhouse-1-replica</host>
          <port>9000</port>
        </replica>
      </shard>
      <shard>
        <replica>
          <host>clickhouse-2</host>
          <port>9000</port>
        </replica>
        <replica>
          <host>clickhouse-2-replica</host>
          <port>9000</port>
        </replica>
      </shard>
      <shard>
        <replica>
          <host>clickhouse-3</host>
          <port>9000</port>
        </replica>
        <replica>
          <host>clickhouse-3-replica</host>
          <port>9000</port>
        </replica>
      </shard>
    </event_cluster>
  </remote_servers>
  <zookeeper incl="zookeeper-servers">
    <node>
      <host>zookeeper-1</host>
      <port>2181</port>
    </node>
    <node>
      <host>zookeeper-2</host>
      <port>2181</port>
    </node>
    <node>
      <host>zookeeper-3</host>
      <port>2181</port>
    </node>
    <node>
      <host>zookeeper-4</host>
      <port>2181</port>
    </node>
    <node>
      <host>zookeeper-5</host>
      <port>2181</port>
    </node>
  </zookeeper>
  <macros incl="macros">
    <shard>01</shard>
    <replica>01</replica>
  </macros>
  ...
</yandex>

There are some important notes to keep in mind:

  • event_cluster is the name of the ClickHouse cluster we’re creating; you can choose whatever you want
  • the zookeeper section must list all the nodes of the Zookeeper cluster
  • macros will be used when creating replicated tables, so we need to set them properly on each node: clickhouse-1 has shard: 01, replica: 01; clickhouse-1-replica has shard: 01, replica: 02; clickhouse-2 has shard: 02, replica: 01; and so on.

We keep this configuration by mounting a volume on the config.xml path. Another volume has to be mounted on /var/lib/clickhouse for data persistence.
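
To double-check that each node picked up its own macros and sees the whole event_cluster topology, a couple of queries on ClickHouse’s system tables can help (just a quick sanity check to run on every node; the column selection is only a suggestion):

SELECT * FROM system.macros;                -- the shard/replica macros defined on this node

SELECT cluster, shard_num, replica_num, host_name
FROM system.clusters
WHERE cluster = 'event_cluster';            -- the topology defined in config.xml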

Now the ClickHouse cluster is ready to accept connections and it’s time to move on to defining the tables that will be used for data storage. We chose the ReplicatedMergeTree engine for table management, thanks to its robustness and to the data sharding and replication it provides across nodes; the statement below also shows how the macros defined previously are used.

CREATE TABLE events_clickhouse(
  anonymousId String,
  event String,
  name String,
  groupId String,
  properties Nested(
    key String,
    value String
  ),
  traits Nested(
    key String,
    value String
  ),

  context_app_name String,
  context_app_version String,
  context_app_build String,
  context_app_namespace String,

  context_campaign_name String,
  context_campaign_source String,
  context_campaign_medium String,
  context_campaign_term String,
  context_campaign_content String,

  context_device_id String,
  context_device_manufacturer String,
  context_device_model String,
  context_device_name String,
  context_device_type String,
  context_device_version String,
  context_device_advertisingId String,

  context_library_name String,
  context_library_version String,

  context_location_city String,
  context_location_country String,
  context_location_region String,
  context_location_latitude Float64,
  context_location_longitude Float64,
  context_location_speed Float64,

  context_network_bluetooth UInt8,
  context_network_wifi UInt8,
  context_network_cellular UInt8,
  context_network_carrier String,

  context_os_name String,
  context_os_version String,

  context_page_hash String,
  context_page_path String,
  context_page_referrer String,
  context_page_search String,
  context_page_title String,
  context_page_url String,

  context_referrer_type String,
  context_referrer_name String,
  context_referrer_url String,
  context_referrer_link String,

  context_screen_density Float64,
  context_screen_width Int32,
  context_screen_height Int32,

  context_ip String,
  context_locale String,
  context_timezone String,
  context_userAgent String,
  context_traits Nested(
    key String,
    value String
  ),

  integrations Nested(
    key String,
    value String
  ),
  messageId String,
  originalTimestamp DateTime,
  receivedAt DateTime,
  sentAt DateTime,
  timestamp DateTime,
  date Date,
  type String,
  userId String,
  version String,
  writeKey String
) ENGINE = ReplicatedMergeTree(
    '/event_cluster/tables/{shard}/events_clickhouse',
    '{replica}',
    date,
    (date, messageId),
    8192);

After creating the table across all the nodes, the second step is to set up the global Distributed table that aggregates data from all shards and replicas. And it was at this point that help arrived from reading Altinity’s blog post.

CREATE TABLE events_clickhouse_global AS events_clickhouse
ENGINE = Distributed(event_cluster, default, events_clickhouse, rand());
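
From now on, queries against events_clickhouse_global see the data of the whole cluster, while events_clickhouse only holds the rows of the local shard. A minimal sketch (assuming the tables above live in the default database) to see the difference:

SELECT count() FROM events_clickhouse;         -- rows stored on this shard only
SELECT count() FROM events_clickhouse_global;  -- rows across all 3 shards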

After this step, the configuration is the following: a 6-node ClickHouse cluster with 3 shards and 3 replicas (1 for each shard). Now it’s time to open the gates and let the data start arriving at its destination.

Connecting things

But, as in all the best moments, there was a surprise: ClickHouse won’t accept unstructured nested data like JSON, and Segment’s analytics libraries send data as nested objects, so we need to flatten them.

To get past this blocker, a lot of research led to a clear solution: Apache NiFi. After configuring the cluster and the Kafka input topic, a JOLT transform processor flattens the data to match the ClickHouse table definition, writing the output to another Kafka topic. Unluckily, after several trials and different approaches for the NiFi cluster, nothing seemed to work (old versions, strange configurations, …). At that point we forked a GitHub repository and created a custom Docker image (publicly available) to obtain a fully working cluster setup. A single file contains NiFi’s configuration (/opt/nifi/conf/nifi.properties), so we mounted it as a volume and configured these parts:

...
# cluster node properties (only configure for cluster nodes) #
nifi.cluster.is.node=true
nifi.cluster.node.address=nifi-1
...
# zookeeper properties, used for cluster management #
nifi.zookeeper.connect.string=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181,zookeeper-4:2181,zookeeper-5:2181
nifi.zookeeper.connect.timeout=3 secs
nifi.zookeeper.session.timeout=3 secs
nifi.zookeeper.root.node=/nifi
...

The architecture now includes a 3-node NiFi cluster, so the nifi.cluster.node.address setting takes the values nifi-1, nifi-2 and nifi-3 on the respective nodes.

Those nodes must be able to communicate with both the Zookeeper and the Kafka clusters.

NiFi processor structure: GetKafka reads from the events API topic, PutKafka (success) publishes to a new topic called events_clickhouse_flatten and PutKafka (failure) publishes to a new topic called events_clickhouse_failures.

The JOLT transform processor uses this specification:

[{
  "operation": "shift",
  "spec": {
    "context": {
      "app": {
        "*": "context_app_&"
      },
      "campaign": {
        "*": "context_campaign_&"
      },
      "device": {
        "*": "context_device_&"
      },
      "library": {
        "*": "context_library_&"
      },
      "location": {
        "*": "context_location_&"
      },
      "network": {
        "*": "context_network_&"
      },
      "os": {
        "*": "context_os_&"
      },
      "page": {
        "*": "context_page_&"
      },
      "referrer": {
        "*": "context_referrer_&"
      },
      "screen": {
        "*": "context_screen_&"
      },
      "*": "context_&"
    },
    "*": "&"
  }
}, {
  "operation": "shift",
  "spec": {
    "traits": {
      "*": {
        "$": "traits\\.key[#2]",
        "@": "traits\\.value[#2]"
      }
    },
    "properties": {
      "*": {
        "$": "properties\\.key[#2]",
        "@": "properties\\.value[#2]"
      }
    },
    "context_traits": {
      "*": {
        "$": "context_traits\\.key[#2]",
        "@": "context_traits\\.value[#2]"
      }
    },
    "integrations": {
      "*": {
        "$": "integrations\\.key[#2]",
        "@": "integrations\\.value[#2]"
      }
    },
    "*": "&"
  }
}, {
  "operation": "modify-overwrite-beta",
  "spec": {
    "traits.value": "=toString",
    "properties.value": "=toString",
    "context_traits.value": "=toString",
    "integrations.value": "=toString"
  }
}]

Once the NiFi nodes are up, flattened events start to flow into the new Kafka topic events_clickhouse_flatten, and it’s time to take the data from Kafka and load it into ClickHouse. ClickHouse provides a Kafka table engine that reads from a Kafka topic and makes it available inside the DB as a consumer, so each message can be read only once.

CREATE TABLE events_clickhouse_kafka(
  anonymousId String,
  event String,
  name String,
  groupId String,
  properties Nested(
    key String,
    value String
  ),
  traits Nested(
    key String,
    value String
  ),

  context_app_name String,
  context_app_version String,
  context_app_build String,
  context_app_namespace String,

  context_campaign_name String,
  context_campaign_source String,
  context_campaign_medium String,
  context_campaign_term String,
  context_campaign_content String,

  context_device_id String,
  context_device_manufacturer String,
  context_device_model String,
  context_device_name String,
  context_device_type String,
  context_device_version String,
  context_device_advertisingId String,

  context_library_name String,
  context_library_version String,

  context_location_city String,
  context_location_country String,
  context_location_region String,
  context_location_latitude Float64,
  context_location_longitude Float64,
  context_location_speed Float64,

  context_network_bluetooth UInt8,
  context_network_wifi UInt8,
  context_network_cellular UInt8,
  context_network_carrier String,

  context_os_name String,
  context_os_version String,

  context_page_hash String,
  context_page_path String,
  context_page_referrer String,
  context_page_search String,
  context_page_title String,
  context_page_url String,

  context_referrer_type String,
  context_referrer_name String,
  context_referrer_url String,
  context_referrer_link String,

  context_screen_density Float64,
  context_screen_width Int32,
  context_screen_height Int32,

  context_ip String,
  context_locale String,
  context_timezone String,
  context_userAgent String,
  context_traits Nested(
    key String,
    value String
  ),

  integrations Nested(
    key String,
    value String
  ),
  messageId String,
  originalTimestamp String,
  receivedAt String,
  sentAt String,
  timestamp String,
  type String,
  userId String,
  version String,
  writeKey String
) ENGINE = Kafka('kafka-1:9092,kafka-2:9092,kafka-3:9092', 'events_clickhouse_flatten', 'events_clickhouse_consumer_group', 'JSONEachRow');
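
As the documentation warns, selecting directly from a Kafka engine table consumes the messages it returns, so they will not be delivered again; something like the following is only useful for a quick peek while debugging:

SELECT event, anonymousId, timestamp
FROM events_clickhouse_kafka
LIMIT 5;    -- careful: these messages are now consumed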

At this point, the fundamental part is making the data accessible many times, not only once, so we followed the official documentation and created a materialized view over this Kafka table that writes to the ReplicatedMergeTree table we built before.

CREATE MATERIALIZED VIEW events_flatten_consumer TO events_clickhouse AS
SELECT
  anonymousId,
  event,
  name,
  groupId,
  properties.key,
  properties.value,
  traits.key,
  traits.value,

  context_app_name,
  context_app_version,
  context_app_build,
  context_app_namespace,

  context_campaign_name,
  context_campaign_source,
  context_campaign_medium,
  context_campaign_term,
  context_campaign_content,

  context_device_id,
  context_device_manufacturer,
  context_device_model,
  context_device_name,
  context_device_type,
  context_device_version,
  context_device_advertisingId,

  context_library_name,
  context_library_version,

  context_location_city,
  context_location_country,
  context_location_region,
  context_location_latitude,
  context_location_longitude,
  context_location_speed,

  context_network_bluetooth,
  context_network_wifi,
  context_network_cellular,
  context_network_carrier,

  context_os_name,
  context_os_version,

  context_page_hash,
  context_page_path,
  context_page_referrer,
  context_page_search,
  context_page_title,
  context_page_url,

  context_referrer_type,
  context_referrer_name,
  context_referrer_url,
  context_referrer_link,

  context_screen_density,
  context_screen_width,
  context_screen_height,

  context_ip,
  context_locale,
  context_timezone,
  context_userAgent,
  context_traits.key,
  context_traits.value,

  integrations.key,
  integrations.value,
  messageId,
  toDateTime(originalTimestamp) AS originalTimestamp,
  toDateTime(receivedAt) AS receivedAt,
  toDateTime(sentAt) AS sentAt,
  toDateTime(timestamp) AS timestamp,
  toDate(toDateTime(timestamp)) AS date,
  type,
  userId,
  version,
  writeKey
FROM events_clickhouse_kafka;

Finally everything is ready and all the parts are connected: sending events from any client, the data ends up stored inside the ClickHouse cluster.
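
A quick end-to-end sanity check (a hedged example over the tables defined above) is to count the ingested events per day through the Distributed table:

SELECT date, count() AS events
FROM events_clickhouse_global
GROUP BY date
ORDER BY date;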

Ok, we have the data now: what next?

Well, that’s a fair question.

At an early stage, we tracked data and analyzed it through Apache Superset, which allows SQL queries on ClickHouse and provides different charts and dashboards. We used this Docker image, easy to set up thanks to the explanations on its Docker Hub page. The GUI helps with configuring the DB connection: it’s very simple and intuitive, and SQL Lab also lets anybody write raw queries.
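
For example, a typical query we might run in SQL Lab (a sketch, assuming anonymousId is a good proxy for a user) is daily active users:

SELECT date, uniq(anonymousId) AS daily_active_users
FROM events_clickhouse_global
GROUP BY date
ORDER BY date;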

The business intelligence systems built on top of this structure deserve a proper explanation of their own, so stay tuned!

P.S.: we are only at the beginning: it is essential to start collecting many data points. It’s easier to see meaningful BI results on hundreds of millions of events rather than on a few thousand.