Replicando dados com Kafka e Oracle CDC – Parte II
Olá pessoal !
Como prometido, estou de volta com a nossa série de artigos sobre replicação utilizando Oracle CDC, Kafka e a plataforma da Confluent.
Se ainda não leu o primeiro artigo, você pode fazê-lo clicando aqui !
O foco hoje será exatamente na instalação do Confluent Platform. O procedimento será relativamente fácil, pois utilizaremos o docker compose para baixar as imagens, configurar o nosso ambiente e subir os contêineres.
Abaixo vou fazer uma breve descrição de cada imagem.
CP All in One – O que será instalado
ZooKeeper
Apache ZooKeeper é um serviço centralizado para manter informações de configuração, nomear, fornecer sincronização distribuída e fornecer serviços de grupo. No Kafka ele não apenas armazena logs de partição de tópicos e lida com solicitações de consumo/produção como outros agentes, mas também mantém metadados de cluster como IDs e racks de agentes, tópicos, partições, informações de líder e ISR e configurações de todo o cluster e por tópico, como bem como credenciais de segurança. Atualmente ele está sendo substituído pelo Apache KRaft (Kafka Raft) e se mantém na instalação apenas por questão de compatibilidade com implementações mais antigas.
Se tiver interesse, CLIQUE AQUI para ler um artigo que explica bem o porquê dessa mudança.
Confluent Platform Server
O Confluent Server é um componente do Confluent Platform que inclui Kafka e recursos comerciais adicionais. A seguir estão alguns dos principais recursos incluídos no Confluent Server:
- Controle de acesso baseado em função (RBAC)
- Armazenamento em camadas Clusters de autobalance
- Confluent para Kubernetes
Schema Registry
O Schema Registry é um serviço de registro de esquema RESTful para gerenciar, armazenar e recuperar seus esquemas Avro®, JSON Schema e Protobuf.
Para saber mais sobre o Schema Registry, CLIQUE AQUI !
Datagen Source Connector
O conector Kafka Connect Datagen Source gera mock data para desenvolvimento e teste.
Enterprise Control Center
Centro de controle da plataforma.
Ksql DB Server
Contêm interceptores de monitoramento Confluent. Os interceptores de monitoramento permitem que conectores e consultas SQL coletem as métricas que podem ser visualizadas no Confluent Control Center.
ksqlDB é um banco de dados criado especificamente para ajudar os desenvolvedores a criar aplicativos de processamento de fluxo em cima do Apache Kafka.
Se quiser saber mais sobre o KsqlDB, CLIQUE AQUI !
KsqlDB cli
É o command line interface do Ksql
KsqlDB Examples
Mock data para o KsqlDB.
REST Proxy
O Confluent REST Proxy fornece uma interface RESTful para um cluster Apache Kafka®, facilitando a produção e o consumo de mensagens, a visualização do estado do cluster e a execução de ações administrativas sem usar o protocolo ou clientes Kafka nativos.
Se quiser saber mais sobre o REST Proxy, CLIQUE AQUI !
Agora que temos uma idéia de todos os compoinentes que fazem parte do Confluent Platform, vamos para instalação !
Instalando o Confluent Platform
Precisamos do arquivo docker-compose.yml. Esse arquivo possui todas as instruções para o download das imagens e a criação dos contêineres e suas dependências.
Ele deve possuir o código abaixo:
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.2.1
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-server:7.2.1
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
schema-registry:
image: confluentinc/cp-schema-registry:7.2.1
hostname: schema-registry
container_name: schema-registry
depends_on:
- broker
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
connect:
image: cnfldemos/cp-server-connect-datagen:0.5.3-7.1.0
hostname: connect
container_name: connect
depends_on:
- broker
- schema-registry
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
# CLASSPATH required due to CC-2422
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.2.1.jar
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
control-center:
image: confluentinc/cp-enterprise-control-center:7.2.1
hostname: control-center
container_name: control-center
depends_on:
- broker
- schema-registry
- connect
- ksqldb-server
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
PORT: 9021
ksqldb-server:
image: confluentinc/cp-ksqldb-server:7.2.1
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- broker
- connect
ports:
- "8088:8088"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
KSQL_BOOTSTRAP_SERVERS: "broker:29092"
KSQL_HOST_NAME: ksqldb-server
KSQL_LISTENERS: "http://0.0.0.0:8088"
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
KSQL_KSQL_CONNECT_URL: "http://connect:8083"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
ksqldb-cli:
image: confluentinc/cp-ksqldb-cli:7.2.1
container_name: ksqldb-cli
depends_on:
- broker
- connect
- ksqldb-server
entrypoint: /bin/sh
tty: true
ksql-datagen:
image: confluentinc/ksqldb-examples:7.2.1
hostname: ksql-datagen
container_name: ksql-datagen
depends_on:
- ksqldb-server
- broker
- schema-registry
- connect
command: "bash -c 'echo Waiting for Kafka to be ready... && \
cub kafka-ready -b broker:29092 1 40 && \
echo Waiting for Confluent Schema Registry to be ready... && \
cub sr-ready schema-registry 8081 40 && \
echo Waiting a few seconds for topic creation to finish... && \
sleep 11 && \
tail -f /dev/null'"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
STREAMS_BOOTSTRAP_SERVERS: broker:29092
STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
STREAMS_SCHEMA_REGISTRY_PORT: 8081
rest-proxy:
image: confluentinc/cp-kafka-rest:7.2.1
depends_on:
- broker
- schema-registry
ports:
- 8082:8082
hostname: rest-proxy
container_name: rest-proxy
environment:
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
Execute os seguintes passos:
- Copie o código acima e salve-o no arquivo docker-compose.yml
- Salve o arquivo no diretório /home/<seu usuario>
Agora conecte como super usuário:
sudo su -
No diretório /home/<seu usuario> execute o seguinte comando:
docker compose up -d
O docker começará a montar as imagens e após o download, ele subirá os contêineres.
Dependendo de sua máquina e da sua internet, pode ser que demore um pouco para o processo terminar. Basta apenas ter um pouco de paciência. 🙂
Quando finalizar, você verá algo como:
Vamos verificar se os serviços estão up.
docker ps
Todos os contêineres estão ok e executando ! 🙂
Agora acesse o portainer e veja como aparecem.
No navegador, digite a URL abaixo:
http://localhost:9021
Confluent Platform instalado com sucesso !
O próximo passo será a instalação dos conectores Oracle CDC Premium e JDBC Sink. Veremos como fazer isso no próximo artigo !
Referências
Meus containers ficam caindo. Seria falta de memória o problema? Estou usando apenas 6 Gb na VM.
Olá Thadeu !
Não é memória. Ás vezes isso ocorre no meu ambiente também e possuo 20 gigas no total. Eu dei uma olhada nos logs, e aparentemente ele cai por timeout. 🙁
Quando isso ocorrer, basta apenas subir os containers novamente com o docker compose up -d. Uma hora estabiliza.
Na semana do dia 09/11 eu vou ter um encontro com o pessoal da confluent. Vou comentar sobre isso e assim que tiver uma resposta, posto aqui.
Obrigado pela resposta Sergio, vou continuar os estudos aqui. Parabéns pelo conteúdo