Kafka
நிகழ் நேரத்தில் அதிக அளவு உற்பத்தியாகும் (throughput) தரவு ஊட்டங்களை (data feed) குறைந்த காலதாமதத்தில் (low latency) பெற்று ப்ராசஸ் செய்வதற்கான ஒரு கட்டமைப்பே kafka ஆகும். இது scala மொழியில் எழுதப்பட்ட திறந்த மூல மென்பொருள் கருவி ஆகும். ப்ரொடியூசர் கன்ஸ்யூமர் என்னும் இருவேறு அப்ளிகேஷன்களுக்கு இடையே செய்திகளைத் தாங்கிச் செல்லும் இடைத்தரகர் போன்று இக்கருவி செயல்படும். IOT சென்சார் தரவுகள், சேவை மையங்களில் தினசரி மேற்கொள்ளப்படும் தொலைபேசி அழைப்புகள், ஒரு வங்கியின் பல்வேறு ஏடிஎம் அட்டைகளில் பல்வேறு இடங்களில் மேற்கொள்ளப்படும் பணப் பரிமாற்றங்கள் போன்றவையெல்லாம் ப்ரொடியூசராக இயங்கி தரவுகளை வழங்கிக் கொண்டே இருந்தால் அவற்றையெல்லாம் சேமிக்க உதவும் data lake, hdfs, mongodb போன்ற nosql டேட்டாபேஸஸ் ஆகியவை கன்ஸ்யூமராக இயங்கி அவற்றைப் பெற்றுக் கொள்ளும். இவ்விரண்டுக்கும் இடையே topic என்ற ஒன்றை உருவாக்கி அதன் கீழ் உடனுக்குடன் தரவுகளைப் பகிரும் வேலையை kafka செய்கிறது.
தரவுகளை வழங்குவது ப்ரொடியூசர் எனவும், பெற்றுக்கொள்வது கன்ஸ்யூமர் எனவும் அழைக்கப்படும். இவ்விரண்டுக்கும் இடையில் ஒவ்வொரு partition-ஆக தரவுகளின் பகிர்வு நடைபெறுவதைக் கண்காணிக்கும் வேலையை zoo keeper செய்கிறது. ஜூ கீப்பர் இல்லாமல் கஃப்காவை நாம் இயக்க முடியாது.
கஃப்காவை கீழ்க்கண்ட முகவரியில் சென்று பதிவிறக்கம் செய்து பிரித்தெடுத்து வைத்துக் கொள்ளவும்.
$ wget "https://downloads.apache.org/kafka/2.5.0/kafka_2.12-2.5.0.tgz" $ tar -xzf kafka_2.12-2.5.0.tgz $ cd kafka_2.12-2.5.0
Zoo Keeper & Kafka Server
கஃப்கா மற்றும் ஜூ கீப்பர் ஆகிய இரண்டுக்குமான சர்வர்களை இரண்டு தனித்தனி டெர்மினல்களில் ஓடவிட வேண்டும். இது பின்வருமாறு.
Terminal1:
$ bin/zookeeper-server-start.sh config/zookeeper.properties
Terminal2:
$ bin/kafka-server-start.sh config/server.properties
இவை இயங்குவதில் ஏதேனும் பிரச்சினை ஏற்பட்டால் server. properties எனும் கோப்பிற்குள் சென்று listeners என்பது கமெண்ட் செய்யப்பட்டிருந்தால் அதனை நீக்கிவிடவும். பின் அதன் மதிப்பினை என அமைக்கவும்.
$ vim config/server.properties listeners = PLAINTEXT://localhost:9092
Topic creation
மேற்கண்ட இரண்டும் வெவ்வேறு டெர்மினல்களில் ஓடிக்கொண்டிருக்கும் போது புதிதாக ஒரு டெர்மினலை உருவாக்கி, அதில் தரவுகளைப் பகிர்வதற்குத் தேவையான ஒரு டாப்பிக்கை உருவாக்கவும். இங்கு கணியம் எனும் பெயரில் ஒரு டாப்பிக் உருவாக்கப்பட்டுள்ளது. அடுத்துள்ள கட்டளை இதுவரை நாம் உருவாக்கியுள்ள டாப்பிக் அனைத்தையும் பட்டியலிட உதவும்.
Terminal3:
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic kaniyam
Producer – Consumer Model
ஓரிடத்திலிருந்து தகவல்களை வழங்குவது producer எனவும், அதனை மற்றொரு இடத்திலிருந்து பெற்றுக் கொள்வது consumer எனவும் அழைக்கப்படும். மிக எளிமையாக இதனை செய்து பார்க்க ஒரு டெர்மினலில் ப்ரொடியூசரை இயக்கி மற்றொரு டெர்மினலில் கன்ஸ்யூமரை இயக்கவும். இந்த டெர்மினலில் நாம் கொடுக்கின்ற தரவுகள் அனைத்தும் அடுத்த டெர்மினலில் உடனுக்குடன் வெளிப்படுகிறதா என சோதிக்கலாம்.
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic kaniyam
It’s a website for open-source technologies.
இப்படி ஒவ்வொரு வரியாக புரொடியூசர் இயங்குகின்ற இடத்தில் மெசேஜை அளிக்கவும்.
கன்ஸ்யூமர் இயங்குகின்ற இடத்தில் சென்று கொடுக்கப்பட்ட வரிகள் ஒவ்வொரு முறையும் உடனுக்குடன் வெளிப்படுகிறதா என சோதிக்கவும்.
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kaniyam --from-beginning
சோதித்து முடித்தபின் கன்ஸ்யூமரையோ புரொடியூசரையோ ctrl+d எனக் கொடுத்து நிறுத்தினால் மொத்தம் எத்தனை வரிகள் பகிரப்பட்டுள்ளது என்பதை வெளிப்படுத்தும். இங்கு Processed a total of 4 messages என்பதை வெளிப்படுத்தியுள்ளது.
Multinode Cluster
ஒரே கணினியில் பல்வேறு nodes கொண்ட கிளஸ்டரை நாம் உருவாக்கலாம். இதற்காக பல்வேறு சர்வர்களைப் பயன்படுத்துவதற்கு பதிலாக ஒரே சர்வரில் பல்வேறு nodes-ஐ இயக்குவதன் மூலம் இது சாத்தியமாகிறது. கஃப்காவின் config போஃல்டருக்குள் நமது லோக்கல் சர்வருடைய பண்புகளைப் பெற்று விளங்கும் கோப்பினைக் காணலாம். இதனை அப்படியே நகலெடுத்து அவற்றை சர்வர்1, சர்வர்2 எனும் பெயரில் சேமிக்கவும்.
$ cp server.properties server1.properties $ cp server.properties server2.properties
பின் இத்தகைய கோப்புகளில் உள்ள சில முக்கியப் பண்புகளின் மதிப்புகளை பின்வருமாறு அமைப்பதன் மூலம் மூன்று ப்ரோக்கர்களை மூன்று வெவ்வேறு port-இல் இயங்குமாறு செய்யலாம்.
$ vim server.properties broker.id=0 listeners=PLAINTEXT://localhost:9092 log.dirs=/tmp/kafka-logs
$ vim server1.properties broker.id=1 listeners=PLAINTEXT://localhost:9093 log.dirs=/tmp/kafka-logs-1
$ vim server2.properties broker.id=2 listeners=PLAINTEXT://localhost:9094 log.dirs=/tmp/kafka-logs-2
ஏற்கனவே கஃப்கா மற்றும் ஜூ கீப்பர் ஆகிய இரண்டுக்குமான சர்வர்கள் இரண்டு தனித்தனி டெர்மினல்களில் ஓடிக்கொண்டிருக்க இப்போது புதிதாக உருவாக்கிய இரண்டு சர்வர்களையும் இரண்டு டெர்மினல்களில் ஓடவிட வேண்டும்.
Terminal4:
$ bin/kafka-server-start.sh config/server1.properties
Terminal5:
$ bin/kafka-server-start.sh config/server2.properties
இப்போது மூன்று சர்வர்கள் மூன்று port-இல் ஓடிக்கொண்டிருக்கும் நிலையில், 9092 எனும் போர்டில் carrot எனும் டாப்பிக்கை உருவாக்கவும். அப்போது 3 இடங்களில் பிரதி எடுத்து வைக்குமாறு கொடுக்கவும். எனவே ஒரு இடத்தில் இது அழிக்கப்பட்டாலும், மற்ற இடங்களில் இதன் செயல்பாட்டைக் காணலாம்.
Terminal6:
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic carrot
9092 எனும் போர்டில் ஓடிக்கொண்டிருக்கும் சர்வரின் பிராசஸ் எண்ணைக் கண்டுபிடித்து அழிக்க கீழ்கண்ட கட்டளைகள் பயன்படுகின்றன.
$ ps aux | grep server.properties $ kill -9 19042
அதன்பின் carrot எனும் டாப்பிக் 9093 எனும் போர்டில் இடம்பெற்றிருப்பதைக் காணலாம். இதுவே multi-node cluster-க்கு சிறந்த உதாரணமாக அமையும்.
$ bin/kafka-topics.sh --describe --bootstrap-server localhost:9093 --topic carrot
File Streaming: kafka-connect
பல்வேறு மூலங்களில் இருக்கும் தரவுகளை கஃப்கா டாப்பிக் வழியே இன்னபிற இடங்களில் செலுத்துவதற்கு kafka-connect எனும் கருவி பயன்படுகிறது. தொடர்ச்சியாக ஓரிடத்தில் கோப்புகள் வந்து விழும்போது அவற்றை எடுத்து ஒரு டாப்பிக்குக்குள் செலுத்த FileSource connector பயன்படுகிறது. அவ்வாறே கஃப்கா டாப்பிக்குக்குள் இருப்பவற்றை ஒரு ஃபைலில் எழுதி ஓரிடத்தில் சேமிக்க FileSink connector பயன்படுகிறது.
கீழ்கண்ட எடுத்துக்காட்டில் smp.txt எனும் கோப்பிற்குள் உள்ளவற்றை கணியம் எனும் டாப்பிக்குக்குள் செலுத்துவதற்கும், பின்னர் அந்த டாப்பிக்குக்குள் உள்ளவற்றை sample.txt எனும் கோப்பினுள் செலுத்துவதற்குமான connectors பின்வருமாறு அமையும்.
Config போஃல்டருக்குள் உள்ள இத்தகைய கனெக்டாரின் பண்புகளை பின்வருமாறு அமைக்க வேண்டும்.
$ vim config/connect-file-source.properties
$ vim config/connect-file-sink.properties
பின்னர் connect-standalone.sh எனும் பிராசஸை இயக்குவதன் மூலம் இச்செயல் முழுமையுற்று நம்முடைய தற்போதைய டைரக்டரியில் sample.txt இடம் பெற்றிருப்பதைக் காணலாம். டெர்மினலிலும் ஒருமுறை டாப்பிக்கை இயக்கி கோப்பில் உள்ள செய்திகள் இங்கு வெளிப்படுகின்றனவா என சோதித்துக் கொள்ளலாம்.
Terminal6:
$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
Data Streaming: Pykafka
கஃப்காவின் புரொடியூசர் கன்ஸ்யூமர் ஆகியவற்றை பைத்தான் மொழியில் எழுதி செயல்படுத்த pykafka என்பது பயன்படுகிறது. 5 செகண்டுக்கு ஒருமுறை ஒரு json வடிவ தரவினை வெளிப்படுத்தும் producer.py எனும் புரோகிராம் இங்கு பின்வருமாறு எழுதப்பட்டுள்ளது.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pykafka import KafkaClient | |
import json | |
import time | |
client = KafkaClient(hosts='localhost:9092') | |
topic = client.topics['dataets'] | |
producer = topic.get_sync_producer() | |
producer.produce(b'test message') | |
for e in range(1000): | |
data = {'number' : e} | |
print(data) | |
info_as_json = json.dumps(data) | |
producer.produce(info_as_json.encode('ascii')) | |
time.sleep(5) |
இதனை இயக்கினால் அது பின்வருமாறு வெளிப்படுத்தும். நமக்குப் புரிய வேண்டும் என்பதற்காக இதனை பிரிண்ட் செய்து பார்த்துள்ளோம். ஆனால் உண்மையில் இது ஐந்து செகண்டுக்கு ஒருமுறை dataets எனும் டாப்பிக்குக்குள் இதனை செலுத்திக் கொண்டே இருக்கும்.
இந்த டாப்பிக்கை கன்சோலில் பிரிண்ட் செய்து பார்ப்பதன் மூலம் இதனை உறுதிபடுத்திக் கொள்ளலாம்.
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dataets --from-beginning
இது ப்ரொடியூசருக்கான உதாரணம் மட்டுமே. கன்ஸ்யூமர் என்பது ஒரு டாப்பிக்குக்குள் இருப்பவற்றை வேறொரு இடத்தில் சேமிப்பது ஆகும். பொதுவாக mongodb என்பது கன்ஸ்யூமராக அமையும். இதற்கடுத்து இதனை mongodb-ல் சேமிப்பதற்கான கன்ஸ்யூமர் ப்ரோக்ராமை பைத்தான் மொழியில் எழுதுவது பற்றி பார்க்கலாம். அதற்கு முன்னர் mongodb-ஐப் பற்றிக் கொஞ்சம் தெரிந்து கொள்வோம்.