Bring your own Apache Kafka® Connect cluster#
Aiven provides Apache Kafka® Connect as a managed service in combination with the Aiven for Apache Kafka® managed service. However, there are circumstances where you may want to roll your own Kafka Connect cluster.
The following article defines the necessary steps to integrate your own Apache Kafka Connect cluster with Aiven for Apache Kafka and use the schema registry offered by Karapace. The example shows how to create a JDBC sink connector to a PostgreSQL® database.
Prerequisites#
To bring your own Apache Kafka Connect cluster, you need an Aiven for Apache Kafka service up and running.
Furthermore, for the JDBC sink connector database example, you need to collect the following information about the Aiven for Apache Kafka service and the target database upfront:
APACHE_KAFKA_HOST: The hostname of the Apache Kafka service
APACHE_KAFKA_PORT: The port of the Apache Kafka service
REST_API_PORT: The Apache Kafka's REST API port, only needed when testing data flow with REST APIs
REST_API_USERNAME: The Apache Kafka's REST API username, only needed when testing data flow with REST APIs
REST_API_PASSWORD: The Apache Kafka's REST API password, only needed when testing data flow with REST APIs
SCHEMA_REGISTRY_PORT: The Apache Kafka's schema registry port, only needed when using Avro as data format
SCHEMA_REGISTRY_USER: The Apache Kafka's schema registry username, only needed when using Avro as data format
SCHEMA_REGISTRY_PASSWORD: The Apache Kafka's schema registry user password, only needed when using Avro as data format
PG_HOST: The PostgreSQL service hostname
PG_PORT: The PostgreSQL service port
PG_USERNAME: The PostgreSQL service username
PG_PASSWORD: The PostgreSQL service password
PG_DATABASE_NAME: The PostgreSQL service database name
Note
If you're using Aiven for PostgreSQL and Aiven for Apache Kafka, the above details are available in the Aiven Console service Overview tab or via the dedicated avn service get command with the Aiven CLI.
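For example, with the Aiven CLI you can print all connection details of a service in JSON format (demo-kafka below is a placeholder service name):

# print host, port and component details of the service
avn service get demo-kafka --json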
Attach your own Apache Kafka Connect cluster to Aiven for Apache Kafka®#
The following example demonstrates how to set up a local Apache Kafka Connect cluster with a working JDBC sink connector and attach it to an Aiven for Apache Kafka service.
Set up the truststore and keystore#
Create a Java keystore and truststore for the Aiven for Apache Kafka service. For the following example we assume:
The keystore is available at KEYSTORE_PATH/client.keystore.p12
The truststore is available at TRUSTSTORE_PATH/client.truststore.jks
For simplicity, the same secret (password) is used for both the keystore and the truststore, and is shown here as KEY_TRUST_SECRET
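If you still need to create these files, the sketch below shows one way to do it with openssl and keytool. It assumes the service's service.cert, service.key and ca.pem files have already been downloaded (for example from the Aiven Console); the file names and alias are illustrative.

# build a PKCS12 keystore from the service certificate and key
openssl pkcs12 -export -inkey service.key -in service.cert \
  -out KEYSTORE_PATH/client.keystore.p12 -name service_key \
  -passout pass:KEY_TRUST_SECRET

# import the CA certificate into a JKS truststore
keytool -importcert -file ca.pem -alias CA \
  -keystore TRUSTSTORE_PATH/client.truststore.jks \
  -storepass KEY_TRUST_SECRET -noprompt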
Configure the Aiven for Apache Kafka service#
You need to enable the schema registry features offered by Karapace. You can do this in the Aiven Console, in the Aiven for Apache Kafka service Overview tab:
Enable the Schema Registry (Karapace) and Apache Kafka REST API (Karapace).
In the Topics tab, create a new topic called jdbc_sink; it will be used by the Apache Kafka Connect connector.
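Both steps can also be done with the Aiven CLI; the following is a sketch that assumes a service named demo-kafka (a placeholder) and example partition and replication settings:

# enable the Karapace schema registry and REST API on the service
avn service update demo-kafka -c schema_registry=true -c kafka_rest=true

# create the jdbc_sink topic used by the connector
avn service topic-create demo-kafka jdbc_sink --partitions 3 --replication 2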
Download the required binaries#
The following binaries are needed to set up an Apache Kafka Connect cluster locally:
The Apache Kafka binaries
The Aiven JDBC connector for Apache Kafka
If you are going to use Avro as the data format, the Avro Value Converter. The examples below show how to do this.
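As an example, the Apache Kafka binaries used in the following steps can be fetched from the Apache archive; the JDBC connector and Avro converter archives are downloaded from their respective release pages:

# download the Apache Kafka 3.1.0 binaries (Scala 2.13 build)
wget https://archive.apache.org/dist/kafka/3.1.0/kafka_2.13-3.1.0.tgz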
Set up the local Apache Kafka Connect cluster#
The following process defines the setup required to create a local Apache Kafka Connect cluster. The example shows the steps needed with Apache Kafka 3.1.0, Avro converter 7.1.0 and JDBC connector 6.7.0:
Extract the Apache Kafka binaries:
tar -xzf kafka_2.13-3.1.0.tgz
Within the newly created kafka_2.13-3.1.0 folder, create a plugins folder containing a lib sub-folder:

cd kafka_2.13-3.1.0
mkdir -p plugins/lib
Unzip the JDBC and Avro binaries and copy the jar files into the plugins/lib folder:

# extract aiven connect jdbc
unzip jdbc-connector-for-apache-kafka-6.7.0.zip
# extract confluent kafka connect avro converter
unzip confluentinc-kafka-connect-avro-converter-7.1.0.zip
# copy the plugins into the plugins/lib folder
cp jdbc-connector-for-apache-kafka-6.7.0/*.jar plugins/lib/
cp confluentinc-kafka-connect-avro-converter-7.1.0/*.jar plugins/lib/
Create a properties file, my-connect-distributed.properties, under the main kafka_2.13-3.1.0 folder, for the Apache Kafka Connect settings. Change the following placeholders:

PATH_TO_KAFKA_HOME to the path to the kafka_2.13-3.1.0 folder
APACHE_KAFKA_HOST, APACHE_KAFKA_PORT, SCHEMA_REGISTRY_PORT, SCHEMA_REGISTRY_USER, SCHEMA_REGISTRY_PASSWORD to the related parameters fetched in the prerequisite step
KEYSTORE_PATH, TRUSTSTORE_PATH and KEY_TRUST_SECRET to the keystore and truststore locations and the related secret as defined in the keystore and truststore setup step
# Define the folders for plugins, including the JDBC and Avro
plugin.path=PATH_TO_KAFKA_HOME/kafka_2.13-3.1.0/plugins

# Defines the location of the Apache Kafka bootstrap servers
bootstrap.servers=APACHE_KAFKA_HOST:APACHE_KAFKA_PORT

# Defines the group.id used by the connect cluster
group.id=connect-cluster

# Defines the input data format for key and value: JSON without schema
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# Defines the internal data format for key and value: JSON without schema
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Connect clusters create three topics to manage offsets, configs, and status
# information. Note that these contribute towards the total partition limit quota.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3

# Defines the flush interval for the offset communication
offset.flush.interval.ms=10000

# Defines the SSL endpoint
ssl.endpoint.identification.algorithm=https
request.timeout.ms=20000
retry.backoff.ms=500
security.protocol=SSL
ssl.protocol=TLS
ssl.truststore.location=TRUSTSTORE_PATH/client.truststore.jks
ssl.truststore.password=KEY_TRUST_SECRET
ssl.keystore.location=KEYSTORE_PATH/client.keystore.p12
ssl.keystore.password=KEY_TRUST_SECRET
ssl.key.password=KEY_TRUST_SECRET
ssl.keystore.type=PKCS12

# Defines the consumer SSL endpoint
consumer.ssl.endpoint.identification.algorithm=https
consumer.request.timeout.ms=20000
consumer.retry.backoff.ms=500
consumer.security.protocol=SSL
consumer.ssl.protocol=TLS
consumer.ssl.truststore.location=TRUSTSTORE_PATH/client.truststore.jks
consumer.ssl.truststore.password=KEY_TRUST_SECRET
consumer.ssl.keystore.location=KEYSTORE_PATH/client.keystore.p12
consumer.ssl.keystore.password=KEY_TRUST_SECRET
consumer.ssl.key.password=KEY_TRUST_SECRET
consumer.ssl.keystore.type=PKCS12

# Defines the producer SSL endpoint
producer.ssl.endpoint.identification.algorithm=https
producer.request.timeout.ms=20000
producer.retry.backoff.ms=500
producer.security.protocol=SSL
producer.ssl.protocol=TLS
producer.ssl.truststore.location=TRUSTSTORE_PATH/client.truststore.jks
producer.ssl.truststore.password=KEY_TRUST_SECRET
producer.ssl.keystore.location=KEYSTORE_PATH/client.keystore.p12
producer.ssl.keystore.password=KEY_TRUST_SECRET
producer.ssl.key.password=KEY_TRUST_SECRET
producer.ssl.keystore.type=PKCS12
Start the local Apache Kafka Connect cluster, executing the following from the kafka_2.13-3.1.0 folder:

./bin/connect-distributed.sh ./my-connect-distributed.properties
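Once the worker is running, you can optionally confirm that it is reachable and that the JDBC and Avro plugins were loaded from the plugins folder, using the Kafka Connect REST API on the default port 8083:

# check the worker and list the loaded connector plugins
curl localhost:8083/ | jq
curl localhost:8083/connector-plugins | jq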
Add the JDBC sink connector#
The following steps define how you can add a JDBC connector to the local Apache Kafka Connect cluster:
Create the JDBC sink connector JSON configuration file named jdbc-sink-pg.json with the following content, replacing the placeholders PG_HOST, PG_PORT, PG_USERNAME, PG_PASSWORD, PG_DATABASE_NAME, APACHE_KAFKA_HOST, SCHEMA_REGISTRY_PORT, SCHEMA_REGISTRY_USER, SCHEMA_REGISTRY_PASSWORD:

{
    "name": "jdbc-sink-pg",
    "config": {
        "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:postgresql://PG_HOST:PG_PORT/PG_DATABASE_NAME?user=PG_USERNAME&password=PG_PASSWORD&ssl=required",
        "tasks.max": "1",
        "topics": "jdbc_sink",
        "auto.create": "true",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
        "value.converter.basic.auth.credentials.source": "USER_INFO",
        "value.converter.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD"
    }
}
Create the JDBC sink connector instance using the Kafka Connect REST APIs:

curl -s -H "Content-Type: application/json" -X POST \
  -d @jdbc-sink-pg.json \
  http://localhost:8083/connectors/
Check the status of the JDBC sink connector instance; jq is used to beautify the output:

curl localhost:8083/connectors/jdbc-sink-pg/status | jq
The result should be similar to the following:

{
  "name": "jdbc-sink-pg",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.128.0.12:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "10.128.0.12:8083"
    }
  ],
  "type": "sink"
}
Tip
Check the dedicated blog post for an end-to-end example of how to set up a Kafka Connect cluster to host a custom connector.
Verify the JDBC connector using Karapace REST APIs#
To verify that the connector is working, you can write messages to the jdbc_sink topic in Avro format using the Karapace REST APIs, by following the steps below:
Create a new Avro schema using the /subjects/ endpoint, after changing the placeholders for REST_API_USERNAME, REST_API_PASSWORD, APACHE_KAFKA_HOST and REST_API_PORT:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"record\",\"name\": \"jdbcsinkexample\",\"namespace\": \"example\",\"doc\": \"example\",\"fields\": [{ \"type\": \"string\", \"name\": \"name\", \"doc\": \"person name\", \"namespace\": \"example\", \"default\": \"mario\"},{ \"type\": \"int\", \"name\": \"age\", \"doc\": \"persons age\", \"namespace\": \"example\", \"default\": 5}]}"}' \
  https://REST_API_USERNAME:REST_API_PASSWORD@APACHE_KAFKA_HOST:REST_API_PORT/subjects/jdbcsinkexample/versions/
The above call creates a new schema called jdbcsinkexample containing two fields (name and age).

Create a new message in the jdbc_sink topic using the jdbcsinkexample schema, after changing the placeholders for REST_API_USERNAME, REST_API_PASSWORD, APACHE_KAFKA_HOST and REST_API_PORT:

curl -H "Content-Type: application/vnd.kafka.avro.v2+json" -X POST \
  -d '{"value_schema": "{\"namespace\": \"test\", \"type\": \"record\", \"name\": \"example\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"},{\"name\": \"age\", \"type\": \"int\"}]}", "records": [{"value": {"name": "Eric","age":77}}]}' \
  https://REST_API_USERNAME:REST_API_PASSWORD@APACHE_KAFKA_HOST:REST_API_PORT/topics/jdbc_sink
Verify the presence of a table called jdbc_sink in PostgreSQL containing the row with name Eric and age 77.
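One way to check is with psql, substituting the PostgreSQL placeholders collected in the prerequisites; the query assumes the connector's auto.create option created a table named after the topic:

# connect to the target database and inspect the sink table
psql "postgres://PG_USERNAME:PG_PASSWORD@PG_HOST:PG_PORT/PG_DATABASE_NAME?sslmode=require" \
  -c 'SELECT * FROM jdbc_sink;'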