
Streaming Data from Kafka to Postgres with Kafka Connect, AVRO, Schema Registry and Python

This post walks you through the process of streaming data from Kafka to Postgres with Kafka Connect, AVRO, Schema Registry, and Python.

What you’ll need

Stack

  • Python 3
  • Pipenv
  • Flake8
  • Docker Compose
  • Postgres
  • Kafka
  • Kafka Connect
  • AVRO
  • Confluent Schema Registry

Project structure

├── Pipfile
├── Pipfile.lock
├── avro_producer.py
├── docker-compose.yml
├── setup.cfg
└── sink-postgres.json

Project dependencies

Pipfile

[[source]]
url = "https://pypi.python.org/simple"
verify_ssl = true
name = "pypi"

[packages]
confluent-kafka = {extras = ["avro"], version = "==0.11.4"}
requests = "==2.18.4"
"avro-python3" = "==1.8.2"
"flake8" = "==3.5.0"

[dev-packages]

[requires]
python_version = "3.6"

Kafka AVRO Producer

avro_producer.py

import time

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer


value_schema_str = """
{
   "namespace": "hellokoding.kafka",
   "name": "value",
   "type": "record",
   "fields" : [
     {"name" : "browser", "type" : "string"},
     {"name" : "created_at", "type" : "int", "logicalType": "date"}
   ]
}
"""

key_schema_str = """
{
   "namespace": "hellokoding.kafka",
   "name": "key",
   "type": "record",
   "fields" : [
     {"name" : "url", "type" : "string"}
   ]
}
"""

value_schema = avro.loads(value_schema_str)
key_schema = avro.loads(key_schema_str)

# An Avro date is stored as days since the Unix epoch, not seconds
value = {"browser": "Chrome", "created_at": int(time.time() // 86400)}
key = {"url": "http://localhost:8081"}

# The producer registers both schemas with Schema Registry on first use
avro_producer = AvroProducer({
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081'
}, default_key_schema=key_schema, default_value_schema=value_schema)

avro_producer.produce(topic='page_1', value=value, key=key)
avro_producer.flush()
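
With the default subject naming strategy, the producer registers its schemas in Schema Registry under the subjects page_1-key and page_1-value. Once everything is running and a message has been produced (the steps below), you can inspect what was registered; a minimal sketch using the requests dependency from the Pipfile:

import requests

# List every subject known to Schema Registry
print(requests.get('http://localhost:8081/subjects').json())

# Fetch the latest version of the value schema registered for page_1
print(requests.get('http://localhost:8081/subjects/page_1-value/versions/latest').json())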

Kafka Connect Sink Postgres Config

sink-postgres.json

{
    "name": "sink-1",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",

        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",

        "connection.url": "jdbc:postgresql://localhost:5432/kafka-sink",
        "connection.user": "hellokoding",
        "connection.password": "pass",
        "insert.mode": "upsert",
        "auto.create": true,
        "auto.evolve": true,
        "topics": "page_1",
        "pk.mode": "record_key"
    }
}
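
The confluent load command used later in this post is a thin wrapper around Kafka Connect's REST API. If you would rather register the connector directly, here is a minimal sketch with requests, assuming Connect is on its default port 8083:

import json

import requests

# POST the connector definition to Kafka Connect's REST API;
# equivalent to `confluent load sink-1 -d sink-postgres.json`
with open('sink-postgres.json') as f:
    connector = json.load(f)

response = requests.post('http://localhost:8083/connectors', json=connector)
print(response.status_code, response.json())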

Docker Compose

docker-compose.yml

version: '3.6'

services:
    postgres:
        image: postgres:10
        container_name: some-postgres
        environment:
          POSTGRES_PASSWORD: pass
          POSTGRES_USER: hellokoding
          POSTGRES_DB: kafka-sink
        ports:
          - 5432:5432

Start Confluent Platform

confluent start

You should see output like this:

connect is [UP]
kafka-rest is [UP]
schema-registry is [UP]
kafka is [UP]
zookeeper is [UP]

ZooKeeper, Kafka, Schema Registry, and Kafka Connect should start listening for connections on ports 2181, 9092, 8081, and 8083, respectively.
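
You can confirm Schema Registry and Kafka Connect are reachable before moving on; a minimal sketch with requests:

import requests

# Schema Registry: the list of registered subjects (empty until something is produced)
print(requests.get('http://localhost:8081/subjects').json())

# Kafka Connect: the list of deployed connectors (empty until one is loaded)
print(requests.get('http://localhost:8083/connectors').json())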

Start PostgreSQL Database

docker-compose up

The PostgreSQL server should start listening for connections on port 5432.
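
The container can take a few seconds to initialize; a minimal standard-library sketch that blocks until the port accepts connections:

import socket
import time

# Retry until Postgres accepts TCP connections on localhost:5432
while True:
    try:
        socket.create_connection(('localhost', 5432), timeout=1).close()
        break
    except OSError:
        time.sleep(1)
print('Postgres is up')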

Install project’s dependencies

pipenv install
pipenv shell

Send data to the Kafka topic with the AVRO producer

python avro_producer.py
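
To verify the message actually landed on the topic before wiring up the sink, here is a minimal consumer sketch; page_1_check is an arbitrary consumer group id, and default.topic.config matches the 0.11.4 client used in this post:

from confluent_kafka.avro import AvroConsumer

consumer = AvroConsumer({
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081',
    'group.id': 'page_1_check',
    'default.topic.config': {'auto.offset.reset': 'earliest'}
})
consumer.subscribe(['page_1'])

# Poll a few times; values are deserialized with the schemas from Schema Registry
for _ in range(10):
    msg = consumer.poll(1)
    if msg is not None and not msg.error():
        print(msg.key(), msg.value())
        break

consumer.close()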

Load Kafka Connector

confluent load sink-1 -d sink-postgres.json
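
To check that the connector and its task actually started, query Connect's status endpoint:

import requests

# Both the connector and its tasks should report RUNNING
status = requests.get('http://localhost:8083/connectors/sink-1/status').json()
print(status['connector']['state'])
for task in status['tasks']:
    print(task['id'], task['state'])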

Postgres should now have a page_1 table populated with the records streamed from the Kafka topic by the sink connector.
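
psycopg2 is not part of the Pipfile above; assuming you add it (for example with pipenv install psycopg2-binary), a quick sketch to inspect the auto-created table:

import psycopg2

# Connection settings match the docker-compose service above
conn = psycopg2.connect(host='localhost', port=5432, dbname='kafka-sink',
                        user='hellokoding', password='pass')
with conn.cursor() as cur:
    cur.execute('SELECT * FROM page_1')
    for row in cur.fetchall():
        print(row)
conn.close()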

Source code

git@github.com:hellokoding/kafka-connect-sink-postgres-with-avro-schemaregistry-python.git

https://github.com/hellokoding/kafka-connect-sink-postgres-with-avro-schemaregistry-python
