-
Data Logistics using Apache Kafka, Spark, and Spring Boot
This post summarizes how Apache Kafka works and demonstrates how it can be used to ingest data and transfer it to another system. It therefore highlights only the fundamentals of Apache Kafka: why it exists, the contexts in which it is used, and how to work with its Producer and Consumer APIs.
What is Apache Kafka?
Kafka was initially described as a high-throughput distributed messaging system, but it has evolved over the years into a distributed streaming platform, as described on its official website. Regardless of the adopted description, the most important point is that it is a platform that transfers data from one system to another rapidly, in a scalable and reliable way: a kind of data logistics. It is therefore safe to conclude that Kafka is needed whenever data has to be moved among multiple systems in a fast, reliable, and durable way.
The image above shows how Kafka works at the topmost level, leaving out its two other APIs (Streams and Connectors). Multiple applications produce data in different structures and at different rates and push it to specified Kafka Topics, while multiple applications consume the pushed data from the same or different Topics.
Topics are:
- the central Kafka abstraction, as seen in the figure
- named categories for messages, which are stored in a time-ordered sequence
- logical entities physically represented as logs
The Topics reside in a Kafka Cluster, which may contain one or more brokers, each holding one or more partitions. Partitions are how Kafka achieves its high throughput: the data for a topic is split so that faster simultaneous writes can occur. Key-hash, round-robin, direct, or custom partitioning approaches can be used to spread the data across the available partitions for each topic, but too many partitions can lead to overhead on the Zookeeper side. Each partition is maintained on one or more brokers, depending on the replication factor that is set. A broker is a typical Kafka server responsible for running the partition(s).
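As a hedged illustration of these two knobs (partition count and replication factor), the sketch below creates a topic with Kafka's AdminClient. The topic name, counts, and broker address are placeholders and not taken from the original setup.
[java]
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicSetup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder broker address; point this at your own cluster
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions allow three simultaneous writers; replication factor 2
            // keeps a redundant copy of each partition on a second broker
            NewTopic weatherTopic = new NewTopic("farm-weather", 3, (short) 2);
            admin.createTopics(Collections.singleton(weatherTopic)).all().get();
        }
    }
}
[/java]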
A typical message has three main parts: a Timestamp, assigned by Kafka and used for ordering the sequence; a Unique Identifier for referencing each message; and the Payload, which is the binary value of the message. The payload is kept in binary form for efficient use of network and storage resources and for compression, among other reasons. Apache Kafka was built on the foundation of transaction or commit logs with the following objectives:
- High throughput
- Horizontal Scalability
- Reliability and Durability
- Loosely coupled producers and consumers
- Flexible publish-subscribe semantics
Two factors worth mentioning when working with Kafka are the number of nodes that make up the cluster and the Replication Factor. The number of nodes in the cluster denotes the number of brokers that can work on tasks, with one of them serving as the controller at any given time; they can reside on the same or different machines. The Replication Factor is responsible for the redundancy (duplication) of messages, cluster resiliency, and fault tolerance, and it can also be configured on a per-topic basis. It is advisable to set the replication factor carefully, especially with regard to network I/O usage.
It is worth pointing out that Kafka achieves the distributed part of these objectives by relying on another Apache project, Zookeeper, which maintains its cluster of nodes with respect to configuration information, health status, and group membership.
Apache ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.
https://zookeeper.apache.org/
Why Apache Kafka?
Before the emergence of Kafka, large systems already handled and transferred data in various ways: data replication for relational-database-powered systems, log shipping, custom extract-transform-load (ETL) setups, traditional messaging, or custom middleware logic.
However, these approaches were plagued with issues such as tight coupling of data schemas, technology lock-in (e.g., RDBMS-to-RDBMS replication), limited scalability, implementation complexity, limited reliability, and poor performance. Addressing these issues was central to the design objectives of Kafka.
Apache Kafka API
There are four core APIs in the Kafka system:
- Producer API – allows applications to push their data records to one or more Kafka topics (a minimal sketch combining the Producer and Consumer APIs follows this list).
- Consumer API – allows applications to subscribe to one or more existing topics and pull data records from them.
- Streams API – allows an application to act as a data stream processor.
- Connector API – allows the building of reusable producers and consumers that connect Kafka topics to other applications or data systems.
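As a minimal sketch of the Producer and Consumer APIs, the following Java snippet pushes one record to a topic and then polls it back. The topic name, broker address, group id, and payload are placeholders, not part of the original sample project.
[java]
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerConsumerSketch {
    public static void main(String[] args) {
        // Producer: push a record (key + payload) to the "farm-weather" topic.
        // The key is hashed to pick a partition; Kafka assigns the timestamp.
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", StringSerializer.class.getName());
        producerProps.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            producer.send(new ProducerRecord<>("farm-weather", "farm-42", "{\"tempC\":21.5}"));
        }

        // Consumer: subscribe to the same topic and poll for new records.
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "weather-dashboard");
        consumerProps.put("key.deserializer", StringDeserializer.class.getName());
        consumerProps.put("value.deserializer", StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singleton("farm-weather"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition=%d key=%s value=%s%n",
                        record.partition(), record.key(), record.value());
            }
        }
    }
}
[/java]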
Examples of Data Logistics Use Cases
- A farmer owns numerous farmlands across many states or time zones, and there is a need to measure the weather conditions (or any other quantity) on these farms in near real-time to make business and farming decisions.
- A telecommunications marketing manager needs near real-time analytics on call trends and prepaid recharge patterns across a customer base of millions in order to make effective, data-driven decisions.
To install Kafka on your local machine, you can follow the instructions here.
The sample code here simulates the production of data using the IoT scenario. The simulated data is pushed to Kafka, streamed with Spark, transformed, and persisted to HBase in the preferred transformed format. Note that the streaming aspect can also be achieved with Kafka itself, and the persistence layer can use any preferred storage, e.g., HBase, Cassandra, MongoDB, or Elasticsearch.
- https://medium.com/startlovingyourself/design-lessons-learned-from-apache-kafka-af26d8d73332
- https://kafka.apache.org/intro
- https://insidebigdata.com/2018/04/12/developing-deeper-understanding-apache-kafka-architecture/
- https://blog.newrelic.com/engineering/kafka-best-practices/
PS: This post was migrated from my old blog. Its original version was published sometime in 2018.
-
How To Connect Java: Spring MVC to Neo4J Graph Database
This is a simple stepwise approach showing the basic steps involved in connecting a typical Java application to the Neo4J graph database. Your final output should look similar to the image below. The data displayed is retrieved from Neo4J; the creation of the data is also covered later in this article.
The core tools used for this demo:
1. Maven for dependency management.
2. Spring MVC: for the core development
3. Neo4J-Java Driver for communicating with Neo4J
Step 1. Import Sample Data into Neo4J on Windows
My preference for the Windows platform is simply because that is the OS of my development machine. I am going to use a CSV data sample, “book data”, which you can download here. Place the CSV file inside the import folder in the Neo4J document directory (“…\Documents\Neo4j\default.graphdb\import\bookdatatest.csv”).
If the import folder does not exist, create it manually and copy the CSV file into it afterward. Then run the following Cypher query, which should import the data into the Neo4J database on your running Neo4J instance. The community version of Neo4J can be downloaded from the official website here.
[text]
USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///Bookdatatest.csv" as row
CREATE (:Books{AmazonID:row.AMAZON,FileName:row.Filename,ImageURL:row.IMAGEURL,Title:row.Title, AUTHOR:row.AUTHOR,CATEGORYID:row.CATEGORYID,CATEGORY:row.CATEGORY})
MERGE (a:Author {Name:row.AUTHOR})
MERGE (t:title {Name:row.TITLE})
MERGE (t)-[:WRITTENBY]-(a);
[/text]
Neo4J also allows the importation of data in diverse formats: JSON, CSV as seen above, or a plain text file with a delimiter of your preference.
Step 2: Download the Neo4J Java Driver
I am using the basic driver, as it works just as well as the others with little or no abstraction required at the user end. Other Java-compatible drivers, listed here, can be used for the same purpose depending on your preferred style of coding. The code snippet below shows the basic way to connect to Neo4J using this driver.
[java]
Driver driver = GraphDatabase.driver("bolt://localhost", AuthTokens.basic("username", "password"));
Session session = driver.session();
StatementResult result = session.run("your cypher command");
while (result.hasNext()) {
    // process the retrieved result
}
session.close();
driver.close();
[/java]
- Supply your Neo4J URL and authentication credentials.
- Open a session and run your cypher command which should be in string format.
- Process the result returned by the executed query.
- Close the session and the connection.
Step 3. Write A Sample Query to retrieve and display the data
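The full implementation is in the repository linked below; the snippet here is only a hedged sketch of what this step could look like with the driver from Step 2. The Cypher statement and property names follow the Step 1 import and may need adjusting for your dataset.
[java]
import org.neo4j.driver.v1.AuthTokens;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.GraphDatabase;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.StatementResult;

public class BookQueryExample {
    public static void main(String[] args) {
        // Connection details are placeholders; reuse the credentials from Step 2
        Driver driver = GraphDatabase.driver("bolt://localhost", AuthTokens.basic("username", "password"));
        try (Session session = driver.session()) {
            // Property names follow the Step 1 import and may differ in your dataset
            StatementResult result = session.run(
                    "MATCH (b:Books) RETURN b.Title AS title, b.AUTHOR AS author LIMIT 20");
            while (result.hasNext()) {
                Record row = result.next();
                // In the demo project these values would be added to the Spring MVC model for display
                System.out.println(row.get("title").asString() + " by " + row.get("author").asString());
            }
        }
        driver.close();
    }
}
[/java]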
You can download the complete code from the tutorial branch of this repository.
The complete demo project in the repository is a simple Maven web project and should therefore run seamlessly.
PS: This post was migrated from my old blog. Its original version was published sometime in 2018.
-
Container-based Virtualization of a Hadoop Cluster using Docker

Hadoop is a collection of open-source software utilities: a framework that supports the processing of large data sets in a distributed computing environment. Its core consists of HDFS (Hadoop Distributed File System), which is used for storage; MapReduce, which is used to define the data processing tasks; and YARN, which actually runs the data processing tasks and serves as the resource negotiator for cluster management. Hadoop initially consisted of only the first two, but a later restructuring introduced YARN as an additional, separate framework. Distributed systems and computing are characterized, among other things, by the following factors, which generally lead to a lot of complexity despite their advantages:
- Coordination of all the nodes that make up the cluster
- Data partitioning
- Coordination of the computing tasks
- Handling fault tolerance and recovery
- Allocation of capacity to processes
A system of connected computers (generally referred to as nodes) that work together as a single system is called a cluster, and if such a system is used for Hadoop, it is termed a Hadoop Cluster. Hadoop Clusters are also an example of a “shared nothing” distributed data architecture: the only thing shared among the nodes is the network connecting them, not any other resource.
Containerization is the process of performing operating-system-level virtualization in software, allowing multiple isolated user-space instances called containers to coexist. Containerization offers other benefits:
- Abstraction of the host system away from the containerized application
- Easy scalability
- Simple dependency management and application versioning.
- Shared Layering
- Composability and predictability.
Core Components of a Hadoop Cluster
A Hadoop cluster consists of three core components, as shown in Figure 1: the client, the master, and the slaves. Their roles are described as follows:
The Client: This plays the role of loading data into the cluster, submitting the MapReduce jobs describing how the loaded data should be processed, and then retrieving the data afterward to see the response after job completion.
The Master: This consists of three further components: NameNode, Secondary NameNode, and JobTracker.
NameNode – It does not store the files themselves, only the files' metadata. It oversees the health of the DataNodes and coordinates access to the data stored in them. It keeps track of all the file system information, such as which section of a file is saved in which part of the cluster, the last access time of each file, and user permissions such as which user can access which file.
JobTracker – coordinates the parallel processing of data using MapReduce.
Secondary NameNode: Its task is to contact the NameNode periodically. The NameNode keeps all filesystem metadata in RAM and cannot merge that metadata on disk itself, so if the NameNode crashes, everything held in RAM is lost with no usable backup of the filesystem metadata. The Secondary NameNode contacts the NameNode about once an hour, pulls a copy of the metadata, merges it into a clean, compacted file, and sends it back to the NameNode while keeping a copy for itself. The Secondary NameNode thus does the housekeeping; in case of a NameNode failure, the saved metadata can be used to rebuild it easily.
The Slaves: These make up the majority of the machines in a typical Hadoop Cluster and are responsible for storing the data and processing the computation, as later shown in the use-case demonstration. Each slave runs both a DataNode and a TaskTracker daemon, which communicate with their masters: the TaskTracker daemon is a slave to the JobTracker, and the DataNode daemon is a slave to the NameNode.
Use Case and Code Description
Two use cases, a simple one and a complex one, were considered and demonstrated to show the capabilities of the setup.
The simple use case was a chart analysis of Daimler AG (DAI.DE) stock using the Exponential Moving Average (EMA) indicator, with data from the Yahoo Finance platform ranging from October 30th, 1996 to May 11th, 2018. The data was preprocessed by filtering out all parameters other than the dates and closing prices needed for the indicator estimation, leaving a total data size of 372 KB.
The complex use case was the estimation of the top 10 vehicle types that received the highest number of tickets in the New York area in the 2017 fiscal year. The data, about 2 GB in size, was also preprocessed by filtering out all parameters other than the vehicle type needed for the estimation.
In addition to the map and reduce methods, which were overridden in their respective classes of the MapReduce implementation, the cleanup method was also overridden so as to sort the estimated output in descending order.
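The actual implementation lives in the repository; the following Java classes are only a hedged sketch of the pattern just described (map emits vehicle types, reduce counts them, and an overridden cleanup keeps a sorted top 10). Class and field names are illustrative, not taken from the original code.
[java]
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class TopVehicleTypes {

    // Emits (vehicleType, 1) for every ticket record; the input is assumed to be
    // the preprocessed file containing only the vehicle type column.
    public static class TypeMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String vehicleType = value.toString().trim();
            if (!vehicleType.isEmpty()) {
                context.write(new Text(vehicleType), ONE);
            }
        }
    }

    // Sums the counts per vehicle type but defers writing until cleanup, so the
    // output can be limited to the ten highest counts in descending order.
    public static class TopTenReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        // Keyed by count; in this simplified sketch, vehicle types with tied counts overwrite each other.
        private final TreeMap<Integer, String> topTen = new TreeMap<>();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            topTen.put(sum, key.toString());
            if (topTen.size() > 10) {
                topTen.remove(topTen.firstKey()); // drop the current smallest count
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Emit the surviving entries from highest to lowest count
            for (Map.Entry<Integer, String> entry : topTen.descendingMap().entrySet()) {
                context.write(new Text(entry.getValue()), new IntWritable(entry.getKey()));
            }
        }
    }
}
[/java]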
Setting up the cluster for the use cases
The default configuration for the cluster was a 5-node setup with one master and 4 slaves, using Java 8 and Hadoop 2.7.4. However, scaling the setup is very easy using the provided cluster-resizing script.
The setup folder made available together with this documentation contains four script files, namely build-image, clean-image, resize-cluster, and start-container, which are responsible for managing the container for the cluster. A “config” folder in the root directory contains all the script files needed to configure the cluster, start the cluster, copy the MapReduce jar files and data files needed for the use cases, and execute the use cases.
Lastly, the root directory contains a file named “Dockerfile”, which is primarily responsible for setting up the cluster container, downloading all the required dependencies, and copying all the files in the “config” sub-directory from the host into the cluster host (the Docker environment). The files are descriptively named and comprehensively commented to make the instructions and commands used therein, and their purpose, easier to understand.
On any host machine that has Docker installed and running, setting up the cluster host and running the two use cases involves seven sequential steps, which are highlighted as follows (all scripts are run via the terminal); the most important ones are explained extensively later:
PS: This post was migrated from my old blog. Its original version was published sometime in 2018.
-
Feign Clients: Handling Multiple Authorization Approach

In a typical application platform of any size, communication is either synchronous or asynchronous. For synchronous, HTTP-based communication there are numerous clients to choose from. In the Java ecosystem, Feign (https://github.com/OpenFeign/feign) provides an abstraction over this and offers, among other benefits, multiple encodings, better error handling, and logging.
Feign offers a declarative way of making HTTP calls in Java, and Spring Boot supports it as part of the Spring Cloud project (https://docs.spring.io/spring-cloud-openfeign/docs/current/reference/html/).
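For context, a declarative Spring Cloud OpenFeign client looks roughly like the sketch below; the interface, service name, URL property, and endpoint are placeholders and not taken from any specific project.
[java]
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;

// Placeholder service name and URL property; @EnableFeignClients must be present
// on a configuration class for Spring to generate the HTTP plumbing at runtime.
@FeignClient(name = "billing-service", url = "${clients.billing.url}")
public interface BillingClient {

    // Translates to GET {clients.billing.url}/invoices/{id}; the raw body is
    // returned as a String here to keep the sketch self-contained.
    @GetMapping("/invoices/{id}")
    String getInvoice(@PathVariable("id") String id);
}
[/java]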
Scenario
Imagine you have a set of Feign clients making calls to internal services; these arguably use the same authorization approach, so we can tag this set as the primary clients since they share the same configuration. The other (set of) Feign clients, used to communicate with third-party providers via a different authorization approach, we can tag as the secondary clients.
Together with Spring Security, it is possible to autoconfigure these clients so that requests through them are intercepted and wrapped with the preferred authorization approach, e.g., OAuth2. This leads to the question of how to exclude the secondary client from using the (auto)configuration of the primary one.
Solution
The logical way to go about this is to have a separate configuration class for the to-be-excluded Feign client and, inside this class, add a bean that disables inheriting the parent configuration. If the authorization approach of the secondary client is not already included in the request (i.e., not part of the headers or parameters), you might also need to add a request interceptor bean in the config class, as shown after the snippet below.
[java]
import org.springframework.cloud.openfeign.FeignClientConfigurer;
import org.springframework.context.annotation.Bean;

public class CustomSecondaryFeignClientConfiguration {

    @Bean
    public FeignClientConfigurer feignClientConfigurer() {
        return new FeignClientConfigurer() {
            @Override
            public boolean inheritParentConfiguration() {
                // do not inherit the primary clients' (auto)configuration
                return false;
            }
        };
    }
}
[/java]
This was tested with Spring Boot v2.7.2 and Spring Cloud v2021.0.5.
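If the secondary client's credentials have to be attached to each request, as mentioned above, a request interceptor bean can be added to the same configuration class. This is only a sketch; the header name and configuration property are placeholder assumptions.
[java]
import feign.RequestInterceptor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;

public class CustomSecondaryFeignClientConfiguration {

    // ... feignClientConfigurer() bean from the snippet above ...

    // Attaches the third-party provider's credential to every request made by the
    // secondary client. "X-Api-Key" and the property key are placeholder assumptions.
    @Bean
    public RequestInterceptor secondaryAuthInterceptor(@Value("${thirdparty.api-key}") String apiKey) {
        return requestTemplate -> requestTemplate.header("X-Api-Key", apiKey);
    }
}
[/java]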
-
Optimizing Scheduled Job Execution with Jobrunr and Shedlock

In a setup where multiple instances of an application are running simultaneously, it is imperative to make sure only one of those instances starts the execution of a scheduled job. However, when the tasks of the scheduled job are not only heavy but also voluminous, it is important to find a way to distribute their execution across the multiple running instances. Consider, for example, a platform that is heavy on daily report generation, notification, and delivery to hundreds of thousands of users. This reporting is a scheduled operation that needs to be concluded as fast as possible due to time constraints on the users' business operations. There is, therefore, a need to trigger this scheduled job and preferably complete the execution within an acceptable duration. This is different from the use case described here.
A feasible approach to the scenario described above is a combination of Jobrunr and Shedlock. Jobrunr excels at executing fire-and-forget tasks of any scale, while Shedlock makes sure that execution happens at most once at the same time: Shedlock ensures that only a single running instance of the application (out of however many there are) can start a scheduled job, while Jobrunr distributes the tasks of the scheduled job to the instances if needed.
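As a hedged sketch of how the two libraries could be wired together (not taken from the linked implementation), the snippet below uses ShedLock to guard the Spring scheduler trigger and JobRunr to fan out the per-user work. The cron expression, job name, and ReportService are placeholder assumptions.
[java]
import java.util.List;
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
import org.jobrunr.scheduling.JobScheduler;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class DailyReportTrigger {

    private final JobScheduler jobScheduler;
    private final ReportService reportService; // hypothetical service that generates one user's report

    public DailyReportTrigger(JobScheduler jobScheduler, ReportService reportService) {
        this.jobScheduler = jobScheduler;
        this.reportService = reportService;
    }

    // ShedLock guarantees that only one application instance runs this trigger,
    // even though every instance has the same @Scheduled method.
    @Scheduled(cron = "0 0 5 * * *")
    @SchedulerLock(name = "daily-report-trigger", lockAtMostFor = "15m")
    public void triggerDailyReports() {
        List<String> userIds = reportService.findUsersDueForReport();
        // JobRunr enqueues one fire-and-forget job per user; its background job
        // servers on all running instances then share the actual work.
        for (String userId : userIds) {
            jobScheduler.enqueue(() -> reportService.generateAndSendReport(userId));
        }
    }

    // Hypothetical interface standing in for the real reporting logic.
    public interface ReportService {
        List<String> findUsersDueForReport();
        void generateAndSendReport(String userId);
    }
}
[/java]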
Implementation source code and instructions on how to run it: Here
References:
- https://www.youtube.com/watch?v=2KFeeFuM9og
- Feature image by Jake Givens on Unsplash