Snowflake Ingestion Using CockroachDB and Redpanda – DZone

Earlier Articles on Snowflake

Earlier Articles on CockroachDB CDC


Motivation

I work with financial services customers, and it’s common to encounter a need to stream changes from the operational data store into a data warehouse or a data lake. A former colleague recently reached out for advice on the fastest and most efficient way to load trade data into Snowflake. I’ve come up with at least three methods, which I’ll explore in a follow-up series of articles. However, I’ve decided to first explore Redpanda Connect, a solution that has recently caught my attention. This is by no means a conclusive guide on how changefeed data must be loaded into Snowflake; we’re merely exploring the possibilities and will discuss the pros and cons in later articles.

CockroachDB changefeeds are an enterprise feature and require a license. In this tutorial, I’m using a free-to-start version of CockroachDB Serverless, which has enterprise changefeeds enabled.

High-Level Steps

  • Deploy a CockroachDB cluster with enterprise changefeeds
  • Deploy Redpanda Connect
  • Deploy Snowflake
  • Verify
  • Conclusion

Step-by-Step Instructions

Deploy a CockroachDB Cluster With Enterprise Changefeeds

Start an instance of CockroachDB or use the managed service.

To enable CDC, we need to execute the following commands:

SET CLUSTER SETTING cluster.organization = '<organization name>';

SET CLUSTER SETTING enterprise.license = '<secret>';

SET CLUSTER SETTING kv.rangefeed.enabled = true;

I’m using CockroachDB Serverless, so the above steps aren’t necessary. You can confirm whether changefeeds are indeed enabled using the following command:

SHOW CLUSTER SETTING kv.rangefeed.enabled;

If the value is false, change it to true.

Generate sample data:

CREATE TABLE office_dogs (
     id INT PRIMARY KEY,
     name STRING);

INSERT INTO office_dogs VALUES
   (1, 'Petee'),
   (2, 'Carl');

UPDATE office_dogs SET name = 'Petee H' WHERE id = 1;

We’ve populated the table and then updated a record. Let’s add more data to make it interesting:

INSERT INTO office_dogs SELECT generate_series(3, 10000), md5(random()::string);
SELECT * FROM office_dogs LIMIT 5;
id,name
1,Petee H
2,Carl
3,6e19280ae649efffa7a58584c7f46032
4,5e4e897f008bb752c8edfa64a3aed356
5,abc0d898318d27f23a43060f89d62e34
SELECT COUNT(*) FROM office_dogs;

Deploy Redpanda Connect

I’m running Redpanda Connect in a Docker Compose file.

docker compose -f compose-redpanda.yaml up -d

The contents of the file are:

services:

  redpanda:
    container_name: redpanda-connect
    hostname: redpanda-connect
    image: docker.redpanda.com/redpandadata/connect
    volumes:
      - ./redpanda/connect.yaml:/connect.yaml
      - /Users/aervits/.ssh/rsa_key.pem:/rsa_key.pem

I will be using the connect.yaml file as the foundation to connect all the components in this article. For more detailed information, you can refer to the documentation provided by Redpanda.

The most basic configuration looks like so:

input:
  stdin: {}

pipeline:
  processors: []

output:
  stdout: {}

Since I’m using the CockroachDB input, mine looks like this:

input:
  # CockroachDB Input
  label: ""
  cockroachdb_changefeed:
    dsn: postgresql://<user>:<password>@<host>:<port>/<database>?sslmode=verify-full
    tls:
      skip_cert_verify: true
      #enable_renegotiation: false
      #root_cas: ""
      #root_cas_file: ""
      client_certs: []
    tables: [table_for_cdc] # No default (required)
    cursor_cache: "" # No default (optional)
    auto_replay_nacks: true

pipeline:
  processors: []

output:
  stdout: {}

Leave the pipeline and output as default.

For reference, I’m including the repo with my source code, where you can look up the values.

If you have been following along, you may have noticed that I haven’t started a changefeed job in CockroachDB. The cockroachdb_changefeed input subscribes directly to the table, which can be observed by examining the logs using the command docker logs redpanda-connect --follow. As configured in the connect.yaml file, the output is sent to stdout:

{"primary_key":"[9998]","row":"{\"after\": {\"id\": 9998, \"name\": \"0794a9d1c99e8e47ee4515be6e0d736f\"}}","table":"office_dogs"}
{"primary_key":"[9999]","row":"{\"after\": {\"id\": 9999, \"name\": \"c85a6b38154f7e3085d467d567141d45\"}}","table":"office_dogs"}
{"primary_key":"[10000]","row":"{\"after\": {\"id\": 10000, \"name\": \"aae9e0849fff8f47e0371a4c06fb255b\"}}","table":"office_dogs"}
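
Note that the row field arrives as a JSON-encoded string rather than a nested object, so a consumer has to decode it a second time. The following is a minimal Python sketch, assuming the envelope shape seen in the logs above; the function name decode_changefeed_message is hypothetical and is not part of Redpanda Connect.

```python
import json


def decode_changefeed_message(raw: str) -> dict:
    """Decode one message emitted by the cockroachdb_changefeed input.

    Assumption: `primary_key` and `row` are JSON documents serialized as
    strings inside the outer JSON object, as seen in the log output.
    """
    envelope = json.loads(raw)
    return {
        "table": envelope["table"],
        "primary_key": json.loads(envelope["primary_key"]),
        "row": json.loads(envelope["row"]),
    }


# Build a sample message with the same shape as the log lines.
sample = json.dumps({
    "primary_key": "[9998]",
    "row": json.dumps({"after": {"id": 9998, "name": "0794a9d1c99e8e47ee4515be6e0d736f"}}),
    "table": "office_dogs",
})
decoded = decode_changefeed_message(sample)
print(decoded["primary_key"], decoded["row"]["after"]["name"])
```

The same double decoding is why the Snowflake view later in this article wraps the row field in PARSE_JSON.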

The next step is to configure Snowflake. We’re not going to look at the available processors today.

Deploy Snowflake

I’m using a Snowflake trial account. You get a generous credit, which should be sufficient to complete this tutorial.

We need to create a database and a table where we will output the changefeed data.

CREATE OR REPLACE DATABASE FROM_COCKROACH;
CREATE OR REPLACE TABLE OFFICE_DOGS (RECORD variant);

We also need to create a user with key-pair authentication, as we’ll be using the Snowpipe service.

openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8

We must use an encrypted key, as Redpanda doesn’t support unencrypted keys.

Generate a public key:

openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub 

Finally, generate a PEM file from the private key:

openssl pkcs8 -in rsa_key.p8 -out rsa_key.pem

In Snowflake, alter the user to use the key pair generated in the previous step.

ALTER USER username SET rsa_public_key='MIIB...';
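
Snowflake expects only the base64 body of the public key in this statement, without the BEGIN/END delimiter lines or line breaks. As a small convenience, here is a Python sketch that extracts that body from PEM text; the helper name public_key_body and the truncated key material are hypothetical and used purely for illustration.

```python
def public_key_body(pem_text: str) -> str:
    """Return the base64 body of a PEM public key as a single line.

    Drops the '-----BEGIN ...-----' and '-----END ...-----' delimiter
    lines and joins the remaining lines without separators.
    """
    lines = [line.strip() for line in pem_text.splitlines()]
    body = [line for line in lines if line and not line.startswith("-----")]
    return "".join(body)


# Hypothetical, truncated key material; a real rsa_key.pub is much longer.
example_pem = """-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A
MIIBCgKCAQEAexample
-----END PUBLIC KEY-----
"""
print(f"ALTER USER username SET rsa_public_key='{public_key_body(example_pem)}';")
```

In practice you would read the contents of rsa_key.pub from disk and pass them to the helper instead of the inline example.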

We can now populate the connect.yaml file with the required information for the snowflake_put output. This output type is for commercial use and requires a license, but since we’re using it for demo purposes, we’re able to proceed.

output:
  # Snowflake Output
  label: ""
  snowflake_put:
    account: 
    user: 
    private_key_file: rsa_key.pem
    role: ACCOUNTADMIN
    database: 
    warehouse: 
    schema: 
    stage: "@%implicit_table_stage_name"
    path: "path"
    upload_parallel_threads: 4
    compression: NONE
    batching:
      count: 10
      period: 3s
      processors:
        - archive:
            format: json_array
    max_in_flight: 1
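
To make the batching settings above more concrete: messages are grouped into batches of up to ten, and each batch is written as a single JSON array, which is why the staged files later need STRIP_OUTER_ARRAY when they are copied. The Python sketch below imitates only the count-based grouping; it ignores the three-second timer, and the function name archive_json_array is a hypothetical stand-in, not Redpanda Connect code.

```python
import json
from typing import Iterable, Iterator


def archive_json_array(messages: Iterable[dict], count: int = 10) -> Iterator[str]:
    """Yield one JSON array string per batch of up to `count` messages.

    Simplification: a real batching policy also flushes on a timer;
    this sketch flushes only when the batch is full or input ends.
    """
    batch = []
    for message in messages:
        batch.append(message)
        if len(batch) == count:
            yield json.dumps(batch)
            batch = []
    if batch:
        # Flush the final, partially filled batch.
        yield json.dumps(batch)


# 25 placeholder messages produce two full batches and one partial batch.
messages = [{"table": "office_dogs", "primary_key": f"[{i}]"} for i in range(1, 26)]
files = list(archive_json_array(messages))
print(len(files), [len(json.loads(f)) for f in files])
```

Each yielded string corresponds to the contents of one staged file.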

If we restart the compose environment and tail the logs, we can see the following:

level=info msg="Running main config from specified file" @service=benthos benthos_version=v4.32.1 path=/connect.yaml
level=info msg="Listening for HTTP requests at: http://0.0.0.0:4195" @service=benthos
level=info msg="Launching a Redpanda Connect instance, use CTRL+C to close" @service=benthos
level=info msg="Output type snowflake_put is now active" @service=benthos label="" path=root.output
level=info msg="Input type cockroachdb_changefeed is now active" @service=benthos label="" path=root.input

Let’s look at the implicit table stage and see whether anything has changed.

| dogs/f2f3cf47-d6bc-46f4-88f2-c82519b67481.json | 1312 | 30f709e4962bae9d10b48565d22e9f32 | Wed, 14 Aug 2024 18:58:43 GMT |
| dogs/f6adbf39-3955-4848-93c3-06f873a88078.json | 1312 | 28be7a619ef1e139599077e977ea130b | Wed, 14 Aug 2024 18:58:13 GMT |
| dogs/f8705606-eb07-400a-9ffe-da6834fa1a30.json | 1296 | 5afbdce0e8929fc38a2eb5e0f12b96d6 | Wed, 14 Aug 2024 18:57:29 GMT |
| dogs/f9e5c01a-7dda-4e76-840d-13b8a1e4946a.json | 1296 | 5480c01f1578f67afe2761c7619e9123 | Wed, 14 Aug 2024 18:57:32 GMT |
| dogs/fad4efe7-3f3f-48bc-bdb4-9f0310abcf4d.json | 1312 | 5942c6e2dbaef5ee257d4a9b8e68827d | Wed, 14 Aug 2024 18:58:04 GMT |

The files are ready to be copied into a table. Let’s create a pipe:

CREATE OR REPLACE PIPE FROM_COCKROACH.PUBLIC.cockroach_pipe AUTO_INGEST = FALSE AS COPY INTO FROM_COCKROACH.PUBLIC.OFFICE_DOGS FROM (SELECT * FROM @%office_dogs) FILE_FORMAT = (TYPE = JSON COMPRESSION = AUTO STRIP_OUTER_ARRAY = TRUE);

The last remaining step is to refresh the pipe.

ALTER PIPE cockroach_pipe REFRESH;
| dogs/ff0871b1-6f49-43a4-a929-958d07f74046.json | SENT   |
| dogs/ff131d8d-3781-4cf6-8700-edd50dbb87de.json | SENT   |
| dogs/ff216da1-4f9d-4b37-9776-bcd559dd4a6f.json | SENT   |
| dogs/ff221430-4c3a-46be-bbc2-d335cc6cc9e3.json | SENT   |
| dogs/ffbd7d45-5084-4e36-8907-61874ac652b4.json | SENT   |
| dogs/fffb5fa6-23cc-4450-934a-29ccf01c67b9.json | SENT   |

Let’s query the table in Snowflake:

SELECT * FROM OFFICE_DOGS LIMIT 5;
| {                                                                                       |
|   "primary_key": "[5241]",                                                              |
|   "row": "{\"after\": {\"id\": 5241, \"name\": \"5e0360a0d10d849afbbfa319a50bccf2\"}}", |
|   "table": "office_dogs"                                                                |
| }                                                                                       |
| {                                                                                       |
|   "primary_key": "[5242]",                                                              |
|   "row": "{\"after\": {\"id\": 5242, \"name\": \"62be250249afe74bfbc5dd356e7b0ad9\"}}", |
|   "table": "office_dogs"                                                                |
| }                                                                                       |
| {                                                                                       |
|   "primary_key": "[5243]",                                                              |
|   "row": "{\"after\": {\"id\": 5243, \"name\": \"7f286800a8a03e74938d09fdba52f869\"}}", |
|   "table": "office_dogs"                                                                |
| }                                                                                       |
| {                                                                                       |
|   "primary_key": "[5244]",                                                              |
|   "row": "{\"after\": {\"id\": 5244, \"name\": \"16a330b8f09bcd314f9760ffe26d0ae2\"}}", |
|   "table": "office_dogs"                                                                |
| }                                                                                       |

We expect 10000 rows:

SELECT COUNT(*) FROM OFFICE_DOGS;
+----------+                                                                    
| COUNT(*) |
|----------|
|    10000 |
+----------+

The data is in JSON format. Let’s create a view and flatten the data out.

CREATE VIEW v_office_dogs AS
    SELECT PARSE_JSON(record:row):after:id::INTEGER AS id,
           PARSE_JSON(record:row):after:name::STRING AS name FROM OFFICE_DOGS;

Query the view:

SELECT * FROM v_office_dogs WHERE id < 6;
+----+----------------------------------+                                       
| ID | NAME                             |
|----+----------------------------------|
|  1 | Petee H                          |
|  2 | Carl                             |
|  3 | 6e19280ae649efffa7a58584c7f46032 |
|  4 | 5e4e897f008bb752c8edfa64a3aed356 |
|  5 | abc0d898318d27f23a43060f89d62e34 |
+----+----------------------------------+

Let’s make things a bit more interesting and delete data in CockroachDB.

DELETE FROM office_dogs WHERE name = 'Carl';
DELETE FROM office_dogs WHERE id = 1;

In Snowflake, let’s refresh the pipe as of a few minutes ago:

ALTER PIPE cockroach_pipe REFRESH MODIFIED_AFTER='2024-08-14T12:10:00-07:00';

Notice there are a couple of files.

+------------------------------------------------+--------+                     
| File                                           | Status |
|------------------------------------------------+--------|
| dogs/2a4ee400-6b37-4513-97cb-097764a340bc.json | SENT   |
| dogs/8f5b5b69-8a00-4dbf-979a-60c3814d96b4.json | SENT   |
+------------------------------------------------+--------+

I have to caution that if you run the REFRESH manually, you may cause duplicates in your Snowflake data. We will look at better approaches in a future article.

Let’s look at the row count:

+----------+                                                                    
| COUNT(*) |
|----------|
|    10002 |
+----------+

The deletions were not applied in Snowflake as one might expect: the delete events were ingested as two additional records, but the table no longer mirrors the state in CockroachDB. We need to incorporate additional logic to achieve this. This will be a task for another time.

Lastly, I would like to note that running Redpanda Connect from a compose file is optional. You can also run the Docker container directly by executing the following command:

docker run --rm -it -v ./redpanda/connect.yaml:/connect.yaml -v ./snowflake/rsa_key.pem:/rsa_key.pem docker.redpanda.com/redpandadata/connect run
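
To illustrate the kind of logic that is missing, here is a minimal Python sketch that folds an ordered stream of changefeed events into the current table state. It is not the approach this series will take, only an illustration. It assumes that a delete arrives as an event whose decoded row has "after" set to null, and that events are applied in the order they were emitted; the function name apply_events and the sample events are hypothetical.

```python
import json
from typing import Dict, List, Tuple


def apply_events(events: List[dict]) -> Dict[Tuple, dict]:
    """Replay changefeed events in order and return the latest row per key.

    Assumption: a delete is represented by a row whose "after" is null.
    """
    state: Dict[Tuple, dict] = {}
    for event in events:
        key = tuple(json.loads(event["primary_key"]))
        after = json.loads(event["row"]).get("after")
        if after is None:
            # Delete event: drop the key if it is present.
            state.pop(key, None)
        else:
            # Insert or update: the latest event wins.
            state[key] = after
    return state


def make_event(pk: int, after) -> dict:
    """Build a sample event in the same envelope shape as the changefeed output."""
    return {
        "primary_key": json.dumps([pk]),
        "row": json.dumps({"after": after}),
        "table": "office_dogs",
    }


events = [
    make_event(1, {"id": 1, "name": "Petee"}),
    make_event(2, {"id": 2, "name": "Carl"}),
    make_event(1, {"id": 1, "name": "Petee H"}),
    make_event(3, {"id": 3, "name": "6e19280ae649efffa7a58584c7f46032"}),
    make_event(2, None),  # assumed shape of a delete
    make_event(1, None),  # assumed shape of a delete
]
state = apply_events(events)
print(sorted(state))
```

In Snowflake, equivalent logic would have to live in SQL on top of the raw OFFICE_DOGS table, and it would also need an ordering column, which the records loaded in this article do not carry.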

Conclusion

Today, we explored Redpanda Connect as a means to deliver streaming changefeeds into Snowflake. We’ve only just begun to delve into this topic, and future articles will build upon the foundations laid today.
