본문으로 바로가기
본문으로 바로가기

Dataflow Pub/Sub to ClickHouse Template

Pub/Sub to ClickHouse Template은 Pub/Sub subscription에서 JSON으로 인코딩된 메시지를 읽어 ClickHouse 테이블에 기록하는 스트리밍 파이프라인입니다. 파싱에 실패하거나 대상 스키마(schema)에 매핑되지 못한 메시지는 데드 레터 대상으로 라우팅됩니다. 대상은 ClickHouse 테이블, Pub/Sub 토픽 또는 둘 다가 될 수 있습니다.

파이프라인 요구 사항

  • 원본 Pub/Sub subscription이 반드시 존재해야 합니다.
  • subscription에 게시되는 메시지는 유효한 JSON이어야 합니다.
  • 대상 ClickHouse 테이블이 반드시 존재해야 하며, 해당 컬럼 이름은 JSON 페이로드의 필드 이름과 일치해야 합니다.
  • Dataflow worker 머신에서 ClickHouse 호스트에 접근할 수 있어야 합니다.
  • 최소 1개의 데드 레터 대상(clickHouseDeadLetterTable 또는 deadLetterTopic)을 제공해야 합니다. 둘 다 제공하면 실패한 메시지는 두 대상 모두로 동시에 라우팅됩니다.
  • clickHouseDeadLetterTable이 설정된 경우, 데드 레터 테이블은 Dead-letter handling에 표시된 스키마로 ClickHouse에 이미 존재해야 합니다.
  • deadLetterTopic이 설정된 경우, Pub/Sub 토픽이 이미 존재해야 합니다.

Template 매개변수



매개변수 이름매개변수 설명필수참고
inputSubscription메시지를 읽어올 Pub/Sub subscription입니다. 예시: projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.메시지는 JSON으로 인코딩되어 있어야 합니다.
clickHouseUrlClickHouse 엔드포인트 URL입니다. SSL 연결(ClickHouse Cloud)에는 https://를, 비SSL 연결에는 http://를 사용하십시오. 예시: https://<HOST>:8443 또는 http://<HOST>:8123.ClickHouse Cloud의 경우 포트 8443의 HTTPS 엔드포인트를 사용하십시오.
clickHouseDatabase대상 테이블이 있는 ClickHouse 데이터베이스 이름입니다. 예시: default.
clickHouseTable데이터를 기록할 ClickHouse 테이블 이름입니다.파이프라인을 실행하기 전에 테이블이 미리 존재해야 합니다.
clickHouseUsernameClickHouse 인증에 사용할 사용자 이름입니다.
clickHousePasswordClickHouse 인증에 사용할 비밀번호입니다.
clickHouseDeadLetterTable실패한 메시지를 기록할 ClickHouse 테이블입니다. 예시: my_table_dead_letter.clickHouseDeadLetterTable 또는 deadLetterTopic 중 하나 이상을 제공해야 합니다. 이 테이블은 Dead-letter handling에 나온 데드 레터 스키마로 미리 생성되어 있어야 합니다.
deadLetterTopic실패한 메시지를 게시할 Pub/Sub 토픽입니다. 예시: projects/<PROJECT_ID>/topics/<TOPIC_NAME>.clickHouseDeadLetterTable 또는 deadLetterTopic 중 하나 이상을 제공해야 합니다. 실패한 payload는 errorMessagefailedAt가 메시지 속성으로 설정된 상태로 해당 토픽에 게시됩니다.
windowSeconds시간 기반 배칭 윈도우의 지속 시간(초)입니다.batchRowCount와의 상호작용은 Batching and windowing을 참조하십시오. 둘 다 설정하지 않으면 결합 모드에서 기본값 30s1000행이 사용됩니다.
batchRowCountClickHouse로 플러시하기 전에 누적할 행 수입니다.windowSeconds와의 상호작용은 Batching and windowing을 참조하십시오.
maxInsertBlockSizeClickHouse로 전송되는 INSERT statement당 최대 행 수입니다. 기본값은 1,000,000입니다.ClickHouseIO 옵션입니다.
maxRetries실패한 ClickHouse 삽입에 대해 재시도할 수 있는 최대 횟수입니다. 기본값은 5입니다.ClickHouseIO 옵션입니다.
insertDeduplicate복제된 ClickHouse 테이블에서 INSERT 쿼리에 대한 중복 제거를 활성화할지 여부입니다. 기본값은 true입니다.ClickHouseIO 옵션입니다.
insertQuorum복제된 테이블의 INSERT 쿼리에서 지정한 수의 레플리카가 쓰기를 확인하고 데이터 추가가 선형화될 때까지 대기합니다. 0은 quorum 쓰기를 비활성화합니다.ClickHouseIO 옵션입니다. 기본 server settings에서는 비활성화되어 있습니다.
insertDistributedSync활성화하면 분산 테이블에 대한 INSERT 쿼리는 데이터가 클러스터의 모든 노드로 전송될 때까지 대기합니다. 기본값은 true입니다.ClickHouseIO 옵션입니다.
참고

모든 ClickHouseIO 매개변수의 기본값은 ClickHouseIO Apache Beam Connector에서 확인할 수 있습니다.

메시지 형식 및 스키마 매핑

Pub/Sub 메시지는 최상위 필드 이름이 대상 ClickHouse 테이블의 컬럼 이름과 정확히 일치하는 JSON 객체여야 합니다.

수신 메시지를 대상 테이블에 매핑하기 위해 파이프라인은 시작 시 다음 작업을 수행합니다:

  1. 대상 ClickHouse 테이블의 스키마를 가져옵니다.
  2. 해당 ClickHouse 스키마를 기반으로 Beam Row 스키마를 생성합니다.
  3. 수신되는 각 Pub/Sub 메시지에 대해 JSON 페이로드를 파싱하고, ClickHouse 스키마에 정의된 필드를 읽어 행을 구성합니다.

참조

JSON 필드 이름은 ClickHouse 컬럼 이름과 정확히 일치해야 합니다(일치는 대/소문자를 구분합니다). 메시지에 포함된 필드 중 ClickHouse 컬럼에 해당하지 않는 필드는 무시됩니다. ClickHouse 컬럼에 대응하는 필드가 JSON 페이로드에 없으면 파이프라인은 해당 컬럼에 NULL을 기록하려고 시도하며, 이는 해당 컬럼이 널 허용으로 선언된 경우에만 성공합니다. 파싱에 실패한 메시지, 값이 컬럼 타입으로 강제 변환될 수 없는 메시지, 또는 널 비허용 컬럼에 NULL을 기록하게 되는 메시지는 데드 레터 대상으로 라우팅됩니다.

타입 변환

JSON 값은 해당 ClickHouse 컬럼 타입(column type)에 맞게 강제 변환됩니다:

ClickHouse 타입참고
Float32Float.valueOf로 파싱됩니다.
Float64Double.valueOf로 파싱됩니다.
DateISO-8601 날짜 문자열로 파싱됩니다.
DateTimeISO-8601 날짜/시간 문자열(예: 2026-01-15T12:34:56Z)로 파싱됩니다.
Array(T)JSON 배열이며, 각 요소는 요소 타입 T로 변환됩니다. 비어 있거나 누락된 배열은 빈 배열로 처리됩니다.
Integer types (Int8/Int16/Int32/Int64, UInt8/UInt16/UInt32/UInt64)JSON 숫자 또는 해당 값의 문자열 표현에서 파싱됩니다.
String텍스트 필드에는 그대로 사용되며, 텍스트가 아닌 JSON 노드는 JSON 문자열 형태로 직렬화됩니다.

배칭 및 윈도잉

파이프라인이 스트리밍 방식이므로, 들어오는 행은 ClickHouse로 플러시되기 전에 윈도우 단위로 누적됩니다. 윈도잉 전략은 지정한 매개변수에 따라 선택됩니다.

windowSecondsbatchRowCount동작
설정됨설정되지 않음windowSeconds를 기준으로 하는 시간 기반 고정 윈도우입니다.
설정되지 않음설정됨개수 트리거가 있는 전역 윈도우이며, batchRowCount개 행마다 실행됩니다.
둘 다 설정됨둘 다 설정됨결합 트리거가 있는 전역 윈도우이며, 시간 또는 행 수 조건 중 먼저 충족되는 조건에서 실행됩니다.
둘 다 설정되지 않음둘 다 설정되지 않음기본값이 적용된 결합 모드입니다. 30초 또는 1000개 행 중 먼저 도달하는 조건이 사용됩니다.

이 값을 조정하면 지연 시간과 삽입 효율성 사이를 조정할 수 있습니다. 더 작은 윈도우는 종단 간 지연 시간을 줄이고, 더 큰 윈도우는 더 적지만 더 큰 INSERT 배치를 생성합니다.

데드 레터 처리

JSON 파싱, 스키마 매핑 또는 유형 강제 변환에 실패한 메시지는 구성된 데드 레터 대상에 라우팅됩니다. clickHouseDeadLetterTable 또는 deadLetterTopic 중 최소 하나를 지정해야 합니다. 둘 다 설정하면 실패한 메시지가 두 대상 모두로 전송됩니다.

ClickHouse 데드 레터 테이블

clickHouseDeadLetterTable을 설정한 경우, 데드 레터 테이블은 다음의 고정 스키마로 미리 생성되어 있어야 합니다:

컬럼유형설명
raw_messageString원본 Pub/Sub 메시지 페이로드를 UTF-8 텍스트로 저장한 값입니다.
error_messageString해당 행이 실패한 이유를 설명하는 예외 메시지입니다.
stack_traceString실패 시점에 캡처된 전체 Java 스택 트레이스입니다.
failed_atDateTime행 처리가 실패한 시점의 처리 시간 타임스탬프입니다.

단일 노드 배포를 위한 최소 정의:

CREATE TABLE my_table_dead_letter (
    raw_message   String,
    error_message String,
    stack_trace   String,
    failed_at     DateTime
) ENGINE = MergeTree()
ORDER BY failed_at;
참고

배포 환경에 맞게 엔진과 ORDER BY 절을 조정하십시오. 복제된 테이블(Replicated Table)에는 ReplicatedMergeTree를 사용하고, 분산 구성에서는 ON CLUSTER를 추가하며, 필요에 따라 파티셔닝 또는 TTL을 조정하십시오.

Pub/Sub 데드 레터 토픽

deadLetterTopic를 설정하면 실패한 각 메시지가 다음 정보와 함께 해당 토픽으로 다시 게시됩니다:

  • Payload: 원본 메시지 바이트입니다.
  • Attribute errorMessage: 실패 시점에 캡처된 예외 메시지입니다.
  • Attribute failedAt: 해당 행이 실패한 처리 시점의 timestamp입니다.

이렇게 하면 근본 원인인 스키마(스키마) 또는 프로듀서 문제가 해결된 후 실패한 메시지를 편리하게 재처리할 수 있습니다.

Template 실행

Pub/Sub to ClickHouse Template은 Google Cloud Console에서 사용할 수 있습니다.

참고

Template의 구성 요구 사항과 사전 요구 사항을 충분히 이해할 수 있도록 이 문서, 특히 위 섹션을 반드시 검토하십시오.

Google Cloud Console에 로그인한 다음 Dataflow를 검색하십시오.

  1. CREATE JOB FROM TEMPLATE 버튼을 클릭하십시오.

    Dataflow 콘솔
  2. Template 양식이 열리면 작업 이름을 입력하고 원하는 리전을 선택하십시오.

  3. Dataflow Template 입력란에 ClickHouse 또는 Pub/Sub를 입력한 다음 Pub/Sub to ClickHouse Template을 선택하십시오.

  4. Template을 선택하면 양식이 확장됩니다. 다음 항목을 입력하십시오.

    • projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME> 형식의 Pub/Sub 입력 subscription.
    • ClickHouse 엔드포인트 URL — ClickHouse Cloud의 경우 https://<HOST>:8443를 사용하십시오.
    • ClickHouse 데이터베이스, 대상 테이블, 사용자 이름, 비밀번호.
    • 하나 이상의 데드 레터 대상: ClickHouse 테이블 또는 Pub/Sub 토픽(또는 둘 다).
  5. 필요에 따라 Template parameters 섹션에 설명된 대로 배칭(windowSeconds, batchRowCount) 및 ClickHouseIO 튜닝 매개변수를 사용자 지정하십시오.

작업 상태 모니터링

작업 상태를 모니터링하려면 Google Cloud Console의 Dataflow Jobs 탭으로 이동하십시오. 여기에서 진행 상황과 오류를 포함한 작업 세부 정보를 확인할 수 있습니다.

실행 중인 Pub/Sub to ClickHouse 작업이 표시된 Dataflow 콘솔

이 템플릿은 또한 PubSubToClickHouse 네임스페이스 아래에 다음과 같은 사용자 정의 메트릭을 내보냅니다. 이 메트릭은 Dataflow 작업 페이지에서 확인할 수 있습니다.

메트릭유형설명
messages-receivedCounter파싱 단계에서 수신한 Pub/Sub 메시지의 총수입니다.
rows-parsed-okCounter성공적으로 행으로 변환되어 기본 출력으로 라우팅된 메시지입니다.
rows-parse-failedCounter파싱 또는 스키마 매핑에 실패하여 데드 레터로 라우팅된 메시지입니다.
message-payload-bytesDistribution수신된 Pub/Sub 메시지 페이로드 크기의 분포(바이트 단위)입니다.

문제 해결

메모리 제한(전체) 초과 오류(코드 241)

이 오류는 ClickHouse가 대용량 데이터 배치를 처리하는 중 메모리가 부족할 때 발생합니다. 이 문제를 해결하려면 다음과 같이 하십시오.

  • 인스턴스 리소스를 늘리십시오: 데이터 처리 부하를 감당할 수 있도록 메모리가 더 많은 더 큰 인스턴스로 ClickHouse 서버를 업그레이드하십시오.
  • 배치 크기를 줄이십시오: Dataflow 작업 구성에서 batchRowCount(및/또는 maxInsertBlockSize)를 줄여 더 작은 데이터 청크를 ClickHouse로 전송하면 배치당 메모리 사용량을 줄일 수 있습니다.

모든 메시지가 데드 레터 대상(dead-letter destination)으로 이동하는 경우

가장 일반적인 원인은 다음과 같습니다.

  • JSON 필드 이름이 ClickHouse 컬럼 이름과 정확히 일치하지 않습니다(일치는 대소문자를 구분합니다).
  • JSON 값이 컬럼 유형으로 변환되지 않습니다(예: DateTime 컬럼에 ISO-8601 형식이 아닌 문자열이 있는 경우).
  • 파이프라인이 시작된 이후 대상 테이블 스키마가 변경되었습니다 — 스키마는 시작 시 한 번만 가져옵니다. 스키마 변경을 적용한 후 job을 다시 시작하십시오.

근본 원인을 파악하려면 ClickHouse 데드 레터 테이블의 error_messagestack_trace 컬럼(또는 Pub/Sub 데드 레터 메시지의 errorMessage 속성)을 확인하십시오.

파이프라인이 시작되지만 ClickHouse에 행이 도착하지 않습니다

  • subscription이 메시지를 수신하고 있는지 확인하십시오. Dataflow 작업 페이지에서 messages-received 메트릭을 확인하십시오.
  • 시간 기반 모드(windowSeconds만 사용)에서는 행이 윈도 경계에서만 플러시됩니다. 플러시가 발생하는지 확인하려면 windowSeconds 값을 낮추십시오.
  • Dataflow worker와 ClickHouse 엔드포인트 사이에 네트워크 연결이 가능한지 확인하십시오(방화벽, VPC 피어링 또는 Private Service Connect).

Template 소스 코드

이 Template의 소스 코드는 다음 리포지토리에서 확인할 수 있습니다: