Modern C++ Kafka API
KafkaClient.h
1 #pragma once
2 
3 #include <kafka/Project.h>
4 
5 #include <kafka/BrokerMetadata.h>
6 #include <kafka/ClientCommon.h>
7 #include <kafka/ClientConfig.h>
8 #include <kafka/Error.h>
9 #include <kafka/Interceptors.h>
10 #include <kafka/KafkaException.h>
11 #include <kafka/Log.h>
12 #include <kafka/Properties.h>
13 #include <kafka/RdKafkaHelper.h>
14 #include <kafka/Types.h>
15 
16 #include <librdkafka/rdkafka.h>
17 
#include <atomic>
#include <cassert>
#include <chrono>
#include <climits>
#include <functional>
#include <memory>
#include <set>
#include <string>
#include <thread>
#include <vector>
26 
27 
28 namespace KAFKA_API { namespace clients {
29 
34 {
35 public:
    virtual ~KafkaClient() = default;

    /**
     * Get the client id.
     */
    const std::string& clientId() const { return _clientId; }

    /**
     * Get the client name (i.e. the client type together with the client id).
     */
    const std::string& name() const { return _clientName; }

    /**
     * Set the log level for the client (clamped into [Log::Level::Emerg, Log::Level::Debug]).
     */
    void setLogLevel(int level);

    /**
     * Return the properties which took effect (i.e. those accepted by librdkafka or kept by the wrapper).
     */
    const Properties& properties() const { return _properties; }

    /**
     * Fetch the effected property (including the property internally set by librdkafka).
     */
    Optional<std::string> getProperty(const std::string& name) const;

    /**
     * Poll the client events manually.
     * Note: only meaningful when the client was constructed with "enable.manual.events.poll" enabled;
     *       otherwise a background thread performs the polling automatically.
     */
    void pollEvents(std::chrono::milliseconds timeout)
    {
        _pollable->poll(convertMsDurationToInt(timeout));
    }

    /**
     * Fetch metadata from an available broker for the given topic.
     * Note: with `disableErrorLogging == true`, failures are not logged (returns an empty Optional instead).
     */
    Optional<BrokerMetadata> fetchBrokerMetadata(const std::string& topic,
                                                 std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_METADATA_TIMEOUT_MS),
                                                 bool disableErrorLogging = false);
79 
    /**
     * Log a printf-style formatted line, prefixed with the client name.
     * Only emitted when `level` lies within [0, _logLevel] and a log callback is set.
     */
    template<class ...Args>
    void doLog(int level, const char* filename, int lineno, const char* format, Args... args) const
    {
        if (level >= 0 && level <= _logLevel && _logCb)
        {
            LogBuffer<LOG_BUFFER_SIZE> logBuffer;
            logBuffer.print("%s ", name().c_str()).print(format, args...);
            _logCb(level, filename, lineno, logBuffer.c_str());
        }
    }

    /**
     * Log a pre-formatted message (degenerate overload of the variadic `doLog`).
     */
    void doLog(int level, const char* filename, int lineno, const char* msg) const
    {
        doLog(level, filename, lineno, "%s", msg);
    }

// Convenience macro which fills in the current file/line for `doLog`
#define KAFKA_API_DO_LOG(lvl, ...) doLog(lvl, __FILE__, __LINE__, ##__VA_ARGS__)

// Default timeout (ms) for `fetchBrokerMetadata`
#if COMPILER_SUPPORTS_CPP_17
    static constexpr int DEFAULT_METADATA_TIMEOUT_MS = 10000;
#else
    enum { DEFAULT_METADATA_TIMEOUT_MS = 10000 };
#endif
103 
protected:
    // There're 3 derived classes: KafkaConsumer, KafkaProducer, AdminClient
    enum class ClientType { KafkaConsumer, KafkaProducer, AdminClient };

    // Hook for derived classes to register extra librdkafka callbacks on the configuration
    using ConfigCallbacksRegister = std::function<void(rd_kafka_conf_t*)>;

    KafkaClient(ClientType clientType,
                const Properties& properties,
                const ConfigCallbacksRegister& extraConfigRegister = ConfigCallbacksRegister{});

    // The underlying librdkafka client handle
    rd_kafka_t* getClientHandle() const { return _rk.get(); }

    // Recover the KafkaClient instance from the "opaque" pointer saved into the librdkafka handle
    static const KafkaClient& kafkaClient(const rd_kafka_t* rk) { return *static_cast<const KafkaClient*>(rd_kafka_opaque(rk)); }
    static KafkaClient& kafkaClient(rd_kafka_t* rk) { return *static_cast<KafkaClient*>(rd_kafka_opaque(rk)); }
119  static constexpr int TIMEOUT_INFINITE = -1;
120 
121  static int convertMsDurationToInt(std::chrono::milliseconds ms)
122  {
123  return ms > std::chrono::milliseconds(INT_MAX) ? TIMEOUT_INFINITE : static_cast<int>(ms.count());
124  }
125 
    // Register the user callbacks triggered by this client
    void setLogCallback(LogCallback cb) { _logCb = std::move(cb); }
    void setStatsCallback(StatsCallback cb) { _statsCb = std::move(cb); }
    void setErrorCallback(ErrorCallback cb) { _errorCb = std::move(cb); }
    void setOauthbearerTokenRefreshCallback(OauthbearerTokenRefreshCallback cb) { _oauthbearerTokenRefreshCb = std::move(cb); }

    void setInterceptors(Interceptors interceptors) { _interceptors = std::move(interceptors); }

    // Show whether it's using automatic events polling
    bool isWithAutoEventsPolling() const { return !_enableManualEventsPoll; }

    // Buffer size for single line logging
    static const constexpr int LOG_BUFFER_SIZE = 1024;

    // Validate properties (and fix them if necessary)
    static Properties validateAndReformProperties(const Properties& properties);

    // To avoid double-close
    bool _opened = false;

    // Accepted properties
    Properties _properties;

    // Polling interval (ms). NOTE(review): not referenced within this header; presumably used by derived clients -- verify against callers.
#if COMPILER_SUPPORTS_CPP_17
    static constexpr int EVENT_POLLING_INTERVAL_MS = 100;
#else
    enum { EVENT_POLLING_INTERVAL_MS = 100 };
#endif
153 
private:
    std::string _clientId;      // Taken from the "client.id" property
    std::string _clientName;    // Format: "<ClientType>[<clientId>]"

    // Current log level; atomic since it may be read by logging triggered from other threads
    std::atomic<int> _logLevel = {Log::Level::Notice};

    LogCallback _logCb = DefaultLogger;
    StatsCallback _statsCb;
    ErrorCallback _errorCb;
    OauthbearerTokenRefreshCallback _oauthbearerTokenRefreshCb;

    // Set via the "enable.manual.events.poll" property
    bool _enableManualEventsPoll = false;
    Interceptors _interceptors;

    // Owning handle to the underlying librdkafka client
    rd_kafka_unique_ptr _rk;
170  static std::string getClientTypeString(ClientType type)
171  {
172  return (type == ClientType::KafkaConsumer ? "KafkaConsumer"
173  : (type == ClientType::KafkaProducer ? "KafkaProducer" : "AdminClient"));
174  }
175 
    // Log callback (for librdkafka)
    static void logCallback(const rd_kafka_t* rk, int level, const char* fac, const char* buf);

    // Statistics callback (for librdkafka)
    static int statsCallback(rd_kafka_t* rk, char* jsonStrBuf, size_t jsonStrLen, void* opaque);

    // Error callback (for librdkafka)
    static void errorCallback(rd_kafka_t* rk, int err, const char* reason, void* opaque);

    // OAUTHBEARER Token Refresh callback (for librdkafka)
    static void oauthbearerTokenRefreshCallback(rd_kafka_t* rk, const char* oauthbearerConfig, void* /*opaque*/);

    // Interceptor callbacks (for librdkafka)
    static rd_kafka_resp_err_t configInterceptorOnNew(rd_kafka_t* rk, const rd_kafka_conf_t* conf, void* opaque, char* errStr, std::size_t maxErrStrSize);
    static rd_kafka_resp_err_t interceptorOnThreadStart(rd_kafka_t* rk, rd_kafka_thread_type_t threadType, const char* threadName, void* opaque);
    static rd_kafka_resp_err_t interceptorOnThreadExit(rd_kafka_t* rk, rd_kafka_thread_type_t threadType, const char* threadName, void* opaque);
    static rd_kafka_resp_err_t interceptorOnBrokerStateChange(rd_kafka_t* rk, int id, const char* secproto, const char* host, int port, const char* state, void* opaque);

    // Log callback (for class instance)
    void onLog(int level, const char* fac, const char* buf) const;

    // Stats callback (for class instance)
    void onStats(const std::string& jsonString);

    // Error callback (for class instance)
    void onError(const Error& error);

    // OAUTHBEARER Token Refresh callback (for class instance)
    SaslOauthbearerToken onOauthbearerTokenRefresh(const std::string& oauthbearerConfig);

    // Interceptor callbacks (for class instance)
    void interceptThreadStart(const std::string& threadName, const std::string& threadType);
    void interceptThreadExit(const std::string& threadName, const std::string& threadType);
    void interceptBrokerStateChange(int id, const std::string& secproto, const std::string& host, int port, const std::string& state);
210 
protected:
    // Interface for anything the client needs to poll periodically (events, callbacks, ...)
    struct Pollable
    {
        virtual ~Pollable() = default;
        virtual void poll(int timeoutMs) = 0;
    };
217 
218  class PollableCallback: public Pollable
219  {
220  public:
221  using Callback = std::function<void(int)>;
222 
223  explicit PollableCallback(Callback cb): _cb(std::move(cb)) {}
224 
225  void poll(int timeoutMs) override { _cb(timeoutMs); }
226 
227  private:
228  const Callback _cb;
229  };
230 
    // Owns a background thread which keeps polling the given Pollable until destruction.
    class PollThread
    {
    public:
        using InterceptorCb = std::function<void()>;

        // Note: `_running` is declared (thus initialized) before `_thread`, which the thread reads immediately
        explicit PollThread(const InterceptorCb& entryCb, const InterceptorCb& exitCb, Pollable& pollable)
            : _running(true), _thread(keepPolling, std::ref(_running), entryCb, exitCb, std::ref(pollable))
        {
        }

        // Signal the polling loop to stop, then join the thread
        ~PollThread()
        {
            _running = false;

            if (_thread.joinable()) _thread.join();
        }

    private:
        // Thread entry: run `entryCb`, poll in a loop until `running` turns false, then run `exitCb`
        static void keepPolling(std::atomic_bool& running,
                                const InterceptorCb& entryCb,
                                const InterceptorCb& exitCb,
                                Pollable& pollable)
        {
            entryCb();

            while (running.load())
            {
                pollable.poll(CALLBACK_POLLING_INTERVAL_MS);
            }

            exitCb();
        }

        static constexpr int CALLBACK_POLLING_INTERVAL_MS = 10;

        std::atomic_bool _running;  // Written by the destructor, read by the polling thread
        std::thread _thread;
    };
268 
    // Install the pollable callback; unless manual events polling was requested,
    // also spawn the background thread which drives it.
    void startBackgroundPollingIfNecessary(const PollableCallback::Callback& pollableCallback)
    {
        _pollable = std::make_unique<KafkaClient::PollableCallback>(pollableCallback);

        auto entryCb = [this]() { interceptThreadStart("events-polling", "background"); };
        auto exitCb = [this]() { interceptThreadExit("events-polling", "background"); };

        if (isWithAutoEventsPolling()) _pollThread = std::make_unique<PollThread>(entryCb, exitCb, *_pollable);
    }
278 
    // Stop background polling. The thread must be joined BEFORE the pollable it references is destroyed.
    void stopBackgroundPollingIfNecessary()
    {
        _pollThread.reset(); // Join the polling thread (in case it's running)

        _pollable.reset();
    }
285 
private:
    std::unique_ptr<Pollable> _pollable;      // What gets polled (manually or by the background thread)
    std::unique_ptr<PollThread> _pollThread;  // Background polling thread (absent with manual events polling)
289 };
290 
291 
292 inline
293 KafkaClient::KafkaClient(ClientType clientType,
294  const Properties& properties,
295  const ConfigCallbacksRegister& extraConfigRegister)
296 {
297  static const std::set<std::string> PRIVATE_PROPERTY_KEYS = { "max.poll.records", "enable.manual.events.poll" };
298 
299  // Save clientID
301  {
302  _clientId = *clientId;
303  _clientName = getClientTypeString(clientType) + "[" + _clientId + "]";
304  }
305 
306  // Log Callback
307  if (properties.contains("log_cb"))
308  {
309  setLogCallback(properties.get<LogCallback>("log_cb"));
310  }
311 
312  // Save LogLevel
313  if (auto logLevel = properties.getProperty(Config::LOG_LEVEL))
314  {
315  try
316  {
317  _logLevel = std::stoi(*logLevel);
318  }
319  catch (const std::exception& e)
320  {
321  KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, std::string("Invalid log_level[").append(*logLevel).append("], which must be an number!").append(e.what())));
322  }
323 
324  if (_logLevel < Log::Level::Emerg || _logLevel > Log::Level::Debug)
325  {
326  KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, std::string("Invalid log_level[").append(*logLevel).append("], which must be a value between 0 and 7!")));
327  }
328  }
329 
330  // Save "enable.manual.events.poll" option
331  if (auto enableManualEventsPoll = properties.getProperty(Config::ENABLE_MANUAL_EVENTS_POLL))
332  {
333  if (*enableManualEventsPoll == "true" || *enableManualEventsPoll == "t" || *enableManualEventsPoll == "1")
334  {
335  _enableManualEventsPoll = true;
336  }
337  else if (*enableManualEventsPoll == "false" || *enableManualEventsPoll == "f" || *enableManualEventsPoll == "0")
338  {
339  _enableManualEventsPoll = false;
340  }
341  else
342  {
343  KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, std::string("Invalid option[" + *enableManualEventsPoll + "] for \"enable.manual.events.poll\", which must be a bool value (true or false)!")));
344  }
345  }
346 
347  LogBuffer<LOG_BUFFER_SIZE> errInfo;
348 
349  auto rk_conf = rd_kafka_conf_unique_ptr(rd_kafka_conf_new());
350 
351  for (const auto& prop: properties.map())
352  {
353  const auto& k = prop.first;
354  const auto& v = properties.getProperty(k);
355  if (!v) continue;
356 
357  // Those private properties are only available for `C++ wrapper`, not for librdkafka
358  if (PRIVATE_PROPERTY_KEYS.count(prop.first))
359  {
360  _properties.put(prop.first, prop.second);
361  continue;
362  }
363 
364  const rd_kafka_conf_res_t result = rd_kafka_conf_set(rk_conf.get(),
365  k.c_str(),
366  v->c_str(),
367  errInfo.str(),
368  errInfo.capacity());
369  if (result == RD_KAFKA_CONF_OK)
370  {
371  _properties.put(prop.first, prop.second);
372  }
373  else
374  {
375  KAFKA_API_DO_LOG(Log::Level::Err, "failed to be initialized with property[%s:%s], result[%d]: %s", k.c_str(), v->c_str(), result, errInfo.c_str());
376  }
377  }
378 
379  // Save KafkaClient's raw pointer to the "opaque" field, thus we could fetch it later (for kinds of callbacks)
380  rd_kafka_conf_set_opaque(rk_conf.get(), this);
381 
382  // Log Callback
383  if (properties.contains("log_cb"))
384  {
385  rd_kafka_conf_set_log_cb(rk_conf.get(), KafkaClient::logCallback);
386  }
387 
388  // Statistics Callback
389  if (properties.contains("stats_cb"))
390  {
391  setStatsCallback(properties.get<StatsCallback>("stats_cb"));
392 
393  rd_kafka_conf_set_stats_cb(rk_conf.get(), KafkaClient::statsCallback);
394  }
395 
396  // Error Callback
397  if (properties.contains("error_cb"))
398  {
399  setErrorCallback(properties.get<ErrorCallback>("error_cb"));
400 
401  rd_kafka_conf_set_error_cb(rk_conf.get(), KafkaClient::errorCallback);
402  }
403 
404  // OAUTHBEARER Toker Refresh Callback
405  if (properties.contains("oauthbearer_token_refresh_cb"))
406  {
407  setOauthbearerTokenRefreshCallback(properties.get<OauthbearerTokenRefreshCallback>("oauthbearer_token_refresh_cb"));
408 
409  rd_kafka_conf_set_oauthbearer_token_refresh_cb(rk_conf.get(), KafkaClient::oauthbearerTokenRefreshCallback);
410  }
411 
412  // Interceptor
413  if (properties.contains("interceptors"))
414  {
415  setInterceptors(properties.get<Interceptors>("interceptors"));
416 
417  const Error result{ rd_kafka_conf_interceptor_add_on_new(rk_conf.get(), "on_new", KafkaClient::configInterceptorOnNew, nullptr) };
418  KAFKA_THROW_IF_WITH_ERROR(result);
419  }
420 
421  // Other Callbacks
422  if (extraConfigRegister) extraConfigRegister(rk_conf.get());
423 
424  // Set client handler
425  _rk.reset(rd_kafka_new((clientType == ClientType::KafkaConsumer ? RD_KAFKA_CONSUMER : RD_KAFKA_PRODUCER),
426  rk_conf.release(), // rk_conf's ownship would be transferred to rk, after the "rd_kafka_new()" call
427  errInfo.clear().str(),
428  errInfo.capacity()));
429  KAFKA_THROW_IF_WITH_ERROR(Error(rd_kafka_last_error()));
430 
431  // Add brokers
433  if (!brokers || rd_kafka_brokers_add(getClientHandle(), brokers->c_str()) == 0)
434  {
435  KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG,\
436  "No broker could be added successfully, BOOTSTRAP_SERVERS=[" + (brokers ? *brokers : "NA") + "]"));
437  }
438 
439  _opened = true;
440 }
441 
442 inline Properties
443 KafkaClient::validateAndReformProperties(const Properties& properties)
444 {
445  auto newProperties = properties;
446 
447  // BOOTSTRAP_SERVERS property is mandatory
448  if (!newProperties.getProperty(Config::BOOTSTRAP_SERVERS))
449  {
450  KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG,\
451  "Validation failed! With no property [" + std::string(Config::BOOTSTRAP_SERVERS) + "]"));
452  }
453 
454  // If no "client.id" configured, generate a random one for user
455  if (!newProperties.getProperty(Config::CLIENT_ID))
456  {
457  newProperties.put(Config::CLIENT_ID, utility::getRandomString());
458  }
459 
460  // If no "log_level" configured, use Log::Level::Notice as default
461  if (!newProperties.getProperty(Config::LOG_LEVEL))
462  {
463  newProperties.put(Config::LOG_LEVEL, std::to_string(static_cast<int>(Log::Level::Notice)));
464  }
465 
466  return newProperties;
467 }
468 
// Look the property up first among the properties this wrapper accepted, then
// fall back to querying the effective librdkafka configuration.
inline Optional<std::string>
KafkaClient::getProperty(const std::string& name) const
{
    // Find it in pre-saved properties
    if (auto property = _properties.getProperty(name)) return *property;

    const rd_kafka_conf_t* conf = rd_kafka_conf(getClientHandle());

    constexpr int DEFAULT_BUF_SIZE = 512;

    std::vector<char> valueBuf(DEFAULT_BUF_SIZE);
    std::size_t valueSize = valueBuf.size();

    // Try with a default buf size. If could not find the property, return immediately.
    if (rd_kafka_conf_get(conf, name.c_str(), valueBuf.data(), &valueSize) != RD_KAFKA_CONF_OK) return Optional<std::string>{};

    // If the default buf size is not big enough, retry with a larger one
    // (rd_kafka_conf_get reports the required size back through `valueSize`)
    if (valueSize > valueBuf.size())
    {
        valueBuf.resize(valueSize);
        [[maybe_unused]] const rd_kafka_conf_res_t result = rd_kafka_conf_get(conf, name.c_str(), valueBuf.data(), &valueSize);
        assert(result == RD_KAFKA_CONF_OK);
    }

    return std::string(valueBuf.data());
}
495 
496 inline void
498 {
499  _logLevel = level < Log::Level::Emerg ? Log::Level::Emerg : (level > Log::Level::Debug ? Log::Level::Debug : level);
500  rd_kafka_set_log_level(getClientHandle(), _logLevel);
501 }
502 
// Relay a librdkafka-originated log line into the common `doLog` path
inline void
KafkaClient::onLog(int level, const char* fac, const char* buf) const
{
    doLog(level, "LIBRDKAFKA", 0, "%s | %s", fac, buf); // The log is coming from librdkafka
}
508 
// Trampoline from librdkafka: route the log line to the owning client instance
inline void
KafkaClient::logCallback(const rd_kafka_t* rk, int level, const char* fac, const char* buf)
{
    kafkaClient(rk).onLog(level, fac, buf);
}
514 
// Hand the statistics JSON to the user callback (no-op when none was configured)
inline void
KafkaClient::onStats(const std::string& jsonString)
{
    if (_statsCb) _statsCb(jsonString);
}
520 
521 inline int
522 KafkaClient::statsCallback(rd_kafka_t* rk, char* jsonStrBuf, size_t jsonStrLen, void* /*opaque*/)
523 {
524  const std::string stats(jsonStrBuf, jsonStrBuf+jsonStrLen);
525  kafkaClient(rk).onStats(stats);
526  return 0;
527 }
528 
// Hand the error to the user callback (no-op when none was configured)
inline void
KafkaClient::onError(const Error& error)
{
    if (_errorCb) _errorCb(error);
}
534 
// Forward the token-refresh request to the user callback.
// Throws std::runtime_error when no callback was configured (the caller converts this into a token failure).
inline SaslOauthbearerToken
KafkaClient::onOauthbearerTokenRefresh(const std::string& oauthbearerConfig)
{
    if (!_oauthbearerTokenRefreshCb)
    {
        throw std::runtime_error("No OAUTHBEARER token refresh callback configured!");
    }

    return _oauthbearerTokenRefreshCb(oauthbearerConfig);
}
545 
546 inline void
547 KafkaClient::errorCallback(rd_kafka_t* rk, int err, const char* reason, void* /*opaque*/)
548 {
549  auto respErr = static_cast<rd_kafka_resp_err_t>(err);
550 
551  Error error;
552  if (respErr != RD_KAFKA_RESP_ERR__FATAL)
553  {
554  error = Error{respErr, reason};
555  }
556  else
557  {
558  LogBuffer<LOG_BUFFER_SIZE> errInfo;
559  respErr = rd_kafka_fatal_error(rk, errInfo.str(), errInfo.capacity());
560  error = Error{respErr, errInfo.c_str(), true};
561  }
562 
563  kafkaClient(rk).onError(error);
564 }
565 
// Trampoline from librdkafka: ask the client instance for a fresh OAUTHBEARER
// token, then hand the result (or the failure) back to librdkafka.
inline void
KafkaClient::oauthbearerTokenRefreshCallback(rd_kafka_t* rk, const char* oauthbearerConfig, void* /* opaque */)
{
    SaslOauthbearerToken oauthbearerToken;

    try
    {
        oauthbearerToken = kafkaClient(rk).onOauthbearerTokenRefresh(oauthbearerConfig != nullptr ? oauthbearerConfig : "");
    }
    catch (const std::exception& e)
    {
        // Any failure from the user callback is reported to librdkafka as a token failure
        rd_kafka_oauthbearer_set_token_failure(rk, e.what());
        return;
    }

    LogBuffer<LOG_BUFFER_SIZE> errInfo;

    // Flatten the extension map into the alternating key/value array librdkafka expects
    std::vector<const char*> extensions;
    extensions.reserve(oauthbearerToken.extensions.size() * 2);
    for (const auto& kv: oauthbearerToken.extensions)
    {
        extensions.push_back(kv.first.c_str());
        extensions.push_back(kv.second.c_str());
    }

    if (rd_kafka_oauthbearer_set_token(rk,
                                       oauthbearerToken.value.c_str(),
                                       oauthbearerToken.mdLifetime.count(),
                                       oauthbearerToken.mdPrincipalName.c_str(),
                                       extensions.data(), extensions.size(),
                                       errInfo.str(), errInfo.capacity()) != RD_KAFKA_RESP_ERR_NO_ERROR)
    {
        rd_kafka_oauthbearer_set_token_failure(rk, errInfo.c_str());
    }
}
601 
// Invoke the user-supplied interceptor callbacks (each is a no-op when not configured)

inline void
KafkaClient::interceptThreadStart(const std::string& threadName, const std::string& threadType)
{
    if (const auto& cb = _interceptors.onThreadStart()) cb(threadName, threadType);
}

inline void
KafkaClient::interceptThreadExit(const std::string& threadName, const std::string& threadType)
{
    if (const auto& cb = _interceptors.onThreadExit()) cb(threadName, threadType);
}

inline void
KafkaClient::interceptBrokerStateChange(int id, const std::string& secproto, const std::string& host, int port, const std::string& state)
{
    if (const auto& cb = _interceptors.onBrokerStateChange()) cb(id, secproto, host, port, state);
}
619 
// "on_new" conf interceptor: once the rd_kafka_t handle exists, chain the
// thread-start / thread-exit / broker-state-change interceptors onto it.
// Returns the first registration error encountered, otherwise NO_ERROR.
inline rd_kafka_resp_err_t
KafkaClient::configInterceptorOnNew(rd_kafka_t* rk, const rd_kafka_conf_t* /*conf*/, void* opaque, char* /*errStr*/, std::size_t /*maxErrStrSize*/)
{
    if (auto result = rd_kafka_interceptor_add_on_thread_start(rk, "on_thread_start", KafkaClient::interceptorOnThreadStart, opaque))
    {
        return result;
    }

    if (auto result = rd_kafka_interceptor_add_on_thread_exit(rk, "on_thread_exit", KafkaClient::interceptorOnThreadExit, opaque))
    {
        return result;
    }

    if (auto result = rd_kafka_interceptor_add_on_broker_state_change(rk, "on_broker_state_change", KafkaClient::interceptorOnBrokerStateChange, opaque))
    {
        return result;
    }

    return RD_KAFKA_RESP_ERR_NO_ERROR;
}
640 
// Trampolines from librdkafka to the corresponding instance-level interceptors

inline rd_kafka_resp_err_t
KafkaClient::interceptorOnThreadStart(rd_kafka_t* rk, rd_kafka_thread_type_t threadType, const char* threadName, void* /* opaque */)
{
    kafkaClient(rk).interceptThreadStart(threadName, toString(threadType));

    return RD_KAFKA_RESP_ERR_NO_ERROR;
}

inline rd_kafka_resp_err_t
KafkaClient::interceptorOnThreadExit(rd_kafka_t* rk, rd_kafka_thread_type_t threadType, const char* threadName, void* /* opaque */)
{
    kafkaClient(rk).interceptThreadExit(threadName, toString(threadType));

    return RD_KAFKA_RESP_ERR_NO_ERROR;
}

inline rd_kafka_resp_err_t
KafkaClient::interceptorOnBrokerStateChange(rd_kafka_t* rk, int id, const char* secproto, const char* host, int port, const char* state, void* /* opaque */)
{
    kafkaClient(rk).interceptBrokerStateChange(id, secproto, host, port, state);

    return RD_KAFKA_RESP_ERR_NO_ERROR;
}
664 
// Fetch cluster metadata from a broker and distill it into a BrokerMetadata for
// the given topic. Returns an empty Optional on any failure (RPC error, unknown
// topic, or topic-level error); failures are logged unless `disableErrorLogging`.
inline Optional<BrokerMetadata>
KafkaClient::fetchBrokerMetadata(const std::string& topic, std::chrono::milliseconds timeout, bool disableErrorLogging)
{
    const rd_kafka_metadata_t* rk_metadata = nullptr;
    // Here the input parameter for `all_topics` is `true`, since we want the `cgrp_update`
    const rd_kafka_resp_err_t err = rd_kafka_metadata(getClientHandle(),
                                                      true,
                                                      nullptr,
                                                      &rk_metadata,
                                                      convertMsDurationToInt(timeout));

    // Ensure the metadata struct is destroyed on every exit path
    auto guard = rd_kafka_metadata_unique_ptr(rk_metadata);

    if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
    {
        if (!disableErrorLogging)
        {
            KAFKA_API_DO_LOG(Log::Level::Err, "failed to get BrokerMetadata! error[%s]", rd_kafka_err2str(err));
        }
        return Optional<BrokerMetadata>{};
    }

    // Locate the entry for the requested topic within the all-topics response
    const rd_kafka_metadata_topic* metadata_topic = nullptr;
    for (int i = 0; i < rk_metadata->topic_cnt; ++i)
    {
        if (rk_metadata->topics[i].topic == topic)
        {
            metadata_topic = &rk_metadata->topics[i];
            break;
        }
    }

    if (!metadata_topic || metadata_topic->err)
    {
        if (!disableErrorLogging)
        {
            if (!metadata_topic)
            {
                KAFKA_API_DO_LOG(Log::Level::Err, "failed to find BrokerMetadata for topic[%s]", topic.c_str());
            }
            else
            {
                KAFKA_API_DO_LOG(Log::Level::Err, "failed to get BrokerMetadata for topic[%s]! error[%s]", topic.c_str(), rd_kafka_err2str(metadata_topic->err));
            }
        }
        return Optional<BrokerMetadata>{};
    }

    // Construct the BrokerMetadata
    BrokerMetadata metadata(metadata_topic->topic);
    metadata.setOrigNodeName(rk_metadata->orig_broker_name ? std::string(rk_metadata->orig_broker_name) : "");

    for (int i = 0; i < rk_metadata->broker_cnt; ++i)
    {
        metadata.addNode(rk_metadata->brokers[i].id, rk_metadata->brokers[i].host, rk_metadata->brokers[i].port);
    }

    for (int i = 0; i < metadata_topic->partition_cnt; ++i)
    {
        const rd_kafka_metadata_partition& metadata_partition = metadata_topic->partitions[i];

        const Partition partition = metadata_partition.id;

        // A partition with an error is logged and skipped; the rest are still returned
        if (metadata_partition.err != 0)
        {
            if (!disableErrorLogging)
            {
                KAFKA_API_DO_LOG(Log::Level::Err, "got error[%s] while constructing BrokerMetadata for topic[%s]-partition[%d]", rd_kafka_err2str(metadata_partition.err), topic.c_str(), partition);
            }

            continue;
        }

        BrokerMetadata::PartitionInfo partitionInfo(metadata_partition.leader);

        for (int j = 0; j < metadata_partition.replica_cnt; ++j)
        {
            partitionInfo.addReplica(metadata_partition.replicas[j]);
        }

        for (int j = 0; j < metadata_partition.isr_cnt; ++j)
        {
            partitionInfo.addInSyncReplica(metadata_partition.isrs[j]);
        }

        metadata.addPartitionInfo(partition, partitionInfo);
    }

    return metadata;
}
755 
756 
757 } } // end of KAFKA_API::clients
758 
The properties for Kafka clients.
Definition: Properties.h:24
T & get(const std::string &key) const
Get a property reference.
Definition: Properties.h:155
bool contains(const std::string &key) const
Check whether the map contains a property.
Definition: Properties.h:144
const PropertiesMap & map() const
Get all properties with a map.
Definition: Properties.h:212
Optional< std::string > getProperty(const std::string &key) const
Get a property.
Definition: Properties.h:170
static constexpr const char * BOOTSTRAP_SERVERS
The string contains host:port pairs of brokers (splitted by ",") that the consumer will use to establ...
Definition: ClientConfig.h:62
static constexpr const char * ENABLE_MANUAL_EVENTS_POLL
To poll the events manually (otherwise, it would be done with a background polling thread).
Definition: ClientConfig.h:26
static constexpr const char * LOG_LEVEL
Log level (syslog(3) levels).
Definition: ClientConfig.h:72
static constexpr const char * CLIENT_ID
Client identifier.
Definition: ClientConfig.h:67
The base class for Kafka clients.
Definition: KafkaClient.h:34
Optional< BrokerMetadata > fetchBrokerMetadata(const std::string &topic, std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_METADATA_TIMEOUT_MS), bool disableErrorLogging=false)
Fetch metadata from an available broker.
Definition: KafkaClient.h:666
const std::string & clientId() const
Get the client id.
Definition: KafkaClient.h:41
void setLogLevel(int level)
Set log level for the kafka client (the default value: 5).
Definition: KafkaClient.h:497
const Properties & properties() const
Return the properties which took effect.
Definition: KafkaClient.h:56
Optional< std::string > getProperty(const std::string &name) const
Fetch the effected property (including the property internally set by librdkafka).
Definition: KafkaClient.h:470
const std::string & name() const
Get the client name (i.e.
Definition: KafkaClient.h:46
void pollEvents(std::chrono::milliseconds timeout)
Call the OffsetCommit callbacks (if any) Note: The Kafka client should be constructed with option ena...
Definition: KafkaClient.h:67
It is used to describe per-partition state in the MetadataResponse.
Definition: BrokerMetadata.h:57
The metadata info for a topic.
Definition: BrokerMetadata.h:19