Modern C++ Kafka API
KAFKA_API::clients::producer::KafkaProducer Class Reference

KafkaProducer class.

#include <KafkaProducer.h>

Inheritance diagram for KAFKA_API::clients::producer::KafkaProducer:
KAFKA_API::clients::KafkaClient

Public Types

enum class  SendOption { NoCopyRecordValue , ToCopyRecordValue }
 Options for sending messages.
 
enum class  ActionWhileQueueIsFull { Block , NoBlock }
 Choose the action while the sending buffer is full.
 
Public Types inherited from KAFKA_API::clients::KafkaClient
enum  { DEFAULT_METADATA_TIMEOUT_MS = 10000 }
 

Public Member Functions

 KafkaProducer (const Properties &properties)
 The constructor for KafkaProducer.
 
 ~KafkaProducer () override
 The destructor for KafkaProducer.
 
Error flush (std::chrono::milliseconds timeout=InfiniteTimeout)
 Invoking this method makes all buffered records immediately available to send, and blocks on the completion of the requests associated with these records.
 
Error purge ()
 Purge messages currently handled by the KafkaProducer.
 
void close (std::chrono::milliseconds timeout=InfiniteTimeout)
 Close this producer.
 
void send (const producer::ProducerRecord &record, const producer::Callback &deliveryCb, SendOption option=SendOption::NoCopyRecordValue, ActionWhileQueueIsFull action=ActionWhileQueueIsFull::Block)
 Asynchronously send a record to a topic.
 
void send (const producer::ProducerRecord &record, const producer::Callback &deliveryCb, Error &error, SendOption option=SendOption::NoCopyRecordValue, ActionWhileQueueIsFull action=ActionWhileQueueIsFull::Block)
 Asynchronously send a record to a topic.
 
producer::RecordMetadata syncSend (const producer::ProducerRecord &record)
 Synchronously send a record to a topic.
 
void initTransactions (std::chrono::milliseconds timeout=InfiniteTimeout)
 Needs to be called before any other methods when the transactional.id is set in the configuration.
 
void beginTransaction ()
 Should be called before the start of each new transaction.
 
void commitTransaction (std::chrono::milliseconds timeout=InfiniteTimeout)
 Commit the ongoing transaction.
 
void abortTransaction (std::chrono::milliseconds timeout=InfiniteTimeout)
 Abort the ongoing transaction.
 
void sendOffsetsToTransaction (const TopicPartitionOffsets &topicPartitionOffsets, const consumer::ConsumerGroupMetadata &groupMetadata, std::chrono::milliseconds timeout)
 Send a list of specified offsets to the consumer group coordinator, and also mark those offsets as part of the current transaction.
 
Public Member Functions inherited from KAFKA_API::clients::KafkaClient
const std::string & clientId () const
 Get the client id.
 
const std::string & name () const
 Get the client name.
 
void setLogLevel (int level)
 Set log level for the kafka client (the default value: 5).
 
const Properties & properties () const
 Return the properties which took effect.
 
Optional< std::string > getProperty (const std::string &name) const
 Fetch a property that took effect (including properties set internally by librdkafka).
 
void pollEvents (std::chrono::milliseconds timeout)
 Call the OffsetCommit callbacks (if any). Note: the Kafka client should be constructed with the option enable.manual.events.poll=true.
 
Optional< BrokerMetadata > fetchBrokerMetadata (const std::string &topic, std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_METADATA_TIMEOUT_MS), bool disableErrorLogging=false)
 Fetch metadata from an available broker.
 
template<class ... Args>
void doLog (int level, const char *filename, int lineno, const char *format, Args... args) const
 
void doLog (int level, const char *filename, int lineno, const char *msg) const
 

Detailed Description

KafkaProducer class.

Constructor & Destructor Documentation

◆ KafkaProducer()

KAFKA_API::clients::producer::KafkaProducer::KafkaProducer ( const Properties &  properties)
inline explicit

The constructor for KafkaProducer.

Throws KafkaException with errors:

  • RD_KAFKA_RESP_ERR__INVALID_ARG : Invalid BOOTSTRAP_SERVERS property
  • RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE: Failed to create internal threads

Member Function Documentation

◆ close()

void KAFKA_API::clients::producer::KafkaProducer::close ( std::chrono::milliseconds  timeout = InfiniteTimeout)
inline

Close this producer.

This method waits up to timeout for the producer to complete the sending of all incomplete requests (before purging them).

◆ flush()

Error KAFKA_API::clients::producer::KafkaProducer::flush ( std::chrono::milliseconds  timeout = InfiniteTimeout)
inline

Invoking this method makes all buffered records immediately available to send, and blocks on the completion of the requests associated with these records.

Possible error values:

  • RD_KAFKA_RESP_ERR__TIMED_OUT: The timeout was reached before all outstanding requests were completed.

◆ send() [1/2]

void KAFKA_API::clients::producer::KafkaProducer::send ( const producer::ProducerRecord &  record,
const producer::Callback &  deliveryCb,
Error &  error,
SendOption  option = SendOption::NoCopyRecordValue,
ActionWhileQueueIsFull  action = ActionWhileQueueIsFull::Block 
)
inline

Asynchronously send a record to a topic.

Note:

  • If a callback is provided, it's guaranteed to be triggered (before closing the producer).
  • The input reference parameter error will be set if an error occurred.
  • Make sure the memory block (for the ProducerRecord's value) remains valid until the delivery callback finishes; otherwise, send with the option KafkaProducer::SendOption::ToCopyRecordValue.

Possible errors:

Local errors:

  • RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC: The topic doesn't exist
  • RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION: The partition doesn't exist
  • RD_KAFKA_RESP_ERR__INVALID_ARG: Invalid topic (the topic is null, or its length is too long (> 512))
  • RD_KAFKA_RESP_ERR__MSG_TIMED_OUT: No ack received within the time limit
  • RD_KAFKA_RESP_ERR__QUEUE_FULL: The message buffering queue is full

Broker errors:

  • Error Codes
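
The lifetime note above can be illustrated with a small self-contained sketch. It does not use the library: `QueuedValue` and `enqueue` are hypothetical stand-ins, and only the two `SendOption` enumerator names are taken from this page. It assumes that "no copy" means the queue keeps a pointer into the caller's memory, while "to copy" means the queue takes a private copy at send time.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Enumerator names mirror the documented SendOption; everything else in this
// sketch is a hypothetical stand-in, not the library's implementation.
enum class SendOption { NoCopyRecordValue, ToCopyRecordValue };

struct QueuedValue {
    const char* borrowed = nullptr;  // caller-owned memory (NoCopyRecordValue)
    std::size_t size = 0;
    std::string owned;               // private copy (ToCopyRecordValue)
    bool copied = false;

    // What would be transmitted if delivery happened right now.
    std::string contents() const {
        return copied ? owned : std::string(borrowed, size);
    }
};

// Queue a value either by reference or by copy, depending on the option.
inline QueuedValue enqueue(const char* data, std::size_t size, SendOption option) {
    assert(data != nullptr);
    QueuedValue queued;
    queued.size = size;
    if (option == SendOption::ToCopyRecordValue) {
        queued.owned.assign(data, size);  // the caller may reuse its buffer now
        queued.copied = true;
    } else {
        queued.borrowed = data;           // the caller must keep the buffer alive and unchanged
    }
    return queued;
}
```

If the caller overwrites its buffer after queuing, the no-copy entry observes the new bytes while the copied entry still holds the original ones. That is the reason a buffer that may be reused or freed before the delivery callback runs calls for ToCopyRecordValue.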

◆ send() [2/2]

void KAFKA_API::clients::producer::KafkaProducer::send ( const producer::ProducerRecord &  record,
const producer::Callback &  deliveryCb,
SendOption  option = SendOption::NoCopyRecordValue,
ActionWhileQueueIsFull  action = ActionWhileQueueIsFull::Block 
)
inline

Asynchronously send a record to a topic.

Note:

  • If a callback is provided, it's guaranteed to be triggered (before closing the producer).
  • If any error occurs, an exception is thrown.
  • Make sure the memory block (for the ProducerRecord's value) remains valid until the delivery callback finishes; otherwise, send with the option KafkaProducer::SendOption::ToCopyRecordValue.

Possible errors:

Local errors:

  • RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC: The topic doesn't exist
  • RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION: The partition doesn't exist
  • RD_KAFKA_RESP_ERR__INVALID_ARG: Invalid topic (the topic is null, or its length is too long (> 512))
  • RD_KAFKA_RESP_ERR__MSG_TIMED_OUT: No ack received within the time limit
  • RD_KAFKA_RESP_ERR__QUEUE_FULL: The message buffering queue is full

Broker errors:

  • Error Codes
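
The two send() overloads differ only in how a failure reaches the caller: one fills an Error reference, the other throws. The sketch below models that contrast with hypothetical stand-ins (`ToyProducer`, a simplified `Error` and `KafkaException`, and a placeholder error code). It is not the library's implementation; the topic-length rule is borrowed from the RD_KAFKA_RESP_ERR__INVALID_ARG description above.

```cpp
#include <stdexcept>
#include <string>
#include <utility>

// Simplified stand-ins for illustration only; the real Error and
// KafkaException classes carry more information.
struct Error {
    int code;
    std::string message;
    Error() : code(0) {}
    Error(int c, std::string m) : code(c), message(std::move(m)) {}
    explicit operator bool() const { return code != 0; }  // true when an error is set
};

struct KafkaException : std::runtime_error {
    explicit KafkaException(const Error& e) : std::runtime_error(e.message), error(e) {}
    Error error;
};

const int kInvalidArg = 1;  // placeholder value, not the real librdkafka code

class ToyProducer {
public:
    // Out-parameter style: the failure is reported through `error`.
    void send(const std::string& topic, Error& error) {
        error = validate(topic);
    }

    // Throwing style: the same failure surfaces as an exception.
    void send(const std::string& topic) {
        Error error;
        send(topic, error);
        if (error) throw KafkaException(error);
    }

private:
    static Error validate(const std::string& topic) {
        if (topic.empty() || topic.size() > 512) {
            return Error(kInvalidArg, "invalid topic");
        }
        return Error();
    }
};
```

The out-parameter form suits hot paths where exceptions are undesirable, since the caller checks `error` after each call. The throwing form keeps call sites short when a send failure should abort the current operation anyway.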

◆ syncSend()

producer::RecordMetadata KAFKA_API::clients::producer::KafkaProducer::syncSend ( const producer::ProducerRecord &  record)
inline

Synchronously send a record to a topic.

Throws KafkaException with errors:

Local errors:

  • RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC: The topic doesn't exist
  • RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION: The partition doesn't exist
  • RD_KAFKA_RESP_ERR__INVALID_ARG: Invalid topic (the topic is null, or its length is too long (> 512))
  • RD_KAFKA_RESP_ERR__MSG_TIMED_OUT: No ack received within the time limit

Broker errors:

  • Error Codes
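
One way to picture a synchronous send is as an asynchronous send followed by waiting for that record's delivery callback. The sketch below shows the idea with a single-threaded, hypothetical `ToyAsyncProducer` whose callbacks fire from an explicit `poll()`. The class, its `RecordMetadata` and `Callback` types, and the polling loop are assumptions made for illustration; the library may implement syncSend() differently, and the error path (throwing KafkaException) is left out.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-ins; none of these are the library's real types.
struct RecordMetadata {
    long offset = -1;
};
using Callback = std::function<void(const RecordMetadata&)>;

class ToyAsyncProducer {
public:
    // Asynchronous send: queue the record; its callback fires later, from poll().
    void send(const std::string& value, Callback cb) {
        assert(cb != nullptr);  // this sketch requires a delivery callback
        pending_.emplace_back(value, std::move(cb));
    }

    // Deliver at most one queued record, in order, and run its callback.
    void poll() {
        if (pending_.empty()) return;
        std::pair<std::string, Callback> entry = std::move(pending_.front());
        pending_.pop_front();
        RecordMetadata metadata;
        metadata.offset = nextOffset_++;
        entry.second(metadata);
    }

    // Synchronous send: queue the record, then poll until its own callback has run.
    RecordMetadata syncSend(const std::string& value) {
        bool done = false;
        RecordMetadata result;
        send(value, [&](const RecordMetadata& metadata) {
            result = metadata;
            done = true;
        });
        while (!done) poll();
        return result;
    }

private:
    std::deque<std::pair<std::string, Callback>> pending_;
    long nextOffset_ = 0;
};
```

Records queued earlier are delivered first, so a synchronous send issued after pending asynchronous sends also drives their callbacks before it returns.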

The documentation for this class was generated from the following file: