Cardano => Kafka
This guide shows how to leverage Oura to stream data from a Cardano node into a Kafka topic.
About Kafka
Apache Kafka is a framework implementation of a software bus using stream-processing. It is an open-source software platform developed by the Apache Software Foundation written in Scala and Java.
Find more info about Kafka in wikipedia or visit Kafka’s official website
Prerequisites
This examples assumes the following prerequisites:
- A running Cardano node locally accesible via a unix socket.
- A Kafka cluster accesible through the network.
- An already existing Kafka topic where to output events
- Oura binary release installed in local system
Instructions
1. Create an Oura configuration file cardano2kafka.toml
[source]type = "N2C"address = ["Unix", "/opt/cardano/cnode/sockets/node0.socket"]magic = "testnet"
[sink]type = "Kafka"brokers = ["kafka-broker-0:9092"]topic = "cardano-events"Some comments regarding the above configuration:
- the [source]section indicates Oura from where to pull chain data.- the N2Csource type value tells Oura to get the data from a Cardano node using Node-to-Client mini-protocols (chain-sync instantiated to full blocks).
- the addressfield indicates that we should connect viaUnixsocket at the specified path. This value should match the location of your local node socket path.
- the magicfield indicates that our node is running in thetestnetnetwork. Change this value tomainnetif appropriate.
 
- the 
- the [sink]section tells Oura where to send the information it gathered.- the Kafkasink type value indicates that Oura should use a Kafka producer client as the output
- the brokersfield indicates the location of the Kafka brokers within the network. Several hostname:port pairs can be added to the array for a “cluster” scenario.
- the topicfields indicates which Kafka topic to used for the outbound messages.
 
- the 
2. Run Oura in daemon mode
Run the following command from your terminal to start the daemon process:
RUST_LOG=info oura daemon --config cardano2kafka.tomlYou should see an output similar to the following:
[2021-12-13T22:16:43Z INFO  oura::sources::n2n::setup] handshake output: Accepted(7, VersionData { network_magic: 764824073, initiator_and_responder_diffusion_mode: false })[2021-12-13T22:16:43Z INFO  oura::sources::n2n::setup] chain point query output: Some(Tip(Point(47867448, "f170baa5702c91b23580291c3a184195df7c77d3e1a03b3d6424793aacc850d6"), 6624258))[2021-12-13T22:16:43Z INFO  oura::sources::n2n::setup] node tip: Point(47867448,"f170baa5702c91b23580291c3a184195df7c77d3e1a03b3d6424793aacc850d6")[2021-12-13T22:16:44Z INFO  oura::sources::n2n] rolling block to point Point(47867448, "f170baa5702c91b23580291c3a184195df7c77d3e1a03b3d6424793aacc850d6")[2021-12-13T22:16:52Z INFO  oura::sources::n2n] requesting block fetch for point Some(Point(47867448, "f170baa5702c91b23580291c3a184195df7c77d3e1a03b3d6424793aacc850d6"))[2021-12-13T22:17:15Z INFO  oura::sources::n2n] requesting block fetch for point Some(Point(47867448, "f170baa5702c91b23580291c3a184195df7c77d3e1a03b3d6424793aacc850d6"))[2021-12-13T22:17:20Z INFO  oura::sources::n2n] requesting block fetch for point Some(Point(47867448, "f170baa5702c91b23580291c3a184195df7c77d3e1a03b3d6424793aacc850d6"))