Modern C++ Kafka API
KafkaConsumer.h
1 #pragma once
2 
3 #include <kafka/Project.h>
4 
5 #include <kafka/ConsumerCommon.h>
6 #include <kafka/ConsumerConfig.h>
7 #include <kafka/ConsumerRecord.h>
8 #include <kafka/Error.h>
9 #include <kafka/KafkaClient.h>
10 
11 #include <librdkafka/rdkafka.h>
12 
13 #include <algorithm>
14 #include <cassert>
15 #include <chrono>
16 #include <iterator>
17 #include <memory>
18 
19 
20 namespace KAFKA_API { namespace clients { namespace consumer {
21 
26 {
27 public:
28  // Default value for property "max.poll.records" (which is same with Java API)
29  static const constexpr char* DEFAULT_MAX_POLL_RECORDS_VALUE = "500";
30 
38  explicit KafkaConsumer(const Properties& properties);
39 
43  ~KafkaConsumer() override { if (_opened) close(); }
44 
48  void close();
49 
53  std::string getGroupId() const { return _groupId; }
54 
58  void setGroupId(const std::string& id) { _groupId = id; }
59 
64  void subscribe(const Topics& topics,
65  consumer::RebalanceCallback rebalanceCallback = consumer::NullRebalanceCallback,
66  std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_SUBSCRIBE_TIMEOUT_MS));
70  Topics subscription() const;
71 
75  void unsubscribe(std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_UNSUBSCRIBE_TIMEOUT_MS));
76 
81  void assign(const TopicPartitions& topicPartitions);
82 
86  TopicPartitions assignment() const;
87 
88  // Seek & Position
97  void seek(const TopicPartition& topicPartition, Offset offset, std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS));
98 
108  void seekToBeginning(const TopicPartitions& topicPartitions,
109  std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS)) { seekToBeginningOrEnd(topicPartitions, true, timeout); }
110  void seekToBeginning(std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS)) { seekToBeginningOrEnd(_assignment, true, timeout); }
111 
121  void seekToEnd(const TopicPartitions& topicPartitions,
122  std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS)) { seekToBeginningOrEnd(topicPartitions, false, timeout); }
123  void seekToEnd(std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS)) { seekToBeginningOrEnd(_assignment, false, timeout); }
124 
128  Offset position(const TopicPartition& topicPartition) const;
129 
136  std::map<TopicPartition, Offset> beginningOffsets(const TopicPartitions& topicPartitions,
137  std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_QUERY_TIMEOUT_MS)) const
138  {
139  return getOffsets(topicPartitions, true, timeout);
140  }
141 
148  std::map<TopicPartition, Offset> endOffsets(const TopicPartitions& topicPartitions,
149  std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_QUERY_TIMEOUT_MS)) const
150  {
151  return getOffsets(topicPartitions, false, timeout);
152  }
153 
161  std::map<TopicPartition, Offset> offsetsForTime(const TopicPartitions& topicPartitions,
162  std::chrono::time_point<std::chrono::system_clock> timepoint,
163  std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_QUERY_TIMEOUT_MS)) const;
164 
168  void commitSync();
169 
173  void commitSync(const consumer::ConsumerRecord& record);
174 
178  void commitSync(const TopicPartitionOffsets& topicPartitionOffsets);
179 
184  void commitAsync(const consumer::OffsetCommitCallback& offsetCommitCallback = consumer::NullOffsetCommitCallback);
185 
190  void commitAsync(const consumer::ConsumerRecord& record, const consumer::OffsetCommitCallback& offsetCommitCallback = consumer::NullOffsetCommitCallback);
191 
196  void commitAsync(const TopicPartitionOffsets& topicPartitionOffsets, const consumer::OffsetCommitCallback& offsetCommitCallback = consumer::NullOffsetCommitCallback);
197 
204  Offset committed(const TopicPartition& topicPartition);
205 
214  std::vector<consumer::ConsumerRecord> poll(std::chrono::milliseconds timeout);
215 
224  void pause(const TopicPartitions& topicPartitions);
225 
230  void pause();
231 
236  void resume(const TopicPartitions& topicPartitions);
237 
241  void resume();
242 
247 
248 private:
249  static const constexpr char* ENABLE_AUTO_OFFSET_STORE = "enable.auto.offset.store";
250  static const constexpr char* AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms";
251 
252 #if COMPILER_SUPPORTS_CPP_17
253  static constexpr int DEFAULT_SUBSCRIBE_TIMEOUT_MS = 30000;
254  static constexpr int DEFAULT_UNSUBSCRIBE_TIMEOUT_MS = 10000;
255  static constexpr int DEFAULT_QUERY_TIMEOUT_MS = 10000;
256  static constexpr int DEFAULT_SEEK_TIMEOUT_MS = 10000;
257  static constexpr int SEEK_RETRY_INTERVAL_MS = 5000;
258 #else
259  enum { DEFAULT_SUBSCRIBE_TIMEOUT_MS = 30000 };
260  enum { DEFAULT_UNSUBSCRIBE_TIMEOUT_MS = 10000 };
261  enum { DEFAULT_QUERY_TIMEOUT_MS = 10000 };
262  enum { DEFAULT_SEEK_TIMEOUT_MS = 10000 };
263  enum { SEEK_RETRY_INTERVAL_MS = 5000 };
264 #endif
265 
266  enum class CommitType { Sync, Async };
267  void commit(const TopicPartitionOffsets& topicPartitionOffsets, CommitType type);
268 
269  // Offset Commit Callback (for librdkafka)
270  static void offsetCommitCallback(rd_kafka_t* rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* rk_tpos, void* opaque);
271 
272  // Validate properties (and fix it if necesary)
273  static Properties validateAndReformProperties(Properties properties);
274 
275  void commitStoredOffsetsIfNecessary(CommitType type);
276  void storeOffsetsIfNecessary(const std::vector<consumer::ConsumerRecord>& records);
277 
278  void seekToBeginningOrEnd(const TopicPartitions& topicPartitions, bool toBeginning, std::chrono::milliseconds timeout);
279  std::map<TopicPartition, Offset> getOffsets(const TopicPartitions& topicPartitions,
280  bool atBeginning,
281  std::chrono::milliseconds timeout) const;
282 
283  enum class PartitionsRebalanceEvent { Assign, Revoke, IncrementalAssign, IncrementalUnassign };
284  void changeAssignment(PartitionsRebalanceEvent event, const TopicPartitions& tps);
285 
286  std::string _groupId;
287 
288  std::size_t _maxPollRecords = 500; // From "max.poll.records" property, and here is the default for batch-poll
289  bool _enableAutoCommit = true; // From "enable.auto.commit" property
290 
291  rd_kafka_queue_unique_ptr _rk_queue;
292 
293  // Save assignment info (from "assign()" call or rebalance callback) locally, to accelerate seeking procedure
294  TopicPartitions _assignment;
295  // Assignment from user's input, -- by calling "assign()"
296  TopicPartitions _userAssignment;
297  // Subscription from user's input, -- by calling "subscribe()"
298  Topics _userSubscription;
299 
300  enum class PendingEvent { PartitionsAssignment, PartitionsRevocation };
301  Optional<PendingEvent> _pendingEvent;
302 
303  // Identify whether the "partition.assignment.strategy" is "cooperative-sticky"
304  Optional<bool> _cooperativeEnabled;
305  bool isCooperativeEnabled() const { return _cooperativeEnabled && *_cooperativeEnabled; }
306 
307  // The offsets to store (and commit later)
308  std::map<TopicPartition, Offset> _offsetsToStore;
309 
310  // Register Callbacks for rd_kafka_conf_t
311  static void registerConfigCallbacks(rd_kafka_conf_t* conf);
312 
313  enum class PauseOrResumeOperation { Pause, Resume };
314  void pauseOrResumePartitions(const TopicPartitions& topicPartitions, PauseOrResumeOperation op);
315 
316  // Rebalance Callback (for librdkafka)
317  static void rebalanceCallback(rd_kafka_t* rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* partitions, void* opaque);
318  // Rebalance Callback (for class instance)
319  void onRebalance(rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* rk_partitions);
320 
321  consumer::RebalanceCallback _rebalanceCb;
322 
323  rd_kafka_queue_t* getCommitCbQueue() { return _rk_commit_cb_queue.get(); }
324 
325  rd_kafka_queue_unique_ptr _rk_commit_cb_queue;
326 
327  void pollCallbacks(int timeoutMs)
328  {
329  rd_kafka_queue_t* queue = getCommitCbQueue();
330  rd_kafka_queue_poll_callback(queue, timeoutMs);
331  }
332 };
333 
334 
335 // Validate properties (and fix it if necesary)
336 inline Properties
337 KafkaConsumer::validateAndReformProperties(Properties properties)
338 {
339  using namespace consumer;
340 
341  // Don't pass the "max.poll.records" property to librdkafka
343 
344  // Let the base class validate first
345  auto newProperties = KafkaClient::validateAndReformProperties(properties);
346 
347  // If no "group.id" configured, generate a random one for user
348  if (!newProperties.getProperty(ConsumerConfig::GROUP_ID))
349  {
350  newProperties.put(ConsumerConfig::GROUP_ID, utility::getRandomString());
351  }
352 
353  // Disable the internal auto-commit from librdkafka, since we want to customize the behavior
354  newProperties.put(ConsumerConfig::ENABLE_AUTO_COMMIT, "false");
355  newProperties.put(AUTO_COMMIT_INTERVAL_MS, "0");
356  newProperties.put(ENABLE_AUTO_OFFSET_STORE, "true");
357 
358  return newProperties;
359 }
360 
// Register Callbacks for rd_kafka_conf_t
inline void
KafkaConsumer::registerConfigCallbacks(rd_kafka_conf_t* conf)
{
    // Rebalance Callback
    // Registering our own callback would turn off librdkafka's automatic partition assignment/revocation;
    // the wrapper then applies the assignment itself in onRebalance().
    rd_kafka_conf_set_rebalance_cb(conf, KafkaConsumer::rebalanceCallback);
}
369 
370 inline
372  : KafkaClient(ClientType::KafkaConsumer, validateAndReformProperties(properties), registerConfigCallbacks)
373 {
374  using namespace consumer;
375 
376  // Pick up the "max.poll.records" property
377  if (auto maxPollRecordsProperty = properties.getProperty(ConsumerConfig::MAX_POLL_RECORDS))
378  {
379  const std::string maxPollRecords = *maxPollRecordsProperty;
380  _maxPollRecords = static_cast<std::size_t>(std::stoi(maxPollRecords));
381  }
382  _properties.put(ConsumerConfig::MAX_POLL_RECORDS, std::to_string(_maxPollRecords));
383 
384  // Pick up the "enable.auto.commit" property
385  if (auto enableAutoCommitProperty = properties.getProperty(ConsumerConfig::ENABLE_AUTO_COMMIT))
386  {
387  const std::string enableAutoCommit = *enableAutoCommitProperty;
388 
389  auto isTrue = [](const std::string& str) { return str == "1" || str == "true"; };
390  auto isFalse = [](const std::string& str) { return str == "0" || str == "false"; };
391 
392  if (!isTrue(enableAutoCommit) && !isFalse(enableAutoCommit))
393  {
394  KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, std::string("Invalid property[enable.auto.commit=").append(enableAutoCommit).append("], which MUST be true(1) or false(0)!")));
395  }
396 
397  _enableAutoCommit = isTrue(enableAutoCommit);
398  }
399  _properties.put(ConsumerConfig::ENABLE_AUTO_COMMIT, (_enableAutoCommit ? "true" : "false"));
400 
401  // Fetch groupId from reformed configuration
402  auto groupId = _properties.getProperty(ConsumerConfig::GROUP_ID);
403  assert(groupId);
404  setGroupId(*groupId);
405 
406  // Redirect the reply queue (to the client group queue)
407  const Error result{ rd_kafka_poll_set_consumer(getClientHandle()) };
408  KAFKA_THROW_IF_WITH_ERROR(result);
409 
410  // Initialize message-fetching queue
411  _rk_queue.reset(rd_kafka_queue_get_consumer(getClientHandle()));
412 
413  // Initialize commit-callback queue
414  _rk_commit_cb_queue.reset(rd_kafka_queue_new(getClientHandle()));
415 
416  // Start background polling (if needed)
417  startBackgroundPollingIfNecessary([this](int timeoutMs){ pollCallbacks(timeoutMs); });
418 
419  const auto propsStr = KafkaClient::properties().toString();
420  KAFKA_API_DO_LOG(Log::Level::Notice, "initialized with properties[%s]", propsStr.c_str());
421 }
422 
423 inline void
425 {
426  _opened = false;
427 
428  stopBackgroundPollingIfNecessary();
429 
430  try
431  {
432  // Commit the offsets for these messages which had been polled last time (for `enable.auto.commit=true` case.)
433  commitStoredOffsetsIfNecessary(CommitType::Sync);
434  }
435  catch (const KafkaException& e)
436  {
437  KAFKA_API_DO_LOG(Log::Level::Err, "met error[%s] while closing", e.what());
438  }
439 
440  rd_kafka_consumer_close(getClientHandle());
441 
442  while (rd_kafka_outq_len(getClientHandle()))
443  {
444  rd_kafka_poll(getClientHandle(), KafkaClient::TIMEOUT_INFINITE);
445  }
446 
447  rd_kafka_queue_t* queue = getCommitCbQueue();
448  while (rd_kafka_queue_length(queue))
449  {
450  rd_kafka_queue_poll_callback(queue, TIMEOUT_INFINITE);
451  }
452 
453  KAFKA_API_DO_LOG(Log::Level::Notice, "closed");
454 }
455 
456 
457 // Subscription
458 inline void
459 KafkaConsumer::subscribe(const Topics& topics, consumer::RebalanceCallback rebalanceCallback, std::chrono::milliseconds timeout)
460 {
461  if (!_userAssignment.empty())
462  {
463  KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__FAIL, "Unexpected Operation! Once assign() was used, subscribe() should not be called any more!"));
464  }
465 
466  if (isCooperativeEnabled() && topics == _userSubscription)
467  {
468  KAFKA_API_DO_LOG(Log::Level::Info, "skip subscribe (no change since last time)");
469  return;
470  }
471 
472  _userSubscription = topics;
473 
474  const std::string topicsStr = toString(topics);
475  KAFKA_API_DO_LOG(Log::Level::Info, "will subscribe, topics[%s]", topicsStr.c_str());
476 
477  _rebalanceCb = std::move(rebalanceCallback);
478 
479  auto rk_topics = rd_kafka_topic_partition_list_unique_ptr(createRkTopicPartitionList(topics));
480 
481  const Error result{ rd_kafka_subscribe(getClientHandle(), rk_topics.get()) };
482  KAFKA_THROW_IF_WITH_ERROR(result);
483 
484  _pendingEvent = PendingEvent::PartitionsAssignment;
485 
486  // The rebalcance callback would be served during the time (within this thread)
487  for (const auto end = std::chrono::steady_clock::now() + timeout; std::chrono::steady_clock::now() < end; )
488  {
489  rd_kafka_poll(getClientHandle(), EVENT_POLLING_INTERVAL_MS);
490 
491  if (!_pendingEvent)
492  {
493  KAFKA_API_DO_LOG(Log::Level::Notice, "subscribed, topics[%s]", topicsStr.c_str());
494  return;
495  }
496  }
497 
498  _pendingEvent.reset();
499  KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__TIMED_OUT, "subscribe() timed out!"));
500 }
501 
502 inline void
503 KafkaConsumer::unsubscribe(std::chrono::milliseconds timeout)
504 {
505  if (_userSubscription.empty() && _userAssignment.empty())
506  {
507  KAFKA_API_DO_LOG(Log::Level::Info, "skip unsubscribe (no assignment/subscription yet)");
508  return;
509  }
510 
511  KAFKA_API_DO_LOG(Log::Level::Info, "will unsubscribe");
512 
513  // While it's for the previous `assign(...)`
514  if (!_userAssignment.empty())
515  {
516  changeAssignment(isCooperativeEnabled() ? PartitionsRebalanceEvent::IncrementalUnassign : PartitionsRebalanceEvent::Revoke,
517  _userAssignment);
518  _userAssignment.clear();
519 
520  KAFKA_API_DO_LOG(Log::Level::Notice, "unsubscribed (the previously assigned partitions)");
521  return;
522  }
523 
524  _userSubscription.clear();
525 
526  const Error result{ rd_kafka_unsubscribe(getClientHandle()) };
527  KAFKA_THROW_IF_WITH_ERROR(result);
528 
529  _pendingEvent = PendingEvent::PartitionsRevocation;
530 
531  // The rebalance callback would be served during the time (within this thread)
532  for (const auto end = std::chrono::steady_clock::now() + timeout; std::chrono::steady_clock::now() < end; )
533  {
534  rd_kafka_poll(getClientHandle(), EVENT_POLLING_INTERVAL_MS);
535 
536  if (!_pendingEvent)
537  {
538  KAFKA_API_DO_LOG(Log::Level::Notice, "unsubscribed");
539  return;
540  }
541  }
542 
543  _pendingEvent.reset();
544  KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__TIMED_OUT, "unsubscribe() timed out!"));
545 }
546 
547 inline Topics
549 {
550  rd_kafka_topic_partition_list_t* raw_topics = nullptr;
551  const Error result{ rd_kafka_subscription(getClientHandle(), &raw_topics) };
552  auto rk_topics = rd_kafka_topic_partition_list_unique_ptr(raw_topics);
553 
554  KAFKA_THROW_IF_WITH_ERROR(result);
555 
556  return getTopics(rk_topics.get());
557 }
558 
// Apply an assignment change to librdkafka and keep the local `_assignment` copy in sync.
inline void
KafkaConsumer::changeAssignment(PartitionsRebalanceEvent event, const TopicPartitions& tps)
{
    auto rk_tps = rd_kafka_topic_partition_list_unique_ptr(createRkTopicPartitionList(tps));

    Error result;
    switch (event)
    {
        case PartitionsRebalanceEvent::Assign:
            result = Error{ rd_kafka_assign(getClientHandle(), rk_tps.get()) };
            // Update assignment
            _assignment = tps;
            break;

        case PartitionsRebalanceEvent::Revoke:
            // A full revoke passes NULL to clear the whole assignment
            result = Error{ rd_kafka_assign(getClientHandle(), nullptr) };
            // Update assignment
            _assignment.clear();
            break;

        case PartitionsRebalanceEvent::IncrementalAssign:
            result = Error{ rd_kafka_incremental_assign(getClientHandle(), rk_tps.get()) };
            // Update assignment: add each newly assigned partition, warning on duplicates
            for (const auto& tp: tps)
            {
                auto found = _assignment.find(tp);
                if (found != _assignment.end())
                {
                    const std::string tpStr = toString(tp);
                    KAFKA_API_DO_LOG(Log::Level::Err, "incremental assign partition[%s] has already been assigned", tpStr.c_str());
                    continue;
                }
                _assignment.emplace(tp);
            }
            break;

        case PartitionsRebalanceEvent::IncrementalUnassign:
            result = Error{ rd_kafka_incremental_unassign(getClientHandle(), rk_tps.get()) };
            // Update assignment: remove each revoked partition, warning on unknown ones
            for (const auto& tp: tps)
            {
                auto found = _assignment.find(tp);
                if (found == _assignment.end())
                {
                    const std::string tpStr = toString(tp);
                    KAFKA_API_DO_LOG(Log::Level::Err, "incremental unassign partition[%s] could not be found", tpStr.c_str());
                    continue;
                }
                _assignment.erase(found);
            }
            break;
    }

    KAFKA_THROW_IF_WITH_ERROR(result);
}
614 
// Assign Topic-Partitions
// Manually assign partitions to this consumer; mutually exclusive with subscribe().
inline void
KafkaConsumer::assign(const TopicPartitions& topicPartitions)
{
    if (!_userSubscription.empty())
    {
        KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__FAIL, "Unexpected Operation! Once subscribe() was used, assign() should not be called any more!"));
    }

    _userAssignment = topicPartitions;

    // Use the incremental variant when the cooperative protocol is in effect
    changeAssignment(isCooperativeEnabled() ? PartitionsRebalanceEvent::IncrementalAssign : PartitionsRebalanceEvent::Assign,
                     topicPartitions);
}
629 
630 // Assignment
631 inline TopicPartitions
633 {
634  rd_kafka_topic_partition_list_t* raw_tps = nullptr;
635  const Error result{ rd_kafka_assignment(getClientHandle(), &raw_tps) };
636 
637  auto rk_tps = rd_kafka_topic_partition_list_unique_ptr(raw_tps);
638 
639  KAFKA_THROW_IF_WITH_ERROR(result);
640 
641  return getTopicPartitions(rk_tps.get());
642 }
643 
644 
// Seek & Position
// Override the fetch offset for one partition, retrying transient failures until `timeout` expires.
inline void
KafkaConsumer::seek(const TopicPartition& topicPartition, Offset offset, std::chrono::milliseconds timeout)
{
    const std::string topicPartitionStr = toString(topicPartition);
    KAFKA_API_DO_LOG(Log::Level::Info, "will seek with topic-partition[%s], offset[%d]", topicPartitionStr.c_str(), offset);

    auto rkt = rd_kafka_topic_unique_ptr(rd_kafka_topic_new(getClientHandle(), topicPartition.first.c_str(), nullptr));
    if (!rkt)
    {
        KAFKA_THROW_ERROR(Error(rd_kafka_last_error()));
    }

    const auto end = std::chrono::steady_clock::now() + timeout;

    rd_kafka_resp_err_t respErr = RD_KAFKA_RESP_ERR_NO_ERROR;
    do
    {
        respErr = rd_kafka_seek(rkt.get(), topicPartition.second, offset, SEEK_RETRY_INTERVAL_MS);
        // Only these transient errors are worth retrying; any other result (incl. success) is final
        if (respErr != RD_KAFKA_RESP_ERR__STATE && respErr != RD_KAFKA_RESP_ERR__TIMED_OUT && respErr != RD_KAFKA_RESP_ERR__OUTDATED)
        {
            break;
        }

        // If "seek" is called just after "assign", the toppar's "fetch_state" (set asynchronously) might not be ready yet.
        // In that case, retry -- normally the "seek" succeeds after a very short while.
        std::this_thread::yield();
    } while (std::chrono::steady_clock::now() < end);

    KAFKA_THROW_IF_WITH_ERROR(Error(respErr));

    KAFKA_API_DO_LOG(Log::Level::Info, "seeked with topic-partition[%s], offset[%d]", topicPartitionStr.c_str(), offset);
}
678 
679 inline void
680 KafkaConsumer::seekToBeginningOrEnd(const TopicPartitions& topicPartitions, bool toBeginning, std::chrono::milliseconds timeout)
681 {
682  for (const auto& topicPartition: topicPartitions)
683  {
684  seek(topicPartition, (toBeginning ? RD_KAFKA_OFFSET_BEGINNING : RD_KAFKA_OFFSET_END), timeout);
685  }
686 }
687 
// Return the offset of the next record to be fetched for the given partition.
inline Offset
KafkaConsumer::position(const TopicPartition& topicPartition) const
{
    // Query with a single-entry list; librdkafka fills in the offset field
    auto rk_tp = rd_kafka_topic_partition_list_unique_ptr(createRkTopicPartitionList({topicPartition}));

    const Error error{ rd_kafka_position(getClientHandle(), rk_tp.get()) };
    KAFKA_THROW_IF_WITH_ERROR(error);

    return rk_tp->elems[0].offset;
}
698 
699 inline std::map<TopicPartition, Offset>
700 KafkaConsumer::offsetsForTime(const TopicPartitions& topicPartitions,
701  std::chrono::time_point<std::chrono::system_clock> timepoint,
702  std::chrono::milliseconds timeout) const
703 {
704  if (topicPartitions.empty()) return {};
705 
706  auto msSinceEpoch = std::chrono::duration_cast<std::chrono::milliseconds>(timepoint.time_since_epoch()).count();
707 
708  auto rk_tpos = rd_kafka_topic_partition_list_unique_ptr(createRkTopicPartitionList(topicPartitions));
709 
710  for (int i = 0; i < rk_tpos->cnt; ++i)
711  {
712  rd_kafka_topic_partition_t& rk_tp = rk_tpos->elems[i];
713  // Here the `msSinceEpoch` would be overridden by the offset result (after called by `rd_kafka_offsets_for_times`)
714  rk_tp.offset = msSinceEpoch;
715  }
716 
717  Error error{ rd_kafka_offsets_for_times(getClientHandle(), rk_tpos.get(), static_cast<int>(timeout.count())) }; // NOLINT
718  KAFKA_THROW_IF_WITH_ERROR(error);
719 
720  auto results = getTopicPartitionOffsets(rk_tpos.get());
721 
722  // Remove invalid results (which are not updated with an valid offset)
723  for (auto it = results.begin(); it != results.end(); )
724  {
725  it = ((it->second == msSinceEpoch) ? results.erase(it) : std::next(it));
726  }
727 
728  return results;
729 }
730 
731 inline std::map<TopicPartition, Offset>
732 KafkaConsumer::getOffsets(const TopicPartitions& topicPartitions,
733  bool atBeginning,
734  std::chrono::milliseconds timeout) const
735 {
736  std::map<TopicPartition, Offset> result;
737 
738  for (const auto& topicPartition: topicPartitions)
739  {
740  Offset beginning{}, end{};
741  const Error error{ rd_kafka_query_watermark_offsets(getClientHandle(),
742  topicPartition.first.c_str(),
743  topicPartition.second,
744  &beginning,
745  &end,
746  static_cast<int>(timeout.count())) };
747  KAFKA_THROW_IF_WITH_ERROR(error);
748 
749  result[topicPartition] = (atBeginning ? beginning : end);
750  }
751 
752  return result;
753 }
754 
755 // Commit
756 inline void
757 KafkaConsumer::commit(const TopicPartitionOffsets& topicPartitionOffsets, CommitType type)
758 {
759  auto rk_tpos = rd_kafka_topic_partition_list_unique_ptr(topicPartitionOffsets.empty() ? nullptr : createRkTopicPartitionList(topicPartitionOffsets));
760 
761  Error error{ rd_kafka_commit(getClientHandle(), rk_tpos.get(), type == CommitType::Async ? 1 : 0) };
762  // No stored offset to commit (it might happen and should not be treated as a mistake)
763  if (topicPartitionOffsets.empty() && error.value() == RD_KAFKA_RESP_ERR__NO_OFFSET)
764  {
765  error = Error{};
766  }
767 
768  KAFKA_THROW_IF_WITH_ERROR(error);
769 }
770 
// Fetch committed offset
// Blocks (TIMEOUT_INFINITE) until the broker answers.
inline Offset
KafkaConsumer::committed(const TopicPartition& topicPartition)
{
    // Query with a single-entry list; librdkafka fills in the offset field
    auto rk_tps = rd_kafka_topic_partition_list_unique_ptr(createRkTopicPartitionList({topicPartition}));

    const Error error {rd_kafka_committed(getClientHandle(), rk_tps.get(), TIMEOUT_INFINITE) };
    KAFKA_THROW_IF_WITH_ERROR(error);

    return rk_tps->elems[0].offset;
}
782 
783 // Commit stored offsets
784 inline void
785 KafkaConsumer::commitStoredOffsetsIfNecessary(CommitType type)
786 {
787  if (_enableAutoCommit && !_offsetsToStore.empty())
788  {
789  for (auto& o: _offsetsToStore)
790  {
791  ++o.second;
792  }
793  commit(_offsetsToStore, type);
794  _offsetsToStore.clear();
795  }
796 }
797 
798 // Store offsets
799 inline void
800 KafkaConsumer::storeOffsetsIfNecessary(const std::vector<consumer::ConsumerRecord>& records)
801 {
802  if (_enableAutoCommit)
803  {
804  for (const auto& record: records)
805  {
806  _offsetsToStore[TopicPartition(record.topic(), record.partition())] = record.offset();
807  }
808  }
809 }
810 
// Fetch messages
// Poll up to `_maxPollRecords` messages, committing the previously polled offsets first
// (auto-commit case) and storing the new offsets afterwards.
inline std::vector<consumer::ConsumerRecord>
KafkaConsumer::poll(std::chrono::milliseconds timeout)
{
    // Commit the offsets for these messages which had been polled last time (for "enable.auto.commit=true" case)
    commitStoredOffsetsIfNecessary(CommitType::Async);

    // Poll messages with librdkafka's API
    std::vector<rd_kafka_message_t*> msgPtrArray(_maxPollRecords);
    auto msgReceived = rd_kafka_consume_batch_queue(_rk_queue.get(), convertMsDurationToInt(timeout), msgPtrArray.data(), _maxPollRecords);
    if (msgReceived < 0)
    {
        KAFKA_THROW_ERROR(Error(rd_kafka_last_error()));
    }

    // Wrap messages with ConsumerRecord (which takes ownership of each rd_kafka_message_t)
    std::vector<consumer::ConsumerRecord> records(msgPtrArray.begin(), msgPtrArray.begin() + msgReceived);

    // Store the offsets for all these polled messages (for "enable.auto.commit=true" case)
    storeOffsetsIfNecessary(records);

    return records;
}
834 
835 inline void
836 KafkaConsumer::pauseOrResumePartitions(const TopicPartitions& topicPartitions, PauseOrResumeOperation op)
837 {
838  auto rk_tpos = rd_kafka_topic_partition_list_unique_ptr(createRkTopicPartitionList(topicPartitions));
839 
840  const Error error{ (op == PauseOrResumeOperation::Pause) ?
841  rd_kafka_pause_partitions(getClientHandle(), rk_tpos.get())
842  : rd_kafka_resume_partitions(getClientHandle(), rk_tpos.get()) };
843  KAFKA_THROW_IF_WITH_ERROR(error);
844 
845  const char* opString = (op == PauseOrResumeOperation::Pause) ? "pause" : "resume";
846  int cnt = 0;
847  for (int i = 0; i < rk_tpos->cnt; ++i)
848  {
849  const rd_kafka_topic_partition_t& rk_tp = rk_tpos->elems[i];
850  if (rk_tp.err != RD_KAFKA_RESP_ERR_NO_ERROR)
851  {
852  KAFKA_API_DO_LOG(Log::Level::Err, "%s topic-partition[%s-%d] error[%s]", opString, rk_tp.topic, rk_tp.partition, rd_kafka_err2str(rk_tp.err));
853  }
854  else
855  {
856  KAFKA_API_DO_LOG(Log::Level::Notice, "%sd topic-partition[%s-%d]", opString, rk_tp.topic, rk_tp.partition, rd_kafka_err2str(rk_tp.err));
857  ++cnt;
858  }
859  }
860 
861  if (cnt == 0 && op == PauseOrResumeOperation::Pause)
862  {
863  const std::string errMsg = std::string("No partition could be ") + opString + std::string("d among TopicPartitions[") + toString(topicPartitions) + std::string("]");
864  KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, errMsg));
865  }
866 }
867 
// Suspend fetching from the requested partitions.
inline void
KafkaConsumer::pause(const TopicPartitions& topicPartitions)
{
    pauseOrResumePartitions(topicPartitions, PauseOrResumeOperation::Pause);
}
873 
874 inline void
876 {
877  pause(_assignment);
878 }
879 
// Resume fetching from the requested (previously paused) partitions.
inline void
KafkaConsumer::resume(const TopicPartitions& topicPartitions)
{
    pauseOrResumePartitions(topicPartitions, PauseOrResumeOperation::Resume);
}
885 
886 inline void
888 {
889  resume(_assignment);
890 }
891 
// Rebalance Callback (for class instance)
// Applies the assignment change, clears any pending subscribe/unsubscribe wait,
// and forwards the event to the user's rebalance callback.
inline void
KafkaConsumer::onRebalance(rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* rk_partitions)
{
    const TopicPartitions tps = getTopicPartitions(rk_partitions);
    const std::string tpsStr = toString(tps);

    if (err != RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS && err != RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS)
    {
        KAFKA_API_DO_LOG(Log::Level::Err, "unknown re-balance event[%d], topic-partitions[%s]", err, tpsStr.c_str());
        return;
    }

    // Initialize attribute for cooperative protocol
    // (determined lazily, on the first rebalance event)
    if (!_cooperativeEnabled)
    {
        if (const char* protocol = rd_kafka_rebalance_protocol(getClientHandle()))
        {
            _cooperativeEnabled = (std::string(protocol) == "COOPERATIVE");
        }
    }

    KAFKA_API_DO_LOG(Log::Level::Notice, "re-balance event triggered[%s], cooperative[%s], topic-partitions[%s]",
                     err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ? "ASSIGN_PARTITIONS" : "REVOKE_PARTITIONS",
                     isCooperativeEnabled() ? "enabled" : "disabled",
                     tpsStr.c_str());

    // Remove the mark for pending event (unblocks a waiting subscribe()/unsubscribe())
    if (_pendingEvent
        && ((err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS && *_pendingEvent == PendingEvent::PartitionsAssignment)
            || (err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS && *_pendingEvent == PendingEvent::PartitionsRevocation)))
    {
        _pendingEvent.reset();
    }

    const PartitionsRebalanceEvent event = (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ?
                                            (isCooperativeEnabled() ? PartitionsRebalanceEvent::IncrementalAssign : PartitionsRebalanceEvent::Assign)
                                            : (isCooperativeEnabled() ? PartitionsRebalanceEvent::IncrementalUnassign : PartitionsRebalanceEvent::Revoke));

    // Assignments are applied BEFORE the user callback, revocations AFTER it,
    // so the user callback always sees the partitions as still usable
    if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
    {
        changeAssignment(event, tps);
    }

    if (_rebalanceCb)
    {
        _rebalanceCb(err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ? consumer::RebalanceEventType::PartitionsAssigned : consumer::RebalanceEventType::PartitionsRevoked,
                     tps);
    }

    if (err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS)
    {
        changeAssignment(event, isCooperativeEnabled() ? tps : TopicPartitions{});
    }
}
947 
948 // Rebalance Callback (for librdkafka)
949 inline void
950 KafkaConsumer::rebalanceCallback(rd_kafka_t* rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* partitions, void* /* opaque */)
951 {
952  KafkaClient& client = kafkaClient(rk);
953  auto& consumer = dynamic_cast<KafkaConsumer&>(client);
954  consumer.onRebalance(err, partitions);
955 }
956 
957 // Offset Commit Callback (for librdkafka)
958 inline void
959 KafkaConsumer::offsetCommitCallback(rd_kafka_t* rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* rk_tpos, void* opaque)
960 {
961  const TopicPartitionOffsets tpos = getTopicPartitionOffsets(rk_tpos);
962 
963  if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
964  {
965  auto tposStr = toString(tpos);
966  kafkaClient(rk).KAFKA_API_DO_LOG(Log::Level::Err, "invoked offset-commit callback. offsets[%s], result[%s]", tposStr.c_str(), rd_kafka_err2str(err));
967  }
968 
969  auto* cb = static_cast<consumer::OffsetCommitCallback*>(opaque);
970  if (cb && *cb)
971  {
972  (*cb)(tpos, Error(err));
973  }
974  delete cb;
975 }
976 
977 inline consumer::ConsumerGroupMetadata
979 {
980  return consumer::ConsumerGroupMetadata{rd_kafka_consumer_group_metadata(getClientHandle())};
981 }
982 
983 inline void
985 {
986  commit(TopicPartitionOffsets(), CommitType::Sync);
987 }
988 
989 inline void
991 {
992  TopicPartitionOffsets tpos;
993  // committed offset should be "current-received-offset + 1"
994  tpos[TopicPartition(record.topic(), record.partition())] = record.offset() + 1;
995 
996  commit(tpos, CommitType::Sync);
997 }
998 
// Synchronously commit the given explicit offsets.
inline void
KafkaConsumer::commitSync(const TopicPartitionOffsets& topicPartitionOffsets)
{
    commit(topicPartitionOffsets, CommitType::Sync);
}
1004 
// Asynchronously commit the given offsets; the user callback is invoked on completion.
inline void
KafkaConsumer::commitAsync(const TopicPartitionOffsets& topicPartitionOffsets, const consumer::OffsetCommitCallback& offsetCommitCallback)
{
    // An empty map means "commit whatever offsets are currently stored" (NULL list for librdkafka)
    auto rk_tpos = rd_kafka_topic_partition_list_unique_ptr(topicPartitionOffsets.empty() ? nullptr : createRkTopicPartitionList(topicPartitionOffsets));

    // The callback copy is heap-allocated and handed to librdkafka as the opaque;
    // offsetCommitCallback() releases it after the commit completes
    const Error error{ rd_kafka_commit_queue(getClientHandle(),
                                             rk_tpos.get(),
                                             getCommitCbQueue(),
                                             &KafkaConsumer::offsetCommitCallback,
                                             new consumer::OffsetCommitCallback(offsetCommitCallback)) };
    KAFKA_THROW_IF_WITH_ERROR(error);
}
1017 
1018 inline void
1019 KafkaConsumer::commitAsync(const consumer::ConsumerRecord& record, const consumer::OffsetCommitCallback& offsetCommitCallback)
1020 {
1021  TopicPartitionOffsets tpos;
1022  // committed offset should be "current received record's offset" + 1
1023  tpos[TopicPartition(record.topic(), record.partition())] = record.offset() + 1;
1024  commitAsync(tpos, offsetCommitCallback);
1025 }
1026 
// Asynchronously commit the offsets stored from the last poll().
inline void
KafkaConsumer::commitAsync(const consumer::OffsetCommitCallback& offsetCommitCallback)
{
    commitAsync(TopicPartitionOffsets(), offsetCommitCallback);
}
1032 
1033 } } } // end of KAFKA_API::clients::consumer
1034 
Unified error type.
Definition: Error.h:32
Specific exception for Kafka clients.
Definition: KafkaException.h:22
const char * what() const noexcept override
Obtains explanatory string.
Definition: KafkaException.h:39
The properties for Kafka clients.
Definition: Properties.h:24
void remove(const std::string &key)
Remove the property (if one exists).
Definition: Properties.h:136
Optional< std::string > getProperty(const std::string &key) const
Get a property.
Definition: Properties.h:170
The base class for Kafka clients.
Definition: KafkaClient.h:34
const Properties & properties() const
Return the properties which took effect.
Definition: KafkaClient.h:56
static constexpr const char * ENABLE_AUTO_COMMIT
Automatically commits previously polled offsets on each poll operation.
Definition: ConsumerConfig.h:31
static constexpr const char * GROUP_ID
Group identifier.
Definition: ConsumerConfig.h:26
static constexpr const char * MAX_POLL_RECORDS
This controls the maximum number of records that a single call to poll() will return.
Definition: ConsumerConfig.h:51
A metadata struct containing the consumer group information.
Definition: ConsumerCommon.h:54
A key/value pair to be received from Kafka.
Definition: ConsumerRecord.h:22
Offset offset() const
The position of this record in the corresponding Kafka partition.
Definition: ConsumerRecord.h:40
Partition partition() const
The partition from which this record is received.
Definition: ConsumerRecord.h:35
Topic topic() const
The topic this record is received from.
Definition: ConsumerRecord.h:30
KafkaConsumer class.
Definition: KafkaConsumer.h:26
std::vector< consumer::ConsumerRecord > poll(std::chrono::milliseconds timeout)
Fetch data for the topics or partitions specified using one of the subscribe/assign APIs.
Definition: KafkaConsumer.h:813
KafkaConsumer(const Properties &properties)
The constructor for KafkaConsumer.
Definition: KafkaConsumer.h:371
~KafkaConsumer() override
The destructor for KafkaConsumer.
Definition: KafkaConsumer.h:43
Offset committed(const TopicPartition &topicPartition)
Get the last committed offset for the given partition (whether the commit happened by this process or...
Definition: KafkaConsumer.h:773
std::map< TopicPartition, Offset > offsetsForTime(const TopicPartitions &topicPartitions, std::chrono::time_point< std::chrono::system_clock > timepoint, std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_QUERY_TIMEOUT_MS)) const
Get the offsets for the given partitions by time-point.
Definition: KafkaConsumer.h:700
void close()
Close the consumer, waiting for any needed cleanup.
Definition: KafkaConsumer.h:424
std::map< TopicPartition, Offset > beginningOffsets(const TopicPartitions &topicPartitions, std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_QUERY_TIMEOUT_MS)) const
Get the first offset for the given partitions.
Definition: KafkaConsumer.h:136
Topics subscription() const
Get the current subscription.
Definition: KafkaConsumer.h:548
void unsubscribe(std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_UNSUBSCRIBE_TIMEOUT_MS))
Unsubscribe from topics currently subscribed.
Definition: KafkaConsumer.h:503
Offset position(const TopicPartition &topicPartition) const
Get the offset of the next record that will be fetched (if a record with that offset exists).
Definition: KafkaConsumer.h:689
void seekToBeginning(const TopicPartitions &topicPartitions, std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS))
Seek to the first offset for each of the given partitions.
Definition: KafkaConsumer.h:108
std::string getGroupId() const
To get group ID.
Definition: KafkaConsumer.h:53
void commitSync()
Commit offsets returned on the last poll() for all the subscribed list of topics and partitions.
Definition: KafkaConsumer.h:984
consumer::ConsumerGroupMetadata groupMetadata()
Return the current group metadata associated with this consumer.
Definition: KafkaConsumer.h:978
void resume()
Resume all partitions which have been paused with pause().
Definition: KafkaConsumer.h:887
std::map< TopicPartition, Offset > endOffsets(const TopicPartitions &topicPartitions, std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_QUERY_TIMEOUT_MS)) const
Get the last offset for the given partitions.
Definition: KafkaConsumer.h:148
void assign(const TopicPartitions &topicPartitions)
Manually assign a list of partitions to this consumer.
Definition: KafkaConsumer.h:617
void commitAsync(const consumer::OffsetCommitCallback &offsetCommitCallback=consumer::NullOffsetCommitCallback)
Commit offsets returned on the last poll() for all the subscribed list of topics and partitions.
Definition: KafkaConsumer.h:1028
void subscribe(const Topics &topics, consumer::RebalanceCallback rebalanceCallback=consumer::NullRebalanceCallback, std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_SUBSCRIBE_TIMEOUT_MS))
Subscribe to the given list of topics to get dynamically assigned partitions.
Definition: KafkaConsumer.h:459
void seek(const TopicPartition &topicPartition, Offset offset, std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS))
Overrides the fetch offsets that the consumer will use on the next poll(timeout).
Definition: KafkaConsumer.h:647
void seekToEnd(const TopicPartitions &topicPartitions, std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS))
Seek to the last offset for each of the given partitions.
Definition: KafkaConsumer.h:121
void setGroupId(const std::string &id)
To set group ID.
Definition: KafkaConsumer.h:58
void pause()
Suspend fetching from all assigned partitions.
Definition: KafkaConsumer.h:875
TopicPartitions assignment() const
Get the set of partitions currently assigned to this consumer.
Definition: KafkaConsumer.h:632