3 #include <kafka/Project.h>
5 #include <kafka/ConsumerCommon.h>
6 #include <kafka/KafkaClient.h>
7 #include <kafka/ProducerCommon.h>
8 #include <kafka/ProducerConfig.h>
9 #include <kafka/ProducerRecord.h>
10 #include <kafka/Timestamp.h>
11 #include <kafka/Types.h>
13 #include <librdkafka/rdkafka.h>
16 #include <condition_variable>
20 #include <unordered_map>
23 namespace KAFKA_API {
namespace clients {
namespace producer {
51 Error flush(std::chrono::milliseconds timeout = InfiniteTimeout);
61 void close(std::chrono::milliseconds timeout = InfiniteTimeout);
66 enum class SendOption { NoCopyRecordValue, ToCopyRecordValue };
92 const producer::Callback& deliveryCb,
93 SendOption option = SendOption::NoCopyRecordValue,
115 const producer::Callback& deliveryCb,
117 SendOption option = SendOption::NoCopyRecordValue,
162 std::chrono::milliseconds timeout);
165 void pollCallbacks(
int timeoutMs)
167 rd_kafka_poll(getClientHandle(), timeoutMs);
171 class DeliveryCbOpaque
174 DeliveryCbOpaque(Optional<producer::ProducerRecord::Id>
id, producer::Callback cb): _recordId(id), _deliveryCb(std::move(cb)) {}
176 void operator()(rd_kafka_t* ,
const rd_kafka_message_t* rkmsg)
178 _deliveryCb(producer::RecordMetadata{rkmsg, _recordId}, Error{rkmsg->err});
182 const Optional<producer::ProducerRecord::Id> _recordId;
183 const producer::Callback _deliveryCb;
187 static Properties validateAndReformProperties(
const Properties&
properties);
190 static void deliveryCallback(rd_kafka_t* rk,
const rd_kafka_message_t* rkmsg,
void* opaque);
193 static void registerConfigCallbacks(rd_kafka_conf_t* conf);
195 #ifdef KAFKA_API_ENABLE_UNIT_TEST_STUBS
197 using HandleProduceResponseCb = std::function<rd_kafka_resp_err_t(rd_kafka_t* , int32_t , uint64_t , rd_kafka_resp_err_t )>;
203 void stubHandleProduceResponse(HandleProduceResponseCb cb = HandleProduceResponseCb()) { _handleProduceRespCb = std::move(cb); }
206 static rd_kafka_resp_err_t handleProduceResponse(rd_kafka_t* rk, int32_t brokerId, uint64_t msgSeq, rd_kafka_resp_err_t err)
208 auto* client =
static_cast<KafkaClient*
>(rd_kafka_opaque(rk));
210 auto respCb = producer->_handleProduceRespCb;
211 return respCb ? respCb(rk, brokerId, msgSeq, err) : err;
214 HandleProduceResponseCb _handleProduceRespCb;
223 startBackgroundPollingIfNecessary([
this](
int timeoutMs){ pollCallbacks(timeoutMs); });
226 KAFKA_API_DO_LOG(Log::Level::Notice,
"initializes with properties[%s]", propStr.c_str());
230 KafkaProducer::registerConfigCallbacks(rd_kafka_conf_t* conf)
233 rd_kafka_conf_set_dr_msg_cb(conf, deliveryCallback);
235 #ifdef KAFKA_API_ENABLE_UNIT_TEST_STUBS
237 LogBuffer<LOG_BUFFER_SIZE> errInfo;
238 if (rd_kafka_conf_set(conf,
"ut_handle_ProduceResponse",
reinterpret_cast<char*
>(&handleProduceResponse), errInfo.str(), errInfo.capacity()))
241 size_t clientPtrSize = 0;
242 if (rd_kafka_conf_get(conf,
"opaque",
reinterpret_cast<char*
>(&client), &clientPtrSize))
244 KAFKA_API_LOG(Log::Level::Crit,
"failed to stub ut_handle_ProduceResponse! error[%s]. Meanwhile, failed to get the Kafka client!", errInfo.c_str());
248 assert(clientPtrSize ==
sizeof(client));
249 client->KAFKA_API_DO_LOG(Log::Level::Err,
"failed to stub ut_handle_ProduceResponse! error[%s]", errInfo.c_str());
256 KafkaProducer::validateAndReformProperties(
const Properties& properties)
258 using namespace producer;
261 auto newProperties = KafkaClient::validateAndReformProperties(
properties);
264 const std::set<std::string> availPartitioners = {
"murmur2_random",
"murmur2",
"random",
"consistent",
"consistent_random",
"fnv1a",
"fnv1a_random"};
266 if (partitioner && !availPartitioners.count(*partitioner))
268 std::string errMsg =
"Invalid partitioner [" + *partitioner +
"]! Valid options: ";
269 bool isTheFirst =
true;
270 for (
const auto& availPartitioner: availPartitioners)
272 errMsg += (std::string(isTheFirst ? (isTheFirst =
false,
"") :
", ") + availPartitioner);
276 KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, errMsg));
280 constexpr
int KAFKA_IDEMP_MAX_INFLIGHT = 5;
282 if (enableIdempotence && *enableIdempotence ==
"true")
286 if (std::stoi(*maxInFlight) > KAFKA_IDEMP_MAX_INFLIGHT)
288 KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG,\
289 "`max.in.flight` must be set <= " + std::to_string(KAFKA_IDEMP_MAX_INFLIGHT) +
" when `enable.idempotence` is `true`"));
295 if (*acks !=
"all" && *acks !=
"-1")
297 KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG,\
298 "`acks` must be set to `all`/`-1` when `enable.idempotence` is `true`"));
303 return newProperties;
308 KafkaProducer::deliveryCallback(rd_kafka_t* rk,
const rd_kafka_message_t* rkmsg,
void* )
310 if (
auto* deliveryCbOpaque =
static_cast<DeliveryCbOpaque*
>(rkmsg->_private))
312 (*deliveryCbOpaque)(rk, rkmsg);
313 delete deliveryCbOpaque;
319 const producer::Callback& deliveryCb,
323 auto deliveryCbOpaque = std::make_unique<DeliveryCbOpaque>(record.
id(), deliveryCb);
324 auto queueFullAction = (isWithAutoEventsPolling() ? action : ActionWhileQueueIsFull::NoBlock);
326 const auto* topic = record.
topic().c_str();
327 const auto partition = record.
partition();
328 const auto msgFlags = (
static_cast<unsigned int>(option == SendOption::ToCopyRecordValue ? RD_KAFKA_MSG_F_COPY : 0)
329 |
static_cast<unsigned int>(queueFullAction == ActionWhileQueueIsFull::Block ? RD_KAFKA_MSG_F_BLOCK : 0));
330 const auto* keyPtr = record.
key().data();
331 const auto keyLen = record.
key().size();
332 const auto* valuePtr = record.
value().data();
333 const auto valueLen = record.
value().size();
335 auto* rk = getClientHandle();
336 auto* opaquePtr = deliveryCbOpaque.get();
338 constexpr std::size_t VU_LIST_SIZE_WITH_NO_HEADERS = 6;
339 std::vector<rd_kafka_vu_t> rkVUs(VU_LIST_SIZE_WITH_NO_HEADERS + record.
headers().size());
341 std::size_t uvCount = 0;
344 auto& vu = rkVUs[uvCount++];
345 vu.vtype = RD_KAFKA_VTYPE_TOPIC;
350 auto& vu = rkVUs[uvCount++];
351 vu.vtype = RD_KAFKA_VTYPE_PARTITION;
352 vu.u.i32 = partition;
356 auto& vu = rkVUs[uvCount++];
357 vu.vtype = RD_KAFKA_VTYPE_MSGFLAGS;
358 vu.u.i =
static_cast<int>(msgFlags);
362 auto& vu = rkVUs[uvCount++];
363 vu.vtype = RD_KAFKA_VTYPE_KEY;
364 vu.u.mem.ptr =
const_cast<void*
>(keyPtr);
365 vu.u.mem.size = keyLen;
369 auto& vu = rkVUs[uvCount++];
370 vu.vtype = RD_KAFKA_VTYPE_VALUE;
371 vu.u.mem.ptr =
const_cast<void*
>(valuePtr);
372 vu.u.mem.size = valueLen;
376 auto& vu = rkVUs[uvCount++];
377 vu.vtype = RD_KAFKA_VTYPE_OPAQUE;
378 vu.u.ptr = opaquePtr;
382 for (
const auto& header: record.
headers())
384 auto& vu = rkVUs[uvCount++];
385 vu.vtype = RD_KAFKA_VTYPE_HEADER;
386 vu.u.header.name = header.key.c_str();
387 vu.u.header.val = header.value.data();
388 vu.u.header.size =
static_cast<int64_t
>(header.value.size());
391 assert(uvCount == rkVUs.size());
393 const Error sendResult{ rd_kafka_produceva(rk, rkVUs.data(), rkVUs.size()) };
394 KAFKA_THROW_IF_WITH_ERROR(sendResult);
397 deliveryCbOpaque.release();
403 Optional<Error> deliveryResult;
406 std::condition_variable delivered;
409 const std::lock_guard<std::mutex> guard(mtx);
411 deliveryResult = error;
412 recordMetadata = metadata;
414 delivered.notify_one();
417 send(record, deliveryCb);
419 std::unique_lock<std::mutex> lock(mtx);
420 delivered.wait(lock, [&deliveryResult]{
return static_cast<bool>(deliveryResult); });
422 KAFKA_THROW_IF_WITH_ERROR(*deliveryResult);
424 return recordMetadata;
430 return Error{rd_kafka_flush(getClientHandle(), convertMsDurationToInt(timeout))};
436 return Error{rd_kafka_purge(getClientHandle(),
437 (
static_cast<unsigned>(RD_KAFKA_PURGE_F_QUEUE) |
static_cast<unsigned>(RD_KAFKA_PURGE_F_INFLIGHT)))};
445 stopBackgroundPollingIfNecessary();
448 if (result.
value() == RD_KAFKA_RESP_ERR__TIMED_OUT)
450 KAFKA_API_DO_LOG(Log::Level::Notice,
"purge messages before close, outQLen[%d]", rd_kafka_outq_len(getClientHandle()));
454 rd_kafka_poll(getClientHandle(), 0);
456 KAFKA_API_DO_LOG(Log::Level::Notice,
"closed");
463 const Error result{ rd_kafka_init_transactions(getClientHandle(),
static_cast<int>(timeout.count())) };
464 KAFKA_THROW_IF_WITH_ERROR(result);
470 const Error result{ rd_kafka_begin_transaction(getClientHandle()) };
471 KAFKA_THROW_IF_WITH_ERROR(result);
477 const Error result{ rd_kafka_commit_transaction(getClientHandle(),
static_cast<int>(timeout.count())) };
478 KAFKA_THROW_IF_WITH_ERROR(result);
484 const Error result{ rd_kafka_abort_transaction(getClientHandle(),
static_cast<int>(timeout.count())) };
485 KAFKA_THROW_IF_WITH_ERROR(result);
491 std::chrono::milliseconds timeout)
493 auto rk_tpos = rd_kafka_topic_partition_list_unique_ptr(createRkTopicPartitionList(topicPartitionOffsets));
494 const Error result{ rd_kafka_send_offsets_to_transaction(getClientHandle(),
496 groupMetadata.rawHandle(),
497 static_cast<int>(timeout.count())) };
498 KAFKA_THROW_IF_WITH_ERROR(result);
Unified error type.
Definition: Error.h:32
int value() const
Obtains the underlying error code value.
Definition: Error.h:81
Specific exception for Kafka clients.
Definition: KafkaException.h:22
const Error & error() const
Obtains the underlying error.
Definition: KafkaException.h:34
The properties for Kafka clients.
Definition: Properties.h:24
The base class for Kafka clients.
Definition: KafkaClient.h:34
const Properties & properties() const
Return the properties which took effect.
Definition: KafkaClient.h:56
KafkaProducer class.
Definition: KafkaProducer.h:29
void abortTransaction(std::chrono::milliseconds timeout=InfiniteTimeout)
Abort the ongoing transaction.
Definition: KafkaProducer.h:482
void commitTransaction(std::chrono::milliseconds timeout=InfiniteTimeout)
Commit the ongoing transaction.
Definition: KafkaProducer.h:475
producer::RecordMetadata syncSend(const producer::ProducerRecord &record)
Synchronously send a record to a topic.
Definition: KafkaProducer.h:401
void send(const producer::ProducerRecord &record, const producer::Callback &deliveryCb, SendOption option=SendOption::NoCopyRecordValue, ActionWhileQueueIsFull action=ActionWhileQueueIsFull::Block)
Asynchronously send a record to a topic.
Definition: KafkaProducer.h:318
void close(std::chrono::milliseconds timeout=InfiniteTimeout)
Close this producer.
Definition: KafkaProducer.h:441
Error flush(std::chrono::milliseconds timeout=InfiniteTimeout)
Invoking this method makes all buffered records immediately available to send, and blocks on the comp...
Definition: KafkaProducer.h:428
void beginTransaction()
Should be called before the start of each new transaction.
Definition: KafkaProducer.h:468
~KafkaProducer() override
The destructor for KafkaProducer.
Definition: KafkaProducer.h:43
ActionWhileQueueIsFull
Choose the action while the sending buffer is full.
Definition: KafkaProducer.h:71
void sendOffsetsToTransaction(const TopicPartitionOffsets &topicPartitionOffsets, const consumer::ConsumerGroupMetadata &groupMetadata, std::chrono::milliseconds timeout)
Send a list of specified offsets to the consumer group coordinator, and also mark those offsets as pa...
Definition: KafkaProducer.h:489
SendOption
Options for sending messages.
Definition: KafkaProducer.h:66
KafkaProducer(const Properties &properties)
The constructor for KafkaProducer.
Definition: KafkaProducer.h:219
Error purge()
Purge messages currently handled by the KafkaProducer.
Definition: KafkaProducer.h:434
void send(const producer::ProducerRecord &record, const producer::Callback &deliveryCb, Error &error, SendOption option=SendOption::NoCopyRecordValue, ActionWhileQueueIsFull action=ActionWhileQueueIsFull::Block)
Asynchronously send a record to a topic.
Definition: KafkaProducer.h:114
void initTransactions(std::chrono::milliseconds timeout=InfiniteTimeout)
Needs to be called before any other methods when the transactional.id is set in the configuration.
Definition: KafkaProducer.h:461
static constexpr const char * PARTITIONER
The default partitioner for a ProducerRecord (with no partition assigned).
Definition: ProducerConfig.h:97
static constexpr const char * MAX_IN_FLIGHT
Maximum number of in-flight requests per broker connection.
Definition: ProducerConfig.h:103
static constexpr const char * ENABLE_IDEMPOTENCE
When set to true, the producer will ensure that messages are successfully sent exactly once and in the ...
Definition: ProducerConfig.h:109
static constexpr const char * ACKS
The acks parameter controls how many partition replicas must receive the record before the producer c...
Definition: ProducerConfig.h:31
A key/value pair to be sent to Kafka.
Definition: ProducerRecord.h:19
Key key() const
The key (or null if no key is specified).
Definition: ProducerRecord.h:48
Partition partition() const
The partition to which the record will be sent (or UNKNOWN_PARTITION if no partition was specified).
Definition: ProducerRecord.h:43
const Topic & topic() const
The topic this record is being sent to.
Definition: ProducerRecord.h:38
Optional< Id > id() const
The id to identify the message (consistent with Producer::Metadata::recordId()).
Definition: ProducerRecord.h:58
Value value() const
The value.
Definition: ProducerRecord.h:53
const Headers & headers() const
The headers.
Definition: ProducerRecord.h:63