Modern C++ Kafka API
Public Member Functions | Static Public Attributes | List of all members
KAFKA_API::clients::producer::ProducerConfig Class Reference

Configuration for the Kafka Producer. More...

#include <ProducerConfig.h>

Inheritance diagram for KAFKA_API::clients::producer::ProducerConfig:
KAFKA_API::clients::Config KAFKA_API::Properties

Public Member Functions

 ProducerConfig (const ProducerConfig &)=default
 
 ProducerConfig (const PropertiesMap &kvMap)
 
- Public Member Functions inherited from KAFKA_API::clients::Config
 Config (const Config &)=default
 
 Config (const PropertiesMap &kvMap)
 
- Public Member Functions inherited from KAFKA_API::Properties
 Properties (const Properties &)=default
 
 Properties (PropertiesMap kvMap)
 
bool operator== (const Properties &rhs) const
 
template<class T >
Propertiesput (const std::string &key, const T &value)
 Set a property. More...
 
void remove (const std::string &key)
 Remove the property (if one exists).
 
bool contains (const std::string &key) const
 Check whether the map contains a property.
 
template<class T >
T & get (const std::string &key) const
 Get a property reference. More...
 
Optional< std::string > getProperty (const std::string &key) const
 Get a property.
 
void eraseProperty (const std::string &key)
 Remove a property.
 
std::string toString () const
 
const PropertiesMap & map () const
 Get all properties with a map.
 

Static Public Attributes

static constexpr const char * ACKS = "acks"
 The acks parameter controls how many partition replicas must receive the record before the producer can consider the write successful. More...
 
static constexpr const char * QUEUE_BUFFERING_MAX_MESSAGES = "queue.buffering.max.messages"
 Maximum number of messages allowed on the producer queue. More...
 
static constexpr const char * QUEUE_BUFFERING_MAX_KBYTES = "queue.buffering.max.kbytes"
 Maximum total message size sum allowed on the producer queue. More...
 
static constexpr const char * LINGER_MS = "linger.ms"
 Delay in milliseconds to wait for messages in the producer queue, to accumulate before constructing messages batches to transmit to brokers. More...
 
static constexpr const char * BATCH_NUM_MESSAGES = "batch.num.messages"
 Maximum number of messages batched in one messageSet. More...
 
static constexpr const char * BATCH_SIZE = "batch.size"
 Maximum size (in bytes) of all messages batched in one MessageSet (including protocol framing overhead). More...
 
static constexpr const char * MESSAGE_MAX_BYTES = "message.max.bytes"
 Maximum Kafka protocol request message size. More...
 
static constexpr const char * MESSAGE_TIMEOUT_MS = "message.timeout.ms"
 This value is enforced locally and limits the time a produced message waits for successful delivery. More...
 
static constexpr const char * REQUEST_TIMEOUT_MS = "request.timeout.ms"
 This value is only enforced by the brokers and relies on ACKS being non-zero. More...
 
static constexpr const char * PARTITIONER = "partitioner"
 The default partitioner for a ProducerRecord (with no partition assigned). More...
 
static constexpr const char * MAX_IN_FLIGHT = "max.in.flight"
 Maximum number of in-flight requests per broker connection. More...
 
static constexpr const char * ENABLE_IDEMPOTENCE = "enable.idempotence"
 When set to true, the producer will ensure that messages are succefully sent exactly once and in the original order. More...
 
static constexpr const char * TRANSACTIONAL_ID = "transactional.id"
 It's used to identify the same transactional producer instance across process restarts.
 
static constexpr const char * TRANSACTION_TIMEOUT_MS = "transaction.timeout.ms"
 Th maximus amount of time in milliseconds that the transaction coordinator will wait for a trnsaction status update from the producer before proactively ablrting the ongoing transaction. More...
 
- Static Public Attributes inherited from KAFKA_API::clients::Config
static constexpr const char * ENABLE_MANUAL_EVENTS_POLL = "enable.manual.events.poll"
 To poll the events manually (otherwise, it would be done with a background polling thread). More...
 
static constexpr const char * LOG_CB = "log_cb"
 Log callback. More...
 
static constexpr const char * ERROR_CB = "error_cb"
 Log callback. More...
 
static constexpr const char * STATS_CB = "stats_cb"
 Statistics callback. More...
 
static constexpr const char * OAUTHBEARER_TOKEN_REFRESH_CB = "oauthbearer_token_refresh_cb"
 OAUTHBEARER token refresh callback. More...
 
static constexpr const char * INTERCEPTORS = "interceptors"
 Interceptors for thread start/exit, brokers' state change, etc. More...
 
static constexpr const char * BOOTSTRAP_SERVERS = "bootstrap.servers"
 The string contains host:port pairs of brokers (splitted by ",") that the consumer will use to establish initial connection to the Kafka cluster. More...
 
static constexpr const char * CLIENT_ID = "client.id"
 Client identifier.
 
static constexpr const char * LOG_LEVEL = "log_level"
 Log level (syslog(3) levels).
 
static constexpr const char * SOCKET_TIMEOUT_MS = "socket.timeout.ms"
 Timeout for network requests. More...
 
static constexpr const char * SECURITY_PROTOCOL = "security.protocol"
 Protocol used to communicate with brokers. More...
 
static constexpr const char * SASL_MECHANISM = "sasl.mechanisms"
 SASL mechanism to use for authentication. More...
 
static constexpr const char * SASL_USERNAME = "sasl.username"
 SASL username for use with the PLAIN and SASL-SCRAM-. More...
 
static constexpr const char * SASL_PASSWORD = "sasl.password"
 SASL password for use with the PLAIN and SASL-SCRAM-. More...
 
static constexpr const char * SASL_KERBEROS_KINIT_CMD = "sasl.kerberos.kinit.cmd"
 Shell command to refresh or acquire the client's Kerberos ticket.
 
static constexpr const char * SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name"
 The client's Kerberos principal name.
 
static constexpr const char * SASL_OAUTHBEARER_METHOD = "sasl.oauthbearer.method"
 Set to "default" or "oidc" to control with login method to be used. More...
 
static constexpr const char * SASL_OAUTHBEARER_CLIENT_ID = "sasl.oauthbearer.client.id"
 Public identifier for the applicaition. More...
 
static constexpr const char * SASL_OAUTHBEARER_CLIENT_SECRET = "sasl.oauthbearer.client.secret"
 Client secret only known to the application and the authorization server. More...
 
static constexpr const char * SASL_OAUTHBEARER_EXTENSIONS = "sasl.oauthbearer.extensions"
 Allow additional information to be provided to the broker. More...
 
static constexpr const char * SASL_OAUTHBEARER_SCOPE = "sasl.oauthbearer.scope"
 Client use this to specify the scope of the access request to the broker. More...
 
static constexpr const char * SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL = "sasl.oauthbearer.token.endpoint.url"
 OAuth/OIDC issuer token endpoint HTTP(S) URI used to retreve token. More...
 
static constexpr const char * SASL_OAUTHBEARER_CONFIG = "sasl.oauthbearer.config"
 SASL/OAUTHBEARER configuration. More...
 
static constexpr const char * ENABLE_SASL_OAUTHBEARER_UNSECURE_JWT = "enable.sasl.oauthbearer.unsecure.jwt"
 Enable the builtin unsecure JWT OAUTHBEARER token handler if no oauthbearer_refresh_cb has been set. More...
 

Additional Inherited Members

- Public Types inherited from KAFKA_API::Properties
using PropertiesMap = std::map< std::string, ValueType >
 

Detailed Description

Configuration for the Kafka Producer.

Member Data Documentation

◆ ACKS

constexpr const char* KAFKA_API::clients::producer::ProducerConfig::ACKS = "acks"
staticconstexpr

The acks parameter controls how many partition replicas must receive the record before the producer can consider the write successful.

1) acks=0, the producer will not wait for a reply from the broker before assuming the message was sent successfully. 2) acks=1, the producer will receive a success response from the broker the moment the leader replica received the message. 3) acks=all, the producer will receive a success response from the broker once all in-sync replicas received the message. Note: if "ack=all", please make sure the topic's replication factor be larger than 1. That means, if the topic is automaticly created by producer's send, the default.replication.factor property for the kafka server should be larger than 1. The "ack=all" property is mandatory for reliability requirements, but would increase the ack latency and impact the throughput. Default value: all

◆ BATCH_NUM_MESSAGES

constexpr const char* KAFKA_API::clients::producer::ProducerConfig::BATCH_NUM_MESSAGES = "batch.num.messages"
staticconstexpr

Maximum number of messages batched in one messageSet.

The total MessageSet size is also limited by MESSAGE_MAX_BYTES. Default value: 10000

◆ BATCH_SIZE

constexpr const char* KAFKA_API::clients::producer::ProducerConfig::BATCH_SIZE = "batch.size"
staticconstexpr

Maximum size (in bytes) of all messages batched in one MessageSet (including protocol framing overhead).

Default value: 1000000

◆ ENABLE_IDEMPOTENCE

constexpr const char* KAFKA_API::clients::producer::ProducerConfig::ENABLE_IDEMPOTENCE = "enable.idempotence"
staticconstexpr

When set to true, the producer will ensure that messages are succefully sent exactly once and in the original order.

Default value: false

◆ LINGER_MS

constexpr const char* KAFKA_API::clients::producer::ProducerConfig::LINGER_MS = "linger.ms"
staticconstexpr

Delay in milliseconds to wait for messages in the producer queue, to accumulate before constructing messages batches to transmit to brokers.

Default value: 0 (KafkaSyncProducer); 0.5 (KafkaAsyncProducer)

◆ MAX_IN_FLIGHT

constexpr const char* KAFKA_API::clients::producer::ProducerConfig::MAX_IN_FLIGHT = "max.in.flight"
staticconstexpr

Maximum number of in-flight requests per broker connection.

Default value: 1000000 (while enable.idempotence=false); 5 (while enable.idempotence=true)

◆ MESSAGE_MAX_BYTES

constexpr const char* KAFKA_API::clients::producer::ProducerConfig::MESSAGE_MAX_BYTES = "message.max.bytes"
staticconstexpr

Maximum Kafka protocol request message size.

Note: Should be coordinated with the brokers's configuration. Otherwise, any larger message would be rejected! Default value: 1000000

◆ MESSAGE_TIMEOUT_MS

constexpr const char* KAFKA_API::clients::producer::ProducerConfig::MESSAGE_TIMEOUT_MS = "message.timeout.ms"
staticconstexpr

This value is enforced locally and limits the time a produced message waits for successful delivery.

Note: If failed to get the ack within this limit, an exception would be thrown (in SyncProducer.send()), or an error code would be passed into the delivery callback (AsyncProducer). Default value: 300000

◆ PARTITIONER

constexpr const char* KAFKA_API::clients::producer::ProducerConfig::PARTITIONER = "partitioner"
staticconstexpr

The default partitioner for a ProducerRecord (with no partition assigned).

Note: It's not the same with Java version's "partitioner.class" property Available options: 1) random – random distribution 2) consistent – CRC32 hash of key (ProducerRecords with empty/null key are mapped to single partition) 3) consistent_random – CRC32 hash of key (ProducerRecords with empty/null key are randomly partitioned) 4) murmur2 – Java Producer compatible Murmur2 hash of key (ProducerRecords with null key are mapped to single partition) 5) murmur2_random – Java Producer compatible Murmur2 hash of key (ProducerRecords with null key are randomly partitioned. It's equivalent to the Java Producer's default partitioner) 6) fnv1a – FNV-1a hash of key (ProducerRecords with null key are mapped to single partition) 7) fnv1a_random – FNV-1a hash of key (ProducerRecords with null key are randomly partitioned) Default value: murmur2_random

◆ QUEUE_BUFFERING_MAX_KBYTES

constexpr const char* KAFKA_API::clients::producer::ProducerConfig::QUEUE_BUFFERING_MAX_KBYTES = "queue.buffering.max.kbytes"
staticconstexpr

Maximum total message size sum allowed on the producer queue.

Default value: 0x100000 (1GB)

◆ QUEUE_BUFFERING_MAX_MESSAGES

constexpr const char* KAFKA_API::clients::producer::ProducerConfig::QUEUE_BUFFERING_MAX_MESSAGES = "queue.buffering.max.messages"
staticconstexpr

Maximum number of messages allowed on the producer queue.

Default value: 100000

◆ REQUEST_TIMEOUT_MS

constexpr const char* KAFKA_API::clients::producer::ProducerConfig::REQUEST_TIMEOUT_MS = "request.timeout.ms"
staticconstexpr

This value is only enforced by the brokers and relies on ACKS being non-zero.

Note: The leading broker waits for in-sync replicas to acknowledge the message, and will return an error if the time elapses without the necessary acks. Default value: 5000

◆ TRANSACTION_TIMEOUT_MS

constexpr const char* KAFKA_API::clients::producer::ProducerConfig::TRANSACTION_TIMEOUT_MS = "transaction.timeout.ms"
staticconstexpr

Th maximus amount of time in milliseconds that the transaction coordinator will wait for a trnsaction status update from the producer before proactively ablrting the ongoing transaction.

Default value: 60000


The documentation for this class was generated from the following file: