In this article, I’m going to show you how to build a message publisher using Apache Kafka and Spring Boot. First, we will talk about what Apache Kafka is.
Apache Kafka is an open-source, distributed streaming platform designed for real-time event processing. It provides a reliable, scalable, and fault-tolerant way to handle large volumes of data streams. Kafka allows you to publish and subscribe to data topics, making it ideal for building event-driven applications, log aggregation, and data pipelines.
Prerequisites
- Apache Kafka
- Java
- Apache Maven
- Any IDE (IntelliJ, STS, or Eclipse)
Project Structure
In this project, we will expose an endpoint to create a user and publish a UserCreatedEvent to a Kafka topic.
application.yml file
spring:
  application:
    name: message-publisher
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
app:
  topic_name: users-topic
server:
  port: 8089
spring.application.name is used to define the application name.
bootstrap-servers specifies the hostname and port number of the Kafka broker.
A serializer specifies how a Java object is converted to bytes before it is sent to Kafka.
key-serializer sets the serializer for the record key. Based on the key type, we can use StringSerializer or IntegerSerializer (example: org.apache.kafka.common.serialization.StringSerializer). Keys matter when records with the same key should go to the same partition.
value-serializer specifies which serializer should be used to convert the record value to bytes before sending it to Kafka. If we’re using a custom Java class as the value, we can use JsonSerializer as the value-serializer.
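To make the role of the key concrete, here is a minimal sketch of how a serialized key can decide the partition. It is not Kafka's partitioner: Kafka's default partitioner hashes the key bytes with murmur2, while `Arrays.hashCode` below is only a stand-in, and the class and method names are hypothetical. The point is that the same key bytes always produce the same partition index.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration: NOT Kafka's partitioner (which uses murmur2).
final class KeyPartitionSketch {

    // Serialize a String key to bytes, as a UTF-8 string serializer would.
    static byte[] serializeKey(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    // Map the serialized key to a partition index in [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(serializeKey(key)); // stand-in hash
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 3; // assumed partition count for the demo
        // The same key is printed twice and yields the same partition.
        System.out.println(partitionFor("user-42", partitions));
        System.out.println(partitionFor("user-42", partitions));
    }
}
```

Because the mapping depends only on the key bytes and the partition count, changing the number of partitions later can move a key to a different partition.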
pom.xml
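<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.3.0</version>
  </parent>
  <groupId>com.lights5.com</groupId>
  <artifactId>message-publisher</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>message-publisher</name>
  <description>Demo project for Kafka Producer using Spring Boot</description>
  <properties>
    <java.version>17</java.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <configuration>
          <excludes>
            <exclude>
              <groupId>org.projectlombok</groupId>
              <artifactId>lombok</artifactId>
            </exclude>
          </excludes>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>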
Spring Web and Spring Kafka are the required dependencies.
ApplicationConfiguration class
package com.lights5.com.message.publisher;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "app")
public class AppConfig {

    // Bound from app.topic_name in application.yml
    private String topicName;
}
This class is used to bind configuration values from the application.yml file to the respective fields.
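Note that the property is written as topic_name in the YAML file while the field is named topicName. Spring Boot's relaxed binding treats these spellings as the same property. The sketch below is a simplified stand-in for that matching, not Spring's implementation: it assumes that dropping the `-` and `_` separators and lowercasing the rest is enough to compare names. The class and method names are hypothetical.

```java
import java.util.Locale;

// Simplified, hypothetical illustration of relaxed property-name matching.
final class RelaxedNameSketch {

    // Drop '-' and '_' separators and lowercase the rest.
    static String uniform(String name) {
        StringBuilder sb = new StringBuilder();
        for (char c : name.toCharArray()) {
            if (c != '-' && c != '_') {
                sb.append(c);
            }
        }
        return sb.toString().toLowerCase(Locale.ROOT);
    }

    // Two spellings refer to the same property if their uniform forms match.
    static boolean sameProperty(String a, String b) {
        return uniform(a).equals(uniform(b));
    }

    public static void main(String[] args) {
        System.out.println(sameProperty("topic_name", "topicName"));  // true
        System.out.println(sameProperty("topic-name", "topicName"));  // true
        System.out.println(sameProperty("topic_name", "topicNames")); // false
    }
}
```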
Application class
package com.lights5.com.message.publisher;

import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;

@SpringBootApplication
@RequiredArgsConstructor
public class Application {

    private final AppConfig appConfig;

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    // Creates the topic on startup if it does not exist yet.
    @Bean
    NewTopic usersTopic() {
        return TopicBuilder.name(appConfig.getTopicName())
                .partitions(3)
                .replicas(2) // needs at least 2 brokers in the cluster
                .build();
    }
}
The NewTopic bean is used to create a topic if it doesn’t already exist on the Kafka broker. We can configure the number of partitions and replicas as needed.
Model Classes
User class
package com.lights5.com.message.publisher;

import java.time.LocalDateTime;

record User (
        String firstName,
        String lastName,
        String email,
        Long phoneNumber,
        Address address,
        LocalDateTime createdAt) {

    record Address (
            String city,
            String country,
            String zipcode) {
    }
}
EventType enum
package com.lights5.com.message.publisher;

enum EventType {
    USER_CREATED_EVENT;
}
EventPayload class
package com.lights5.com.message.publisher;

record EventPayload (
        EventType eventType,
        String payload) {
}
Endpoint to Create User (UsersController class)
package com.lights5.com.message.publisher;

import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;

import static com.lights5.com.message.publisher.EventType.USER_CREATED_EVENT;

@RestController
@RequiredArgsConstructor
@RequestMapping("/v1/users")
class UsersController {

    private final UsersService usersService;

    // Accepts the user as JSON and hands it to the service for publishing.
    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public void createUser(@RequestBody User user) {
        usersService.publishMessage(user, USER_CREATED_EVENT);
    }
}
The UsersController class exposes the POST method to create a user, which in turn calls a method in the UsersService class.
UsersService class
package com.lights5.com.message.publisher;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
class UsersService {

    private final AppConfig appConfig;
    private final ObjectMapper objectMapper;
    private final KafkaTemplate<String, EventPayload> kafkaTemplate;

    public void publishMessage(User user, EventType eventType) {
        try {
            // Serialize the user to JSON and wrap it in the event envelope.
            var userCreatedEventPayload = objectMapper.writeValueAsString(user);
            var eventPayload = new EventPayload(eventType, userCreatedEventPayload);
            kafkaTemplate.send(appConfig.getTopicName(), eventPayload);
        }
        catch (JsonProcessingException ex) {
            log.error("Exception occurred in processing JSON {}", ex.getMessage());
        }
    }
}
KafkaTemplate is used to send messages to Kafka. Spring Boot autoconfigures KafkaTemplate and injects it into the required class.
KafkaTemplate<K, V> is of this type. Here K is the key type and V is the value type.
In our case the key is of type String and the value is of type EventPayload. So we need to use StringSerializer for the key and JsonSerializer (EventPayload is a custom Java class) for the value.
The kafkaTemplate.send() method takes the topic name as the first parameter and the data to be published as the second.
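In Spring Kafka 3.x, send() is asynchronous and returns a CompletableFuture, so a failure to reach the broker does not surface as an exception at the call site. The sketch below shows one way to observe the outcome with whenComplete. To keep it runnable without a broker, a hypothetical Sender interface stands in for KafkaTemplate and the result is a plain String rather than a SendResult; only the CompletableFuture handling is the point here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class SendCallbackSketch {

    // Hypothetical stand-in for KafkaTemplate so the sketch runs without a broker.
    interface Sender {
        CompletableFuture<String> send(String topic, String data);
    }

    // Sends the data and returns a short status once the future completes.
    static String publish(Sender sender, String topic, String data) {
        AtomicReference<String> status = new AtomicReference<>();
        sender.send(topic, data)
              .whenComplete((result, ex) -> {
                  if (ex != null) {
                      status.set("FAILED: " + ex.getMessage());
                  } else {
                      status.set("SENT: " + result);
                  }
              })
              .exceptionally(ex -> null) // recover so join() does not rethrow
              .join();                   // block only for the demo
        return status.get();
    }

    public static void main(String[] args) {
        Sender ok = (topic, data) ->
                CompletableFuture.completedFuture(topic + "-0@0");
        Sender broken = (topic, data) ->
                CompletableFuture.failedFuture(new IllegalStateException("broker unavailable"));

        System.out.println(publish(ok, "users-topic", "{}"));
        System.out.println(publish(broken, "users-topic", "{}"));
    }
}
```

In the real service the callback would typically log the failure instead of blocking with join(), so the HTTP request thread is not held while the broker responds.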
Running Kafka Locally
To run this application locally, we first need to run Kafka locally and then start the Spring Boot application.
Please use this docker-compose file to run Kafka locally.
version: '2.1'
services:
  zoo1:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zoo1
    container_name: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888
  kafka1:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 5
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    depends_on:
      - zoo1
  kafka2:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka2
    container_name: kafka2
    ports:
      - "9093:9093"
      - "29093:29093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 6
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    depends_on:
      - zoo1
  kafka3:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka3
    container_name: kafka3
    ports:
      - "9094:9094"
      - "29094:29094"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 7
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    depends_on:
      - zoo1
docker-compose -f <compose-file> up
Run this command in the directory where the compose file is located.
The above command starts Kafka locally.
Testing Using Postman
Endpoint: (POST method)
Payload
{
  "firstName": "John",
  "lastName": "Albert",
  "email": "johnalbert@gmail.com",
  "phoneNumber": "9999999999",
  "address": {
    "city": "NewYork",
    "country": "USA",
    "zipcode": "111111"
  },
  "createdAt": "2024-06-06T16:46:00"
}
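As an alternative to Postman, the same request can be built with the JDK's java.net.http client. This is a sketch under assumptions: the URL http://localhost:8089/v1/users is inferred from the server.port value in application.yml and the @RequestMapping on UsersController, and the class and method names are hypothetical. The main method only builds and prints the request, so it runs even when the application is not up.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper for calling the create-user endpoint.
final class CreateUserRequestSketch {

    // Assumed URL: port 8089 from application.yml, path from UsersController.
    static final String ENDPOINT = "http://localhost:8089/v1/users";

    // Builds a POST request carrying the JSON body.
    static HttpRequest buildRequest(String jsonBody) {
        return HttpRequest.newBuilder(URI.create(ENDPOINT))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    // Sends the request and returns the HTTP status code.
    // Call this only when Kafka and the application are running.
    static int send(HttpRequest request) throws IOException, InterruptedException {
        HttpResponse<Void> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.discarding());
        return response.statusCode();
    }

    public static void main(String[] args) {
        String body = "{\"firstName\": \"John\", \"lastName\": \"Albert\"}";
        HttpRequest request = buildRequest(body);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Since the controller is annotated with @ResponseStatus(HttpStatus.CREATED), a successful call to send() should report status 201.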
You can verify whether the data was published using the kafka-console-consumer command.
Source Code.
Conclusion
Spring Boot provides easy integration with Kafka and helps us create pub-sub applications with minimal configuration. We can develop event-driven microservices easily with Spring Boot and Kafka.