Exploring Real-Time Data Ingestion Into Snowflake

Previous Articles on Snowflake

Previous Articles on CockroachDB CDC


This article builds on the earlier discussion in “Tour of Snowflake ingestion using CockroachDB and Redpanda Connect,” where we walked through streaming changefeeds from CockroachDB to Snowflake using Redpanda Connect and Snowpipe in batch mode. Here, we shift our focus to Kafka Connect and demonstrate how both batch and streaming modes can be used for data ingestion into Snowflake.

  • Deploy a CockroachDB cluster with enterprise changefeeds
  • Deploy Snowflake
  • Deploy Kafka Connect
  • Verify
  • Conclusion

Deploy a CockroachDB Cluster With Enterprise Changefeeds

Start by either launching a CockroachDB instance or using a managed service.

  • To enable CDC, execute the following commands:
SET CLUSTER SETTING cluster.organization = '';
SET CLUSTER SETTING enterprise.license = '';
SET CLUSTER SETTING kv.rangefeed.enabled = true;
  • Verify that changefeeds are enabled:
SHOW CLUSTER SETTING kv.rangefeed.enabled;

If the value is false, update it to true.

  • Create a source table, seed it with data, and modify a row:
CREATE TABLE cockroachdb (
     id INT PRIMARY KEY,
     value STRING DEFAULT md5(random()::text),
     created_at TIMESTAMPTZ DEFAULT now(),
     updated_at TIMESTAMPTZ DEFAULT NULL);
INSERT INTO cockroachdb SELECT
   (generate_series(1, 10000));
UPDATE cockroachdb SET value = 'UPDATED', updated_at = now() WHERE id = 1;
  • Create a changefeed job pointing to a local instance of Redpanda:
CREATE CHANGEFEED FOR TABLE cockroachdb INTO 'kafka://redpanda:29092';
SELECT * FROM cockroachdb LIMIT 5;
  id |              value               |          created_at           |          updated_at
-----+----------------------------------+-------------------------------+--------------------------------
   1 | UPDATED                          | 2024-09-09 13:17:57.837984+00 | 2024-09-09 13:17:57.917108+00
   2 | 27a41183599c44251506e2971ba78426 | 2024-09-09 13:17:57.837984+00 | NULL
   3 | 3bf8bc26a750a15691ec4d7ddbb7f5e5 | 2024-09-09 13:17:57.837984+00 | NULL
   4 | b8c5786e8651ddfb3a68eabeadb52f2e | 2024-09-09 13:17:57.837984+00 | NULL
   5 | 3a24df165773639ce89d0d877e7103b7 | 2024-09-09 13:17:57.837984+00 | NULL
(5 rows)
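
If you want to confirm that the changefeed job itself is healthy before moving on, CockroachDB exposes its status through SQL. This is an optional check that is not part of the original walkthrough:

SHOW CHANGEFEED JOBS;

The job should report a running status with kafka://redpanda:29092 as its sink URI.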

The next step is to set up the Snowflake Kafka connector.

  • Create a database and schema for outputting changefeed data:
USE ROLE SYSADMIN;
CREATE OR REPLACE DATABASE KAFKADB;
CREATE OR REPLACE SCHEMA kafka_schema;

Follow the Snowflake documentation to configure the Kafka connector.

  • Create the necessary tables:
create or replace table kafkatb_batch(
    RECORD_METADATA VARIANT,
    RECORD_CONTENT VARIANT
);

create or replace table kafkatb_streaming(
    RECORD_METADATA VARIANT,
    RECORD_CONTENT VARIANT
);
  • Set up roles and permissions:
-- Use a role that can create and manage roles and privileges.
USE ROLE securityadmin;

-- Create a Snowflake role with the privileges to work with the connector.
CREATE OR REPLACE ROLE kafka_connector_role_1;

-- Grant privileges on the database.
GRANT USAGE ON DATABASE kafkadb TO ROLE kafka_connector_role_1;

-- Grant privileges on the schema.
GRANT USAGE ON SCHEMA KAFKADB.kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE TABLE ON SCHEMA KAFKADB.kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE STAGE ON SCHEMA KAFKADB.kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE PIPE ON SCHEMA KAFKADB.kafka_schema TO ROLE kafka_connector_role_1;

-- Only required if the Kafka connector will load data into an existing table.
GRANT OWNERSHIP ON TABLE KAFKADB.KAFKA_SCHEMA.kafkatb_batch TO ROLE kafka_connector_role_1;
GRANT OWNERSHIP ON TABLE KAFKADB.KAFKA_SCHEMA.kafkatb_streaming TO ROLE kafka_connector_role_1;

-- Grant the custom role to an existing user.
GRANT ROLE kafka_connector_role_1 TO USER username;

-- Set the custom role as the default role for the user.
-- If you encounter an 'Insufficient privileges' error, verify which role has the OWNERSHIP privilege on the user.
ALTER USER username SET DEFAULT_ROLE = kafka_connector_role_1;
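
Before wiring up the connector, you can optionally double-check that the grants landed as expected; this verification step is not in the original tutorial:

SHOW GRANTS TO ROLE kafka_connector_role_1;
SHOW GRANTS ON TABLE KAFKADB.KAFKA_SCHEMA.kafkatb_batch;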

Make sure to follow the documentation for setting up key pair authentication for the Snowflake Kafka connector.
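
As a rough sketch of that key-pair setup (the exact procedure is in the Snowflake documentation, and the file names here are only examples), you generate an RSA key pair and register the public key with the connector user:

# Generate an unencrypted 2048-bit private key and derive its public key
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# In Snowflake, attach the public key (PEM header/footer and line breaks removed) to the user:
#   ALTER USER username SET RSA_PUBLIC_KEY='MIIBIjAN...';
# The private key body from rsa_key.p8, stripped the same way, becomes the
# snowflake.private.key value in the connector configurations below.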

  • Start the local Redpanda instance defined in compose-redpandadata.yaml:
docker compose -f compose-redpandadata.yaml up -d
  • Once it is up, navigate to the Redpanda Console.

  • Click into the cockroachdb topic:

  • Install the Snowflake Kafka connector:
confluent-hub install --no-prompt snowflakeinc/snowflake-kafka-connector:latest
  • Use the following configuration for Kafka Connect in distributed mode, saved as connect-distributed.properties:
bootstrap.servers=172.18.0.3:29092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/usr/share/confluent-hub-components,/usr/local/share/kafka/plugins,/usr/share/filestream-connectors
  • Deploy Kafka Connect in distributed mode:
./kafka-connect/bin/connect-distributed.sh connect-distributed.properties
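
Before registering any connectors, it is worth confirming that the worker is up and that the Snowflake plugin was picked up from plugin.path. Assuming the Connect REST API is listening on the default port 8083, an optional check looks like this:

curl -s http://localhost:8083/
curl -s http://localhost:8083/connector-plugins | grep -i snowflake

The second command should list com.snowflake.kafka.connector.SnowflakeSinkConnector among the installed plugins.
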
  • Register the Snowflake connector with the following configuration, saved as snowflake-sink-batch.json:
{
    "name":"snowflake-sink-batch",
    "config":{
      "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
      "tasks.max":"8",
      "topics":"cockroachdb",
      "snowflake.topic2table.map": "cockroachdb:kafkatb_batch",
      "buffer.count.records":"10000",
      "buffer.flush.time":"60",
      "buffer.size.bytes":"5000000",
      "snowflake.url.name":"account-name:443",
      "snowflake.user.name":"username",
      "snowflake.private.key":"private-key",
      "snowflake.private.key.passphrase":"",
      "snowflake.database.name":"kafkadb",
      "snowflake.schema.name":"kafka_schema",
      "snowflake.role.name":"kafka_connector_role_1",
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "value.converter":"com.snowflake.kafka.connector.records.SnowflakeJsonConverter"
    }
  }
  • Publish the connector configuration:
curl -d @"snowflake-sink-batch.json" -H "Content-Type: application/json" -X POST http://kafka-connect:8083/connectors
  • Verify the connector in the Kafka Connect UI and in the Kafka Connect section of the Redpanda Console.

If you click on the snowflake-sink-batch sink, you can see additional details.

The complete setup steps are fully outlined in the tutorial.

Data will now flow into Snowflake in batch mode, with updates arriving every 60 seconds, as determined by the buffer.flush.time parameter.

You can now query the data in Snowflake:

select * from kafkatb_batch limit 5;

If everything is configured correctly, the data from CockroachDB should be available in Snowflake in real time or in batches, depending on your configuration. A sample RECORD_METADATA value and the corresponding RECORD_CONTENT value look like this:

{
  "CreateTime": 1725887877966,
  "key": "[3]",
  "offset": 30007,
  "partition": 0,
  "topic": "cockroachdb"
}
{
  "after": {
    "created_at": "2024-09-09T13:17:57.837984Z",
    "id": 1,
    "updated_at": "2024-09-09T13:17:57.917108Z",
    "value": "UPDATED"
  }
}
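
Because both columns are VARIANT, you can also drill into them directly with Snowflake's path syntax before creating any views. The query below is a hypothetical example, not taken from the original article:

SELECT record_metadata:"partition" AS kafka_partition,
       record_metadata:"offset" AS kafka_offset,
       record_content:after:id AS id,
       record_content:after:value AS val
FROM kafkatb_batch
LIMIT 5;
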
  • The next step is to configure the connector in streaming mode. First, stop the existing connector with the following command:
curl -X DELETE http://localhost:8083/connectors/snowflake-sink-batch
  • The updated connector configuration, saved as snowflake-sink-streaming.json, will look as follows:
{
    "name":"snowflake-sink-streaming",
    "config":{
      "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
      "tasks.max":"8",
      "topics":"cockroachdb",
      "snowflake.topic2table.map": "cockroachdb:kafkatb_streaming",
      "buffer.count.records":"10000",
      "buffer.flush.time":"10",
      "buffer.size.bytes":"5000000",
      "snowflake.url.name":":443",
      "snowflake.user.name":"username",
      "snowflake.private.key":"private-key",
      "snowflake.private.key.passphrase":"",
      "snowflake.database.name":"kafkadb",
      "snowflake.schema.name":"kafka_schema",
      "snowflake.role.name":"kafka_connector_role_1",
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "value.converter":"org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable":"false",
      "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
      "errors.log.enable":"true",
      "schemas.enable":"false"

    }
  }

Take note of the snowflake.ingestion.method parameter. This feature removes the need to wait 60 seconds before pushing data to Snowflake, allowing us to reduce buffer.flush.time to 10 seconds.

  • To deploy the connector, use the following command:
curl -d @"snowflake-sink-streaming.json" -H "Content-Type: application/json" -X POST http://kafka-connect:8083/connectors

Shortly after deployment, the data will be available in the Snowflake table.
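
As with the batch sink, a quick query against the target table confirms that records are landing; this check is optional:

SELECT COUNT(*) FROM kafkatb_streaming;
SELECT * FROM kafkatb_streaming LIMIT 5;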

The previous examples demonstrated how data is ingested into predefined Snowflake tables. The following method automatically infers the schema from the Kafka messages:

  {
    "name":"snowflake-sink-streaming-schematized",
    "config":{
      "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
      "tasks.max":"8",
      "topics":"cockroachdb",
      "snowflake.topic2table.map": "cockroachdb:kafkatb_streaming_schematized",
      "buffer.count.records":"10000",
      "buffer.flush.time":"10",
      "buffer.size.bytes":"5000000",
      "snowflake.url.name":":443",
      "snowflake.user.name":"username",
      "snowflake.private.key":"private-key",
      "snowflake.private.key.passphrase":"",
      "snowflake.database.name":"kafkadb",
      "snowflake.schema.name":"kafka_schema",
      "snowflake.role.name":"kafka_connector_role_1",
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "value.converter":"org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable":"false",
      "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
      "errors.log.enable":"true",
      "schemas.enable":"false",
      "snowflake.enable.schematization": "TRUE"
    }
  }
  • Save this as snowflake-sink-streaming-schematized.json and deploy it using:
curl -d @"snowflake-sink-streaming-schematized.json" -H "Content-Type: application/json" -X POST http://kafka-connect:8083/connectors
  • Upon deployment, a new table will be created in Snowflake with the following schema:
create or replace TABLE KAFKADB.KAFKA_SCHEMA.KAFKATB_STREAMING_SCHEMATIZED (
    RECORD_METADATA VARIANT COMMENT 'created by automatic table creation from Snowflake Kafka Connector',
    AFTER VARIANT COMMENT 'column created by schema evolution from Snowflake Kafka Connector'
);
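
To see the columns the connector has created as schema evolution kicks in, you can describe the table at any point. This is an optional check that is not in the original article:

DESC TABLE KAFKADB.KAFKA_SCHEMA.KAFKATB_STREAMING_SCHEMATIZED;
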
  • To inspect the table, use the following query:
SELECT after AS record FROM kafkatb_streaming_schematized LIMIT 5;

Sample result:

{
  "created_at": "2024-09-09T16:39:34.993226Z",
  "id": 18712,
  "updated_at": null,
  "value": "0d6bd8a4a790aab95c97a084d17bd820"
}
  • We can flatten the data for easier manipulation using the following statements:
USE ROLE securityadmin;
GRANT CREATE VIEW ON SCHEMA KAFKADB.kafka_schema TO ROLE kafka_connector_role_1;

USE ROLE kafka_connector_role_1;
USE DATABASE KAFKADB;
USE SCHEMA KAFKA_SCHEMA;
CREATE VIEW v_kafkatb_batch_flattened AS
    SELECT PARSE_JSON(record_content:after):id AS ID,
        PARSE_JSON(record_content:after):value AS VALUE,
        PARSE_JSON(record_content:after):created_at AS CREATED_AT,
        PARSE_JSON(record_content:after):updated_at AS UPDATED_AT
    FROM kafkatb_batch;

SELECT * FROM v_kafkatb_batch_flattened limit 1;
ID    VALUE        CREATED_AT                      UPDATED_AT
1   "UPDATED"    "2024-09-09T13:17:57.837984Z"    "2024-09-09T13:17:57.917108Z"
  • Alternatively, for the schematized table, the view creation statement would be:
CREATE VIEW v_kafkatb_streaming_schematized_flattened AS
    SELECT PARSE_JSON(after):id AS ID,
        PARSE_JSON(after):value AS VALUE,
        PARSE_JSON(after):created_at AS CREATED_AT,
        PARSE_JSON(after):updated_at AS UPDATED_AT
    FROM kafkatb_streaming_schematized;
  • To verify the data flow, make an update in CockroachDB and check for the change in Snowflake:
UPDATE cockroachdb 
  SET value = 'UPDATED', updated_at = now() 
WHERE  
  id = 20000; 
  • In Snowflake, execute the following query to confirm the update:
SELECT * FROM v_kafkatb_streaming_schematized_flattened where VALUE = 'UPDATED';

Sample result:

ID    VALUE        CREATED_AT                      UPDATED_AT
20000    "UPDATED"    "2024-09-09T18:15:13.460078Z"    "2024-09-09T18:16:56.550778Z"
19999    "UPDATED"    "2024-09-09T18:15:13.460078Z"    "2024-09-09T18:15:27.365272Z"

The architectural diagram is shown below:

In this walkthrough, we explored Kafka Connect as a solution for streaming changefeeds into Snowflake. This approach provides greater control over how messages are delivered to Snowflake, leveraging the Snowflake Kafka Connector with Snowpipe Streaming for real-time, reliable data ingestion.
