Modern C++ Kafka API
Types.h
1 #pragma once
2 
3 #include <kafka/Project.h>
4 
5 #include <algorithm>
6 #include <cctype>
7 #include <chrono>
8 #include <cstdint>
9 #include <iomanip>
10 #include <map>
11 #include <memory>
12 #include <set>
13 #include <sstream>
14 #include <string>
15 #include <vector>
16 
17 
18 // `Optional<T>` names `std::optional<T>` when COMPILER_SUPPORTS_CPP_17 is set;
// otherwise it names `boost::optional<T>`, because `std::optional` is not available before C++17.
// NOTE(review): COMPILER_SUPPORTS_CPP_17 is not defined in this file -- presumably it comes from <kafka/Project.h>; confirm.
19 #if COMPILER_SUPPORTS_CPP_17
20 #include <optional>
21 template<class T>
22 using Optional = std::optional<T>;
23 #else
24 #include <boost/optional.hpp>
25 #include <boost/optional/optional_io.hpp>
26 template<class T>
27 using Optional = boost::optional<T>;
28 #endif
29 
30 
31 namespace KAFKA_API {
32 
/**
 * Read-only view of a byte range, stored as a pointer plus a length.
 *
 * Similar to `boost::const_buffer`; defined here to avoid a dependency on Boost.
 * The class only stores the pointer it is given: it never copies or releases the
 * bytes, so the caller must keep them alive for as long as the view is used.
 */
class ConstBuffer
{
public:
    /**
     * @param data  Start of the byte range (may be null).
     * @param size  Number of bytes in the range.
     */
    explicit ConstBuffer(const void* data = nullptr, std::size_t size = 0): _data(data), _size(size) {}

    /// Pointer to the first byte, exactly as passed to the constructor.
    const void* data() const { return _data; }

    /// Number of bytes in the range, exactly as passed to the constructor.
    std::size_t size() const { return _size; }

    /**
     * Render the bytes as human-readable text.
     *
     * @return "[null]" when the data pointer is null, "[empty]" when the pointer is
     *         non-null but the size is 0; otherwise every printable byte as-is and
     *         every other byte as "[0xNN]" (two lower-case hex digits).
     */
    std::string toString() const
    {
        // Test the pointer first: a null pointer paired with a non-zero size must not be dereferenced.
        if (_data == nullptr) return "[null]";
        if (_size == 0)       return "[empty]";

        std::ostringstream oss;
        // Both manipulators are sticky and only affect the integer output below.
        oss << std::hex << std::setfill('0');

        const auto* bytes = static_cast<const unsigned char*>(_data);
        for (std::size_t i = 0; i < _size; ++i) {
            // Keep the byte as `unsigned char`: `std::isprint` is undefined for negative values.
            const unsigned char c = bytes[i];
            if (std::isprint(c)) {
                oss << c;
            } else {
                // The field width resets after each insertion, so it is set per byte.
                oss << "[0x" << std::setw(2) << static_cast<int>(c) << "]";
            }
        }

        return oss.str();
    }

private:
    const void* _data;
    std::size_t _size;
};
62 
63 
/**
 * Timeout constant holding the largest value `std::chrono::milliseconds` can represent.
 * `max` is wrapped in parentheses before the call -- presumably so that a function-like
 * `max` macro (if one is defined by another header) cannot expand here; confirm.
 */
67 #if COMPILER_SUPPORTS_CPP_17
// C++17 branch: defined as an `inline` variable in the header.
68 const inline std::chrono::milliseconds InfiniteTimeout = (std::chrono::milliseconds::max)();
69 #else
// Fallback branch: `static`, i.e. internal linkage, so every translation unit gets its own copy.
70 const static std::chrono::milliseconds InfiniteTimeout = (std::chrono::milliseconds::max)();
71 #endif
72 
73 
/** Topic, represented as a `std::string`. */
77 using Topic = std::string;
78 
/** Partition, represented as a signed 32-bit integer. */
82 using Partition = std::int32_t;
83 
/** Offset, represented as a signed 64-bit integer. */
87 using Offset = std::int64_t;
88 
/** Key, expressed as a `ConstBuffer` (a data pointer plus a size). */
92 using Key = ConstBuffer;
/** Type used for the size of a key. */
93 using KeySize = std::size_t;
94 
/** A default-constructed `Key`, i.e. a null data pointer with size 0. */
98 #if COMPILER_SUPPORTS_CPP_17
99 const inline Key NullKey = Key{};
100 #else
// Fallback branch: `static`, so every translation unit gets its own copy.
101 const static Key NullKey = Key{};
102 #endif
103 
/** Value, expressed as a `ConstBuffer` (a data pointer plus a size). */
107 using Value = ConstBuffer;
/** Type used for the size of a value. */
108 using ValueSize = std::size_t;
109 
/** A default-constructed `Value`, i.e. a null data pointer with size 0. */
113 #if COMPILER_SUPPORTS_CPP_17
114 const inline Value NullValue = Value{};
115 #else
// Fallback branch: `static`, so every translation unit gets its own copy.
116 const static Value NullValue = Value{};
117 #endif
118 
/** Ordered set of unique topics. */
122 using Topics = std::set<Topic>;
123 
/** A (topic, partition) pair: `first` is the topic, `second` is the partition. */
// NOTE(review): `std::pair` is used without a direct `#include <utility>` among the includes above -- presumably pulled in transitively.
127 using TopicPartition = std::pair<Topic, Partition>;
128 
/** Ordered set of unique (topic, partition) pairs. */
132 using TopicPartitions = std::set<TopicPartition>;
133 
/** A (topic, partition, offset) tuple, in that element order. */
// NOTE(review): `std::tuple` is used without a direct `#include <tuple>` among the includes above -- presumably pulled in transitively; consider including it explicitly.
137 using TopicPartitionOffset = std::tuple<Topic, Partition, Offset>;
138 
/** Ordered map from a (topic, partition) pair to an offset. */
142 using TopicPartitionOffsets = std::map<TopicPartition, Offset>;
143 
144 
148 inline std::string toString(const Topics& topics)
149 {
150  std::string ret;
151  std::for_each(topics.cbegin(), topics.cend(),
152  [&ret](const auto& topic) {
153  ret.append(ret.empty() ? "" : ",").append(topic);
154  });
155  return ret;
156 }
157 
161 inline std::string toString(const TopicPartition& tp)
162 {
163  return tp.first + std::string("-") + std::to_string(tp.second);
164 }
165 
169 inline std::string toString(const TopicPartitions& tps)
170 {
171  std::string ret;
172  std::for_each(tps.cbegin(), tps.cend(),
173  [&ret](const auto& tp) {
174  ret.append((ret.empty() ? "" : ",") + tp.first + "-" + std::to_string(tp.second));
175  });
176  return ret;
177 }
178 
182 inline std::string toString(const TopicPartitionOffset& tpo)
183 {
184  return std::get<0>(tpo) + "-" + std::to_string(std::get<1>(tpo)) + ":" + std::to_string(std::get<2>(tpo));
185 }
186 
190 inline std::string toString(const TopicPartitionOffsets& tpos)
191 {
192  std::string ret;
193  std::for_each(tpos.cbegin(), tpos.cend(),
194  [&ret](const auto& tp_o) {
195  const TopicPartition& tp = tp_o.first;
196  const Offset& o = tp_o.second;
197  ret.append((ret.empty() ? "" : ",") + tp.first + "-" + std::to_string(tp.second) + ":" + std::to_string(o));
198  });
199  return ret;
200 }
201 
202 } // end of KAFKA_API
203