
Real-time processing and aggregation of Call Detail Records (CDRs) using Kafka and Flink

With the exponential growth in data, one of the biggest problems facing the telecom industry is how to process and aggregate huge volumes of call detail records (CDRs).

Telcos are taking fresh approaches to extract real-time value from streaming data, shifting from heavy batch processing systems to faster, more scalable and resilient IT architectures.

However, integrating devices and machines to enable real-time data processing is complex and time-consuming. The Kafka ecosystem, together with Kafka Connect, Kafka Streams and Flink, is a preferred technology for integrating and aggregating various real-time data sources. The same approach is also relevant to IoT, banking/insurance transactions and other large incoming data sets. In this article, we explore the CDR aggregation use case.

Use case: CDR aggregation

The CDR Generator, a Spring Boot application, generates call detail records and publishes them to Kafka. The published CDRs are consumed by a Flink application that aggregates call duration and data consumed per Mobile Station International Subscriber Directory Number (MSISDN) and publishes the aggregated data back to Kafka.

In the next step, both the raw CDRs and the aggregated CDRs are consumed by Elasticsearch. A Kibana dashboard reads the data from Elasticsearch and displays the results as charts.
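To make the data flow concrete, a single CDR message on the cdr_data_topic topic might look roughly like the sketch below. This is an illustrative example only, not the exact output of the CDR Generator: the values are made up, but the fields shown (msisdn, callType, duration, bytesReceived inside a payload object) correspond to the field names used later in the Kibana visualisations.

{
  "payload": {
    "msisdn": "491701234567",
    "callType": "VOICE",
    "duration": 120,
    "bytesReceived": 2048
  }
}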

 

[Diagram: CDR Aggregation Flow]

 

 

The architecture above shows a cloud-native ecosystem with Kafka, Flink, Kafka Connect, Kafka Streams, microservices and more. It consists of five related parts:

  1. Kafka as the heart of the architecture
  2. Microservices using Spring Boot
  3. Flink application
  4. Elasticsearch
  5. Kibana

We are using Debezium Kafka and Zookeeper images, an enriched Debezium Kafka Connect image, as well as Elasticsearch with Kibana for visualisation.

Instructions to install this use case

Prerequisites

Windows 10 (64- or 32-bit), macOS or Linux

  • Docker (Docker Desktop on Windows)
  • Postman

Set the memory available to Docker to 8 GB

[Screenshot: Docker memory settings for the CDR use case]

 

Installation Step by Step

Run the required services

Some useful commands to remember before starting:

docker-compose up
OR
docker-compose -f docker-compose.yml up > docker-composelog.log
docker-compose stop
docker-compose down

Please save the attached docker-compose.yml file (zip) in one of your working directories. Then open a command prompt window and navigate to that working directory before running the commands below.

docker-compose.yml

A.1 Run the Docker Compose file to start all required services

docker-compose up

Below are the sections of the docker-compose file, with some explanation of the different services:

version: "2.1"
services:
# start the debezium zookeeperzookeeper:
image: debezium / zookeeper: 0.9
ports:
-2181: 2181 -
2888: 2888 -
3888: 3888
# start the debezium kafka and create the required topics for cdr_data_topic (inserting cdr data) and cdr_compressed_topic (inserting compressed cdr data) kafka:
image: debezium / kafka: 0.9
ports:
-9092: 9092
links:
-zookeeper
environment:
-ZOOKEEPER_CONNECT = zookeeper: 2181 -
CREATE_TOPICS = cdr_data_topic: 1: 2 -
CREATE_TOPICS = cdr_compressed_topic: 1: 2

# start telco call data record generator and pass the required environment parameters

  telco_cdr_application:
    image: technauraservices/technaura-telco-cdr-generator:2.0
    expose:
      - "7777"
    ports:
      - "7777:7777"
    command: telco_cdr_application
    environment:
      - server.port=7777
      - cdr.file.path=C:\\temp\\
      - cdr.kafka.topic.request-topics=cdr_data_topic
      - kafka.bootstrap-servers=kafka:9092

# start Flink Jobmanager and execute the class com.technaura.FlinkCdrStreaming from the flink-aggregator jar file loaded in the image by passing the required arguments (--address kafka:9092 --inputtopic cdr_data_topic --outputtopic cdr_compressed_topic)

  jobmanager:
    image: technauraservices/technaura-telco-cdr-flink-aggregator:1.0
    expose:
      - "6123"
    ports:
      - "8081:8081"
      - "6123:6123"
    command: job-cluster -Djobmanager.rpc.address=jobmanager --job-classname com.technaura.FlinkCdrStreaming --address kafka:9092 --inputtopic cdr_data_topic --outputtopic cdr_compressed_topic
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
      - JOB_MANAGER=jobmanager

# start Flink Taskmanager that would run the tasks assigned by the jobmanager

  taskmanager:
    image: technauraservices/technaura-telco-cdr-flink-aggregator:1.0
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: task-manager -Djobmanager.rpc.address=jobmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
      - JOB_MANAGER=jobmanager

# start the latest elasticsearch which would receive the kafka messages by using the kafka-connect.

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:6.0.1
    ports:
      - 9200:9200
      - 9300:9300
    environment:
      - discovery.type=single-node

# start the latest kibana which would receive the messages from elastic search and these messages can be used for analytics.

  kibana:
    image: docker.elastic.co/kibana/kibana:6.0.1
    ports:
      - 5601:5601
    depends_on:
      - elasticsearch
    environment:
      - "ELASTICSEARCH_URL=http://elasticsearch:9200"

# start kafka-connect that would publish the kafka messages to elastic search.

# Note: You can always docker pull the image from our Docker Hub. This customised Debezium Connect image additionally supports Elasticsearch (source and sink). It can be found under the technauraservices/kafka-connect:1.0 image.

  connect:
    image: technauraservices/kafka-connect:1.0
    expose:
      - "8083"
    ports:
      - 8083:8083
    depends_on:
      - zookeeper
      - kafka
      - elasticsearch
    links:
      - zookeeper
      - kafka
      - elasticsearch
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
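Once docker-compose up has pulled and started all the images, you can quickly verify that the containers are running (the exact container names will vary depending on your working directory):

docker-compose ps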

Check if Elasticsearch is running

B.1 Type the Elasticsearch URL below in the browser address bar.

http://localhost:9200/

The response will look like the following:

[Screenshot: Elasticsearch status response]
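For reference, a healthy single-node instance answers on its root endpoint with a small JSON status document roughly like the one below (abridged here; the node name, cluster UUID and build details on your machine will differ):

{
  "name" : "…",
  "cluster_name" : "docker-cluster",
  "cluster_uuid" : "…",
  "version" : {
    "number" : "6.0.1"
  },
  "tagline" : "You Know, for Search"
}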

Check if Kibana is running

C.1 Type the URL below in the browser address bar.

http://localhost:5601/

You get the following page:

[Screenshot: Kibana start page]

 

Check if Kafka Connect is running

D.1 Type the Kafka Connect URL below in the browser address bar.

http://localhost:8083/connectors/

It should return the following:

[]

Checking for available connectors in Kafka Connect using Postman

GET http://localhost:8083/connectors/ should return [] with status 200.

[Screenshot: checking available connectors in Kafka Connect using Postman]
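If you prefer the command line, the same check can be done with curl; an empty JSON array means Kafka Connect is up but no connectors have been created yet:

curl -i http://localhost:8083/connectors/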

Configure Pipelines

E.1 Create an Elasticsearch sink connector with Kafka Connect for the topic “cdr_data_topic”, using Postman.

In Postman, create a POST request to http://localhost:8083/connectors/ with the header Content-Type: application/json, and paste the JSON below into the request body.

{
"name": "elastic-sink_data",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
"internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
"internal.key.converter.schemas.enable": "false",
"internal.value.converter.schemas.enable": "false",
"tasks.max": "1",
"topics": "cdr_data_topic",
"connection.url": "http://elasticsearch:9200",
"type.name": "test-type",
"key.ignore": "true",
"schema.ignore": "true"
}
}

Click Send.

It returns the following response:

{
"name": "elastic-sink_data",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
"internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
"internal.key.converter.schemas.enable": "false",
"internal.value.converter.schemas.enable": "false",
"tasks.max": "1",
"topics": "cdr_data_topic",
"connection.url": "http://elasticsearch:9200",
"type.name": "test-type",
"key.ignore": "true",
"schema.ignore": "true",
"name": "elastic-sink_data"
},
"tasks": [],
"type": "sink"
}
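The same connector can also be created from the command line instead of Postman. Assuming you save the request JSON above to a file named elastic-sink-data.json (a filename chosen here purely for illustration), the equivalent call is:

curl -X POST -H "Content-Type: application/json" --data @elastic-sink-data.json http://localhost:8083/connectors/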

E.2 Create an Elasticsearch sink connector with Kafka Connect for the topic “cdr_compressed_topic”, using Postman.

Paste the JSON below into the body of a new POST request to http://localhost:8083/connectors/ in Postman.

{
"name": "elastic-sink_compressed",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
"internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
"internal.key.converter.schemas.enable": "false",
"internal.value.converter.schemas.enable": "false",
"tasks.max": "1",
"topics": "cdr_compressed_topic",
"connection.url": "http://elasticsearch:9200",
"type.name": "test-type",
"key.ignore": "true",
"schema.ignore": "true"
}
}

Click Send.

It returns the following response:

{
"name": "elastic-sink_compressed",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
"internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
"internal.key.converter.schemas.enable": "false",
"internal.value.converter.schemas.enable": "false",
"tasks.max": "1",
"topics": "cdr_compressed_topic",
"connection.url": "http://elasticsearch:9200",
"type.name": "test-type",
"key.ignore": "true",
"schema.ignore": "true",
"name": "elastic-sink_compressed"
},
"tasks": [],
"type": "sink"
}

Now recheck the available Kafka Connect connectors using Postman, with a GET request as below:

GET http://localhost:8083/connectors/

Response is as shown below:
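Both connectors created above should now be listed (the order may vary):

["elastic-sink_data","elastic-sink_compressed"]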

So far, we have completed the pipeline from Kafka to Elasticsearch via Kafka Connect for the topics cdr_data_topic and cdr_compressed_topic.

Run the Telco CDR application (data generator) to start generating test data

F.1 Type the URL below in the browser address bar to check if the Telco CDR application is running.

http://localhost:7777/swagger-ui.html#/sample-controller/testUsingGET

You get the following page:

G.1 Click on the controller

 

G.2 Click on the GET button to expand the following section.

G.3 Click on the Try it out button (top right) to open the section with the topic textbox.

G.4 Enter “cdr_data_topic” in the topic textbox and click the Execute button (below the topic textbox).

G.5 On clicking Execute, data generation starts and after a while you will see output in the docker-compose window like the following.

Set up Kibana

H.1 Type the URL below in the browser address bar.

http://localhost:5601/

You get the following page:

H.2 In the Index pattern textbox, type the topic name “cdr_data_topic”, then click the Create button.

 

 

You get the following page:

H.3 Click the Create Index Pattern button to create another index pattern on the page above.

H.4 In the Index pattern textbox, type the topic name “cdr_compressed_topic”, then click the Create button.

 

You get the following page:

Check if data is present in Kibana for the two index patterns created

H.5 Click the Discover menu on the left-hand side and select the index pattern “cdr_data_topic”. You will see the data that was published to the “cdr_data_topic” topic in Kafka by the Telco CDR application.

H.6 Click the Discover menu on the left-hand side and select the index pattern “cdr_compressed_topic”. You will see the data that was published to the “cdr_compressed_topic” topic in Kafka by the Flink aggregation application.
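As a quick check outside Kibana, you can also query Elasticsearch directly. The sink connector writes each topic into an index of the same name (which is why the index patterns above match), so a request like the following should return aggregated documents once data is flowing:

curl "http://localhost:9200/cdr_compressed_topic/_search?size=1&pretty"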

 

 

Create Analytics in Kibana

I.1 Click on the Visualise menu on the left-hand side.

 

I.2 Click on the Create Visualisation button to get the list of basic charts.

 

I.3 Click on the Pie Chart icon to get to the following page.

 

I.4 Now select the “cdr_compressed_topic” index pattern to get the following chart.

I.5 Now click on Split Slices in the Buckets section. Select Terms from the Aggregation dropdown box and choose the field payload.callType.keyword. Then click the blue arrow button at the top right.

You get the following changes to the chart. You can hover over the chart to see more details.

I.6 Now click on Add sub-buckets at the bottom of the page, then click on the second Split Slices entry that was just created in the Buckets section. Select Terms from the Aggregation dropdown box and choose the field payload.msisdn.keyword in the new section. Then click the blue arrow button at the top right.

You get the following changes to the chart. You can hover over the chart to see more details.

I.7 Now click on Add sub-buckets at the bottom of the page, then click on the third Split Slices entry that was just created in the Buckets section. Select Terms from the Aggregation dropdown box and choose the field payload.duration in the new section. Then click the blue arrow button at the top right.

You get the following changes to the chart. You can hover over the chart to see more details.

 

 

I.8 Similarly, you can create a different pie chart in which the third Split Slices bucket uses the field payload.bytesReceived instead. Click the blue arrow button at the top right and you will get the following chart.

[Screenshot: pie chart of the cdr_compressed_topic data in Kibana]

Closing all the services

J.1 Run the following commands to bring down the docker-compose services.

docker-compose stop
OR
Ctrl-C

docker-compose down

Congrats, you are done. This is the complete implementation of real-time Telco Call Detail Record streaming and aggregation using Kafka and Flink.


 

 

Written by Vinod Janardhan Nair.

Vinod is a contributing author. He is a Data Engineer at Technaura Systems GmbH. When not writing, Vinod is busy building streaming pipelines using Kafka, Akka, Flink and Spark with SQL and NoSQL databases.

For similar articles, please read further:

Failure detection and alert from real-time data flow using Kafka and KSQL
Event Streaming and Apache Kafka in Telco Industry
What is event streaming? The next step for business
Real Time vs Batch Processing vs Stream Processing
Real-Time Data Processing
What are Call Detail Records (CDRs)?