Replicando dados com Kafka e Oracle CDC – Parte IV
No primeiro artigo expliquei sobre quais os requisitos necessários para implementarmos a replicação de dados utilizando Kafka. No segundo artigo foi explicado como instalar o Confluent Platform e no terceiro artigo fizemos a instalação dos conectores.
Agora é a vez de prepararmos o nosso banco de dados Oracle para que o conector Oracle CDC possa ler os redo-logs e gravar as operações no tópico correspondente.
Oracle CDC
O CDC (Change Data Capture) não é algo novo. Ele já está disponível no banco de dados Oracle desde a versão 9i com o Oracle Streams.
O CDC consiste basicamente na captura das operações de DML (insert,update,delete), sendo o log based (através da leitura dos redo-logs) considerado o mais performático. Inclusive, esse é o modo que será utilizado pelo nosso conector Oracle CDC na Confluent Platform.
Criando os bancos de dados
No primeiro artigo eu deixei os links sobre como instalar as diversas ferramentas que utilizaremos nessa nossa série de artigos. Inclusive a criação do banco de dados Oracle utilizando o docker.
Criei dois bancos de dados. Para isso, utilizei os dois comandos abaixo para execução dos containers:
oracle19c_1
docker run \
--name oracle19c_1 \
-p 1521:1521 \
-p 5500:5500 \
-e ORACLE_PDB=orcl \
-e ORACLE_PWD=<coloque a sua senha aqui> \
-e ORACLE_MEM=1000 \
-v /opt/oracle/oradata \
-d \
oracle/database:19.3.0-ee
oracle_19c_2
docker run \
--name oracle19c_2 \
-p 1523:1523 \
-p 5502:5502 \
-e ORACLE_PDB=orcl \
-e ORACLE_PWD=<coloque a sua senha aqui> \
-e ORACLE_MEM=1000 \
-v /opt/oracle/oradata \
-d \
oracle/database:19.3.0-ee
Após levantar as duas instâncias, é necessário mais alguns procedimentos para que o listener do banco de dados do container oracle19c_2, possa utilizar a porta 1523. Para isso, basta você seguir os procedimentos descritos nesse artigo.
Preparando o banco de dados para o CDC
É chegado o momento de configurarmos o nosso banco de dados Oracle. Isso é necessário para que o conector CDC da confluent possa fazer o seu trabalho.
Alguns pontos devem ser observados:
- O banco deve estar em modo ARCHIVELOG;
- Supplemental logging deve estar habilitado para todas as colunas;
- É recomendável uma retenção de pelo menos 24 horas
Sobre o archivelog mode e o supplemental logging, não é preciso se preocupar. O próprio procedimento que faremos adiante, já nos ajudará a habilitar ambos.
A questão da retenção não é um problema. O default do undo retention é de 5 minutos, o que não atrapalhará os nossos testes. Caso queira aumentar esse tempo, garanto que não é algo difícil de ser feito e o procedimento pode ser encontrado facilmente na internet. 🙂
Tipos de procedimento
Há 3 tipos de procedimentos disponíveis na documentação da Confluent.
- Non-container
- Multitenant (CDB)
- Multitenant (PDB)
- Amazon RDS
Versões suportadas
- Oracle 11g
- Oracle 12c
- Oracle 18c
- Oracle 19c
O procedimento que utilizaremos é o Multitenant (CDB) e o banco o 19c. Utilizaremos o banco de dados que está no nosso container oracle19c_1.
Você pode optar por seguir o procedimento oficial ou continuar pelo artigo. Ambos chegarão ao mesmo resultado se seguidos corretamente.
Criando o usuário e roles
Vamos agora criar o usuário que o conector CDC utilizará para ler os redo-logs e as tabelas que serão replicadas. O usuário e as roles seguirão uma convenção no qual o nome deles sempre iniciarão com C##.
Manteremos os nomes padrões de usuário (C##MYUSER) e roles (C##CDC_PRIVS). Caso sinta-se confortável, fique a vontade para alterá-los.
Abaixo executaremos uma série de scripts para fazer a criação do usuário e dar as devidas permissões a ele. Esses scripts devem ser OBRIGATORIAMENTE executados com o usuário SYS no CDB como SYSDBA.
Onde está em vermelho, coloque o IP da sua instalação.
Executemos os scripts ! Primeiramente, vamos criar o usuário C##GPO. Ele será o owner da nossa tabela.
CREATE USER C##GPO IDENTIFIED BY < sua senha >
/
GRANT DBA TO C##GPO
/
User C##GPO created.
Grant succeeded.
Sim, eu dei uma de preguiçoso e atribuí o grant de dba para o usuário C##GPO. Fique a vontade para dar apenas os grants necessários, se for de sua vontade. 🙂
Agora abra uma nova conexão com o usuário C##GPO:
Execute agora o script abaixo para criarmos a nossa tabela:
CREATE TABLE tab_teste_1
(
event_id NUMBER(10),
descricao VARCHAR2(100)
)
/
ALTER TABLE tab_teste_1 ADD CONSTRAINT tab_teste_1_pk primary key(event_id)
/
Table TAB_TESTE_1 created.
Table TAB_TESTE_1 altered.
Agora um script básico para preencher a tabela:
DECLARE
y NUMBER;
vCount Number := 1000;
BEGIN
DBMS_OUTPUT.put_line('Start Inserting ' || vCount || ' lines...');
FOR x IN 1..vCount
LOOP
INSERT INTO tab_teste_1 VALUES(x,'TESTE ' || x || ' - ' || TO_CHAR(sysdate,'hh24miss'));
END LOOP;
COMMIT;
DBMS_OUTPUT.put_line('Finish Inserting ' || vCount || ' lines...');
END;
Start Inserting 1000 lines...
Finish Inserting 1000 lines...
100000 registros inseridos com sucesso !
Conecte-se com SYS e execute os comandos abaixos para criar a ROLE e os GRANTS necessários:
CREATE ROLE C##CDC_PRIVS;
GRANT CREATE SESSION TO C##CDC_PRIVS;
GRANT EXECUTE ON SYS.DBMS_LOGMNR TO C##CDC_PRIVS;
GRANT LOGMINING TO C##CDC_PRIVS;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO C##CDC_PRIVS;
GRANT SELECT ON V_$DATABASE TO C##CDC_PRIVS;
GRANT SELECT ON V_$THREAD TO C##CDC_PRIVS;
GRANT SELECT ON V_$PARAMETER TO C##CDC_PRIVS;
GRANT SELECT ON V_$NLS_PARAMETERS TO C##CDC_PRIVS;
GRANT SELECT ON V_$TIMEZONE_NAMES TO C##CDC_PRIVS;
GRANT SELECT ON ALL_INDEXES TO C##CDC_PRIVS;
GRANT SELECT ON ALL_OBJECTS TO C##CDC_PRIVS;
GRANT SELECT ON ALL_USERS TO C##CDC_PRIVS;
GRANT SELECT ON ALL_CATALOG TO C##CDC_PRIVS;
GRANT SELECT ON ALL_CONSTRAINTS TO C##CDC_PRIVS;
GRANT SELECT ON ALL_CONS_COLUMNS TO C##CDC_PRIVS;
GRANT SELECT ON ALL_TAB_COLS TO C##CDC_PRIVS;
GRANT SELECT ON ALL_IND_COLUMNS TO C##CDC_PRIVS;
GRANT SELECT ON ALL_ENCRYPTED_COLUMNS TO C##CDC_PRIVS;
GRANT SELECT ON ALL_LOG_GROUPS TO C##CDC_PRIVS;
GRANT SELECT ON ALL_TAB_PARTITIONS TO C##CDC_PRIVS;
GRANT SELECT ON SYS.DBA_REGISTRY TO C##CDC_PRIVS;
GRANT SELECT ON SYS.OBJ$ TO C##CDC_PRIVS;
GRANT SELECT ON DBA_TABLESPACES TO C##CDC_PRIVS;
GRANT SELECT ON DBA_OBJECTS TO C##CDC_PRIVS;
GRANT SELECT ON SYS.ENC$ TO C##CDC_PRIVS;
GRANT SELECT ON C##GPO.tab_teste_1 TO C##CDC_PRIVS;
-- The following privileges are required additionally for 19c compared to 12c.
GRANT SELECT ON V_$ARCHIVED_LOG TO C##CDC_PRIVS;
GRANT SELECT ON V_$LOG TO C##CDC_PRIVS;
GRANT SELECT ON V_$LOGFILE TO C##CDC_PRIVS;
GRANT SELECT ON V_$INSTANCE to C##CDC_PRIVS;
Role C##CDC_PRIVS created.
Grant succeeded.
Grant succeeded.
Grant succeeded.
Grant succeeded.
Grant succeeded.
Grant succeeded.
Grant succeeded.
Grant succeeded.
Grant succeeded.
Grant succeeded.
Grant succeeded.
Grant succeeded.
Grant succeeded.
Grant succeeded.
Grant succeeded.
Grant succeeded.
Grant succeeded.
Grant succeeded.
Grant succeeded.
Grant succeeded.
Grant succeeded.
Grant succeeded.
Grant succeeded.
Grant succeeded.
Grant succeeded.
Grant succeeded.
Grant succeeded.
Grant succeeded.
Grant succeeded.
Grant succeeded.
ROLE criada com sucesso !
Agora vamos criar o usuário C##MYUSER. Ele será utilizado pelo conector CDC:
CREATE USER C##MYUSER IDENTIFIED BY < sua senha > DEFAULT TABLESPACE USERS;
ALTER USER C##MYUSER QUOTA UNLIMITED ON USERS;
GRANT SELECT ON C##GPO.tab_teste_1 TO C##CDC_PRIVS;
GRANT C##CDC_PRIVS to C##MYUSER;
User C##MYUSER created.
User C##MYUSER altered.
Grant succeeded.
Grant succeeded.
Usuário criado com sucesso !
Habilitando o ARCHIVELOG
Vamos verificar o status de nosso banco. Acesse como SYSDBA via SQLPlus no container do banco oracle19c_1:
SQLPLUS / as SYSDBA;
SELECT LOG_MODE FROM V$DATABASE;
LOG_MODE
------------
NOARCHIVELOG
Como esperado de uma nova instalação, o ARCHIVELOG está desabilitado.
Execute os comandos abaixo:
SHUTDOWN IMMEDIATE;
Database closed.
Database dismounted.
ORACLE instance shut down.
Após o shutdown, execute:
STARTUP MOUNT;
Agora execute:
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;
Database altered.
Database altered.
Vamos verificar novamente:
SELECT LOG_MODE FROM V$DATABASE;
LOG_MODE
------------
ARCHIVELOG
ARCHIVELOG habilitado ! 🙂
Supplemental logging
Vamos agora habilitar o supplemental logging. Basicamente, ele informa que qualquer mudança em uma linha ou coluna, deve ser alocada no redo-log.
Com a conta SYS no CDB, execute:
ALTER SESSION SET CONTAINER=cdb$root;
Session altered.
Ao habilitar o supplemental logging, você pode fazê-lo para TODAS as tabelas ou para apenas uma em específico.
Habilitarei para todas ! 🙂
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
Database altered.
Caso queira habilitar apenas a tabela que criamos, o comando está abaixo:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER TABLE C##GPO.tab_teste_1 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
OBS: Para habilitar apenas uma tabela, é necessário executar o comando acima no PDB (ORCL_1).
Privilégios Flashback Query para o usuário
É necessário que o usuário C##MYUSER possua esse privilégio para que possa ser capaz de fazer o SNAPSHOT da tabela.
Com a conta SYS, execute:
GRANT FLASHBACK ON C##GPO.tab_teste_1 TO C##CDC_PRIVS;
Grant succeeded.
Perfeito !
Verificando os procedimentos
Agora é hora de verificarmos todos os procedimentos que efetuamos até o momento. Para isso, conecte-se como SYS e execute os comandos abaixo:
SELECT GRANTEE, OWNER, TABLE_NAME
FROM DBA_TAB_PRIVS
WHERE GRANTEE IN (SELECT granted_role
FROM DBA_ROLE_PRIVS
WHERE GRANTEE = 'C##MYUSER')
AND (TABLE_NAME='DBMS_LOGMNR' OR TABLE_NAME='V_$LOGMNR_CONTENTS');
O resultado esperado é:
GRANTEE OWNER TABLE_NAME
-------------------------------------------
C##CDC_PRIVS SYS DBMS_LOGMNR
C##CDC_PRIVS SYS V_$LOGMNR_CONTENTS
Agora conecte-se como C##MYUSER e execute:
SELECT * FROM SESSION_PRIVS;
SELECT LOG_MODE FROM V$DATABASE;
SELECT COUNT(*) FROM V$DATABASE;
SELECT COUNT(*) FROM V$THREAD;
SELECT COUNT(*) FROM V$PARAMETER;
SELECT COUNT(*) FROM V$NLS_PARAMETERS;
SELECT COUNT(*) FROM V$TIMEZONE_NAMES;
SELECT COUNT(*) FROM ALL_INDEXES;
SELECT COUNT(*) FROM ALL_OBJECTS;
SELECT COUNT(*) FROM ALL_USERS;
SELECT COUNT(*) FROM ALL_CATALOG;
SELECT COUNT(*) FROM ALL_CONSTRAINTS;
SELECT COUNT(*) FROM ALL_CONS_COLUMNS;
SELECT COUNT(*) FROM ALL_TAB_COLS;
SELECT COUNT(*) FROM ALL_IND_COLUMNS;
SELECT COUNT(*) FROM ALL_ENCRYPTED_COLUMNS;
SELECT COUNT(*) FROM ALL_LOG_GROUPS;
SELECT COUNT(*) FROM ALL_TAB_PARTITIONS;
SELECT COUNT(*) FROM SYS.DBA_REGISTRY;
SELECT COUNT(*) FROM SYS.OBJ$;
SELECT COUNT(*) FROM DBA_TABLESPACES;
SELECT COUNT(*) FROM DBA_OBJECTS;
SELECT COUNT(*) FROM SYS.ENC$;
SELECT COUNT(*) FROM C##GPO.TAB_TESTE_1;
-- Flashback query privilege
SELECT COUNT(*) FROM C##GPO.TAB_TESTE_1 AS OF TIMESTAMP SYSDATE;
-- Added for 19C
SELECT count(*) FROM V$ARCHIVED_LOG;
SELECT count(*) FROM V$LOG;
SELECT count(*) FROM V$LOGFILE;
SELECT count(*) FROM V$INSTANCE;
SELECT SEQUENCE# FROM V$LOG WHERE STATUS = 'CURRENT';
PRIVILEGE
----------------------------------------
CREATE SESSION
LOGMINING
LOG_MODE
------------
ARCHIVELOG
COUNT(*)
----------
1
COUNT(*)
----------
1
COUNT(*)
----------
445
COUNT(*)
----------
19
COUNT(*)
----------
2341
COUNT(*)
----------
122
COUNT(*)
----------
54911
COUNT(*)
----------
40
COUNT(*)
----------
13657
COUNT(*)
----------
311
COUNT(*)
----------
353
COUNT(*)
----------
25653
COUNT(*)
----------
219
COUNT(*)
----------
0
COUNT(*)
----------
1
COUNT(*)
----------
1
COUNT(*)
----------
15
COUNT(*)
----------
73021
COUNT(*)
----------
5
COUNT(*)
----------
73020
COUNT(*)
----------
0
COUNT(*)
----------
100000
COUNT(*)
----------
100000
COUNT(*)
----------
17
COUNT(*)
----------
3
COUNT(*)
----------
3
COUNT(*)
----------
1
SEQUENCE#
----------
32
Se o seu resultado for parecido com o que está acima, está ok !
Vamos agora verificar o supplemental logging:
SELECT SUPPLEMENTAL_LOG_DATA_MIN, SUPPLEMENTAL_LOG_DATA_PK, SUPPLEMENTAL_LOG_DATA_ALL FROM V$DATABASE;
SUPPLEME SUP SUP
-------- --- ---
YES NO YES
Tudo ok !
Done ! Nosso banco de dados já está preparado para o conector CDC !
No próximo artigo, explicarei como configurar o conector CDC e o conector JDBC Sink !
Abraço !
Referências
Sergio obrigado por mais esse artigo. Estou aguardando ansioso pelo próximo pois estou utilizando para meus estudos.
Fico feliz que esses artigos estejam sendo úteis para você Geno ! 🙂
Peço apenas um pouco de paciência, pois estou bastante atarefado no meu trabalho. Apesar disso, fique tranquilo pois eu vou até o fim com essa série !
Abraço