Modern C++ Kafka API
KafkaProducer.h
1 #pragma once
2 
3 #include <kafka/Project.h>
4 
5 #include <kafka/ConsumerCommon.h>
6 #include <kafka/KafkaClient.h>
7 #include <kafka/ProducerCommon.h>
8 #include <kafka/ProducerConfig.h>
9 #include <kafka/ProducerRecord.h>
10 #include <kafka/Timestamp.h>
11 #include <kafka/Types.h>
12 
13 #include <librdkafka/rdkafka.h>
14 
15 #include <cassert>
16 #include <condition_variable>
17 #include <cstdint>
18 #include <memory>
19 #include <mutex>
20 #include <unordered_map>
21 
22 
23 namespace KAFKA_API { namespace clients { namespace producer {
24 
28 class KafkaProducer: public KafkaClient
29 {
30 public:
38  explicit KafkaProducer(const Properties& properties);
39 
43  ~KafkaProducer() override { if (_opened) close(); }
44 
51  Error flush(std::chrono::milliseconds timeout = InfiniteTimeout);
52 
56  Error purge();
57 
61  void close(std::chrono::milliseconds timeout = InfiniteTimeout);
62 
66  enum class SendOption { NoCopyRecordValue, ToCopyRecordValue };
67 
71  enum class ActionWhileQueueIsFull { Block, NoBlock };
72 
91  void send(const producer::ProducerRecord& record,
92  const producer::Callback& deliveryCb,
93  SendOption option = SendOption::NoCopyRecordValue,
94  ActionWhileQueueIsFull action = ActionWhileQueueIsFull::Block);
95 
114  void send(const producer::ProducerRecord& record,
115  const producer::Callback& deliveryCb,
116  Error& error,
117  SendOption option = SendOption::NoCopyRecordValue,
118  ActionWhileQueueIsFull action = ActionWhileQueueIsFull::Block)
119  {
120  try { send(record, deliveryCb, option, action); } catch (const KafkaException& e) { error = e.error(); }
121  }
122 
134  producer::RecordMetadata syncSend(const producer::ProducerRecord& record);
135 
139  void initTransactions(std::chrono::milliseconds timeout = InfiniteTimeout);
140 
144  void beginTransaction();
145 
149  void commitTransaction(std::chrono::milliseconds timeout = InfiniteTimeout);
150 
154  void abortTransaction(std::chrono::milliseconds timeout = InfiniteTimeout);
155 
156 
160  void sendOffsetsToTransaction(const TopicPartitionOffsets& topicPartitionOffsets,
161  const consumer::ConsumerGroupMetadata& groupMetadata,
162  std::chrono::milliseconds timeout);
163 
164 private:
165  void pollCallbacks(int timeoutMs)
166  {
167  rd_kafka_poll(getClientHandle(), timeoutMs);
168  }
169 
170  // Define datatypes for "opaque" (as an input for rd_kafka_produceva), in order to handle the delivery callback
171  class DeliveryCbOpaque
172  {
173  public:
174  DeliveryCbOpaque(Optional<producer::ProducerRecord::Id> id, producer::Callback cb): _recordId(id), _deliveryCb(std::move(cb)) {}
175 
176  void operator()(rd_kafka_t* /*rk*/, const rd_kafka_message_t* rkmsg)
177  {
178  _deliveryCb(producer::RecordMetadata{rkmsg, _recordId}, Error{rkmsg->err});
179  }
180 
181  private:
182  const Optional<producer::ProducerRecord::Id> _recordId;
183  const producer::Callback _deliveryCb;
184  };
185 
186  // Validate properties (and fix them if necessary)
187  static Properties validateAndReformProperties(const Properties& properties);
188 
189  // Delivery Callback (for librdkafka)
190  static void deliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmsg, void* opaque);
191 
192  // Register Callbacks for rd_kafka_conf_t
193  static void registerConfigCallbacks(rd_kafka_conf_t* conf);
194 
195 #ifdef KAFKA_API_ENABLE_UNIT_TEST_STUBS
196 public:
197  using HandleProduceResponseCb = std::function<rd_kafka_resp_err_t(rd_kafka_t* /*rk*/, int32_t /*brokerid*/, uint64_t /*msgseq*/, rd_kafka_resp_err_t /*err*/)>;
198 
203  void stubHandleProduceResponse(HandleProduceResponseCb cb = HandleProduceResponseCb()) { _handleProduceRespCb = std::move(cb); }
204 
205 private:
206  static rd_kafka_resp_err_t handleProduceResponse(rd_kafka_t* rk, int32_t brokerId, uint64_t msgSeq, rd_kafka_resp_err_t err)
207  {
208  auto* client = static_cast<KafkaClient*>(rd_kafka_opaque(rk));
209  auto* producer = dynamic_cast<KafkaProducer*>(client);
210  auto respCb = producer->_handleProduceRespCb;
211  return respCb ? respCb(rk, brokerId, msgSeq, err) : err;
212  }
213 
214  HandleProduceResponseCb _handleProduceRespCb;
215 #endif
216 };
217 
218 inline
219 KafkaProducer::KafkaProducer(const Properties& properties)
220  : KafkaClient(ClientType::KafkaProducer, validateAndReformProperties(properties), registerConfigCallbacks)
221 {
222  // Start background polling (if needed)
223  startBackgroundPollingIfNecessary([this](int timeoutMs){ pollCallbacks(timeoutMs); });
224 
225  const auto propStr = KafkaClient::properties().toString();
226  KAFKA_API_DO_LOG(Log::Level::Notice, "initializes with properties[%s]", propStr.c_str());
227 }
228 
229 inline void
230 KafkaProducer::registerConfigCallbacks(rd_kafka_conf_t* conf)
231 {
232  // Delivery Callback
233  rd_kafka_conf_set_dr_msg_cb(conf, deliveryCallback);
234 
235 #ifdef KAFKA_API_ENABLE_UNIT_TEST_STUBS
236  // UT stub for ProduceResponse
237  LogBuffer<LOG_BUFFER_SIZE> errInfo;
238  if (rd_kafka_conf_set(conf, "ut_handle_ProduceResponse", reinterpret_cast<char*>(&handleProduceResponse), errInfo.str(), errInfo.capacity())) // NOLINT
239  {
240  KafkaClient* client = nullptr;
241  size_t clientPtrSize = 0;
242  if (rd_kafka_conf_get(conf, "opaque", reinterpret_cast<char*>(&client), &clientPtrSize)) // NOLINT
243  {
244  KAFKA_API_LOG(Log::Level::Crit, "failed to stub ut_handle_ProduceResponse! error[%s]. Meanwhile, failed to get the Kafka client!", errInfo.c_str());
245  }
246  else
247  {
248  assert(clientPtrSize == sizeof(client)); // NOLINT
249  client->KAFKA_API_DO_LOG(Log::Level::Err, "failed to stub ut_handle_ProduceResponse! error[%s]", errInfo.c_str());
250  }
251  }
252 #endif
253 }
254 
255 inline Properties
256 KafkaProducer::validateAndReformProperties(const Properties& properties)
257 {
258  using namespace producer;
259 
260  // Let the base class validate first
261  auto newProperties = KafkaClient::validateAndReformProperties(properties);
262 
263  // Check whether it's an available partitioner
264  const std::set<std::string> availPartitioners = {"murmur2_random", "murmur2", "random", "consistent", "consistent_random", "fnv1a", "fnv1a_random"};
265  auto partitioner = newProperties.getProperty(ProducerConfig::PARTITIONER);
266  if (partitioner && !availPartitioners.count(*partitioner))
267  {
268  std::string errMsg = "Invalid partitioner [" + *partitioner + "]! Valid options: ";
269  bool isTheFirst = true;
270  for (const auto& availPartitioner: availPartitioners)
271  {
272  errMsg += (std::string(isTheFirst ? (isTheFirst = false, "") : ", ") + availPartitioner);
273  }
274  errMsg += ".";
275 
276  KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, errMsg));
277  }
278 
279  // For "idempotence" feature
280  constexpr int KAFKA_IDEMP_MAX_INFLIGHT = 5;
281  const auto enableIdempotence = newProperties.getProperty(ProducerConfig::ENABLE_IDEMPOTENCE);
282  if (enableIdempotence && *enableIdempotence == "true")
283  {
284  if (const auto maxInFlight = newProperties.getProperty(ProducerConfig::MAX_IN_FLIGHT))
285  {
286  if (std::stoi(*maxInFlight) > KAFKA_IDEMP_MAX_INFLIGHT)
287  {
288  KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG,\
289  "`max.in.flight` must be set <= " + std::to_string(KAFKA_IDEMP_MAX_INFLIGHT) + " when `enable.idempotence` is `true`"));
290  }
291  }
292 
293  if (const auto acks = newProperties.getProperty(ProducerConfig::ACKS))
294  {
295  if (*acks != "all" && *acks != "-1")
296  {
297  KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG,\
298  "`acks` must be set to `all`/`-1` when `enable.idempotence` is `true`"));
299  }
300  }
301  }
302 
303  return newProperties;
304 }
305 
306 // Delivery Callback (for librdkafka)
307 inline void
308 KafkaProducer::deliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmsg, void* /*opaque*/)
309 {
310  if (auto* deliveryCbOpaque = static_cast<DeliveryCbOpaque*>(rkmsg->_private))
311  {
312  (*deliveryCbOpaque)(rk, rkmsg);
313  delete deliveryCbOpaque;
314  }
315 }
316 
317 inline void
318 KafkaProducer::send(const producer::ProducerRecord& record,
319  const producer::Callback& deliveryCb,
320  SendOption option,
321  ActionWhileQueueIsFull action)
322 {
323  auto deliveryCbOpaque = std::make_unique<DeliveryCbOpaque>(record.id(), deliveryCb);
324  auto queueFullAction = (isWithAutoEventsPolling() ? action : ActionWhileQueueIsFull::NoBlock);
325 
326  const auto* topic = record.topic().c_str();
327  const auto partition = record.partition();
328  const auto msgFlags = (static_cast<unsigned int>(option == SendOption::ToCopyRecordValue ? RD_KAFKA_MSG_F_COPY : 0)
329  | static_cast<unsigned int>(queueFullAction == ActionWhileQueueIsFull::Block ? RD_KAFKA_MSG_F_BLOCK : 0));
330  const auto* keyPtr = record.key().data();
331  const auto keyLen = record.key().size();
332  const auto* valuePtr = record.value().data();
333  const auto valueLen = record.value().size();
334 
335  auto* rk = getClientHandle();
336  auto* opaquePtr = deliveryCbOpaque.get();
337 
338  constexpr std::size_t VU_LIST_SIZE_WITH_NO_HEADERS = 6;
339  std::vector<rd_kafka_vu_t> rkVUs(VU_LIST_SIZE_WITH_NO_HEADERS + record.headers().size());
340 
341  std::size_t uvCount = 0;
342 
343  { // Topic
344  auto& vu = rkVUs[uvCount++];
345  vu.vtype = RD_KAFKA_VTYPE_TOPIC;
346  vu.u.cstr = topic;
347  }
348 
349  { // Partition
350  auto& vu = rkVUs[uvCount++];
351  vu.vtype = RD_KAFKA_VTYPE_PARTITION;
352  vu.u.i32 = partition;
353  }
354 
355  { // Message flags
356  auto& vu = rkVUs[uvCount++];
357  vu.vtype = RD_KAFKA_VTYPE_MSGFLAGS;
358  vu.u.i = static_cast<int>(msgFlags);
359  }
360 
361  { // Key
362  auto& vu = rkVUs[uvCount++];
363  vu.vtype = RD_KAFKA_VTYPE_KEY;
364  vu.u.mem.ptr = const_cast<void*>(keyPtr); // NOLINT
365  vu.u.mem.size = keyLen;
366  }
367 
368  { // Value
369  auto& vu = rkVUs[uvCount++];
370  vu.vtype = RD_KAFKA_VTYPE_VALUE;
371  vu.u.mem.ptr = const_cast<void*>(valuePtr); // NOLINT
372  vu.u.mem.size = valueLen;
373  }
374 
375  { // Opaque
376  auto& vu = rkVUs[uvCount++];
377  vu.vtype = RD_KAFKA_VTYPE_OPAQUE;
378  vu.u.ptr = opaquePtr;
379  }
380 
381  // Headers
382  for (const auto& header: record.headers())
383  {
384  auto& vu = rkVUs[uvCount++];
385  vu.vtype = RD_KAFKA_VTYPE_HEADER;
386  vu.u.header.name = header.key.c_str();
387  vu.u.header.val = header.value.data();
388  vu.u.header.size = static_cast<int64_t>(header.value.size());
389  }
390 
391  assert(uvCount == rkVUs.size());
392 
393  const Error sendResult{ rd_kafka_produceva(rk, rkVUs.data(), rkVUs.size()) };
394  KAFKA_THROW_IF_WITH_ERROR(sendResult);
395 
396  // KafkaProducer::deliveryCallback would delete the "opaque"
397  deliveryCbOpaque.release();
398 }
399 
400 inline producer::RecordMetadata
401 KafkaProducer::syncSend(const producer::ProducerRecord& record)
402 {
403  Optional<Error> deliveryResult;
404  producer::RecordMetadata recordMetadata;
405  std::mutex mtx;
406  std::condition_variable delivered;
407 
408  auto deliveryCb = [&deliveryResult, &recordMetadata, &mtx, &delivered] (const producer::RecordMetadata& metadata, const Error& error) {
409  const std::lock_guard<std::mutex> guard(mtx);
410 
411  deliveryResult = error;
412  recordMetadata = metadata;
413 
414  delivered.notify_one();
415  };
416 
417  send(record, deliveryCb);
418 
419  std::unique_lock<std::mutex> lock(mtx);
420  delivered.wait(lock, [&deliveryResult]{ return static_cast<bool>(deliveryResult); });
421 
422  KAFKA_THROW_IF_WITH_ERROR(*deliveryResult); // NOLINT
423 
424  return recordMetadata;
425 }
426 
427 inline Error
428 KafkaProducer::flush(std::chrono::milliseconds timeout)
429 {
430  return Error{rd_kafka_flush(getClientHandle(), convertMsDurationToInt(timeout))};
431 }
432 
433 inline Error
434 KafkaProducer::purge()
435 {
436  return Error{rd_kafka_purge(getClientHandle(),
437  (static_cast<unsigned>(RD_KAFKA_PURGE_F_QUEUE) | static_cast<unsigned>(RD_KAFKA_PURGE_F_INFLIGHT)))};
438 }
439 
440 inline void
441 KafkaProducer::close(std::chrono::milliseconds timeout)
442 {
443  _opened = false;
444 
445  stopBackgroundPollingIfNecessary();
446 
447  const Error result = flush(timeout);
448  if (result.value() == RD_KAFKA_RESP_ERR__TIMED_OUT)
449  {
450  KAFKA_API_DO_LOG(Log::Level::Notice, "purge messages before close, outQLen[%d]", rd_kafka_outq_len(getClientHandle()));
451  purge();
452  }
453 
454  rd_kafka_poll(getClientHandle(), 0);
455 
456  KAFKA_API_DO_LOG(Log::Level::Notice, "closed");
457 
458 }
459 
460 inline void
461 KafkaProducer::initTransactions(std::chrono::milliseconds timeout)
462 {
463  const Error result{ rd_kafka_init_transactions(getClientHandle(), static_cast<int>(timeout.count())) };
464  KAFKA_THROW_IF_WITH_ERROR(result);
465 }
466 
467 inline void
468 KafkaProducer::beginTransaction()
469 {
470  const Error result{ rd_kafka_begin_transaction(getClientHandle()) };
471  KAFKA_THROW_IF_WITH_ERROR(result);
472 }
473 
474 inline void
475 KafkaProducer::commitTransaction(std::chrono::milliseconds timeout)
476 {
477  const Error result{ rd_kafka_commit_transaction(getClientHandle(), static_cast<int>(timeout.count())) };
478  KAFKA_THROW_IF_WITH_ERROR(result);
479 }
480 
481 inline void
482 KafkaProducer::abortTransaction(std::chrono::milliseconds timeout)
483 {
484  const Error result{ rd_kafka_abort_transaction(getClientHandle(), static_cast<int>(timeout.count())) };
485  KAFKA_THROW_IF_WITH_ERROR(result);
486 }
487 
488 inline void
489 KafkaProducer::sendOffsetsToTransaction(const TopicPartitionOffsets& topicPartitionOffsets,
490  const consumer::ConsumerGroupMetadata& groupMetadata,
491  std::chrono::milliseconds timeout)
492 {
493  auto rk_tpos = rd_kafka_topic_partition_list_unique_ptr(createRkTopicPartitionList(topicPartitionOffsets));
494  const Error result{ rd_kafka_send_offsets_to_transaction(getClientHandle(),
495  rk_tpos.get(),
496  groupMetadata.rawHandle(),
497  static_cast<int>(timeout.count())) };
498  KAFKA_THROW_IF_WITH_ERROR(result);
499 }
500 
501 } } } // end of KAFKA_API::clients::producer
502 
Unified error type.
Definition: Error.h:32
int value() const
Obtains the underlying error code value.
Definition: Error.h:81
Specific exception for Kafka clients.
Definition: KafkaException.h:22
const Error & error() const
Obtains the underlying error.
Definition: KafkaException.h:34
The properties for Kafka clients.
Definition: Properties.h:24
The base class for Kafka clients.
Definition: KafkaClient.h:34
const Properties & properties() const
Return the properties which took effect.
Definition: KafkaClient.h:56
A metadata struct containing the consumer group information.
Definition: ConsumerCommon.h:54
KafkaProducer class.
Definition: KafkaProducer.h:29
void abortTransaction(std::chrono::milliseconds timeout=InfiniteTimeout)
Abort the ongoing transaction.
Definition: KafkaProducer.h:482
void commitTransaction(std::chrono::milliseconds timeout=InfiniteTimeout)
Commit the ongoing transaction.
Definition: KafkaProducer.h:475
producer::RecordMetadata syncSend(const producer::ProducerRecord &record)
Synchronously send a record to a topic.
Definition: KafkaProducer.h:401
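A minimal sketch of blocking production built on syncSend(). The broker address, topic name, and the Properties::put()/NullKey/Value helpers come from the library's other headers and README conventions rather than from this listing, so treat them as assumptions.

#include <kafka/KafkaProducer.h>

#include <iostream>
#include <string>

int main()
{
    using namespace kafka;
    using namespace kafka::clients::producer;

    Properties props;
    props.put("bootstrap.servers", "localhost:9092");   // assumed broker address

    KafkaProducer producer(props);

    const std::string payload = "hello, kafka";
    const ProducerRecord record("demo-topic", NullKey, Value(payload.c_str(), payload.size()));

    try {
        // syncSend() blocks until the delivery callback fires, then returns the metadata
        const RecordMetadata metadata = producer.syncSend(record);
        std::cout << "delivered: " << metadata.toString() << std::endl;
    } catch (const KafkaException& e) {
        std::cerr << "delivery failed: " << e.what() << std::endl;
    }

    producer.close();
}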
void send(const producer::ProducerRecord &record, const producer::Callback &deliveryCb, SendOption option=SendOption::NoCopyRecordValue, ActionWhileQueueIsFull action=ActionWhileQueueIsFull::Block)
Asynchronously send a record to a topic.
Definition: KafkaProducer.h:318
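The asynchronous overload above takes a delivery callback that the background poller invokes once the broker acknowledges the record (or the send fails). A fragment reusing the assumed props/record setup from the previous sketch; error.message() is likewise assumed from the library's Error class.

// Asynchronous send: returns immediately, the callback reports the outcome later
producer.send(record,
              [](const RecordMetadata& metadata, const Error& error) {
                  if (!error) {
                      std::cout << "delivered: " << metadata.toString() << std::endl;
                  } else {
                      std::cerr << "delivery error: " << error.message() << std::endl;
                  }
              });

// Optionally wait for everything in flight before shutting down
producer.flush();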
void close(std::chrono::milliseconds timeout=InfiniteTimeout)
Close this producer.
Definition: KafkaProducer.h:441
Error flush(std::chrono::milliseconds timeout=InfiniteTimeout)
Invoking this method makes all buffered records immediately available to send, and blocks on the comp...
Definition: KafkaProducer.h:428
void beginTransaction()
Should be called before the start of each new transaction.
Definition: KafkaProducer.h:468
~KafkaProducer() override
The destructor for KafkaProducer.
Definition: KafkaProducer.h:43
ActionWhileQueueIsFull
Choose the action while the sending buffer is full.
Definition: KafkaProducer.h:71
void sendOffsetsToTransaction(const TopicPartitionOffsets &topicPartitionOffsets, const consumer::ConsumerGroupMetadata &groupMetadata, std::chrono::milliseconds timeout)
Send a list of specified offsets to the consumer group coordinator, and also marks those offsets as pa...
Definition: KafkaProducer.h:489
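For a consume-transform-produce flow, the consumed offsets are committed through the producer so that they belong to the same transaction. A fragment, assuming an open transaction, that TopicPartitionOffsets maps {topic, partition} pairs to offsets, and that groupMetadata was obtained from the consumer which read the input (the consumer side is outside this header).

// Commit the input offsets atomically with the produced records
TopicPartitionOffsets offsetsToCommit;
offsetsToCommit[{"input-topic", 0}] = 42;   // 42 = next offset to consume on partition 0

producer.sendOffsetsToTransaction(offsetsToCommit, groupMetadata, std::chrono::milliseconds(10000));
producer.commitTransaction();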
SendOption
Options for sending messages.
Definition: KafkaProducer.h:66
KafkaProducer(const Properties &properties)
The constructor for KafkaProducer.
Definition: KafkaProducer.h:219
Error purge()
Purge messages currently handled by the KafkaProducer.
Definition: KafkaProducer.h:434
void send(const producer::ProducerRecord &record, const producer::Callback &deliveryCb, Error &error, SendOption option=SendOption::NoCopyRecordValue, ActionWhileQueueIsFull action=ActionWhileQueueIsFull::Block)
Asynchronously send a record to a topic.
Definition: KafkaProducer.h:114
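This overload reports enqueue failures through the Error out-parameter instead of throwing, which is convenient on hot paths. A fragment reusing the assumed setup from the earlier sketches.

Error sendError;
producer.send(record,
              [](const RecordMetadata& /*metadata*/, const Error& /*deliveryError*/) { /* delivery result */ },
              sendError);
if (sendError) {
    // The record could not be enqueued locally (e.g. the queue was full)
    std::cerr << "send failed: " << sendError.message() << std::endl;
}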
void initTransactions(std::chrono::milliseconds timeout=InfiniteTimeout)
Needs to be called before any other methods when the transactional.id is set in the configuration.
Definition: KafkaProducer.h:461
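A sketch of the transactional call sequence these members document: initTransactions() once after construction, then beginTransaction()/commitTransaction() around each batch, with abortTransaction() on failure. The transactional.id value and the rest of the setup are placeholders.

#include <kafka/KafkaProducer.h>

#include <string>

int main()
{
    using namespace kafka;
    using namespace kafka::clients::producer;

    Properties props;
    props.put("bootstrap.servers", "localhost:9092");
    props.put("transactional.id",  "demo-txn-id");   // enables the transactional API

    KafkaProducer producer(props);
    producer.initTransactions();    // must precede any other transactional call

    const std::string payload = "transactional payload";
    const ProducerRecord record("demo-topic", NullKey, Value(payload.c_str(), payload.size()));

    producer.beginTransaction();
    try {
        producer.syncSend(record);
        producer.commitTransaction();   // records become visible to read_committed consumers
    } catch (const KafkaException&) {
        producer.abortTransaction();    // roll back everything sent since beginTransaction()
    }

    producer.close();
}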
static constexpr const char * PARTITIONER
The default partitioner for a ProducerRecord (with no partition assigned).
Definition: ProducerConfig.h:97
static constexpr const char * MAX_IN_FLIGHT
Maximum number of in-flight requests per broker connection.
Definition: ProducerConfig.h:103
static constexpr const char * ENABLE_IDEMPOTENCE
When set to true, the producer will ensure that messages are successfully sent exactly once and in the ...
Definition: ProducerConfig.h:109
static constexpr const char * ACKS
The acks parameter controls how many partition replicas must receive the record before the producer c...
Definition: ProducerConfig.h:31
A key/value pair to be sent to Kafka.
Definition: ProducerRecord.h:19
Key key() const
The key (or null if no key is specified).
Definition: ProducerRecord.h:48
Partition partition() const
The partition to which the record will be sent (or UNKNOWN_PARTITION if no partition was specified).
Definition: ProducerRecord.h:43
const Topic & topic() const
The topic this record is being sent to.
Definition: ProducerRecord.h:38
Optional< Id > id() const
The id to identify the message (consistent with Producer::Metadata::recordId()).
Definition: ProducerRecord.h:58
Value value() const
The value.
Definition: ProducerRecord.h:53
const Headers & headers() const
The headers.
Definition: ProducerRecord.h:63
The metadata for a record that has been acknowledged by the server.
Definition: ProducerCommon.h:22