
Best Practices

This article is co-written by me and my colleague Kai Dai. We are both data platform engineers at Tencent Music (NYSE: TME), a music streaming service provider with a whopping 800 million monthly active users. We drop the number here not to brag but to give a hint of the sea of data that my poor coworkers and I have to deal with every day.

What We Use ClickHouse For

The music library of Tencent Music contains data of all forms and types: recorded music, live music, audio, videos, etc. As data platform engineers, our job is to distill information from the data, based on which our teammates can make better decisions to support our users and musical partners.

Specifically, we do all-round analysis of the songs, lyrics, melodies, albums, and artists, turn all this information into data assets, and pass them to our internal data users for inventory counting, user profiling, metrics analysis, and group targeting.

We stored and processed most of our data in Tencent Data Warehouse (TDW), an offline data platform where we put the data into various tag and metric systems and then created flat tables centered on each object (songs, artists, etc.).

Then we imported the flat tables into ClickHouse for analysis and Elasticsearch for data searching and group targeting.

After that, our data analysts used the data under the tags and metrics they needed to form datasets for different usage scenarios, during which they could create their own tags and metrics.

The data processing pipeline looked like this:

The Problems with ClickHouse

When working with the above pipeline, we encountered a few difficulties:

  1. Partial Update: Partial update of columns was not supported. Therefore, any latency from any one of the data sources could delay the creation of flat tables, and thus undermine data timeliness.
  2. High storage cost: Data under different tags and metrics was updated at different frequencies. As much as ClickHouse excelled in dealing with flat tables, it was a huge waste of storage resources to just pour all data into a flat table and partition it by day, not to mention the maintenance cost coming with it.
  3. High maintenance cost: Architecturally speaking, ClickHouse was characterized by the strong coupling of storage nodes and compute nodes. Its components were heavily interdependent, adding to the risks of cluster instability. Plus, for federated queries across ClickHouse and Elasticsearch, we had to take care of a huge amount of connection issues. That was just tedious.

Transition to Apache Doris

Apache Doris, a real-time analytical database, boasts a few features that are exactly what we needed in solving our problems:

  1. Partial update: Doris supports a wide variety of data models, among which the Aggregate Model supports real-time partial update of columns. Building on this, we can directly ingest raw data into Doris and create flat tables there. The ingestion goes like this: first, we use Spark to load data into Kafka; then, any incremental data is updated to Doris and Elasticsearch via Flink. Meanwhile, Flink pre-aggregates the data to relieve the burden on Doris and Elasticsearch.
  2. Storage cost: Doris supports multi-table join queries and federated queries across Hive, Iceberg, Hudi, MySQL, and Elasticsearch. This allows us to split the large flat tables into smaller ones and partition them by update frequency. The benefits of doing so include a relief of storage burden and an increase of query throughput.
  3. Maintenance cost: Doris is of simple architecture and is compatible with MySQL protocol. Deploying Doris only involves two processes (FE and BE) with no dependency on other systems, making it easy to operate and maintain. Also, Doris supports querying external ES data tables. It can easily interface with the metadata in ES and automatically map the table schema from ES so we can conduct queries on Elasticsearch data via Doris without grappling with complex connections.

What’s more, Doris supports multiple data ingestion methods, including batch import from remote storage such as HDFS and S3, data reads from MySQL binlog and Kafka, and real-time data synchronization or batch import from MySQL, Oracle, and PostgreSQL. It ensures service availability and data reliability through a consistency protocol and is capable of auto debugging. This is great news for our operators and maintainers.

Statistically speaking, these features have cut our storage cost by 42% and development cost by 40%.

During our usage of Doris, we have received lots of support from the open source Apache Doris community and timely help from the SelectDB team, which is now running a commercial version of Apache Doris.

Further Improvement to Serve Our Needs

Introduce a Semantic Layer

Speaking of the datasets, on the bright side, our data analysts are given the liberty of redefining and combining the tags and metrics at their convenience. But on the dark side, high heterogeneity of the tag and metric systems leads to more difficulty in their usage and management.

Our solution is to introduce a semantic layer in our data processing pipeline. The semantic layer is where all the technical terms are translated into more comprehensible concepts for our internal data users. In other words, we are turning the tags and metrics into first-class citizens for data definition and management.

Why would this help?

For data analysts, all tags and metrics will be created and shared at the semantic layer so there will be less confusion and higher efficiency.

Data users no longer need to create their own datasets or figure out which one fits each scenario; they can simply run queries on their specified tagset and metricset.

Upgrade the Semantic Layer

Explicitly defining the tags and metrics at the semantic layer was not enough. In order to build a standardized data processing system, our next goal was to ensure consistent definition of tags and metrics throughout the whole data processing pipeline.

For this sake, we made the semantic layer the heart of our data management system:

How does it work?

All computing logic in TDW is defined at the semantic layer in the form of a single tag or metric.

The semantic layer receives logic queries from the application side, selects an engine accordingly, and generates SQL. Then it sends the SQL command to TDW for execution. Meanwhile, it might also send configuration and data ingestion tasks to Doris and decide which metrics and tags should be accelerated.
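To make the idea concrete, here is a minimal Python sketch of how a semantic layer can turn a logical query (a set of tags and metrics) into SQL. All table names, tag names, and expressions below are invented for illustration; our actual system selects among multiple engines and handles far more cases.

```python
# Hypothetical sketch of a semantic layer translating a logical query
# into SQL. Every name here is invented for illustration.

# Each tag/metric is registered once with its computing logic.
DEFINITIONS = {
    "artist_region": {"kind": "tag", "expr": "t_artist.region"},
    "play_count_7d": {"kind": "metric", "expr": "SUM(t_play.cnt_7d)"},
}

def generate_sql(tags, metrics, table="flat_song"):
    """Translate a requested tagset/metricset into a SQL string."""
    select_items = [DEFINITIONS[t]["expr"] + f" AS {t}" for t in tags]
    select_items += [DEFINITIONS[m]["expr"] + f" AS {m}" for m in metrics]
    group_by = [DEFINITIONS[t]["expr"] for t in tags]
    return (
        "SELECT " + ", ".join(select_items)
        + f" FROM {table}"
        + " GROUP BY " + ", ".join(group_by)
    )

sql = generate_sql(["artist_region"], ["play_count_7d"])
```

Because every tag and metric is defined exactly once in the registry, every query that references it reuses the same computing logic, which is the consistency property we wanted.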

In this way, we have made the tags and metrics more manageable. A fly in the ointment is that since each tag and metric is individually defined, we are struggling with automating the generation of a valid SQL statement for the queries. If you have any idea about this, you are more than welcome to talk to us.

Give Full Play to Apache Doris

As you can see, Apache Doris has played a pivotal role in our solution. Optimizing the usage of Doris can largely improve our overall data processing efficiency. So in this part, we are going to share with you what we do with Doris to accelerate data ingestion and queries and reduce costs.

What We Want

Currently, we have 800+ tags and 1300+ metrics derived from the 80+ source tables in TDW.

When importing data from TDW to Doris, we hope to achieve:

  • Real-time availability: In addition to the traditional T+1 offline data ingestion, we require real-time tagging.
  • Partial update: Each source table generates data through its own ETL task at various paces and involves only part of the tags and metrics, so we require the support for partial update of columns.
  • High performance: We need a response time of only a few seconds in group targeting, analysis and reporting scenarios.
  • Low costs: We hope to reduce costs as much as possible.

What We Do

1. Generate Flat Tables in Flink Instead of TDW

Generating flat tables in TDW has a few downsides:

  • High storage cost: TDW has to maintain an extra flat table apart from the discrete 80+ source tables. That’s huge redundancy.
  • Low timeliness: Any delay in the source tables is amplified and slows down the whole data pipeline.
  • High development cost: Achieving real-time processing would require extra development effort and resources.

By contrast, generating flat tables in Doris is much easier and less expensive. The process is as follows:

  • Use Spark to import new data into Kafka in an offline manner.
  • Use Flink to consume Kafka data.
  • Create a flat table via the primary key ID.
  • Import the flat table into Doris.

As is shown below, Flink aggregates the five rows of data with ID=1 into one row in Doris, reducing the data writing pressure on Doris.

This largely reduces storage costs since TDW no longer has to maintain two copies of data and Kafka only needs to store the new data pending ingestion. What’s more, we can add whatever ETL logic we want into Flink and reuse lots of development logic for both offline and real-time data ingestion.
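The merge-by-key idea behind the Flink pre-aggregation can be sketched in a few lines of Python. This is only an illustration, not our actual Flink job; the column names are made up:

```python
# Sketch: partial rows arriving from different source tables are merged
# by primary key before being written to Doris, so five upstream rows
# for ID=1 become a single downstream write.

def pre_aggregate(rows):
    """Merge partial rows by ID; later non-None values win."""
    merged = {}
    for row in rows:
        target = merged.setdefault(row["id"], {"id": row["id"]})
        for col, val in row.items():
            if val is not None:
                target[col] = val
    return list(merged.values())

batch = [
    {"id": 1, "song_name": "A", "play_cnt": None},
    {"id": 1, "song_name": None, "play_cnt": 10},
    {"id": 1, "lyric_id": 7},
    {"id": 1, "play_cnt": 12},   # a newer value replaces the older one
    {"id": 1, "album_id": 3},
]
out = pre_aggregate(batch)       # one merged row instead of five
```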

2. Name the Columns Smartly

As we mentioned, the Aggregate Model of Doris allows partial update of columns. Here we provide a simple introduction to other data models in Doris for your reference:

Unique Model: This is applicable for scenarios requiring primary key uniqueness. It only keeps the latest data of the same primary key ID. (As far as we know, the Apache Doris community is planning to include partial update of columns in the Unique Model, too.)

Duplicate Model: This model stores all original data exactly as it is without any pre-aggregation or deduplication.

After determining the data model, we had to think about how to name the columns. Using the tags or metrics as column names was not an option because:

I. Our internal data users might need to rename the metrics or tags, but Doris 1.1.3 does not support modification of column names.

II. Tags might be taken online and offline frequently. If that involves the adding and dropping of columns, it will be not only time-consuming but also detrimental to query performance.

Instead, we do the following:

  • For flexible renaming of tags and metrics, we use MySQL tables to store the metadata (name, globally unique ID, status, etc.). Any change to the names will only happen in the metadata and will not affect the table schema in Doris. For example, if song_name is given an ID of 4, it will be stored with the column name a4 in Doris. Then if song_name is involved in a query, it will be converted to a4 in SQL.
  • For the onlining and offlining of tags, we sort out the tags based on how frequently they are being used. The least used ones will be given an offline mark in their metadata. No new data will be put under the offline tags but the existing data under those tags will still be available.
  • For real-time availability of newly added tags and metrics, we prebuild a few ID columns in Doris tables based on the mapping of name IDs. These reserved ID columns will be allocated to the newly added tags and metrics. Thus, we can avoid table schema change and the consequent overheads. Our experience shows that only 10 minutes after the tags and metrics are added, the data under them can be available.
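The metadata-based renaming in the first bullet can be sketched as follows. The dict stands in for the MySQL metadata tables, and the naive string replacement is for illustration only; a real implementation would rewrite the parsed SQL:

```python
# Illustrative sketch of mapping user-facing tag/metric names to stable
# physical column names in Doris. All names here are hypothetical.

METADATA = {
    # tag name -> {globally unique ID, status}
    "song_name": {"id": 4, "status": "online"},
    "old_tag":   {"id": 9, "status": "offline"},
}

def physical_column(name):
    """Map a user-facing name to its Doris column name (e.g. a4)."""
    return f"a{METADATA[name]['id']}"

def rewrite_query(sql):
    """Replace tag names in a query with their physical column names."""
    for name in METADATA:
        sql = sql.replace(name, physical_column(name))
    return sql

q = rewrite_query("SELECT song_name FROM flat_table")
# Renaming a tag only touches METADATA; the Doris schema is unchanged.
```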

Notably, the recently released Doris 1.2.0 supports Light Schema Change, which means that to add or remove columns, you only need to modify the metadata in FE. Also, you can rename the columns in data tables as long as you have enabled Light Schema Change for the tables. This is a big trouble saver for us.

3. Optimize Data Writing

Here are a few practices that have reduced our daily offline data ingestion time by 75% and our CUMU compaction score from 600+ to 100.

  • Flink pre-aggregation: as is mentioned above.
  • Auto-sizing of writing batches: To reduce Flink resource usage, we enable the data in one Kafka topic to be written into various Doris tables and automatically adjust the batch size based on the data volume.
  • Optimization of Doris data writing: fine-tune the sizes of tablets and buckets as well as the compaction parameters for each scenario:
max_XXXX_compaction_thread
max_cumulative_compaction_num_singleton_deltas
  • Optimization of the BE commit logic: conduct regular caching of BE lists, commit them to the BE nodes batch by batch, and use finer load balancing granularity.

4. Use Doris-on-ES in Queries

About 60% of our data queries involve group targeting. Group targeting means finding our target data using a set of tags as filters. It poses a few requirements for our data processing architecture:

  • Group targeting related to APP users can involve very complicated logic. That means the system must support hundreds of tags as filters simultaneously.
  • Most group targeting scenarios only require the latest tag data. However, metric queries need to support historical data.
  • Data users might need to perform further aggregated analysis of metric data after group targeting.
  • Data users might also need to perform detailed queries on tags and metrics after group targeting.

After consideration, we decided to adopt Doris-on-ES. Doris is where we store the metric data for each scenario as a partition table, while Elasticsearch stores all tag data. The Doris-on-ES solution combines the distributed query planning capability of Doris and the full-text search capability of Elasticsearch. The query pattern is as follows:

```sql
SELECT tag, agg(metric)
FROM Doris
WHERE id IN (SELECT id FROM ES WHERE tagFilter)
GROUP BY tag
```

As is shown, the ID data located in Elasticsearch will be used in the sub-query in Doris for metric analysis.

In practice, we find that the query response time is related to the size of the target group. If the target group contains over one million objects, the query will take up to 60 seconds. If it is even larger, a timeout error might occur.

After investigation, we identified our two biggest time wasters:

I. When Doris BE pulls data from Elasticsearch (1024 lines at a time by default), for a target group of over one million objects, the network I/O overhead can be huge.

II. After the data pulling, Doris BE needs to conduct Join operations with local metric tables via SHUFFLE/BROADCAST, which can cost a lot.

Thus, we make the following optimizations:

  • Add a query session variable es_optimize that specifies whether to enable optimization.
  • When writing data into ES, add a BK column to store the bucket number computed by hashing the primary key ID. The algorithm is the same as the bucketing algorithm in Doris (CRC32).
  • Use Doris BE to generate a Bucket Join execution plan, dispatch the bucket number to BE ScanNode and push it down to ES.
  • Use ES to compress the queried data; turn multiple data fetch into one and reduce network I/O overhead.
  • Make sure that Doris BE only pulls the data of buckets related to the local metric tables and conducts local Join operations directly to avoid data shuffling between Doris BEs.
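The shared bucketing scheme is what lets each BE pull only the IDs it can join locally. The sketch below shows the idea with Python's `zlib.crc32`; whether this exact formula matches Doris' internal bucketing is an assumption on our part, so treat it as an illustration of the technique, not the production algorithm.

```python
import zlib

def bucket_of(primary_key, num_buckets):
    """Hash the primary key with CRC32 and map it to a bucket number.
    (Illustrative; the real Doris hash may differ in detail.)"""
    return zlib.crc32(str(primary_key).encode("utf-8")) % num_buckets

# On the write path, the bucket number (the "BK" column) is stored next
# to each ID in ES; on the query path, a BE only fetches the buckets it
# owns, so the join with local metric tablets needs no shuffle.
NUM_BUCKETS = 32
bk = bucket_of(10086, NUM_BUCKETS)

def ids_for_backend(ids, owned_buckets, num_buckets=NUM_BUCKETS):
    """Filter a candidate ID list down to this BE's buckets."""
    return [i for i in ids if bucket_of(i, num_buckets) in owned_buckets]
```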

As a result, we reduce the query response time for large group targeting from 60 seconds to a surprising 3.7 seconds.

Community information shows that Doris is going to support inverted indexing since version 2.0.0, which is soon to be released. With this new version, we will be able to conduct full-text search on text types, equivalence or range filtering of texts, numbers, and datetime, and conveniently combine AND, OR, NOT logic in filtering since the inverted indexing supports array types. This new feature of Doris is expected to deliver 3~5 times better performance than Elasticsearch on the same task.

5. Refine the Management of Data

Doris’ capability of cold and hot data separation provides the foundation of our cost reduction strategies in data processing.

  • Based on the TTL mechanism of Doris, we only store data of the current year in Doris and put the historical data before that in TDW for lower storage cost.
  • We vary the number of replicas for different data partitions. For example, we keep three replicas of data from the recent three months, which is queried frequently, one replica of data older than six months, and two replicas of data in between.
  • Doris supports turning hot data into cold data, so we only store data of the past seven days on SSD and move anything older to HDD for less expensive storage.
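The policy above can be summed up as a simple function of partition age. This is only a toy encoding of the rules for readability; in the real clusters these settings are applied per partition via Doris table properties, not application code.

```python
def storage_policy(age_days):
    """Return (replica count, storage medium) for a partition's age,
    mirroring the thresholds described in the text."""
    medium = "SSD" if age_days <= 7 else "HDD"
    if age_days <= 90:          # recent three months: queried often
        replicas = 3
    elif age_days <= 180:       # three to six months old
        replicas = 2
    else:                       # older than six months
        replicas = 1
    return replicas, medium
```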

Conclusion

Thank you for scrolling all the way down here and finishing this long read. We’ve shared our cheers and tears, lessons learned, and a few practices that might be of some value to you during our transition from ClickHouse to Doris. We really appreciate the help from the Apache Doris community and the SelectDB team, but we might still be chasing them around for a while, as we plan to implement auto-identification of cold and hot data, pre-computation of frequently used tags/metrics, simplification of code logic using materialized views, and more.

Links

SelectDB:

https://selectdb.com

Apache Doris:

http://doris.apache.org

Apache Doris Github:

https://github.com/apache/doris

Best Practices

Author: Junfei Liu, Senior Architect of Duyansoft

The world is getting more and more value out of data, as exemplified by the currently much-talked-about ChatGPT, which I believe is a robotic data analyst. However, in today’s era, what’s more important than the data itself is the ability to locate your wanted information among all the overflowing data quickly. So in this article, I will talk about how I improved overall data processing efficiency by optimizing the choice and usage of data warehouses.

Too Much Data on My Plate

The choice of data warehouses was never high on my worry list until 2021. I have been working as a data engineer for a Fintech SaaS provider since its incorporation in 2014. In the company’s infancy, we didn’t have too much data to juggle. We only needed a simple tool for OLTP and business reporting, and the traditional databases would cut the mustard.

But as the company grew, the data we received became overwhelmingly large in volume and increasingly diversified in sources. Every day, we had tons of user accounts logging in and sending myriads of requests. It was like collecting water from a thousand taps to put out a million scattered pieces of fire in a building, except that you must bring the exact amount of water needed for each fire spot. Also, we got more and more emails from our colleagues asking if we could make data analysis easier for them. That’s when the company assembled a big data team to tackle the beast.

The first thing we did was to revolutionize our data processing architecture. We used DataHub to collect all our transactional and log data and ingest it into an offline data warehouse for data processing (analyzing, computing, etc.). Then the results would be exported to MySQL and forwarded to QuickBI to display the reports visually. We also replaced MongoDB with a real-time data warehouse for business queries.

This new architecture worked, but there remained a few pebbles in our shoes:

  • We wanted faster responses. MySQL could be slow in aggregating large tables, but our product guys requested a query response time of under five seconds. So first, we tried to optimize MySQL. Then we also tried to skip MySQL and directly connect the offline data warehouse with QuickBI, hoping that the combination of the query acceleration capability of the former and the caching of the latter would do the magic. Still, that five-second goal seemed to be unreachable. There was a time when I believed the only perfect solution was for the product team to hire people with more patience.
  • We wanted less pain in maintaining dimension tables. The offline data warehouse conducted data synchronization every five minutes, making it unsuitable for scenarios with frequent data updates or deletions. If we needed to maintain dimension tables in it, we would have to filter and deduplicate the data regularly to ensure data consistency. Out of our trouble-averse instinct, we chose not to do so.
  • We wanted support for point queries of high concurrency. The real-time database that we previously used required up to 500ms to respond to highly concurrent point queries in both columnar storage and row storage, even after optimization. That was not good enough.

Hit It Where It Hurts Most

In March, 2022, we started our hunt for a better data warehouse. To our disappointment, there was no one-size-fits-all solution. Most of the tools we looked into were only good at one or a few of the tasks, but if we gathered the best performer for each usage scenario, that would add up to a heavy and messy toolkit, which was against instinct.

So we decided to solve our biggest headache first: slow response, as it was hurting both the experience of our users and our internal work efficiency.

To begin with, we tried to move the largest tables from MySQL to Apache Doris, a real-time analytical database that supports MySQL protocol. That reduced the query execution time by a factor of eight. Then we tried and used Doris to accommodate more data.

As for now, we are using two Doris clusters: one to handle point queries (high QPS) from our users and the other for internal ad-hoc queries and reporting. As a result, users report a smoother experience, and we can provide features that used to be bottlenecked by slow query execution. Moving our dimension tables to Doris has also brought fewer data errors and higher development efficiency.

Both the FE and BE processes of Doris can be scaled out, so tens of PBs of data stored in hundreds of devices can be put into one single cluster. In addition, the two types of processes implement a consistency protocol to ensure service availability and data reliability. This removes dependency on Hadoop and thus saves us the cost of deploying Hadoop clusters.

Tips

Here are a few of our practices to share with you:

Data Model:

Out of the three Doris data models, we find the Unique Model and the Aggregate Model suit our needs most. For example, we use the Unique Model to ensure data consistency while ingesting dimension tables and original tables and the Aggregate Model to import report data.

Data Ingestion:

For real-time data ingestion, we use the Flink-Doris-Connector: after our business data (MySQL binlogs) is written into Kafka, it is parsed by Flink and then loaded into Doris in real time.

For offline data ingestion, we use DataX: This mainly involves the computed report data in our offline data warehouse.

Data Management:

We back up our cluster data in a remote storage system via Broker. The data can then be restored from the backups to any Doris cluster via the restore command when needed.

Monitoring and Alerting:

In addition to the various monitoring metrics of Doris, we deployed an audit log plugin to keep a closer eye on slow SQL queries from certain users for optimization.

Slow SQL queries:

Some of our often-used monitoring metrics:

Tradeoff Between Resource Usage and Real-Time Availability:

It turned out that using Flink-Doris-Connector for data ingestion can result in high cluster resource usage, so we increased the interval between each data writing from 3s to 10 or 20s, compromising a little bit on the real-time availability of data in exchange for much less resource usage.
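The tradeoff above (fresher data versus fewer, larger writes) comes down to an interval-based flush. The connector handles this for us; the toy buffer below only illustrates the effect of raising the interval, with a fake clock standing in for real time. All names are ours, not the connector's API.

```python
import time

class IntervalBuffer:
    """Toy write buffer that flushes at most once per interval."""

    def __init__(self, interval_s, clock=time.monotonic):
        self.interval_s = interval_s
        self.clock = clock
        self.rows = []
        self.last_flush = clock()
        self.flush_count = 0

    def write(self, row):
        self.rows.append(row)
        if self.clock() - self.last_flush >= self.interval_s:
            self.flush()

    def flush(self):
        # In the real pipeline this would be one batched load into Doris.
        self.rows.clear()
        self.flush_count += 1
        self.last_flush = self.clock()

# Simulate 30 seconds of one-row-per-second traffic with a fake clock.
t = [0.0]
fast = IntervalBuffer(3, clock=lambda: t[0])    # 3s interval
slow = IntervalBuffer(10, clock=lambda: t[0])   # 10s interval
for i in range(30):
    t[0] = float(i)
    fast.write(i)
    slow.write(i)
```

Over the same traffic, the 10s buffer performs far fewer (and larger) loads than the 3s one, which is exactly the resource saving we observed.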

Communication with Developers

We have been in close contact with the open source Doris community all the way from our investigation to our adoption of the data warehouse, and we’ve provided a few suggestions to the developers:

  • Enable Flink-Doris-Connector to support simultaneous writing of multiple tables in a single sink.
  • Enable Materialized Views to support Join of multiple tables.
  • Optimize the underlying compaction of data and reduce resource usage as much as possible.
  • Provide optimization suggestions for slow SQL and warnings for abnormal table creation behaviors.

If the perfect data warehouse is not there to be found, I think providing feedback for the second best is a way to help make one. We are also looking into its commercialized version called SelectDB to see if more custom-tailored advanced features can grease the wheels.

Conclusion

As we set out to find a single data warehouse that could serve all our needs, we ended up finding something less than perfect but good enough to improve our query speed by a wide margin, and we discovered some surprising features of it along the way. So if you are wavering between different choices, you may bet on the one that best solves your most pressing problem; taking care of the rest won’t be so hard.

Try Apache Doris out!

It is an open source real-time analytical database based on MPP architecture. It supports both high-concurrency point queries and high-throughput complex analysis. Or you can start your free trial of SelectDB, a cloud-native real-time data warehouse developed based on the Apache Doris open source project by the same key developers.

Best Practices

Guide: In July 2022, 360 DigiTech applied Apache Doris in its production environment. Now they have two clusters containing hundreds of tables and dozens of TBs of data. Every day, hundreds of workflows run simultaneously and billions of data updates are handled. Apache Doris supports such a big business with excellent performance and stability all the way.

Author: Middleware Team of 360 DigiTech

About Apache Doris

Apache Doris is a real-time analytical database based on MPP architecture, known for its high performance and ease of use. It supports both high-concurrency point queries and high-throughput complex analysis.

SelectDB is a cloud-native real-time data warehouse developed based on the Apache Doris open source project by the same key developers. The SelectDB company focuses on the enterprise-grade cloud-native distribution for Apache Doris.

About 360 DigiTech

360 DigiTech (NASDAQ: QFIN) is an AI-driven FinTech platform. It works in collaboration with financial institutions to provide tailored online consumer financial products, including 360 IOU APP, 360 Small Business Loans, and 360 Installments, for a broad customer base as a way to achieve inclusive finance. Till now, it has assisted 141 financial institutions in providing credit services for 43 million users, borrowing services for 26.3 million users, and facilitated quarterly transactions worth around US$ 16.5 billion. As a leading credit technology service brand in China, 360 DigiTech had accumulated over 200 million registered users by the end of Q3, 2022.

Requirements of 360 DigiTech

The ever-evolving FinTech business requires higher levels of data security, data accuracy, and real-time data updates. In business scenarios like analysis and tagging, our early ClickHouse clusters revealed disadvantages including low stability, complex operation and maintenance, and slow Join queries. In the case of 360 DigiTech, our report data is distributed across various databases, adding to the difficulty of data maintenance and management. We were in urgent need of a powerful and handy tool.

Ideal Data Warehouse for 360 DigiTech

We were looking for a real-time MPP data warehouse that could meet the following requirements:

  • Quick data writing
  • Query response times within seconds
  • Compatibility with the standard SQL protocol
  • High Join query performance
  • A rich variety of data models
  • Low maintenance complexity
  • An active user community
  • Business-friendly licensing with no potential legal risks

We found Apache Doris an ideal candidate and started a 2-month survey on it in March, 2022. Here are the results:

We decided to go for Apache Doris due to its almost perfect fulfillment of our requirements and a few other considerations:

  1. ClickHouse is more of a log storage and historical data processing tool due to its low maintainability and limitations on Join queries, functions, and data models. We need a tool that can better serve our needs.
  2. Apache Doris comes with an active user community where technical communications are frequent. Plus, SelectDB, the company that commercializes Apache Doris, has established a full-time technical support team for the Doris community that can offer timely response to our problems.
  3. Apache Doris, with its Apache open source license, is business-friendly and imposes no legal risks. Also, the Apache projects have become de facto standards in the big data industry and have been embraced internally by 360 DigiTech.

Platform Architecture of 360 DigiTech

The Yushu big data platform of 360 DigiTech provides one-stop big data management, development, and analysis services across the whole data lifecycle. Currently, Apache Doris is mainly used for offline data warehouse analysis acceleration and self-service BI reporting in 360 DigiTech.

After introducing Apache Doris into our toolkit, we ingested some of our high-priority data into Doris clusters. As is shown in the above figure, we placed Doris on top of Apache Hive to give full play to Doris' strong query performance so it could accelerate queries in certain scenarios. This was an easy move since we only needed to make simple adaptations to smooth the data ingestion from Hive to Doris. The Doris clusters could directly reuse the existing data lineage in Hive.

Data Ingestion Solution

We compared two different data ingestion methods in Doris: Stream Load and Broker Load. We found that Broker Load slightly outperformed Stream Load in ingestion speed while there was no noticeable difference in development costs between the two. Also, Broker Load was able to ingest one large table as one transaction; however, with the Stream Load method, any table of over 10G data would need to be split before it could be ingested. Thus, we opted for Broker Load.

Ad-Hoc Query Solution

Our self-developed query engines support a dynamic switching mechanism. That means the system can identify the meta information of the query data and then conduct automatic routing switch and failover for the query engine (Doris/Presto/Spark/Hive) of the current query.

Since Apache Doris supports the native MySQL protocol and standard SQL, it can be seamlessly integrated with popular BI tools such as Tableau. Hence, we built a Doris report analysis cluster separately as the data source for BI tools.

Application of Doris

How Doris Accelerates Hive Queries

In ad-hoc queries, the performance of traditional query engines (Hive/Spark/Presto) is lagging behind the expectations of data developers and data analysts. Query response time of dozens of seconds or even minutes has become a big hindrance for developers.

To solve this, we have built a data warehouse acceleration layer, where Doris plays a role. As a result, Doris has reduced the average ad-hoc query response time to less than 5 seconds. This was achieved without enabling optimization strategies such as cache and materialized view.

As a follow-up effort, we will analyze the features of relevant queries and adopt proper optimization methods to further improve query performance.

Doris accelerates queries by the query engine dynamic switching mechanism, which works as follows:

As is shown above, the query engine collects the meta information of data in Hive and Doris in a timely manner, including information about databases, tables, fields, and rows. When users submit an ad-hoc query request, the system will parse the target table, and make the following judgements one by one:

  • Does the target table exist in Doris?
  • Does the Doris table have the same schema as the Hive table?
  • Does the Doris table have the same number of rows as the Hive table?

If all answers are "Yes", the current query will be routed to Doris; otherwise the query will be conducted in Presto, then Spark, and then Hive. If problems occur during the query, a failover will be performed in that same order.
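The routing-plus-failover logic can be sketched as follows. The metadata lookups are stubbed with plain dicts and all table names are illustrative; the real system collects this metadata continuously from Hive and Doris.

```python
# Simplified sketch of the query engine dynamic switching mechanism.

FAILOVER_ORDER = ["Presto", "Spark", "Hive"]

def plan_engines(table, doris_meta, hive_meta):
    """Return the ordered engine list for a query: Doris first only if
    the table exists there with a matching schema and row count."""
    d = doris_meta.get(table)
    h = hive_meta.get(table)
    if d and h and d["schema"] == h["schema"] and d["rows"] == h["rows"]:
        return ["Doris"] + FAILOVER_ORDER   # failover order if Doris errors
    return list(FAILOVER_ORDER)

hive_meta = {"dw.user": {"schema": ["id", "name"], "rows": 100}}
doris_ok  = {"dw.user": {"schema": ["id", "name"], "rows": 100}}
doris_lag = {"dw.user": {"schema": ["id", "name"], "rows": 90}}

plan_a = plan_engines("dw.user", doris_ok, hive_meta)   # routed to Doris
plan_b = plan_engines("dw.user", doris_lag, hive_meta)  # Doris skipped
```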

Analysis of Slow Queries and Slow Ingestion

Doris has a well-developed Profile mechanism for slow queries and slow data ingestion. After learning about this mechanism, we started to regularly collect and store profile information of slow queries and slow data ingestion through scheduling tasks on our online clusters.

The profile information contains many details. For example, OLAP_SCAN_NODE records the total number of scanned rows and filtered rows for each index; EXCHANGE_NODE of each instance records the number of received rows and the total received data volume. Based on these details, we conducted tailored optimization and removed performance bottlenecks effectively.

How We Create Tables

In our application scenarios, we use the following tables:

  • PDA Table: fully updated day to day, with daily partitions storing full snapshot data
  • PDI Table: incrementally updated day to day, with daily partitions storing incremental data
  • A Table: full table with no partitions
  • S Table: static data with no day-to-day updates

Since all tables in Doris clusters are ingested from and synchronized with those in Hive, we have only adopted the Duplicate and Unique data models so far. When creating PDA, PDI, and A Tables, in order to reduce the number of partitions and the management complexity of FE metadata, we used a dynamic data partitioning method: old data is partitioned and archived by year and month, while recent data is partitioned by day and hour. We also plan to automate the merging of historical data partitions.

For PDA Tables, we used the Duplicate model instead of the Unique model. The reason is twofold. Firstly, the Doris data ingestion model itself provides data consistency guarantee based on the task label. One ingestion task of one partition of a PDA table in one scheduling cycle generates only one unique and immutable label. Thus, no matter how many execution errors occur, data in that partition will not be duplicated. Secondly, unlike the Unique model, the Duplicate model does not perform pre-aggregate de-duplication, making data ingestion and queries faster.
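The label-based consistency guarantee above can be modeled in a few lines: one ingestion task per partition per scheduling cycle derives one unique, immutable label, so retries never duplicate data. This mimics the semantics of Doris ingestion labels; the class and function names are made up for illustration.

```python
# Illustrative model of label-based idempotent ingestion.

class IngestionSink:
    def __init__(self):
        self.committed_labels = set()
        self.partitions = {}  # partition name -> list of rows

    def load(self, label, partition, rows):
        """Commit rows under a label; a reused label is rejected, not re-applied."""
        if label in self.committed_labels:
            return "label already used"
        self.partitions.setdefault(partition, []).extend(rows)
        self.committed_labels.add(label)
        return "ok"

def make_label(table, partition, cycle):
    """The same (table, partition, cycle) always yields the same label."""
    return f"{table}_{partition}_{cycle}"
```

However many times a failed task is re-executed within the same cycle, the partition receives the data exactly once.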

For PDI Tables, we used the Unique model. The usage of PDI tables involves partial update of historical data (mostly data update and basically no data deletion). Thus, considering the partition availability, we chose the Unique model so that we don't need to rebuild the partitions when updating historical data.

For A Tables, we used dynamic partitioning since temporary data unavailability was acceptable. Before each ingestion, we deleted the historical partitions and loaded all data into the current day's partition. This was easy to implement and could avoid recreation of tables.

Older versions of Apache Doris did not support Hive metadata synchronization and management. We had to select and configure the cluster, Hive table name, data model, and bucket number before we could map to the corresponding Hive table. Then the system would parse the table fields and generate CREATE TABLE statements. The exciting news is that Apache Doris 1.2.0 supports Multi Catalog, which means it can auto-synchronize Hive metadata and schemas. This relieves so much of our workload!

Monitoring System

The current Doris cluster monitoring system consists of three parts: host metrics monitoring alerts, log alerts, and cluster metrics monitoring alerts. The overall system is shown as follows:

The host metrics monitoring platform is developed based on Open-Falcon. It collects CPU, IO, memory, and disk metrics of Doris cluster nodes, and creates alerts after detecting abnormal situations.

The cluster metrics monitoring platform is built based on the monitoring solutions of Prometheus and Grafana, which are mentioned in the Doris documentation.

Log alert is a complement to the above two. In some cases, it is a less expensive way to monitor the Doris service logs. By this method, we can accurately identify the root reason for data ingestion failures and receive prompt notifications.
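Log alerting of this kind boils down to scanning service logs for failure signatures. Below is a minimal sketch in that spirit; the sample log lines and error patterns are invented for illustration and do not reflect actual Doris log formats.

```python
# Minimal sketch of log-based alerting for data ingestion failures.
import re

ERROR_PATTERNS = [
    re.compile(r"routine load .* paused", re.IGNORECASE),
    re.compile(r"too many filtered rows", re.IGNORECASE),
]

def scan_log_lines(lines):
    """Return the lines that should trigger an ingestion-failure alert."""
    return [line for line in lines
            if any(p.search(line) for p in ERROR_PATTERNS)]
```

A scheduled task can tail the service log, feed new lines through `scan_log_lines`, and send notifications for any hits, which is cheaper than a full metrics pipeline for this class of failure.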

Troubleshooting and Audit Log

In addition to the monitoring system, we need extra measures to address some extreme cluster issues. To quickly locate the stack traces in the case of unusual BE downtime, we enable Core Dump on all BE nodes. Besides, audit logs also play a significant role in everyday operation and maintenance.

Audit logs of Doris clusters can be collected in two ways:

  • Use log collection components to gather fe.audit.log from each FE node
  • Use the Auditloader plugin provided by Doris (located in doris/fe_plugins/auditloader in the Doris source code). See detailed usage in Audit Log Plugin.

We followed the second method since it is simpler. In actual practice, we found several problems regarding the use of the Auditloader plugin and committed PRs to the Doris project. Meanwhile, we customized our Doris console, on which we could check the data distribution and synchronization details of the clusters, and conduct audit log retrieval.

If a cluster BE collapses, audit logs can provide information about SQL location, client access, response time of query SQL, and features of access SQL. For example, our data developers once reported a Doris SQL failure with the log showing that we had hit the connection limit. By looking into the audit log, we realized that we had created too many database connections due to a bug in an upstream workflow import task. Also, for several BE downtime incidents that occurred on older versions of Apache Doris, we located the query_id of the relevant SQL using the GDB debugger. Based on that, we quickly found the culprit SQL in the audit log.
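Once audit-log records are collected into a queryable store, the investigations above reduce to simple lookups. This is a hedged sketch: the record fields (`query_id`, `client_ip`, `sql`) are illustrative assumptions, not the actual fe.audit.log schema.

```python
# Sketch of mining collected audit-log records: pull the culprit SQL by
# query_id, or rank client IPs to spot a connection flood.
from collections import Counter

def find_by_query_id(audit_records, query_id):
    """Return the audit records (and hence the SQL text) for a given query_id."""
    return [r for r in audit_records if r.get("query_id") == query_id]

def top_clients(audit_records, n=3):
    """Rank client IPs by number of audit entries to spot abnormal clients."""
    return Counter(r["client_ip"] for r in audit_records).most_common(n)
```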

Summary and Future Plans

Summary

In July, 2022, we applied Apache Doris in our production environment. Now we have 2 clusters containing hundreds of tables and dozens of TBs of data. Every day, we run hundreds of workflows simultaneously and handle billions of data updates. Apache Doris manages to support such big business with excellent performance and stability all the way.

Why Doris Stands Out:

  • Doris has a neat architecture with no dependencies on other components. Its user-friendly data models and various data ingestion methods bring low adaptation costs, making trial use and adoption an easy journey for us.
  • As Doris clusters have become a major data source of our BI tools, Doris has greatly accelerated our report analysis workloads and largely reduced the response time of ad-hoc queries.
  • Doris has a well-developed monitoring and auditing mechanism, which makes maintenance no longer a nightmare.
  • Doris comes with an active user community and a professional technical support team that can respond to and address our problems quickly.

Future Plans

We are planning to apply Apache Doris in more business scenarios, such as real-time data warehouse creation, user behavior profiling, and federated queries to data lakes such as Hive, Iceberg, and Hudi. We also plan to further improve query performance by utilizing the query caching feature and materialized views of Doris based on user query SQL features. We will build a cluster probing tool that monitors data distribution (checking for overly large Tablets or unevenly distributed data in Tablets) and running status of clusters and provides optimization suggestions automatically.

Special thanks to the SelectDB technical support team for their kind help.

Best Practices

Guide: Established in 2015, LY Digital is a financial service platform for the tourism industry under LY.com. In 2020, LY Digital introduced Apache Doris to build a data warehouse because of its rich data import methods, excellent parallel computing capabilities, and low maintenance costs. This article describes the evolution of the data warehouse at LY Digital and why we switched to Apache Doris. I hope you like it.

Author: Xing Wang, Lead Developer of LY Digital

1. Background

1.1 About LY Digital

LY Digital is a tourism financial service platform under LY.com. Formally established in 2015, LY Digital takes "Digital technology empowers the tourism industry" as its vision. At present, LY Digital's business covers financial services, consumer financial services, financial technology, and digital technology. So far, we have served more than 10 million users across 76 cities.

1.2 Requirements for Data Warehouse

  • Dashboard: Needs dashboards for T+1 business reporting, etc.
  • Early Warning System: Needs risk control, abnormal capital monitoring, and traffic monitoring, etc.
  • Business Analysis: Needs timely data query analysis and ad-hoc data retrieval, etc.
  • Finance: Needs liquidation and payment reconciliation.

2. Previous Data Warehouse

2.1 Architecture

Our previous data warehouse adopted the combination of StreamSets and Apache Kudu, which was very popular a few years ago. In this architecture, Binlog is ingested into Apache Kudu in real time after passing through StreamSets, and is finally queried through Apache Impala and visualization tools.

2.1.2 Downside

  • The previous data warehouse had a complicated structure consisting of many interacting components, which required huge operation and maintenance costs.
  • Apache Kudu's performance in wide-table Joins was not good.
  • SLA was not fully guaranteed because tenant isolation was not provided.
  • Although StreamSets is equipped with early warning capabilities, its job recovery capability is poor. When many tasks are configured, the JVM consumes a lot of resources, resulting in slow recovery.

3. New Data Warehouse

Due to so many shortcomings, we had to give up the previous data warehouse. In 2020, we conducted an in-depth research on the popular data warehouses in the market.

During the research, we focused on comparing ClickHouse and Apache Doris. ClickHouse makes heavy use of CPU, so it performs well in single-table queries, but it does not perform well in multi-table Joins or under high QPS. Doris, on the other hand, not only supports thousands of QPS per node; thanks to partitioning, it can also support high-concurrency queries at the level of 10,000 QPS. Moreover, horizontal scaling of ClickHouse is complex and cannot currently be done automatically, whereas Doris supports online dynamic scaling and can be expanded horizontally as the business grows.

In the research, Apache Doris stood out. Doris's high-concurrency query capability is very attractive, and its dynamic scaling capability suits our flexible advertising business. So we settled on Apache Doris.

After introducing Apache Doris, we upgraded the entire data warehouse:

  • We collect MySQL Binlog through Canal and ingest it into Kafka. Because Apache Doris is highly compatible with Kafka, we can easily use Routine Load to import the data.
  • We have made minor adjustments to batch processing. For data stored in Hive, Apache Doris can ingest data through Broker Load, so the data from batch processing can be ingested directly into Doris.

3.2 Why We Choose Doris

The overall performance of Apache Doris is impressive:

  • Data access: It provides rich data import methods and can support the access of many types of data sources;
  • Data connection: Doris supports JDBC and ODBC connections. And it can easily connect with BI tools. In addition, Doris uses the MySQL protocol for communication. Users can directly access Doris through various Client tools;
  • SQL syntax: Doris is highly compatible with MySQL syntax and supports standard SQL, keeping the learning cost low for developers;
  • MPP parallel computing: Doris provides excellent parallel computing capabilities and has obvious advantages in complex Join and wide table Join;
  • Comprehensive documentation: The official Doris documentation is thorough, which is friendly for new users.

3.3 Architecture of Real-time Processing

  • Data source: In real-time processing, data sources come from business branches such as industrial finance, consumer finance, and risk control. They are all collected through Canal and API.
  • Data collection: After data collection through Canal-Admin, Canal sends the data to Kafka message queue. After that, the data is ingested into the Doris through Routine Load.
  • Inside Doris: The Doris cluster constitutes the three layers of the data warehouse: the DWD layer with the Unique model, the DWS layer with the Aggregate model, and the ADS application layer.
  • Data application: The data is applied in three aspects: real-time dashboard, data timeliness analysis and data service.

3.4 New Features

Data import is simple; we adopt three import methods for different scenarios:

  • Routine Load: When we submit a Routine Load task, a process within Doris consumes Kafka in real time, continuously reading data from Kafka and ingesting it into Doris.
  • Broker Load: Offline data such as dim-tables and historical data is ingested into Doris in an orderly manner.
  • Insert Into: Used for batch processing tasks; Insert Into is responsible for processing data in the DWD layer.

Doris' data model improves our development efficiency:

  • The Unique model is used when loading into the DWD layer, which effectively prevents repeated consumption of data.
  • The Aggregate model supports four aggregation types: Sum, Replace, Min, and Max. This eliminates a large amount of SQL code, since we no longer need to write Sum, Min, Max, and similar logic by hand.
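The merge-on-ingest behavior of those four aggregation types can be modeled as follows. This is an illustrative model of the semantics only, written in Python for clarity; it is not Doris code, and the column names are made up.

```python
# Illustrative model of aggregate-table semantics: rows sharing the same key
# columns are merged, each value column by its declared aggregation type.

AGG_FUNCS = {
    "SUM": lambda old, new: old + new,
    "REPLACE": lambda old, new: new,
    "MIN": min,
    "MAX": max,
}

def aggregate(rows, key_cols, value_aggs):
    """rows: list of dicts; value_aggs: {column: 'SUM'|'REPLACE'|'MIN'|'MAX'}."""
    table = {}
    for row in rows:
        key = tuple(row[c] for c in key_cols)
        if key not in table:
            table[key] = {c: row[c] for c in value_aggs}
        else:
            for col, agg in value_aggs.items():
                table[key][col] = AGG_FUNCS[agg](table[key][col], row[col])
    return table
```

Declaring the aggregation once on the table replaces the SUM/MIN/MAX expressions we would otherwise repeat across query SQL.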

Doris query is efficient:

  • It supports materialized views and Rollup materialized indexes. Under the hood, a materialized view is similar to a Cube with precomputation: trading space for time, special tables are generated at the bottom layer, and during a query the materialized view maps to those tables and responds quickly.

4. Benefits of the New Data Warehouse

  • Data access: In the previous architecture, Kudu tables had to be created manually during imports through StreamSets. Lacking tooling, the entire process of creating tables and tasks took 20-30 minutes. Now, fast data access is realized through the platform: the access process for each table has been shortened from 20-30 minutes to 3-5 minutes, a 5-6x improvement.
  • Data development: After using Doris, we can directly use the data models, such as Unique and Aggregation. The Duplicate model can well support logs, greatly speeding up the development process in ETL.
  • Query analysis: The bottom layer of Doris has functions such as materialized view and Rollup materialized index. Moreover, Doris has made many optimizations for wide table associations, such as Runtime Filter and other Joins. Compared with Doris, Apache Kudu requires more complex optimization to be better used.
  • Data report: Report rendering took 1-2 minutes with Kudu queries before, but Doris responds in seconds or even milliseconds.
  • Easy maintenance: Doris is not as complex as Hadoop. When our IDC was relocated in March, all 12 Doris virtual machines were migrated within three days. The overall operation is relatively simple: apart from physically moving the machines, scaling FE only requires simple commands such as Add and Drop, which does not take long.

5. Look ahead

  • Realize data access based on Flink CDC: At present, we have not introduced Flink CDC; data reaches Kafka through Canal instead. Flink CDC could make development even faster, though it still requires writing a certain amount of code, which is not friendly for data analysts to use directly. We hope data analysts will only need to write simple SQL or operate directly, so we plan to introduce Flink CDC in the future.
  • Keep up with the latest release: The latest version, Apache Doris 1.2.0, has made great achievements in vectorization, Multi Catalog, and Light Schema Change. We will keep up with the community, upgrade the cluster, and make full use of the new features.
  • Strengthen the construction of related systems: The management of our current metrics system, such as report metadata and business metadata, still needs to be improved. Although we have data quality monitoring functions, their automation also needs to be strengthened.

Best Practices

Author: Xiang He, Head Developer of Big Data, Commercialization Team of Kwai

1 About Kwai

1.1 Kwai

Kwai (HKG: 1024) is a social network for short videos and trends. Users discover funny short videos, contribute recordings and videos of their lives to the community, take on daily challenges, and like the best memes and videos, sharing their lives through short clips with dozens of magical effects and filters.

1.2 Kwai's Commercial Report Engine

Kwai's commercial report engine provides advertisers, as well as internal users, with real-time query services for multi-dimensional analysis reports. The engine is committed to solving high-performance, high-concurrency, and high-stability query problems in multi-dimensional report analysis scenarios.

2 Previous Architecture

2.1 Background

Traditional OLAP engines handle multi-dimensional analysis in a pre-modeled way, building a data cube (Cube) to perform operations such as Drill-down, Roll-up, Slice, Dice, and Pivot. Modern OLAP analysis introduces the idea of the relational model, representing data in two-dimensional relational tables. In the modeling process, there are usually two methods: one is to Join the data of multiple tables into one wide table; the other is to use a star schema, dividing the data into fact tables and dim-tables and Joining them at query time. Both options have pros and cons:

Wide table:

The wide table trades space for time: dimension data keyed by the dim-table's primary key is filled into every row, so multiple copies of the dimension data are stored redundantly. The advantage is convenient querying, with no need to associate additional dim-tables. The disadvantage is that whenever dimension data changes, the entire table needs to be refreshed, which is bad for high-frequency Updates.

Star Schema:

Dimension data is completely separated from fact data. Dimension data is often stored in a dedicated engine (such as MySQL, Elasticsearch, etc.). When querying, dimension data is associated with the primary key. The advantage is that changes in dimension data do not affect fact data, which can support high-frequency Update operations. The disadvantage is that the query logic is relatively more complex, and multi-table Join may lead to performance loss.
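The trade-off between the two models can be made concrete with a toy sketch: a star-schema query attaches dimension attributes at query time, so updating a dimension is one write to the dim-table, whereas a wide table would have to rewrite every fact row carrying that dimension. The table contents and field names below are invented for illustration.

```python
# Toy sketch of a star-schema join: fact rows reference dimensions by a
# foreign key; dimension attributes are attached at query time.

def star_join(fact_rows, dim_table, fk):
    """Attach dimension attributes to each fact row via its foreign key."""
    return [{**row, **dim_table[row[fk]]} for row in fact_rows]
```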

2.2 Requirement for an OLAP Engine

In Kwai’s business, the commercial reports engine supports the real-time query of the advertising effect for advertisers. When building the report engine, we expect to meet the following requirements:

  • Massive data: the raw data of a single table grows by ten billion rows every day
  • High query QPS: thousand-level QPS on average
  • High stability requirements: SLA level of 99.9999%

Most importantly, due to frequent changes in dimension data, dim-tables need to support Update operations at up to thousand-level QPS, plus requirements such as fuzzy matching and word-segmentation retrieval. Based on the above requirements, we chose the star schema and built a report engine architecture with Apache Druid and Elasticsearch.

2.3 Previous Architecture: Based on Apache Druid

We chose the combination of Elasticsearch and Apache Druid. For data import, we use Flink to pre-aggregate the data at the minute level, and Kafka to pre-aggregate the data at the hour level. For data query, the application initiates a query request through the REFront API, and REQuery then queries the dim-table engines (Elasticsearch and MySQL) and the extension engine respectively.

Druid is a time-series query engine that supports real-time data ingestion, used to store and query large amounts of fact data. We adopted Elasticsearch for the following reasons:

  • High update frequency: QPS is around 1,000
  • Supports word segmentation and fuzzy search, which suits Kwai
  • Supports large volumes of dim-table data, which can be handled directly without sharding into sub-databases and sub-tables as a MySQL database would require
  • Supports data synchronization monitoring, with check and recovery services as well

2.4 Engine of the Reports

The report engine can be divided into two layers, REFront and REQuery, with REMeta as an independent metadata management module. The report engine implements MemJoin inside REQuery, which supports associative queries between fact data in Druid and dimension data in Elasticsearch. It also provides a virtual cube query for upper-layer business, avoiding the exposure of complex cross-engine management and query logic.

3 New Architecture Based on Apache Doris

3.1 Problems Remained

First, we came across a problem when building the report engine. MemJoin is single-machine and executes serially. When the amount of data pulled from Elasticsearch exceeds 100,000 rows in a single query, the response time approaches 10s, and the user experience is poor. Moreover, executing large-scale data Joins on a single node consumes a lot of memory, causing Full GC.

Second, Druid's Lookup Join function is imperfect, which is a big problem, and it cannot fully meet our business needs.

3.2 Database Research

So we conducted a survey of popular OLAP databases in the industry, the most representative of which are Apache Doris and ClickHouse:

  • Joins: Apache Doris is more capable in Joins between large and wide tables. ClickHouse supports memory-based Broadcast Joins, but its performance is poor for Joins between large and wide tables with large data volumes.
  • Concurrency: Both Doris and ClickHouse support detailed data storage, but ClickHouse's concurrency capability is low. Doris, on the contrary, supports high-concurrency, low-latency query services, with a single machine supporting up to thousands of QPS; as concurrency grows, FE and BE can be scaled horizontally.
  • Data import: ClickHouse's data import does not support transactions, so it cannot realize Exactly-Once semantics, and its support for standard SQL is limited. In contrast, Doris provides transactions and atomicity for data import, and Doris itself can ensure that messages in Kafka are neither lost nor consumed repeatedly, which is to say, Exactly-Once semantics is supported.
  • Operation and maintenance: ClickHouse has high learning costs, high operation and maintenance costs, and weak distributed capabilities, and it requires more customization and deeper technical strength. Doris is different: there are only two core components, FE and BE, with few external dependencies. Because Doris is closer to the MySQL protocol, it is more convenient than ClickHouse and the migration cost is smaller. In terms of horizontal expansion, Doris' scaling can self-balance, which is much better than ClickHouse.

From this point of view, Doris can better improve the performance of Join and is much better in other aspects such as migration cost, horizontal expansion, and concurrency. However, Elasticsearch has inherent advantages in high-frequency Update.

Building the engine on Doris on Elasticsearch would be an ideal solution, handling high-frequency Updates and Join performance at the same time.

3.3 Good Choice: Doris on Elasticsearch

What is the query performance of Doris on Elasticsearch?

First of all, Apache Doris is a real-time analytical database based on an MPP architecture, with strong performance and strong horizontal expansion capability. Doris on Elasticsearch takes advantage of this capability and does a lot of query optimization. Secondly, after integrating Elasticsearch, we also made many optimizations to queries:

  • Shard-level concurrency
  • Automatic adaptation of row and column scanning, priority to column scanning
  • Sequential read, terminated early
  • Two-phase query becomes one-phase query
  • Broadcast Join is especially friendly for small batch data

3.4 Doris on Elasticsearch

The upgrade of the data link is relatively simple. First, we build a new OLAP table in Doris and configure the materialized views. Second, a Routine Load is initiated based on the Kafka topic of the previous fact data, ingesting real-time data. Third, offline data is ingested from Hive via Broker Load. Finally, we create an Elasticsearch external table through Doris.

3.4.2 Upgrades of the Report Engine

Note: The MySQL dim-table associated above is based on future planning. Currently, Elasticsearch is mainly used as the dim-table engine

Report Engine Adaptation

  • Generate virtual cube table based on Doris's star schema
  • Adapt to cube table query analysis, intelligent Push-down
  • Gray Release

4 Online Performance

4.1 Fact Table Query Performance Comparison

Druid

Doris

99th percentile response time: Druid 270ms vs. Doris 150ms, a reduction of 45%.

4.2 Comparison of Cube Table Query Performance in Join

Druid

99th percentile response time: Druid 660ms vs. Doris 440ms, a reduction of 33%.

4.3 Benefits

  • The overall 99th-percentile time consumption is reduced by about 35%
  • Resources saved by about 50%
  • The complex MemJoin logic is removed from the report engine and realized through Doris on Elasticsearch (for large queries where dim-table results exceed 100,000 rows, performance improves more than 10x, from 10s to 1s)
  • Richer query semantics (MemJoin is relatively simple and does not support complex queries)

5 Summary and Plans

In Kwai's commercial business, Join queries between dimension data and fact data are very common. After adopting Doris, querying becomes simple: we only need to synchronize the fact table and dim-table daily and Join them at query time. By replacing Druid and ClickHouse with Doris, we cover essentially all the scenarios where we used Druid. In this way, Kwai's commercial report engine greatly improves its aggregation and analysis capabilities for massive data. During our use of Apache Doris, we also found some unexpected benefits: for example, the Routine Load and Broker Load import methods are relatively simple; query speed is improved; data footprint is greatly reduced; and Doris supports the MySQL protocol, which makes it much easier for data analysts to fetch data and make charts.

Although Doris on Elasticsearch fully meets our requirements, Elasticsearch external tables still require manual creation. However, Apache Doris recently released version 1.2.0. The new version adds Multi Catalog, which provides the ability to seamlessly access external sources such as Hive, Elasticsearch, Hudi, and Iceberg. Users can connect to external sources through the CREATE CATALOG command, and Doris will automatically map the database and table information of the external source. In this way, we will no longer need to manually create Elasticsearch external tables to complete the mapping, which greatly saves development time and cost and improves R&D efficiency. The power of other new features such as Vectorization and Light Schema Change also gives us new expectations for Apache Doris. Bless Apache Doris!

Best Practices

Guide: Refined operation is a trend of the future Internet, and it requires excellent data analysis. In this article, you will learn about the construction of Netease Lifease's DMP system and its application of Apache Doris.

Author | Xiaodong Liu, Lead Developer, Netease

Better data analysis enables a better user experience. Currently, a common approach is to build a user tag system to accurately generate user portraits and improve user experience. The topic we share today is the practice of the Netease DMP tag system.

About Netease and Lifease

NetEase (NASDAQ: NTES) is a leading Internet technology company in China, providing users with free emails, gaming, search engine services, news and entertainment, sports, e-commerce and other services.

Lifease is Netease's self-operated home furnishing e-commerce brand. Its products cover 8 categories in total: home life, apparel, food and beverages, personal care and cleaning, baby products, outdoor sports, digital home appliances, and Lifease's Special. In Q1 of 2022, Lifease launched a "Pro" membership alongside multiple other memberships for different users. The number of Pro members has increased by 65% compared with the previous year.

About the DMP System

DMP system plays an important role in Lifease's data analysis. The data sources of DMP mainly include:

  • Business logs of APPs, H5s, PCs and other terminals
  • Basic data constructed within NetEase Group
  • Data from products sold on third-party platforms such as JD.com, Alibaba, and Bytedance

Through data collection and data cleaning, the above data is turned into data assets. Based on these data assets, DMP has built a system of functions such as tag creation, audience grouping, and portrait analysis, which supports business including intelligent product matching, user engagement, and user insight. In general, the DMP system concentrates on building a data-centric tagging system and portrait system to assist the business.

You can get basic knowledge of the DMP system starting from the concepts below:

  • Tagging: Tagging is a user monitoring capability that uniquely identifies individual users across different browsers, devices, and user sessions. It works by capturing available data in your application's page source: age, address, preferences, and other variables.
  • Targeting: Target audience may be dictated by age, gender, income, location, interests or a myriad of other factors.
  • User Portrait Analysis: User portrait analysis is to develop user profiles, actions and attributes after targeting audience. For instance, check the behavior paths and consumption models of users whose portraits are "City: Hangzhou, Gender: Female" on Lifease APP.

Lifease's tagging system mainly provides two core capabilities:

  1. Tag Query: the ability to query the specified tag of a specific entity, which is often used to display basic information.
  2. Targeting Audience: for both real-time and offline targeting. The targeting results are mainly used for:
  • As Grouping Criteria: It can be used to tell whether a user is in one or more specified groups, in scenarios such as advertising and contact marketing.
  • Resultset Pull: Extract specified data to the business system for customized development.
  • Portrait Analysis: Analyze the behavioral and consumption models in specific groups of people for more refined operations.

The overall business process is as follows:

  • First define the rules for tags and grouping;
  • After defining the DSL, the task can be submitted to Spark for processing;
  • After the processing is done, the results can be stored in Hive and Doris;
  • Data from Hive or Doris can be queried and used according to the actual business needs.

The DMP platform is divided into four modules: the processing & storage layer, the scheduling layer, the service layer, and metadata management. All tag meta-information is stored in the source data table. The scheduling layer schedules the tasks of the entire business process: data is processed and aggregated into basic tags, and the data in the basic tags and source tables is converted through SQL into something usable for data queries. The scheduling layer dispatches the tasks to Spark for processing and then stores the results in both Hive and Doris. The service layer consists of four parts: the tag service, the entity grouping service, the basic tag data service, and the portrait analysis service.

The lifecycle of tag consists of 5 phases:

  • Tag requirements: at this stage, the operations team raises requirements and the product manager team evaluates their rationality and urgency.
  • Scheduling production: developers first sort out the data from the ODS layer to the DWD layer, covering the entire pipeline up to the DM layer. Then they build a model based on the data while monitoring the production process.
  • Targeting Audience: after the tags are produced, the audience is grouped by those tags.
  • Precision marketing: precision marketing strategies are carried out on the grouped audiences.
  • Effect evaluation: in the end, tag usage rates and effects are evaluated for future optimization.

Production of Tags

Tag data layering:

  • The bottom layer is the ODS layer, including user login logs, event tracking records, transaction data, and the Binlog data of various databases.
  • Data processed by the ODS layer, such as the user login table, user activity table, and order information table, reaches the DWD detail layer.
  • The DWD layer data is aggregated into the DM layer, and the tags are all implemented on DM-layer data. At present, data output from the original databases to the ODS layer is fully automated, the ODS-to-DWD step is partially automated, and only a small portion of DWD-to-DM operations are automated; the latter will be our focus in the future.

Tags are divided by timeliness into offline tags, quasi-real-time tags, and real-time tags. By data scale, they are divided into aggregation tags and detail tags. Tags can also be divided into account attribute tags, consumption behavior tags, active behavior tags, user preference tags, asset information tags, etc.

It is inconvenient to use DM-layer data directly because the basic data is relatively primitive: it lacks abstraction and is not easy to use. By combining basic data with AND, OR, and NOT operations, business tags are formed for further use, which reduces the cost of understanding for operations staff and makes the tags easier to use.

After the tags are merged, they need to be applied to specific business scenarios, such as grouping. The configuration is shown on the left side of the figure above, which supports offline crowd packages and real-time behaviors (configured separately). After configuration, the DSL rules shown on the right side of the figure are generated. They are expressed in JSON format, which is friendlier to the front end, and can also be converted into query statements for the database engine.
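To make the DSL-to-query conversion concrete, here is a minimal sketch in Python. The DSL shape and field names (`op`, `children`, `tag`, `city`, `gender`) are illustrative assumptions, since the article's actual DSL is defined in a figure not shown here; only the general idea (a JSON tree of AND/OR/NOT nodes compiled into a SQL predicate) is taken from the text.

```python
# Hypothetical sketch: compile a tag-combination DSL (JSON) into a SQL WHERE
# clause. All names below are illustrative, not Lifease's real schema.
import json

def dsl_to_sql(node):
    """Recursively translate a DSL node into a SQL boolean expression."""
    op = node["op"]
    if op in ("AND", "OR"):
        parts = [dsl_to_sql(child) for child in node["children"]]
        return "(" + f" {op} ".join(parts) + ")"
    if op == "NOT":
        return f"NOT ({dsl_to_sql(node['child'])})"
    # Leaf node: a single tag predicate such as city = 'Hangzhou'
    return f"{node['tag']} = '{node['value']}'"

dsl = json.loads("""
{"op": "AND", "children": [
    {"op": "eq", "tag": "city",   "value": "Hangzhou"},
    {"op": "eq", "tag": "gender", "value": "female"}
]}
""")
print(dsl_to_sql(dsl))
# -> (city = 'Hangzhou' AND gender = 'female')
```

The same JSON tree stays front-end friendly while each storage engine only needs its own small compiler function.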

Tag production is partially automated, while grouping is automated to a relatively high degree. For example, groups can be refreshed on a daily schedule; advanced processing such as intersection, union, and difference between groups is supported; and data cleaning removes expired and invalid data in a timely manner.
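The advanced processing between groups mentioned above (intersection, merge, difference) can be sketched with plain Python sets of user IDs. The group names and IDs are made up for illustration; in production these would be large crowd packages, not in-memory sets.

```python
# Toy illustration of group operations: crowd packages modeled as sets of
# user IDs (illustrative data only).
group_a = {101, 102, 103, 104}          # e.g. "active in the last 7 days"
group_b = {103, 104, 105}               # e.g. "city = Hangzhou"

both      = group_a & group_b           # intersection: in both groups
either    = group_a | group_b           # merge: in at least one group
only_in_a = group_a - group_b           # difference: in A but not in B

print(sorted(both), sorted(either), sorted(only_in_a))
# -> [103, 104] [101, 102, 103, 104, 105] [101, 102]
```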

Tags Storage

Lifease's DMP tagging system needs to handle relatively heavy customer-facing traffic and has relatively high real-time requirements. Our storage requirements include:

  • High-performance queries to handle large-scale customer-facing traffic
  • SQL support to facilitate data analysis scenarios
  • A data update mechanism
  • The ability to store large amounts of data
  • Support for extension functions to handle custom data structures
  • Close integration with the big data ecosystem

In the field of big data, different engines suit different scenarios. Using the popular engines in the chart below, we iterated our storage architecture twice.

Our architecture V1.0 is shown below:

Most of the offline data is stored in Hive, while a small part is stored in HBase (mainly used for querying basic tags). Part of the real-time data is stored in HBase for basic tag queries, and the rest is double-written into Kudu and Elasticsearch for real-time grouping and data queries. The offline data is processed by Impala and cached in Redis. The disadvantages:

  • Too many database engines.
  • Double writing has hidden data quality problems: one side may succeed while the other fails, resulting in data inconsistency.
  • The project is complex and maintainability is poor.

In order to reduce the number of engines and storage systems in use, we improved the architecture and implemented version 2.0:

In storage architecture V2.0, Apache Doris is adopted. Offline data is mainly stored in Hive, while basic tags and real-time data are imported into Doris. Query federation of Hive and Doris is performed via Spark, and the results are stored in Redis. After this improvement, we have a single storage engine that manages both offline and real-time data. We currently use Apache Doris 1.0, which achieves:

  1. Query latency within 20 ms at the 99th percentile
  2. Query latency within 50 ms at the 99.9th percentile

The architecture is now simplified, which greatly reduces operation and maintenance costs.

Advantages of Apache Doris in Practice

Lifease has adopted Apache Doris for point queries, batch queries, path analysis, and grouping. The advantages are as follows:

  • The performance of key queries and query federation over a small number of tables exceeds 10,000 QPS, with RT99 < 50 ms.
  • Horizontal scalability is relatively strong and maintenance costs are relatively low.
  • Offline and real-time data are unified, which reduces the complexity of the tag model.

The downside is that importing a large number of small batches takes up more resources, but this has been optimized in Doris 1.1. Apache Doris greatly enhanced its data compaction capability in version 1.1 and can quickly complete the aggregation of newly imported data, avoiding the -235 error caused by too many data versions per tablet and the resulting low query efficiency.

Future Plan

Workloads on Hive and Spark are gradually being migrated to Apache Doris. We also plan to optimize the tagging system:

  • Establish a rich and accurate tag evaluation system
  • Improve tag quality and output speed
  • Improve tag coverage

And carry out more precise operations:

  • Build rich user analysis models
  • Improve the user insight model evaluation system based on frequency of use and user value
  • Establish general portrait analysis capabilities to assist intelligent operational decision-making

Best Practices

Guide: Xiaomi Group introduced Apache Doris in 2019. At present, Apache Doris is widely used in dozens of business departments within Xiaomi, and a data ecosystem centered on Apache Doris has been formed. This article is transcribed from an online meetup speech in the Doris community and shares the practice of Apache Doris at Xiaomi.

Author: ZuoWei, OLAP Engineer, Xiaomi

About Xiaomi

Xiaomi Corporation (“Xiaomi” or the “Group”; HKG: 1810) is a consumer electronics and smart manufacturing company with smartphones and smart hardware connected by an Internet of Things (IoT) platform. In 2021, Xiaomi's total revenue amounted to RMB 328.3 billion (approximately USD 47.2 billion), an increase of 33.5% year-over-year; adjusted net profit was RMB 22.0 billion (approximately USD 3.2 billion), an increase of 69.5% year-over-year.

Due to the growing need of data analysis, Xiaomi Group introduced Apache Doris in 2019. As one of the earliest users of Apache Doris, Xiaomi Group has been deeply involved in the open-source community. After three years of development, Apache Doris has been widely used in dozens of business departments within Xiaomi, such as Advertising, New Retail, Growth Analysis, Dashboards, UserPortraits, AISTAR, Xiaomi Youpin. Within Xiaomi, a data ecosystem has been built around Apache Doris.

At present, Apache Doris has dozens of clusters at Xiaomi, with an overall scale of hundreds of virtual machines. The largest single cluster reaches nearly 100 nodes and runs dozens of real-time data synchronization tasks. The largest daily increment of a single table rockets to 12 billion rows, with PB-level storage, and a single cluster can support more than 20,000 multi-dimensional analysis queries per day.

Architecture Evolution

Xiaomi's original intention in introducing Apache Doris was to solve the problems encountered in user behavior analysis. With the development of Xiaomi's Internet business, the demand for growth analysis based on user behavior data became stronger and stronger. If each business branch built its own growth analysis system, it would be costly and inefficient; if instead there were a product that hid the underlying complex technical details, business personnel could focus on their own work, greatly improving efficiency. Therefore, Xiaomi Big Data and the cloud platform team jointly developed the growth analysis system Growing Analytics (GA), which aims to provide a flexible, multi-dimensional, real-time query and analysis platform that manages data access and query solutions in a unified way and helps business branches run refined operations.

Previous Architecture

The growth analysis platform project was established in mid-2018. At that time, considering development time and cost, Xiaomi reused various existing big data components (HDFS, Kudu, SparkSQL, etc.) to build a growth analysis query system based on the Lambda architecture. The architecture of the first version of the GA system is shown in the figure below and includes the following aspects:

  • Data Source: The data sources are front-end event tracking data and user behavior data.

  • Data Access: The event tracking data is uniformly cleaned and written into Xiaomi's internal self-developed message queue, and then imported into Kudu through Spark Streaming.

  • Storage: Hot and cold data are separated in the storage layer. Hot data is stored in Kudu and cold data in HDFS. The storage layer is also partitioned; with day-level partitioning, part of the data is cooled down to HDFS every night.

  • Compute and Query: In the query layer, SparkSQL performs federated queries over the data in Kudu and HDFS, and the query results are displayed on the front-end page.

At that time, the first version of the growth analysis platform helped us solve a series of problems in user operations, but two problems remained:

Problem 1: scattered components

Since the historical architecture was based on the combination of SparkSQL + Kudu + HDFS, the many dependent components led to high operation and maintenance costs. The original design was for each component to use the resources of a shared cluster, but in practice, query performance was easily affected by other jobs in the shared cluster and query jitter was common, especially when reading data from the shared HDFS cluster, which was sometimes slow.

Problem 2: high resource consumption

Query latency through SparkSQL was relatively high. SparkSQL is a query engine designed on top of a batch processing system: data shuffled between stages still needs to be spilled to disk, so SQL queries complete with relatively high latency. To keep SQL queries from being starved of resources, we added machines to guarantee performance, but in practice we found limited room for improvement. The solution could not make full use of machine resources to achieve efficient queries, resulting in a certain waste of resources.

In response to these two problems, our goal was to find an MPP database integrating compute and storage to replace our existing storage and compute layer components. After technical selection, we finally decided to use Apache Doris to replace the older architecture.

New Choice

Popular MPP-based query engines such as Impala and Presto can efficiently support SQL queries, but they still rely on Kudu, HDFS, Hive Metastore, and other storage systems, which increases operation and maintenance costs. Meanwhile, with storage and compute separated, the query engine cannot easily perceive data changes in the storage layer, which limits fine-grained query optimization; caching at the SQL layer cannot guarantee that query results are up to date.

Apache Doris is a top-level project of the Apache Software Foundation. It is positioned as a high-performance, real-time analytical database, mainly used for reporting and multi-dimensional analysis, and it integrates technologies from Google Mesa and Cloudera Impala. We conducted in-depth performance tests on Doris and communicated with the community many times, and finally decided to replace the previous compute and storage components with Doris.

New Architecture Based on Apache Doris

The new architecture obtains event tracking data from the data source and ingests it into Apache Doris, and query results can be directly displayed in the applications. In this way, Doris truly unifies computing, storage, and resource management.

We chose Doris because:

  • Doris has excellent query performance and can meet our business needs.

  • Doris supports standard SQL, and the learning cost is low.

  • Doris does not depend on other external components and is easy to operate and maintain.

  • The Apache Doris community is very active and friendly, with many contributors, which makes future version upgrades and maintenance easier.

Query Performance Comparison between Apache Doris & Spark SQL

Note: The comparison is based on Apache Doris V0.13

We selected a business model with an average daily data volume of about 1 billion rows and conducted performance tests on Doris in different scenarios, including 6 event analysis scenarios, 3 retention analysis scenarios, and 3 funnel analysis scenarios. Comparing it with the previous architecture (SparkSQL + Kudu + HDFS), we found:

  • In the event analysis scenario, the average query time was reduced by 85%.

  • In the scenarios of retention analysis and funnel analysis, the average query time was reduced by 50%.

Real Practice

Below we introduce our experience with data import, data query, and A/B testing in the business application of Apache Doris.

Data Import

Xiaomi writes data into Doris mainly through Stream Load and Broker Load, with a small amount of data written via Insert. Data is generally written into a message queue first, and is divided into real-time and offline data.

How to write real-time data into Apache Doris:

Part of the real-time data is processed by Flink and written into Doris through the Flink-Doris-Connector provided by Apache Doris; the rest is written through Spark Streaming. Both writing approaches rely on Stream Load under the hood.
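Since both connectors ultimately issue Stream Load requests, it helps to see what such a request looks like at the HTTP level: an HTTP PUT against a Doris frontend. The sketch below only assembles the URL and headers; the host, port, database, table, label, and rows are placeholders, not Xiaomi's actual setup.

```python
# Hedged sketch of a Doris Stream Load request (JSON format). Host, db,
# table, label and the example rows are illustrative placeholders.
import json

def build_stream_load_request(fe_host, db, table, label, fe_http_port=8030):
    """Assemble the URL and headers for a JSON-format Stream Load."""
    url = f"http://{fe_host}:{fe_http_port}/api/{db}/{table}/_stream_load"
    headers = {
        "label": label,                # unique label, used to avoid duplicate loads
        "format": "json",
        "strip_outer_array": "true",   # payload is a JSON array of row objects
        "Expect": "100-continue",      # lets the FE redirect the PUT to a BE
    }
    return url, headers

url, headers = build_stream_load_request("doris-fe.example.com", "demo_db",
                                         "user_events", "load_20221024_0001")
rows = json.dumps([{"user_id": 1, "event": "click"}])
# Actually sending it would look roughly like:
#   requests.put(url, data=rows, headers=headers, auth=("user", "passwd"))
print(url)
```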

How to write offline data into Apache Doris:

Offline data is first written into Hive and then imported into Doris through Xiaomi's data import tool. Users can submit Broker Load tasks directly to the tool to import data into Doris, or import data through Spark SQL, which relies on the Spark-Doris-Connector provided by Apache Doris. The Spark-Doris-Connector is essentially an encapsulation of Stream Load.

Data Query

Users can query once data import is done. Inside Xiaomi, queries go through our data platform: users can run visual queries on Doris and conduct user behavior analysis and user portrait analysis. To help our teams carry out event analysis, retention analysis, funnel analysis, path analysis, and other behavioral analysis, we have added corresponding UDFs (User Defined Functions) and UDAFs (User Defined Aggregate Functions) to Doris.
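The UDAFs themselves are internal to Xiaomi, but one of the metrics they support, retention, can be sketched in plain Python: day-N retention is the share of a day-0 cohort that is active again exactly N days later. The event data below is invented for illustration.

```python
# Toy day-N retention calculation over made-up per-user activity dates.
from datetime import date, timedelta

events = {                      # user_id -> set of active dates
    1: {date(2022, 10, 1), date(2022, 10, 2)},
    2: {date(2022, 10, 1)},
    3: {date(2022, 10, 1), date(2022, 10, 2), date(2022, 10, 8)},
}

def retention(events, day0, n):
    """Fraction of users active on day0 who are active again n days later."""
    cohort = [u for u, days in events.items() if day0 in days]
    target = day0 + timedelta(days=n)
    retained = [u for u in cohort if target in events[u]]
    return len(retained) / len(cohort)

print(retention(events, date(2022, 10, 1), 1))   # day-1 retention: 2 of 3 users
```

In Doris, a UDAF lets this per-cohort logic run inside a single aggregation query instead of pulling raw events out of the database.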

In the upcoming version 1.2, Apache Doris adds the ability to synchronize metadata from external sources such as Hive, Hudi, and Iceberg through the Multi Catalog feature. External table query performance has improved, and the ability to access external tables greatly increases ease of use. In the future, we will consider querying Hive and Iceberg data directly through Doris to build a data lake architecture.

A/B Test

In real business, an A/B test is a method of comparing two versions of a strategy against each other to determine which one performs better. It is essentially an experiment in which two or more variants of a page are shown to users at random, and statistical analysis determines which variation performs better for a given conversion goal. Xiaomi's A/B test platform is an operational tool that conducts A/B tests with experimental grouping, traffic splitting, and scientific evaluation to assist decision making. The platform has several query applications: user deduplication, indicator summation, covariance calculation, etc. The query types involve Count(distinct), Bitmap, Like, and so on.
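To see why bitmap types matter for Count(distinct) workloads like these, here is a toy illustration: user IDs become set bits, so deduplicated counts and overlaps between experiment variants reduce to cheap bitwise operations. Doris's actual BITMAP type uses compressed Roaring bitmaps; this plain-integer version only shows the principle.

```python
# Toy bitmap deduplication: each user ID sets one bit in a Python int.
def to_bitmap(user_ids):
    bm = 0
    for uid in user_ids:
        bm |= 1 << uid          # set the bit at position uid
    return bm

def cardinality(bm):
    return bin(bm).count("1")   # number of set bits = distinct users

variant_a = to_bitmap([1, 2, 3, 3, 5])    # duplicates collapse automatically
variant_b = to_bitmap([3, 5, 8])

print(cardinality(variant_a))              # distinct users in A -> 4
print(cardinality(variant_a & variant_b))  # users who saw both variants -> 2
```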

Apache Doris also serves Xiaomi's A/B test platform. Every day, the platform needs to process a tremendous amount of data with billions of queries, which is why it is eager to improve query performance.

Apache Doris V1.1 was released just in time, with full vectorization in processing and storage. Compared with the non-vectorized version, query performance is significantly improved, so it was time to upgrade Xiaomi's Doris clusters to the latest version. That is why we first launched the vectorized version of Doris on Xiaomi's A/B test platform.

Test before Launch

Note: The following tests are based on Apache Doris V1.1.2

Before the vectorized version went online, we built a test cluster for Apache Doris V1.1.2 that was as large as Xiaomi's online Apache Doris V0.13 cluster. The test covered two aspects: single-SQL parallel query tests and batch-SQL concurrent query tests.

The configurations of the two clusters are exactly the same, and the specific configuration information is as follows:

  • Scale: 3 FEs + 89 virtual machines

  • CPU: Intel(R) Xeon(R) Silver 4216 CPU @ 2.10GHz 16 cores 32 threads × 2

  • Memory: 256GB

  • Disk: 7.3TB × 12 HDD

Single SQL Parallel Query Test

We chose 7 classic queries from the Xiaomi A/B test. For each query, we limited the time range to 1 day, 7 days, and 20 days, where the daily partition holds about 3.1 billion rows (about 2 TB of data). The test results are shown in the figures:

Apache Doris V1.1.2 shows at least a 3-5x performance improvement over Xiaomi's online Doris V0.13, which is remarkable.

Optimization

Note: The following tests are based on Apache Doris V1.1.2

Based on Xiaomi's A/B test business data, we tuned Apache Doris V1.1.2 and ran concurrent query tests on the tuned V1.1.2 against Xiaomi's online Doris V0.13. The test results are as follows.

Optimization in Test 1

We chose user deduplication, indicator summation, and covariance calculation queries from the A/B test (3,245 SQL statements in total) to run concurrent query tests on the two versions. The single-day partition of the table holds about 3.1 billion rows (about 2 TB of data), and the queries cover the latest week of data. The test results are shown in the figures:

Compared with Apache Doris V0.13, Doris V1.1.2 reduces the overall average latency by about 48% and the P95 latency by about 49%. In this test, the query performance of Doris V1.1.2 nearly doubled that of Doris V0.13.

Optimization in Test 2

We chose 7 A/B test reports to test the two versions. Each report corresponds to two modules on the Xiaomi A/B test platform, and each module represents thousands of SQL queries. Each report submits query tasks at the same concurrency to the clusters where the two versions reside. The test results are shown in the figure:

Compared with Doris V0.13, Doris V1.1.2 reduces the overall average latency by around 52%. In this test, the query performance of Doris V1.1.2 was more than double that of Doris V0.13.

Optimization in Test 3

To verify the performance of the tuned Apache Doris V1.1.2 in other cases, we chose Xiaomi user behavior analysis for concurrent query performance tests of Doris V1.1.2 against Doris V0.13, using behavior analysis queries from four days: October 24-27, 2022. The test results are shown in the figures:

Compared with Doris V0.13, the overall average latency of Doris V1.1.2 is reduced by about 77%, and the P95 latency by about 83%. In this test, the query performance of Doris V1.1.2 is 4-6 times that of Doris V0.13.

Conclusion

Since we adopted Apache Doris in 2019, it has come to serve dozens of businesses and sub-brands within Xiaomi, with dozens of clusters and hundreds of nodes. It completes more than 10,000 online analysis queries from users every day and carries most of the online analysis workload at Xiaomi.

After performance testing and tuning, Apache Doris V1.1.2 has met the launch requirements of the Xiaomi A/B test platform in both query performance and stability. In some cases, it even exceeded our expectations, such as reducing overall average latency by about 77% in our tuned version.

Meanwhile, some of the functions mentioned above have been released in Apache Doris V1.0 or V1.1, and some PRs have been merged into the community's master branch and should be released soon. Recently, community activity has greatly increased. We are glad to see Apache Doris becoming more and more mature and stepping toward an integrated data lake. We truly believe that, in the future, more data analysis will be explored and realized with Apache Doris.

Best Practices

Guide: This article discusses the exploration and practice of the search engine team at JD.com in real-time data analysis using Apache Flink and Apache Doris. Stream computing is growing more popular by the day: more papers have been published since Google Dataflow; Apache Flink has become one of the most popular engines in the world; and real-time analytical databases such as Apache Doris are more widely applied than ever before. Stream computing engines are flourishing. However, no engine is perfect enough to solve every problem, and it is important to find an OLAP engine that suits the business. We hope JD.com's practice in real-time OLAP and stream computing gives you some inspiration.

Author: Li Zhe, data engineer at JD.com, focused on offline data, stream computing, and application development.

About JD.com

JD.com (NASDAQ: JD), a leading e-commerce company in China, had net revenue of RMB 951.6 billion in 2021. JD Group owns JD Retail, JD Global, JD Technology, JD Logistics, JD Cloud, etc. The group was officially listed on NASDAQ in May 2014.

JD Search Box's Requirement: Real-time Data Analysis

The JD search box, as the entrance to the e-commerce platform, links merchants and users, who express their needs through the search box. In order to better understand user intentions and quickly improve the conversion rate, multiple A/B tests run online at the same time across multiple products, and categories, organizations, and brands all need online monitoring for better conversion. At present, the JD search box's demand for real-time data mainly includes three parts:

  1. The overall data of the JD search box.
  2. Real-time monitoring of A/B tests.
  3. A top list of hot search words to reflect changes in public opinion; trending words can reflect what users care about.

The analysis mentioned above needs to refine data to the SKU level. At the same time, we also undertake the task of building a real-time data platform to show our business analysts different real-time stream computing data.

Although different business analysts care about different data granularities, time frequencies, and dimensions, we hope to establish a unified real-time OLAP data warehouse and provide a set of safe, reliable, and flexible real-time data services.

At present, hundreds of millions of new exposure logs are generated every day. The logs increase tenfold when stored at the SKU level, and grow to billions of records when broken down by A/B test. Aggregation queries across multiple dimensions require second-level response times.

Such data volumes bring huge challenges to the team: 2 billion rows are created daily; up to 60 million rows need to be imported per minute; data latency should be within 1 minute; MDX queries need to execute within 3 seconds; and QPS has reached above 20. On top of that, the new OLAP database needs to be reliable and stable enough to respond to priority-0 emergencies.

The Evolution of the Real-time Architecture

Our previous architecture was based on Apache Storm, with point-to-point data processing. This approach quickly met the needs of real-time reports during the early stage of rapid business growth. However, as the business developed, disadvantages gradually appeared: poor flexibility, poor data consistency, low development efficiency, and increased resource costs.

To solve these problems, we first upgraded the architecture, replacing Apache Storm with Apache Flink to achieve high throughput. At the same time, according to the characteristics of the search data, the real-time data is processed hierarchically: a PV data flow, an SKU data flow, and an A/B test data flow are created. The plan was to build the upper real-time OLAP layer on top of these real-time flows.

When selecting OLAP database, the following points need to be considered:

  1. Data latency at minute level and query response time at second level
  2. Standard SQL support, which reduces the cost of use
  3. JOIN support to facilitate adding dimensions
  4. Traffic data can be deduplicated approximately, but order data must be deduplicated exactly
  5. High throughput: tens of millions of records per minute and tens of billions of new records every day
  6. High query concurrency, since the front end may query it directly

Among the OLAP engines that support real-time import, we made an in-depth comparison of Apache Druid, Elasticsearch, ClickHouse, and Apache Doris:

We found that Doris and ClickHouse could both meet our needs. But ClickHouse's concurrency is too low for us, which is a potential risk. Moreover, ClickHouse's data import has no transaction support and cannot achieve exactly-once semantics, and its SQL support is incomplete.

Finally, we chose Apache Doris as our real-time OLAP database. For user behavior log data, we use Aggregation Key tables; for e-commerce order data, we use Unique Key tables. Moreover, we split the previous tasks and reuse the logic we had built before. Therefore, when Flink processes the data, new topic flows and real-time flows of different granularities are generated in DWD. The new architecture is as follows:

In the current architecture, the Flink task is very light. On top of the production data detail layer, Doris directly serves as the aggregation layer and takes over the window calculations that previously belonged to Flink. We use Routine Load to consume real-time data. Although the data is fine-grained before import, asynchronous aggregation is automatically performed based on the Aggregation Key, and the degree of aggregation is determined entirely by the number of dimensions. By creating Rollups on the base table, multi-write and pre-aggregation operations are performed during import, similar to a materialized view, which highly aggregates the data and improves query performance.
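What the Aggregation Key model does on import can be sketched in a few lines of Python: rows sharing the same key columns are collapsed, and value columns are combined with their aggregate function (SUM here). The column names are illustrative, not JD's real schema.

```python
# Toy simulation of Aggregation-Key-style pre-aggregation on import:
# rows with identical key columns are merged, summing the value column.
from collections import defaultdict

def aggregate_on_import(rows, key_cols, sum_col):
    agg = defaultdict(int)
    for row in rows:
        key = tuple(row[c] for c in key_cols)
        agg[key] += row[sum_col]
    return dict(agg)

rows = [
    {"dt": "2022-10-24", "experiment": "A", "pv": 3},
    {"dt": "2022-10-24", "experiment": "A", "pv": 2},   # same key -> merged
    {"dt": "2022-10-24", "experiment": "B", "pv": 7},
]
print(aggregate_on_import(rows, ["dt", "experiment"], "pv"))
# -> {('2022-10-24', 'A'): 5, ('2022-10-24', 'B'): 7}
```

A Rollup behaves like running this same collapse with a smaller set of key columns, so coarser queries touch far fewer rows.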

Another advantage of connecting Kafka directly to Doris at the detail layer is that it naturally supports data backtracking: when real-time data arrives out of order, the "late" data can be recalculated and the previous results updated, because delayed data can be written to the table whenever it arrives. The final solution is as follows:

Optimization during the Promotion

As mentioned above, we have established Aggregation Keys of different granularities in Doris, including PV, SKU, and A/B test granularity. Here we take the exposure A/B test model, which has the largest daily data volume, as an example to explain how we supported queries over tens of billions of records per day during the big promotion period.

Strategy we used:

  • Monitoring: 10-, 30-, and 60-minute A/B tests with indicators such as exposure PV, exposure UV, exposed SKU count, click PV, click UV, and CTR.
  • Data Modeling: use the real-time exposure data to establish the Aggregation Key, and perform HyperLogLog approximate calculations for UV and PV.
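The HyperLogLog approximation used for UV above can be implemented from scratch in a few dozen lines to show the idea: each element is hashed, the first p bits pick a register, and each register remembers the longest run of leading zeros it has seen. This is the textbook algorithm, not Doris's internal implementation.

```python
# Minimal HyperLogLog for approximate distinct counting (textbook version).
import hashlib
import math

class HyperLogLog:
    def __init__(self, p=10):
        self.p = p
        self.m = 1 << p                      # number of registers
        self.registers = [0] * self.m

    def add(self, item):
        h = int(hashlib.sha1(str(item).encode()).hexdigest(), 16) & ((1 << 64) - 1)
        idx = h >> (64 - self.p)             # first p bits choose a register
        rest = h & ((1 << (64 - self.p)) - 1)
        rank = (64 - self.p) - rest.bit_length() + 1   # leading zeros + 1
        self.registers[idx] = max(self.registers[idx], rank)

    def estimate(self):
        alpha = 0.7213 / (1 + 1.079 / self.m)
        raw = alpha * self.m * self.m / sum(2.0 ** -r for r in self.registers)
        if raw <= 2.5 * self.m:              # small-range correction
            zeros = self.registers.count(0)
            if zeros:
                return self.m * math.log(self.m / zeros)
        return raw

hll = HyperLogLog()
for uid in range(100_000):
    hll.add(uid)                             # 100,000 distinct "users"
print(round(hll.estimate()))                 # close to 100,000, within a few %
```

With 1,024 registers the standard error is roughly 3%, while memory stays at about 1 KB regardless of how many users are counted, which is why it suits UV over billions of exposure records.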

Clusters we had:

  • 30+ virtual machines with NVMe SSD storage
  • 40+ partitions for A/B test exposure data
  • Tens of billions of new records every day
  • 2 Rollups

Benefits overall:

  • The bucket field can quickly locate the tablet partition at query time
  • 600 million records can be imported in 10 minutes
  • The 2 Rollups have relatively low IO and meet the query requirements

Look Ahead

The JD search box introduced Apache Doris in May 2020, with a scale of 30+ BEs and 10+ Routine Load tasks running online at the same time. Replacing Flink's window computing with Doris not only improves development efficiency and adapts to dimension changes, but also reduces computing resources. Apache Doris provides unified interface services, ensuring data consistency and security. We are also pushing the upgrade of the JD search box's OLAP platform to the latest version. After upgrading, we plan to use the bitmap function to support exact deduplication of UV and other indicators. In addition, we plan to use appropriate Flink windows to develop real-time stream computing at the aggregation layer to increase the richness and completeness of the data.

Best Practices

Guide: The topic of this sharing is the application of Apache Doris at NIO, which mainly covers the following topics:

  1. Introduction to NIO
  2. The Development of OLAP at NIO
  3. Apache Doris: the Unified OLAP Data Warehouse
  4. Best Practice of Apache Doris on the CDP Architecture
  5. Summary and Benefits

Author: Huaidong Tang, Data Team Leader, NIO Inc.

About NIO

NIO Inc. (NYSE: NIO) is a leading company in the premium smart electric vehicle market. Founded in November 2014, NIO designs, develops, jointly manufactures, and sells premium smart electric vehicles, driving innovations in autonomous driving, digital technologies, electric powertrains, and batteries.

Recently, NIO planned to enter the U.S. market alongside other western markets by the end of 2025. The company has already established a U.S. headquarters in San Jose, California, where they started hiring people.

The Architecture Evolution of OLAP in NIO

The architectural evolution of OLAP at NIO took place in several steps over the years.

1. Introduced Apache Druid

At that time, there were not many OLAP storage and query engines to choose from; the more common ones were Apache Druid and Apache Kylin. There were two reasons why we didn't choose Kylin:

  • The most suitable storage engine underlying Kylin is HBase, and adding it would increase operation and maintenance costs.

  • Kylin's precomputation involves various dimensions and indicators; too many of them would put great pressure on storage.

We preferred Druid because we had used it before and were familiar with it. Apache Druid has obvious advantages: it supports real-time and offline data import, columnar storage, high concurrency, and high query efficiency. But it has downsides as well:

  • Standard protocols such as JDBC are not supported

  • The capability of JOIN is weak

  • Significant performance degradation when performing deduplication

  • High operation and maintenance costs: components have separate installation methods and different dependencies, and data import requires extra integration with Hadoop and JAR package dependencies

2. Introduced TiDB

TiDB is a mature database focused on OLTP plus OLAP, which also has distinctive advantages and disadvantages:

Advantages:

  • As an OLTP database, it handles updates well

  • Supports both detailed and aggregated queries, handling dashboard statistical reports and detail-level queries at the same time

  • Supports standard SQL, so the cost of use is low

  • Low operation and maintenance cost

Disadvantages:

  • It is not an independent OLAP engine: TiFlash depends on the OLTP side and increases storage, and its OLAP capability is insufficient

  • Overall performance must be evaluated scenario by scenario

3. Introduced Apache Doris

Since 2021, we have officially introduced Apache Doris. During selection, we were most concerned with factors such as product performance, SQL protocol, system compatibility, and learning, operation, and maintenance costs. After deep research and detailed comparison of the candidate systems, we came to the following conclusions:

Apache Doris, whose advantages fully meet our demands:

  • Supports highly concurrent queries (what we cared about most)

  • Supports both real-time and offline data

  • Supports detailed and aggregated query

  • The Unique Key model supports data updates

  • Materialized views can greatly speed up queries

  • Fully compatible with the MySQL protocol, so development costs are relatively low

  • The performance fully meets our requirements

  • Lower operation and maintenance costs

There was one more competitor, ClickHouse. Its single-node performance is extremely strong, but its disadvantages were hard to accept:

  • In some cases, its multi-table JOIN is weak

  • Relatively low in concurrency

  • High operation and maintenance costs

With strong all-around performance, Apache Doris outshines Druid and TiDB, while ClickHouse did not fit our business well. This led us to Apache Doris.

Apache Doris: the Unified OLAP Data Warehouse

This diagram outlines our OLAP architecture, covering data sources, data import, data processing, the data warehouse, data services, and applications.

1. Data Source

At NIO, data sources include not only databases but also event tracking data, device data, vehicle data, etc. All of this data is ingested into the big data platform.

2. Data Import

For business data, we capture changes with CDC, convert them into a data stream stored in Kafka, and then perform stream processing. Data that can only be delivered in batches goes directly into our distributed storage.

3. Data Processing

We chose the Lambda architecture rather than integrated stream-batch processing.

Our business determines that the Lambda architecture is divided into two paths, offline and real-time:

  • Some data is streamed.

  • Some data lives only in the stream, and some historical data is not retained in Kafka.

  • Some data requires high precision in certain circumstances. To ensure accuracy, an offline pipeline recalculates and refreshes the entire dataset.

4. Data Warehouse

From data processing to the data warehouse, we did not adopt the Flink or Spark Doris Connector. Instead, we use Routine Load to connect Apache Doris with Flink and Broker Load to connect Doris with Spark. The data generated in batches by Spark is also backed up to Hive for use in other scenarios, so each computation serves multiple scenarios at once, which greatly improves efficiency. The same applies to Flink.
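The Routine Load path described above (Flink producing into Kafka, Doris consuming continuously) can be sketched like this; the Kafka topic, table, and column names are illustrative assumptions:

```sql
-- Hypothetical: continuously consume a Kafka topic produced by Flink
-- and write it into a Doris table. Routine Load runs inside Doris itself.
CREATE ROUTINE LOAD example_db.vehicle_metrics_load ON vehicle_metrics
COLUMNS(vehicle_id, metric_name, metric_value, event_time)
PROPERTIES (
    "desired_concurrent_number" = "3",
    "format" = "json"
)
FROM KAFKA (
    "kafka_broker_list" = "broker1:9092,broker2:9092",
    "kafka_topic" = "vehicle_metrics",
    "property.group.id" = "doris_vehicle_metrics"
);

-- Job status can be checked with:
SHOW ROUTINE LOAD FOR example_db.vehicle_metrics_load;
```

Because the job lives inside Doris, no separate connector process needs to be operated, which matches the low-maintenance goal stated above.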

5. Data Service

Behind Doris sits One Service. By registering a data source or through flexible configuration, an API with flow and permission control is generated automatically, which greatly improves flexibility. Combined with a Kubernetes serverless solution, the entire service becomes much more elastic.

6. Application

In the application layer, we mainly deploy some reporting applications and other services.

We mainly have two types of scenarios:

  • User-oriented: similar to typical Internet scenarios, this includes data dashboards and data metrics.

  • Vehicle-oriented: vehicle data enters Doris this way. After aggregation, the Doris tables hold data on the order of billions of rows, but overall performance still meets our requirements.

Best Practice of Apache Doris on CDP Architecture

1. CDP Architecture

Next, let me introduce our Doris practice on the operation platform, drawn from our real business. Nowadays, Internet companies build their own CDP (Customer Data Platform), which includes several modules:

  • Tags: the most basic part.

  • Target: based on tags, select groups of people according to certain logic.

  • Insight: for a given group of people, clarify the group's distribution and characteristics.

  • Touch: reach users via text messages, phone calls, voice, app notifications, IM, etc., in cooperation with flow control.

  • Effect analysis: close the loop of action, effect, and feedback to improve the completeness of the operation platform.

Doris plays the most important role here, covering tag storage, group storage, and effect analysis.

Tags are divided into basic tags and basic user-behavior data, from which we can flexibly define other tags. In terms of timeliness, tags are also divided into real-time tags and offline tags.

2. Considerations for CDP Storage Selection

We took five dimensions into account when selecting CDP storage.

(1) Unification of Offline and Real-time

As mentioned earlier, there are offline tags and real-time tags. Currently we are close to quasi-real-time, which is good enough for some of the data, while a large number of tags are still offline. The methods used are Doris's Routine Load and Broker Load.

| Scenario | Requirement | Apache Doris Feature |
| --- | --- | --- |
| Real-time tags | Real-time data updates | Routine Load |
| Offline tags | Highly efficient batch import | Broker Load |
| Unification of offline and real-time | Unified storage of offline and real-time data | Routine Load and Broker Load update different columns of the same table |

In addition, different columns of the same table are updated at different frequencies. For example, a user's identity changes all the time and must be updated in real time; a T+1 update does not meet our needs there. Other tags are offline, such as a user's gender and age, where a T+1 update is sufficient. Keeping all basic user tags in one table keeps maintenance costs very low and greatly reduces the number of tables when tags are customized later, which benefits overall performance.

(2) Efficient Targeting

Once user tags are ready, it is time to target the right group of people: filter out everyone who meets the conditions according to different combinations of tags. Such queries combine tag conditions in arbitrary ways. There was an obvious improvement when Apache Doris was upgraded to vectorized execution.

| Scenario | Requirement | Apache Doris Feature |
| --- | --- | --- |
| Complex-condition targeting | Highly efficient combination of tags | SIMD optimization |
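A targeting query of this kind boils down to filtering one wide tag table by an arbitrary combination of conditions; the table and tag names below are illustrative assumptions:

```sql
-- Hypothetical: select a group by combining several tag conditions
-- and persist it for the later insight and touch steps.
INSERT INTO target_group (group_id, user_id)
SELECT 'camp_q1_owners' AS group_id, user_id
FROM user_tags
WHERE gender = 'female'
  AND age BETWEEN 25 AND 40
  AND identity = 'vehicle_owner'       -- real-time tag
  AND last_active_dt >= '2023-01-01';  -- behavior-derived tag
```

These predicate-heavy scans over a columnar table are exactly where the SIMD/vectorization optimizations mentioned above pay off.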

(3) Efficient Aggregation

The user insight and effect-analysis statistics mentioned above require statistical analysis of the data, which is more than simply fetching tags by user ID. The amount of data read and the query efficiency greatly affect tag distribution, group distribution, and effect-analysis statistics. Apache Doris helps a lot:

  • Data partitioning. We partition the data by time, so analysis and statistics read far less data, which greatly speeds up queries.

  • Node aggregation. Each node aggregates its local data first, and the partial results are then collected for unified aggregation.

  • Vectorization. The vectorized execution engine brings a significant performance improvement.

| Scenario | Requirement | Apache Doris Feature |
| --- | --- | --- |
| Distribution of tag values | The distribution of all tag values must be refreshed daily; fast, efficient statistics are required | Data partitioning lessens data transfer and computation |
| Distribution of groups | Same as above | Unified storage and computation; each node aggregates first |
| Statistics for effect analysis | Same as above | SIMD speedup |
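The time-based partitioning described above can be sketched as follows; the schema is an illustrative assumption:

```sql
-- Hypothetical: daily tag-distribution statistics, partitioned by date so the
-- nightly refresh and distribution queries touch only the relevant partitions.
CREATE TABLE tag_value_dist (
    dt        DATE        NOT NULL,
    tag_name  VARCHAR(64) NOT NULL,
    tag_value VARCHAR(64) NOT NULL,
    user_cnt  BIGINT SUM
)
AGGREGATE KEY(dt, tag_name, tag_value)
PARTITION BY RANGE(dt) (
    PARTITION p202301 VALUES LESS THAN ("2023-02-01"),
    PARTITION p202302 VALUES LESS THAN ("2023-03-01")
)
DISTRIBUTED BY HASH(tag_name) BUCKETS 8;

-- Partition pruning: only the partition containing this dt is scanned.
SELECT tag_value, user_cnt
FROM tag_value_dist
WHERE dt = '2023-01-15' AND tag_name = 'age_band';
```

The hash distribution on `tag_name` also lets each BE pre-aggregate its own buckets before the final merge, matching the node-aggregation point above.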

(4) Multi-table Association

Our CDP may differ from common CDP scenarios in the industry: in many CDPs, tags are precomputed in advance with no custom tags, whereas ours leaves users the flexibility to customize tags themselves. Because the underlying data is scattered across different database tables, creating a custom tag requires joining those tables.

A very important reason we chose Doris is its multi-table join capability. Performance tests showed that Apache Doris meets our requirements, and since tags are dynamic, this capability is powerful for users.

| Scenario | Requirement | Apache Doris Feature |
| --- | --- | --- |
| Characteristic distribution of a group | Statistical distribution of a group under a certain characteristic | Table association |
| Single tag | Display tags | Table association |

(5) Query Federation

Whether a user is successfully reached is recorded in TiDB. A failed notification during operations may only affect user experience, but when a transaction is involved, such as gift cards or coupons, the task must be executed exactly once. TiDB is more suitable for this OLTP scenario.

But for effect analysis, it is necessary to understand the extent to which the operation plan is implemented, whether the goal is achieved and its distribution. It is necessary to combine task execution and group selection for analysis, which requires the query association between Doris and TiDB.

We assumed the tags would be fairly small and planned to store them in Elasticsearch, but this assumption later proved wrong.

| Scenario | Requirement | Apache Doris Feature |
| --- | --- | --- |
| Effect analysis associated with execution details | Doris queries joined with TiDB | Query federation with other databases |
| Group tags associated with behavior aggregation | Doris queries joined with Elasticsearch | Query federation with other databases |
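In classic Doris versions this kind of federation is typically done through external tables. A rough sketch, with all connection details and schemas as assumptions:

```sql
-- Hypothetical Elasticsearch external table mapped onto an existing index.
CREATE EXTERNAL TABLE es_user_behavior (
    user_id BIGINT,
    event   VARCHAR(64),
    ts      DATETIME
)
ENGINE = ELASTICSEARCH
PROPERTIES (
    "hosts" = "http://es-node1:9200",
    "index" = "user_behavior"
);

-- Join a Doris-resident group table with behavior data living in Elasticsearch.
SELECT g.group_id, b.event, COUNT(*) AS cnt
FROM target_group g
JOIN es_user_behavior b ON g.user_id = b.user_id
GROUP BY g.group_id, b.event;
```

TiDB can be reached similarly through an ODBC external table (or through the multi-catalog feature in newer Doris versions), letting effect analysis join Doris groups against task-execution records without copying the data.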

Summary and Benefits

  1. Bitmap. Our data volume is not big enough to test its full efficiency, but once the volume reaches a certain level, bitmaps can bring a good performance improvement. For example, when calculating UV, bitmap aggregation is worth considering if the full set of IDs exceeds 50 million.

  2. Performance is good when an Elasticsearch single-table query is joined with Doris.

  3. Update columns in batches where possible. To reduce the number of tables and improve JOIN performance, tables should be as streamlined and pre-aggregated as possible. However, fields of the same type may have different update frequencies: some need daily updates while others need hourly updates, so updating a single column independently is an important requirement. Apache Doris's solution is the REPLACE_IF_NOT_NULL aggregation. Note that it cannot replace an existing non-null value with null; instead, replace nulls in the data with a meaningful default value, such as "unknown".

  4. Online services. Apache Doris serves online and offline scenarios at the same time, which requires strong resource isolation.
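The REPLACE_IF_NOT_NULL approach from point 3 can be sketched as follows (the table and columns are illustrative assumptions): each load job supplies only the columns it owns and leaves the rest NULL, so existing values survive.

```sql
-- Hypothetical tag table: each value column uses REPLACE_IF_NOT_NULL, so loads
-- that touch only some columns leave the other columns' values untouched.
CREATE TABLE user_tags (
    user_id  BIGINT NOT NULL,
    identity VARCHAR(32) REPLACE_IF_NOT_NULL NULL DEFAULT NULL, -- real-time column
    gender   VARCHAR(8)  REPLACE_IF_NOT_NULL NULL DEFAULT NULL, -- T+1 column
    age      INT         REPLACE_IF_NOT_NULL NULL DEFAULT NULL  -- T+1 column
)
AGGREGATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 16;

-- Real-time job updates identity only; the omitted columns default to NULL,
-- which REPLACE_IF_NOT_NULL ignores, so gender and age keep their old values.
INSERT INTO user_tags (user_id, identity) VALUES (42, "vehicle_owner");

-- Offline T+1 job updates only the basic tags.
INSERT INTO user_tags (user_id, gender, age) VALUES (42, "female", 31);
```

As the note above warns, a genuine NULL can never overwrite a stored value, so "clear this tag" must be modeled with a sentinel such as "unknown".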

Best Practices

Guide: In 2019, AISPEECH built a real-time and offline data warehouse based on Apache Doris. Relying on its flexible query model, extremely low maintenance costs, high development efficiency, and excellent query performance, Apache Doris has been used in many business scenarios such as real-time business operations and AI chatbot analysis. It meets data analysis needs such as device portraits/user labels, real-time operations, data dashboards, self-service BI, and financial reconciliation. I will share our experience in this article.

Author | Zhao Wei, Head Developer of AISPEECH's Big Data Department

Background

AISPEECH is a professional conversational artificial intelligence company in China with full-link intelligent voice and language technology, committed to becoming a platform enterprise for full-link voice and language interaction. It has recently developed a new-generation human-computer interaction platform, DUI, and an artificial intelligence chip, TH1520, providing natural language interaction solutions for partners in industries such as Internet of Vehicles, IoT, government affairs, and fintech.

AISPEECH first introduced Apache Doris in 2019 and built a real-time and offline data warehouse on it. Compared with the previous architecture, Apache Doris offers a flexible query model, extremely low maintenance costs, high development efficiency, and excellent query performance. It has been applied in multiple business scenarios to meet data analysis needs such as device portraits/user tags, real-time business operations, data analysis dashboards, self-service BI, and financial reconciliation.

Architecture Evolution

Offline data analysis was our main requirement in the early days of the business. As the business developed, the requirements for real-time data analysis grew higher and higher, and the early data warehouse architecture could no longer keep up. To meet the higher requirements for query performance, response time, and concurrency, we officially introduced Apache Doris in 2019 to build an integrated real-time and offline data warehouse.

Below I will introduce the evolution of the AISPEECH data warehouse architecture and share the reasons why we chose Apache Doris for the new one.

Early Data Warehouse Architecture

As shown in the architecture diagram below, the offline data warehouse is based on Hive + Kylin while the real-time data warehouse is based on Spark + MySQL.

There are three main types of data sources in our business: business databases such as MySQL, application systems such as K8s container service logs, and automotive T-Box logs. Data sources are first written to Kafka through methods such as the MQTT/HTTP protocols, business database binlogs, and Filebeat log collection. After Kafka, the data splits into real-time and offline links. The real-time link is shorter: data buffered in Kafka is processed by Spark and written into MySQL for further analysis, which basically met our early analysis requirements. On the offline link, after cleaning and processing by Spark, an offline data warehouse is built in Hive and Apache Kylin is used to build cubes. Before building a cube, the data model must be designed in advance, including association tables, dimension tables, index fields, and aggregation functions. Cubes are built through the scheduling system and finally stored in HBase.

Pain Points of Early Architecture:

  1. Many dependent components. Kylin 2.x and 3.x strongly depend on Hadoop and HBase. The large number of components leads to low development efficiency, hidden architecture-stability risks, and high maintenance costs.

  2. Kylin's build process is complicated and build tasks often fail. Each build involves widening tables, de-duplicating columns, generating dictionaries, building cubes, and so on. With 1,000-2,000 or more build tasks per day, at least 10 would fail, which cost us a lot of time writing automated operation and maintenance scripts.

  3. Severe dimension/dictionary expansion. Dimension expansion arises when a business scenario needs many analysis conditions and fields; selecting many fields in the data analysis model without pruning leads to severe cube expansion and longer build times. Dictionary inflation arises when global exact deduplication makes the dictionary build ever larger and slower, causing a continuous decline in analysis performance.

  4. The data analysis model is fixed and inflexible. In practice, changing a calculated field or business scenario means backtracking some or even all of the data.

  5. Detail-level data queries are not supported. The early architecture could not serve detail queries. Kylin's official workaround is to hand detail queries to Presto, which introduces yet another component and increases development costs.

Architecture Selection

To solve the problems above, we began exploring other data warehouse solutions and researched the OLAP engines most widely used in the market, Apache Doris and ClickHouse.

As the original creator, SelectDB provides commercial support and services for Apache Doris. With the new Apache Doris, SelectDB is now providing global users with a fully-managed database option for deployment.

Compared with ClickHouse's heavy maintenance burden, numerous table engines, and weak support for associated queries, Apache Doris performed better. Combined with our OLAP analysis scenarios, we finally decided to introduce Apache Doris.

New Data Warehouse Architecture

As shown in the figure above, we built a new real-time + offline data warehouse architecture based on Apache Doris. Unlike the previous architecture, real-time and offline data are processed separately and written to Apache Doris for analysis.

For historical reasons, data migration is difficult, so our offline pipeline stays basically consistent with the previous architecture, although it is entirely possible to build an offline data warehouse directly on Apache Doris.

As in the earlier architecture, offline data is cleaned and processed by Spark and built into a data warehouse in Hive; the data in Hive is then written to Apache Doris through Broker Load. Worth noting: Broker Load is very fast, and our daily import of 100-200 GB into Apache Doris takes only 10-20 minutes.
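A Broker Load job of the kind described above looks roughly like this; the paths, broker name, and credentials are illustrative assumptions:

```sql
-- Hypothetical: bulk-load one day's Hive partition files from HDFS into Doris.
LOAD LABEL example_db.dwd_user_20230115
(
    DATA INFILE("hdfs://namenode:8020/user/hive/warehouse/dwd_user/dt=2023-01-15/*")
    INTO TABLE dwd_user
    COLUMNS TERMINATED BY "\t"
    (user_id, device_id, channel, dt)
)
WITH BROKER "hdfs_broker"
(
    "username" = "hdfs",
    "password" = ""
)
PROPERTIES ("timeout" = "3600");

-- Progress and errors can be checked with:
SHOW LOAD WHERE LABEL = "dwd_user_20230115";
```

The broker streams the HDFS files to the BEs in parallel, which is why the daily imports quoted above finish in minutes.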

For the real-time data flow, the new architecture uses the Doris-Spark-Connector to consume data from Kafka and write it to Apache Doris after simple processing. As shown in the architecture diagram, real-time and offline data are both analyzed and processed in Apache Doris, meeting the business requirements of data applications for both real-time and offline data.

Benefits of the New Architecture:

  1. Simple operation, low maintenance cost, and no dependence on Hadoop ecosystem components. Apache Doris deployment is simple, with only two processes, FE and BE, both of which can scale out; a single cluster supports hundreds of machines and tens of petabytes of storage. The two processes use a consistency protocol to ensure high service availability and high data reliability. This highly integrated design greatly reduces the operation and maintenance cost of a distributed system: in three years of using Doris we have spent very little time on operations, far less than with the previous Kylin-based architecture.

  2. The difficulty of developing and troubleshooting problems is greatly reduced. The real-time and offline unified data warehouse based on Doris supports real-time data services, interactive data analysis, and offline data processing scenarios, which greatly reduces the difficulty of troubleshooting.

  3. Apache Doris supports runtime JOIN queries, similar to table joins in MySQL. This is friendly to scenarios where the data analysis model changes frequently and solves the low flexibility of the earlier fixed data models.

  4. Apache Doris supports JOIN, aggregation, and detailed query at the same time. Meanwhile, it solves the problem that data details could not be queried in the previous architecture.

  5. Apache Doris supports multiple query-acceleration methods, such as rollup indexes and materialized views, and can implement secondary indexes through rollup to speed up queries, which greatly improves response time.

  6. Apache Doris supports multiple types of query federation: federated analysis over data lakes such as Hive, Iceberg, and Hudi, as well as databases such as MySQL and Elasticsearch.

Applications

Apache Doris was first applied at AISPEECH in real-time business and AI chatbot analysis scenarios. This chapter introduces the requirements and applications of both.

Real-Time Business

As shown in the figure above, the technical architecture of the real-time operation business is basically the same as the new version of the data warehouse architecture mentioned above:

  • Data sources: consistent with the architecture diagram above, including business data in MySQL, event tracking data from application systems, and device and terminal logs.

  • Data Import: Broker Load is used for offline data import, and Doris-Spark-Connector is used for real-time data import.

  • Data storage and development: almost the entire real-time data warehouse is built on Apache Doris, with some offline batch tasks orchestrated as DAGs in Airflow.

  • Data Application: The top layer is the business analysis requirements, including large-screen display, real-time dashboard for data operation, user portrait, BI tools, etc.

In real-time operation business, there are two main requirements for data analysis:

  • Due to the large amount of real-time imported data, the query efficiency requirement is high.

  • A team of 20+ people runs this scenario, and they open the data operation dashboard at the same time, so there are relatively high requirements for real-time write performance and query concurrency.

AI Chatbots Analysis

In addition, the second application of Apache Doris at AISPEECH is AI chatbot analysis.

As shown in the figure above, unlike normal BI cases, our users only need to describe their data analysis needs by typing. Based on our company's NLP capabilities, the AI chatbot BI converts natural language into SQL, similar to NL2SQL technology. Note that the natural language analysis used here is customized: compared with open-source NL2SQL, the hit rate is higher and the analysis more precise. The generated SQL is then sent to Apache Doris to produce the analysis result, so users can view detailed data at any time simply by typing. Compared with precomputation-based OLAP engines such as Apache Kylin and Apache Druid, Apache Doris fits better for the following reasons:

  • The query is flexible and the model is not fixed, which supports customization.

  • It needs to support table association, aggregation calculation, and detailed query.

  • Response time needs to be fast.

Therefore, we successfully implemented AI chatbot analysis using Apache Doris, and internal feedback on the application has been excellent.

Experience

Based on the above two scenarios, we have accumulated some experience and insights and I will share them with you now.

Data Warehouse Table Design:

  1. For tables with roughly tens of millions of rows (a reference figure; it depends on cluster size), the Duplicate model is a better fit. It supports both aggregation and detail queries at the same time, without requiring an additional detail table.

  2. When the data volume is relatively large, we suggest using the Aggregate model, building rollup indexes on it, and using materialized views to optimize queries and aggregation fields.

  3. When the data volume is large and many tables are joined, use ETL to build wide tables and import them into Doris, combined with Aggregate-model optimizations; or apply the official Doris JOIN optimizations: https://doris.apache.org/en-US/docs/dev/advanced/join-optimization/doris-join-optimization
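Points 1 and 2 above can be sketched as follows (the schemas are illustrative assumptions, not AISPEECH's actual tables):

```sql
-- Hypothetical detail table using the Duplicate model: every row is kept,
-- so it serves both drill-down detail queries and ad-hoc aggregation.
CREATE TABLE device_event (
    event_time  DATETIME    NOT NULL,
    device_id   BIGINT      NOT NULL,
    event_type  VARCHAR(32) NOT NULL,
    duration_ms INT
)
DUPLICATE KEY(event_time, device_id)
DISTRIBUTED BY HASH(device_id) BUCKETS 8;

-- For larger volumes, a materialized view pre-aggregates a common query
-- pattern; Doris rewrites matching queries to use it automatically.
CREATE MATERIALIZED VIEW mv_duration_by_type AS
SELECT event_type, SUM(duration_ms)
FROM device_event
GROUP BY event_type;
```

The Duplicate table answers "show me every event for device X", while the materialized view answers "total duration per event type" without rescanning the detail rows.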

Storage:

We separate hot and warm data with SSD and HDD: data from the past year is stored on SSD, and older data on HDD. Apache Doris supports setting a cooldown time per partition; our current solution sets automatic migration so that historical data moves from SSD to HDD, ensuring that data within one year stays on SSD.
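The SSD-to-HDD cooldown setup can be sketched with partition-level storage properties (the table, partition names, and dates are illustrative assumptions):

```sql
-- Hypothetical: a new partition starts on SSD and automatically migrates to
-- HDD after the cooldown time, keeping roughly the last year of data on SSD.
CREATE TABLE call_log (
    dt      DATE   NOT NULL,
    call_id BIGINT NOT NULL,
    dur_sec INT
)
DUPLICATE KEY(dt, call_id)
PARTITION BY RANGE(dt) (
    PARTITION p2023 VALUES LESS THAN ("2024-01-01")
        ("storage_medium" = "SSD",
         "storage_cooldown_time" = "2025-01-01 00:00:00")
)
DISTRIBUTED BY HASH(call_id) BUCKETS 8;
```

After the cooldown time passes, Doris moves the partition's replicas to HDD on its own, so no external migration job is needed.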

Upgrade

Make sure to back up the metadata before upgrading. Alternatively, start a new cluster: back up the data files to remote storage such as S3 or HDFS through a broker, then import the old cluster's data into the new cluster via backup and restore.
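The backup-and-restore path can be sketched as follows; the repository name, location, credentials, and table names are illustrative assumptions:

```sql
-- Hypothetical: register a remote repository backed by HDFS via a broker.
CREATE REPOSITORY backup_repo
WITH BROKER "hdfs_broker"
ON LOCATION "hdfs://namenode:8020/doris_backup"
PROPERTIES ("username" = "hdfs", "password" = "");

-- On the old cluster: snapshot a table into the repository.
BACKUP SNAPSHOT example_db.snapshot_20230115
TO backup_repo
ON (device_event);

-- On the new cluster: restore the snapshot
-- (the timestamp value comes from SHOW SNAPSHOT ON backup_repo).
RESTORE SNAPSHOT example_db.snapshot_20230115
FROM backup_repo
ON (device_event)
PROPERTIES ("backup_timestamp" = "2023-01-15-10-00-00");
```

Restoring into a freshly deployed cluster of the target version avoids upgrading in place, at the cost of temporarily running two clusters.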

Performance Comparison

AISPEECH started using Apache Doris at version 0.12. This year we completed the upgrade from version 0.15 to the latest version 1.1 and ran performance tests on real business data.

As the test report shows, of the 13 test SQLs, the first 3 show little difference after the upgrade because they are mainly simple aggregation functions that do not stress Apache Doris; version 0.15 already met the demand. From Q4 onward, the SQL is more complex, with multi-field GROUP BY, aggregation functions, and complex functions, so the post-upgrade improvement is obvious: average query performance improved 2-3 times. We highly recommend upgrading to the latest version of Apache Doris.

Summary and Benefits

  1. Apache Doris supports building a unified offline plus real-time data warehouse: one ETL script can serve both, which greatly improves efficiency, reduces storage costs, and avoids problems such as inconsistencies between offline and real-time metrics.

  2. Apache Doris 1.1.x version fully supports vectorization, which improves the query performance by 2-3 times compared with the previous version. After testing, the query performance of Apache Doris version 1.1.x in the wide table is equal to that of ClickHouse.

  3. Apache Doris is powerful yet depends on no other components. Unlike Apache Kylin, Apache Druid, and ClickHouse, it needs no second system to fill technical gaps: it supports aggregation, detail queries, and associated queries. Currently, more than 90% of AISPEECH's analysis workloads have migrated to Apache Doris. Thanks to this, developers operate and maintain fewer components, which greatly reduces operation and maintenance costs.

  4. It is extremely easy to use, supporting MySQL protocol and standard SQL, which greatly reduces user learning costs.

Best Practices

Introduction: To meet the needs of rapid growth, Orange Connex officially introduced Apache Doris in 2022 and built a new data warehouse architecture around it. During the process, service stability, query stability, data synchronization, and other aspects were optimized, and a data platform based on Apache Doris was established. We have accumulated a lot of experience in use and optimization, which I will share with you.

Author | Fu Shuai, Head Developer of Orange Connex Big Data Department

Background

Orange Connex (NEEQ: 870366) is a technology company serving global e-commerce, committed to providing customers with logistics, finance, big data, and other service products through market analysis, system research and development, and resource integration, delivering high-quality, all-round solutions. As an eBay partner, eBay Fulfillment by Orange Connex offers exceptional delivery, including next-day delivery, same-day handling, and late cut-off times.

With the development of the company's business, the early MySQL-based data warehouse architecture could no longer cope with the rapid growth of data, while business and decision-making placed strong requirements on data timeliness and the real-time capabilities of the warehouse. To meet these needs, Orange Connex officially introduced Apache Doris in 2022 and built a new data warehouse architecture around it. During the process, service stability, query stability, data synchronization, and other aspects were optimized, a data platform with Apache Doris at its core was established, and a lot of experience in use and optimization was accumulated, which I will share with you.

Data Architecture Evolution

| Early data warehouse architecture

When the company was just getting started, the business was relatively small and the data team had only a few members. The demand for data was limited to a small number of T+1 customized reports. Therefore, the early data warehouse architecture was quite simple: as shown in the figure below, MySQL was used directly to build the DM (Data Mart) layer and develop the T+1 reports required by the demand side.

Existing Problems

  1. Using MySQL for data analysis was increasingly unable to meet the requirements brought by the company's expansion, the burst of data volume, and data timeliness needs.
  2. There was no layering of the data warehouse. The chimney-style development model gave poor data reusability and high development costs, and could not respond quickly to business needs.
  3. There was no control over data quality and no metadata management.

| New Data Warehouse Infrastructure

To solve the increasingly prominent problems of the old architecture and adapt to the rapidly growing data and business needs, Apache Doris was officially introduced this year to build a new data warehouse infrastructure.

The Reason Why Apache Doris is Chosen:

Ease of Use - Introducing a new technology means facing a large number of migration problems, so usability must be considered. Apache Doris is quite friendly to new users, with low migration and maintenance costs:

  1. It adopts MySQL protocol and syntax, supports standard SQL, can access Doris through various client tools, and can seamlessly connect with BI tools
  2. It can query multiple tables with JOINs and provides a variety of optimization for JOIN in different scenarios
  3. Its ecosystem integrations are rich: it can efficiently batch-import offline data as well as ingest online streaming data in real time.
  4. Compared with other popular OLAP databases in the industry, Apache Doris’s architecture is simpler, there are only two processes, FE(Frontend) and BE(Backend). And it does not depend on any third-party systems or tools.
  5. It supports elastic scaling and is very friendly to deployment, operation and maintenance.

Performance - Our workloads involve many multi-table JOIN operations, which place extremely high requirements on multi-table JOIN and real-time query performance. Apache Doris is built on an MPP architecture with an efficient columnar storage engine, which supports:

  1. Data pre-aggregation and automatic updates of pre-aggregated results
  2. Real-time data updates
  3. Highly concurrent queries

Based on the above reasons, we finally chose to build the new data warehouse with Apache Doris.
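The pre-aggregation behavior can be illustrated with a minimal Python sketch. This is not Doris internals, just an illustration of the aggregate-key idea: rows sharing the same key tuple are merged by summing the value columns, so pre-aggregated results update automatically as incremental loads arrive.

```python
def aggregate_upsert(table, rows, key_cols=2):
    """Merge incoming rows into an aggregate-key table: rows sharing
    the same key tuple are combined by summing the value columns,
    mimicking a SUM aggregation model."""
    for row in rows:
        key, values = tuple(row[:key_cols]), row[key_cols:]
        if key in table:
            table[key] = [a + b for a, b in zip(table[key], values)]
        else:
            table[key] = list(values)
    return table

# Two incremental loads; the aggregate for ("2023-01-01", "cn")
# is updated in place rather than recomputed from scratch.
table = {}
aggregate_upsert(table, [("2023-01-01", "cn", 10, 2), ("2023-01-01", "cn", 5, 1)])
aggregate_upsert(table, [("2023-01-01", "us", 7, 3)])
```

The column names and key layout here are invented for the example; in Doris the aggregation function is declared per column in the table's DDL.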

Introduction of the Architecture

The data warehouse architecture of Apache Doris is quite simple, does not depend on Hadoop components, and has low construction and maintenance costs.

As shown in the architecture diagram above, we have four types of data sources: MySQL business data, CSV files, event tracking data, and third-party system APIs. Different import methods are used for different needs: Doris Stream Load for file imports, DataX Doriswriter for data initialization, and the Flink Doris Connector for real-time data synchronization. Doris serves as both the storage and the computing layer. We layer the data in Doris following the classic design: ODS (Operational Data Store, the source layer), the detail layer DWD, the middle layer DWM, the service layer DWS, and the application layer ADS. Above ODS, DolphinScheduler schedules Doris SQL for incremental and full data updates. Finally, the upper-layer data applications are served by a one-stop data service platform that connects seamlessly with Apache Doris, providing services such as self-service analytics reports, self-service data retrieval, data dashboards, and user behavior analysis.
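As a concrete example of the file-import path, the sketch below builds the HTTP request for a Doris Stream Load in Python. The host, port, database, table, and credentials are placeholders; the actual CSV body would be sent with an HTTP PUT afterwards.

```python
import base64

def stream_load_request(fe_host, db, table, label, user="root", password=""):
    """Build the URL and headers for a Doris Stream Load PUT request.
    Host, port, and credentials here are placeholders."""
    url = f"http://{fe_host}:8030/api/{db}/{table}/_stream_load"
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    headers = {
        "label": label,               # unique label makes the load idempotent
        "column_separator": ",",      # we load comma-separated CSV files
        "Expect": "100-continue",     # needed so the FE can redirect to a BE
        "Authorization": f"Basic {token}",
    }
    return url, headers

url, headers = stream_load_request(
    "doris-fe.example.com", "ods", "orders", "orders_20230101"
)
# The file would then be uploaded with an HTTP PUT, e.g. urllib or `curl -T`.
```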

The data warehouse architecture solution based on Apache Doris can support both offline and real-time application scenarios, and the real-time Apache Doris data warehouse can cover more than 80% of business scenarios. This architecture greatly reduces R&D costs and improves development efficiency.

Of course, there are also some problems and challenges in the process of architecture construction, and we have optimized the problems accordingly.

Apache Doris Metadata Management and Data Lineage Implementation Scheme

Before we had metadata management and data lineage, we often ran into problems. For example, when we wanted to find a metric but did not know which table it lived in, we could only ask the relevant developers to confirm, and sometimes even they had forgotten its location and logic. Confirming it through layer-by-layer screening was very time-consuming.

Previously, we kept information such as each table's layer, metrics, and owner in an Excel sheet. This approach made completeness hard to guarantee and the sheet itself hard to maintain. When the data warehouse needed optimizing, we could not tell which tables could be reused or merged; when a table structure or a metric's logic had to change, we could not determine whether the change would affect downstream tables.

As a result, we often received complaints from users. Below, we introduce how we solved these problems with metadata management and data lineage analysis.

| Solutions

Metadata management and data lineage revolve around Apache Doris, while also integrating DolphinScheduler's metadata. In the figure above, the right side shows the technical and business metadata we collect, together with the data services that can be provided on top of them: metadata metrics and data lineage analysis.

We divide metadata into two categories: technical metadata and business metadata:

  • Technical metadata maintains table attribute information and scheduling information
  • Business metadata maintains the metric definitions and normative conventions agreed on during the use of the data

Data lineage implements table-level lineage and field-level lineage:

  • Table-level lineage supports coarse-grained table relationships and cross-layer reference analysis
  • Field-level lineage supports fine-grained impact analysis

Next, we will introduce the architecture and working principles of metadata management and data lineage.
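To make the impact analysis concrete, here is a minimal Python sketch over a hypothetical table-level lineage graph (the table names are invented): a change to an upstream table is propagated breadth-first to find every affected downstream table.

```python
from collections import deque

# Hypothetical table-level lineage edges: upstream table -> downstream tables.
LINEAGE = {
    "ods.orders": ["dwd.orders_clean"],
    "dwd.orders_clean": ["dwm.orders_daily", "dws.customer_stats"],
    "dwm.orders_daily": ["ads.delivery_report"],
}

def impact_analysis(table):
    """Return every downstream table affected by a change to `table`
    (breadth-first traversal of the lineage graph)."""
    affected, queue = set(), deque([table])
    while queue:
        for child in LINEAGE.get(queue.popleft(), []):
            if child not in affected:
                affected.add(child)
                queue.append(child)
    return affected
```

In production the graph lives in Nebula Graph rather than an in-memory dict, but the traversal idea is the same.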

| Architecture

Metadata Management and Data Lineage Implementation Solution

  • Data collection: the Doris Audit Plugin provided by Apache Doris collects the audit log data

  • Data storage: we customized the audit log plugin to store Doris audit log data in Kafka

  • Lineage parsing: Doris SQL is parsed with Druid

  • Lineage storage: Nebula Graph stores the lineage data

  • Business metadata: because business metadata undergoes frequent CRUD operations, it is stored in MySQL

  • Search data: Elasticsearch stores the data lineage query indexes and the table search indexes

Next, we will introduce the four components of the architecture: the audit log collection/cleaning service, the data lineage analysis service, the metadata information integration service, and the API service.

Collection/Cleaning Service for Apache Doris Audit Logs

If the data cleaning logic lived inside the audit log plugin, any change to that logic could cause data omissions, which would affect lineage analysis and metadata management. So we decoupled audit log collection from data cleaning. After this change, the audit log plugin only formats the audit log data and sends it to Kafka. The data cleaning service first applies a rearrangement step, reordering the data sent by multiple audit log plugin instances to resolve out-of-order records. Second, it converts non-standard SQL into standard SQL: although Apache Doris supports the MySQL protocol and standard SQL syntax, some table creation statements and query syntax differ from standard SQL, so we normalize them into MySQL-standard statements. Finally, the cleaned data is sent to Elasticsearch and Kafka.
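The rearrangement step can be sketched in Python as follows. Field names such as `query_id` and `event_time` are illustrative, not the plugin's actual schema: records from multiple plugin instances are merged, deduplicated by query id, and restored to a global order by event timestamp.

```python
def reorder_audit_logs(batches):
    """Merge audit-log records collected from multiple plugin instances,
    deduplicate by query id, and restore a global order by event time.
    (Field names are illustrative.)"""
    seen, merged = set(), []
    for batch in batches:
        for rec in batch:
            if rec["query_id"] not in seen:
                seen.add(rec["query_id"])
                merged.append(rec)
    return sorted(merged, key=lambda r: r["event_time"])

# Two plugin instances delivered overlapping, out-of-order batches.
batches = [
    [{"query_id": "q2", "event_time": 1002, "stmt": "SELECT 2"}],
    [{"query_id": "q1", "event_time": 1001, "stmt": "SELECT 1"},
     {"query_id": "q2", "event_time": 1002, "stmt": "SELECT 2"}],  # duplicate
]
ordered = reorder_audit_logs(batches)
```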

Data Lineage Analysis Service

The data lineage analysis service uses Druid to parse Doris SQL and walks the Druid abstract syntax tree layer by layer to recursively extract the lineage relationships between tables and between fields. The lineage is then encapsulated and written to the graph database, and the lineage query index is sent to Elasticsearch. During parsing, technical metadata and business metadata are routed to their corresponding stores.
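Our production service uses Druid's SQL parser in Java; as a rough illustration of the idea only, the simplified Python sketch below extracts table-level lineage edges from the common `INSERT INTO ... SELECT` pattern with regular expressions (a real parser handles subqueries, CTEs, and aliases that regexes cannot).

```python
import re

def extract_table_lineage(sql):
    """Simplified table-level lineage extraction: find the INSERT target
    and every FROM/JOIN source, and emit (source, target) edges."""
    target = re.search(r"INSERT\s+INTO\s+([\w.]+)", sql, re.I)
    sources = re.findall(r"(?:FROM|JOIN)\s+([\w.]+)", sql, re.I)
    if not target:
        return []
    return [(src, target.group(1)) for src in sources]

edges = extract_table_lineage(
    "INSERT INTO dwd.orders_clean "
    "SELECT o.id, c.region FROM ods.orders o JOIN ods.customers c ON o.cid = c.id"
)
```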

Metadata Information Integration Service

The metadata information integration service draws on the architecture implementation of Metacat.

  • The Connector Manager is responsible for creating metadata connections to Apache Doris and DolphinScheduler, and supports extending access to other types of data sources later.
  • The Meta Service implements the actual metadata retrieval. Apache Doris metadata is obtained mainly from the information_schema database, the Restful API, and the results of SHOW statements. DolphinScheduler's workflow metadata and scheduling records are obtained from the DolphinScheduler metadata database.
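The Metacat-style split between the Connector Manager and the per-source Meta Services can be sketched like this (the class and method names are our illustration, not Metacat's actual API): each source type registers a connector, and new sources can be plugged in without touching the manager.

```python
class Connector:
    """Base class for one metadata source."""
    def get_metadata(self, name):
        raise NotImplementedError

class DorisConnector(Connector):
    def get_metadata(self, name):
        # In practice this would query information_schema, the REST API,
        # or SHOW statements; here we return a stub record.
        return {"source": "doris", "table": name}

class ConnectorManager:
    """Registers one connector per source type, so further sources
    (e.g. DolphinScheduler) can be added later without changes here."""
    def __init__(self):
        self._connectors = {}

    def register(self, source_type, connector):
        self._connectors[source_type] = connector

    def metadata(self, source_type, name):
        return self._connectors[source_type].get_metadata(name)

mgr = ConnectorManager()
mgr.register("doris", DorisConnector())
```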

API Service

We provide 3 types of APIs: the data lineage API, the metadata API, and the data behavior API.

  • The data lineage API provides query services for tables, fields, data lineage, and impact analysis.
  • The metadata API provides metadata query and field search services.
  • The data behavior analysis API provides query services for table structure change records, data read/write records, and output information.

The above covers the overall design of our metadata management and data lineage analysis architecture.

Summary

This year, we completed the construction of a real-time data warehouse with Apache Doris. After half a year of use and optimization, Apache Doris has become stable and can meet our production requirements.

  1. The new real-time data warehouse greatly improved data computing efficiency and data timeliness. Taking the On-Time Delivery business scenario as an example: calculating the timeliness changes of 10 million waybill tracking nodes used to take more than 2 hours and consumed so many resources that it could only run during off-peak periods; with Doris, the same calculation finishes in just 3 minutes. The full-link logistics timeliness table, which used to be updated once a week, is now refreshed with the latest data every 10 minutes, achieving real-time timeliness.
  2. Thanks to Apache Doris's standard SQL, the learning curve is gentle and everyone could take part in migrating the tables. The original reports were developed in Power BI, which required deep familiarity with the tool, had a high learning cost, and an exceptionally long development cycle; moreover, Power BI does not use standard SQL, so readability was poor. Now, with Doris SQL plus our self-developed drag-and-drop tooling, table development costs plummeted and the development cycle for most requirements dropped from weeks to days.

Future Plans

In the future, we will continue building the data platform on Apache Doris and keep optimizing metadata management and the success rate of data lineage parsing. Since data lineage is a capability many teams want, we will consider contributing it to the community once it is further polished.

At the same time, we are starting to build a user behavior analysis platform, and we are considering Apache Doris as its core storage and computing engine as well. At present, Doris's functionality in some analysis scenarios is not yet rich enough. For example, in ordered window funnel analysis, although Apache Doris supports the window_funnel function, the Array-related functions required to compute conversion at each funnel layer are not yet available. Fortunately, the upcoming Apache Doris 1.2 release will include the Array type and related functions, and we believe Doris will land in more and more analysis scenarios.
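For readers unfamiliar with ordered window funnels, the small Python function below mirrors the idea behind window_funnel (this is a simplified sketch, not the Doris implementation): it returns the deepest consecutive step reached, where every matched step must fall within a time window starting at the first matched event.

```python
def window_funnel(window, steps, events):
    """Deepest consecutive funnel level reached. `events` is a
    time-sorted list of (timestamp, event_name); all matched steps
    must occur within `window` seconds of the first matched step."""
    best = 0
    for i, (start_ts, name) in enumerate(events):
        if name != steps[0]:
            continue  # a funnel attempt starts only at the first step
        level = 1
        for ts, ev in events[i + 1:]:
            if ts - start_ts > window:
                break  # outside the time window: stop this attempt
            if level < len(steps) and ev == steps[level]:
                level += 1
        best = max(best, level)
    return best

events = [(0, "visit"), (5, "cart"), (12, "pay")]
depth = window_funnel(10, ["visit", "cart", "pay"], events)  # "pay" misses the window
```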

About the author: Fu Shuai, Big Data R&D manager, digital team of Orange Connex (China) Co., Ltd., responsible for the digital team's data platform and the application of the OLAP engine.