APIs em tempo real no contexto do Apache Kafka

Pontos Principais

  • Um dos desafios que sempre enfrentamos na construção de aplicações e sistemas, é como trocar informações entre eles de forma eficiente, mantendo a flexibilidade para modificar as interfaces sem impacto indevido em outro lugar;
  • Os eventos oferecem uma abordagem no estilo Goldilocks, no qual as APIs em tempo real podem ser usadas como base para aplicações pois são flexíveis e com baixo acoplamento, mas sem perder o alto desempenho e a eficiência. Ao analisar o domínio de negócios, talvez irá pensar em muitos exemplos de eventos, podendo ser interações geradas por humanos ou por máquinas;
  • O Apache Kafka oferece uma plataforma escalonável de streaming de eventos com a qual é possível construir aplicações em torno deste poderoso conceito. O Kafka inclui recursos de processamento de streams por meio da API Kafka Streams;
  • O ksqlDB é um banco de dados de streaming de eventos desenvolvido especificamente para aplicações de processamento de stream. Fornece uma API baseada em SQL para consultar e processar dados no Kafka;
  • Os diversos recursos do ksqlDB incluem a filtragem, transformação e junção de dados de streams e tabelas em tempo real, criando visualizações materializadas através da agregação de eventos, além de muitas outras funcionalidades.

Como trocar informações entre as aplicações e sistemas de forma eficiente, mantendo a flexibilidade para modificar as interfaces sem ter nenhum tipo de impacto em outro lugar? Este é o maior desafio que enfrentamos todas as vezes que fazemos a construção de aplicações e sistemas. Quanto mais específica e otimizada for uma interface, maior será a probabilidade de que seja tão personalizada que para alterá-la, seria necessário uma reescrita completa. O inverso também é válido, os padrões de integração genéricos podem ser adaptáveis ​​e amplamente suportados, mas isso custaria o desempenho da mesma.

Os eventos oferecem uma abordagem no estilo Goldilocks, na qual as APIs em tempo real podem ser usadas como base para aplicações que são flexíveis e de baixo acoplamento, sem perder o alto desempenho e a eficiência.

Os eventos podem ser considerados como blocos de construção da maioria das outras estruturas de dados. De modo geral, registram o fato e quando algo aconteceu. Um evento pode capturar essas informações em vários níveis de detalhes: Desde uma simples notificação até um rico evento que descreve o estado completo do que aconteceu.

A partir de eventos, podemos agregar para criar um estado, que é o que conhecemos nos armazenamentos de dados RDBMS e NoSQL. Além de serem a base para o estado, os eventos também podem ser usados ​​para acionar de forma assíncrona ações em outro lugar, assim que algo acontece. Esta é a base para arquiteturas orientadas a eventos. Dessa forma, podemos criar consumidores que atendam aos nossos requisitos, podendo ser stateless, ou usando estado e tolerância a falhas. Os produtores podem optar por manter o estado, mas não são obrigados, já que os consumidores podem reconstruí-lo a partir dos eventos recebidos.

Ao analisar o domínio de negócios, poderá existir muitos exemplos de eventos, podendo ser interações geradas por humanos ou por máquinas. Podem conter um payload rico ou ser apenas uma notificação, por exemplo:

  • Evento: userLogin
    • Payload: zbeeblebrox logado em 2020-08-17 16:26:39 BST
  • Evento: carroEstacionado
    • Payload: Registro do carro A42 XYZ estacionado em 2020-08-17 16:36:27 na vaga X42
  • Evento: pedidoRealizado
    • Payload: Robin pediu quatro latas de feijão cozido com custo total de £2.25 em 2020-08-17 16:35:41 BST

Esses eventos podem ser utilizados ​​para estimular ações em outro lugar, como um serviço que processa pedidos assim que são realizados, por exemplo. Também podem ser usados ​​em conjunto para fornecer informações como o número atual de vagas ocupadas e disponíveis em um estacionamento.

Portanto, se os eventos são a base sobre a qual as aplicações e serviços serão construídos, precisamos de uma tecnologia que nos apoie da melhor forma, e é neste cenário que entra o Apache Kafka®. O Kafka é uma plataforma escalável de streaming de eventos que fornece:

  • Pub/Sub
    • Para publicar (gravar) e assinar (ler) streams de eventos, incluindo importação e exportação contínua de dados de outros sistemas;
  • Processamento de stream com estado
    • Para armazenar streams de eventos de forma durável e confiável pelo tempo necessário;
  • Armazenamento
    • Para processar streams de eventos conforme ocorrem ou retrospectivamente.

O Kafka é construído em torno do conceito de log. Esse conceito simples, mas poderoso, é um log distribuído, imutável e append-only, onde podemos capturar e armazenar os eventos que ocorrem no negócio e nos sistemas de maneira escalável e eficiente. Esses eventos podem ser disponibilizados para vários usuários por assinatura, bem como processados ​​e agregados posteriormente, para uso direto ou para armazenamento em outros sistemas, como RDBMS, data lakes e NoSQL.

Neste artigo, serão exploradas as APIs disponíveis no Apache Kafka, com demonstrações de como podem ser usadas nos sistemas e nas aplicações.

As APIs produtoras e consumidoras

A grande vantagem de um sistema como o Kafka é que os produtores e os consumidores estão separados, o que significa, entre outras coisas, que podemos produzir dados sem precisar que um consumidor exista, a priori. Por conta do desacoplamento, podemos escalar esses processos. Assim que um evento acontece, enviamos para Kafka. Tudo o que precisamos saber são os detalhes do cluster do Kafka e o tópico, uma forma de organizar dados no Kafka, uma espécie de tabela em um RDBMS, para o qual queremos enviar o evento.

Existem clientes disponíveis para o Kafka em muitas linguagens de programação diferentes. A seguir um exemplo de produção de um evento para o Kafka usando a linguagem Go:

package main
 
import (
    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)
 
func main() {
 
    topic := "test_topic"
    p, _ := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092"})
    defer p.Close()
 
    p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic,
            Partition: 0},
        Value: []byte("Hello world")}, nil)
 
}

Como o Kafka armazena eventos de forma durável, isso significa que estão disponíveis como e quando queremos consumi-los, até o momento em que os removemos, o que é configurável por tópico.

Tendo escrito o evento para o tópico do Kafka, estará disponível para um ou mais consumidores lerem. Os consumidores podem se comportar da maneira tradicional de pub/sub e receber novos eventos conforme chegarem, bem como optar por re-consumir arbitrariamente eventos de um ponto anterior no tempo, conforme exigido pela aplicação. Essa funcionalidade de reprodução do Kafka, graças à sua camada de armazenamento durável e escalável, é uma grande vantagem para muitos casos de uso importantes na prática, como aprendizado de máquina e testes A/B, onde dados históricos e ativos são necessários. Também é um requisito em setores regulamentados, onde os dados devem ser retidos por muitos anos para atender à conformidade legal. Os sistemas de mensagens tradicionais como RabbitMQ, ActiveMQ não suportam tais requisitos.

package main
 
import (
    "fmt"
 
    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)
 
func main() {
 
    topic := "test_topic"
 
    cm := kafka.ConfigMap{
        "bootstrap.servers":        "localhost:9092",
        "go.events.channel.enable": true,
        "group.id":                 "rmoff_01"}
 
    c, _ := kafka.NewConsumer(&cm)
    defer c.Close()
    c.Subscribe(topic, nil)
 
    for {
        select {
        case ev := <-c.Events():
            switch ev.(type) {
 
            case *kafka.Message:
                km := ev.(*kafka.Message)
                fmt.Printf("✅ Mensagem '%v' recebida do tópico '%v'\n", string(km.Value), string(*km.TopicPartition.Topic))
            }
        }
    }
 
}

Quando um consumidor se conecta ao Kafka, fornece um identificador de grupo de consumidores (Consumer Group). O conceito de grupo de consumidores permite duas funcionalidades: A primeira, que o Kafka mantém o controle do ponto no tópico para o qual o consumidor leu os eventos, de forma que, quando o consumidor se reconectar no tópico, possa continuar lendo do ponto em que estava antes. A segunda, que a aplicação de consumo pode querer escalar as leituras em várias instâncias, formando um grupo de consumidores que permite o processamento dos dados em paralelo. Então, o Kafka vai alocar eventos para cada consumidor dentro do grupo, com base nas partições de tópico disponíveis, e gerenciará ativamente o grupo caso os membros saiam ou ingressem posteriormente, no caso de uma instância do consumidor cair, por exemplo.

Isso significa que vários serviços podem usar os mesmos dados, sem qualquer interdependência entre eles. Os mesmos dados também podem ser roteados para armazenamentos de dados em outro lugar usando a API Kafka Connect.

As APIs de produtor e consumidor estão disponíveis em bibliotecas Java, C/C++, Go, Python, Node.js e muitas outras linguagens de programação. Mas e se a aplicação quiser usar um HTTP ao invés do protocolo nativo do Kafka? Para isso, existe um Proxy REST.

Usando API REST com Apache Kafka

Vamos supor que estamos desenvolvendo uma aplicação para um dispositivo de um estacionamento inteligente. Um payload para o evento que registra o fato de que um carro acabou de ocupar uma vaga poderia ter ser o seguinte:

{
    "nome": "NCP Sheffield",
    "vaga": "A42",
    "ocupado": true
}

Poderíamos colocar esse evento em um tópico do Kafka, que também registraria a hora como parte dos metadados do evento. A produção de dados para o Kafka usando o Confluent REST Proxy é uma chamada REST direta:

curl -X POST \
     -H "Content-Type: application/vnd.kafka.json.v2+json" \
     -H "Accept: application/vnd.kafka.v2+json" \
     --data '{"records":[{"value":{ "name": "NCP Sheffield", "space": "A42", "occupied": true }}]}' \
     "http://localhost:8082/topics/carpark"

Qualquer aplicação pode consumir a partir deste tópico, usando a API nativa do consumidor conforme descrita acima, ou usando uma chamada REST. Assim como na API nativa do consumidor, os consumidores que usam a API REST também são membros de um grupo denominado de assinatura (subscription). Assim, com a API REST, deve-se declarar primeiro o consumidor e a assinatura:

curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
      --data '{"name": "rmoff_consumer", "format": "json", "auto.offset.reset": "earliest"}' \
      http://localhost:8082/consumers/rmoff_consumer
 
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["carpark"]}' \
 http://localhost:8082/consumers/rmoff_consumer/instances/rmoff_consumer/subscription

Feito isso, é possível ler os eventos:

curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
      http://localhost:8082/consumers/rmoff_consumer/instances/rmoff_consumer/records
[
    {
        "topic": "carpark",
        "key": null,
        "value": {
            "name": "Sheffield NCP",
            "space": "A42",
            "occupied": true
        },
        "partition": 0,
        "offset": 0
    }
]

Se existirem vários eventos para receber, serão recebidos em um lote por chamada. Se o cliente quiser verificar se há novos eventos, precisará fazer uma nova chamada REST.

Vimos como podemos obter dados dentro e fora dos tópicos do Kafka. Mas muitas vezes queremos fazer mais do que simplesmente um pub/sub. Queremos pegar um stream de eventos e ver o panorama geral, por exemplo, saber de todos os carros que estão chegando e saindo do estacionamento, quantas vagas livres existem neste momento? Ou talvez, assinar um stream de atualizações apenas para um determinado estacionamento.

Notificações condicionais, processamento de stream e views materializadas

Pensar no Apache Kafka somente como um pub/sub, é como utilizar o iPhone apenas como um celular para fazer e receber chamadas. Podemos descrever como sendo uma das capacidades desta ferramenta, mas possui muito mais funcionalidades. O Apache Kafka inclui recursos de processamento de streams por meio da API Kafka Streams. Esta é uma biblioteca cliente escrita em Java, rica em recursos para fazer processamento de stream de dados com estado no Kafka, em escala e em várias máquinas. Amplamente utilizado em empresas como Walmart, Ticketmaster e Bloomberg, o Kafka Streams também fornece as bases para o ksqlDB.

ksqlDB é um banco de dados de streaming de eventos desenvolvido especificamente para aplicações de processamento de stream. Fornece uma API baseada em SQL para consultar e processar dados no Kafka. Os diversos recursos do ksqlDB incluem a filtragem, transformação e junção de dados de streams e tabelas em tempo real, criando visualizações materializadas através da agregação de eventos, além de muitas outras funcionalidades.

Para trabalhar com os dados em ksqlDB, primeiro precisamos declarar um esquema:

CREATE STREAM CARPARK_EVENTS (NAME     VARCHAR,
                              SPACE    VARCHAR,
                              OCCUPIED BOOLEAN)
                        WITH (KAFKA_TOPIC='carpark',
                              VALUE_FORMAT='JSON');

O ksqlDB é implementado como uma aplicação em cluster e esse trabalho de declaração inicial pode ser feito na inicialização ou diretamente pelo cliente, conforme a necessidade. Feito isso, qualquer cliente pode se inscrever em um stream de alterações do tópico original, mas com um filtro aplicado. Por exemplo, para receber uma notificação quando uma vaga é liberada em um determinado estacionamento, pode-se executar o seguinte comando:

SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss') AS EVENT_TS,
       SPACE
  FROM CARPARK_EVENTS
 WHERE NAME='Sheffield NCP'
   AND OCCUPIED=false
  EMIT CHANGES;

Ao contrário das consultas SQL que estamos acostumados, esta é uma consulta contínua, denotada pela cláusula EMIT CHANGES. As consultas contínuas, conhecidas como consultas push, continuarão a retornar quaisquer novas correspondências ao predicado conforme os eventos ocorrerem, agora e no futuro, até que sejam encerradas. O ksqlDB também oferece suporte a consultas pull, que se comportam como uma consulta em um RDBMS, retornando valores para uma consulta em um determinado momento. Portanto, o ksqlDB suporta os mundos de streaming e modo estático, o que significa que na maioria das aplicações, também terão de fazer com base nas ações que estão sendo executadas.

O ksqlDB inclui uma API REST abrangente. O comando SQL acima pode ser realizado utilizando a chamada via API, que seria semelhante ao comando usado no curl:

curl --http2 'http://localhost:8088/query-stream' \
     --data-raw '{"sql":"SELECT TIMESTAMPTOSTRING(ROWTIME,'\''yyyy-MM-dd HH:mm:ss'\'') AS EVENT_TS, SPACE FROM CARPARK_EVENTS WHERE NAME='\''Sheffield NCP'\'' and OCCUPIED=false EMIT CHANGES;"}'

Esta chamada resulta em uma resposta de streaming do servidor, com um cabeçalho e, quando os eventos que correspondam à condição são recebidos no tópico de origem, estes eventos são enviados ao cliente:

{"queryId":"383894a7-05ee-4ec8-bb3b-c5ad39811539","columnNames":["EVENT_TS","SPACE"],"columnTypes":["STRING","STRING"]}
…
["2020-08-05 16:02:33","A42"]
…
…
…
["2020-08-05 16:07:31","D72"]
…

Podemos usar ksqlDB para definir e preencher novos streams de dados também. Ao preceder uma instrução SELECT com o comando CREATE STREAM streamname AS, podemos encaminhar a saída da consulta contínua para um tópico do Kafka. Dessa forma, podemos usar o ksqlDB para realizar as transformações, junções, filtragem e muito mais nos eventos que enviamos para o Kafka. O ksqlDB suporta o conceito de uma tabela como um tipo de objeto de primeira classe, e poderíamos usar isso para enriquecer os eventos do estacionamento que estamos recebendo, com informações sobre o próprio estacionamento:

CREATE STREAM CARPARKS AS
    SELECT E.NAME AS NAME, E.SPACE,
           R.LOCATION, R.CAPACITY,
           E.OCCUPIED,
           CASE
               WHEN OCCUPIED=TRUE THEN 1
               ELSE -1
           END AS OCCUPIED_IND
    FROM   CARPARK_EVENTS E
           INNER JOIN
           CARPARK_REFERENCE R
           ON E.NAME = R.NAME;

Estamos utilizando uma instrução CASE para aplicar a lógica aos dados, permitindo criar uma contagem contínua de vagas disponíveis no estacionamento. O CREATE STREAM acima popula um tópico Kafka semelhante ao descrito a seguir:

+----------------+-------+----------+----------------------------+----------+--------------+
|NAME            |SPACE  |OCCUPIED  |LOCATION                    |CAPACITY  |OCCUPIED_IND  |
+----------------+-------+----------+----------------------------+----------+--------------+
|Sheffield NCP   |E48    |true      |{LAT=53.4265964, LON=-1.8426|1000      |1             |
|                |       |          |386}                        |          |              |

Por fim, vamos verificar como criar uma agregação com estado em ksqlDB e consultá-la a partir de um cliente. Para criar a visão materializada, executamos o SQL que inclui as funções de agregação:

CREATE TABLE CARPARK_SPACES AS
    SELECT NAME,
           SUM(OCCUPIED_IND) AS OCCUPIED_SPACES
        FROM CARPARKS
        GROUP BY NAME;

Este estado é mantido em todos os nós distribuídos do ksqlDB e pode ser consultado diretamente usando a API REST:

curl --http2 'http://localhost:8088/query-stream' \
     --data-raw '{"sql":"SELECT OCCUPIED_SPACES FROM CARPARK_SPACES WHERE NAME='\''Birmingham NCP'\'';"}'

Ao contrário da resposta de streaming que vimos anteriormente, as consultas em relação ao estado, conhecidas como “consultas pull” em oposição a “consultas push”, retornam imediatamente e depois finalizam:

{"queryId":null,"columnNames":["OCCUPIED_SPACES"],"columnTypes":["INTEGER"]}
[30]

Se a aplicação deseja obter o valor mais recente, pode executar a consulta novamente, e o valor pode ou não ter sido alterado:

curl --http2 'http://localhost:8088/query-stream' \
     --data-raw '{"sql":"SELECT OCCUPIED_SPACES FROM CARPARK_SPACES WHERE NAME='\''Birmingham NCP'\'';"}'
{"queryId":null,"columnNames":["OCCUPIED_SPACES"],"columnTypes":["INTEGER"]}
[29]

Existe também um cliente Java para o ksqlDB e para os clientes Python e Go de autoria da comunidade.

Integração com outros sistemas

Um dos benefícios de usar o Apache Kafka como um broker altamente escalável e persistente para mensagens assíncronas, é que os mesmos dados que são trafegados entre as aplicações, também podem ser direcionados para o processamento de stream (como vimos acima) e podem ser consumidos diretamente por sistemas dependentes.

Continuando com o exemplo de uma aplicação que envia um evento toda vez que um carro estaciona ou libera uma vaga do estacionamento, é provável que estas informações sejam utilizadas em outro lugar, como por exemplo:

  • Análises para observar os comportamentos e tendências no estacionamento;
  • Aprendizado de máquina para prever os requisitos de capacidade;
  • Feeds de dados para fornecedores terceirizados.

Usando o Connect API do Apache Kafka, é possível definir integrações de streaming com sistemas dentro e fora do mesmo. Por exemplo, para transmitir os dados em tempo real do Kafka para o Amazon S3, pode-se executar o comando:

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-s3/config \
    -d ' {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "topics": "carpark",
        "s3.bucket.name": "rmoff-carparks",
        "s3.region": "us-west-2",
        "flush.size": "1024",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat"
        }'

Os mesmos dados que direcionam as notificações para a aplicação e criam o estado que pode ser consultado diretamente, também estão sendo transmitidos para o Amazon S3. Cada uso é desacoplado do outro. Se posteriormente quisermos transmitir os mesmos dados para outro destino, como o Amazon Snowflake, basta adicionar outra configuração no Kafka Connect. Os outros consumidores não são afetados pela modificação. O Kafka Connect também pode transmitir dados para o Kafka. Por exemplo, a tabela CARPARK_REFERENCE que usamos no ksqlDB acima pode ser transmitida usando o Change Data Capture (CDC) de um banco de dados que atua como o sistema de registro para essas informações.

Conclusão

O Apache Kafka oferece uma plataforma escalonável de streaming de eventos com a qual é possível construir aplicações em torno do poderoso conceito de eventos. Existem diversos benefícios ao usar eventos como base para conectar as aplicações e serviços, incluindo o baixo acoplamento, autonomia dos serviços, elasticidade, capacidade de evolução flexível e resiliência.

As APIs do Kafka e seu ecossistema, incluindo o ksqlDB, podem ser utilizadas para o consumo baseado em assinatura, bem como pesquisas de chave/valor em visualizações materializadas, sem a necessidade de armazenamentos de dados adicionais. As APIs estão disponíveis como clientes nativos e também por REST.

Para saber mais sobre o Apache Kafka, visite o site developer.confluent.io. A Confluent Platform é uma distribuição do Apache Kafka que inclui todos os componentes discutidos neste artigo. Está disponível on-premises ou como um serviço gerenciado chamado Confluent Cloud. Os exemplos de código deste artigo e um Docker Compose podem ser encontrados no GitHub. Para aprender mais sobre a construção de sistemas orientados a eventos em torno do Kafka, certifique-se de ler o excelente livro Designing Event-Driven Systems, de Ben Stopford.