Eelduseks
Kafka andmete lugemiseks peate installima vajaliku Pythoni teegi. Python3-d kasutatakse selles õpetuses tarbija ja tootja skripti kirjutamiseks. Kui pip-paketti pole teie Linuxi operatsioonisüsteemi varem installitud, peate enne pythoni Kafka teegi installimist installima pip-i. python3-kafka kasutatakse selles õpetuses Kafka andmete lugemiseks. Teegi installimiseks käivitage järgmine käsk.
$ pip installib python3-kafkaLihtsate tekstiandmete lugemine Kafkast
Tootjalt saab konkreetse teema kohta saata erinevat tüüpi andmeid, mida tarbija saab lugeda. Kuidas lihtsaid tekstiandmeid saab Kafkalt tootja ja tarbija abil saata ja vastu võtta, on näidatud selle õpetuse selles osas.
Looge fail nimega tootja1.py järgmise pythoni skriptiga. KafkaTootja moodul imporditakse Kafka raamatukogust. Maaklerite loend peab Kafka serveriga ühenduse loomiseks määratlema tootjaobjekti initsialiseerimise ajal. Kafka vaikeport on '9092". argumenti bootstrap_servers kasutatakse hosti nime määratlemiseks pordiga. "First_Topic'on seatud teema nimeks, millega produtsendilt tekstisõnum saadetakse. Järgmine, lihtne tekstsõnum, "Tere Kafkast'saadetakse saada() meetod KafkaTootja teema juurde, "First_Topic".
tootja1.py:
# Import KafkaProducer Kafka raamatukogustalates kafka import KafkaProducer
# Määrake server pordiga
bootstrap_servers = ['localhost: 9092']
# Määrake teema nimi, kus sõnum avaldatakse
topicName = 'First_Topic'
# Algata tootja muutuja
producer = KafkaProducer (alglaadimisserverid = alglaadimisserverid)
# Avalda tekst määratletud teemas
tootja.saada (teemaNimi, b'Tere kafkast ... ')
# Sõnumi printimine
print ("Sõnum saadetud")
Looge fail nimega tarbija1.py järgmise pythoni skriptiga. KafkaTarbija moodul imporditakse Kafka teegist Kafka andmete lugemiseks. sys moodulit kasutatakse siin skripti lõpetamiseks. Kafka andmete lugemiseks kasutatakse tarbija skriptis tootja sama hosti ja pordi numbrit. Tarbija ja tootja teema nimi peab olema sama, mis on "Esimene_teema". Järgmisena lähtestatakse tarbijaobjekt kolme argumendiga. Teema nimi, rühma ID ja serveri teave. eest loopi kasutatakse siin Kafka tootja saadetud teksti lugemiseks.
tarbija1.py:
# Import KafkaTarbija Kafka raamatukogustalates kafka import KafkaTarbija
# Impordi sys moodul
impordi süsteem
# Määrake server pordiga
bootstrap_servers = ['localhost: 9092']
# Määratlege teema nimi, kust sõnum vastu võetakse
topicName = 'First_Topic'
# Initsialiseerige tarbijamuutuja
tarbija = KafkaConsumer (teemaNimi, grupi_id = 'rühm1', alglaadimisserverid =
alglaadimisserverid)
# Lugege ja printige tarbija sõnumit
Tarbija sõnumi puhul:
print ("Teema nimi =% s, Sõnum =% s"% (msg.teema, sõnum.väärtus))
# Lõpeta skript
sys.väljumine ()
Väljund:
Tootjaskripti käivitamiseks käivitage järgmine terminal ühest terminalist.
$ python3 tootja1.pyPärast teate saatmist kuvatakse järgmine väljund.
Tarbija skripti käivitamiseks käivitage järgmine käsk teisest terminalist.
$ python3 tarbija1.pyVäljund näitab teema nime ja tootjalt saadetud tekstsõnumit.
JSON-vormingus andmete lugemine Kafkast
JSON-vormingus andmeid saab Kafka tootja saata ja Kafka tarbija saab neid lugeda Json pythoni moodul. Kuidas JSON-i andmeid enne andmete saatmist ja vastuvõtmist python-kafka mooduli abil järjestada ja desarialiseerida, kuvatakse selle õpetuse selles osas.
Looge nimega pythoni skript tootja2.py järgmise skriptiga. Teine JSON-moodul imporditakse koos KafkaTootja moodul siin. väärtus_serializer argumenti kasutatakse alglaadimisserverid argument siin Kafka tootja objekti initsialiseerimiseks. See argument näitab, et JSON-i andmed kodeeritakse, kasutadesutf-8'saatekirja ajal määratud tähemärk. Järgmisena saadetakse JSON-vormingus andmed nimetatud teemale JSONtopic.
tootja2.py:
# Import KafkaProducer Kafka raamatukogustalates kafka import KafkaProducer
# Andmete jadastamiseks importige JSON-moodul
import json
# Initsialiseeri tootjamuutuja ja määra JSON-kodeeringu parameeter
tootja = KafkaProducer (alglaadimisserverid =
['localhost: 9092'], väärtus_serializer = lambda v: json.prügimäed (v).kodeerida ('utf-8'))
# Saada andmed JSON-vormingus
tootja.saada ('JSONtopic', 'name': 'fahmida', 'email': '[email protected]')
# Sõnumi printimine
print ("JSONtopicule saadetud sõnum")
Looge nimega pythoni skript tarbija2.py järgmise skriptiga. KafkaTarbija, sys ja JSON-moodulid imporditakse selles skriptis. KafkaTarbija moodulit kasutatakse JSON-vormingus andmete lugemiseks Kafkast. JSON-moodulit kasutatakse Kafka tootjalt saadetud kodeeritud JSON-andmete dekodeerimiseks. Sys moodulit kasutatakse skripti lõpetamiseks. väärtus_deserializer argumenti kasutatakse alglaadimisserverid määratleda, kuidas JSON-andmeid dekodeeritakse. Järgmine, eest silmust kasutatakse kõigi Kafkast hangitud tarbijate kirjete ja JSON-andmete printimiseks.
tarbija2.py:
# Import KafkaTarbija Kafka raamatukogustalates kafka import KafkaTarbija
# Impordi sys moodul
impordi süsteem
# Andmete jadastamiseks impordi jsoni moodul
import json
# Initsialiseerige tarbijamuutuja ja määrake JSON-i dekodeerimise omadus
tarbija = KafkaConsumer ('JSONtopic', bootstrap_servers = ['localhost: 9092'],
väärtuse_eraldaja = lambda m: json.koormused (m.dekodeerida ('utf-8')))
# Loe kafka andmeid
tarbijale mõeldud sõnumi jaoks:
print ("Tarbijakirjed: \ n")
print (sõnum)
print ("\ nLugemine JSON-i andmetest \ n")
print ("Nimi:", teade [6] ['nimi'])
print ("E-post:", sõnum [6] ['e-post'])
# Lõpeta skript
sys.väljumine ()
Väljund:
Tootjaskripti käivitamiseks käivitage järgmine terminal ühest terminalist.
$ python3 tootja2.pySkript prindib pärast JSON-i andmete saatmist järgmise teate.
Tarbija skripti käivitamiseks käivitage järgmine käsk teisest terminalist.
$ python3 tarbija2.pyPärast skripti käivitamist ilmub järgmine väljund.
Järeldus:
Andmeid saab Pyfoni abil Kafkalt saata ja vastu võtta erinevates vormingutes. Andmeid saab ka andmebaasi salvestada ja andmebaasist hankida, kasutades Kafka ja pythoni. Ma olen kodus, see õpetus aitab pythoni kasutajal alustada Kafkaga töötamist.