Modern C++ Kafka API
RdKafkaHelper.h
1 #pragma once
2 
3 #include <kafka/Project.h>
4 
5 #include <kafka/Types.h>
6 
7 #include <librdkafka/rdkafka.h>
8 
9 #include <cassert>
10 #include <memory>
11 
12 namespace KAFKA_API {
13 
14 // Define smart pointers for rd_kafka_xxx datatypes
15 
16 struct RkQueueDeleter { void operator()(rd_kafka_queue_t* p) { rd_kafka_queue_destroy(p); } };
17 using rd_kafka_queue_unique_ptr = std::unique_ptr<rd_kafka_queue_t, RkQueueDeleter>;
18 
19 struct RkEventDeleter { void operator()(rd_kafka_event_t* p) { rd_kafka_event_destroy(p); } };
20 using rd_kafka_event_unique_ptr = std::unique_ptr<rd_kafka_event_t, RkEventDeleter>;
21 
22 struct RkTopicDeleter { void operator()(rd_kafka_topic_t* p) { rd_kafka_topic_destroy(p); } };
23 using rd_kafka_topic_unique_ptr = std::unique_ptr<rd_kafka_topic_t, RkTopicDeleter>;
24 
25 struct RkTopicPartitionListDeleter { void operator()(rd_kafka_topic_partition_list_t* p) { rd_kafka_topic_partition_list_destroy(p); } };
26 using rd_kafka_topic_partition_list_unique_ptr = std::unique_ptr<rd_kafka_topic_partition_list_t, RkTopicPartitionListDeleter>;
27 
28 struct RkConfDeleter { void operator()(rd_kafka_conf_t* p) { rd_kafka_conf_destroy(p); } };
29 using rd_kafka_conf_unique_ptr = std::unique_ptr<rd_kafka_conf_t, RkConfDeleter>;
30 
31 struct RkMetadataDeleter { void operator()(const rd_kafka_metadata_t* p) { rd_kafka_metadata_destroy(p); } };
32 using rd_kafka_metadata_unique_ptr = std::unique_ptr<const rd_kafka_metadata_t, RkMetadataDeleter>;
33 
34 struct RkDeleter { void operator()(rd_kafka_t* p) { rd_kafka_destroy(p); } };
35 using rd_kafka_unique_ptr = std::unique_ptr<rd_kafka_t, RkDeleter>;
36 
37 struct RkNewTopicDeleter { void operator()(rd_kafka_NewTopic_t* p) { rd_kafka_NewTopic_destroy(p); } };
38 using rd_kafka_NewTopic_unique_ptr = std::unique_ptr<rd_kafka_NewTopic_t, RkNewTopicDeleter>;
39 
40 struct RkDeleteTopicDeleter { void operator()(rd_kafka_DeleteTopic_t* p) { rd_kafka_DeleteTopic_destroy(p); } };
41 using rd_kafka_DeleteTopic_unique_ptr = std::unique_ptr<rd_kafka_DeleteTopic_t, RkDeleteTopicDeleter>;
42 
43 struct RkDeleteRecordsDeleter { void operator()(rd_kafka_DeleteRecords_t* p) { rd_kafka_DeleteRecords_destroy(p); } };
44 using rd_kafka_DeleteRecords_unique_ptr = std::unique_ptr<rd_kafka_DeleteRecords_t, RkDeleteRecordsDeleter>;
45 
46 struct RkConsumerGroupMetadataDeleter { void operator()(rd_kafka_consumer_group_metadata_t* p) { rd_kafka_consumer_group_metadata_destroy(p) ; } };
47 using rd_kafka_consumer_group_metadata_unique_ptr = std::unique_ptr<rd_kafka_consumer_group_metadata_t, RkConsumerGroupMetadataDeleter>;
48 
49 inline void RkErrorDeleter(rd_kafka_error_t* p) { rd_kafka_error_destroy(p); }
50 using rd_kafka_error_shared_ptr = std::shared_ptr<rd_kafka_error_t>;
51 
52 
53 inline std::string toString(rd_kafka_thread_type_t threadType)
54 {
55  switch (threadType)
56  {
57  case RD_KAFKA_THREAD_MAIN:
58  return "main";
59  case RD_KAFKA_THREAD_BACKGROUND:
60  return "background";
61  case RD_KAFKA_THREAD_BROKER:
62  return "broker";
63  default:
64  assert(false);
65  return "NA";
66  }
67 }
68 
69 // Convert from rd_kafka_xxx datatypes
70 inline TopicPartitionOffsets getTopicPartitionOffsets(const rd_kafka_topic_partition_list_t* rk_tpos)
71 {
72  TopicPartitionOffsets ret;
73  const int count = rk_tpos ? rk_tpos->cnt : 0;
74  for (int i = 0; i < count; ++i)
75  {
76  const Topic t = rk_tpos->elems[i].topic;
77  const Partition p = rk_tpos->elems[i].partition;
78  const Offset o = rk_tpos->elems[i].offset;
79 
80  ret[TopicPartition(t, p)] = o;
81  }
82  return ret;
83 }
84 
85 inline Topics getTopics(const rd_kafka_topic_partition_list_t* rk_topics)
86 {
87  Topics result;
88  for (int i = 0; i < (rk_topics ? rk_topics->cnt : 0); ++i)
89  {
90  result.insert(rk_topics->elems[i].topic);
91  }
92  return result;
93 }
94 
95 inline TopicPartitions getTopicPartitions(const rd_kafka_topic_partition_list_t* rk_tpos)
96 {
97  TopicPartitions result;
98  for (int i = 0; i < (rk_tpos ? rk_tpos->cnt : 0); ++i)
99  {
100  result.insert(TopicPartition{rk_tpos->elems[i].topic, rk_tpos->elems[i].partition});
101  }
102  return result;
103 }
104 
105 // Convert to rd_kafka_xxx datatypes
106 inline rd_kafka_topic_partition_list_t* createRkTopicPartitionList(const TopicPartitionOffsets& tpos)
107 {
108  rd_kafka_topic_partition_list_t* rk_tpos = rd_kafka_topic_partition_list_new(static_cast<int>(tpos.size()));
109  for (const auto& tp_o: tpos)
110  {
111  const auto& tp = tp_o.first;
112  const auto& o = tp_o.second;
113  rd_kafka_topic_partition_t* rk_tp = rd_kafka_topic_partition_list_add(rk_tpos, tp.first.c_str(), tp.second);
114  rk_tp->offset = o;
115  }
116  return rk_tpos;
117 }
118 
119 inline rd_kafka_topic_partition_list_t* createRkTopicPartitionList(const TopicPartitions& tps)
120 {
121  TopicPartitionOffsets tpos;
122  for (const auto& tp: tps)
123  {
124  tpos[TopicPartition(tp.first, tp.second)] = RD_KAFKA_OFFSET_INVALID;
125  }
126  return createRkTopicPartitionList(tpos);
127 }
128 
129 inline rd_kafka_topic_partition_list_t* createRkTopicPartitionList(const Topics& topics)
130 {
131  TopicPartitionOffsets tpos;
132  for (const auto& topic: topics)
133  {
134  tpos[TopicPartition(topic, RD_KAFKA_PARTITION_UA)] = RD_KAFKA_OFFSET_INVALID;
135  }
136  return createRkTopicPartitionList(tpos);
137 }
138 
139 } // end of KAFKA_API
140