Modern C++ Kafka API
KAFKA_API::clients::consumer::KafkaConsumer Class Reference

KafkaConsumer class. More...

#include <KafkaConsumer.h>

Inheritance diagram for KAFKA_API::clients::consumer::KafkaConsumer:
KAFKA_API::clients::KafkaClient

Public Member Functions

 KafkaConsumer (const Properties &properties)
 The constructor for KafkaConsumer. More...
 
 ~KafkaConsumer () override
 The destructor for KafkaConsumer.
 
void close ()
 Close the consumer, waiting for any needed cleanup.
 
std::string getGroupId () const
 To get group ID.
 
void setGroupId (const std::string &id)
 To set group ID. More...
 
void subscribe (const Topics &topics, consumer::RebalanceCallback rebalanceCallback=consumer::NullRebalanceCallback, std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_SUBSCRIBE_TIMEOUT_MS))
 Subscribe to the given list of topics to get dynamically assigned partitions. More...
 
Topics subscription () const
 Get the current subscription.
 
void unsubscribe (std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_UNSUBSCRIBE_TIMEOUT_MS))
 Unsubscribe from topics currently subscribed.
 
void assign (const TopicPartitions &topicPartitions)
 Manually assign a list of partitions to this consumer. More...
 
TopicPartitions assignment () const
 Get the set of partitions currently assigned to this consumer.
 
void seek (const TopicPartition &topicPartition, Offset offset, std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS))
 Overrides the fetch offsets that the consumer will use on the next poll(timeout). More...
 
void seekToBeginning (const TopicPartitions &topicPartitions, std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS))
 Seek to the first offset for each of the given partitions. More...
 
void seekToBeginning (std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS))
 
void seekToEnd (const TopicPartitions &topicPartitions, std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS))
 Seek to the last offset for each of the given partitions. More...
 
void seekToEnd (std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS))
 
Offset position (const TopicPartition &topicPartition) const
 Get the offset of the next record that will be fetched (if a record with that offset exists).
 
std::map< TopicPartition, Offset > beginningOffsets (const TopicPartitions &topicPartitions, std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_QUERY_TIMEOUT_MS)) const
 Get the first offset for the given partitions. More...
 
std::map< TopicPartition, Offset > endOffsets (const TopicPartitions &topicPartitions, std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_QUERY_TIMEOUT_MS)) const
 Get the last offset for the given partitions. More...
 
std::map< TopicPartition, Offset > offsetsForTime (const TopicPartitions &topicPartitions, std::chrono::time_point< std::chrono::system_clock > timepoint, std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_QUERY_TIMEOUT_MS)) const
 Get the offsets for the given partitions by time-point. More...
 
void commitSync ()
 Commit offsets returned on the last poll() for all the subscribed list of topics and partitions.
 
void commitSync (const consumer::ConsumerRecord &record)
 Commit the specified offsets for the specified records.
 
void commitSync (const TopicPartitionOffsets &topicPartitionOffsets)
 Commit the specified offsets for the specified list of topics and partitions.
 
void commitAsync (const consumer::OffsetCommitCallback &offsetCommitCallback=consumer::NullOffsetCommitCallback)
 Commit offsets returned on the last poll() for all the subscribed list of topics and partitions. More...
 
void commitAsync (const consumer::ConsumerRecord &record, const consumer::OffsetCommitCallback &offsetCommitCallback=consumer::NullOffsetCommitCallback)
 Commit the specified offsets for the specified records. Note: If a callback is provided, it's guaranteed to be triggered (before closing the consumer).
 
void commitAsync (const TopicPartitionOffsets &topicPartitionOffsets, const consumer::OffsetCommitCallback &offsetCommitCallback=consumer::NullOffsetCommitCallback)
 Commit the specified offsets for the specified list of topics and partitions to Kafka. More...
 
Offset committed (const TopicPartition &topicPartition)
 Get the last committed offset for the given partition (whether the commit happened by this process or another). This offset will be used as the position for the consumer in the event of a failure. More...
 
std::vector< consumer::ConsumerRecord > poll (std::chrono::milliseconds timeout)
 Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. More...
 
void pause (const TopicPartitions &topicPartitions)
 Suspend fetching from the requested partitions. More...
 
void pause ()
 Suspend fetching from all assigned partitions. More...
 
void resume (const TopicPartitions &topicPartitions)
 Resume specified partitions which have been paused with pause(). More...
 
void resume ()
 Resume all partitions which have been paused with pause(). More...
 
consumer::ConsumerGroupMetadata groupMetadata ()
 Return the current group metadata associated with this consumer.
 
- Public Member Functions inherited from KAFKA_API::clients::KafkaClient
const std::string & clientId () const
 Get the client id.
 
const std::string & name () const
 Get the client name (i.e. More...
 
void setLogLevel (int level)
 Set log level for the kafka client (the default value: 5).
 
const Properties & properties () const
 Return the properties which took effect.
 
Optional< std::string > getProperty (const std::string &name) const
 Fetch the property that took effect (including properties set internally by librdkafka).
 
void pollEvents (std::chrono::milliseconds timeout)
 Call the OffsetCommit callbacks (if any). Note: The Kafka client should be constructed with option enable.manual.events.poll=true!
 
Optional< BrokerMetadata > fetchBrokerMetadata (const std::string &topic, std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_METADATA_TIMEOUT_MS), bool disableErrorLogging=false)
 Fetch metadata from an available broker. More...
 
template<class ... Args>
void doLog (int level, const char *filename, int lineno, const char *format, Args... args) const
 
void doLog (int level, const char *filename, int lineno, const char *msg) const
 

Static Public Attributes

static constexpr const char * DEFAULT_MAX_POLL_RECORDS_VALUE = "500"
 

Additional Inherited Members

- Public Types inherited from KAFKA_API::clients::KafkaClient
enum  { DEFAULT_METADATA_TIMEOUT_MS = 10000 }
 

Detailed Description

KafkaConsumer class.

Constructor & Destructor Documentation

◆ KafkaConsumer()

KAFKA_API::clients::consumer::KafkaConsumer::KafkaConsumer ( const Properties &  properties)
inline explicit

The constructor for KafkaConsumer.

Throws KafkaException with errors:

  • RD_KAFKA_RESP_ERR__INVALID_ARG : Invalid BOOTSTRAP_SERVERS property
  • RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE: Fail to create internal threads

Member Function Documentation

◆ assign()

void KAFKA_API::clients::consumer::KafkaConsumer::assign ( const TopicPartitions &  topicPartitions)
inline

Manually assign a list of partitions to this consumer.

An exception is thrown if subscribe() was called previously (without a subsequent call to unsubscribe()).

◆ beginningOffsets()

std::map<TopicPartition, Offset> KAFKA_API::clients::consumer::KafkaConsumer::beginningOffsets ( const TopicPartitions &  topicPartitions,
std::chrono::milliseconds  timeout = std::chrono::milliseconds(DEFAULT_QUERY_TIMEOUT_MS) 
) const
inline

Get the first offset for the given partitions.

This method does not change the current consumer position of the partitions. Throws KafkaException with errors:

  • RD_KAFKA_RESP_ERR__FAIL: Generic failure

◆ commitAsync() [1/2]

void KAFKA_API::clients::consumer::KafkaConsumer::commitAsync ( const consumer::OffsetCommitCallback &  offsetCommitCallback = consumer::NullOffsetCommitCallback)
inline

Commit offsets returned on the last poll() for all the subscribed list of topics and partitions.

Note: If a callback is provided, it's guaranteed to be triggered (before closing the consumer).

◆ commitAsync() [2/2]

void KAFKA_API::clients::consumer::KafkaConsumer::commitAsync ( const TopicPartitionOffsets &  topicPartitionOffsets,
const consumer::OffsetCommitCallback &  offsetCommitCallback = consumer::NullOffsetCommitCallback 
)
inline

Commit the specified offsets for the specified list of topics and partitions to Kafka.

Note: If a callback is provided, it's guaranteed to be triggered (before closing the consumer).
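When offsets are committed explicitly, Kafka's convention is that a committed offset names the next record to consume, so after processing the record at offset n the value to commit is n + 1. The sketch below builds such a map from a batch of processed records. It assumes that TopicPartitionOffsets behaves like a std::map keyed by a (topic, partition) pair; the aliases and the ProcessedRecord struct are local stand-ins declared for illustration, not library types.

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Stand-in aliases (assumptions): declared locally so the sketch builds
// without the library headers.
using Topic                 = std::string;
using Partition             = std::int32_t;
using Offset                = std::int64_t;
using TopicPartition        = std::pair<Topic, Partition>;
using TopicPartitionOffsets = std::map<TopicPartition, Offset>;

// Hypothetical record summary: where a processed record came from.
struct ProcessedRecord {
    Topic     topic;
    Partition partition;
    Offset    offset;
};

// For each partition, keep the highest processed offset plus one,
// i.e. the offset of the next record that should be consumed.
TopicPartitionOffsets offsetsToCommit(const std::vector<ProcessedRecord>& processed) {
    TopicPartitionOffsets result;
    for (const auto& record : processed) {
        const TopicPartition tp(record.topic, record.partition);
        const Offset next = record.offset + 1;
        const auto it = result.find(tp);
        if (it == result.end()) {
            result.emplace(tp, next);
        } else if (next > it->second) {
            it->second = next;
        }
    }
    return result;
}
```

The resulting map could then be passed to commitSync() or commitAsync(), provided the alias assumption above holds for the library version in use.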

◆ committed()

Offset KAFKA_API::clients::consumer::KafkaConsumer::committed ( const TopicPartition &  topicPartition)
inline

Get the last committed offset for the given partition (whether the commit happened by this process or another). This offset will be used as the position for the consumer in the event of a failure.

This call will block to do a remote call to get the latest committed offsets from the server. Throws KafkaException with errors:

  • RD_KAFKA_RESP_ERR__INVALID_ARG: Invalid partition

◆ endOffsets()

std::map<TopicPartition, Offset> KAFKA_API::clients::consumer::KafkaConsumer::endOffsets ( const TopicPartitions &  topicPartitions,
std::chrono::milliseconds  timeout = std::chrono::milliseconds(DEFAULT_QUERY_TIMEOUT_MS) 
) const
inline

Get the last offset for the given partitions.

The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1. This method does not change the current consumer position of the partitions. Throws KafkaException with errors:

  • RD_KAFKA_RESP_ERR__FAIL: Generic failure
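Because the end offset is "last available message + 1", subtracting a partition's beginning offset from its end offset gives the width of its current offset range. For a topic without gaps this equals the number of records available; on compacted topics the range can contain gaps, so treat the result as an upper bound. The following is a minimal sketch working on maps shaped like the return values of beginningOffsets() and endOffsets(). The type aliases are local stand-ins (assumptions) and offsetSpan is a hypothetical helper, not part of the library.

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <utility>

// Stand-in aliases (assumptions): declared locally so the sketch builds
// without the library headers.
using Topic          = std::string;
using Partition      = std::int32_t;
using Offset         = std::int64_t;
using TopicPartition = std::pair<Topic, Partition>;
using OffsetMap      = std::map<TopicPartition, Offset>;

// For each partition present in both maps, return end - beginning.
// Partitions missing from either map are skipped.
OffsetMap offsetSpan(const OffsetMap& beginning, const OffsetMap& end) {
    OffsetMap span;
    for (const auto& entry : end) {
        const auto it = beginning.find(entry.first);
        if (it != beginning.end()) {
            span[entry.first] = entry.second - it->second;
        }
    }
    return span;
}
```

An empty partition reports a span of 0, since its beginning and end offsets coincide.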

◆ offsetsForTime()

std::map< TopicPartition, Offset > KAFKA_API::clients::consumer::KafkaConsumer::offsetsForTime ( const TopicPartitions &  topicPartitions,
std::chrono::time_point< std::chrono::system_clock >  timepoint,
std::chrono::milliseconds  timeout = std::chrono::milliseconds(DEFAULT_QUERY_TIMEOUT_MS) 
) const
inline

Get the offsets for the given partitions by time-point.

Throws KafkaException with errors:

  • RD_KAFKA_RESP_ERR__TIMED_OUT: Not all offsets could be fetched in time.
  • RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION: All partitions are unknown.
  • RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE: Unable to query leaders from the given partitions.
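The timepoint argument is a std::chrono::system_clock time point, so a value such as "30 minutes ago" can be produced with plain chrono arithmetic. The sketch below shows two hypothetical helpers (neither is part of the library): one that steps back from a given instant, and one that converts a time point to epoch milliseconds, which is convenient for logging next to Kafka record timestamps.

```cpp
#include <chrono>
#include <cstdint>

using SystemTimePoint = std::chrono::time_point<std::chrono::system_clock>;

// Return the time point that lies `age` before `now`.
SystemTimePoint timepointBefore(SystemTimePoint now, std::chrono::minutes age) {
    return now - age;
}

// Milliseconds since the clock's epoch. system_clock uses the Unix epoch on
// mainstream platforms (guaranteed only from C++20 onward).
std::int64_t toEpochMillis(SystemTimePoint tp) {
    return std::chrono::duration_cast<std::chrono::milliseconds>(
               tp.time_since_epoch()).count();
}
```

A call such as timepointBefore(std::chrono::system_clock::now(), std::chrono::minutes(30)) yields a value that can be passed as the timepoint parameter.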

◆ pause() [1/2]

void KAFKA_API::clients::consumer::KafkaConsumer::pause ( )
inline

Suspend fetching from all assigned partitions.

Future calls to poll() will not return any records until they have been resumed using resume(). Note: This method does not affect partition subscription/assignment.

◆ pause() [2/2]

void KAFKA_API::clients::consumer::KafkaConsumer::pause ( const TopicPartitions &  topicPartitions)
inline

Suspend fetching from the requested partitions.

Future calls to poll() will not return any records from these partitions until they have been resumed using resume(). Note: 1) After pausing, the application still needs to call poll() at regular intervals. 2) This method does not affect partition subscription/assignment (i.e., pausing fetching from partitions does not trigger a rebalance, since the consumer is still alive). 3) If none of the provided partitions is assigned to this consumer, an exception is thrown. Throws KafkaException with error:

  • RD_KAFKA_RESP_ERR__INVALID_ARG: Invalid arguments
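One common use of pause() and resume() is backpressure: stop fetching while a local backlog is full, but keep calling poll() as the note above requires. The following is a minimal sketch of one loop iteration, written as a template so it builds without the library. It uses the no-argument pause()/resume() overloads for brevity. Backpressure, pollStep and FakeConsumer are hypothetical names introduced for illustration; FakeConsumer only mimics the documented member signatures.

```cpp
#include <chrono>
#include <cstddef>
#include <utility>
#include <vector>

// Thresholds and state for a simple high/low water mark scheme.
struct Backpressure {
    std::size_t highWater;  // pause once the backlog reaches this size
    std::size_t lowWater;   // resume once the backlog shrinks to this size
    bool        paused;     // whether pause() is currently in effect
};

// One poll-loop iteration: adjust pause state, then poll unconditionally.
template <class Consumer, class Backlog>
void pollStep(Consumer& consumer, Backlog& backlog, Backpressure& bp,
              std::chrono::milliseconds timeout) {
    if (!bp.paused && backlog.size() >= bp.highWater) {
        consumer.pause();
        bp.paused = true;
    } else if (bp.paused && backlog.size() <= bp.lowWater) {
        consumer.resume();
        bp.paused = false;
    }
    // poll() is called even while paused; it then returns no records.
    auto records = consumer.poll(timeout);
    for (auto& record : records) {
        backlog.push_back(std::move(record));
    }
}

// Hypothetical stand-in, NOT a library type: yields two records per poll
// unless paused, and counts how often poll() was called.
struct FakeConsumer {
    bool paused = false;
    int  polls  = 0;
    void pause()  { paused = true; }
    void resume() { paused = false; }
    std::vector<int> poll(std::chrono::milliseconds) {
        ++polls;
        return paused ? std::vector<int>{} : std::vector<int>{1, 2};
    }
};
```

Keeping the paused flag outside the consumer avoids redundant pause()/resume() calls on every iteration; a worker that drains the backlog is assumed to run elsewhere.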

◆ poll()

std::vector< consumer::ConsumerRecord > KAFKA_API::clients::consumer::KafkaConsumer::poll ( std::chrono::milliseconds  timeout)
inline

Fetch data for the topics or partitions specified using one of the subscribe/assign APIs.

Returns the polled records. Note: 1) The fetch result for each record can be checked through ConsumerRecord (with the member function error). 2) Make sure every ConsumerRecord is destroyed before calling KafkaConsumer::close(). Throws KafkaException with errors:

  • RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION: Unknown partition
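The two notes above can be sketched together: check error on each record before using it, and let the polled records go out of scope before close() is called. The template below is a sketch under assumptions. It presumes that the value returned by a record's error member is usable in a boolean context, with true meaning the record carries an error. pollOnceThenClose, FakeRecord and FakeConsumer are hypothetical names, and the fakes exist only so the sketch builds without the library.

```cpp
#include <chrono>
#include <cstddef>
#include <string>
#include <vector>

// Poll once, hand every error-free record to `handle`, then close.
// Returns the number of records that were handled.
template <class Consumer, class Handler>
std::size_t pollOnceThenClose(Consumer& consumer,
                              std::chrono::milliseconds timeout,
                              Handler handle) {
    std::size_t handled = 0;
    {
        auto records = consumer.poll(timeout);
        for (const auto& record : records) {
            if (record.error()) {
                continue;  // skip records that carry an error
            }
            handle(record);
            ++handled;
        }
    }  // `records` is destroyed here, before close()
    consumer.close();
    return handled;
}

// Hypothetical stand-ins, NOT library types.
struct FakeRecord {
    std::string payload;
    bool        failed;
    bool error() const { return failed; }
};

struct FakeConsumer {
    bool closed = false;
    std::vector<FakeRecord> poll(std::chrono::milliseconds) {
        return {{"a", false}, {"b", true}, {"c", false}};
    }
    void close() { closed = true; }
};
```

The inner block scope is the point of the sketch: it bounds the lifetime of the record vector so that nothing returned by poll() outlives the consumer's shutdown.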

◆ resume() [1/2]

void KAFKA_API::clients::consumer::KafkaConsumer::resume ( )
inline

Resume all partitions which have been paused with pause().

New calls to poll() will return records from these partitions if there are any to be fetched.

◆ resume() [2/2]

void KAFKA_API::clients::consumer::KafkaConsumer::resume ( const TopicPartitions &  topicPartitions)
inline

Resume specified partitions which have been paused with pause().

New calls to poll() will return records from these partitions if there are any to be fetched. Note: If the partitions were not previously paused, this method is a no-op.

◆ seek()

void KAFKA_API::clients::consumer::KafkaConsumer::seek ( const TopicPartition &  topicPartition,
Offset  offset,
std::chrono::milliseconds  timeout = std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS) 
)
inline

Overrides the fetch offsets that the consumer will use on the next poll(timeout).

If this API is invoked for the same partition more than once, the latest offset will be used on the next poll(). Throws KafkaException with errors:

  • RD_KAFKA_RESP_ERR__TIMED_OUT: Operation timed out
  • RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION: Invalid partition
  • RD_KAFKA_RESP_ERR__STATE: Invalid broker state

◆ seekToBeginning()

void KAFKA_API::clients::consumer::KafkaConsumer::seekToBeginning ( const TopicPartitions &  topicPartitions,
std::chrono::milliseconds  timeout = std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS) 
)
inline

Seek to the first offset for each of the given partitions.

This function evaluates lazily, seeking to the first offset in all partitions only when poll() or position() is called. If no partitions are provided, seek to the first offset for all of the currently assigned partitions. Throws KafkaException with errors:

  • RD_KAFKA_RESP_ERR__TIMED_OUT: Operation timed out
  • RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION: Invalid partition
  • RD_KAFKA_RESP_ERR__STATE: Invalid broker state

◆ seekToEnd()

void KAFKA_API::clients::consumer::KafkaConsumer::seekToEnd ( const TopicPartitions &  topicPartitions,
std::chrono::milliseconds  timeout = std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS) 
)
inline

Seek to the last offset for each of the given partitions.

This function evaluates lazily, seeking to the final offset in all partitions only when poll() or position() is called. If no partitions are provided, seek to the final offset for all of the currently assigned partitions. Throws KafkaException with errors:

  • RD_KAFKA_RESP_ERR__TIMED_OUT: Operation timed out
  • RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION: Invalid partition
  • RD_KAFKA_RESP_ERR__STATE: Invalid broker state

◆ setGroupId()

void KAFKA_API::clients::consumer::KafkaConsumer::setGroupId ( const std::string &  id)
inline

To set group ID.

The group ID is mandatory for a Consumer.

◆ subscribe()

void KAFKA_API::clients::consumer::KafkaConsumer::subscribe ( const Topics &  topics,
consumer::RebalanceCallback  rebalanceCallback = consumer::NullRebalanceCallback,
std::chrono::milliseconds  timeout = std::chrono::milliseconds(DEFAULT_SUBSCRIBE_TIMEOUT_MS) 
)
inline

Subscribe to the given list of topics to get dynamically assigned partitions.

An exception is thrown if assign() was called previously (without a subsequent call to unsubscribe()).


The documentation for this class was generated from the following file: