Modern C++ Kafka API
|
Configuration for the Kafka Producer. More...
#include <ProducerConfig.h>
Public Member Functions | |
ProducerConfig (const ProducerConfig &)=default | |
ProducerConfig (const PropertiesMap &kvMap) | |
Public Member Functions inherited from KAFKA_API::clients::Config | |
Config (const Config &)=default | |
Config (const PropertiesMap &kvMap) | |
Public Member Functions inherited from KAFKA_API::Properties | |
Properties (const Properties &)=default | |
Properties (PropertiesMap kvMap) | |
bool | operator== (const Properties &rhs) const |
template<class T > | |
Properties & | put (const std::string &key, const T &value) |
Set a property. More... | |
void | remove (const std::string &key) |
Remove the property (if one exists). | |
bool | contains (const std::string &key) const |
Check whether the map contains a property. | |
template<class T > | |
T & | get (const std::string &key) const |
Get a property reference. More... | |
Optional< std::string > | getProperty (const std::string &key) const |
Get a property. | |
void | eraseProperty (const std::string &key) |
Remove a property. | |
std::string | toString () const |
const PropertiesMap & | map () const |
Get all properties with a map. | |
Static Public Attributes | |
static constexpr const char * | ACKS = "acks" |
The acks parameter controls how many partition replicas must receive the record before the producer can consider the write successful. More... | |
static constexpr const char * | QUEUE_BUFFERING_MAX_MESSAGES = "queue.buffering.max.messages" |
Maximum number of messages allowed on the producer queue. More... | |
static constexpr const char * | QUEUE_BUFFERING_MAX_KBYTES = "queue.buffering.max.kbytes" |
Maximum total message size sum allowed on the producer queue. More... | |
static constexpr const char * | LINGER_MS = "linger.ms" |
Delay in milliseconds to wait for messages in the producer queue, to accumulate before constructing messages batches to transmit to brokers. More... | |
static constexpr const char * | BATCH_NUM_MESSAGES = "batch.num.messages" |
Maximum number of messages batched in one messageSet. More... | |
static constexpr const char * | BATCH_SIZE = "batch.size" |
Maximum size (in bytes) of all messages batched in one MessageSet (including protocol framing overhead). More... | |
static constexpr const char * | MESSAGE_MAX_BYTES = "message.max.bytes" |
Maximum Kafka protocol request message size. More... | |
static constexpr const char * | MESSAGE_TIMEOUT_MS = "message.timeout.ms" |
This value is enforced locally and limits the time a produced message waits for successful delivery. More... | |
static constexpr const char * | REQUEST_TIMEOUT_MS = "request.timeout.ms" |
This value is only enforced by the brokers and relies on ACKS being non-zero. More... | |
static constexpr const char * | PARTITIONER = "partitioner" |
The default partitioner for a ProducerRecord (with no partition assigned). More... | |
static constexpr const char * | MAX_IN_FLIGHT = "max.in.flight" |
Maximum number of in-flight requests per broker connection. More... | |
static constexpr const char * | ENABLE_IDEMPOTENCE = "enable.idempotence" |
When set to true , the producer will ensure that messages are succefully sent exactly once and in the original order. More... | |
static constexpr const char * | TRANSACTIONAL_ID = "transactional.id" |
It's used to identify the same transactional producer instance across process restarts. | |
static constexpr const char * | TRANSACTION_TIMEOUT_MS = "transaction.timeout.ms" |
Th maximus amount of time in milliseconds that the transaction coordinator will wait for a trnsaction status update from the producer before proactively ablrting the ongoing transaction. More... | |
Static Public Attributes inherited from KAFKA_API::clients::Config | |
static constexpr const char * | ENABLE_MANUAL_EVENTS_POLL = "enable.manual.events.poll" |
To poll the events manually (otherwise, it would be done with a background polling thread). More... | |
static constexpr const char * | LOG_CB = "log_cb" |
Log callback. More... | |
static constexpr const char * | ERROR_CB = "error_cb" |
Log callback. More... | |
static constexpr const char * | STATS_CB = "stats_cb" |
Statistics callback. More... | |
static constexpr const char * | OAUTHBEARER_TOKEN_REFRESH_CB = "oauthbearer_token_refresh_cb" |
OAUTHBEARER token refresh callback. More... | |
static constexpr const char * | INTERCEPTORS = "interceptors" |
Interceptors for thread start/exit, brokers' state change, etc. More... | |
static constexpr const char * | BOOTSTRAP_SERVERS = "bootstrap.servers" |
The string contains host:port pairs of brokers (splitted by ",") that the consumer will use to establish initial connection to the Kafka cluster. More... | |
static constexpr const char * | CLIENT_ID = "client.id" |
Client identifier. | |
static constexpr const char * | LOG_LEVEL = "log_level" |
Log level (syslog(3) levels). | |
static constexpr const char * | SOCKET_TIMEOUT_MS = "socket.timeout.ms" |
Timeout for network requests. More... | |
static constexpr const char * | SECURITY_PROTOCOL = "security.protocol" |
Protocol used to communicate with brokers. More... | |
static constexpr const char * | SASL_MECHANISM = "sasl.mechanisms" |
SASL mechanism to use for authentication. More... | |
static constexpr const char * | SASL_USERNAME = "sasl.username" |
SASL username for use with the PLAIN and SASL-SCRAM-. More... | |
static constexpr const char * | SASL_PASSWORD = "sasl.password" |
SASL password for use with the PLAIN and SASL-SCRAM-. More... | |
static constexpr const char * | SASL_KERBEROS_KINIT_CMD = "sasl.kerberos.kinit.cmd" |
Shell command to refresh or acquire the client's Kerberos ticket. | |
static constexpr const char * | SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name" |
The client's Kerberos principal name. | |
static constexpr const char * | SASL_OAUTHBEARER_METHOD = "sasl.oauthbearer.method" |
Set to "default" or "oidc" to control with login method to be used. More... | |
static constexpr const char * | SASL_OAUTHBEARER_CLIENT_ID = "sasl.oauthbearer.client.id" |
Public identifier for the applicaition. More... | |
static constexpr const char * | SASL_OAUTHBEARER_CLIENT_SECRET = "sasl.oauthbearer.client.secret" |
Client secret only known to the application and the authorization server. More... | |
static constexpr const char * | SASL_OAUTHBEARER_EXTENSIONS = "sasl.oauthbearer.extensions" |
Allow additional information to be provided to the broker. More... | |
static constexpr const char * | SASL_OAUTHBEARER_SCOPE = "sasl.oauthbearer.scope" |
Client use this to specify the scope of the access request to the broker. More... | |
static constexpr const char * | SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL = "sasl.oauthbearer.token.endpoint.url" |
OAuth/OIDC issuer token endpoint HTTP(S) URI used to retreve token. More... | |
static constexpr const char * | SASL_OAUTHBEARER_CONFIG = "sasl.oauthbearer.config" |
SASL/OAUTHBEARER configuration. More... | |
static constexpr const char * | ENABLE_SASL_OAUTHBEARER_UNSECURE_JWT = "enable.sasl.oauthbearer.unsecure.jwt" |
Enable the builtin unsecure JWT OAUTHBEARER token handler if no oauthbearer_refresh_cb has been set. More... | |
Additional Inherited Members | |
Public Types inherited from KAFKA_API::Properties | |
using | PropertiesMap = std::map< std::string, ValueType > |
Configuration for the Kafka Producer.
|
staticconstexpr |
The acks parameter controls how many partition replicas must receive the record before the producer can consider the write successful.
1) acks=0, the producer will not wait for a reply from the broker before assuming the message was sent successfully. 2) acks=1, the producer will receive a success response from the broker the moment the leader replica received the message. 3) acks=all, the producer will receive a success response from the broker once all in-sync replicas received the message. Note: if "ack=all", please make sure the topic's replication factor be larger than 1. That means, if the topic is automaticly created by producer's send
, the default.replication.factor
property for the kafka server should be larger than 1. The "ack=all" property is mandatory for reliability requirements, but would increase the ack latency and impact the throughput. Default value: all
|
staticconstexpr |
Maximum number of messages batched in one messageSet.
The total MessageSet size is also limited by MESSAGE_MAX_BYTES. Default value: 10000
|
staticconstexpr |
Maximum size (in bytes) of all messages batched in one MessageSet (including protocol framing overhead).
Default value: 1000000
|
staticconstexpr |
When set to true
, the producer will ensure that messages are succefully sent exactly once and in the original order.
Default value: false
|
staticconstexpr |
Delay in milliseconds to wait for messages in the producer queue, to accumulate before constructing messages batches to transmit to brokers.
Default value: 0 (KafkaSyncProducer); 0.5 (KafkaAsyncProducer)
|
staticconstexpr |
Maximum number of in-flight requests per broker connection.
Default value: 1000000 (while enable.idempotence
=false); 5 (while enable.idempotence
=true)
|
staticconstexpr |
Maximum Kafka protocol request message size.
Note: Should be coordinated with the brokers's configuration. Otherwise, any larger message would be rejected! Default value: 1000000
|
staticconstexpr |
This value is enforced locally and limits the time a produced message waits for successful delivery.
Note: If failed to get the ack within this limit, an exception would be thrown (in SyncProducer.send()
), or an error code would be passed into the delivery callback (AsyncProducer). Default value: 300000
|
staticconstexpr |
The default partitioner for a ProducerRecord (with no partition assigned).
Note: It's not the same with Java version's "partitioner.class" property Available options: 1) random – random distribution 2) consistent – CRC32 hash of key (ProducerRecord
s with empty/null key are mapped to single partition) 3) consistent_random – CRC32 hash of key (ProducerRecord
s with empty/null key are randomly partitioned) 4) murmur2 – Java Producer compatible Murmur2 hash of key (ProducerRecord
s with null key are mapped to single partition) 5) murmur2_random – Java Producer compatible Murmur2 hash of key (ProducerRecord
s with null key are randomly partitioned. It's equivalent to the Java Producer's default partitioner) 6) fnv1a – FNV-1a hash of key (ProducerRecord
s with null key are mapped to single partition) 7) fnv1a_random – FNV-1a hash of key (ProducerRecord
s with null key are randomly partitioned) Default value: murmur2_random
|
staticconstexpr |
Maximum total message size sum allowed on the producer queue.
Default value: 0x100000 (1GB)
|
staticconstexpr |
Maximum number of messages allowed on the producer queue.
Default value: 100000
|
staticconstexpr |
This value is only enforced by the brokers and relies on ACKS
being non-zero.
Note: The leading broker waits for in-sync replicas to acknowledge the message, and will return an error if the time elapses without the necessary acks. Default value: 5000
|
staticconstexpr |
Th maximus amount of time in milliseconds that the transaction coordinator will wait for a trnsaction status update from the producer before proactively ablrting the ongoing transaction.
Default value: 60000