3 #include <kafka/Project.h>
5 #include <kafka/ConsumerCommon.h>
6 #include <kafka/ConsumerConfig.h>
7 #include <kafka/ConsumerRecord.h>
8 #include <kafka/Error.h>
9 #include <kafka/KafkaClient.h>
11 #include <librdkafka/rdkafka.h>
20 namespace KAFKA_API {
namespace clients {
namespace consumer {
// Default for the "max.poll.records" property: the maximum number of records
// a single poll() call may return when the user does not override it.
// Declared `constexpr const char*` to match the other string constants in this file.
static constexpr const char* DEFAULT_MAX_POLL_RECORDS_VALUE = "500";
58 void setGroupId(
const std::string&
id) { _groupId = id; }
65 consumer::RebalanceCallback rebalanceCallback = consumer::NullRebalanceCallback,
66 std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_SUBSCRIBE_TIMEOUT_MS));
75 void unsubscribe(std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_UNSUBSCRIBE_TIMEOUT_MS));
81 void assign(
const TopicPartitions& topicPartitions);
97 void seek(
const TopicPartition& topicPartition, Offset offset, std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS));
109 std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS)) { seekToBeginningOrEnd(topicPartitions,
true, timeout); }
110 void seekToBeginning(std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS)) { seekToBeginningOrEnd(_assignment,
true, timeout); }
122 std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS)) { seekToBeginningOrEnd(topicPartitions,
false, timeout); }
123 void seekToEnd(std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS)) { seekToBeginningOrEnd(_assignment,
false, timeout); }
128 Offset
position(
const TopicPartition& topicPartition)
const;
137 std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_QUERY_TIMEOUT_MS))
const
139 return getOffsets(topicPartitions,
true, timeout);
148 std::map<TopicPartition, Offset>
endOffsets(
const TopicPartitions& topicPartitions,
149 std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_QUERY_TIMEOUT_MS))
const
151 return getOffsets(topicPartitions,
false, timeout);
161 std::map<TopicPartition, Offset>
offsetsForTime(
const TopicPartitions& topicPartitions,
162 std::chrono::time_point<std::chrono::system_clock> timepoint,
163 std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_QUERY_TIMEOUT_MS))
const;
178 void commitSync(
const TopicPartitionOffsets& topicPartitionOffsets);
184 void commitAsync(
const consumer::OffsetCommitCallback& offsetCommitCallback = consumer::NullOffsetCommitCallback);
196 void commitAsync(
const TopicPartitionOffsets& topicPartitionOffsets,
const consumer::OffsetCommitCallback& offsetCommitCallback = consumer::NullOffsetCommitCallback);
204 Offset
committed(
const TopicPartition& topicPartition);
214 std::vector<consumer::ConsumerRecord>
poll(std::chrono::milliseconds timeout);
224 void pause(
const TopicPartitions& topicPartitions);
236 void resume(
const TopicPartitions& topicPartitions);
// librdkafka configuration key controlling automatic offset store
// (forced to "true" in validateAndReformProperties()).
// Declared `constexpr const char*` to match the other string constants in this file.
static constexpr const char* ENABLE_AUTO_OFFSET_STORE = "enable.auto.offset.store";
// librdkafka configuration key for the auto-commit interval
// (forced to "0" in validateAndReformProperties()).
// Declared `constexpr const char*` to match the other string constants in this file.
static constexpr const char* AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms";
252 #if COMPILER_SUPPORTS_CPP_17
253 static constexpr
int DEFAULT_SUBSCRIBE_TIMEOUT_MS = 30000;
254 static constexpr
int DEFAULT_UNSUBSCRIBE_TIMEOUT_MS = 10000;
255 static constexpr
int DEFAULT_QUERY_TIMEOUT_MS = 10000;
256 static constexpr
int DEFAULT_SEEK_TIMEOUT_MS = 10000;
257 static constexpr
int SEEK_RETRY_INTERVAL_MS = 5000;
259 enum { DEFAULT_SUBSCRIBE_TIMEOUT_MS = 30000 };
260 enum { DEFAULT_UNSUBSCRIBE_TIMEOUT_MS = 10000 };
261 enum { DEFAULT_QUERY_TIMEOUT_MS = 10000 };
262 enum { DEFAULT_SEEK_TIMEOUT_MS = 10000 };
263 enum { SEEK_RETRY_INTERVAL_MS = 5000 };
266 enum class CommitType { Sync, Async };
267 void commit(
const TopicPartitionOffsets& topicPartitionOffsets, CommitType type);
270 static void offsetCommitCallback(rd_kafka_t* rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* rk_tpos,
void* opaque);
273 static Properties validateAndReformProperties(Properties
properties);
275 void commitStoredOffsetsIfNecessary(CommitType type);
276 void storeOffsetsIfNecessary(
const std::vector<consumer::ConsumerRecord>& records);
278 void seekToBeginningOrEnd(
const TopicPartitions& topicPartitions,
bool toBeginning, std::chrono::milliseconds timeout);
279 std::map<TopicPartition, Offset> getOffsets(
const TopicPartitions& topicPartitions,
281 std::chrono::milliseconds timeout)
const;
283 enum class PartitionsRebalanceEvent { Assign, Revoke, IncrementalAssign, IncrementalUnassign };
284 void changeAssignment(PartitionsRebalanceEvent event,
const TopicPartitions& tps);
286 std::string _groupId;
288 std::size_t _maxPollRecords = 500;
289 bool _enableAutoCommit =
true;
291 rd_kafka_queue_unique_ptr _rk_queue;
294 TopicPartitions _assignment;
296 TopicPartitions _userAssignment;
298 Topics _userSubscription;
300 enum class PendingEvent { PartitionsAssignment, PartitionsRevocation };
301 Optional<PendingEvent> _pendingEvent;
304 Optional<bool> _cooperativeEnabled;
305 bool isCooperativeEnabled()
const {
return _cooperativeEnabled && *_cooperativeEnabled; }
308 std::map<TopicPartition, Offset> _offsetsToStore;
311 static void registerConfigCallbacks(rd_kafka_conf_t* conf);
313 enum class PauseOrResumeOperation { Pause, Resume };
314 void pauseOrResumePartitions(
const TopicPartitions& topicPartitions, PauseOrResumeOperation op);
317 static void rebalanceCallback(rd_kafka_t* rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* partitions,
void* opaque);
319 void onRebalance(rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* rk_partitions);
321 consumer::RebalanceCallback _rebalanceCb;
323 rd_kafka_queue_t* getCommitCbQueue() {
return _rk_commit_cb_queue.get(); }
325 rd_kafka_queue_unique_ptr _rk_commit_cb_queue;
327 void pollCallbacks(
int timeoutMs)
329 rd_kafka_queue_t* queue = getCommitCbQueue();
330 rd_kafka_queue_poll_callback(queue, timeoutMs);
337 KafkaConsumer::validateAndReformProperties(Properties properties)
339 using namespace consumer;
345 auto newProperties = KafkaClient::validateAndReformProperties(
properties);
355 newProperties.put(AUTO_COMMIT_INTERVAL_MS,
"0");
356 newProperties.put(ENABLE_AUTO_OFFSET_STORE,
"true");
358 return newProperties;
363 KafkaConsumer::registerConfigCallbacks(rd_kafka_conf_t* conf)
367 rd_kafka_conf_set_rebalance_cb(conf, KafkaConsumer::rebalanceCallback);
374 using namespace consumer;
379 const std::string maxPollRecords = *maxPollRecordsProperty;
380 _maxPollRecords =
static_cast<std::size_t
>(std::stoi(maxPollRecords));
387 const std::string enableAutoCommit = *enableAutoCommitProperty;
389 auto isTrue = [](
const std::string& str) {
return str ==
"1" || str ==
"true"; };
390 auto isFalse = [](
const std::string& str) {
return str ==
"0" || str ==
"false"; };
392 if (!isTrue(enableAutoCommit) && !isFalse(enableAutoCommit))
394 KAFKA_THROW_ERROR(
Error(RD_KAFKA_RESP_ERR__INVALID_ARG, std::string(
"Invalid property[enable.auto.commit=").append(enableAutoCommit).append(
"], which MUST be true(1) or false(0)!")));
397 _enableAutoCommit = isTrue(enableAutoCommit);
407 const Error result{ rd_kafka_poll_set_consumer(getClientHandle()) };
408 KAFKA_THROW_IF_WITH_ERROR(result);
411 _rk_queue.reset(rd_kafka_queue_get_consumer(getClientHandle()));
414 _rk_commit_cb_queue.reset(rd_kafka_queue_new(getClientHandle()));
417 startBackgroundPollingIfNecessary([
this](
int timeoutMs){ pollCallbacks(timeoutMs); });
420 KAFKA_API_DO_LOG(Log::Level::Notice,
"initialized with properties[%s]", propsStr.c_str());
428 stopBackgroundPollingIfNecessary();
433 commitStoredOffsetsIfNecessary(CommitType::Sync);
437 KAFKA_API_DO_LOG(Log::Level::Err,
"met error[%s] while closing", e.
what());
440 rd_kafka_consumer_close(getClientHandle());
442 while (rd_kafka_outq_len(getClientHandle()))
444 rd_kafka_poll(getClientHandle(), KafkaClient::TIMEOUT_INFINITE);
447 rd_kafka_queue_t* queue = getCommitCbQueue();
448 while (rd_kafka_queue_length(queue))
450 rd_kafka_queue_poll_callback(queue, TIMEOUT_INFINITE);
453 KAFKA_API_DO_LOG(Log::Level::Notice,
"closed");
461 if (!_userAssignment.empty())
463 KAFKA_THROW_ERROR(
Error(RD_KAFKA_RESP_ERR__FAIL,
"Unexpected Operation! Once assign() was used, subscribe() should not be called any more!"));
466 if (isCooperativeEnabled() && topics == _userSubscription)
468 KAFKA_API_DO_LOG(Log::Level::Info,
"skip subscribe (no change since last time)");
472 _userSubscription = topics;
474 const std::string topicsStr = toString(topics);
475 KAFKA_API_DO_LOG(Log::Level::Info,
"will subscribe, topics[%s]", topicsStr.c_str());
477 _rebalanceCb = std::move(rebalanceCallback);
479 auto rk_topics = rd_kafka_topic_partition_list_unique_ptr(createRkTopicPartitionList(topics));
481 const Error result{ rd_kafka_subscribe(getClientHandle(), rk_topics.get()) };
482 KAFKA_THROW_IF_WITH_ERROR(result);
484 _pendingEvent = PendingEvent::PartitionsAssignment;
487 for (
const auto end = std::chrono::steady_clock::now() + timeout; std::chrono::steady_clock::now() < end; )
489 rd_kafka_poll(getClientHandle(), EVENT_POLLING_INTERVAL_MS);
493 KAFKA_API_DO_LOG(Log::Level::Notice,
"subscribed, topics[%s]", topicsStr.c_str());
498 _pendingEvent.reset();
499 KAFKA_THROW_ERROR(
Error(RD_KAFKA_RESP_ERR__TIMED_OUT,
"subscribe() timed out!"));
505 if (_userSubscription.empty() && _userAssignment.empty())
507 KAFKA_API_DO_LOG(Log::Level::Info,
"skip unsubscribe (no assignment/subscription yet)");
511 KAFKA_API_DO_LOG(Log::Level::Info,
"will unsubscribe");
514 if (!_userAssignment.empty())
516 changeAssignment(isCooperativeEnabled() ? PartitionsRebalanceEvent::IncrementalUnassign : PartitionsRebalanceEvent::Revoke,
518 _userAssignment.clear();
520 KAFKA_API_DO_LOG(Log::Level::Notice,
"unsubscribed (the previously assigned partitions)");
524 _userSubscription.clear();
526 const Error result{ rd_kafka_unsubscribe(getClientHandle()) };
527 KAFKA_THROW_IF_WITH_ERROR(result);
529 _pendingEvent = PendingEvent::PartitionsRevocation;
532 for (
const auto end = std::chrono::steady_clock::now() + timeout; std::chrono::steady_clock::now() < end; )
534 rd_kafka_poll(getClientHandle(), EVENT_POLLING_INTERVAL_MS);
538 KAFKA_API_DO_LOG(Log::Level::Notice,
"unsubscribed");
543 _pendingEvent.reset();
544 KAFKA_THROW_ERROR(
Error(RD_KAFKA_RESP_ERR__TIMED_OUT,
"unsubscribe() timed out!"));
550 rd_kafka_topic_partition_list_t* raw_topics =
nullptr;
551 const Error result{ rd_kafka_subscription(getClientHandle(), &raw_topics) };
552 auto rk_topics = rd_kafka_topic_partition_list_unique_ptr(raw_topics);
554 KAFKA_THROW_IF_WITH_ERROR(result);
556 return getTopics(rk_topics.get());
560 KafkaConsumer::changeAssignment(PartitionsRebalanceEvent event,
const TopicPartitions& tps)
562 auto rk_tps = rd_kafka_topic_partition_list_unique_ptr(createRkTopicPartitionList(tps));
567 case PartitionsRebalanceEvent::Assign:
568 result =
Error{ rd_kafka_assign(getClientHandle(), rk_tps.get()) };
573 case PartitionsRebalanceEvent::Revoke:
574 result =
Error{ rd_kafka_assign(getClientHandle(),
nullptr) };
579 case PartitionsRebalanceEvent::IncrementalAssign:
580 result =
Error{ rd_kafka_incremental_assign(getClientHandle(), rk_tps.get()) };
582 for (
const auto& tp: tps)
584 auto found = _assignment.find(tp);
585 if (found != _assignment.end())
587 const std::string tpStr = toString(tp);
588 KAFKA_API_DO_LOG(Log::Level::Err,
"incremental assign partition[%s] has already been assigned", tpStr.c_str());
591 _assignment.emplace(tp);
595 case PartitionsRebalanceEvent::IncrementalUnassign:
596 result = Error{ rd_kafka_incremental_unassign(getClientHandle(), rk_tps.get()) };
598 for (
const auto& tp: tps)
600 auto found = _assignment.find(tp);
601 if (found == _assignment.end())
603 const std::string tpStr = toString(tp);
604 KAFKA_API_DO_LOG(Log::Level::Err,
"incremental unassign partition[%s] could not be found", tpStr.c_str());
607 _assignment.erase(found);
612 KAFKA_THROW_IF_WITH_ERROR(result);
619 if (!_userSubscription.empty())
621 KAFKA_THROW_ERROR(
Error(RD_KAFKA_RESP_ERR__FAIL,
"Unexpected Operation! Once subscribe() was used, assign() should not be called any more!"));
624 _userAssignment = topicPartitions;
626 changeAssignment(isCooperativeEnabled() ? PartitionsRebalanceEvent::IncrementalAssign : PartitionsRebalanceEvent::Assign,
631 inline TopicPartitions
634 rd_kafka_topic_partition_list_t* raw_tps =
nullptr;
635 const Error result{ rd_kafka_assignment(getClientHandle(), &raw_tps) };
637 auto rk_tps = rd_kafka_topic_partition_list_unique_ptr(raw_tps);
639 KAFKA_THROW_IF_WITH_ERROR(result);
641 return getTopicPartitions(rk_tps.get());
647 KafkaConsumer::seek(
const TopicPartition& topicPartition, Offset offset, std::chrono::milliseconds timeout)
649 const std::string topicPartitionStr = toString(topicPartition);
650 KAFKA_API_DO_LOG(Log::Level::Info,
"will seek with topic-partition[%s], offset[%d]", topicPartitionStr.c_str(), offset);
652 auto rkt = rd_kafka_topic_unique_ptr(rd_kafka_topic_new(getClientHandle(), topicPartition.first.c_str(),
nullptr));
655 KAFKA_THROW_ERROR(
Error(rd_kafka_last_error()));
658 const auto end = std::chrono::steady_clock::now() + timeout;
660 rd_kafka_resp_err_t respErr = RD_KAFKA_RESP_ERR_NO_ERROR;
663 respErr = rd_kafka_seek(rkt.get(), topicPartition.second, offset, SEEK_RETRY_INTERVAL_MS);
664 if (respErr != RD_KAFKA_RESP_ERR__STATE && respErr != RD_KAFKA_RESP_ERR__TIMED_OUT && respErr != RD_KAFKA_RESP_ERR__OUTDATED)
671 std::this_thread::yield();
672 }
while (std::chrono::steady_clock::now() < end);
674 KAFKA_THROW_IF_WITH_ERROR(
Error(respErr));
676 KAFKA_API_DO_LOG(Log::Level::Info,
"seeked with topic-partition[%s], offset[%d]", topicPartitionStr.c_str(), offset);
680 KafkaConsumer::seekToBeginningOrEnd(
const TopicPartitions& topicPartitions,
bool toBeginning, std::chrono::milliseconds timeout)
682 for (
const auto& topicPartition: topicPartitions)
684 seek(topicPartition, (toBeginning ? RD_KAFKA_OFFSET_BEGINNING : RD_KAFKA_OFFSET_END), timeout);
691 auto rk_tp = rd_kafka_topic_partition_list_unique_ptr(createRkTopicPartitionList({topicPartition}));
693 const Error error{ rd_kafka_position(getClientHandle(), rk_tp.get()) };
694 KAFKA_THROW_IF_WITH_ERROR(error);
696 return rk_tp->elems[0].offset;
699 inline std::map<TopicPartition, Offset>
701 std::chrono::time_point<std::chrono::system_clock> timepoint,
702 std::chrono::milliseconds timeout)
const
704 if (topicPartitions.empty())
return {};
706 auto msSinceEpoch = std::chrono::duration_cast<std::chrono::milliseconds>(timepoint.time_since_epoch()).count();
708 auto rk_tpos = rd_kafka_topic_partition_list_unique_ptr(createRkTopicPartitionList(topicPartitions));
710 for (
int i = 0; i < rk_tpos->cnt; ++i)
712 rd_kafka_topic_partition_t& rk_tp = rk_tpos->elems[i];
714 rk_tp.offset = msSinceEpoch;
717 Error error{ rd_kafka_offsets_for_times(getClientHandle(), rk_tpos.get(),
static_cast<int>(timeout.count())) };
718 KAFKA_THROW_IF_WITH_ERROR(error);
720 auto results = getTopicPartitionOffsets(rk_tpos.get());
723 for (
auto it = results.begin(); it != results.end(); )
725 it = ((it->second == msSinceEpoch) ? results.erase(it) : std::next(it));
731 inline std::map<TopicPartition, Offset>
732 KafkaConsumer::getOffsets(
const TopicPartitions& topicPartitions,
734 std::chrono::milliseconds timeout)
const
736 std::map<TopicPartition, Offset> result;
738 for (
const auto& topicPartition: topicPartitions)
740 Offset beginning{}, end{};
741 const Error error{ rd_kafka_query_watermark_offsets(getClientHandle(),
742 topicPartition.first.c_str(),
743 topicPartition.second,
746 static_cast<int>(timeout.count())) };
747 KAFKA_THROW_IF_WITH_ERROR(error);
749 result[topicPartition] = (atBeginning ? beginning : end);
757 KafkaConsumer::commit(
const TopicPartitionOffsets& topicPartitionOffsets, CommitType type)
759 auto rk_tpos = rd_kafka_topic_partition_list_unique_ptr(topicPartitionOffsets.empty() ?
nullptr : createRkTopicPartitionList(topicPartitionOffsets));
761 Error error{ rd_kafka_commit(getClientHandle(), rk_tpos.get(), type == CommitType::Async ? 1 : 0) };
763 if (topicPartitionOffsets.empty() && error.value() == RD_KAFKA_RESP_ERR__NO_OFFSET)
768 KAFKA_THROW_IF_WITH_ERROR(error);
775 auto rk_tps = rd_kafka_topic_partition_list_unique_ptr(createRkTopicPartitionList({topicPartition}));
777 const Error error {rd_kafka_committed(getClientHandle(), rk_tps.get(), TIMEOUT_INFINITE) };
778 KAFKA_THROW_IF_WITH_ERROR(error);
780 return rk_tps->elems[0].offset;
785 KafkaConsumer::commitStoredOffsetsIfNecessary(CommitType type)
787 if (_enableAutoCommit && !_offsetsToStore.empty())
789 for (
auto& o: _offsetsToStore)
793 commit(_offsetsToStore, type);
794 _offsetsToStore.clear();
800 KafkaConsumer::storeOffsetsIfNecessary(
const std::vector<consumer::ConsumerRecord>& records)
802 if (_enableAutoCommit)
804 for (
const auto& record: records)
806 _offsetsToStore[TopicPartition(record.topic(), record.partition())] = record.offset();
812 inline std::vector<consumer::ConsumerRecord>
816 commitStoredOffsetsIfNecessary(CommitType::Async);
819 std::vector<rd_kafka_message_t*> msgPtrArray(_maxPollRecords);
820 auto msgReceived = rd_kafka_consume_batch_queue(_rk_queue.get(), convertMsDurationToInt(timeout), msgPtrArray.data(), _maxPollRecords);
823 KAFKA_THROW_ERROR(
Error(rd_kafka_last_error()));
827 std::vector<consumer::ConsumerRecord> records(msgPtrArray.begin(), msgPtrArray.begin() + msgReceived);
830 storeOffsetsIfNecessary(records);
836 KafkaConsumer::pauseOrResumePartitions(
const TopicPartitions& topicPartitions, PauseOrResumeOperation op)
838 auto rk_tpos = rd_kafka_topic_partition_list_unique_ptr(createRkTopicPartitionList(topicPartitions));
840 const Error error{ (op == PauseOrResumeOperation::Pause) ?
841 rd_kafka_pause_partitions(getClientHandle(), rk_tpos.get())
842 : rd_kafka_resume_partitions(getClientHandle(), rk_tpos.get()) };
843 KAFKA_THROW_IF_WITH_ERROR(error);
845 const char* opString = (op == PauseOrResumeOperation::Pause) ?
"pause" :
"resume";
847 for (
int i = 0; i < rk_tpos->cnt; ++i)
849 const rd_kafka_topic_partition_t& rk_tp = rk_tpos->elems[i];
850 if (rk_tp.err != RD_KAFKA_RESP_ERR_NO_ERROR)
852 KAFKA_API_DO_LOG(Log::Level::Err,
"%s topic-partition[%s-%d] error[%s]", opString, rk_tp.topic, rk_tp.partition, rd_kafka_err2str(rk_tp.err));
856 KAFKA_API_DO_LOG(Log::Level::Notice,
"%sd topic-partition[%s-%d]", opString, rk_tp.topic, rk_tp.partition, rd_kafka_err2str(rk_tp.err));
861 if (cnt == 0 && op == PauseOrResumeOperation::Pause)
863 const std::string errMsg = std::string(
"No partition could be ") + opString + std::string(
"d among TopicPartitions[") + toString(topicPartitions) + std::string(
"]");
864 KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, errMsg));
871 pauseOrResumePartitions(topicPartitions, PauseOrResumeOperation::Pause);
883 pauseOrResumePartitions(topicPartitions, PauseOrResumeOperation::Resume);
894 KafkaConsumer::onRebalance(rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* rk_partitions)
896 const TopicPartitions tps = getTopicPartitions(rk_partitions);
897 const std::string tpsStr = toString(tps);
899 if (err != RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS && err != RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS)
901 KAFKA_API_DO_LOG(Log::Level::Err,
"unknown re-balance event[%d], topic-partitions[%s]", err, tpsStr.c_str());
906 if (!_cooperativeEnabled)
908 if (
const char* protocol = rd_kafka_rebalance_protocol(getClientHandle()))
910 _cooperativeEnabled = (std::string(protocol) ==
"COOPERATIVE");
914 KAFKA_API_DO_LOG(Log::Level::Notice,
"re-balance event triggered[%s], cooperative[%s], topic-partitions[%s]",
915 err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ?
"ASSIGN_PARTITIONS" :
"REVOKE_PARTITIONS",
916 isCooperativeEnabled() ?
"enabled" :
"disabled",
921 && ((err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS && *_pendingEvent == PendingEvent::PartitionsAssignment)
922 || (err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS && *_pendingEvent == PendingEvent::PartitionsRevocation)))
924 _pendingEvent.reset();
927 const PartitionsRebalanceEvent
event = (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ?
928 (isCooperativeEnabled() ? PartitionsRebalanceEvent::IncrementalAssign : PartitionsRebalanceEvent::Assign)
929 : (isCooperativeEnabled() ? PartitionsRebalanceEvent::IncrementalUnassign : PartitionsRebalanceEvent::Revoke));
931 if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
933 changeAssignment(event, tps);
938 _rebalanceCb(err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ? consumer::RebalanceEventType::PartitionsAssigned : consumer::RebalanceEventType::PartitionsRevoked,
942 if (err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS)
944 changeAssignment(event, isCooperativeEnabled() ? tps : TopicPartitions{});
950 KafkaConsumer::rebalanceCallback(rd_kafka_t* rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* partitions,
void* )
952 KafkaClient& client = kafkaClient(rk);
954 consumer.onRebalance(err, partitions);
959 KafkaConsumer::offsetCommitCallback(rd_kafka_t* rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* rk_tpos,
void* opaque)
961 const TopicPartitionOffsets tpos = getTopicPartitionOffsets(rk_tpos);
963 if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
965 auto tposStr = toString(tpos);
966 kafkaClient(rk).KAFKA_API_DO_LOG(Log::Level::Err,
"invoked offset-commit callback. offsets[%s], result[%s]", tposStr.c_str(), rd_kafka_err2str(err));
969 auto* cb =
static_cast<consumer::OffsetCommitCallback*
>(opaque);
972 (*cb)(tpos, Error(err));
977 inline consumer::ConsumerGroupMetadata
986 commit(TopicPartitionOffsets(), CommitType::Sync);
992 TopicPartitionOffsets tpos;
996 commit(tpos, CommitType::Sync);
1002 commit(topicPartitionOffsets, CommitType::Sync);
1008 auto rk_tpos = rd_kafka_topic_partition_list_unique_ptr(topicPartitionOffsets.empty() ?
nullptr : createRkTopicPartitionList(topicPartitionOffsets));
1010 const Error error{ rd_kafka_commit_queue(getClientHandle(),
1013 &KafkaConsumer::offsetCommitCallback,
1014 new consumer::OffsetCommitCallback(offsetCommitCallback)) };
1015 KAFKA_THROW_IF_WITH_ERROR(error);
1021 TopicPartitionOffsets tpos;
1030 commitAsync(TopicPartitionOffsets(), offsetCommitCallback);
Unified error type.
Definition: Error.h:32
Specific exception for Kafka clients.
Definition: KafkaException.h:22
const char * what() const noexcept override
Obtains explanatory string.
Definition: KafkaException.h:39
The properties for Kafka clients.
Definition: Properties.h:24
void remove(const std::string &key)
Remove the property (if one exists).
Definition: Properties.h:136
Optional< std::string > getProperty(const std::string &key) const
Get a property.
Definition: Properties.h:170
The base class for Kafka clients.
Definition: KafkaClient.h:34
const Properties & properties() const
Return the properties which took effect.
Definition: KafkaClient.h:56
static constexpr const char * ENABLE_AUTO_COMMIT
Automatically commits previously polled offsets on each poll operation.
Definition: ConsumerConfig.h:31
static constexpr const char * GROUP_ID
Group identifier.
Definition: ConsumerConfig.h:26
static constexpr const char * MAX_POLL_RECORDS
This controls the maximum number of records that a single call to poll() will return.
Definition: ConsumerConfig.h:51
A key/value pair to be received from Kafka.
Definition: ConsumerRecord.h:22
Offset offset() const
The position of this record in the corresponding Kafka partition.
Definition: ConsumerRecord.h:40
Partition partition() const
The partition from which this record is received.
Definition: ConsumerRecord.h:35
Topic topic() const
The topic this record is received from.
Definition: ConsumerRecord.h:30
KafkaConsumer class.
Definition: KafkaConsumer.h:26
std::vector< consumer::ConsumerRecord > poll(std::chrono::milliseconds timeout)
Fetch data for the topics or partitions specified using one of the subscribe/assign APIs.
Definition: KafkaConsumer.h:813
KafkaConsumer(const Properties &properties)
The constructor for KafkaConsumer.
Definition: KafkaConsumer.h:371
~KafkaConsumer() override
The destructor for KafkaConsumer.
Definition: KafkaConsumer.h:43
Offset committed(const TopicPartition &topicPartition)
Get the last committed offset for the given partition (whether the commit happened by this process or...
Definition: KafkaConsumer.h:773
std::map< TopicPartition, Offset > offsetsForTime(const TopicPartitions &topicPartitions, std::chrono::time_point< std::chrono::system_clock > timepoint, std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_QUERY_TIMEOUT_MS)) const
Get the offsets for the given partitions by time-point.
Definition: KafkaConsumer.h:700
void close()
Close the consumer, waiting for any needed cleanup.
Definition: KafkaConsumer.h:424
std::map< TopicPartition, Offset > beginningOffsets(const TopicPartitions &topicPartitions, std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_QUERY_TIMEOUT_MS)) const
Get the first offset for the given partitions.
Definition: KafkaConsumer.h:136
Topics subscription() const
Get the current subscription.
Definition: KafkaConsumer.h:548
void unsubscribe(std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_UNSUBSCRIBE_TIMEOUT_MS))
Unsubscribe from topics currently subscribed.
Definition: KafkaConsumer.h:503
Offset position(const TopicPartition &topicPartition) const
Get the offset of the next record that will be fetched (if a record with that offset exists).
Definition: KafkaConsumer.h:689
void seekToBeginning(const TopicPartitions &topicPartitions, std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS))
Seek to the first offset for each of the given partitions.
Definition: KafkaConsumer.h:108
std::string getGroupId() const
To get group ID.
Definition: KafkaConsumer.h:53
void commitSync()
Commit offsets returned on the last poll() for all the subscribed list of topics and partitions.
Definition: KafkaConsumer.h:984
consumer::ConsumerGroupMetadata groupMetadata()
Return the current group metadata associated with this consumer.
Definition: KafkaConsumer.h:978
void resume()
Resume all partitions which have been paused with pause().
Definition: KafkaConsumer.h:887
std::map< TopicPartition, Offset > endOffsets(const TopicPartitions &topicPartitions, std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_QUERY_TIMEOUT_MS)) const
Get the last offset for the given partitions.
Definition: KafkaConsumer.h:148
void assign(const TopicPartitions &topicPartitions)
Manually assign a list of partitions to this consumer.
Definition: KafkaConsumer.h:617
void commitAsync(const consumer::OffsetCommitCallback &offsetCommitCallback=consumer::NullOffsetCommitCallback)
Commit offsets returned on the last poll() for all the subscribed list of topics and partitions.
Definition: KafkaConsumer.h:1028
void subscribe(const Topics &topics, consumer::RebalanceCallback rebalanceCallback=consumer::NullRebalanceCallback, std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_SUBSCRIBE_TIMEOUT_MS))
Subscribe to the given list of topics to get dynamically assigned partitions.
Definition: KafkaConsumer.h:459
void seek(const TopicPartition &topicPartition, Offset offset, std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS))
Overrides the fetch offsets that the consumer will use on the next poll(timeout).
Definition: KafkaConsumer.h:647
void seekToEnd(const TopicPartitions &topicPartitions, std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS))
Seek to the last offset for each of the given partitions.
Definition: KafkaConsumer.h:121
void setGroupId(const std::string &id)
To set group ID.
Definition: KafkaConsumer.h:58
void pause()
Suspend fetching from all assigned partitions.
Definition: KafkaConsumer.h:875
TopicPartitions assignment() const
Get the set of partitions currently assigned to this consumer.
Definition: KafkaConsumer.h:632