3 #include <kafka/Project.h>
5 #include <kafka/BrokerMetadata.h>
6 #include <kafka/ClientCommon.h>
7 #include <kafka/ClientConfig.h>
8 #include <kafka/Error.h>
9 #include <kafka/Interceptors.h>
10 #include <kafka/KafkaException.h>
11 #include <kafka/Log.h>
12 #include <kafka/Properties.h>
13 #include <kafka/RdKafkaHelper.h>
14 #include <kafka/Types.h>
16 #include <librdkafka/rdkafka.h>
28 namespace KAFKA_API {
namespace clients {
41 const std::string&
clientId()
const {
return _clientId; }
46 const std::string&
name()
const {
return _clientName; }
69 _pollable->poll(convertMsDurationToInt(timeout));
77 std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_METADATA_TIMEOUT_MS),
78 bool disableErrorLogging =
false);
80 template<
class ...Args>
81 void doLog(
int level,
const char* filename,
int lineno,
const char* format, Args... args)
const
83 if (level >= 0 && level <= _logLevel && _logCb)
85 LogBuffer<LOG_BUFFER_SIZE> logBuffer;
86 logBuffer.print(
"%s ",
name().c_str()).print(format, args...);
87 _logCb(level, filename, lineno, logBuffer.c_str());
91 void doLog(
int level,
const char* filename,
int lineno,
const char* msg)
const
93 doLog(level, filename, lineno,
"%s", msg);
96 #define KAFKA_API_DO_LOG(lvl, ...) doLog(lvl, __FILE__, __LINE__, ##__VA_ARGS__)
98 #if COMPILER_SUPPORTS_CPP_17
99 static constexpr
int DEFAULT_METADATA_TIMEOUT_MS = 10000;
101 enum { DEFAULT_METADATA_TIMEOUT_MS = 10000 };
106 enum class ClientType { KafkaConsumer, KafkaProducer, AdminClient };
108 using ConfigCallbacksRegister = std::function<void(rd_kafka_conf_t*)>;
110 KafkaClient(ClientType clientType,
112 const ConfigCallbacksRegister& extraConfigRegister = ConfigCallbacksRegister{});
114 rd_kafka_t* getClientHandle()
const {
return _rk.get(); }
116 static const KafkaClient& kafkaClient(
const rd_kafka_t* rk) {
return *
static_cast<const KafkaClient*
>(rd_kafka_opaque(rk)); }
117 static KafkaClient& kafkaClient(rd_kafka_t* rk) {
return *
static_cast<KafkaClient*
>(rd_kafka_opaque(rk)); }
119 static constexpr
int TIMEOUT_INFINITE = -1;
121 static int convertMsDurationToInt(std::chrono::milliseconds ms)
123 return ms > std::chrono::milliseconds(INT_MAX) ? TIMEOUT_INFINITE :
static_cast<int>(ms.count());
126 void setLogCallback(LogCallback cb) { _logCb = std::move(cb); }
127 void setStatsCallback(StatsCallback cb) { _statsCb = std::move(cb); }
128 void setErrorCallback(ErrorCallback cb) { _errorCb = std::move(cb); }
129 void setOauthbearerTokenRefreshCallback(OauthbearerTokenRefreshCallback cb) { _oauthbearerTokenRefreshCb = std::move(cb); }
131 void setInterceptors(Interceptors interceptors) { _interceptors = std::move(interceptors); }
134 bool isWithAutoEventsPolling()
const {
return !_enableManualEventsPoll; }
137 static const constexpr
int LOG_BUFFER_SIZE = 1024;
140 static Properties validateAndReformProperties(
const Properties&
properties);
143 bool _opened =
false;
146 Properties _properties;
148 #if COMPILER_SUPPORTS_CPP_17
149 static constexpr
int EVENT_POLLING_INTERVAL_MS = 100;
151 enum { EVENT_POLLING_INTERVAL_MS = 100 };
155 std::string _clientId;
156 std::string _clientName;
158 std::atomic<int> _logLevel = {Log::Level::Notice};
160 LogCallback _logCb = DefaultLogger;
161 StatsCallback _statsCb;
162 ErrorCallback _errorCb;
163 OauthbearerTokenRefreshCallback _oauthbearerTokenRefreshCb;
165 bool _enableManualEventsPoll =
false;
166 Interceptors _interceptors;
168 rd_kafka_unique_ptr _rk;
170 static std::string getClientTypeString(ClientType type)
172 return (type == ClientType::KafkaConsumer ?
"KafkaConsumer"
173 : (type == ClientType::KafkaProducer ?
"KafkaProducer" :
"AdminClient"));
177 static void logCallback(
const rd_kafka_t* rk,
int level,
const char* fac,
const char* buf);
180 static int statsCallback(rd_kafka_t* rk,
char* jsonStrBuf,
size_t jsonStrLen,
void* opaque);
183 static void errorCallback(rd_kafka_t* rk,
int err,
const char* reason,
void* opaque);
186 static void oauthbearerTokenRefreshCallback(rd_kafka_t* rk,
const char* oauthbearerConfig,
void* );
189 static rd_kafka_resp_err_t configInterceptorOnNew(rd_kafka_t* rk,
const rd_kafka_conf_t* conf,
void* opaque,
char* errStr, std::size_t maxErrStrSize);
190 static rd_kafka_resp_err_t interceptorOnThreadStart(rd_kafka_t* rk, rd_kafka_thread_type_t threadType,
const char* threadName,
void* opaque);
191 static rd_kafka_resp_err_t interceptorOnThreadExit(rd_kafka_t* rk, rd_kafka_thread_type_t threadType,
const char* threadName,
void* opaque);
192 static rd_kafka_resp_err_t interceptorOnBrokerStateChange(rd_kafka_t* rk,
int id,
const char* secproto,
const char* host,
int port,
const char* state,
void* opaque);
195 void onLog(
int level,
const char* fac,
const char* buf)
const;
198 void onStats(
const std::string& jsonString);
201 void onError(
const Error& error);
204 SaslOauthbearerToken onOauthbearerTokenRefresh(
const std::string& oauthbearerConfig);
207 void interceptThreadStart(
const std::string& threadName,
const std::string& threadType);
208 void interceptThreadExit(
const std::string& threadName,
const std::string& threadType);
209 void interceptBrokerStateChange(
int id,
const std::string& secproto,
const std::string& host,
int port,
const std::string& state);
214 virtual ~Pollable() =
default;
215 virtual void poll(
int timeoutMs) = 0;
218 class PollableCallback:
public Pollable
221 using Callback = std::function<void(
int)>;
223 explicit PollableCallback(Callback cb): _cb(std::move(cb)) {}
225 void poll(
int timeoutMs)
override { _cb(timeoutMs); }
234 using InterceptorCb = std::function<void()>;
235 explicit PollThread(
const InterceptorCb& entryCb,
const InterceptorCb& exitCb, Pollable& pollable)
236 : _running(true), _thread(keepPolling, std::ref(_running), entryCb, exitCb, std::ref(pollable))
244 if (_thread.joinable()) _thread.join();
248 static void keepPolling(std::atomic_bool& running,
249 const InterceptorCb& entryCb,
250 const InterceptorCb& exitCb,
255 while (running.load())
257 pollable.poll(CALLBACK_POLLING_INTERVAL_MS);
263 static constexpr
int CALLBACK_POLLING_INTERVAL_MS = 10;
265 std::atomic_bool _running;
269 void startBackgroundPollingIfNecessary(
const PollableCallback::Callback& pollableCallback)
271 _pollable = std::make_unique<KafkaClient::PollableCallback>(pollableCallback);
273 auto entryCb = [
this]() { interceptThreadStart(
"events-polling",
"background"); };
274 auto exitCb = [
this]() { interceptThreadExit(
"events-polling",
"background"); };
276 if (isWithAutoEventsPolling()) _pollThread = std::make_unique<PollThread>(entryCb, exitCb, *_pollable);
279 void stopBackgroundPollingIfNecessary()
287 std::unique_ptr<Pollable> _pollable;
288 std::unique_ptr<PollThread> _pollThread;
293 KafkaClient::KafkaClient(ClientType clientType,
294 const Properties& properties,
295 const ConfigCallbacksRegister& extraConfigRegister)
297 static const std::set<std::string> PRIVATE_PROPERTY_KEYS = {
"max.poll.records",
"enable.manual.events.poll" };
303 _clientName = getClientTypeString(clientType) +
"[" + _clientId +
"]";
317 _logLevel = std::stoi(*logLevel);
319 catch (
const std::exception& e)
321 KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, std::string(
"Invalid log_level[").append(*logLevel).append(
"], which must be an number!").append(e.what())));
324 if (_logLevel < Log::Level::Emerg || _logLevel > Log::Level::Debug)
326 KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, std::string(
"Invalid log_level[").append(*logLevel).append(
"], which must be a value between 0 and 7!")));
333 if (*enableManualEventsPoll ==
"true" || *enableManualEventsPoll ==
"t" || *enableManualEventsPoll ==
"1")
335 _enableManualEventsPoll =
true;
337 else if (*enableManualEventsPoll ==
"false" || *enableManualEventsPoll ==
"f" || *enableManualEventsPoll ==
"0")
339 _enableManualEventsPoll =
false;
343 KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, std::string(
"Invalid option[" + *enableManualEventsPoll +
"] for \"enable.manual.events.poll\", which must be a bool value (true or false)!")));
347 LogBuffer<LOG_BUFFER_SIZE> errInfo;
349 auto rk_conf = rd_kafka_conf_unique_ptr(rd_kafka_conf_new());
353 const auto& k = prop.first;
358 if (PRIVATE_PROPERTY_KEYS.count(prop.first))
360 _properties.put(prop.first, prop.second);
364 const rd_kafka_conf_res_t result = rd_kafka_conf_set(rk_conf.get(),
369 if (result == RD_KAFKA_CONF_OK)
371 _properties.put(prop.first, prop.second);
375 KAFKA_API_DO_LOG(Log::Level::Err,
"failed to be initialized with property[%s:%s], result[%d]: %s", k.c_str(), v->c_str(), result, errInfo.c_str());
380 rd_kafka_conf_set_opaque(rk_conf.get(),
this);
385 rd_kafka_conf_set_log_cb(rk_conf.get(), KafkaClient::logCallback);
393 rd_kafka_conf_set_stats_cb(rk_conf.get(), KafkaClient::statsCallback);
401 rd_kafka_conf_set_error_cb(rk_conf.get(), KafkaClient::errorCallback);
407 setOauthbearerTokenRefreshCallback(
properties.
get<OauthbearerTokenRefreshCallback>(
"oauthbearer_token_refresh_cb"));
409 rd_kafka_conf_set_oauthbearer_token_refresh_cb(rk_conf.get(), KafkaClient::oauthbearerTokenRefreshCallback);
415 setInterceptors(
properties.
get<Interceptors>(
"interceptors"));
417 const Error result{ rd_kafka_conf_interceptor_add_on_new(rk_conf.get(),
"on_new", KafkaClient::configInterceptorOnNew,
nullptr) };
418 KAFKA_THROW_IF_WITH_ERROR(result);
422 if (extraConfigRegister) extraConfigRegister(rk_conf.get());
425 _rk.reset(rd_kafka_new((clientType == ClientType::KafkaConsumer ? RD_KAFKA_CONSUMER : RD_KAFKA_PRODUCER),
427 errInfo.clear().str(),
428 errInfo.capacity()));
429 KAFKA_THROW_IF_WITH_ERROR(Error(rd_kafka_last_error()));
433 if (!brokers || rd_kafka_brokers_add(getClientHandle(), brokers->c_str()) == 0)
435 KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG,\
436 "No broker could be added successfully, BOOTSTRAP_SERVERS=[" + (brokers ? *brokers :
"NA") +
"]"));
443 KafkaClient::validateAndReformProperties(
const Properties& properties)
450 KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG,\
463 newProperties.put(
Config::LOG_LEVEL, std::to_string(
static_cast<int>(Log::Level::Notice)));
466 return newProperties;
469 inline Optional<std::string>
473 if (
auto property = _properties.getProperty(
name))
return *property;
475 const rd_kafka_conf_t* conf = rd_kafka_conf(getClientHandle());
477 constexpr
int DEFAULT_BUF_SIZE = 512;
479 std::vector<char> valueBuf(DEFAULT_BUF_SIZE);
480 std::size_t valueSize = valueBuf.size();
483 if (rd_kafka_conf_get(conf,
name.c_str(), valueBuf.data(), &valueSize) != RD_KAFKA_CONF_OK)
return Optional<std::string>{};
486 if (valueSize > valueBuf.size())
488 valueBuf.resize(valueSize);
489 [[maybe_unused]]
const rd_kafka_conf_res_t result = rd_kafka_conf_get(conf,
name.c_str(), valueBuf.data(), &valueSize);
490 assert(result == RD_KAFKA_CONF_OK);
493 return std::string(valueBuf.data());
499 _logLevel = level < Log::Level::Emerg ? Log::Level::Emerg : (level > Log::Level::Debug ? Log::Level::Debug : level);
500 rd_kafka_set_log_level(getClientHandle(), _logLevel);
504 KafkaClient::onLog(
int level,
const char* fac,
const char* buf)
const
506 doLog(level,
"LIBRDKAFKA", 0,
"%s | %s", fac, buf);
510 KafkaClient::logCallback(
const rd_kafka_t* rk,
int level,
const char* fac,
const char* buf)
512 kafkaClient(rk).onLog(level, fac, buf);
516 KafkaClient::onStats(
const std::string& jsonString)
518 if (_statsCb) _statsCb(jsonString);
522 KafkaClient::statsCallback(rd_kafka_t* rk,
char* jsonStrBuf,
size_t jsonStrLen,
void* )
524 const std::string stats(jsonStrBuf, jsonStrBuf+jsonStrLen);
525 kafkaClient(rk).onStats(stats);
530 KafkaClient::onError(
const Error& error)
532 if (_errorCb) _errorCb(error);
535 inline SaslOauthbearerToken
536 KafkaClient::onOauthbearerTokenRefresh(
const std::string& oauthbearerConfig)
538 if (!_oauthbearerTokenRefreshCb)
540 throw std::runtime_error(
"No OAUTHBEARER token refresh callback configured!");
543 return _oauthbearerTokenRefreshCb(oauthbearerConfig);
547 KafkaClient::errorCallback(rd_kafka_t* rk,
int err,
const char* reason,
void* )
549 auto respErr =
static_cast<rd_kafka_resp_err_t
>(err);
552 if (respErr != RD_KAFKA_RESP_ERR__FATAL)
554 error = Error{respErr, reason};
558 LogBuffer<LOG_BUFFER_SIZE> errInfo;
559 respErr = rd_kafka_fatal_error(rk, errInfo.str(), errInfo.capacity());
560 error = Error{respErr, errInfo.c_str(),
true};
563 kafkaClient(rk).onError(error);
567 KafkaClient::oauthbearerTokenRefreshCallback(rd_kafka_t* rk,
const char* oauthbearerConfig,
void* )
569 SaslOauthbearerToken oauthbearerToken;
573 oauthbearerToken = kafkaClient(rk).onOauthbearerTokenRefresh(oauthbearerConfig !=
nullptr ? oauthbearerConfig :
"");
575 catch (
const std::exception& e)
577 rd_kafka_oauthbearer_set_token_failure(rk, e.what());
581 LogBuffer<LOG_BUFFER_SIZE> errInfo;
583 std::vector<const char*> extensions;
584 extensions.reserve(oauthbearerToken.extensions.size() * 2);
585 for (
const auto& kv: oauthbearerToken.extensions)
587 extensions.push_back(kv.first.c_str());
588 extensions.push_back(kv.second.c_str());
591 if (rd_kafka_oauthbearer_set_token(rk,
592 oauthbearerToken.value.c_str(),
593 oauthbearerToken.mdLifetime.count(),
594 oauthbearerToken.mdPrincipalName.c_str(),
595 extensions.data(), extensions.size(),
596 errInfo.str(), errInfo.capacity()) != RD_KAFKA_RESP_ERR_NO_ERROR)
598 rd_kafka_oauthbearer_set_token_failure(rk, errInfo.c_str());
603 KafkaClient::interceptThreadStart(
const std::string& threadName,
const std::string& threadType)
605 if (
const auto& cb = _interceptors.onThreadStart()) cb(threadName, threadType);
609 KafkaClient::interceptThreadExit(
const std::string& threadName,
const std::string& threadType)
611 if (
const auto& cb = _interceptors.onThreadExit()) cb(threadName, threadType);
615 KafkaClient::interceptBrokerStateChange(
int id,
const std::string& secproto,
const std::string& host,
int port,
const std::string& state)
617 if (
const auto& cb = _interceptors.onBrokerStateChange()) cb(
id, secproto, host, port, state);
620 inline rd_kafka_resp_err_t
621 KafkaClient::configInterceptorOnNew(rd_kafka_t* rk,
const rd_kafka_conf_t* ,
void* opaque,
char* , std::size_t )
623 if (
auto result = rd_kafka_interceptor_add_on_thread_start(rk,
"on_thread_start", KafkaClient::interceptorOnThreadStart, opaque))
628 if (
auto result = rd_kafka_interceptor_add_on_thread_exit(rk,
"on_thread_exit", KafkaClient::interceptorOnThreadExit, opaque))
633 if (
auto result = rd_kafka_interceptor_add_on_broker_state_change(rk,
"on_broker_state_change", KafkaClient::interceptorOnBrokerStateChange, opaque))
638 return RD_KAFKA_RESP_ERR_NO_ERROR;
641 inline rd_kafka_resp_err_t
642 KafkaClient::interceptorOnThreadStart(rd_kafka_t* rk, rd_kafka_thread_type_t threadType,
const char* threadName,
void* )
644 kafkaClient(rk).interceptThreadStart(threadName, toString(threadType));
646 return RD_KAFKA_RESP_ERR_NO_ERROR;
649 inline rd_kafka_resp_err_t
650 KafkaClient::interceptorOnThreadExit(rd_kafka_t* rk, rd_kafka_thread_type_t threadType,
const char* threadName,
void* )
652 kafkaClient(rk).interceptThreadExit(threadName, toString(threadType));
654 return RD_KAFKA_RESP_ERR_NO_ERROR;
657 inline rd_kafka_resp_err_t
658 KafkaClient::interceptorOnBrokerStateChange(rd_kafka_t* rk,
int id,
const char* secproto,
const char* host,
int port,
const char* state,
void* )
660 kafkaClient(rk).interceptBrokerStateChange(
id, secproto, host, port, state);
662 return RD_KAFKA_RESP_ERR_NO_ERROR;
665 inline Optional<BrokerMetadata>
668 const rd_kafka_metadata_t* rk_metadata =
nullptr;
670 const rd_kafka_resp_err_t err = rd_kafka_metadata(getClientHandle(),
674 convertMsDurationToInt(timeout));
676 auto guard = rd_kafka_metadata_unique_ptr(rk_metadata);
678 if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
680 if (!disableErrorLogging)
682 KAFKA_API_DO_LOG(Log::Level::Err,
"failed to get BrokerMetadata! error[%s]", rd_kafka_err2str(err));
684 return Optional<BrokerMetadata>{};
687 const rd_kafka_metadata_topic* metadata_topic =
nullptr;
688 for (
int i = 0; i < rk_metadata->topic_cnt; ++i)
690 if (rk_metadata->topics[i].topic == topic)
692 metadata_topic = &rk_metadata->topics[i];
697 if (!metadata_topic || metadata_topic->err)
699 if (!disableErrorLogging)
703 KAFKA_API_DO_LOG(Log::Level::Err,
"failed to find BrokerMetadata for topic[%s]", topic.c_str());
707 KAFKA_API_DO_LOG(Log::Level::Err,
"failed to get BrokerMetadata for topic[%s]! error[%s]", topic.c_str(), rd_kafka_err2str(metadata_topic->err));
710 return Optional<BrokerMetadata>{};
715 metadata.setOrigNodeName(rk_metadata->orig_broker_name ? std::string(rk_metadata->orig_broker_name) :
"");
717 for (
int i = 0; i < rk_metadata->broker_cnt; ++i)
719 metadata.addNode(rk_metadata->brokers[i].id, rk_metadata->brokers[i].host, rk_metadata->brokers[i].port);
722 for (
int i = 0; i < metadata_topic->partition_cnt; ++i)
724 const rd_kafka_metadata_partition& metadata_partition = metadata_topic->partitions[i];
726 const Partition partition = metadata_partition.id;
728 if (metadata_partition.err != 0)
730 if (!disableErrorLogging)
732 KAFKA_API_DO_LOG(Log::Level::Err,
"got error[%s] while constructing BrokerMetadata for topic[%s]-partition[%d]", rd_kafka_err2str(metadata_partition.err), topic.c_str(), partition);
740 for (
int j = 0; j < metadata_partition.replica_cnt; ++j)
742 partitionInfo.addReplica(metadata_partition.replicas[j]);
745 for (
int j = 0; j < metadata_partition.isr_cnt; ++j)
747 partitionInfo.addInSyncReplica(metadata_partition.isrs[j]);
750 metadata.addPartitionInfo(partition, partitionInfo);
The properties for Kafka clients.
Definition: Properties.h:24
T & get(const std::string &key) const
Get a property reference.
Definition: Properties.h:155
bool contains(const std::string &key) const
Check whether the map contains a property.
Definition: Properties.h:144
const PropertiesMap & map() const
Get all properties with a map.
Definition: Properties.h:212
Optional< std::string > getProperty(const std::string &key) const
Get a property.
Definition: Properties.h:170
static constexpr const char * BOOTSTRAP_SERVERS
The string contains host:port pairs of brokers (splitted by ",") that the consumer will use to establ...
Definition: ClientConfig.h:62
static constexpr const char * ENABLE_MANUAL_EVENTS_POLL
To poll the events manually (otherwise, it would be done with a background polling thread).
Definition: ClientConfig.h:26
static constexpr const char * LOG_LEVEL
Log level (syslog(3) levels).
Definition: ClientConfig.h:72
static constexpr const char * CLIENT_ID
Client identifier.
Definition: ClientConfig.h:67
The base class for Kafka clients.
Definition: KafkaClient.h:34
Optional< BrokerMetadata > fetchBrokerMetadata(const std::string &topic, std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_METADATA_TIMEOUT_MS), bool disableErrorLogging=false)
Fetch matadata from a available broker.
Definition: KafkaClient.h:666
const std::string & clientId() const
Get the client id.
Definition: KafkaClient.h:41
void setLogLevel(int level)
Set log level for the kafka client (the default value: 5).
Definition: KafkaClient.h:497
const Properties & properties() const
Return the properties which took effect.
Definition: KafkaClient.h:56
Optional< std::string > getProperty(const std::string &name) const
Fetch the effected property (including the property internally set by librdkafka).
Definition: KafkaClient.h:470
const std::string & name() const
Get the client name (i.e.
Definition: KafkaClient.h:46
void pollEvents(std::chrono::milliseconds timeout)
Call the OffsetCommit callbacks (if any) Note: The Kafka client should be constructed with option ena...
Definition: KafkaClient.h:67