본문으로 바로가기
본문으로 바로가기

pg_clickhouse 튜토리얼

개요

이 튜토리얼은 [ClickHouse 튜토리얼]을 따르되, 모든 쿼리를 pg_clickhouse를 통해 실행합니다.

ClickHouse 시작하기

먼저, 아직 ClickHouse 데이터베이스가 없다면 생성하십시오. Docker 이미지로 빠르게 시작할 수 있습니다:

docker run -d --network host --name clickhouse -p 8123:8123 -p9000:9000 --ulimit nofile=262144:262144 clickhouse
docker exec -it clickhouse clickhouse-client

테이블 만들기

간단한 데이터베이스를 만들기 위해 [ClickHouse 튜토리얼]의 예시를 참고하여 The New York City 택시 데이터셋을 사용하겠습니다:

CREATE DATABASE taxi;
CREATE TABLE taxi.trips
(
    trip_id UInt32,
    vendor_id Enum8(
        '1'      =  1, '2'      =  2, '3'      =  3, '4'      =  4,
        'CMT'    =  5, 'VTS'    =  6, 'DDS'    =  7, 'B02512' = 10,
        'B02598' = 11, 'B02617' = 12, 'B02682' = 13, 'B02764' = 14,
        ''       = 15
    ),
    pickup_date Date,
    pickup_datetime DateTime,
    dropoff_date Date,
    dropoff_datetime DateTime,
    store_and_fwd_flag UInt8,
    rate_code_id UInt8,
    pickup_longitude Float64,
    pickup_latitude Float64,
    dropoff_longitude Float64,
    dropoff_latitude Float64,
    passenger_count UInt8,
    trip_distance Float64,
    fare_amount Decimal(10, 2),
    extra Decimal(10, 2),
    mta_tax Decimal(10, 2),
    tip_amount Decimal(10, 2),
    tolls_amount Decimal(10, 2),
    ehail_fee Decimal(10, 2),
    improvement_surcharge Decimal(10, 2),
    total_amount Decimal(10, 2),
    payment_type Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4),
    trip_type UInt8,
    pickup FixedString(25),
    dropoff FixedString(25),
    cab_type Enum8('yellow' = 1, 'green' = 2, 'uber' = 3),
    pickup_nyct2010_gid Int8,
    pickup_ctlabel Float32,
    pickup_borocode Int8,
    pickup_ct2010 String,
    pickup_boroct2010 String,
    pickup_cdeligibil String,
    pickup_ntacode FixedString(4),
    pickup_ntaname String,
    pickup_puma UInt16,
    dropoff_nyct2010_gid UInt8,
    dropoff_ctlabel Float32,
    dropoff_borocode UInt8,
    dropoff_ct2010 String,
    dropoff_boroct2010 String,
    dropoff_cdeligibil String,
    dropoff_ntacode FixedString(4),
    dropoff_ntaname String,
    dropoff_puma UInt16
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(pickup_date)
ORDER BY pickup_datetime;

데이터 세트 추가

그런 다음 데이터를 가져오십시오:

INSERT INTO taxi.trips
SELECT * FROM s3(
    'https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_{1..2}.gz',
    'TabSeparatedWithNames', "
    trip_id UInt32,
    vendor_id Enum8(
        '1'      =  1, '2'      =  2, '3'      =  3, '4'      =  4,
        'CMT'    =  5, 'VTS'    =  6, 'DDS'    =  7, 'B02512' = 10,
        'B02598' = 11, 'B02617' = 12, 'B02682' = 13, 'B02764' = 14,
        ''       = 15
    ),
    pickup_date Date,
    pickup_datetime DateTime,
    dropoff_date Date,
    dropoff_datetime DateTime,
    store_and_fwd_flag UInt8,
    rate_code_id UInt8,
    pickup_longitude Float64,
    pickup_latitude Float64,
    dropoff_longitude Float64,
    dropoff_latitude Float64,
    passenger_count UInt8,
    trip_distance Float64,
    fare_amount Decimal(10, 2),
    extra Decimal(10, 2),
    mta_tax Decimal(10, 2),
    tip_amount Decimal(10, 2),
    tolls_amount Decimal(10, 2),
    ehail_fee Decimal(10, 2),
    improvement_surcharge Decimal(10, 2),
    total_amount Decimal(10, 2),
    payment_type Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4),
    trip_type UInt8,
    pickup FixedString(25),
    dropoff FixedString(25),
    cab_type Enum8('yellow' = 1, 'green' = 2, 'uber' = 3),
    pickup_nyct2010_gid Int8,
    pickup_ctlabel Float32,
    pickup_borocode Int8,
    pickup_ct2010 String,
    pickup_boroct2010 String,
    pickup_cdeligibil String,
    pickup_ntacode FixedString(4),
    pickup_ntaname String,
    pickup_puma UInt16,
    dropoff_nyct2010_gid UInt8,
    dropoff_ctlabel Float32,
    dropoff_borocode UInt8,
    dropoff_ct2010 String,
    dropoff_boroct2010 String,
    dropoff_cdeligibil String,
    dropoff_ntacode FixedString(4),
    dropoff_ntaname String,
    dropoff_puma UInt16
") SETTINGS input_format_try_infer_datetimes = 0

쿼리할 수 있는지 확인한 후 클라이언트를 종료하십시오:

SELECT count() FROM taxi.trips;
quit

pg_clickhouse 설치

PGXN 또는 GitHub에서 pg_clickhouse를 빌드해 설치합니다. 또는 [pg_clickhouse image]를 사용해 Docker 컨테이너를 실행할 수 있으며, 이는 Docker Postgres image에 pg_clickhouse만 추가한 이미지입니다:

docker run -d --network host --name pg_clickhouse -e POSTGRES_PASSWORD=my_pass \
       -d ghcr.io/clickhouse/pg_clickhouse:18

pg_clickhouse 연결

이제 Postgres에 연결하십시오:

docker exec -it pg_clickhouse psql -U postgres

다음으로 pg_clickhouse를 생성합니다:

CREATE EXTENSION pg_clickhouse;

ClickHouse 데이터베이스의 호스트 이름, 포트, 데이터베이스 이름을 사용하여 외부 서버를 생성합니다.

CREATE SERVER taxi_srv FOREIGN DATA WRAPPER clickhouse_fdw
       OPTIONS(driver 'binary', host 'localhost', dbname 'taxi');

여기서는 ClickHouse 바이너리 프로토콜을 사용하는 바이너리 드라이버를 선택했습니다. HTTP 인터페이스를 사용하는 "http" 드라이버를 사용할 수도 있습니다.

다음으로, PostgreSQL 사용자를 ClickHouse 사용자에 매핑합니다. 가장 간단한 방법은 현재 PostgreSQL 사용자를 foreign server의 원격 사용자에 매핑하는 것입니다:

CREATE USER MAPPING FOR CURRENT_USER SERVER taxi_srv
       OPTIONS (user 'default');

password 옵션도 지정할 수 있습니다.

이제 taxi 테이블을 추가합니다. 원격 ClickHouse 데이터베이스의 모든 테이블을 Postgres 스키마로 가져오십시오:

CREATE SCHEMA taxi;
IMPORT FOREIGN SCHEMA taxi FROM SERVER taxi_srv INTO taxi;

이제 테이블이 임포트된 상태여야 합니다: psql에서 \det+를 사용하여 확인하십시오:

taxi=# \det+ taxi.*
                                       List of foreign tables
 Schema | Table |  Server  |                        FDW options                        | Description
--------+-------+----------+-----------------------------------------------------------+-------------
 taxi   | trips | taxi_srv | (database 'taxi', table_name 'trips', engine 'MergeTree') | [null]
(1 row)

성공했습니다! \d를 사용해 모든 컬럼을 확인하세요:

taxi=# \d taxi.trips
                                   Foreign table "taxi.trips"
        Column         |           Type           | Collation | Nullable | Default | FDW options
-----------------------+--------------------------+-----------+----------+---------+-------------
 trip_id               | bigint                   |           | not null |         |
 vendor_id             | text                     |           | not null |         |
 pickup_date           | date                     |           | not null |         |
 pickup_datetime       | timestamp with time zone |           | not null |         |
 dropoff_date          | date                     |           | not null |         |
 dropoff_datetime      | timestamp with time zone |           | not null |         |
 store_and_fwd_flag    | smallint                 |           | not null |         |
 rate_code_id          | smallint                 |           | not null |         |
 pickup_longitude      | double precision         |           | not null |         |
 pickup_latitude       | double precision         |           | not null |         |
 dropoff_longitude     | double precision         |           | not null |         |
 dropoff_latitude      | double precision         |           | not null |         |
 passenger_count       | smallint                 |           | not null |         |
 trip_distance         | double precision         |           | not null |         |
 fare_amount           | numeric(10,2)            |           | not null |         |
 extra                 | numeric(10,2)            |           | not null |         |
 mta_tax               | numeric(10,2)            |           | not null |         |
 tip_amount            | numeric(10,2)            |           | not null |         |
 tolls_amount          | numeric(10,2)            |           | not null |         |
 ehail_fee             | numeric(10,2)            |           | not null |         |
 improvement_surcharge | numeric(10,2)            |           | not null |         |
 total_amount          | numeric(10,2)            |           | not null |         |
 payment_type          | text                     |           | not null |         |
 trip_type             | smallint                 |           | not null |         |
 pickup                | character varying(25)    |           | not null |         |
 dropoff               | character varying(25)    |           | not null |         |
 cab_type              | text                     |           | not null |         |
 pickup_nyct2010_gid   | smallint                 |           | not null |         |
 pickup_ctlabel        | real                     |           | not null |         |
 pickup_borocode       | smallint                 |           | not null |         |
 pickup_ct2010         | text                     |           | not null |         |
 pickup_boroct2010     | text                     |           | not null |         |
 pickup_cdeligibil     | text                     |           | not null |         |
 pickup_ntacode        | character varying(4)     |           | not null |         |
 pickup_ntaname        | text                     |           | not null |         |
 pickup_puma           | integer                  |           | not null |         |
 dropoff_nyct2010_gid  | smallint                 |           | not null |         |
 dropoff_ctlabel       | real                     |           | not null |         |
 dropoff_borocode      | smallint                 |           | not null |         |
 dropoff_ct2010        | text                     |           | not null |         |
 dropoff_boroct2010    | text                     |           | not null |         |
 dropoff_cdeligibil    | text                     |           | not null |         |
 dropoff_ntacode       | character varying(4)     |           | not null |         |
 dropoff_ntaname       | text                     |           | not null |         |
 dropoff_puma          | integer                  |           | not null |         |
Server: taxi_srv
FDW options: (database 'taxi', table_name 'trips', engine 'MergeTree')

이제 테이블을 쿼리하십시오:

 SELECT count(*) FROM taxi.trips;
   count
 ---------
  1999657
 (1 row)

쿼리가 매우 빠르게 실행된 점에 주목하십시오. pg_clickhouse는 COUNT() 집계를 포함한 전체 쿼리를 푸시다운하므로, ClickHouse에서 실행되고 Postgres에는 단일 행만 반환됩니다. 이를 확인하려면 EXPLAIN을 사용하십시오:

 EXPLAIN select count(*) from taxi.trips;
                    QUERY PLAN
 -------------------------------------------------
  Foreign Scan  (cost=1.00..-0.90 rows=1 width=8)
    Relations: Aggregate on (trips)
 (2 rows)

"Foreign Scan"이 실행 계획의 최상위에 표시된다는 점에 유의하십시오. 이는 전체 쿼리가 ClickHouse로 푸시다운되었음을 의미합니다.

데이터 분석

몇 가지 쿼리를 실행해 데이터를 분석합니다. 다음 예시를 살펴보거나 직접 SQL 쿼리를 실행해 보십시오.

  • 평균 팁 금액을 계산합니다:

    taxi=# \timing
    Timing is on.
    taxi=# SELECT round(avg(tip_amount), 2) FROM taxi.trips;
     round
    -------
      1.68
    (1 row)
    
    Time: 9.438 ms
    
  • 승객 수를 기준으로 평균 비용을 계산하세요:

    taxi=# SELECT
            passenger_count,
            avg(total_amount)::NUMERIC(10, 2) AS average_total_amount
        FROM taxi.trips
        GROUP BY passenger_count;
     passenger_count | average_total_amount
    -----------------+----------------------
                   0 |                22.68
                   1 |                15.96
                   2 |                17.14
                   3 |                16.75
                   4 |                17.32
                   5 |                16.34
                   6 |                16.03
                   7 |                59.79
                   8 |                36.40
                   9 |                 9.79
    (10 rows)
    
    Time: 27.266 ms
    
  • 동네별 일일 픽업 수를 계산합니다:

    taxi=# SELECT
        pickup_date,
        pickup_ntaname,
        SUM(1) AS number_of_trips
    FROM taxi.trips
    GROUP BY pickup_date, pickup_ntaname
    ORDER BY pickup_date ASC LIMIT 10;
     pickup_date |         pickup_ntaname         | number_of_trips
    -------------+--------------------------------+-----------------
     2015-07-01  | Williamsburg                   |               1
     2015-07-01  | park-cemetery-etc-Queens       |               6
     2015-07-01  | Maspeth                        |               1
     2015-07-01  | Stuyvesant Town-Cooper Village |              44
     2015-07-01  | Rego Park                      |               1
     2015-07-01  | Greenpoint                     |               7
     2015-07-01  | Highbridge                     |               1
     2015-07-01  | Briarwood-Jamaica Hills        |               3
     2015-07-01  | Airport                        |             550
     2015-07-01  | East Harlem North              |              32
    (10 rows)
    
    Time: 30.978 ms
    
  • 각 이동의 소요 시간을 분 단위로 계산한 다음, 결과를 소요 시간별로 그룹화합니다:

    taxi=# SELECT
        avg(tip_amount) AS avg_tip,
        avg(fare_amount) AS avg_fare,
        avg(passenger_count) AS avg_passenger,
        count(*) AS count,
        round((date_part('epoch', dropoff_datetime) - date_part('epoch', pickup_datetime)) / 60) as trip_minutes
    FROM taxi.trips
    WHERE round((date_part('epoch', dropoff_datetime) - date_part('epoch', pickup_datetime)) / 60) > 0
    GROUP BY trip_minutes
    ORDER BY trip_minutes DESC
    LIMIT 5;
          avg_tip      |     avg_fare     |  avg_passenger   | count | trip_minutes
    -------------------+------------------+------------------+-------+--------------
                  1.96 |                8 |                1 |     1 |        27512
                     0 |               12 |                2 |     1 |        27500
     0.562727272727273 | 17.4545454545455 | 2.45454545454545 |    11 |         1440
     0.716564885496183 | 14.2786259541985 | 1.94656488549618 |   131 |         1439
      1.00945205479452 | 12.8787671232877 | 1.98630136986301 |   146 |         1438
    (5 rows)
    
    Time: 45.477 ms
    
  • 하루 중 시간대별로 동네별 승차 건수를 표시합니다:

    taxi=# SELECT
        pickup_ntaname,
        date_part('hour', pickup_datetime) as pickup_hour,
        SUM(1) AS pickups
    FROM taxi.trips
    WHERE pickup_ntaname != ''
    GROUP BY pickup_ntaname, pickup_hour
    ORDER BY pickup_ntaname, date_part('hour', pickup_datetime)
    LIMIT 5;
     pickup_ntaname | pickup_hour | pickups
    ----------------+-------------+---------
     Airport        |           0 |    3509
     Airport        |           1 |    1184
     Airport        |           2 |     401
     Airport        |           3 |     152
     Airport        |           4 |     213
    (5 rows)
    
    Time: 36.895 ms
    
  • 표시 시간대를 뉴욕으로 설정하고 라과디아 또는 JFK 공항으로 가는 승차 기록을 조회합니다:

    taxi=# SET timezone = 'America/New_York';
    SET
    taxi=# SELECT
        pickup_datetime,
        dropoff_datetime,
        total_amount,
        pickup_nyct2010_gid,
        dropoff_nyct2010_gid,
        CASE
            WHEN dropoff_nyct2010_gid = 138 THEN 'LGA'
            WHEN dropoff_nyct2010_gid = 132 THEN 'JFK'
        END AS airport_code,
        EXTRACT(YEAR FROM pickup_datetime) AS year,
        EXTRACT(DAY FROM pickup_datetime) AS day,
        EXTRACT(HOUR FROM pickup_datetime) AS hour
    FROM taxi.trips
    WHERE dropoff_nyct2010_gid IN (132, 138)
    ORDER BY pickup_datetime
    LIMIT 5;
        pickup_datetime     |    dropoff_datetime    | total_amount | pickup_nyct2010_gid | dropoff_nyct2010_gid | airport_code | year | day | hour
    ------------------------+------------------------+--------------+---------------------+----------------------+--------------+------+-----+------
     2015-06-30 20:04:14-04 | 2015-06-30 20:15:29-04 |        13.30 |                 -34 |                  132 | JFK          | 2015 |  30 |   20
     2015-06-30 20:09:42-04 | 2015-06-30 20:12:55-04 |         6.80 |                  50 |                  138 | LGA          | 2015 |  30 |   20
     2015-06-30 20:23:04-04 | 2015-06-30 20:24:39-04 |         4.80 |                -125 |                  132 | JFK          | 2015 |  30 |   20
     2015-06-30 20:27:51-04 | 2015-06-30 20:39:02-04 |        14.72 |                -101 |                  138 | LGA          | 2015 |  30 |   20
     2015-06-30 20:32:03-04 | 2015-06-30 20:55:39-04 |        39.34 |                  48 |                  138 | LGA          | 2015 |  30 |   20
    (5 rows)
    
    Time: 17.450 ms
    

딕셔너리 생성

ClickHouse 서비스의 테이블과 연결된 딕셔너리를 생성합니다. 이 테이블과 딕셔너리는 New York City의 각 동네마다 하나의 행을 포함하는 CSV 파일을 기반으로 합니다.

이 동네들은 New York City의 5개 자치구 (Bronx, Brooklyn, Manhattan, Queens, Staten Island)와 Newark Airport(EWR) 이름에 매핑됩니다.

다음은 사용할 CSV 파일의 일부를 테이블 형식으로 나타낸 것입니다. 파일의 LocationID 컬럼은 trips 테이블의 pickup_nyct2010_giddropoff_nyct2010_gid 컬럼에 매핑됩니다:

LocationIDBoroughZoneservice_zone
1EWRNewark AirportEWR
2QueensJamaica BayBoro Zone
3BronxAllerton/Pelham GardensBoro Zone
4ManhattanAlphabet CityYellow Zone
5Staten IslandArden HeightsBoro Zone
  1. 계속해서 Postgres에서 clickhouse_raw_query 함수를 사용해 ClickHouse dictionary taxi_zone_dictionary를 생성하고 S3의 CSV 파일에서 딕셔너리를 채우십시오:

    SELECT clickhouse_raw_query($$
        CREATE DICTIONARY taxi.taxi_zone_dictionary (
            LocationID Int64 DEFAULT 0,
            Borough String,
            zone String,
            service_zone String
        )
        PRIMARY KEY LocationID
        SOURCE(HTTP(URL 'https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/taxi_zone_lookup.csv' FORMAT 'CSVWithNames'))
        LIFETIME(MIN 0 MAX 0)
        LAYOUT(HASHED_ARRAY())
    $$, 'host=localhost dbname=taxi');
    
    참고

    LIFETIME을 0으로 설정하면 S3 버킷에 대한 불필요한 트래픽을 피하기 위해 자동 업데이트가 비활성화됩니다. 다른 경우에는 다르게 구성할 수도 있습니다. 자세한 내용은 LIFETIME을 사용한 딕셔너리 데이터 갱신을 참조하십시오.

    1. 이제 이를 가져오십시오:
    IMPORT FOREIGN SCHEMA taxi LIMIT TO (taxi_zone_dictionary)
    FROM SERVER taxi_srv INTO taxi;
    
    1. 쿼리할 수 있는지 확인합니다:
    taxi=# SELECT * FROM taxi.taxi_zone_dictionary limit 3;
     LocationID |  Borough  |                     Zone                      | service_zone
    ------------+-----------+-----------------------------------------------+--------------
             77 | Brooklyn  | East New York/Pennsylvania Avenue             | Boro Zone
            106 | Brooklyn  | Gowanus                                       | Boro Zone
            103 | Manhattan | Governor's Island/Ellis Island/Liberty Island | Yellow Zone
    (3 rows)
    
    1. 좋습니다. 이제 쿼리에서 dictGet 함수를 사용해 자치구 이름을 조회합니다. 이 쿼리는 LaGuardia 또는 JFK 공항에서 끝나는 자치구별 택시 탑승 횟수를 합산합니다:
    taxi=# SELECT
            count(1) AS total,
            COALESCE(NULLIF(dictGet(
                'taxi.taxi_zone_dictionary', 'Borough',
                toUInt64(pickup_nyct2010_gid)
            ), ''), 'Unknown') AS borough_name
        FROM taxi.trips
        WHERE dropoff_nyct2010_gid = 132 OR dropoff_nyct2010_gid = 138
        GROUP BY borough_name
        ORDER BY total DESC;
     total | borough_name
    -------+---------------
     23683 | Unknown
      7053 | Manhattan
      6828 | Brooklyn
      4458 | Queens
      2670 | Bronx
       554 | Staten Island
        53 | EWR
    (7 rows)
    
    Time: 66.245 ms
    

    이 쿼리는 LaGuardia 또는 JFK 공항에서 끝나는 자치구별 택시 탑승 횟수를 합산합니다. 승차 동네를 알 수 없는 이동이 꽤 많다는 점에 유의하십시오.

조인 수행하기

taxi_zone_dictionarytrips 테이블과 조인하는 몇 가지 쿼리를 작성해 보겠습니다.

  1. 먼저 위의 공항 쿼리와 비슷하게 동작하는 간단한 JOIN부터 살펴보겠습니다.

    taxi=# SELECT
        count(1) AS total,
        "Borough"
    FROM taxi.trips
    JOIN taxi.taxi_zone_dictionary
      ON trips.pickup_nyct2010_gid = toUInt64(taxi.taxi_zone_dictionary."LocationID")
    WHERE pickup_nyct2010_gid > 0
      AND dropoff_nyct2010_gid IN (132, 138)
    GROUP BY "Borough"
    ORDER BY total DESC;
     total | borough_name
    -------+---------------
      7053 | Manhattan
      6828 | Brooklyn
      4458 | Queens
      2670 | Bronx
       554 | Staten Island
        53 | EWR
    (6 rows)
    
    Time: 48.449 ms
    
    참고

    JOIN 쿼리의 출력은 위의 dictGet 쿼리와 동일합니다. 단, Unknown 값은 포함되지 않습니다. 내부적으로 ClickHouse는 실제로 taxi_zone_dictionary 딕셔너리에 대해 dictGet 함수를 호출하지만, JOIN 구문이 SQL 개발자에게는 더 익숙합니다.

    taxi=# explain SELECT
            count(1) AS total,
            "Borough"
        FROM taxi.trips
        JOIN taxi.taxi_zone_dictionary
          ON trips.pickup_nyct2010_gid = toUInt64(taxi.taxi_zone_dictionary."LocationID")
        WHERE pickup_nyct2010_gid > 0
          AND dropoff_nyct2010_gid IN (132, 138)
        GROUP BY "Borough"
        ORDER BY total DESC;
                                  QUERY PLAN
    -----------------------------------------------------------------------
     Foreign Scan  (cost=1.00..5.10 rows=1000 width=40)
       Relations: Aggregate on ((trips) INNER JOIN (taxi_zone_dictionary))
    (2 rows)
    Time: 2.012 ms
    
  2. 이 쿼리는 팁 금액이 가장 큰 1000건의 이동에 대한 행을 반환한 다음, 각 행을 딕셔너리와 inner join합니다.

    taxi=# SELECT *
    FROM taxi.trips
    JOIN taxi.taxi_zone_dictionary
        ON trips.dropoff_nyct2010_gid = taxi.taxi_zone_dictionary."LocationID"
    WHERE tip_amount > 0
    ORDER BY tip_amount DESC
    LIMIT 1000;
    
참고

일반적으로 PostgreSQL과 ClickHouse에서는 SELECT * 사용을 피합니다. 실제로 필요한 컬럼만 조회해야 합니다.