전개발
article thumbnail

들어가며

Kafka는 분산 환경에서의 실시간 데이터 스트리밍을 위한 플랫폼으로, 주로 백엔드 아키텍처에 사용됩니다.

백엔드 진영에서는 주로 Java 언어에 기초한 Spring Boot의 Kafka 지원 모듈인 spring-kafka를 사용하거나, JavaScript 언어에 기초한 Node.js 환경에서 Kafka.js와 같은 라이브러리를 활용하여 Kafka로부터 데이터를 주고 받습니다.

대부분의 웹 애플리케이션들은 서버-클라이언트 아키텍처를 따르고 있습니다. 따라서, 사용자에게 화면을 보여주는 역할을 하는 프론트엔드에서는 Kafka에 저장된 메시지를 보여주기 위해, 별도의 API 서버가 필요한 것이 일반적입니다.

그렇다면, 프론트엔드에서 Kafka에 직접 접근할 수는 없을까요?

가능합니다. 프론트엔드와 Kafka를 연결하여 데이터를 가져온다면, API 서버를 거치지 않기에 통신에 대한 지연을 줄이거나 데이터 파이프를 단순화할 수 있을 것입니다. 하지만, 이는 Kafka 클러스터의 위치를 외부 네트워크에 유출하는 일이기에 심각한 보안 문제를 유발할 수도 있습니다.

그렇다면, 프론트엔드가 Kafka와 통신하려면 항상 이러한 보안 문제를 감수해야 하는걸까요? Next.js의 API Routes를 활용하여 통신 환경을 구성한다면 이러한 위험을 줄일 수 있습니다.

 

 

Routing: API Routes | Next.js

Next.js supports API Routes, which allow you to build your API without leaving your Next.js app. Learn how it works here.

nextjs.org

 

API Routes는 프론트엔드(클라이언트, 웹 브라우저)가 아닌 서버에서 실행되므로 클라이언트 측에 Kafka 클러스터의 위치를 노출하지 않고도 통신을 안전하게 처리할 수 있습니다.

 

이번 글에서는 단일 Kafka 클러스터를 구성하고, Next.js의 API Routes를 활용하여 Next.js 환경에서 Kafka와 직접 데이터를 주고 받는 방법에 대한 실습을 진행해보겠습니다.

 

실습

실습은 다음과 같은 가상 환경에서 진행했습니다.

  • Rocky Linux 9.4 (Blue Onyx)
  • CPU 4 코어, 메모리 8GB

단일 Kafka 클러스터 구성하기

먼저, Kafka를 설치할 가상 환경 서버에 ssh 접속합니다.

Kafka를 설치하기 앞서, Kafka는 Java로 작성되었기 때문에 실행을 위한 환경인 JDK (Java Development Kit)을 설치해줍니다. 이번 실습에서는 레드햇 계열의 Rocky Linux를 사용하기 때문에, dnf 명령어를 활용합니다.

(각 리눅스 배포판에 맞는 패키지 관리 시스템을 이용하여 진행해주세요.)

sudo dnf install java-1.8.0-openjdk.x86_64

이번 실습에서는 kafka 3.8.1 version을 사용합니다.

Kafka 패키지를 다운로드하고, 압축을 해제합니다.

(다른 버전으로 진행하려면, https://kafka.apache.org/downloads에서 Binary 파일을 다운로드 받아 진행해주세요.)

# kafka 패키지 다운로드
sudo wget https://dlcdn.apache.org/kafka/3.8.1/kafka_2.13-3.8.1.tgz

# tgz 압축 해제
sudo tar xvf kafka-3.8.1-src.tgz

압축 해제가 완료되었다면, Kafka 브로커가 원격 클라이언트와 통신할 수 있도록 server.properties 파일의 내용을 수정합니다.

listeners와 advertised.listeners에 값을 지정합니다.

이때, listeners는 Kafka 브로커가 어떤 네트워크 인터페이스와 포트에서 연결을 수락할 것인지를 지정하는 설정입니다. (합니다)

advertised.listeners는 Kafka 클라이언트가 브로커에 연결할 때, 어떤 IP 주소와 포트로 연결할지를 알려주는 설정입니다. (각 실습을 진행하는 서버의 주소를 입력해주세요)

# kafka 폴더로 이동
cd kafka_2.13-3.8.1/

# config 폴더의 server.properties 파일을 vi 편집기로 열어 수정
sudo vi ./config/server.properties
# 파일 내에 다음 내용 작성 및 수정 (최초에 주석 처리 되어 있음)

...

# 실습 편의를 위해 모든 네트워크 인터페이스를 허용하도록 입력
listeners=PLAINTEXT://:9092

# 실습하는 서버의 주소:9092를 입력
advertised.listeners=PLAINTEXT://172.17.19.134:9092 

...

설정은 모두 완료되었습니다.

kafka를 실행하려면, 먼저 Zookeeper를 실행해야합니다. Zookeeper는 분산 코디네이터를 제공하는 오픈소스로, Kafka의 분산 환경을 관리합니다. 이러한 Zookeeper는 Kafka 패키지에 기본적으로 내장되어 있습니다.

# zookeeper 기본 port 개방 (리눅스 배포판에 따라 상이, 레드햇 기준)
sudo firewall-cmd --add-port=2181/tcp

# zookeeper server 실행
./bin/zookeeper-server-start.sh ./config/zookeeper.properties

Zookeeper가 잘 실행되었다면 다음과 같이 로그들이 보여집니다.

이후, 새로운 터미널을 열고, Kafka 서버를 실행해줍니다.

# kafka 기본 port 개방 (리눅스 배포판에 따라 상이, 레드햇 기준)
sudo firewall-cmd --add-port=9092/tcp

# Kafka Server 실행
./bin/kafka-server-start.sh ./config/server.properties

마찬가지로, 잘 실행되었다면 다음과 같이 로그들이 보여집니다.

이로써 단일 Kafka 클러스터 환경 구성이 완료되었습니다.

Next.js 프로젝트 생성 및 구성

해당 프로젝트의 /kafka/producer 경로의 페이지는 Kafka 토픽에 데이터를 넣을 수 있고, /kafka/consumer 경로의 페이지에서는 test 토픽의 메시지를 2초마다 consuming 합니다.

아래 보여지는 GIF와 같은 최종 결과물인 웹 프로듀서와 웹 컨슈머를 구현해볼 예정입니다.

Next.js 프로젝트 구성은 별도의 가상 환경이 아닌, 로컬 환경에서 진행하였습니다.

실습에 필요한 Node.js는 로컬 환경에 설치되어 있음을 가정하겠습니다. (설치되어 있지 않다면 별도 설치해주시길 바랍니다)

가장 먼저, Next.js 프로젝트를 생성하겠습니다.

npx create-next-app@latest

프로젝트 경로로 이동하여, 다음과 같이 폴더를 구성합니다.

// root
...
src
  ㄴapp
    ㄴapi
      ㄴkafka
        ㄴconsumer
          - route.ts
        ㄴproducer
          - route.ts
    ㄴkafka
      ㄴconsumer
        - page.tsx
      ㄴproducer
        - page.tsx
...

앱 라우터(src/app/…) 하위에 API 요청을 처리하는 API Route (src/app/api/…)를 구성하였습니다. 각 경로는 route.ts의 작성 내용에 따라, /api/kafka/consumer와 /api/kafka/producer경로에 대한 요청을 처리하게 됩니다.

먼저, producer api route를 작성하겠습니다. 해당 경로에 매핑된 POST 요청은 topic, message 필드를 받아서 kafka에 producing 합니다.

// src/app/api/kafka/producer/route.ts

import { Kafka } from 'kafkajs';
import { NextResponse } from 'next/server';

// Kafka 클라이언트 초기화
const kafka = new Kafka({
  clientId: 'jeon', // 임의의 ID 지정
  brokers: ['172.17.19.134:9092'], // Kafka를 구성했던 서버 주소
});
// 프로듀서 인스턴스
const producer = kafka.producer();
// 메시지 전송 함수 (프로듀서와 연결, 메시지 전송, 연결 해제)
const produceMessage = async (topic: string, message: string) => {
  await producer.connect();
  await producer.send({
    topic,
    messages: [{ value: message }],
  });
  await producer.disconnect();
};
// /api/kafka/producer에 대한 POST 메서드 처리
export async function POST(req: Request) {
  const body = await req.json();
  const { topic, message } = body;
  if (!topic || !message) {
    return NextResponse.json({ error: '토픽과 메시지는 필수입니다.' }, { status: 400 });
  }
  try {
    await produceMessage(topic, message);
    return NextResponse.json({ message: 'Message sent successfully' });
  } catch (error) {
    console.error(error);
    return NextResponse.json({ error }, { status: 500 });
  }
}

다음으로, consumer api route를 작성하겠습니다. 해당 경로에 매핑된 GET 요청은 Kafka로부터 Consuming한 메시지 배열을 리턴합니다.

// src/app/api/kafka/consumer/route.ts

import { Kafka } from 'kafkajs';
import { NextResponse } from 'next/server';

// Kafka 클라이언트 초기화
const kafka = new Kafka({
  clientId: 'jeon',
  brokers: ['172.17.19.134:9092'],
});
// 컨슈머 인스턴스
const consumer = kafka.consumer({ groupId: 'my-group' });
const messages: string[] = []; // 메시지를 저장할 배열
let isConsumerRunning = false; // 컨슈머 실행 상태 플래그
// 컨슈머 시작 함수 (컨슈머 연결, 메시지 구독, 메시지 처리)
const startConsumer = async () => {
  if (isConsumerRunning) {
    console.log('컨슈머가 이미 실행 중입니다.');
    return;
  }
  isConsumerRunning = true;
  try {
    await consumer.connect();
    await consumer.subscribe({ topic: 'test', fromBeginning: true });
    await consumer.run({
      eachMessage: async ({ message }) => {
        messages.push(`Received message: ${message.value?.toString()}`);
      },
    });
  } catch (error) {
    console.error(error);
    isConsumerRunning = false;
  }
};
// 컨슈머 시작
startConsumer();
// /api/kafka/consumer에 대한 GET 메서드 처리
export async function GET() {
  // 메시지가 없을 경우, 빈 배열 반환
  if (messages.length === 0) {
    return NextResponse.json({ messages: [], status: '메시지 없음' });
  }
  // 카프카로부터 받아온 메시지가 들어있는 배열 반환
  return NextResponse.json({ messages });
}

마지막으로, 사용자에게 보여질 간단한 페이지를 구성하고, 방금 정의한 API Routes로 요청하는 코드를 작성하겠습니다.

// src/app/kafka/producer/page.tsx

'use client';

import { useState } from 'react';
export default function Page() {
  const [topic, setTopic] = useState(''); // 토픽 입력값
  const [message, setMessage] = useState(''); // 메시지 입력값
  const [response, setResponse] = useState<string | null>(null); // 응답 메시지 저장
  // Produce Message 버튼 클릭 핸들러
  const handleSubmit = async (e: React.FormEvent) => {
    // 기본 이벤트 방지 및 기존 응답 초기화
    e.preventDefault();
    setResponse(null);
    try {
      // /api/kafka/producer로 POST 요청
      const res = await fetch('/api/kafka/producer', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ topic, message }),
      });
      // 응답 데이터를 JSON으로 변환하여 상태 업데이트
      const data = await res.json();
      setResponse(data.message);
    } catch (error) {
      console.log(error);
    }
  };
  return (
    <div style={{ padding: '2rem' }}>
      <h1>Produce Kafka Message</h1>
      <form onSubmit={handleSubmit}>
        {/* Topic Input */}
        <div>
          <label>
            Topic:
            <input
              type="text"
              value={topic}
              onChange={e => setTopic(e.target.value)}
              required
              style={{ marginLeft: '1rem', marginBottom: '1rem' }}
            />
          </label>
        </div>
        {/* Message Input */}
        <div>
          <label>
            Message:
            <input
              type="text"
              value={message}
              onChange={e => setMessage(e.target.value)}
              required
              style={{ marginLeft: '1rem', marginBottom: '1rem' }}
            />
          </label>
        </div>
        <button type="submit" style={{ marginTop: '1rem', padding: '0.5rem 1rem' }}>
          Produce Message
        </button>
      </form>
      {/* 응답 메시지 출력 */}
      {response && <p style={{ color: 'green' }}>Success: {response}</p>}
    </div>
  );
}

이제 http://localhost:3000/kafka/producer에 접속하여, Kafka에 메시지를 넣을 수 있습니다.

consumer 페이지도 마저 작성해보겠습니다.

// src/app/kafka/consumer/page.tsx

'use client';
import { useEffect, useState } from 'react';
export default function KafkaConsumerPage() {
  const [messages, setMessages] = useState<string[]>([]); // Kafka로부터 받아온 메시지 저장
  // Kafka 메시지 가져오기
  const fetchMessages = async () => {
    try {
      const res = await fetch('/api/kafka/consumer');
      // 응답 데이터를 JSON으로 변환하여 상태 업데이트
      const data = await res.json();
      setMessages(data.messages);
    } catch (error) {
      console.log(error);
    }
  };
  // 2초마다 /api/kafka/consumer로 GET 요청
  useEffect(() => {
    const interval = setInterval(fetchMessages, 2000);
    return () => clearInterval(interval);
  }, []);
  return (
    <div style={{ padding: '2rem' }}>
      <h1>Kafka Consumer Messages</h1>
      <ul>
        {messages.map((msg, index) => (
          <li key={index}>{msg}</li>
        ))}
      </ul>
    </div>
  );
}

이제 http://localhost:3000/kafka/consumer에 접속하여, Kafka로부터 받아온 메시지들을 확인할 수 있습니다.

마치며…

이번 실습에서는 단일 Kafka 클러스터 환경을 구성하고, 프론트엔드에서 Kafka와 통신하기 위해 KafkaJS 라이브러리를 활용하여 간단한 Producer, Consumer를 만들어 사용해보았습니다.

 

프론트엔드(클라이언트) 영역에서 Kafka를 직접 활용하는 것은 충분히 검토해야하는 일이며, 실 서비스에 도입하기에는 아직 한계가 있다고 생각합니다. 절대적인 사용자의 수도 적을 뿐더러, 당장 Kafka JS만 보더라도 최근 릴리즈가 작년 2월로, 라이브러리 업데이트 또한 그리 활발한 편은 아닌 것으로 보여집니다.

 

비록 실상이 그렇더라도, 위와 같이 현존하는 JavaScript 기반의 Kafka 클라이언트로도 매트릭, 메시지, 토픽, 브로커 정보 조회와 같은 간단한 정보들은 수집할 수 있었기에, 잘 다듬고 응용한다면 충분히 활용할 수 있다는 가능성을 확인해볼 수 있었습니다.

'Web' 카테고리의 다른 글

[WebSocket] 소켓으로 채팅 구현하기  (0) 2023.07.02
profile

전개발

@전개발

프론트엔드 개발자, 전인혁(Jeonny) 입니다.
포스팅이 좋았다면 "공감❤️" 과 "댓글👍🏻" 부탁드립니다. 😊