Modern C++ Kafka API
AdminClient.h
1 #pragma once
2 
3 #include <kafka/Project.h>
4 
5 #include <kafka/AdminClientConfig.h>
6 #include <kafka/AdminCommon.h>
7 #include <kafka/Error.h>
8 #include <kafka/KafkaClient.h>
9 #include <kafka/RdKafkaHelper.h>
10 
11 #include <librdkafka/rdkafka.h>
12 
13 #include <array>
14 #include <cassert>
15 #include <list>
16 #include <memory>
17 #include <thread>
18 #include <vector>
19 
20 
21 namespace KAFKA_API { namespace clients { namespace admin {
22 
26 class AdminClient: public KafkaClient
27 {
28 public:
29  explicit AdminClient(const Properties& properties)
30  : KafkaClient(ClientType::AdminClient, KafkaClient::validateAndReformProperties(properties))
31  {
32  }
33 
37  admin::CreateTopicsResult createTopics(const Topics& topics,
38  int numPartitions,
39  int replicationFactor,
40  const Properties& topicConfig = Properties(),
41  std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_COMMAND_TIMEOUT_MS));
45  admin::DeleteTopicsResult deleteTopics(const Topics& topics,
46  std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_COMMAND_TIMEOUT_MS));
50  admin::ListTopicsResult listTopics(std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_COMMAND_TIMEOUT_MS));
51 
58  admin::DeleteRecordsResult deleteRecords(const TopicPartitionOffsets& topicPartitionOffsets,
59  std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_COMMAND_TIMEOUT_MS));
60 
61 private:
62  static std::list<Error> getPerTopicResults(const rd_kafka_topic_result_t** topicResults, std::size_t topicCount);
63  static std::list<Error> getPerTopicPartitionResults(const rd_kafka_topic_partition_list_t* partitionResults);
64  static Error combineErrors(const std::list<Error>& errors);
65 
66 #if COMPILER_SUPPORTS_CPP_17
67  static constexpr int DEFAULT_COMMAND_TIMEOUT_MS = 30000;
68 #else
69  enum { DEFAULT_COMMAND_TIMEOUT_MS = 30000 };
70 #endif
71 };
72 
73 
74 inline std::list<Error>
75 AdminClient::getPerTopicResults(const rd_kafka_topic_result_t** topicResults, std::size_t topicCount)
76 {
77  std::list<Error> errors;
78 
79  for (std::size_t i = 0; i < topicCount; ++i)
80  {
81  const rd_kafka_topic_result_t* topicResult = topicResults[i];
82  if (const rd_kafka_resp_err_t topicError = rd_kafka_topic_result_error(topicResult))
83  {
84  const std::string detailedMsg = "topic[" + std::string(rd_kafka_topic_result_name(topicResult)) + "] with error[" + rd_kafka_topic_result_error_string(topicResult) + "]";
85  errors.emplace_back(topicError, detailedMsg);
86  }
87  }
88  return errors;
89 }
90 
91 inline std::list<Error>
92 AdminClient::getPerTopicPartitionResults(const rd_kafka_topic_partition_list_t* partitionResults)
93 {
94  std::list<Error> errors;
95 
96  for (int i = 0; i < (partitionResults ? partitionResults->cnt : 0); ++i)
97  {
98  if (const rd_kafka_resp_err_t partitionError = partitionResults->elems[i].err)
99  {
100  const std::string detailedMsg = "topic-partition[" + std::string(partitionResults->elems[i].topic) + "-" + std::to_string(partitionResults->elems[i].partition) + "] with error[" + rd_kafka_err2str(partitionError) + "]";
101  errors.emplace_back(partitionError, detailedMsg);
102  }
103  }
104  return errors;
105 }
106 
107 inline Error
108 AdminClient::combineErrors(const std::list<Error>& errors)
109 {
110  if (!errors.empty())
111  {
112  std::string detailedMsg;
113  std::for_each(errors.cbegin(), errors.cend(),
114  [&detailedMsg](const auto& error) {
115  if (!detailedMsg.empty()) detailedMsg += "; ";
116 
117  detailedMsg += error.message();
118  });
119 
120  return Error{static_cast<rd_kafka_resp_err_t>(errors.front().value()), detailedMsg};
121  }
122 
123  return Error{RD_KAFKA_RESP_ERR_NO_ERROR, "Success"};
124 }
125 
126 inline admin::CreateTopicsResult
127 AdminClient::createTopics(const Topics& topics,
128  int numPartitions,
129  int replicationFactor,
130  const Properties& topicConfig,
131  std::chrono::milliseconds timeout)
132 {
133  LogBuffer<500> errInfo;
134 
135  std::vector<rd_kafka_NewTopic_unique_ptr> rkNewTopics;
136 
137  for (const auto& topic: topics)
138  {
139  rkNewTopics.emplace_back(rd_kafka_NewTopic_new(topic.c_str(), numPartitions, replicationFactor, errInfo.str(), errInfo.capacity()));
140  if (!rkNewTopics.back())
141  {
142  return admin::CreateTopicsResult(Error{RD_KAFKA_RESP_ERR__INVALID_ARG, rd_kafka_err2str(RD_KAFKA_RESP_ERR__INVALID_ARG)});
143  }
144 
145  for (const auto& conf: topicConfig.map())
146  {
147  const auto& k = conf.first;
148  const auto& v = topicConfig.getProperty(k);
149  if (!v) continue;
150 
151  const rd_kafka_resp_err_t err = rd_kafka_NewTopic_set_config(rkNewTopics.back().get(), k.c_str(), v->c_str());
152  if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
153  {
154  const std::string errMsg = "Invalid config[" + k + "=" + *v + "]";
155  KAFKA_API_DO_LOG(Log::Level::Err, errMsg.c_str());
156  return admin::CreateTopicsResult(Error{RD_KAFKA_RESP_ERR__INVALID_ARG, errMsg});
157  }
158  }
159  }
160 
161  std::vector<rd_kafka_NewTopic_t*> rk_topics;
162  rk_topics.reserve(rkNewTopics.size());
163  for (const auto& topic : rkNewTopics) { rk_topics.emplace_back(topic.get()); }
164 
165  auto rk_queue = rd_kafka_queue_unique_ptr(rd_kafka_queue_new(getClientHandle()));
166 
167  rd_kafka_CreateTopics(getClientHandle(), rk_topics.data(), rk_topics.size(), nullptr, rk_queue.get());
168 
169  auto rk_ev = rd_kafka_event_unique_ptr();
170 
171  const auto end = std::chrono::steady_clock::now() + timeout;
172  do
173  {
174  rk_ev.reset(rd_kafka_queue_poll(rk_queue.get(), EVENT_POLLING_INTERVAL_MS));
175 
176  if (rd_kafka_event_type(rk_ev.get()) == RD_KAFKA_EVENT_CREATETOPICS_RESULT) break;
177 
178  if (rk_ev)
179  {
180  KAFKA_API_DO_LOG(Log::Level::Err, "rd_kafka_queue_poll got event[%s], with error[%s]", rd_kafka_event_name(rk_ev.get()), rd_kafka_event_error_string(rk_ev.get()));
181  rk_ev.reset();
182  }
183  } while (std::chrono::steady_clock::now() < end);
184 
185  if (!rk_ev)
186  {
187  return admin::CreateTopicsResult(Error{RD_KAFKA_RESP_ERR__TIMED_OUT, "No response within the time limit"});
188  }
189 
190  std::list<Error> errors;
191 
192  if (const rd_kafka_resp_err_t respErr = rd_kafka_event_error(rk_ev.get()))
193  {
194  errors.emplace_back(respErr, rd_kafka_event_error_string(rk_ev.get()));
195  }
196 
197  // Fetch per-topic results
198  const rd_kafka_CreateTopics_result_t* res = rd_kafka_event_CreateTopics_result(rk_ev.get());
199  std::size_t res_topic_cnt{};
200  const rd_kafka_topic_result_t** res_topics = rd_kafka_CreateTopics_result_topics(res, &res_topic_cnt);
201 
202  errors.splice(errors.end(), getPerTopicResults(res_topics, res_topic_cnt));
203 
204  // Return the error if any
205  if (!errors.empty())
206  {
207  return admin::CreateTopicsResult{combineErrors(errors)};
208  }
209 
210  // Update metedata
211  do
212  {
213  auto listResult = listTopics();
214  if (!listResult.error)
215  {
216  return admin::CreateTopicsResult(Error{RD_KAFKA_RESP_ERR_NO_ERROR, "Success"});
217  }
218  } while (std::chrono::steady_clock::now() < end);
219 
220  return admin::CreateTopicsResult(Error{RD_KAFKA_RESP_ERR__TIMED_OUT, "Updating metadata timed out"});
221 }
222 
224 AdminClient::deleteTopics(const Topics& topics, std::chrono::milliseconds timeout)
225 {
226  std::vector<rd_kafka_DeleteTopic_unique_ptr> rkDeleteTopics;
227 
228  for (const auto& topic: topics)
229  {
230  rkDeleteTopics.emplace_back(rd_kafka_DeleteTopic_new(topic.c_str()));
231  assert(rkDeleteTopics.back());
232  }
233 
234  std::vector<rd_kafka_DeleteTopic_t*> rk_topics;
235  rk_topics.reserve(rkDeleteTopics.size());
236  for (const auto& topic : rkDeleteTopics) { rk_topics.emplace_back(topic.get()); }
237 
238  auto rk_queue = rd_kafka_queue_unique_ptr(rd_kafka_queue_new(getClientHandle()));
239 
240  rd_kafka_DeleteTopics(getClientHandle(), rk_topics.data(), rk_topics.size(), nullptr, rk_queue.get());
241 
242  auto rk_ev = rd_kafka_event_unique_ptr();
243 
244  const auto end = std::chrono::steady_clock::now() + timeout;
245  do
246  {
247  rk_ev.reset(rd_kafka_queue_poll(rk_queue.get(), EVENT_POLLING_INTERVAL_MS));
248 
249  if (rd_kafka_event_type(rk_ev.get()) == RD_KAFKA_EVENT_DELETETOPICS_RESULT) break;
250 
251  if (rk_ev)
252  {
253  KAFKA_API_DO_LOG(Log::Level::Err, "rd_kafka_queue_poll got event[%s], with error[%s]", rd_kafka_event_name(rk_ev.get()), rd_kafka_event_error_string(rk_ev.get()));
254  rk_ev.reset();
255  }
256  } while (std::chrono::steady_clock::now() < end);
257 
258  if (!rk_ev)
259  {
260  return admin::DeleteTopicsResult(Error{RD_KAFKA_RESP_ERR__TIMED_OUT, "No response within the time limit"});
261  }
262 
263  std::list<Error> errors;
264 
265  if (const rd_kafka_resp_err_t respErr = rd_kafka_event_error(rk_ev.get()))
266  {
267  errors.emplace_back(respErr, rd_kafka_event_error_string(rk_ev.get()));
268  }
269 
270  // Fetch per-topic results
271  const rd_kafka_DeleteTopics_result_t* res = rd_kafka_event_DeleteTopics_result(rk_ev.get());
272  std::size_t res_topic_cnt{};
273  const rd_kafka_topic_result_t** res_topics = rd_kafka_DeleteTopics_result_topics(res, &res_topic_cnt);
274 
275  errors.splice(errors.end(), getPerTopicResults(res_topics, res_topic_cnt));
276 
277  return admin::DeleteTopicsResult(combineErrors(errors));
278 }
279 
281 AdminClient::listTopics(std::chrono::milliseconds timeout)
282 {
283  const rd_kafka_metadata_t* rk_metadata = nullptr;
284  const rd_kafka_resp_err_t err = rd_kafka_metadata(getClientHandle(), true, nullptr, &rk_metadata, convertMsDurationToInt(timeout));
285  auto guard = rd_kafka_metadata_unique_ptr(rk_metadata);
286 
287  if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
288  {
289  return admin::ListTopicsResult(Error{err, rd_kafka_err2str(err)});
290  }
291 
292  Topics names;
293  for (int i = 0; i < rk_metadata->topic_cnt; ++i)
294  {
295  names.insert(rk_metadata->topics[i].topic);
296  }
297  return admin::ListTopicsResult(names);
298 }
299 
301 AdminClient::deleteRecords(const TopicPartitionOffsets& topicPartitionOffsets,
302  std::chrono::milliseconds timeout)
303 {
304  auto rk_queue = rd_kafka_queue_unique_ptr(rd_kafka_queue_new(getClientHandle()));
305 
306  const rd_kafka_DeleteRecords_unique_ptr rkDeleteRecords(rd_kafka_DeleteRecords_new(createRkTopicPartitionList(topicPartitionOffsets)));
307  std::array<rd_kafka_DeleteRecords_t*, 1> rk_del_records{rkDeleteRecords.get()};
308 
309  rd_kafka_DeleteRecords(getClientHandle(), rk_del_records.data(), rk_del_records.size(), nullptr, rk_queue.get());
310 
311  auto rk_ev = rd_kafka_event_unique_ptr();
312 
313  const auto end = std::chrono::steady_clock::now() + timeout;
314  do
315  {
316  rk_ev.reset(rd_kafka_queue_poll(rk_queue.get(), EVENT_POLLING_INTERVAL_MS));
317 
318  if (rd_kafka_event_type(rk_ev.get()) == RD_KAFKA_EVENT_DELETERECORDS_RESULT) break;
319 
320  if (rk_ev)
321  {
322  KAFKA_API_DO_LOG(Log::Level::Err, "rd_kafka_queue_poll got event[%s], with error[%s]", rd_kafka_event_name(rk_ev.get()), rd_kafka_event_error_string(rk_ev.get()));
323  rk_ev.reset();
324  }
325  } while (std::chrono::steady_clock::now() < end);
326 
327  if (!rk_ev)
328  {
329  return admin::DeleteRecordsResult(Error{RD_KAFKA_RESP_ERR__TIMED_OUT, "No response within the time limit"});
330  }
331 
332  std::list<Error> errors;
333 
334  if (const rd_kafka_resp_err_t respErr = rd_kafka_event_error(rk_ev.get()))
335  {
336  errors.emplace_back(respErr, rd_kafka_event_error_string(rk_ev.get()));
337  }
338 
339  const rd_kafka_DeleteRecords_result_t* res = rd_kafka_event_DeleteRecords_result(rk_ev.get());
340  const rd_kafka_topic_partition_list_t* res_offsets = rd_kafka_DeleteRecords_result_offsets(res);
341 
342  errors.splice(errors.end(), getPerTopicPartitionResults(res_offsets));
343 
344  return admin::DeleteRecordsResult(combineErrors(errors));
345 }
346 
347 } } } // end of KAFKA_API::clients::admin
348 
Unified error type.
Definition: Error.h:32
The properties for Kafka clients.
Definition: Properties.h:24
const PropertiesMap & map() const
Get all properties with a map.
Definition: Properties.h:212
Optional< std::string > getProperty(const std::string &key) const
Get a property.
Definition: Properties.h:170
The base class for Kafka clients.
Definition: KafkaClient.h:34
const Properties & properties() const
Return the properties which took effect.
Definition: KafkaClient.h:56
The administrative client for Kafka, which supports managing and inspecting topics,...
Definition: AdminClient.h:27
admin::CreateTopicsResult createTopics(const Topics &topics, int numPartitions, int replicationFactor, const Properties &topicConfig=Properties(), std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_COMMAND_TIMEOUT_MS))
Create a batch of new topics.
Definition: AdminClient.h:127
admin::DeleteTopicsResult deleteTopics(const Topics &topics, std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_COMMAND_TIMEOUT_MS))
Delete a batch of topics.
Definition: AdminClient.h:224
admin::DeleteRecordsResult deleteRecords(const TopicPartitionOffsets &topicPartitionOffsets, std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_COMMAND_TIMEOUT_MS))
Delete records whose offset is smaller than the given offset of the corresponding partition.
Definition: AdminClient.h:301
admin::ListTopicsResult listTopics(std::chrono::milliseconds timeout=std::chrono::milliseconds(DEFAULT_COMMAND_TIMEOUT_MS))
List the topics available in the cluster.
Definition: AdminClient.h:281
The result of AdminClient::createTopics().
Definition: AdminCommon.h:15
The result of AdminClient::deleteRecords().
Definition: AdminCommon.h:41
The result of AdminClient::deleteTopics().
Definition: AdminCommon.h:28
The result of AdminClient::listTopics().
Definition: AdminCommon.h:54