Pular para o conteúdo

Replicando dados com Kafka e Oracle CDC – Parte II (Confluent Platform)

Replicando dados com Kafka e Oracle CDC – Parte II

Olá pessoal !

Como prometido, estou de volta com a nossa série de artigos sobre replicação utilizando Oracle CDC, Kafka e a plataforma da Confluent.

Se ainda não leu o primeiro artigo, você pode fazê-lo clicando aqui !

O foco hoje será exatamente na instalação do Confluent Platform. O procedimento será relativamente fácil, pois utilizaremos o docker compose para baixar as imagens, configurar o nosso ambiente e subir os contêineres.

Abaixo vou fazer uma breve descrição de cada imagem.

CP All in One – O que será instalado

ZooKeeper

Apache ZooKeeper é um serviço centralizado para manter informações de configuração, nomear, fornecer sincronização distribuída e fornecer serviços de grupo. No Kafka ele não apenas armazena logs de partição de tópicos e lida com solicitações de consumo/produção como outros agentes, mas também mantém metadados de cluster como IDs e racks de agentes, tópicos, partições, informações de líder e ISR e configurações de todo o cluster e por tópico, como bem como credenciais de segurança. Atualmente ele está sendo substituído pelo Apache KRaft (Kafka Raft) e se mantém na instalação apenas por questão de compatibilidade com implementações mais antigas.

Se tiver interesse, CLIQUE AQUI para ler um artigo que explica bem o porquê dessa mudança.

Confluent Platform Server

O Confluent Server é um componente do Confluent Platform que inclui Kafka e recursos comerciais adicionais. A seguir estão alguns dos principais recursos incluídos no Confluent Server:

  • Controle de acesso baseado em função (RBAC)
  • Armazenamento em camadas Clusters de autobalance
  • Confluent para Kubernetes
Schema Registry

O Schema Registry é um serviço de registro de esquema RESTful para gerenciar, armazenar e recuperar seus esquemas Avro®, JSON Schema e Protobuf.

Para saber mais sobre o Schema Registry, CLIQUE AQUI !

Datagen Source Connector

O conector Kafka Connect Datagen Source gera mock data para desenvolvimento e teste.

Enterprise Control Center

Centro de controle da plataforma.

Ksql DB Server

Contêm interceptores de monitoramento Confluent. Os interceptores de monitoramento permitem que conectores e consultas SQL coletem as métricas que podem ser visualizadas no Confluent Control Center.

ksqlDB é um banco de dados criado especificamente para ajudar os desenvolvedores a criar aplicativos de processamento de fluxo em cima do Apache Kafka.

Se quiser saber mais sobre o KsqlDB, CLIQUE AQUI !

KsqlDB cli

É o command line interface do Ksql

KsqlDB Examples

Mock data para o KsqlDB.

REST Proxy

O Confluent REST Proxy fornece uma interface RESTful para um cluster Apache Kafka®, facilitando a produção e o consumo de mensagens, a visualização do estado do cluster e a execução de ações administrativas sem usar o protocolo ou clientes Kafka nativos.

Se quiser saber mais sobre o REST Proxy, CLIQUE AQUI !

Agora que temos uma idéia de todos os compoinentes que fazem parte do Confluent Platform, vamos para instalação !

Instalando o Confluent Platform

Precisamos do arquivo docker-compose.yml. Esse arquivo possui todas as instruções para o download das imagens e a criação dos contêineres e suas dependências.

Ele deve possuir o código abaixo:

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.2.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:7.2.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry:7.2.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  connect:
    image: cnfldemos/cp-server-connect-datagen:0.5.3-7.1.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.2.1.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.2.1
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
      - ksqldb-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.2.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.2.1
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  ksql-datagen:
    image: confluentinc/ksqldb-examples:7.2.1
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
      - connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b broker:29092 1 40 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 40 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 11 && \
                       tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081

  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.2.1
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

Execute os seguintes passos:

  • Copie o código acima e salve-o no arquivo docker-compose.yml
  • Salve o arquivo no diretório /home/<seu usuario>

Agora conecte como super usuário:

sudo su -

No diretório /home/<seu usuario> execute o seguinte comando:

docker compose up -d
image 15

O docker começará a montar as imagens e após o download, ele subirá os contêineres.

Dependendo de sua máquina e da sua internet, pode ser que demore um pouco para o processo terminar. Basta apenas ter um pouco de paciência. 🙂

Quando finalizar, você verá algo como:

image 17

Vamos verificar se os serviços estão up.

docker ps
image 21

Todos os contêineres estão ok e executando ! 🙂

Agora acesse o portainer e veja como aparecem.

image 19
image 22

No navegador, digite a URL abaixo:

http://localhost:9021

image 25
image 24

Confluent Platform instalado com sucesso !

O próximo passo será a instalação dos conectores Oracle CDC Premium e JDBC Sink. Veremos como fazer isso no próximo artigo !

Referências

Quão útil foi este post ?

Clique em uma estrela para classificar o post

nota média 5 / 5. Contagem de votos: 19

Sem votos ! Seja o primeiro a classificar !

3 comentários em “Replicando dados com Kafka e Oracle CDC – Parte II (Confluent Platform)”

    1. Avatar de Sergio Willians

      Olá Thadeu !

      Não é memória. Ás vezes isso ocorre no meu ambiente também e possuo 20 gigas no total. Eu dei uma olhada nos logs, e aparentemente ele cai por timeout. 🙁

      Quando isso ocorrer, basta apenas subir os containers novamente com o docker compose up -d. Uma hora estabiliza.

      Na semana do dia 09/11 eu vou ter um encontro com o pessoal da confluent. Vou comentar sobre isso e assim que tiver uma resposta, posto aqui.

Deixe um comentário

O seu endereço de e-mail não será publicado. Campos obrigatórios são marcados com *

plugins premium WordPress