Create an Amazon S3 sink connector by Confluent from Apache Kafka®
====================================================================

The **Apache Kafka Connect® S3 sink connector by Confluent** enables you to move data from an **Aiven for Apache Kafka®** cluster to **Amazon S3** for long-term storage.

.. note::

    There are two versions of the S3 sink connector available with Aiven for Apache Kafka Connect®: one developed by Aiven and one developed by Confluent. This article covers the Confluent version. To learn about the Aiven sink connector, check out :doc:`the dedicated page `.

.. note::

    You can check the full set of available parameters and configuration options in the `connector's documentation `_.

Prerequisites
-------------

To set up the S3 sink connector by Confluent, you need an Aiven for Apache Kafka® service :doc:`with Apache Kafka Connect enabled ` or a :ref:`dedicated Aiven for Apache Kafka Connect cluster `.

Furthermore, you need to follow the steps :doc:`to prepare the AWS account and S3 sink ` and collect the following information about the target S3 bucket upfront:

* ``AWS_S3_NAME``: The name of the S3 bucket
* ``AWS_S3_REGION``: The AWS region where the S3 bucket has been created
* ``AWS_USER_ACCESS_KEY_ID``: The AWS user access key ID
* ``AWS_USER_SECRET_ACCESS_KEY``: The AWS user secret access key

Set up an S3 sink connector with Aiven CLI
------------------------------------------

The following example demonstrates how to set up an Apache Kafka Connect® S3 sink connector using the :ref:`Aiven CLI dedicated command `.

Define a Kafka Connect® configuration file
''''''''''''''''''''''''''''''''''''''''''

Define the connector configurations in a file (we'll refer to it with the name ``s3_sink.json``) with the following content:

.. code::

    {
        "name": "CONNECTOR_NAME",
        "topics": "TOPIC_LIST",
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
        "flush.size": "1",
        "s3.bucket.name": "AWS_S3_NAME",
        "s3.region": "AWS_S3_REGION",
        "s3.credentials.provider.class": "io.aiven.kafka.connect.util.AivenAWSCredentialsProvider",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "s3.credentials.provider.secret_access_key": "AWS_USER_SECRET_ACCESS_KEY",
        "s3.credentials.provider.access_key_id": "AWS_USER_ACCESS_KEY_ID"
    }

The configuration file contains the following entries:

* ``name``: The connector name
* ``topics``: The list of Apache Kafka® topics to sink to the S3 bucket
* ``key.converter`` and ``value.converter``: Data converters, depending on the topic data format. Check the `related documentation `_ for more information
* ``format.class``: Defines the output data format in the S3 bucket. The ``io.confluent.connect.s3.format.bytearray.ByteArrayFormat`` writes messages in binary format.
* ``flush.size``: Defines how many messages to write per file in the S3 bucket. For example, setting ``flush.size`` to ``3`` generates a file every three messages in a topic and partition.
* ``s3.bucket.name``: The name of the S3 bucket
* ``s3.region``: The AWS region where the S3 bucket has been created
* ``s3.credentials.provider.class``: The name of the class implementing the ``com.amazonaws.auth.AWSCredentialsProvider`` and ``org.apache.kafka.common.Configurable`` interfaces. Use ``io.aiven.kafka.connect.util.AivenAWSCredentialsProvider``.
* ``s3.credentials.provider.secret_access_key``: The AWS user secret access key
* ``s3.credentials.provider.access_key_id``: The AWS user access key ID

Check out the `dedicated documentation `_ for the full list of configuration options.
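To make the effect of ``format.class`` and ``flush.size`` more concrete, the sketch below shows the kind of object keys the Confluent S3 sink connector typically creates with its default ``topics.dir`` (``topics``) and default partitioner: one object per ``flush.size`` messages for each topic partition, named after the topic, the partition, and the starting offset, with the ``.bin`` extension coming from the ``ByteArrayFormat``. The exact layout depends on your partitioner and naming settings, so treat this only as an illustration (``TOPIC_NAME`` stands for one of the topics listed in ``topics``, and ``flush.size`` is ``3`` here):

.. code::

    topics/TOPIC_NAME/partition=0/TOPIC_NAME+0+0000000000.bin
    topics/TOPIC_NAME/partition=0/TOPIC_NAME+0+0000000003.bin
    topics/TOPIC_NAME/partition=1/TOPIC_NAME+1+0000000000.bin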
Create an S3 sink connector with Aiven CLI
''''''''''''''''''''''''''''''''''''''''''

To create the connector, execute the following :ref:`Aiven CLI command `, replacing ``SERVICE_NAME`` with the name of the existing Aiven for Apache Kafka® service where the connector needs to run:

.. code::

    avn service connector create SERVICE_NAME @s3_sink.json

Check the connector status with the following command, replacing ``SERVICE_NAME`` with the existing Aiven for Apache Kafka® service and ``CONNECTOR_NAME`` with the name of the connector defined before:

.. code::

    avn service connector status SERVICE_NAME CONNECTOR_NAME

With the connector in place, verify that the data is flowing to the target S3 bucket.

Example: define an S3 sink connector
------------------------------------

The example creates an S3 sink connector with the following properties:

* connector name: ``my_s3_sink``
* source topics: ``students``
* target S3 bucket name: ``my-test-bucket``
* target S3 bucket region: ``eu-central-1``
* AWS user access key ID: ``AKIAXXXXXXXXXX``
* AWS user secret access key: ``hELuXXXXXXXXXXXXXXXXXXXXXXXXXX``
* a file generated in the S3 bucket every 10 messages

The connector configuration is the following:

.. code::

    {
        "name": "my_s3_sink",
        "topics": "students",
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
        "flush.size": "10",
        "s3.bucket.name": "my-test-bucket",
        "s3.region": "eu-central-1",
        "s3.credentials.provider.class": "io.aiven.kafka.connect.util.AivenAWSCredentialsProvider",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "s3.credentials.provider.secret_access_key": "hELuXXXXXXXXXXXXXXXXXXXXXXXXXX",
        "s3.credentials.provider.access_key_id": "AKIAXXXXXXXXXX"
    }

With the above configuration stored in the ``s3_sink.json`` file, you can create the connector in the ``demo-kafka`` instance with:

.. code::

    avn service connector create demo-kafka @s3_sink.json
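As a final check, you can confirm both the connector state and the objects landing in the bucket. The commands below are a minimal sketch: the first reuses the Aiven CLI status command shown earlier with the example names, and the second assumes you have the AWS CLI installed and configured with credentials that can read ``my-test-bucket``:

.. code::

    avn service connector status demo-kafka my_s3_sink

    aws s3 ls s3://my-test-bucket --recursive

With ``flush.size`` set to ``10``, a new object should appear for every batch of ten messages written to a partition of the ``students`` topic.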