Content
ElintegroKafdrop bitbucket repo
Kafka Servers (Broker and Zookeeper) are installed in
Server Config
User | root |
IP Address | 65.108.91.70 |
Pass | xakmdWC9srJTpcdghjcN |
First, we need to build and transfer the jar file into the server (path: /root/kafdrop).
Here's an example with scp command:
scp target/*.jar [email protected]:/root/kafdrop
Next, we need to ssh into our server with the following command:
We need to go to the kafdrop directory and run the docker compose
cd /root/kafdrop
We need to create a Dockerfile:
FROM eclipse-temurin:17.0.10_7-jdk
WORKDIR /app
COPY kafdrop-4.0.2-SNAPSHOT.jar /app/
EXPOSE 9000
CMD ["java", "-jar", "kafdrop-4.0.2-SNAPSHOT.jar"]
We need to create a docker-compose.yaml file and should add the following code into the docker-compose.yaml
version: "2"
services:
kafdrop:
build:
context: .
container_name: elintegro-kafdrop
restart: "no"
ports:
- "9000:9000"
environment:
KAFKA_BROKERCONNECT: "kafka:29092"
depends_on:
- "kafka"
kafka:
image: obsidiandynamics/kafka
container_name: elintegro-kafka
restart: "no"
ports:
- "2181:2181"
- "9092:9092"
environment:
KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka:29092,EXTERNAL://65.108.91.70:9092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
KAFKA_RESTART_ATTEMPTS: "10"
KAFKA_RESTART_DELAY: "5"
ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"
Let’s run the docker compose using the following command
docker compose up -d
Here, -d is a flag to run the docker compose in a detach mode
If everything goes well, we will be able to access the kafdrop with ip address and the port address:
65.21.111.153:9000
We need to document the following
Kafka Appender Configuration with logback.xml
<configuration>
<springProfile name="development">
<!-- Console Appender -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<root level="error">
<appender-ref ref="STDOUT" />
</root>
</springProfile>
<springProfile name="!development">
<!-- kafkaAppender -->
<appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
<topic>test</topic>
<!-- Optional parameter to use a fixed partition -->
<!-- <partition>0</partition> -->
<!-- Optional parameter to include log timestamps into the kafka message -->
<!-- <appendTimestamp>true</appendTimestamp> -->
<!-- each <producerConfig> translates to regular kafka-client config (format: key=value) -->
<!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs -->
<!-- bootstrap.servers is the only mandatory producerConfig -->
<producerConfig>bootstrap.servers=localhost:9092</producerConfig>
<!-- this is the fallback appender if kafka is not available. -->
</appender>
<root level="error">
<appender-ref ref="kafkaAppender" />
</root>
</springProfile>
</configuration>
Kafka Appender with log4j2.xml (No, need if working with logback.xml)
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="info" name="spring-boot-kafka-log" packages="com.reloadly">
<Appenders>
<Kafka name="Kafka" topic="omnipaycoinatm-dev-logs">
<PatternLayout pattern="%date %level %message"/>
<Property name="bootstrap.servers">127.0.0.1:9092</Property>
</Kafka>
<Async name="Async">
<AppenderRef ref="Kafka"/>
</Async>
<Console name="stdout" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} %-5p [%-7t] %F:%L - %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="INFO">
<AppenderRef ref="Kafka"/>
<AppenderRef ref="stdout"/>
</Root>
<Logger name="org.apache.kafka" level="WARN" /><!-- avoid recursive logging -->
</Loggers>
</Configuration>
build.gradle
compile "com.github.danielwegener:logback-kafka-appender:0.2.0-RC2"
compile "org.apache.kafka:kafka-clients:1.0.0"
compile "ch.qos.logback:logback-classic:1.2.3"
compile "ch.qos.logback:logback-core:1.2.3"
Note: comment the spring-boot-starter-logging (compile "org.springframework.boot:spring-boot-starter-logging")
Kafka Client to Send Message to a Topic
Kaftka Consumer to retrieve a message from a topic
I have a kafka topic and I want to use spring data stream to consume it - can you provide me a code example for this
Certainly! Below is an example of how you can use Spring Kafka with Spring Boot to consume messages from a Kafka topic using Spring Kafka's `@KafkaListener` annotation:
```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
@SpringBootApplication
public class KafkaConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaConsumerApplication.class, args);
}
@KafkaListener(topics = "your-topic-name")
public void listen(String message) {
System.out.println("Received message: " + message);
// Add your business logic to process the message here
}
}
```
Make sure you have the necessary dependencies in your `pom.xml` if you're using Maven:
```xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
```
And if you're using Gradle, in your `build.gradle`:
```gradle
dependencies {
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.kafka:spring-kafka'
}
```
Replace `"your-topic-name"` with the name of your Kafka topic. This code sets up a Spring Boot application with a Kafka listener method annotated with `@KafkaListener`. This method will be invoked whenever a message is published to the specified Kafka topic, and the message payload will be passed to the `listen` method for processing.
Make sure you have your Kafka broker details properly configured in your `application.properties` or `application.yml` file. For example:
```properties
spring.kafka.bootstrap-servers=localhost:9092
```
This code assumes that your Kafka broker is running locally on the default port `9092`. Adjust the configuration according to your Kafka setup.
spring cloud stream example:
Sure! Spring Cloud Stream is another way to consume messages from Kafka with Spring Boot. It provides a higher level of abstraction compared to Spring Kafka.
First, make sure you have the necessary dependencies in your `pom.xml` if you're using Maven:
```xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
</dependencies>
```
And if you're using Gradle, in your `build.gradle`:
```gradle
dependencies {
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
}
```
Now, you can create a simple Spring Cloud Stream Kafka consumer application:
```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
@SpringBootApplication
@EnableBinding(Sink.class)
public class KafkaConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaConsumerApplication.class, args);
}
@StreamListener(Sink.INPUT)
public void listen(String message) {
System.out.println("Received message: " + message);
// Add your business logic to process the message here
}
}
```
In this example, `@EnableBinding(Sink.class)` enables the binding with the default input channel provided by Spring Cloud Stream. The `@StreamListener(Sink.INPUT)` annotation listens to messages from the input channel. When a message is received, the `listen` method is invoked for processing.
As before, make sure your Kafka broker details are properly configured in your `application.properties` or `application.yml` file:
```properties
spring.cloud.stream.bindings.input.destination=your-topic-name
spring.cloud.stream.bindings.input.group=your-consumer-group
spring.kafka.bootstrap-servers=localhost:9092
```
Replace `"your-topic-name"` with the name of your Kafka topic, and `"your-consumer-group"` with your consumer group. Adjust the Kafka broker configuration according to your setup.
With Spring Cloud Stream, you get the advantage of a higher level of abstraction and simpler configuration for Kafka consumers, making it easier to integrate Kafka with your Spring Boot applications.