Kafka Installation Guide

ElintegroKafdrop Bitbucket repo

The Kafka servers (broker and ZooKeeper) are installed on the following machine:

Server Config

User: root
IP Address: 65.108.91.70
Password: xakmdWC9srJTpcdghjcN

First, we need to build the Kafdrop jar and transfer it to the server (path: /root/kafdrop).
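
If the jar has not been built yet, a typical Maven build works (an assumption based on the target/ path used below; adjust to the repo's actual build setup):

mvn clean package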

Here's an example using the scp command:

scp target/*.jar [email protected]:/root/kafdrop

Next, we need to ssh into our server with the following command:

ssh [email protected]

Next, go to the kafdrop directory, where we will set up and run Docker Compose:

cd /root/kafdrop

In this directory, create a Dockerfile with the following contents:

FROM eclipse-temurin:17.0.10_7-jdk
WORKDIR /app
COPY kafdrop-4.0.2-SNAPSHOT.jar /app/
EXPOSE 9000
CMD ["java", "-jar", "kafdrop-4.0.2-SNAPSHOT.jar"]
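
To build and test the image manually (optional; docker compose builds it automatically in the next step):

docker build -t elintegro-kafdrop .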

Next, create a docker-compose.yaml file and add the following configuration:

version: "2"
services:
  kafdrop:
    build:
      context: .
    container_name: elintegro-kafdrop
    restart: "no"
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: "kafka:29092"
    depends_on:
      - "kafka"
  kafka:
    image: obsidiandynamics/kafka
    container_name: elintegro-kafka
    restart: "no"
    ports:
      - "2181:2181"
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka:29092,EXTERNAL://65.108.91.70:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
      KAFKA_RESTART_ATTEMPTS: "10"
      KAFKA_RESTART_DELAY: "5"
      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"

Let's start the stack with Docker Compose:

docker compose up -d

Here, the -d flag runs the containers in detached mode (in the background).
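
To verify that both containers started, check their status and tail the Kafdrop logs:

docker compose ps
docker compose logs -f kafdrop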

If everything goes well, Kafdrop will be reachable at the server's IP address on port 9000:

65.108.91.70:9000

The sections below document the application-side Kafka configuration (logging appenders, producers, and consumers).

Kafka Appender Configuration with logback.xml

<configuration>
    <springProfile name="development">
        <!-- Console Appender -->
        <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
            <encoder>
                <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
            </encoder>
        </appender>
        <root level="error">
            <appender-ref ref="STDOUT" />
        </root>
    </springProfile>


    <springProfile name="!development">
        <!-- kafkaAppender -->
        <appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
            <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
                <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
            </encoder>
            <topic>test</topic>

            <!-- Optional parameter to use a fixed partition -->
            <!-- <partition>0</partition> -->

            <!-- Optional parameter to include log timestamps into the kafka message -->
            <!-- <appendTimestamp>true</appendTimestamp> -->

            <!-- each <producerConfig> translates to regular kafka-client config (format: key=value) -->
            <!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs -->
            <!-- bootstrap.servers is the only mandatory producerConfig -->
            <producerConfig>bootstrap.servers=localhost:9092</producerConfig>

            <!-- Optionally add an <appender-ref> here as a fallback appender,
                 used if Kafka is not available. -->

        </appender>

        <root level="error">
            <appender-ref ref="kafkaAppender" />
        </root>
    </springProfile>
</configuration>
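
Since the <springProfile> blocks are only evaluated when Logback is initialized by Spring Boot, a quick way to sanity-check the appender from inside the application is an error-level log statement at startup, which should then show up in the `test` topic in Kafdrop. A minimal sketch; the class and bean names are illustrative:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaLoggingSmokeTest {

    private static final Logger log = LoggerFactory.getLogger(KafkaLoggingSmokeTest.class);

    // Logs one error-level message at startup; with the root level set to
    // "error" above, it is routed to the Kafka appender whenever the
    // "development" profile is not active.
    @Bean
    public CommandLineRunner kafkaLoggingSmokeTest() {
        return args -> log.error("Kafka appender smoke test message");
    }
}
```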

Kafka Appender with log4j2.xml (not needed if working with logback.xml)

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="info" name="spring-boot-kafka-log" packages="com.reloadly">
    <Appenders>
        <Kafka name="Kafka" topic="omnipaycoinatm-dev-logs">
            <PatternLayout pattern="%date %level %message"/>
            <Property name="bootstrap.servers">127.0.0.1:9092</Property>
        </Kafka>
        <Async name="Async">
            <AppenderRef ref="Kafka"/>
        </Async>

        <Console name="stdout" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5p [%-7t] %F:%L - %m%n"/>
        </Console>
    </Appenders>
    <Loggers>
        <Root level="INFO">
            <AppenderRef ref="Kafka"/>
            <AppenderRef ref="stdout"/>
        </Root>
        <Logger name="org.apache.kafka" level="WARN" /><!-- avoid recursive logging -->
    </Loggers>
</Configuration>

build.gradle

compile "com.github.danielwegener:logback-kafka-appender:0.2.0-RC2"
compile "org.apache.kafka:kafka-clients:1.0.0"
compile "ch.qos.logback:logback-classic:1.2.3"
compile "ch.qos.logback:logback-core:1.2.3"

Note: comment out the spring-boot-starter-logging dependency (compile "org.springframework.boot:spring-boot-starter-logging") so it does not conflict with the explicit Logback dependencies above.
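
If the starter is pulled in transitively instead, it can be excluded globally with a standard Gradle exclusion (adjust to your build layout):

configurations.all {
    exclude group: "org.springframework.boot", module: "spring-boot-starter-logging"
}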

Kafka Client to Send a Message to a Topic
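
A minimal sketch of a producer using Spring Kafka's KafkaTemplate (the same spring-kafka dependency as in the consumer examples below; the class name and topic are illustrative):

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaMessageProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    // Spring Boot auto-configures a KafkaTemplate when spring-kafka is on
    // the classpath and spring.kafka.bootstrap-servers is set
    public KafkaMessageProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String topic, String message) {
        // send() is asynchronous; the returned future can be used to
        // confirm delivery if needed
        kafkaTemplate.send(topic, message);
    }
}
```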

Kafka Consumer to Retrieve a Message from a Topic

To consume messages from a Kafka topic with Spring Boot, one option is Spring Kafka's `@KafkaListener` annotation:

```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;

@SpringBootApplication
public class KafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }

    @KafkaListener(topics = "your-topic-name")
    public void listen(String message) {
        System.out.println("Received message: " + message);
        // Add your business logic to process the message here
    }
}
```

Make sure you have the necessary dependencies in your `pom.xml` if you're using Maven:

```xml
<dependencies>
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter</artifactId>
   </dependency>
   <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka</artifactId>
   </dependency>
</dependencies>
```

And if you're using Gradle, in your `build.gradle`:

```gradle
dependencies {
   implementation 'org.springframework.boot:spring-boot-starter'
   implementation 'org.springframework.kafka:spring-kafka'
}
```

Replace `"your-topic-name"` with the name of your Kafka topic. This code sets up a Spring Boot application with a Kafka listener method annotated with `@KafkaListener`. This method will be invoked whenever a message is published to the specified Kafka topic, and the message payload will be passed to the `listen` method for processing.

Make sure you have your Kafka broker details properly configured in your `application.properties` or `application.yml` file. `@KafkaListener` also requires a consumer group id. For example:

```properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=your-consumer-group
```

This code assumes that your Kafka broker is running locally on the default port `9092`. Adjust the configuration according to your Kafka setup.

Spring Cloud Stream example

Spring Cloud Stream is another way to consume messages from Kafka with Spring Boot. It provides a higher level of abstraction than Spring Kafka.

First, make sure you have the necessary dependencies in your `pom.xml` if you're using Maven:

```xml
<dependencies>
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter</artifactId>
   </dependency>
   <dependency>
       <groupId>org.springframework.cloud</groupId>
       <artifactId>spring-cloud-stream-binder-kafka</artifactId>
   </dependency>
</dependencies>
```

And if you're using Gradle, in your `build.gradle`:

```gradle
dependencies {
   implementation 'org.springframework.boot:spring-boot-starter'
   implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
}
```

Now, you can create a simple Spring Cloud Stream Kafka consumer application:

```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@SpringBootApplication
@EnableBinding(Sink.class)
public class KafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(String message) {
        System.out.println("Received message: " + message);
        // Add your business logic to process the message here
    }
}
```

In this example, `@EnableBinding(Sink.class)` enables the binding with the default input channel provided by Spring Cloud Stream. The `@StreamListener(Sink.INPUT)` annotation listens to messages from the input channel. When a message is received, the `listen` method is invoked for processing.

As before, make sure your Kafka broker details are properly configured in your `application.properties` or `application.yml` file:

```properties
spring.cloud.stream.bindings.input.destination=your-topic-name
spring.cloud.stream.bindings.input.group=your-consumer-group
spring.kafka.bootstrap-servers=localhost:9092
```

Replace `"your-topic-name"` with the name of your Kafka topic, and `"your-consumer-group"` with your consumer group. Adjust the Kafka broker configuration according to your setup.

With Spring Cloud Stream, you get the advantage of a higher level of abstraction and simpler configuration for Kafka consumers, making it easier to integrate Kafka with your Spring Boot applications.
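
Note that `@EnableBinding` and `@StreamListener` are deprecated in recent Spring Cloud Stream releases (and removed in 4.x) in favor of the functional programming model. Here is a minimal sketch of the same consumer in the functional style, assuming a current spring-cloud-stream-binder-kafka version; the bean name `consume` (and thus the binding name `consume-in-0`) is illustrative:

```java
import java.util.function.Consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class FunctionalKafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(FunctionalKafkaConsumerApplication.class, args);
    }

    // Spring Cloud Stream derives the binding name "consume-in-0" from this
    // bean name and binds it to the configured destination
    @Bean
    public Consumer<String> consume() {
        return message -> System.out.println("Received message: " + message);
    }
}
```

```properties
spring.cloud.stream.bindings.consume-in-0.destination=your-topic-name
spring.cloud.stream.bindings.consume-in-0.group=your-consumer-group
spring.kafka.bootstrap-servers=localhost:9092
```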

How to create a new Broker
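
The compose file above runs a single broker (the obsidiandynamics/kafka image bundles ZooKeeper and one Kafka broker). A rough sketch of adding a second broker service to docker-compose.yaml follows; it assumes the image maps `KAFKA_`-prefixed environment variables to broker settings (as it does for the listeners above), and the service name, broker id, and ports are illustrative:

```yaml
  kafka-2:
    image: obsidiandynamics/kafka
    container_name: elintegro-kafka-2
    restart: "no"
    ports:
      - "9093:9093"
    environment:
      # Every broker in the cluster needs a unique id
      KAFKA_BROKER_ID: "2"
      KAFKA_LISTENERS: "INTERNAL://:29093,EXTERNAL://:9093"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-2:29093,EXTERNAL://65.108.91.70:9093"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      # Point the second broker at the ZooKeeper bundled with the first one
      KAFKA_ZOOKEEPER_CONNECT: "kafka:2181"
    depends_on:
      - "kafka"
```

Check the image's documentation for how to disable the bundled ZooKeeper in the second container before relying on this beyond development.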

How to create a new Topic
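
Topics can be created with Kafka's standard kafka-topics.sh tool from inside the broker container. A sketch, assuming the obsidiandynamics/kafka image keeps the Kafka scripts under /opt/kafka/bin (the path may differ); the topic name, partition count, and replication factor are illustrative:

```bash
docker exec -it elintegro-kafka \
  /opt/kafka/bin/kafka-topics.sh --create \
  --topic my-new-topic \
  --bootstrap-server localhost:9092 \
  --partitions 1 \
  --replication-factor 1
```

The same tool lists existing topics with `--list` instead of `--create`. Note that a broker with `auto.create.topics.enable` left at its default of `true` will also create a topic automatically the first time a client produces to it.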
