3 #include <kafka/Project.h>
5 #include <kafka/Types.h>
7 #include <librdkafka/rdkafka.h>
16 struct RkQueueDeleter {
void operator()(rd_kafka_queue_t* p) { rd_kafka_queue_destroy(p); } };
17 using rd_kafka_queue_unique_ptr = std::unique_ptr<rd_kafka_queue_t, RkQueueDeleter>;
19 struct RkEventDeleter {
void operator()(rd_kafka_event_t* p) { rd_kafka_event_destroy(p); } };
20 using rd_kafka_event_unique_ptr = std::unique_ptr<rd_kafka_event_t, RkEventDeleter>;
22 struct RkTopicDeleter {
void operator()(rd_kafka_topic_t* p) { rd_kafka_topic_destroy(p); } };
23 using rd_kafka_topic_unique_ptr = std::unique_ptr<rd_kafka_topic_t, RkTopicDeleter>;
25 struct RkTopicPartitionListDeleter {
void operator()(rd_kafka_topic_partition_list_t* p) { rd_kafka_topic_partition_list_destroy(p); } };
26 using rd_kafka_topic_partition_list_unique_ptr = std::unique_ptr<rd_kafka_topic_partition_list_t, RkTopicPartitionListDeleter>;
28 struct RkConfDeleter {
void operator()(rd_kafka_conf_t* p) { rd_kafka_conf_destroy(p); } };
29 using rd_kafka_conf_unique_ptr = std::unique_ptr<rd_kafka_conf_t, RkConfDeleter>;
31 struct RkMetadataDeleter {
void operator()(
const rd_kafka_metadata_t* p) { rd_kafka_metadata_destroy(p); } };
32 using rd_kafka_metadata_unique_ptr = std::unique_ptr<const rd_kafka_metadata_t, RkMetadataDeleter>;
34 struct RkDeleter {
void operator()(rd_kafka_t* p) { rd_kafka_destroy(p); } };
35 using rd_kafka_unique_ptr = std::unique_ptr<rd_kafka_t, RkDeleter>;
37 struct RkNewTopicDeleter {
void operator()(rd_kafka_NewTopic_t* p) { rd_kafka_NewTopic_destroy(p); } };
38 using rd_kafka_NewTopic_unique_ptr = std::unique_ptr<rd_kafka_NewTopic_t, RkNewTopicDeleter>;
40 struct RkDeleteTopicDeleter {
void operator()(rd_kafka_DeleteTopic_t* p) { rd_kafka_DeleteTopic_destroy(p); } };
41 using rd_kafka_DeleteTopic_unique_ptr = std::unique_ptr<rd_kafka_DeleteTopic_t, RkDeleteTopicDeleter>;
43 struct RkDeleteRecordsDeleter {
void operator()(rd_kafka_DeleteRecords_t* p) { rd_kafka_DeleteRecords_destroy(p); } };
44 using rd_kafka_DeleteRecords_unique_ptr = std::unique_ptr<rd_kafka_DeleteRecords_t, RkDeleteRecordsDeleter>;
46 struct RkConsumerGroupMetadataDeleter {
void operator()(rd_kafka_consumer_group_metadata_t* p) { rd_kafka_consumer_group_metadata_destroy(p) ; } };
47 using rd_kafka_consumer_group_metadata_unique_ptr = std::unique_ptr<rd_kafka_consumer_group_metadata_t, RkConsumerGroupMetadataDeleter>;
49 inline void RkErrorDeleter(rd_kafka_error_t* p) { rd_kafka_error_destroy(p); }
50 using rd_kafka_error_shared_ptr = std::shared_ptr<rd_kafka_error_t>;
// Translates a librdkafka thread type (`rd_kafka_thread_type_t`) into a std::string.
// The cases visible here cover the MAIN, BACKGROUND and BROKER thread types.
// NOTE(review): the enclosing switch, the strings returned for each case and any
// default branch are not visible in this listing -- confirm them against the full file.
53 inline std::string toString(rd_kafka_thread_type_t threadType)
57 case RD_KAFKA_THREAD_MAIN:
59 case RD_KAFKA_THREAD_BACKGROUND:
61 case RD_KAFKA_THREAD_BROKER:
// Converts a librdkafka topic-partition list into a `TopicPartitionOffsets` container,
// keyed by TopicPartition(topic, partition) with the element's offset as the value.
// NOTE(review): brace-only lines and the return statement are not visible in this
// listing; presumably `ret` is returned -- confirm against the full file.
70 inline TopicPartitionOffsets getTopicPartitionOffsets(
const rd_kafka_topic_partition_list_t* rk_tpos)
72 TopicPartitionOffsets ret;
// A null list is treated as having zero elements, so the loop body never dereferences it.
73 const int count = rk_tpos ? rk_tpos->cnt : 0;
74 for (
int i = 0; i < count; ++i)
// Copy the three fields of interest out of the i-th list element.
76 const Topic t = rk_tpos->elems[i].topic;
77 const Partition p = rk_tpos->elems[i].partition;
78 const Offset o = rk_tpos->elems[i].offset;
// operator[] assignment: a later element with the same (topic, partition) overwrites an earlier one.
80 ret[TopicPartition(t, p)] = o;
// Collects the topic name of every element of a librdkafka topic-partition list.
// NOTE(review): the declaration of `result` and the return statement are not visible in
// this listing; presumably `result` is a `Topics` that is returned -- confirm.
85 inline Topics getTopics(
const rd_kafka_topic_partition_list_t* rk_topics)
// The null check is part of the loop condition, so a null list yields zero iterations.
88 for (
int i = 0; i < (rk_topics ? rk_topics->cnt : 0); ++i)
90 result.insert(rk_topics->elems[i].topic);
// Collects the (topic, partition) pair of every element of a librdkafka topic-partition
// list into a `TopicPartitions` container; offsets are not read.
// NOTE(review): the return statement is not visible in this listing; presumably
// `result` is returned -- confirm.
95 inline TopicPartitions getTopicPartitions(
const rd_kafka_topic_partition_list_t* rk_tpos)
97 TopicPartitions result;
// The null check is part of the loop condition, so a null list yields zero iterations.
98 for (
int i = 0; i < (rk_tpos ? rk_tpos->cnt : 0); ++i)
100 result.insert(TopicPartition{rk_tpos->elems[i].topic, rk_tpos->elems[i].partition});
// Builds a librdkafka topic-partition list from a `TopicPartitionOffsets` container.
// The list is allocated with `rd_kafka_topic_partition_list_new`, sized to the number of
// entries, and one element is added per entry via `rd_kafka_topic_partition_list_add`.
// NOTE(review): the lines that use `o` and `rk_tp`, and the return statement, are not
// visible in this listing. Presumably the offset is stored into the added element and
// `rk_tpos` is returned as a raw pointer the caller must release (e.g. with
// `rd_kafka_topic_partition_list_destroy`) -- confirm against the full file.
106 inline rd_kafka_topic_partition_list_t* createRkTopicPartitionList(
const TopicPartitionOffsets& tpos)
// The librdkafka API takes an int size, hence the cast from size().
108 rd_kafka_topic_partition_list_t* rk_tpos = rd_kafka_topic_partition_list_new(
static_cast<int>(tpos.size()));
109 for (
const auto& tp_o: tpos)
// Each entry is a (TopicPartition, offset) pair; the TopicPartition is itself (topic, partition).
111 const auto& tp = tp_o.first;
112 const auto& o = tp_o.second;
113 rd_kafka_topic_partition_t* rk_tp = rd_kafka_topic_partition_list_add(rk_tpos, tp.first.c_str(), tp.second);
119 inline rd_kafka_topic_partition_list_t* createRkTopicPartitionList(
const TopicPartitions& tps)
121 TopicPartitionOffsets tpos;
122 for (
const auto& tp: tps)
124 tpos[TopicPartition(tp.first, tp.second)] = RD_KAFKA_OFFSET_INVALID;
126 return createRkTopicPartitionList(tpos);
129 inline rd_kafka_topic_partition_list_t* createRkTopicPartitionList(
const Topics& topics)
131 TopicPartitionOffsets tpos;
132 for (
const auto& topic: topics)
134 tpos[TopicPartition(topic, RD_KAFKA_PARTITION_UA)] = RD_KAFKA_OFFSET_INVALID;
136 return createRkTopicPartitionList(tpos);