<< All Posts
Zookeeper and Apache Kafka with Ruby


Time to check this distributed queue system idea using Zookeeper and Apache Kafka. Let me guide you from installing it on Ubuntu 16.04 and finally testing a producer and consumer using Ruby.

There we go:

1) Install Zookeeper: sudo apt-get install zookeeper

2) Downloading Apache Kafka:

wget http://apache.parentingamerica.com/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz

3) Installing Kafka:

sudo mkdir /usr/local/kafka

sudo tar -xvf kafka_2.11-0.11.0.0.tgz -C /usr/local/kafka/

4) Special config to make it work on a single node:

sudo vim /usr/local/kafka/kafka_2.11-0.11.0.0/config/server.properties

port = 9092
advertised.host.name = localhost

5) Starting Apache Kafka in background. Notice that you DON’T need sudo.

nohup /usr/local/kafka/kafka_2.11-0.11.0.0/bin/kafka-server-start.sh /usr/local/kafka/kafka_2.11-0.11.0.0/config/server.properties &

6) Let’s create a iptables simple firewall allowing only localhost to connect to our Zookeeper and Kafka. Remember that the idea of Zookeeper and Kafta is having a distributed queue system where you can spread your queue on multiple machines. (Huge stuff brow); So, since the idea here is just study using one single machine on digital ocean, here is how to block h4ck3rs from internet connecting to your toys.

vim iptables.sh

#!/bin/bash

echo "Closing Kafka and Zookeeper ports from external world. Allowing just locally."

iptables -A INPUT -p tcp --dport 2181 -s 127.0.0.0/8 -j ACCEPT
iptables -A INPUT -p tcp --dport 44337 -s 127.0.0.0/8 -j ACCEPT
iptables -A INPUT -p tcp --dport 34232 -s 127.0.0.0/8 -j ACCEPT
iptables -A INPUT -p tcp --dport 9092 -s 127.0.0.0/8 -j ACCEPT
iptables -A INPUT -p tcp --dport 2181 -j DROP
iptables -A INPUT -p tcp --dport 44337 -j DROP
iptables -A INPUT -p tcp --dport 34232 -j DROP
iptables -A INPUT -p tcp --dport 9092 -j DROP

Cleaning iptables before starting our rules. Careful if you have previous rules!! You can check the rules with sudo iptables -L

sudo iptables -F
sudo ./iptables.sh

Now testing using ruby:

gem install ruby-kafka
git clone [email protected]:rodolfobandeira/ruby-kafka.git
cd ruby-kafka
ruby producer.rb # (Repete 3 times)
ruby consumer.rb # (Yeahh)
Hello, World! 617f4877-e34d-4bab-9993-4f95da626549
1

Hello, World! f14e791a-092f-4100-b35d-c51d716e5e57
2

Hello, World! 87c302aa-df8e-41fa-88b5-657148f029d1
3

producer.rb:

require 'kafka'
require 'securerandom'

kafka = Kafka.new(
  seed_brokers: ['127.0.0.1:9092'],
  client_id: 'my-application',
)

kafka.deliver_message("Hello, World! #{SecureRandom.uuid}", topic: 'greetings')

consumer.rb:

require 'kafka'

kafka = Kafka.new(seed_brokers: ['localhost:9092'])


consumer = kafka.consumer(
  group_id: 'my-consumer',

  # Increase offset commit frequency to once every 5 seconds.
  offset_commit_interval: 5,

  # Commit offsets when 100 messages have been processed.
  offset_commit_threshold: 100,

  # Increase the length of time that committed offsets are kept.
  offset_retention_time: 7 * 60 * 60
)

consumer.subscribe('greetings')

trap("TERM") { consumer.stop }

consumer.each_message(automatically_mark_as_processed: false) do |message|
  puts message.offset, message.key, message.value

  consumer.mark_message_as_processed(message)
end

Hope you enjoy!


Reference:

  • https://gist.github.com/monkut/07cd1618102cbae8d587811654c92902
  • https://devops.profitbricks.com/tutorials/install-and-configure-apache-kafka-on-ubuntu-1604-1/
  • https://stackoverflow.com/questions/35788697/leader-not-available-kafka-in-console-producer
  • https://github.com/zendesk/ruby-kafka#consuming-messages-from-kafka
  • https://github.com/rodolfobandeira/ruby-kafka



<< All Posts