3 #include <kafka/Project.h>
5 #include <kafka/AdminClientConfig.h>
6 #include <kafka/AdminCommon.h>
7 #include <kafka/Error.h>
8 #include <kafka/KafkaClient.h>
9 #include <kafka/RdKafkaHelper.h>
11 #include <librdkafka/rdkafka.h>
21 namespace KAFKA_API {
namespace clients {
namespace admin {
// Fragments of AdminClient member-function declarations; the lines carrying the
// function names and the remaining parameters are absent from this listing.
// NOTE(review): the signatures listed at the end of this file show createTopics,
// deleteTopics, deleteRecords and listTopics all defaulting `timeout` to
// std::chrono::milliseconds(DEFAULT_COMMAND_TIMEOUT_MS); which of the three
// timeout lines below belongs to which method cannot be determined from here.
39 int replicationFactor,
41 std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_COMMAND_TIMEOUT_MS));
46 std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_COMMAND_TIMEOUT_MS));
59 std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_COMMAND_TIMEOUT_MS));
// Static helpers that turn librdkafka result structures into Error objects.
//
// getPerTopicResults: takes an array of `topicCount` topic-result pointers and
// returns a list of Errors built from them.
62 static std::list<Error> getPerTopicResults(
const rd_kafka_topic_result_t** topicResults, std::size_t topicCount);
// getPerTopicPartitionResults: takes a topic-partition list and returns a list
// of Errors built from its elements.
63 static std::list<Error> getPerTopicPartitionResults(
const rd_kafka_topic_partition_list_t* partitionResults);
// combineErrors: reduces a list of Errors to a single Error.
64 static Error combineErrors(
const std::list<Error>& errors);
// Default timeout for admin commands, in milliseconds (30000). It is used as
// the default `timeout` argument of the member functions declared in this class.
// Declared as a `static constexpr int` when COMPILER_SUPPORTS_CPP_17 is set and
// as an unnamed enum constant in the alternative branch.
// NOTE(review): the `#else` / `#endif` lines are not present in this listing.
66 #if COMPILER_SUPPORTS_CPP_17
67 static constexpr
int DEFAULT_COMMAND_TIMEOUT_MS = 30000;
69 enum { DEFAULT_COMMAND_TIMEOUT_MS = 30000 };
// Converts the per-topic results of an admin operation into a list of Errors.
//
// Walks the `topicCount` entries of `topicResults`; for an entry whose
// rd_kafka_topic_result_error() is non-zero it appends an Error carrying that
// code and a message of the form "topic[<name>] with error[<error string>]".
//
// NOTE(review): the braces and the return statement are missing from this
// listing; presumably the function returns `errors` -- confirm against the
// full header.
74 inline std::list<Error>
75 AdminClient::getPerTopicResults(
const rd_kafka_topic_result_t** topicResults, std::size_t topicCount)
77 std::list<Error> errors;
79 for (std::size_t i = 0; i < topicCount; ++i)
81 const rd_kafka_topic_result_t* topicResult = topicResults[i];
// The error code is declared inside the if-condition, so the branch is taken
// only when the code is non-zero.
82 if (
const rd_kafka_resp_err_t topicError = rd_kafka_topic_result_error(topicResult))
84 const std::string detailedMsg =
"topic[" + std::string(rd_kafka_topic_result_name(topicResult)) +
"] with error[" + rd_kafka_topic_result_error_string(topicResult) +
"]";
85 errors.emplace_back(topicError, detailedMsg);
// Converts per-partition results of an admin operation into a list of Errors.
//
// For an element of `partitionResults` whose `err` field is non-zero it appends
// an Error carrying that code and a message of the form
// "topic-partition[<topic>-<partition>] with error[<rd_kafka_err2str text>]".
//
// NOTE(review): the braces and the return statement are missing from this
// listing; presumably the function returns `errors` -- confirm against the
// full header.
91 inline std::list<Error>
92 AdminClient::getPerTopicPartitionResults(
const rd_kafka_topic_partition_list_t* partitionResults)
94 std::list<Error> errors;
// The loop bound falls back to 0 when `partitionResults` is null, so a null
// list is never dereferenced.
96 for (
int i = 0; i < (partitionResults ? partitionResults->cnt : 0); ++i)
98 if (
const rd_kafka_resp_err_t partitionError = partitionResults->elems[i].err)
100 const std::string detailedMsg =
"topic-partition[" + std::string(partitionResults->elems[i].topic) +
"-" + std::to_string(partitionResults->elems[i].partition) +
"] with error[" + rd_kafka_err2str(partitionError) +
"]";
101 errors.emplace_back(partitionError, detailedMsg);
// Reduces a list of Errors to a single Error.
//
// One return path joins the messages of all entries with "; " and pairs the
// result with the code of the first entry; the other returns
// RD_KAFKA_RESP_ERR_NO_ERROR with the message "Success".
//
// NOTE(review): the return-type line, the braces and the condition selecting
// between the two returns are missing from this listing. errors.front() needs
// a non-empty list, so the first return is presumably guarded by a non-empty
// check -- confirm against the full header.
108 AdminClient::combineErrors(
const std::list<Error>& errors)
112 std::string detailedMsg;
// Append each message, putting "; " before every message except the first.
113 std::for_each(errors.cbegin(), errors.cend(),
114 [&detailedMsg](
const auto& error) {
115 if (!detailedMsg.empty()) detailedMsg +=
"; ";
117 detailedMsg += error.message();
// The combined Error takes its code from the first entry of the list.
120 return Error{
static_cast<rd_kafka_resp_err_t
>(errors.front().value()), detailedMsg};
123 return Error{RD_KAFKA_RESP_ERR_NO_ERROR,
"Success"};
// AdminClient::createTopics -- described in the reference list at the end of
// this file as "Create a batch of new topics."
//
// Visible flow: build one rd_kafka_NewTopic per requested topic, apply the
// entries of `topicConfig` to it, submit the batch with rd_kafka_CreateTopics,
// poll a queue for the result event until `timeout` has elapsed, then gather
// the request-level error and the per-topic errors into `errors`.
//
// NOTE(review): several lines are missing from this listing (function name,
// the `topics` / `numPartitions` / `topicConfig` parameters, braces, the `do`
// keywords, the definition of `v`, error returns and the final return), so
// the comments below cover only what is shown.
126 inline admin::CreateTopicsResult
129 int replicationFactor,
131 std::chrono::milliseconds timeout)
// Buffer handed to rd_kafka_NewTopic_new as its error-string argument.
133 LogBuffer<500> errInfo;
135 std::vector<rd_kafka_NewTopic_unique_ptr> rkNewTopics;
137 for (
const auto& topic: topics)
139 rkNewTopics.emplace_back(rd_kafka_NewTopic_new(topic.c_str(), numPartitions, replicationFactor, errInfo.str(), errInfo.capacity()));
// A null handle is checked for at run time; the handling itself is not shown.
140 if (!rkNewTopics.back())
// Apply each configuration entry to the topic that was just added.
145 for (
const auto& conf: topicConfig.
map())
147 const auto& k = conf.first;
// NOTE(review): `v` is defined on a line missing from this listing.
151 const rd_kafka_resp_err_t err = rd_kafka_NewTopic_set_config(rkNewTopics.back().get(), k.c_str(), v->c_str());
152 if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
154 const std::string errMsg =
"Invalid config[" + k +
"=" + *v +
"]";
155 KAFKA_API_DO_LOG(Log::Level::Err, errMsg.c_str());
// rd_kafka_CreateTopics is given an array of raw pointers, so collect them
// from the handles held in rkNewTopics.
161 std::vector<rd_kafka_NewTopic_t*> rk_topics;
162 rk_topics.reserve(rkNewTopics.size());
163 for (
const auto& topic : rkNewTopics) { rk_topics.emplace_back(topic.get()); }
// Queue passed to rd_kafka_CreateTopics and polled below for the result event.
165 auto rk_queue = rd_kafka_queue_unique_ptr(rd_kafka_queue_new(getClientHandle()));
167 rd_kafka_CreateTopics(getClientHandle(), rk_topics.data(), rk_topics.size(),
nullptr, rk_queue.get());
169 auto rk_ev = rd_kafka_event_unique_ptr();
// Poll in EVENT_POLLING_INTERVAL_MS slices until a CREATETOPICS_RESULT event
// arrives or the deadline `end` is reached.
171 const auto end = std::chrono::steady_clock::now() + timeout;
174 rk_ev.reset(rd_kafka_queue_poll(rk_queue.get(), EVENT_POLLING_INTERVAL_MS));
176 if (rd_kafka_event_type(rk_ev.get()) == RD_KAFKA_EVENT_CREATETOPICS_RESULT)
break;
// Anything that was not the awaited result is logged at Err level.
180 KAFKA_API_DO_LOG(Log::Level::Err,
"rd_kafka_queue_poll got event[%s], with error[%s]", rd_kafka_event_name(rk_ev.get()), rd_kafka_event_error_string(rk_ev.get()));
183 }
while (std::chrono::steady_clock::now() < end);
// Request-level error first (if any), then the per-topic errors.
190 std::list<Error> errors;
192 if (
const rd_kafka_resp_err_t respErr = rd_kafka_event_error(rk_ev.get()))
194 errors.emplace_back(respErr, rd_kafka_event_error_string(rk_ev.get()));
198 const rd_kafka_CreateTopics_result_t* res = rd_kafka_event_CreateTopics_result(rk_ev.get());
199 std::size_t res_topic_cnt{};
200 const rd_kafka_topic_result_t** res_topics = rd_kafka_CreateTopics_result_topics(res, &res_topic_cnt);
202 errors.splice(errors.end(), getPerTopicResults(res_topics, res_topic_cnt));
// NOTE(review): `listResult` is defined on lines missing from this listing.
// This second loop is bounded by the same deadline `end`; its purpose cannot
// be established from here -- TODO confirm against the full header.
214 if (!listResult.error)
218 }
while (std::chrono::steady_clock::now() < end);
// Body of AdminClient::deleteTopics -- described in the reference list at the
// end of this file as "Delete a batch of topics." (definition at
// AdminClient.h:224; the signature lines are missing from this listing).
//
// Visible flow: build one rd_kafka_DeleteTopic per topic name, submit the
// batch with rd_kafka_DeleteTopics, poll a queue for the result event until
// `timeout` has elapsed, then gather the request-level error and the
// per-topic errors into `errors`.
//
// NOTE(review): braces, the `do` keyword and the return statements are missing
// from this listing.
226 std::vector<rd_kafka_DeleteTopic_unique_ptr> rkDeleteTopics;
228 for (
const auto& topic: topics)
230 rkDeleteTopics.emplace_back(rd_kafka_DeleteTopic_new(topic.c_str()));
// NOTE(review): createTopics tests its new handle with a run-time `if`, while
// this handle is only checked by assert, which is compiled out when NDEBUG is
// defined -- consider whether a run-time check is wanted here as well.
231 assert(rkDeleteTopics.back());
// rd_kafka_DeleteTopics is given an array of raw pointers, so collect them
// from the handles held in rkDeleteTopics.
234 std::vector<rd_kafka_DeleteTopic_t*> rk_topics;
235 rk_topics.reserve(rkDeleteTopics.size());
236 for (
const auto& topic : rkDeleteTopics) { rk_topics.emplace_back(topic.get()); }
// Queue passed to rd_kafka_DeleteTopics and polled below for the result event.
238 auto rk_queue = rd_kafka_queue_unique_ptr(rd_kafka_queue_new(getClientHandle()));
240 rd_kafka_DeleteTopics(getClientHandle(), rk_topics.data(), rk_topics.size(),
nullptr, rk_queue.get());
242 auto rk_ev = rd_kafka_event_unique_ptr();
// Poll in EVENT_POLLING_INTERVAL_MS slices until a DELETETOPICS_RESULT event
// arrives or the deadline `end` is reached.
244 const auto end = std::chrono::steady_clock::now() + timeout;
247 rk_ev.reset(rd_kafka_queue_poll(rk_queue.get(), EVENT_POLLING_INTERVAL_MS));
249 if (rd_kafka_event_type(rk_ev.get()) == RD_KAFKA_EVENT_DELETETOPICS_RESULT)
break;
// Anything that was not the awaited result is logged at Err level.
253 KAFKA_API_DO_LOG(Log::Level::Err,
"rd_kafka_queue_poll got event[%s], with error[%s]", rd_kafka_event_name(rk_ev.get()), rd_kafka_event_error_string(rk_ev.get()));
256 }
while (std::chrono::steady_clock::now() < end);
// Request-level error first (if any), then the per-topic errors.
263 std::list<Error> errors;
265 if (
const rd_kafka_resp_err_t respErr = rd_kafka_event_error(rk_ev.get()))
267 errors.emplace_back(respErr, rd_kafka_event_error_string(rk_ev.get()));
271 const rd_kafka_DeleteTopics_result_t* res = rd_kafka_event_DeleteTopics_result(rk_ev.get());
272 std::size_t res_topic_cnt{};
273 const rd_kafka_topic_result_t** res_topics = rd_kafka_DeleteTopics_result_topics(res, &res_topic_cnt);
275 errors.splice(errors.end(), getPerTopicResults(res_topics, res_topic_cnt));
// Body of AdminClient::listTopics -- described in the reference list at the
// end of this file as "List the topics available in the cluster." (definition
// at AdminClient.h:281; the signature lines are missing from this listing).
//
// Visible flow: request metadata with rd_kafka_metadata, check the returned
// error code, then insert the name of every topic in the metadata into `names`.
//
// NOTE(review): the declaration of `names`, the error-handling branch, braces
// and the return statements are missing from this listing.
283 const rd_kafka_metadata_t* rk_metadata =
nullptr;
// The metadata request uses `timeout` converted to an int of milliseconds.
// NOTE(review): the literal `true` is presumably librdkafka's all_topics flag
// and the null pointer its only_rkt argument -- confirm against rdkafka.h.
284 const rd_kafka_resp_err_t err = rd_kafka_metadata(getClientHandle(),
true,
nullptr, &rk_metadata, convertMsDurationToInt(timeout));
// Wrap the returned pointer right away, before the error check.
// NOTE(review): presumably this releases the metadata on every exit path --
// confirm the deleter of rd_kafka_metadata_unique_ptr in RdKafkaHelper.h.
285 auto guard = rd_kafka_metadata_unique_ptr(rk_metadata);
287 if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
293 for (
int i = 0; i < rk_metadata->topic_cnt; ++i)
295 names.insert(rk_metadata->topics[i].topic);
// AdminClient::deleteRecords -- described in the reference list at the end of
// this file as "Delete records whose offset is smaller than the given offset
// of the corresponding partition." (definition at AdminClient.h:301).
//
// Visible flow: build a single rd_kafka_DeleteRecords request from
// `topicPartitionOffsets`, submit it with rd_kafka_DeleteRecords, poll a queue
// for the result event until `timeout` has elapsed, then gather the
// request-level error and the per-partition errors into `errors`.
//
// NOTE(review): the first signature line, braces, the `do` keyword and the
// return statements are missing from this listing.
302 std::chrono::milliseconds timeout)
// Queue passed to rd_kafka_DeleteRecords and polled below for the result event.
304 auto rk_queue = rd_kafka_queue_unique_ptr(rd_kafka_queue_new(getClientHandle()));
// One request object covers all partitions; it is placed in a one-element
// array because rd_kafka_DeleteRecords is given a pointer and a count.
306 const rd_kafka_DeleteRecords_unique_ptr rkDeleteRecords(rd_kafka_DeleteRecords_new(createRkTopicPartitionList(topicPartitionOffsets)));
307 std::array<rd_kafka_DeleteRecords_t*, 1> rk_del_records{rkDeleteRecords.get()};
309 rd_kafka_DeleteRecords(getClientHandle(), rk_del_records.data(), rk_del_records.size(),
nullptr, rk_queue.get());
311 auto rk_ev = rd_kafka_event_unique_ptr();
// Poll in EVENT_POLLING_INTERVAL_MS slices until a DELETERECORDS_RESULT event
// arrives or the deadline `end` is reached.
313 const auto end = std::chrono::steady_clock::now() + timeout;
316 rk_ev.reset(rd_kafka_queue_poll(rk_queue.get(), EVENT_POLLING_INTERVAL_MS));
318 if (rd_kafka_event_type(rk_ev.get()) == RD_KAFKA_EVENT_DELETERECORDS_RESULT)
break;
// Anything that was not the awaited result is logged at Err level.
322 KAFKA_API_DO_LOG(Log::Level::Err,
"rd_kafka_queue_poll got event[%s], with error[%s]", rd_kafka_event_name(rk_ev.get()), rd_kafka_event_error_string(rk_ev.get()));
325 }
while (std::chrono::steady_clock::now() < end);
// Request-level error first (if any), then the per-partition errors.
332 std::list<Error> errors;
334 if (
const rd_kafka_resp_err_t respErr = rd_kafka_event_error(rk_ev.get()))
336 errors.emplace_back(respErr, rd_kafka_event_error_string(rk_ev.get()));
339 const rd_kafka_DeleteRecords_result_t* res = rd_kafka_event_DeleteRecords_result(rk_ev.get());
340 const rd_kafka_topic_partition_list_t* res_offsets = rd_kafka_DeleteRecords_result_offsets(res);
342 errors.splice(errors.end(), getPerTopicPartitionResults(res_offsets));
Unified error type.
Definition: Error.h:32
The properties for Kafka clients.
Definition: Properties.h:24
const PropertiesMap & map() const
Get all properties with a map.
Definition: Properties.h:212
Optional< std::string > getProperty(const std::string &key) const
Get a property.
Definition: Properties.h:170
The base class for Kafka clients.
Definition: KafkaClient.h:34
const Properties & properties() const
Return the properties which took effect.
Definition: KafkaClient.h:56
The administrative client for Kafka, which supports managing and inspecting topics, etc.
Definition: AdminClient.h:27
admin::CreateTopicsResult createTopics(const Topics &topics, int numPartitions, int replicationFactor, const Properties &topicConfig=Properties(), std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_COMMAND_TIMEOUT_MS))
Create a batch of new topics.
Definition: AdminClient.h:127
admin::DeleteTopicsResult deleteTopics(const Topics &topics, std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_COMMAND_TIMEOUT_MS))
Delete a batch of topics.
Definition: AdminClient.h:224
admin::DeleteRecordsResult deleteRecords(const TopicPartitionOffsets &topicPartitionOffsets, std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_COMMAND_TIMEOUT_MS))
Delete records whose offset is smaller than the given offset of the corresponding partition.
Definition: AdminClient.h:301
admin::ListTopicsResult listTopics(std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_COMMAND_TIMEOUT_MS))
List the topics available in the cluster.
Definition: AdminClient.h:281
The result of AdminClient::createTopics().
Definition: AdminCommon.h:15
The result of AdminClient::deleteRecords().
Definition: AdminCommon.h:41
The result of AdminClient::deleteTopics().
Definition: AdminCommon.h:28
The result of AdminClient::listTopics().
Definition: AdminCommon.h:54