본문 바로가기

AIoT

[Inside AIoT] 기계관리 지능화를 위한 AI 데이터 파이프라인 (2)

안녕하세요! 세상의 모든 기계를 스마트하게 바꾸고 있는 엣지크로스입니다.

지난 글( [AIoT] - [Inside AIoT] 기계관리 지능화를 위한 AI 데이터 파이프라인 (1) )에서는 AI 데이터 파이프라인이 무엇이고 왜 필요한지, 또 어떻게 구성되어 있는지 설명드렸는데요. 이번에는 실제로 이런 데이터 파이프라인을 엣지크로스가 어떻게 구현했는지 조금 더 자세하게 설명해드리려 합니다.


Written by. 오용석 전임(엣지크로스 AI 개발)

엣지크로스의 데이터 파이프라인 구현 방법

 

엣지크로스의 데이터 파이프라인은 그림에서 보시는 것처럼 1 MQTT Broker, 2 Data Pipeline, 3 Time Series DB로 크게 3개 요소로 구성됩니다. MQTT Broker는 IoT 디바이스로부터 데이터를 수집하고 Data Pipeline은 MQTT Broker로부터 데이터를 추출, 변환하여 Time Series DB에 저장되는 시스템입니다.

데이터 소스인 AIoT 디바이스와 Data Pipeline을 직접 연결하지 않고 중간에 메시지 브로커를 두었는데 이렇게 구성하면 몇 가지 이점이 있습니다.

1. 버퍼링 및 메시지 큐잉

MQTT 메시지는 IoT 장비에서 발생할 때마다 실시간으로 발송됩니다. 이러한 데이터 흐름은 트래픽의 급증 또는 네트워크 지연 등에 따라 불규칙할 수 있는데, 메시지 브로커를 사용하면 이러한 메시지를 버퍼링하고 큐잉하여 좀 더 안정적으로 데이터를 전송할 수 있습니다.

2. 신뢰성 및 내구성

메시지 브로커는 메시지 전달을 보장하며, 시스템 장애 발생 시 메시지를 안전하게 저장하고 복구할 수 있는 기능을 제공합니다. RabbitMQ는 메시지를 디스크에 저장하여 Flink나 다른 시스템의 장애 발생 시에도 데이터 손실을 방지할 수 있습니다.

3. 탄력성

RabbitMQ와 같은 메시지 브로커는 다운스트림 시스템의 가용성에 영향을 받지 않고 독립적으로 운영될 수 있습니다. 예를 들어, Flink 애플리케이션이 유지보수를 위해 일시 중지되어도 RabbitMQ는 계속해서 데이터를 수신하고 저장할 수 있습니다.

 

MQTT Broker

MQTT Broker를 구현하기 위한 툴을 선택하는 과정에서, 저희는 이런 부분들을 고민했습니다. 

우선 데이터 수집을 담당하는 부분은 데이터 소스로부터 전송되는 통신 방식을 지원해야 합니다. 이전 포스트에서 말씀드렸듯이 엣지크로스에서는 MQTT를 이용하여 데이터를 전송합니다. 따라서 MQTT를 지원하는 메시지 브로커 소프트웨어가 필요했습니다. 

메시지 브로커 역할을 할 수 있는 소프트웨어들 중에 RabbitMQ, Mosquitto를 고려했습니다. Kafka의 경우 높은 처리량과 내구성이 장점이지만, MQTT 프로토콜을 지원하지 않아서 제외했습니다. 그럼 RabbitMQ와 Mosquitto에선 어떤 소프트웨어가 적합할까요? 둘 다 IoT 통신을 위한 MQTT를 지원하지만 약간의 차이가 있다는 점에 주목했습니다.

RabbitMQ는 MQTT외에도 다양한 프로토콜을 제공하고 클러스터 구성을 통해 높은 가용성과 확장성을 확보할 수 있습니다. 또한 메시지 대기열, 내구성 있는 스토리지, 전달 확인과 같은 기능을 제공하기 때문에 높은 신뢰성을 가집니다. 

Mosquitto는 MQTT에 집중해서 설계되어 가볍고 효율적인 메시지 브로커입니다. 설치나 유지관리가 편하고 다양한 IoT기기와 호환도 잘됩니다.

정리하면 이렇게 이야기할 수 있는데요.

  • RabbitMQ는 클러스터 구성 등 추가 작업과 자원사용량이 높지만 안정적인 메시징과 확장성, 분산처리에 적합합니다.
  • Mosquitto는 MQTT에 한해서는 빠른 메시지 전달, 간편한 설치 및 유지관리 등 여러 장점이 있지만 소규모 시스템에 적합하다고 할 수 있습니다.

엣지크로스는 최종적으로 RabbitMQ를 선택했습니다. 안정적인 메시지 전달 및 향후 규모 확장성이 중요하다고 생각했기 때문이고요. 추가적으로 데이터 파이프라인을 구축하는데 사용한 Flink와 RabbitMQ가 결합해서 쓰기 좋기 때문입니다.

  • Flink는 본질적으로 분산 데이터 처리를 위해 설계된 시스템인데, RabbitMQ를 사용함으로써 여러 Flink 인스턴스 또는 클러스터 간에 메시지를 균등하게 분배할 수 있어 처리 능력을 확장하는 데 유리합니다. 또한 Flink가 자체적으로 RabbitMQ에 대한 커넥터를 제공하고 있어 추가적인 개발 없이 RabbitMQ와 Flink간 연결이 가능합니다. 


Data Pipeline

데이터 파이프라인을 구축하기 위해 어느정도 인지도가 있는 스트림 처리 프레임워크를 고려하였습니다. 전달보장, 내결함성, 상태 관리 등 스트림처리에서 중요한 요소를 중심으로 정리하면, 각 프레임워크들의 특징을 아래 표처럼 볼 수 있습니다. 

  Flink Spark Storm Kafka
데이터 처리 유형 하이브리드(배치 및 스트림) 하이브리드(배치 및 스트림) 스트림 전용 스트림 전용
복잡한 이벤트 처리 지원 지원 지원하지 않음 지원하지 않음(개발자가 직접 처리해야 함)
지연시간 스트리밍: 매우 낮은 지연 시간 (밀리초) 마이크로 배치: 거의 실시간 지연 시간 (초) 튜플 단위: 매우 낮은 지연 시간 (밀리초)             로그 기반: 매우 낮은 지연 시간 (밀리초)
전달보장 배치 및 스트림 처리에 대해 정확히 한 번 처리 보장             배치 처리에 대해 정확히 한 번, 스트림 처리에 대해 최소 한 번 처리 보장           구성에 따라 최소 한 번 또는 최대 한 번 처리 보장             정확히 한 번 처리 보장 (Kafka Streams, RocksDB)
 
상태관리 다양한 상태 백엔드 및 시간 시맨틱을 갖춘 상태 저장 작업에 대한 풍부한 지원             mapWithState 및 updateStateByKey 상태 저장 작업에함수로  대한 제한된 지원             상태 저장 작업에 대한 네이티브 지원 없음, 외부 데이터베이스 또는 Trident API에 의존             상태 저장 작업에 대한 네이티브 지원 없음, 외부 데이터베이스 또는 Kafka Streams API에 의존
 
내결함성             체크포인트와 상태 스냅샷을 외부 스토리지 시스템에 저장하여 고가용성과 빠른 장애 복구를 제공. 부분적 장애에 대해 로컬 복구를 지원.             체크포인트와 사전 로그를 외부 스토리지 시스템에 저장하여 내결함성을 제공. 또한 신뢰할 수 있는 분산 데이터 세트 (RDD)로부터 손실된 데이터를 재계산하기 위해 계보 정보를 사용. 확인 및 재시도를 사용하여 신뢰할 수 있는 메시지 전달을 보장함으로써 내결함성 제공. 로그 파티션을 여러 브로커에 복제하고 클러스터 메타데이터를 저장하기 위해 ZooKeeper를 사용하여 내결함성 제공. 또한 일관된 출력을 보장하기 위해 트랜잭션과 멱등성 프로듀서를 사용.

저희가 스트림 처리를 위한 주요 요소들 중에서 우선적으로 고려하였던 것은 전달 보장이었습니다. 학습에 사용할 데이터가 누락되거나 중복되면 데이터가 편향되거나 왜곡되어 잘못된 학습이 이루어 질 수 있기 때문에 이를 방지하기 위해 정확히 한번(exactly-once) 전달 보장을 지원해야 했고, 이를 지원하는 것이 Flink 와 Kafka였습니다. 같은 이유로 Spark의 경우 배치 방식에서는 정확히 한번을 보장하지만 스트림 방식에서 최소 한번을 보장하여 제외했습니다.

사실 Flink와 Kafka를 고려하는 시점에서 어느정도 Flink를 채택하는 쪽으로 정해지기도 했습니다. 이유는 Flink와 Kafka 모두 지연시간이나 상태관리, 내결함성을 갖추고 있지만 Kafka의 경우 타사 제품들과 통합이 어렵기 때문에 Kafka에 종속되는 상황이 발생할 가능성이 높은데다 메시지 브로커로 RabbitMQ를 써야하는 상황이었기 때문입니다. 

그래도 Flink와 Kafka(streams)를 활용도 측면에서 조금 더 비교해보자면 다음과 같습니다

  Flink Kafka Streams
성능 대규모 스트리밍 애플리케이션에서 높은 처리량, 낮은 지연 시간 시나리오에서 탁월한 성능 Kafka 생태계 내에서 효율적인 처리를 제공하며, 실시간 분석과 중간 수준의 작업 부하에 적합
사용의 용이성 숙련된 사용자를 위해 풍부한 기능과 광범위한 문서를 제공 Kafka에 익숙한 사람들에게는 통합과 간단한 설정 덕분에 사용하기 편리함
생태계 지원 다양한 스트리밍 요구 사항을 충족하기 위한 광범위한 통합 및 도구를 갖춘 광범위하고 활발한 커뮤니티를 갖추고 있음 Kafka 생태계 내에서 강력한 통합과 지원
사용 사례 복잡한 이벤트 처리, 대규모 애플리케이션 및 고급 스트리밍 기능이 필요한 시나리오에 이상적 중간 규모의 스트림 처리나 실시간 분석 등 Kafka와 긴밀하게 통합된 애플리케이션에 적합
프로그래밍 유연성 다양한 프로그래밍 언어를 지원, PyFlink, Table API와 같은 API 추상화를 제공 주로 Java에 중점을 두었으며, 다른 프로그래밍 언어에 대한 지원은 제한적

Kafka는 Kafka 생태계 내에서 다른 특화된 기능을 수행하는 것들을 간단한 설정과 구성으로 쉽고 빠르게 통합이 가능하지만 Kafka를 메인으로 사용하지 않는 시스템에서는 제한이 있습니다. 반면 Flink는 보다 유연하게 다른 시스템과 결합될 수 있다는 특징이 있죠. 결과적으로 저희는 이러한 점을 종합적으로 고려하여 최종적으로 Flink를 채택하게 되었습니다.

그렇다면, Flink는 데이터를 어떻게 처리하까요? Flink의 기본적인 데이터 처리구조는 Source Operator, Transform Operator, 그리고 Sink Operator Operator로 구성됩니다.

1. Source Operator

Source Operator는 데이터 스트림의 입력을 담당합니다. 이 연산자는 외부 시스템(예: 데이터베이스, 메시지 큐, 파일 시스템 등)에서 데이터를 읽어 Flink의 데이터 파이프라인으로 가져옵니다. Source Operator는 연속적인 데이터 스트림을 생성하거나, 정기적으로 데이터를 폴링하여 스트림을 생성할 수 있습니다.

2. Transform Operator

Transform Operator는 수집된 데이터에 대해 필터링, 집계, 조인, 윈도잉 등 다양한 전처리를 수행합니다. 데이터가 Flink을 통해 흐를 때, Transform Operator는 사용자가 정의한 연산을 수행하여 데이터 스트림의 각 요소를 변환하거나 정보를 추출합니다.

3. Sink Operator

Sink Operator는 처리된 데이터 스트림을 외부 시스템으로 출력합니다. 이 연산자는 데이터를 파일 시스템, 데이터베이스, 또는 다른 스트림 처리 시스템으로 전송할 수 있습니다. Sink는 데이터의 최종 목적지로, Flink에서의 데이터 처리가 완료된 후 외부 시스템에서 데이터를 사용할 수 있도록 합니다.

 

Time Series DB

마지막으로 데이터를 저장하는 부분이 남았죠. 저희는 Time Series DB(이하 TSDB)를 선택했습니다. 그 이유는 엣지크로스의 IoT 센서에서 수집되는 데이터가 시계열 형태였기 때문인데요. TSDB는 일반적인 관계형 데이터베이스보다 시계열 데이터를 관리하는 데에 특히 강점을 가지고 있습니다. TSDB는 시계열 데이터를 효율적으로 다루기 위해 고안된 데이터베이스로 쓰기 성능에 최적화 되어있습니다관계형 데이터베이스(RDB)는 인덱스가 걸려있으면 데이터의 양이 커질수록 쓰기 성능은 점점 저하되지만 TSDB의 인덱스는 이런 경우에도 성능이 떨어지지 않도록 만들어졌습니다. TSDB는 시간에 따라 저장공간을 분리하고 시간으로 쿼리를 할 수 있습니다. 

저희는 여기서 InfluxDB를 사용했습니다. InfluxDB 2013 Go 언어로 개발된 오픈소스 Time Series Database(이하 TSDB)입니다. InfluxDB의 경우 인덱싱 성능이 뛰어나 대용량 데이터에서도 빠른 검색이 가능하며, 데이터 스키마를 유연하게 변경할 수 있기 때문에 다양한 데이터 형태를 쉽게 처리할 수 있습니다. 또한 플러그인 아키텍쳐 설계로 타사 제품과 통합이 용이한데다, 공식 문서가 잘 관리되고 있고 실제로도 많이 알려져있어 참고할 사례등이 많다는 점도 장점입니다. 

 

InfluxDB 특징

InfluxDB의 주요 특징으로는 연속적인 쿼리와 보존 정책이 있습니다. 연속적인 쿼리는 지정된 쿼리를 주기적으로 실행하여 데이터를 분석하고, 결과를 새로운 시리즈로 저장하는 기능을 수행합니다. 예를 들어, 실시간 데이터 피드에서 일정 시간 간격으로 평균을 계산하거나, 데이터를 다운샘플링하는 작업을 자동으로 수행할 수 있습니다.

보존 정책의 경우 오래된 데이터를 자동으로 삭제해주는 정책으로서 데이터베이스 단위로 정의됩니다. 한 개 데이터베이스에 여러개의 보존 정책을 가질 수 있습니다. 데이터가 계속해서 쌓이면 저장 공간 및 처리 속도 등 문제가 발생할 수 있기 때문에 이러한 기능을 지원하고 있습니다.

내부구조

 

시계열 데이터베이스에서 measurement는 RDB의 table에 해당하며, RDB와 마찬가지로 하나의 데이터베이스 안에 여러 개의 measurement가 있을 수 있습니다. 하지만 RDB와는 조금 다른 개념이 있는데, influxDB는 NoSQL의 개념을 바탕으로 만들어져서 Schemaless(스키마가 없음)한 점입니다. 고정된 스키마를 구성하지 않아도 되기때문에 시간이 지나면서 데이터 구조가 변경되어도 쉽게 대응할 수 있습니다.

마무리하며

지금까지 엣지크로스에서 데이터 파이프라인 시스템을 어떻게 구현하였는지 알아보았습니다. 데이터 파이프라인 구축 방법에 정답이라는 것은 없지만, AIoT 디바이스를 중심으로 AI 서비스를 구현하는 회사에서는 어떠한 방식으로 데이터 파이프라인을 구성했는지 참고해주시면 좋겠습니다. 

 

 


엣지크로스 솔루션, 더 자세히 알아보고 싶으시다면 ✅ https://edgecross.ai/solution