Consommation de messages Kafka

Programmeur Xiao Li 2022-07-23 21:39:58 阅读数:75

consommationmessageskafka

Consommateurs et groupes

Kafka Les consommateurs sont subordonnés aux groupes de consommateurs.Les consommateurs du Groupe de consommateurs s'abonnent au même sujet,Chaque consommateur reçoit un message d'une section du sujet.

1. Sujet T1 Oui.4Partitions,Il n'y a que des consommateurs C1, C1 Tout sera reçu4Messages pour les partitions

 2. Dans les groupes G1 Nouveaux consommateurs C2,Ensuite, chaque consommateur recevra un message de deux partitions distinctes.

 3. Si le Groupe G1 Oui. 4 Consommateurs,Chaque consommateur peut alors se voir attribuer une partition

 4. Si nous ajoutons plus de consommateurs au groupe,Plus de partitions que le sujet,Une partie des consommateurs serait alors inactive,Aucun message reçu

 

5. Si vous ajoutez un nouveau groupe avec un seul consommateur G2, Alors ce consommateur va passer du sujet T1 Recevoir tous les messages sur,Avec le Groupe G1 Aucune influence mutuelle.

Bref, Créer un groupe de consommateurs pour chaque application qui a besoin d'obtenir tous les messages pour un ou plusieurs sujets , Ensuite, ajoutez des consommateurs au Groupe pour ajuster la capacité de lecture et de traitement ,Chaque consommateur du Groupe ne traite qu'une partie des messages.

 

Configuration de base

1. bootstrap.servers (Où obtenir)

2. group.id( À quel groupe appartient )

3. key.deserializer( Désrialisateur de clés )

4. value.deserializer( Désérialisateur de valeur )

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

Abonnez - vous au sujet

consumer.subscribe(Collections.singletonList("customerCountries"));

Soutenir des thèmes précis ,La régularité est également prise en charge.

consumer.subscribe("test.*");

Message de vote

try {
// Vote
while (true) {
// Temps mort100
ConsumerRecords<String, String> records = consumer.poll(100);
// Traverser les messages récupérés
for (ConsumerRecord<String, String> record : records){
log.debug("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
int updatedCount = 1;
if (custCountryMap.countainsValue(record.value())) {
updatedCount = custCountryMap.get(record.value()) + 1;
}
custCountryMap.put(record.value(), updatedCount)
JSONObject json = new JSONObject(custCountryMap);
System.out.println(json.toString(4));
}
}
} finally {
// Fermer
consumer.close();
}

Adoptionpoll Comment tirer le message ,Entrée100C'est le temps mort.Utiliser avant de quitter l'application close() Méthode de fermeture du consommateur.

Plusieurs configurations spéciales

PropriétésDescription

fetch.min.bytes

Nombre minimum d'octets que le consommateur obtient des enregistrements du serveur

fetch.max.wait.ms

Désignation broker Temps d'attente pour,Par défaut 500ms

Avecfetch.min.bytesEn association avec

max.partition.fetch.bytes

Nombre maximum d'octets retournés au consommateur à partir de chaque partition

session.timeout.ms

Temps pendant lequel le consommateur peut se déconnecter du serveur avant d'être considéré comme mort ,Par défaut 3s

heartbeat.interval.ms

poll() Méthode envoyer la fréquence cardiaque au Coordonnateur , En généralsession.timeout.msDe1/3

auto.offset.reset

Le consommateur lit une partition sans décalage ou le décalage est invalide ( Parce que le consommateur est long Échec temporel , L'enregistrement contenant l'offset est périmé et supprimé )Que faire

La valeur par défaut est latest, Lire les données à partir du dernier enregistrement .Une autre valeur est earliest, Lire l'enregistrement de la partition à partir de la position de départ .

enable.auto.commit

Le consommateur soumet - il automatiquement l'offset? Quantité,La valeur par défaut est true.

Si c'est réglé à true,Il est également possible de configurer auto.commit.interval.ms Propriété pour contrôler la fréquence des soumissions

partition.assignment.strategy

Décider quelles partitions doivent être assignées à quel consommateur.

Range La politique assigne plusieurs partitions consécutives du sujet au consommateur.

RoundRobin La stratégie donne Il y a un nombre égal de partitions attribuées par le consommateur (Ou un maximum d'une partition)

client.id

Identifier les messages envoyés par les clients

max.poll.records

Contrôle un seul appel poll() Nombre d'enregistrements que la méthode peut renvoyer

receive.buffer.bytes

send.buffer.bytes

Utilisé pour lire et écrire des données TCP Taille du tampon

Soumettre、Offset

Appelez poll() La méthode renvoie un enregistrement qui n'a pas été lu par le consommateur , Vous pouvez tracer quels enregistrements sont lus par quel consommateur dans le Groupe .Les consommateurs peuvent utiliser Kafka Pour suivre l'emplacement du message dans la partition (Offset).

On met Mettre à jour l'emplacement actuel de la partition L'opération s'appelleSoumettre.

Les consommateurs se tournent vers un _consumer_offset Sujet pour envoyer un message , Le message contient un décalage par partition . Les consommateurs se sont effondrés ou de nouveaux consommateurs ont rejoint le Groupe , Déclenchera un rééquilibrage , Les partitions sont réattribuées . Pour pouvoir poursuivre les travaux antérieurs , Le consommateur doit lire le dernier offset soumis pour chaque partition , Puis procéder à partir de l'offset spécifié .

En dernier analyse, L'offset est pour le rééquilibrage , Les consommateurs réattribués sont en mesure de poursuivre le traitement des données à leur emplacement précédent , Au lieu de perdre des données ou de réutiliser des données déjà traitées .

Et l'offset est soumis pour empêcher le rééquilibrage , Cet offset n'a pas été trouvé (Lieu de l'accident).

Soumission automatique

La façon la plus simple de soumettre est de permettre au consommateur de soumettre automatiquement un décalage .Si enable.auto.commit Set to true,C'est... Tous les jours. 5s,Les consommateurs vont automatiquement poll() Le décalage maximal reçu par la méthode est soumis à.

Mais ce qui ne peut être évité, c'est qu'au cours des deux décalages de soumission , Rééquilibrage en cours , En conséquence, les compensations traitées n'ont pas été soumises en temps opportun ,Peut entraîner une consommation répétée.

Soumission manuelle synchronisée

while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
try {
// Soumission synchrone
consumer.commitSync();
} catch (CommitFailedException e) {
log.error("commit failed", e)
}
}

Prends ça. auto.commit.offset Set to false,Laisser l'application décider quand soumettre l'offset. commitSync() Sera soumis par poll() Dernier décalage retourné.En cas de rééquilibrage, Tous les messages du dernier lot jusqu'à ce que le rééquilibrage se produise seront traités en double .commitSync Si la soumission échoue, elle sera récupérée. .

Soumission asynchrone manuelle

while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
consumer.commitAsync();
}

Rappel du support de soumission asynchrone

while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
consumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
if (e != null)
log.error("Commit failed for offsets {}", offsets, e);
}
});
}

commitAsync() Ne pas réessayer en cas d'échec .

Présentation combinée

Afin d'éviter que les consommateurs ne soient pas en mesure de soumettre des compensations lorsqu'ils sont fermés , Exécuter à l'arrêt commitSync(), Utilisé à chaque traitement commitAsync() Cela augmente le débit tout en , Présentation des données garanties .

try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
// Soumission asynchrone d'un seul message
consumer.commitAsync();
}
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
// Soumettre de façon synchrone à la sortie
consumer.commitSync();
} finally {
consumer.close();
}
}

PointspartitionSoumettre l'offset

// Cartographie des décalages de zone
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;
while (true) {
// Tirer le message
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
// Enregistrer le décalage
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
// Chaque1000Article soumis une fois
if (count % 1000 == 0)
consumer.commitAsync(currentOffsets, null);
count++;
}
}

La différence est enregistrée manuellement ici topicC'est différent.partition Offset sous ,Et le transmettre àcommitPrésentation des méthodes.

En résolvant le rééquilibrage , Le décalage ne peut pas être soumis en temps opportun ——Moniteur de rééquilibrage

// Enregistrement offset
Map<TopicPartition, OffsetAndMetadata> currentOffsets= new HashMap<>();
private class HandleRebalance implements ConsumerRebalanceListener {
public void onPartitionsAssigned(Collection<TopicPartition>partitions) {
}
// Rééquilibrage
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Lost partitions in rebalance. Committing current
offsets:" + currentOffsets);
// Décalage de soumission synchrone
consumer.commitSync(currentOffsets);
}
}
try {
// Enregistrement des auditeurs
consumer.subscribe(topics, new HandleRebalance());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
}
// Soumission asynchrone,Ne pas réessayer
consumer.commitAsync(currentOffsets, null); }
} catch (WakeupException e) {
// Ignorer l'exception, Fermeture des consommateurs
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
// Soumettre de façon synchrone à la fermeture
consumer.commitSync(currentOffsets);
} finally {
consumer.close();
System.out.println("Closed consumer and we are done");
}
}

AdoptionsubscribeLors de l'abonnement,EntréeConsumerRebalanceListenerRéalisation.InonPartitionsRevoked Décalage de soumission synchrone moyen , Prévenir la perte de décalage .

Démarrer la consommation à l'offset spécifié

public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
commitDBTransaction();
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for(TopicPartition partition: partitions){
consumer.seek(partition, getOffsetFromDB(partition));
}
}
}
consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));
consumer.poll(0);
// Spécifier la position offset
for (TopicPartition partition: consumer.assignment())
consumer.seek(partition, getOffsetFromDB(partition));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
storeRecordInDB(record);
storeOffsetInDB(record.topic(), record.partition(), record.offset());
}
commitDBTransaction();
}

Inpartition Lors de la réaffectation , Lire la position de décalage enregistrée ,seekÀ l'endroit indiqué.

Sortie gracieusewakeup()

// Lors du traitement du fil principal ,Besoin deRuntimeAppelé à la sortiewakeup()
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
System.out.println("Starting exit...");
consumer.wakeup();
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
try {
// Cycle,Jusqu'à ce queCtrl+CClé, Les crochets fermés sont nettoyés à la sortie
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
System.out.println(System.currentTimeMillis() + " -- waiting for data...");
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
}
for (TopicPartition tp: consumer.assignment())
System.out.println("Committing offset at position:" + consumer.position(tp));
consumer.commitSync();
}
} catch (WakeupException e) {
// Ignorer l'exception de fermeture
} finally {
consumer.close();
System.out.println("Closed consumer and we are done");
}

Consommation hors abonnement

List<PartitionInfo> partitionInfos = consumer.partitionsFor("topic");
if (partitionInfos != null) {
for (PartitionInfo partition : partitionInfos)
partitions.add(new TopicPartition(partition.topic(), partition.partition()));
consumer.assign(partitions); *
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record: records) {
System.out.println("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
consumer.commitSync();
}
}

AdoptionpartitionFor("topicName") Obtient toutes les partitions pour le sujet spécifié ,AdoptionAssign Assigner à la zone qu'il consomme .Ce mode etsubacribe Chez le même consommateur ,Il n'y a qu'un seul.

Copyright:Cet article est[Programmeur Xiao Li]Établi,Veuillez apporter le lien original pour réimprimer,remercier。 https://fra.fheadline.com/2022/204/202207232129035693.html