Building a Kafka Producer With Spring Boot

In this article, I'm going to show you how to build a message publisher using Apache Kafka and Spring Boot. First, let's talk about what Apache Kafka is.

Apache Kafka is an open-source, distributed streaming platform designed for real-time event processing. It provides a reliable, scalable, and fault-tolerant way to handle large volumes of data streams. Kafka allows you to publish and subscribe to data topics, making it ideal for building event-driven applications, log aggregation, and data pipelines.

Prerequisites

  1. Apache Kafka
  2. Java
  3. Apache Maven
  4. Any IDE (IntelliJ IDEA, STS, or Eclipse)

Project Structure

In this project, we will expose an endpoint to create a user, and we will publish a UserCreatedEvent to a Kafka topic.

application.yml file

spring:
  application:
    name: message-publisher

  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

app:
  topic_name: users-topic

server:
  port: 8089

  • spring.application.name defines the application name.
  • bootstrap-servers specifies the hostname and port of the Kafka broker.

The serializer specifies which serializer should be used to convert a Java object to bytes before sending it to Kafka. Based on the key type, we can use StringSerializer or IntegerSerializer.

(Example: org.apache.kafka.common.serialization.StringSerializer)

  • key-serializer specifies the serializer for the record key; records with the same key are routed to the same partition.
  • value-serializer specifies which serializer should be used to convert Java objects to bytes before sending them to Kafka. If we are using a custom Java class as the value, we can use JsonSerializer as the value-serializer. (A programmatic equivalent of these settings is sketched below.)
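
If you prefer Java configuration over application.yml, the same producer settings can be declared as beans. This is a minimal sketch, assuming the same broker address and serializers; the KafkaProducerConfig class and bean names are illustrative, not part of the project above, and EventPayload is the event wrapper defined later in this article.

package com.lights5.com.message.publisher;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
class KafkaProducerConfig {

    // Mirrors the producer section of application.yml shown above.
    @Bean
    ProducerFactory<String, EventPayload> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    KafkaTemplate<String, EventPayload> kafkaTemplate(ProducerFactory<String, EventPayload> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}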

pom.xml



<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>3.3.0</version>
  <relativePath/>
 </parent>
 <groupId>com.lights5.com</groupId>
 <artifactId>message-publisher</artifactId>
 <version>0.0.1-SNAPSHOT</version>
 <name>message-publisher</name>
 <description>Demo project for Kafka Producer using Spring Boot</description>
 <properties>
  <java.version>17</java.version>
 </properties>
 <dependencies>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
  </dependency>
  <dependency>
   <groupId>org.projectlombok</groupId>
   <artifactId>lombok</artifactId>
   <optional>true</optional>
  </dependency>
 </dependencies>

 <build>
  <plugins>
   <plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    <configuration>
     <excludes>
      <exclude>
       <groupId>org.projectlombok</groupId>
       <artifactId>lombok</artifactId>
      </exclude>
     </excludes>
    </configuration>
   </plugin>
  </plugins>
 </build>
</project>

spring-boot-starter-web and spring-kafka are the required dependencies.

AppConfig class

package com.lights5.com.message.publisher;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "app")
public class AppConfig {

    private String topicName;
}

This class binds configuration values from the application.yml file to the respective fields; Spring's relaxed binding maps the app.topic_name property to the topicName field.

Application class

package com.lights5.com.message.publisher;

import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;

@SpringBootApplication
@RequiredArgsConstructor
public class Application {

 private final AppConfig appConfig;

 public static void main(String[] args) {
  SpringApplication.run(Application.class, args);
 }

 @Bean
 NewTopic usersTopic() {

  return TopicBuilder.name(appConfig.getTopicName())
    .partitions(3)
    .replicas(2)
    .build();
 }
}

The NewTopic bean creates the topic if it does not already exist on the Kafka broker. We can configure the required number of partitions and replicas as needed.
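
TopicBuilder can also attach topic-level configs to the NewTopic bean when needed. A small sketch under the same AppConfig wiring; the bean name and the seven-day retention value are illustrative only.

import org.apache.kafka.common.config.TopicConfig;

@Bean
NewTopic usersTopicWithConfigs() {

    return TopicBuilder.name(appConfig.getTopicName())
            .partitions(3)
            .replicas(2)
            .config(TopicConfig.RETENTION_MS_CONFIG, "604800000") // retain records for 7 days (illustrative)
            .build();
}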

Model Classes

User class

package com.lights5.com.message.publisher;

import java.time.LocalDateTime;

record User (
        String firstName,
        String lastName,
        String email,
        Long phoneNumber,
        Address address,
        LocalDateTime createdAt) {

    record Address (
            String city,
            String country,
            String zipcode) {

    }
}

EventType enum

package com.lights5.com.message.publisher;

enum EventType {

    USER_CREATED_EVENT;
}

EventPayload class

package com.lights5.com.message.publisher;

record EventPayload (
        EventType eventType,
        String payload) {

}

Endpoint to Create a User (UsersController class)

package com.lights5.com.message.publisher;

import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;

import static com.lights5.com.message.publisher.EventType.USER_CREATED_EVENT;

@RestController
@RequiredArgsConstructor
@RequestMapping("/v1/users")
class UsersController {

    private final UsersService usersService;

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public void createUser(@RequestBody User user) {

        usersService.publishMessage(user, USER_CREATED_EVENT);
    }
}

The UsersController class exposes a POST method to create a user, which in turn calls a method in the UsersService class.

UsersService class

package com.lights5.com.message.publisher;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
class UsersService {

    private final AppConfig appConfig;
    private final ObjectMapper objectMapper;
    private final KafkaTemplate<String, EventPayload> kafkaTemplate;

    public void publishMessage(User user, EventType eventType) {

        try {

            var userCreatedEventPayload = objectMapper.writeValueAsString(user);
            var eventPayload = new EventPayload(eventType, userCreatedEventPayload);
            kafkaTemplate.send(appConfig.getTopicName(), eventPayload);
        }
        catch (JsonProcessingException ex) {

            log.error("Exception occurred in processing JSON {}", ex.getMessage());
        }
    }
}

KafkaTemplate is used to send messages to Kafka. Spring Boot auto-configures a KafkaTemplate bean and injects it into the required class.

KafkaTemplate is of the form KafkaTemplate<K, V>, where K is the key type and V is the value type.

In our case, the key is of String type and the value is of EventPayload type, so we need to use StringSerializer for the key and JsonSerializer (EventPayload is a custom Java class) for the value.

The kafkaTemplate.send() method takes the topic name as the first parameter and the data to be published as the second argument.
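
send() also has an overload that takes a record key, which is how the "same key, same partition" routing mentioned earlier is achieved, and in recent Spring Kafka versions it returns a CompletableFuture that can be used to observe the delivery result. A sketch of a keyed variant of publishMessage; the method name and the choice of email as the key are illustrative, not part of the project above.

public void publishMessageWithKey(User user, EventType eventType) throws JsonProcessingException {

    var eventPayload = new EventPayload(eventType, objectMapper.writeValueAsString(user));

    // send(topic, key, value): records with the same key go to the same partition
    kafkaTemplate.send(appConfig.getTopicName(), user.email(), eventPayload)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("Failed to publish event", ex);
                } else {
                    log.info("Published to partition {} at offset {}",
                            result.getRecordMetadata().partition(),
                            result.getRecordMetadata().offset());
                }
            });
}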

Running Kafka Locally

To run this application locally, we first need to run Kafka locally and then start the Spring Boot application.

Please use this docker-compose file to run Kafka locally.

version: '2.1'

services:
  zoo1:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zoo1
    container_name: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888

  kafka1:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 5
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    depends_on:
      - zoo1

  kafka2:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka2
    container_name: kafka2
    ports:
      - "9093:9093"
      - "29093:29093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 6
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    depends_on:
      - zoo1

  kafka3:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka3
    container_name: kafka3
    ports:
      - "9094:9094"
      - "29094:29094"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 7
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    depends_on:
      - zoo1

Run docker-compose up -d in the directory where the compose file is located.

The above command starts the three-broker Kafka cluster locally.
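
Once the brokers are up, start the Spring Boot application from the project root (using the Maven wrapper if your project has one, or plain mvn otherwise):

./mvnw spring-boot:run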

Testing Using Postman

Endpoint: http://localhost:8089/v1/users (POST method)

Payload

{
    "firstName": "John",
    "lastName": "Albert",
    "email": "johnalbert@gmail.com",
    "phoneNumber": "9999999999",
    "address": {
        "city": "NewYork",
        "country": "USA",
        "zipcode": "111111"
    },
    "createdAt": "2024-06-06T16:46:00"
}
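
If you prefer the command line over Postman, the same request can be sent with curl (the port and path come from the configuration and controller above):

curl -X POST http://localhost:8089/v1/users \
  -H "Content-Type: application/json" \
  -d '{"firstName":"John","lastName":"Albert","email":"johnalbert@gmail.com","phoneNumber":"9999999999","address":{"city":"NewYork","country":"USA","zipcode":"111111"},"createdAt":"2024-06-06T16:46:00"}'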

You can verify whether the data has been published using the kafka-console-consumer command, as shown below.
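
For example, with the Docker setup above (kafka1 is the broker container name from the compose file):

docker exec -it kafka1 kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic users-topic \
  --from-beginning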

Source Code.

Conclusion

Spring Boot provides easy integration with Kafka and helps us create pub-sub model applications with minimal configuration. We can easily develop event-driven microservices with Spring Boot and Kafka.
