11. Stream Processing

10장에서는 배치 처리 – 파일의 집합을 입력으로 읽어 새 출력 파일의 집합을 생산하는 기법을 알아보았다. 하지만 여기서 중요한 가정은 입력에 한계가 있다는 것, 즉 알려지고 유한한 크기를 가지고 있다는 것이다. 그래서 배치 과정은 입력을 읽는 것이 언제 끝날지를 알 수 있었다. 실제로는 많은 데이터는 점진적으로 시간에 따라 도착하기 때문에 한계가 없다. 날마다 배치 과정을 하는 것의 문제는 입력의 변화가 하루 뒤의 출력에 반영된다는 것이고 이것은 많은 성급한 사용자들에게 있어 너무 느리다는 것이다. 일반적으로, 스트림은 시간에 따라 증가하면서 가용 가능해지는 데이터이다. 이 장에서는 이벤트 스트림을 데이터 관리 메커니즘으로 알아볼 것이다.

Transmitting Event Streams

배치 처리 세계에서는 작업의 입출력은 파일이다. 스트림 처리 맥락에서는 기록은 이벤트로 더 잘 알려져 있으나 본질적으로는 같은 것이다. 작고, 자립하는, 어떤 시점에서 일어난 무언가의 세부 정보를 담는 변경 불가능한 오브젝트. 이는 텍스트 문자열, JSON, 또는 바이너리 형태로 인코딩되어있을 수도 있다. 배치 처리에서 파일은 한번 써진 뒤 복수의 작업에 의해 읽힐 수 있다. 스트리밍에서는 이벤트는 생산자로부터 한 번 생성된 뒤 복수의 소비자로부터 처리될 수 있다. 이론적으로 파일이나 데이터베이스는 생산자와 소비자를 잇는 데 있어 충분하지만, 폴링이 비싼 경우를 대비해 소비자가 새 이벤트가 등장할 때마다 알림을 받는 것이 좋다. 데이터베이스는 이런 알림 메커니즘을 전통적으로 잘 지원하지는 못했다.

Messaging Systems

소비자들에 새 이벤트에 대해 알림을 주는 흔한 방법은 메시징 시스템을 사용하는 것이다. 프생산자와 소비자 간 Unix 파이프나 TCP 연결같은 직접 통신 채널을 만드는 것은 메시징 시스템을 구현하는 간단한 방법이 될 것이다. 하지만 대부분의 메시징 시스템은 이를 확장한 출판/구독 모델을 사용하는데, 여기서는 다른 시스템이 폭넓은 접근법을 갖고 모든 목적에 대한 하나의 정답은 없다. 다음 질문을 할 수 있다: 생산자가 소비자가 처리하는 것보다 더 빠르게 메시지를 보낸다면? 세 가지 방법이 있다. 시스템이 메시지를 누락하거나, 메시지를 큐에 버퍼링하거나, 후방 압력을 적용하거나. 노드가 크래시되거나 일시적으로 오프라인이 된다면 메시지가 손실되는가? 데이터베이스와 같이, 견고성은 얼마간의 비용을 대가로 디스크에 대한 쓰기와 복제의 조합을 필요로 할 수 있다. 메시지 유실이 허용 가능한지는 애플리케이션에 크게 의존한다. 배치 처리 시스템의 좋은 특성은 강한 신뢰성 보장을 제공한다는 것이다. 스트리밍에서는 어떻게 할까?

Direct messaging from producers to consumers

여러 메시징 시스템들은 생산자와 소비자간 직접 네트워크 통신을 중간 노드를 거치는 것 없이 사용한다. UDP 멀티캐스트, ZeroMQ, StatsD, Brubeck, 직접 HTTP/RPC 요청 등. 이런 직접 메시징 시스템은 설계된 사황에 대해서는 잘 동작하지만 일반적으로 애플리케이션 코드가 메시지 유실의 가능성을 인지할 것을 요구한다. 소비자가 오프라인이면 그것이 도달 불가능한 동안 보내진 메시지들이 유실될 수 있다.

Message brokers

메시지를 전송하는 데 있어 넓게 사용되는 대안은 메시지 브로커 (메시지 큐)로, 메시지 스트림을 다루는 데 최적화된 종류의 데이터베이스이다. 브로커 내 데이터를 중앙화함으로써, 이 시스템들은 왔다 가는 클라이언트 (연결, 연결 끊기, 크래시)에 쉽게 대처할 수 있으며, 견고성의 문제는 브로커로 대신 이동된다. 큐를 쓰는 것의 결과는 또한 소비자가 일반적으로 비동기적이 된다는 것이다.

Message brokers compared to databases

어떤 메시지 브로커는 XA나 JTA를 사용한 두 단계 수행 프로토콜에서 등장하기도 한다. 하지만 메시지 브로커와 데이터베이스간에는 차이가 있는데, 메시지 브로커는 메시지가 소비자에게 전송되면 이를 삭제하고, 작업 집합의 크기가 작다 가정하고, 토픽의 특정한 패턴에 맞는 부분집합을 구독하는 어떤 방식을 지원하고, 임의의 쿼리를 지원하지 않는다는 차이점이 있다.

Multiple consumers

복수의 소비자가 같은 토픽에서 메시지를 읽을 때는 두 주된 메시징 패턴이 쓰인다.

  • 부하 분산. 각 메시지는 소비자 중 하나에 전송되어 소비자들이 토픽의 메시지를 처리하는 작업을 공유할 수 있게 한다.
  • 팬아웃. 각 메시지는 모든 소비자에 전송된다.

이 두 패턴은 조합될 수도 있다.

Acknowledgments and redelivery

소비자는 어느 때나 크래시할 수 있으므로 브로커가 소비자에게 메시지를 전송했지만 소비자가 이를 절대 처리하지 않거나 크래시 이전 부분적으로 처리하는 경우도 일어날 수 있다. 이를 처리하기 위해, 클라이언트로의 연결이 브로커가 인식을 받지 못한 채 닫히거나 타임아웃되면, 이는 메시지가 처리되지 않았음을 가정하고 메시지를 다른 소비자에게 재전송한다. 부하 분산과 조합되면 이 재전송 동작은 메시지의 순서화에 흥미로운 효과를 갖는다. 메시지 브로커가 그 외에 메시지의 순서를 보존하도록 시도하더라도 (JMS나 AMQP 표준에서 요구되는 것과 같이), 부하 분산과 재전송의 조합은 메시지가 재순서화되도록 할 수밖에 없다. 이룰 피하려면 소비자마다 독립된 큐를 써야 한다.

Partitioned Logs

패킷을 네트워크를 통해 보내거나 네트워크 서비스에 요청을 하는 것은 일반적으로 영구적 흔적을 남기지 않는 일시적 동작이다. 데이터베이스와 파일 시스템은 반대의 접근법을 수행한다. 데이터베이스나 파일에 써진 모든 것은 대개 영원히 기록될 것으로 예상된다. 누군가가 그것을 명시적으로 삭제할 것을 선택하지 않는 한. 이 관점의 차이는 파생 데이터가 어떻게 만들어지는지에 큰 효과를 갖는다. 하지만 데이터베이스의 견고한 저장소 접근법과 메시징의 저 지연시간 알림 기능을 혼합할 수는 없을까?

Using logs for message storage

로그는 디스크의 덧붙임 전용 기록의 나열이다. 같은 구조는 메시지 브로커를 구현하는 데 있어 쓸 수 있다. 단일 디스크가 제공할 수 있는 것보다 더 많은 처리량으로 확장하려면, 로그는 분할될 수 있다. 각 파티션 내에서, 브로커는 단조증가하는 순서 번호(오프셋)을 모든 메시지에 배정한다. 파티션은 덧붙임 전용이기 때문에 이런 순서 번호는 합리적이다. 파티션 내 메시지가 완전 순서화되기 때문이다.

Logs compared to traditional messaging

로그 기반 접근법은 당연하게도 팬아웃 메시징을 지원한다. 여러 소비자들이 서로를 영향을 주지 않고 독립적으로 로그를 읽을 수 있기 때문이다. 메시지를 읽는 것은 로그로부터 그것을 삭제하지 않으니까. 부하 분산을 위해, 각 클라이언트는 그것이 배정된 파티션 내에서 모든 메시지를 소비한다. 이런 조립질의 부하 분산은 단점이 있다: 토픽을 소모하는 작업을 공유하는 노드의 수는 그 토픽의 로그 파티션의 수로 제한된다. 또한, 단일 메시지가 처리하기에 느리면 그것은 그 파티션 내의 병목이 된다. 즉, 메시지가 처리하기 매우 비싸고 처리를 메시지-메시지 기반으로 병렬화하기를 원하고 메시지 순서화가 중요하지 않으면 JMS/AMQP 스타일의 메시지 브로커가 선호된다. 반면에, 높은 메시지 처리량을 갖고 각 메시지가 처리하기 빠르고 메시지 순서화가 중요하다면 로그 기반 접근법이 잘 동작한다.

Consumer offsets

파티션을 순차적으로 소모하는 것은 어느 메시지가 처리되었는지를 판단하기 쉽게 한다. 이를 위해 메시지에 오프셋을 주는 것은 단일 리더 데이터베이스 복제에서 흔히 찾아볼 수 있는 로그 순서 번호와 매우 비슷하다. 소비자 노드가 실패하면, 소비자 그룹 내의 다른 노드가 실패한 소비자의 파티션에 배정되고 마지막으로 기록된 오프셋으로부터 메시지를 소모하기 시작한다.

Disk space usage

로그에 덧붙이기만 한다면 디스크 공간을 다 소모하게 될 것이다. 디스크 공간을 되찾으려면 로그를 부분으로 나눠서 시간에 따라 오래된 부분은 삭제되거나 아카이브화된 저장소로 이동되어야 할 필요가 있다. 즉, 느린 소비자가 메시지의 속도를 따라잡지 못하고, 소비자가 삭제된 부분을 가리키는 지점 뒤까지 멀어지면, 메시지 일부를 놓치게 된다는 것을 의미한다. 메시지를 얼마나 오래 유지하느냐에 상관없이, 로그의 처리량은 거의 상수가 되는데, 모든 메시지는 어쨌든 디스크에 써지기 때문이다.

When consumers cannot keep up with producers

생산자가 소비자가 처리하는 것보다 더 빠르게 메시지를 보낸다면? 세 가지 방법이 있다. 시스템이 메시지를 누락하거나, 메시지를 큐에 버퍼링하거나, 후방 압력을 적용하거나. 이 관점에서 로그 기반 접근법은 큰 고정 크기의 버퍼를 쓰는 버퍼링 접근 방식이라 볼 수 있다. 소비자가 너무 뒤떨어져서 메시지를 놓치기 시작하더라도 그 소비자만 영향받을 뿐이다. 이런 동작은 셧다운된 소비자에 대한 큐를 삭제할 때 큐들이 불필요하게 메시지를 누적해서 활성 중인 소비자로부터 메모리를 뺏어가는 일을 막기 위해 주의를 기울여야 하는 전통적인 메시지 브로커와는 상반되는 것이다.

Replaying old messages

AMQP와 JMS 스타일 메시지 브로커와는 메시지를 처리하고 인식하는 것은 파괴적 연산임을 알아보았다. 이는 메시지가 브로커에서 삭제되는 것을 야기하기 때문이다. 반면, 로그 기반 메시지 브로커에서는 메시지 소모는 파일에서 읽는 것과 비슷하다. 이것은 로그를 변경하지 않는 읽기 전용 연산이다. 소비자의 출력 이외에 처리의 부가 효과는 소비자 오프셋이 전진한다는 것이다. 이런 관점은 로그 기반 메시지를 파생 데이터가 반복 가능한 변환 과정을 통해 입력 데이터와 깔끔하게 분리된 배치 처리처럼 보이게 만들었다.

Databases and Streams

메시지 브로커와 데이터베이스간 얼마간 비교를 알아보았다. 이전에 이벤트는 특정 시점에 일어난 무언가에 대한 기록이라고 하였다. 이는 데이터베이스에 대한 쓰기일 수도 있다. 사실, 복제 로그는 데이터베이스 쓰기 이벤트의 스트림으로, 트랜잭션을 처리함에 따라 리더에 의해 생산된다. 모든 이벤트가 데이터베이스에 대한 쓰기를 표현하고 모든 복제가 같은 이벤트를 같은 순서로 처리한다면 모든 복제는 동일한 마지막 상태로 끝날 것이라는 상태 기계 복제 원리도 알아보았다. 이 장에서는 불균일한 데이터 시스템에서 발생하는 문제를 알아보고 이벤트 스트림에서 아이디어를 데이터베이스로 가져와서 이를 어떻게 푸는지를 알아볼 것이다.

Keeping Systems in Sync

모든 데이터 저장소, 쿼리, 처리 요구를 만족시키는 하나의 시스템은 없다. 같거나 관계된 데이터가 여러 다른 장소에서 등장한다면 이들은 다른 것과 함께 동기화되어 저장되어야 한다. 주기적 완전 데이터베이스 덤프가 너무 느리다면, 가끔 사용되는 대안은 쌍대 쓰기로, 애플리케이션 코드가 시스템 각각에 데이터가 변경될 때 명시적으로 쓰는 것이다. 하지만 쌍대 쓰기는 심각한 문제점이 조금 있는데, 하나는 경합 조건이다. 추가적인 동시성 감지 메커니즘이 없다면, 동시적 쓰기가 일어났는지도 모를 것이다 – 한 값이 단순히 조용하게 다른 값을 덮어씌울 테니까. 쌍대 쓰기의 또 다른 문제는 쓰기 중 다른 쪽이 성공하는 동안 다른 하나는 실패할 수 있다는 것이다. 단일 리더를 가진 단일 복제 데이터베이스를 갖고 있다면, 그 리더가 쓰기의 순서를 결정하므로, 상태 기계 복제 접근법은 데이터베이스의 복제에 대해 동작한다. 리더가 하나가 아닐 때 리더가 하나만 있으면 좋을 것이다 – 그러나 이것이 실제로 가능한가?

Change Data Capture

대부분 데이터베이스의 복제 로그의 문제는 그것들이 공용 API가 아닌 데이터베이스의 내부 세부 구현으로 고려되어 왔다는 것이다. 수십 년 동안, 많은 데이터베이스는 그들에 써지는 변경 로그들을 얻는 문서화된 방식을 갖지 못했다. 최근에는, 변경 데이터 포착(CDC)에 대해 높은 관심이 생겼고, 이것은 데이터베이스에 써지는 모든 데이터 변경을 관찰하고 그것들이 다른 시스템으로 복제될 수 있는 형태로 추출하는 과정이다. 이는 이 변경들이 쓰이자마자 스트림으로 가용 가능해질 때 특히 흥미로워진다.

Implementing change data capture

로그 소비자는 파생 데이터 시스템이라 부를 수 있다. 근본적으로, 변경 데이터 포착은 한 데이터베이스를 리더로 만들고 (변경이 포착되는 데이터베이스) 다른 데이터베이스들은 팔로워로 만든다. 데이터베이스 트리거는 변경 데이터 포착을 구현하는 데 쓸 수 있다. 이 때 데이터 테이블에 대한 모든 변경을 관측하는 트리거가 등록되고 대응하는 엔트리가 변경 로그 테이블에 추가된다. 하지만 이들은 깨지기 쉽고 큰 성능 오버헤드를 갖고 있다. 메시지 브로커처럼, 변경 데이터 포착은 대개 비동기적이다.

Initial snapshot

데이터베이스에 수행된 모든 변경에 대한 로그를 갖고 있다면 로그를 재실행함으로써 데이터베이스의 전체 상태를 재구성할 수 있다. 전체 완전 텍스트 인덱스를 만드는 것은 예를 들면 전체 데이터베이스의 완전 복제를 필요로 한다. 최근의 변경에 대한 로그만을 적용하는 것은 충분치 않다. 그렇게 하면 최근에 업데이트되지 않은 항목들은 유실되기 때문이다. 데이터베이스의 스냅샷은 변경 로그의 오프셋이나 알려진 위치에 대응해야만 한다. 그럼으로써 스냅샷이 처리된 시점부터 어느 지점에서부터 변경을 적용해야 하는지를 알 수 있다.

Log compaction

로그 기록이 제한된 만큼밖에 없다면 새 파생 데이터 시스템을 추가하고자 할 때마다 스냅샷 과정을 수행해야 한다. 하지만 로그 컴팩트화는 좋은 대안을 제공해 준다. 원리는 단순하다: 저장소 엔진은 주기적으로 같은 키의 로그 기록을 찾고, 중복을 제거하고, 각 키에 대한 가장 최근 업데이트만을 유지한다. 로그 구조 저장소 엔진에서, 특별한 널 값에 대한 업데이트 (묘비)는 그 키가 지워졌음을 알리며, 로그 컴팩트화 중 지워지게 한다. 같은 발상은 로그 기반 메시지 브로커와 변경 데이터 포착의 맥락에서도 동작한다. 검색 인덱스와 같은 파생 데이터 시스템을 재구성하고 싶다면, 로그 컴팩트화된 토픽의 오프셋 0으로부터 새소비자를 시작해, 순차적으로 로그 내 모든 메시지를 스캔할 수 있다. 이는 메시지 브로커가 임시 메시징뿐만 아니라 견고한 저장소에 쓰일 수 있게 한다.

API support for change streams

데이터베이스는 점점 변경 스트림을 새로 장착되고 리버스 엔지니어된 CDC 산물이 아닌 일급 인터페이스로 지원하기 시작했다. VoltDB는 트랜잭션이 연속적으로 데이터를 스트림의 형태로 데이터베이스로부터 내보내기할 수 있도록 했다. Kafka Connect는 넓은 범위의 데이터베이스 시스템의 변경 데이터 포착 도구를 Kafka와 통합하기 위한 산물이다.

Event Sourcing

여기서 다룬 아이디어들과 이벤트 근원, 도메인 지향 설계 (DDD) 커뮤니티에서 개발된 기법과는 평행 지점이 존재한다. 변경 데이터 포착과 비슷하게, 이벤트 근원은 애플리케이션 상태에 대한 모든 변화를 변화 이벤트의 로그로 저장하는 것을 포함한다. 가장 큰 차이점은 이벤트 근원은 발상을 다른 수준의 추상화에 적용한다는 것이다. 이 때 애플리케이션 로직은 이벤트 로그에 써진 변경 불가능한 이벤트에 기반해서 명시적으로 세워진다. 이벤트 근원은 데이터 모델링의 강력한 기법이다. 애플리케이션의 관점에서는 사용자의 동작을 이 동작의 효과를 변경 가능한 데이터베이스에 기록하는 것보다는 변경 불가능한 이벤트로 기록하는 것이 낫다. 이벤트 근원은 연대기 데이터 모델과 비슷하며, 이벤트 로그와 별 스키마에서 찾을 수 있는 진리표 사이에도 유사점이 있다. Event Store와 같은 특수화된 데이터베이스는 이벤트 근원을 사용해 애플리케이션을 지원하기 위해 개발되었다. 하지만 일반적으로 이 접근법은 특별한 도구와는 독립적이다.

Deriving current state from the event log

이벤트 로그는 그 자체로는 그다지 쓸모가 없다. 사용자들은 시스템의 현재 상태를 보는 것을 기대하지, 수정 역사를 보는 것을 기대하지 않기 때문이다. 그러므로, 이벤트 근원을 사용하는 애플리케이션은 이벤트의 로그 (시스템에 쓰여진 데이터를 표현)를 취해 이를 사용자에게 보여줄 수 있을 만한 애플리케이션 상태 (시스템으로부터 어떤 데이터가 읽히는지에 대한 방식)로 변환하는 것이 필요하다. 변경 데이터 포착과 비슷하게, 이벤트 로그를 재실행하는 것은 시스템의 현재 상태를 재구성하게 해 준다. 하지만 로그 컴팩트화는 다르게 다뤄져야 한다. 나중의 이벤트들이 이전 이벤트들을 오버라이딩하지 않기 때문이다. 이벤트 근원을 쓰는 애플리케이션은 이벤트 로그로부터 파생되는 현재 상태의 스냅샷을 저장하는 메커니즘을 갖고 있어서, 전체 로그를 반복적으로 재처리할 필요가 없다.

Commands and events

이벤트 근원 철학은 이벤트와 명령을 주의 깊게 구별하는 ㅓㅅ이다. 사용자로부터의 요청이 처음 도달하면, 그것은 처음에는 명령이다. 그것에 대한 증명이 성공적이고 명령이 수용되면, 그것은 이벤트가 되고 견고해지고 불변이 된다. 이벤트가 생성된 시점에 그것은 사실이 된다. 이벤트 스트림의 소비자는 이벤트를 거절할 수 없다. 소비자가 이벤트를 본 시점에 그것은 이미 로그의 불변 부분이고 다른 소비자들에게도 보여졌을 수 있기 때문이다. 따라서 명령의 모든 검증은 이벤트가 되기 전에 동기적이어야 한다.

State, Streams, and Immutability

배치 처리는 입력 파일의 비변동성으로부터 이득을 얻는다는 것을 알아보았다. 그래서 존재하는 입력 파일에 실험 처리 작업을 손상을 끼칠 우려 없이 수행할 수 있다. 데이터베이스는 애플리케이션의 현재 상태를 저장하는 것이라 할 수 있다. 이 표현식은 읽기에 최적화되어 있고, 대개 쿼리를 수행하는 데 있어 가장 편리하다. 변경되는 상태가 있으면, 그 상태는 시간에 따라 변화된 이벤트의 결과이다. 상태가 어떻게 변했건 간에, 그 변경을 야기한 이벤트의 나열이 존재한다. 수학적으로 본다면 애플리케이션 상태는 이벤트 스트림을 시간에 따라 적분한 것으로 볼 수 있고 변화 스트림은 상태를 시간에 따라 미분한 것이라 볼 수 있다. 변경 로그를 견고하게 저장한다면, 이것은 상태를 재현 가능하게 만드는 효과가 있다. 로그 컴팩트화는 로그와 데이터베이스 상태간의 구분을 연결하는 하나의 방법이다.

Advantages of immutable events

데이터베이스의 불변성은 오래된 발상이다. 실수가 생기면 회계사들은 틀린 트랜잭션을 지우거나 변경하지 않는다. 그 대신, 실수를 상쇄할 만한 다른 트랜잭션을 추가한다. 이런 감사 가능성은 금융 시스템에서는 특히 중요하지만, 이런 강력한 규제가 없는 다른 많은 시스템에서도 이득이 된다. 변경 불가능한 이벤트는 현 상태 그 이상의 정보를 포착한다.

Deriving several views from the same event log

변경 가능한 상태를 불변 이벤트 로그와 분리함으로써, 같은 이벤트 로그로부터 여러 다른 읽기 지향 표현식을 파생할 수 있다. 이벤트 로그로부터 데이터베이스로 명시적인 변환 과정을 갖는 것은 애플리케이션이 시간에 따라 진화하기 쉽게 해 준다. 데이터를 저장하는 것은 그것이 어떻게 쿼리되고 접근될지를 걱정할 필요가 없다면 매우 쉽다. 데이터베이스와 스키마 설계에 대한 전통적 접근법은 데이터가 쿼리되는 것과 같은 형태로 쓰여져야 한다는 잘못된 믿음에 근거했다. 데이터를 읽기 최적화된 뷰로 비표준화하는 것은 실제로 완전히 납득할 만하다. 전환 과정이 이벤트 로그와 일관적이도록 하는 메커니즘을 주기 때문이다.

Concurrency control

이벤트 근원과 변경 데이터 포착의 가장 큰 단점은 이벤트 로그의 소비자가 대개 비동기적이어서, 사용자가 로그에 대한 쓰기를 한 뒤 로그 파생 뷰로부터 읽으면 그들의 쓰기가 읽기 뷰에 아직 반영되지 않았을 가능성이 있다는 것이다. 한 해법은 읽기 뷰의 업데이트를 이벤트를 로그에 덧붙여서 동기적으로 수행하는 것이다. 그 외에, 이벤트 로그로부터 현 상태를 파생시키는 것은 또한 동시성 제어의 어떤 반면을 단순화한다. 이벤트 로그와 애플리케이션 상태가 같은 방식으로 파티션되면 간단한 단일 스레드 로그 소비자가 쓰기에 대한 동시성 제어를 필요로 하지 않는다. 생성되면서부터, 이는 한 번에 한 이벤트만 처리하면 된다.

Limitations of immutability

이벤트 근원 모델을 사용하지 않는 많은 시스템은 그럼에도 불구하고 불변성에 의존한다. 모든 변경의 불변 기록을 얼마나 오래 가지고 있어야 할까? 이는 데이터셋의 변경 양에 따라 달라진다. 성능 이유 이외에도, 모든 불변성에도 불구하고 정부나 법적 이유로 인해 데이터가 삭제될 필요가 있는 상황이 존재한다. 이런 상황에서는, 로그에 다른 이벤트를 부착해 이전 데이터가 삭제된 것으로 고려하도록 하는 것으론 충분치 않다. 역사를 다시 써서 데이터가 처음부터 절대 써지지 않았던 것처럼 해야 한다. 완전히 데이터를 삭제하는 것은 매우 어렵다. 복제가 여러 군데에 있을 수 있기 때문이다.

Processing Streams

지금까지는 스트림이 어디서 오는지, 스트림이 어떻게 전송되는지를 알아보았다. 스트림을 어떻게 처리하는지를 알아보자. 세 가지 방법이 있다: 이벤트의 데이터를 취해 데이터베이스, 캐시, 탐색 인덱스, 또는 다른 저장소 시스템에 써서 다른 클라이언트로부터 쿼리될 수 있게 한다. 아니면 이벤트를 사용자에게 어떤 방식으로든 내보낸다. 또는 하나나 그 이상의 입력 스트림을 처리해 하나 이상의 출력 스트림을 생성할 수도 있다. 이 장의 남은 부분에서는 세 번째 방법을 다룬다. 스트림 프로세서를 파티셔닝하고 병렬화하는 패턴은 MapReduce와 데이터 흐름 엔진의 그것과 매우 비슷하므로 반복하지는 않는다. 배치 작업과의 한 핵심적인 차이점은 스트림은 절대 끝나지 않는다는 것이다.

Uses of Stream Processing

스트림 처리는 오랜 기간동안 감시 목적으로서, 조직이 특정한 일이 일어났을 때 경고를 받기 위해 많이 쓰여왔다. 예를 들면 사기 감지, 금융 시장에서의 가격 변동, 공장에서의 기계 상태 감시, 군용 시스템에서 잠재적 공격자의 감지. 이런 애플리케이션은 매우 복잡한 패턴 매칭과 그 상관 관계를 요구한다.

Complex event processing

복잡 이벤트 처리 (CEP)는 1990년에 개발된 접근법으로서 이벤트 스트림을 분석한다. 특히 특정 이벤트 패턴을 탐색하는 것을 필요로 하는 애플리케이션에 잘 맞는다. 복잡 이벤트 처리 시스템은 SQL과 같은 고수준의 선언적 쿼리 언어를 종종 사용하거나, 또는 그래픽 사용자 인터페이스를 써서 감지되어야 하는 이벤트의 패턴을 묘사한다. 이런 시스템에서, 쿼리와 데이터는 일반 데이터베이스에 비해 그 관계가 역전된다. 쿼리는 짧은 기간 동안 저장되고 입력 스트림으로부터의 이벤트는 그들을 연속적으로 흘러 지나가서 이벤트 패턴에 맞는 쿼리를 찾는다.

Stream analytics

스트림 처리의 다른 영역은 스트림에 대한 분석이다. 특정한 이벤트의 비율을 측정, 어떤 시간 기간동안 어떤 값의 이동 평균을 측정, 이전 시간 구간과 현재 통계량을 비교. 이런 통계량들은 고정된 시간 구간에 대해 대개 계산된다. 스트림 분석 시스템은 가끔 확률적 알고리즘을 사용하기도 한다. 분석을 염두에 두고 설계된 많은 오픈 소스 분산 스트림 처리 프레임워크가 존재한다.

Maintaining materialized views

데이터베이스에 대한 변경의 스트림은 파생 데이터 시스템을 보존하는 데 쓰일 수 있다 – 캐시, 검색 인덱스, 데이터 웨어하우스를 시작 데이터베이스와 업데이트되도록. 비슷하게, 이벤트 근원에서, 애플리케이션 상태는 이벤트의 로그를 적용함으로써 유지될 수 있다. 이론적으로, 모든 스트림 프로세서는 물질화된 뷰 유지보수를 위해 사용될 수 있다. 이벤트를 영원히 유지보수할 필요성은 분석적 지향 프레임워크들이 하는 가정 – 그들이 제한된 구간 동안만 동작한다는 – 과 역행하기는 하지만.

Search on streams

복수 이벤트로 이루어진 패턴을 찾는 것을 허용하는 복잡 이벤트 처리 외에도, 복잡한 기준에 기반한 각각 이벤트를 찾아야 할 필요가 있다. 예를 들면 완전 텍스트 탐색 쿼리. 전통적 검색 엔진은 문서를 인덱스화하고 인덱스에 대해 쿼리를 수행한다. 반면, 스트림에 대한 탐색은 처리를 그 헤드에 대해 수행한다. 쿼리는 저장되고, 문서들은 쿼리를 지나쳐 동작한다 – 복잡 이벤트 처리에서처럼.

Message passing and RPC

메시지 패싱 시스템을 원격 프로시져 호출의 대안으로서 알아보았다. 즉, 액터 모델의 예에서 사용되었듯 서비스가 통신하는 메커니즘으로. 하지만 이것은 스트림 프로세서로 고려되지는 않는다. 액터 프레임워크는 근본적으로 동시성과 통신 모듈의 분산된 실행을 관리하는 메커니즘이고, 액터간 통신은 수명이 짧고 일 대 일이고, 액터들은 임의의 방식으로 통신할 수 있기 때문이다. 그러므로, 원격 프로시져 호출류 시스템과 스트림 프로세싱간에는 겹치는 지점이 존재한다. 스트림 처리를 액터 프레임워크를 이용해 하는 것도 가능하다.

Reasoning About Time

스트림 프로세서는 시간에 대처해야 한다. 특히 분석적 목적으로 쓰였을 때. 배치 과정에서, 처리 작업은 많은 역사적 이벤트의 모음을 빠르게 지나쳐간다. 배치 과정은 일 년치의 역사적 이벤트를 몇 분만에 읽을 수 있다. 반면, 많은 스트림 프로세싱 프레임워크는 프로세싱 기기에서의 로컬 시스템 시계로 구간을 판정한다.

Event time versus processing time

프로세싱이 지연되는 데는 여러 이유가 있다. 큐잉, 네트워크 실패, 메시지 브로커나 프로세서에 대한 경합을 낳는 성능 이슈, 스트림 소비자의 재시작, 고장이나 코드의 버그 수정으로부터 복구하는 동안 과거 이벤트의 재처리. 또한, 메시지 지연은 메시지의 순서를 예상치 못한 방식으로 바꿀 수 있다. 혼란스러운 이벤트 시간과 처리 시간은 나쁜 데이터를 낳는다.

Knowing when you’re ready

이벤트 시간에 대해 구간을 정의할 때 까다로운 문제는 특정 구간에 대한 모든 이벤트를 언제 받을 수 있게 되는지, 아니면 받아야 할 이벤트가 아직 남아 있는지를 알 수 없다는 것이다. 그러므로 구간이 이미 완료된 것으로 선언된 이후에 도달하는 낙오자 이벤트를 다룰 수 있어야 한다. 두 가지 선택지가 있다: 낙오자 이벤트를 무시한다. 왜냐하면 그들은 일반적인 환경에서 적은 비율의 이벤트일 것이므로. 아니면 교정을 출판한다. 낙오자가 포함된 구간의 업데이트된 값을 출판하는 것이다. 어떤 경우에는 특별한 메시지를 사용해 소비자로부터 구간을 트리거할 수 있는 데 쓰일 수 있도록 할 수 있다.

Whose clock are you using, anyway?

타임스탬프를 이벤트에 할당하는 것은 이벤트가 시스템의 여러 지점에 버퍼링될 수 있을 때 더 복잡해진다. 이 맥락에서, 이벤트의 타임스탬프는 모바일 기기의 로컬 시계에 따라 사용자 상호작용이 발생한 시점이어야 한다. 부정확한 기기 시계를 보정하기 위한 한 가지 방법은 3가지의 타임스탬프를 로깅하는 것이다: 디바이스 시계에 따라, 이벤트가 발생한 시각. 디바이스 시계에 따라, 이벤트가 서버에 전송된 시각. 서버 시계에 따라, 이벤트가 서버에 도달한 시각. 3번째 타임스탬프로부터 2번째 타임스탬프를 빼서, 디바이스 시계와 서버 시계간의 오프셋을 알 수가 있다. 이 문제는 스트림 처리에만 나타나는 것은 아니면 배치 처리도 시간을 고려할 때 똑같은 문제를 겪는다.

Types of windows

이벤트의 타임스탬프가 어떻게 결정되어야 하는지를 알았다면, 다음 단계는 시간 주기에 따른 구간이 어떻게 정의되어야 하는지를 결정하는 것이다. 여러 타입의 구간이 쓰일 수 있다: 텀블링 윈도우, 호핑 윈도우, 슬라이딩 윈도우, 세션 윈도우.

Stream Joins

배치 작업들이 데이터셋을 키로 어떻게 연결하는지, 이런 연결들이 데이터 파이프라인에서 어떻게 중요한 부분을 담당하는지를 알아보았다. 스트림에서는 새 이벤트가 아무 때나 등장할 수 있기 때문에 스트림에서의 연결은 배치에서보다 더 어렵다.

Stream-stream join (window join)

웹사이트에서 검색 특성이 있는데 검색된 URL에 대해 대해 최근 트렌드를 알아내고 싶다고 하자. 사용자가 검색을 버리면 클릭은 이루어지지 않을 것이고 클릭이 이루어지더라도 검색과 클릭간 시간은 매우 가변적일 것이다. 이 때 클릭 이벤트 내에서 검색의 세부 사항을 끼워넣는 것은 이벤트를 연결시키는 것과 같지 않다. 이런 형태의 연결을 구현하기 위해서는 스트림 프로세서가 상태를 유지해야 한다.

Stream-table join (stream enrichment)

이전에 두 데이터셋을 연결하는 배치 작업을 알아보았다: 사용자 활동 이벤트의 집합과 사용자 프로필의 데이터베이스. 이 연결을 수행하기 위해서는 스트림 과정은 한 번에 하나의 활동 이벤트를 들여다보아야 하고, 데이터베이스 내 이벤트의 사용자 ID를 탐색하고, 활동 이벤트에 프로필 정보를 추가해야 한다. 다른 접근법은 데이터베이스의 복제를 스트림 프로세서에 로드해 네트워크를 순회하지 않고서도 국소적으로 쿼리될 수 있게 하는 것이다. 배치 작업과으 차이점은 배치 작업은 데이터베이스의 특정 시점 스냅샷을 입력으로 쓰지만 스트림 프로세서는 장기적으로 동작하고 데이터베이스의 내용은 시간에 따라 변화할 가능성이 높으므로 스트림 프로세서의 데이터베이스에 대한 국소적 사본은 업데이트되어야 할 필요가 있다는 것이다. 스트림-테이블 결합은 스트림-스트림 결합과 매우 비슷하다.

Table-table join (materialized view maintenance)

사용자가 홈 타임라인을 보고 싶어할 때 사용자가 팔로잉하는 사람들을 모두 순회하고 최근 트윗을 찾고 이를 병합하는 것은 너무 비싸다. 그 대신 타임라인 캐시를 유지할 수 있다. 이 캐시를 스트림 프로세서 내에서 유지시키려면, 트윗에 대해 이벤트의 스트림과 팔로우 관계의 트림이 필요하다. 이 스트림 과정을 보는 다른 방법은 이것이 두 테이블을 연결하는 쿼리의 물질화된 뷰로 보는 것이다.

Time-dependence of joins

여기서 논한 세 가지 유형의 연결은 많은 공통점이 있다: 이들은 모두 스트림 프로세서가 한 연결 입력에 기반한 상태를 유지해야 하며 다른 연결 입력으로부터 그 상태를 메시지로 쿼리해야 한다. 상태가 유지되는 이벤트의 순서는 중요하다. 다른 스트림의 이벤트가 비슷한 시간에 일어나면 그들은 어떤 순서로 처리될까? 이런 시간 의존성은 여러 군데에서 발생한다. 스트림들 사이에서의 이벤트의 순서가 결정되지 않으면 연결은 비결정론적이 된다. 즉, 같은 입력에 대해 같은 작업을 재수행해도 꼭 같은 결과가 나오지 않는다. 데이터 웨어하우스에서 이 문제는 느리게 변화하는 차원(SCD)라 불리며 이는 연결된 기록의 특별한 버전에 대한 고유 식별자를 사용하여 해결된다.

Fault Tolerance

스트림 프로세서가 실패에 대처하는 법을 알아보자. 실패에 대한 대처를 할 떄 배치 접근법은 어느 작업들이 실패했더라도 배치 작업의 출력이 아무 것도 잘못되지 않은 것과 똑같이 나옴을 보장한다. 스트림 처리에서도 실패에 대한 대처는 같은 문제가 있지만 다루기에는 덜 간단하다.

Microbatching and checkpointing

한 해법은 스트림을 작은 블록으로 쪼개고 각 블록을 작은 배치 과정으로 다루는 마이크로배칭이다. 이는 암시적으로 배치 크기와 같은 텀블링 윈도우를 제공한다. 또 다른 접근법은 주기적으로 상태에 대한 롤링 체크포인트를 생성해 이를 견고한 저장소에 쓰는 것이다. 스트림 처리 프레임워크에 국한한다면, 마이크로배칭과 체크포인팅 접근법은 배치 처리와 똑같이 정확히 한 번 시맨틱을 제공한다. 하지만 출력이 스트림 프로세서를 떠나면 이들로는 불충분해진다.

Atomic commit revisited

실패가 존재할 때 정확히 한 번 처리하는 모습을 보여 주기 위해서는 모든 출력과 이벤트 처리의 부가 효과가 처리가 성공적인 것과 완전히 동치임을 보장해야 한다. 이러려면 모든 것들이 원자적으로 발생하거나, 그 중 어느 것도 반드시 일어나지는 않아야 하며, 다른 것과 합이 맞지 않으면 안 된다. 제한된 환경에서는 이러한 원자적 수행 기능을 더 효율적으로 구현할 수 있다.

Idempotence

목표는 실패한 작업의 부분적 출력을 버려서 이들이 두 번 효과를 일으키지 않고 안전하게 재시도될 수 있도록 하는 것인데, 이는 멱등 연산에 의존한다. 멱등 연산은 여러 번 수행해도 한 번 수행한 것과 같은 효과를 내는 연산이다. 연산자가 자연적으로 멱등이 아니더라도 약간의 메타데이터를 추가하면 멱등으로 만들 수 있다. 이렇게 되면 실패한 작업을 재시작하는 것은 각 메시지를 같은 순서로 재실행해야 하며, 처리는 결정론적이어야 하며, 다른 노드가 같은 값을 동시에 업데이트하면 안 된다. 한 처리 노드에서 다른 노드로 실패해 넘어갈 때에는 죽은 줄 알았지만 사실은 살아 있는 노드로부터의 간섭을 막기 위해 펜싱이 필요할 수 있다.

Rebuilding state after a failure

상태를 요구하는 스트림 과정은 이 상태가 실패로부터 복구될 수 있음을 보장해야 한다. 한 방법은 상태를 원격 데이터스토어에 보존하고 이를 복제하는 것이다. 다만 각 메시지에 대해 원격 데이터베이스에 쿼리를 하는 것은 느릴 수 있다. 다른 대안은 상태를 스트림 프로세서에 국소적으로 유지하고 이를 주기적으로 복제하는 것이다. 어떤 경우에는 상태를 복제하는 것이 필요하지 않을 수도 있다. 입력 스트림으로부터 재구성될 수 있기 때문이다. 하지만 이 모든 트레이드오프는 기반하는 인프라의 성능 특성에 의존하게 된다.

Summary

이 장에서는 이벤트 스트림, 그들이 동작하는 목적, 어떻게 그들을 처리하는지에 대해 논해 보았다. 어떤 방식으로는, 스트림 처리는 배치 처리와 매우 비슷하지만, 고정 크기 입력보다는 한계가 없는 (끝나지 않는) 스트림에 대해 연속적으로 이루어진다. 이 관점에서, 메시지 브로커와 이벤트 로그는 파일시스템의 스트리밍에서의 대응의 역할을 한다.

메시지 브로커의 두 유형을 비교해 보았다.

  • AMQP/JMS-형 메시지 브로커. 이 브로커는 각 메시지를 소비자에게 할당하고 소비자들은 각 메시지가 성공적으로 처리되었을 때 이를 인식한다. 메시지는 인식되었다면 브로커로부터 삭제된다. 이 접근법은 원격 프로시져 호출의 비동기적 형태로서 적절하다. 예를 들면 태스크 큐에서, 메시지 처리의 정확한 순서는 중요하지 않고 이들이 처리된 이후에 뒤로 돌아가서 오래 된 메시지들을 읽을 필요가 없기 때문이다.
  • 로그 기반 메시지 브로커. 이 브로커는 파티션 내 모든 메시지를 같은 소비자 노드에 할당하고, 항상 메시지를 같은 순서로 전송한다. 병렬화는 파티셔닝을 통해 얻어지고, 소비자들은 이들의 과정을 그들이 처리한 마지막 메시지의 오프셋을 체크포인트화해서 추적한다. 브로커는 디스크의 메시지를 보유함으로써, 필요할 경우 뒤로 점프해 오래 된 메시지를 다시 읽을 수 있다.

로그 기반 접근법은 데이터베이스에서 찾을 수 있는 복제 로그 및 로그 구조 저장소 엔진과 유사점을 갖는다. 이 접근법은 입력 스트림을 소모해 파생 상태와 파생 출력 스트림을 생성하는 스트림 처리 시스템에 특히 적합하다.

스트림이 어디에서 오는지에 대해서는, 여러 가능성을 논해 보았다. 사용자 활동 이벤트, 주기적 읽기를 제공하는 센서, 데이터 피드 (ex. 금융에서의 시장 데이터) 들이 자연적으로 스트림으로 표현된다. 데이터베이스에 대한 쓰기를 스트림으로 생각하는 것은 유용하다. 변경 기록들을 포착하는 것은 (e.x. 데이터베이스에 만들어진 모든 변경의 역사) 변경 데이터 포착을 통해 묵시적으로 이뤄지거나 이벤트 근원을 통해 명시적으로 이뤄질 수 있다. 로그 컴팩트화는 스트림이 데이터베이스의 내용에 대한 전체 사본을 유지하도록 한다.

데이터베이스를 스트림으로 표현하는 것은 시스템을 통합하는 데 있어 강력한 기회를 열어 준다. 검생 인덱스, 캐시, 분석 시스템과 같은 파생 데이터 시스템을 변경 로그를 소모하고 이를 파생 시스템에 적용함으로써 연속적으로 업데이트 시킬 수가 있다. 존재하는 데이터에 대한 신선한 뷰를 바닥부터 시작해서 처음부터 현재까지 이뤄진 변경 로그들을 모두 소모해서 만들 어낼 수도 있다.

상태를 스트림으로 유지하고 메시지를 재작동하는 기능은 여러 스트림 처리 프레임워크에서 스트림 연결과 실패에 대한 대처를 가능케 하는 기능들에 대한 기반이 된다. 이벤트 패턴 (복잡한 이벤트 처리)에 대한 탐색, 구간 합계 계산 (스트림 분석), 파생된 데이터 시스템을 업데이트되게 유지하는 것 (물질화된 뷰) 등 스트림 처리의 여러 목적을 논해보았다.

스트림 프로세서 내에서 처리 시간과 이벤트 타임스탬프간 구분, 그리고 구간이 끝났다고 생각한 뒤 도착하는 낙오자 이벤트를 다루는 문제 등 시간에 대해 고려하는 어려움도 알아보았다.

스트림 처리에서 발생할 수 있는 세 가지 유형의 연결에 대해서도 알아보았다.

  • 스트림-스트림 연결. 두 입력 스트림은 활동 이벤트로 이루어지고, 연결 연산자는 어떤 시간의 구간 내에 발생하는 관계된 이벤트를 탐색한다. 예를 들어, 같은 사용자에 의해 이루어진 두 동작이 서로 30분 이내일 때 이들을 매치시킬 수 있다. 이 두 연결 입력은 관계된 이벤트를 그 하나의 스트림 내에서 찾으려고 한다면 사실 같은 스트림 (자가-연결)이 된다.
  • 스트림-테이블 연결. 한 입력스트림은 활동 이벤트로 이루어지고 다른 스트림은 데이터베이스 변경 로그로 이루어진다. 변경 로그는 데이터베이스의 국소적 사본을 업데이트된 상태로 유지한다. 각 활동 이벤트에 대해, 연결 연산자는 데이터베이스를 쿼리해서 풍부한 활동 이벤트를 출력한다.
  • 테이블-테이블 연결. 양쪽 입력 스트림은 데이터베이스 변경 로그이다. 이 경우 한 쪽의 모든 변경은 다른 쪽의 마지막 상태와 연결된다. 결과는 두 테이블간 연결의 물질화된 뷰에 대한 변경들의 스트림이다.

마지막으로, 스트림 프로세서 내에서 실패에 대처하는 기법과 정확히 한 번 시맨틱에 대해 알아보았다. 배치 처리와 마찬가지로, 실패한 작업에 대해서는 부분적 출력을 버려야 한다. 하지만, 스트림 과정은 장기적이고 출력을 연속적으로 배출하기 때문에, 모든 출력을 버릴 순 없다. 대신, 미세 조정된 복구 메커니즘이 사용될 수 있다. 이는 마이크로배칭, 체크포인팅, 트랜잭션, 멱등 쓰기 등에 기반할 수 있다.

답글 남기기

아래 항목을 채우거나 오른쪽 아이콘 중 하나를 클릭하여 로그 인 하세요:

WordPress.com 로고

WordPress.com의 계정을 사용하여 댓글을 남깁니다. 로그아웃 /  변경 )

Google photo

Google의 계정을 사용하여 댓글을 남깁니다. 로그아웃 /  변경 )

Twitter 사진

Twitter의 계정을 사용하여 댓글을 남깁니다. 로그아웃 /  변경 )

Facebook 사진

Facebook의 계정을 사용하여 댓글을 남깁니다. 로그아웃 /  변경 )

%s에 연결하는 중