This article is co-written by me and my colleague Kai Dai. We are both data platform engineers at Tencent Music (NYSE: TME), a music streaming service provider with a whopping 800 million monthly active users. To drop the number here is not to brag but to give a hint of the sea of data that my poor coworkers and I have to deal with everyday.

What We Use ClickHouse For

The music library of Tencent Music contains data of all forms and types: recorded music, live music, audios, videos, etc. As data platform engineers, our job is to distill information from the data, based on which our teammates can make better decisions to support our users and musical partners.

Specifically, we do all-round analysis of the songs, lyrics, melodies, albums, and artists, turn all this information into data assets, and pass them to our internal data users for inventory counting, user profiling, metrics analysis, and group targeting.

We stored and processed most of our data in Tencent Data Warehouse (TDW), an offline data platform where we put the data into various tag and metric systems and then created flat tables centering each object (songs, artists, etc.).

Then we imported the flat tables into ClickHouse for analysis and Elasticsearch for data searching and group targeting.

After that, our data analysts used the data under the tags and metrics they needed to form datasets for different usage scenarios, during which they could create their own tags and metrics.

The data processing pipeline looked like this:

The Problems with ClickHouse

When working with the above pipeline, we encountered a few difficulties:

  1. Partial Update: Partial update of columns was not supported. Therefore, any latency from any one of the data sources could delay the creation of flat tables, and thus undermine data timeliness.

  2. High storage cost: Data under different tags and metrics was updated at different frequencies. As much as ClickHouse excelled in dealing with flat tables, it was a huge waste of storage resources to just pour all data into a flat table and partition it by day, not to mention the maintenance cost coming with it.

  3. High maintainence cost: Architecturally speaking, ClickHouse was characterized by the strong coupling of storage nodes and compute nodes. Its components were heavily interdependent, adding to the risks of cluster instability. Plus, for federated queries across ClickHouse and Elasticsearch, we had to take care of a huge amount of connection issues. That was just tedious.

Transition to Apache Doris

Apache Doris, a real-time analytical database, boasts a few features that are exactly what we needed in solving our problems:

  1. Partial update: Doris supports a wide variety of data models, among which the Aggregate Model supports real-time partial update of columns. Building on this, we can directly ingest raw data into Doris and create flat tables there. The ingestion goes like this: Firstly, we use Spark to load data into Kafka; then, any incremental data will be updated to Doris and Elasticsearch via Flink. Meanwhile, Flink will pre-aggregate the data so as to release burden on Doris and Elasticsearch.

  2. Storage cost: Doris supports multi-table join queries and federated queries across Hive, Iceberg, Hudi, MySQL, and Elasticsearch. This allows us to split the large flat tables into smaller ones and partition them by update frequency. The benefits of doing so include a relief of storage burden and an increase of query throughput.

  3. Maintenance cost: Doris is of simple architecture and is compatible with MySQL protocol. Deploying Doris only involves two processes (FE and BE) with no dependency on other systems, making it easy to operate and maintain. Also, Doris supports querying external ES data tables. It can easily interface with the metadata in ES and automatically map the table schema from ES so we can conduct queries on Elasticsearch data via Doris without grappling with complex connections.

What's more, Doris supports multiple data ingestion methods, including batch import from remote storage such as HDFS and S3, data reads from MySQL binlog and Kafka, and real-time data synchronization or batch import from MySQL, Oracle, and PostgreSQL. It ensures service availability and data reliability through a consistency protocol and is capable of auto debugging. This is great news for our operators and maintainers.

Statistically speaking, these features have cut our storage cost by 42% and development cost by 40%.

During our usage of Doris, we have received lots of support from the open source Apache Doris community and timely help from the SelectDB team, which is now running a commercial version of Apache Doris.

Further Improvement to Serve Our Needs

Introduce a Semantic Layer

Speaking of the datasets, on the bright side, our data analysts are given the liberty of redefining and combining the tags and metrics at their convenience. But on the dark side, high heterogeneity of the tag and metric systems leads to more difficulty in their usage and management.

Our solution is to introduce a semantic layer in our data processing pipeline. The semantic layer is where all the technical terms are translated into more comprehensible concepts for our internal data users. In other words, we are turning the tags and metrics into first-class citizens for data definement and management.

Why would this help?

For data analysts, all tags and metrics will be created and shared at the semantic layer so there will be less confusion and higher efficiency.

For data users, they no longer need to create their own datasets or figure out which one is applicable for each scenario but can simply conduct queries on their specified tagset and metricset.

Upgrade the Semantic Layer

Explicitly defining the tags and metrics at the semantic layer was not enough. In order to build a standardized data processing system, our next goal was to ensure consistent definition of tags and metrics throughout the whole data processing pipeline.

For this sake, we made the semantic layer the heart of our data management system:

How does it work?

All computing logics in TDW will be defined at the semantic layer in the form of a single tag or metric.

The semantic layer receives logic queries from the application side, selects an engine accordingly, and generates SQL. Then it sends the SQL command to TDW for execution. Meanwhile, it might also send configuration and data ingestion tasks to Doris and decide which metrics and tags should be accelerated.

In this way, we have made the tags and metrics more manageable. A fly in the ointment is that since each tag and metric is individually defined, we are struggling with automating the generation of a valid SQL statement for the queries. If you have any idea about this, you are more than welcome to talk to us.

Give Full Play to Apache Doris

As you can see, Apache Doris has played a pivotal role in our solution. Optimizing the usage of Doris can largely improve our overall data processing efficiency. So in this part, we are going to share with you what we do with Doris to accelerate data ingestion and queries and reduce costs.

What We Want?

Currently, we have 800+ tags and 1300+ metrics derived from the 80+ source tables in TDW. When importing data from TDW to Doris, we hope to achieve:

What We Do?

  1. Generate Flat Tables in Flink Instead of TDW

Generating flat tables in TDW has a few downsides:

On the contrary, generating flat tables in Doris is much easier and less expensive. The process is as follows:

This can largely reduce storage costs since TDW no long has to maintain two copies of data and KafKa only needs to store the new data pending for ingestion. What's more, we can add whatever ETL logic we want into Flink and reuse lots of development logic for offline and real-time data ingestion.

  1. Name the Columns Smartly

As we mentioned, the Aggregate Model of Doris allows partial update of columns. Here we provide a simple introduction to other data models in Doris for your reference:

Unique Model: This is applicable for scenarios requiring primary key uniqueness. It only keeps the latest data of the same primary key ID. (As far as we know, the Apache Doris community is planning to include partial update of columns in the Unique Model, too.)

Duplicate Model: This model stores all original data exactly as it is without any pre-aggregation or deduplication.

After determining the data model, we had to think about how to name the columns. Using the tags or metrics as column names was not a choice because:

Ⅰ. Our internal data users might need to rename the metrics or tags, but Doris 1.1.3 does not support modification of column names.

Ⅱ. Tags might be taken online and offline frequently. If that involves the adding and dropping of columns, it will be not only time-consuming but also detrimental to query performance. Instead, we do the following:

Noteworthily, the recently released Doris 1.2.0 supports Light Schema Change, which means that to add or remove columns, you only need to modify the metadata in FE. Also, you can rename the columns in data tables as long as you have enabled Light Schema Change for the tables. This is a big trouble saver for us.

  1. Optimize Date Writing

Here are a few practices that have reduced our daily offline data ingestion time by 75% and our CUMU compaction score from 600+ to 100.

  1. Use Dori-on-ES in Queries

    About 60% of our data queries involve group targeting. Group targeting is to find our target data by using a set of tags as filters. It poses a few requirements for our data processing architecture:

After consideration, we decided to adopt Doris-on-ES. Doris is where we store the metric data for each scenario as a partition table, while Elasticsearch stores all tag data. The Doris-on-ES solution combines the distributed query planning capability of Doris and the full-text search capability of Elasticsearch. The query pattern is as follows:

SELECT tag, agg(metric) 
   FROM Doris 
   WHERE id in (select id from Es where tagFilter)
   GROUP BY tag

As is shown, the ID data located in Elasticsearch will be used in the sub-query in Doris for metric analysis. In practice, we find that the query response time is related to the size of the target group. If the target group contains over one million objects, the query will take up to 60 seconds. If it is even larger, a timeout error might occur. After investigation, we identified our two biggest time wasters:

I. When Doris BE pulls data from Elasticsearch (1024 lines at a time by default), for a target group of over one million objects, the network I/O overhead can be huge.

II. After the data pulling, Doris BE needs to conduct Join operations with local metric tables via SHUFFLE/BROADCAST, which can cost a lot.

Thus, we make the following optimizations:

  1. Refine the Management of Data

Doris' capability of cold and hot data separation provides the foundation of our cost reduction strategies in data processing.

Doris supports turning hot data into cold data so we only store data of the past seven days in SSD and transfer data older than that to HDD for less expensive storage.

Conclusion

Thank you for scrolling all the way down here and finishing this long read. We've shared our cheers and tears, lessons learned, and a few practices that might be of some value to you during our transition from ClickHouse to Doris. We really appreciate the help from the Apache Doris community and the SelectDB team, but we might still be chasing them around for a while since we attempt to realize auto-identification of cold and hot data, pre-computation of frequently used tags/metrics, simplification of code logic using Materialized Views, and so on and so forth.