Modern C++ Kafka API
BrokerMetadata.h
1 #pragma once
2 
3 #include <kafka/Project.h>
4 
5 #include <kafka/KafkaException.h>
6 #include <kafka/Types.h>
7 
8 #include <librdkafka/rdkafka.h>
9 
10 #include <map>
11 #include <vector>
12 
13 
14 namespace KAFKA_API {
15 
23  struct Node
24  {
25  public:
26  using Id = int;
27  using Host = std::string;
28  using Port = int;
29 
30  Node(Id i, Host h, Port p): id(i), host(std::move(h)), port(p) {}
31 
35  Node::Id id;
36 
40  Node::Host host;
41 
45  Node::Port port;
46 
50  std::string toString() const { return host + ":" + std::to_string(port) + "/" + std::to_string(id); }
51  };
52 
57  {
58  explicit PartitionInfo(Node::Id leaderId): leader(leaderId) {}
59 
60  void addReplica(Node::Id id) { replicas.emplace_back(id); }
61  void addInSyncReplica(Node::Id id) { inSyncReplicas.emplace_back(id); }
62 
66  Node::Id leader;
67 
71  std::vector<Node::Id> replicas;
72 
76  std::vector<Node::Id> inSyncReplicas;
77 
78  };
79 
83  std::string getNodeDescription(Node::Id id) const;
84 
88  std::string toString(const PartitionInfo& partitionInfo) const;
89 
93  explicit BrokerMetadata(Topic topic): _topic(std::move(topic)) {}
94 
98  const std::string& topic() const { return _topic; }
99 
103  std::vector<std::shared_ptr<Node>> nodes() const;
104 
108  const std::map<Partition, PartitionInfo>& partitions() const { return _partitions; }
109 
113  std::string toString() const;
114 
115  void setOrigNodeName(const std::string& origNodeName) { _origNodeName = origNodeName; }
116  void addNode(Node::Id nodeId, const Node::Host& host, Node::Port port) { _nodes[nodeId] = std::make_shared<Node>(nodeId, host, port); }
117  void addPartitionInfo(Partition partition, const PartitionInfo& partitionInfo) { _partitions.emplace(partition, partitionInfo); }
118 
119 private:
120  Topic _topic;
121  std::string _origNodeName;
122  std::map<Node::Id, std::shared_ptr<Node>> _nodes;
123  std::map<Partition, PartitionInfo> _partitions;
124 };
125 
126 inline std::vector<std::shared_ptr<BrokerMetadata::Node>>
128 {
129  std::vector<std::shared_ptr<BrokerMetadata::Node>> ret;
130  ret.reserve(_nodes.size());
131  for (const auto& nodeInfo: _nodes)
132  {
133  ret.emplace_back(nodeInfo.second);
134  }
135  return ret;
136 }
137 
138 inline std::string
140 {
141  const auto& found = _nodes.find(id);
142  if (found == _nodes.cend()) return "-:-/" + std::to_string(id);
143 
144  auto node = found->second;
145  return node->host + ":" + std::to_string(node->port) + "/" + std::to_string(id);
146 }
147 
148 inline std::string
149 BrokerMetadata::toString(const PartitionInfo& partitionInfo) const
150 {
151  std::ostringstream oss;
152 
153  auto streamNodes = [this](std::ostringstream& ss, const std::vector<Node::Id>& nodeIds) -> std::ostringstream& {
154  bool isTheFirst = true;
155  for (const auto id: nodeIds)
156  {
157  ss << (isTheFirst ? (isTheFirst = false, "") : ", ") << getNodeDescription(id);
158  }
159  return ss;
160  };
161 
162  oss << "leader[" << getNodeDescription(partitionInfo.leader) << "], replicas[";
163  streamNodes(oss, partitionInfo.replicas) << "], inSyncReplicas[";
164  streamNodes(oss, partitionInfo.inSyncReplicas) << "]";
165 
166  return oss.str();
167 }
168 
169 inline std::string
171 {
172  std::ostringstream oss;
173 
174  oss << "originatingNode[" << _origNodeName << "], topic[" << _topic << "], partitions{";
175  bool isTheFirst = true;
176  for (const auto& partitionInfoPair: _partitions)
177  {
178  const Partition partition = partitionInfoPair.first;
179  const PartitionInfo& partitionInfo = partitionInfoPair.second;
180  oss << (isTheFirst ? (isTheFirst = false, "") : "; ") << partition << ": " << toString(partitionInfo);
181  }
182  oss << "}";
183 
184  return oss.str();
185 }
186 
187 } // end of KAFKA_API
188 
Information for a Kafka node.
Definition: BrokerMetadata.h:24
Node::Id id
The node id.
Definition: BrokerMetadata.h:35
Node::Host host
The host name.
Definition: BrokerMetadata.h:40
Node::Port port
The port.
Definition: BrokerMetadata.h:45
std::string toString() const
Obtains explanatory string.
Definition: BrokerMetadata.h:50
It is used to describe per-partition state in the MetadataResponse.
Definition: BrokerMetadata.h:57
std::vector< Node::Id > replicas
The complete set of replica ids for this partition, regardless of whether they are alive or up-to-date.
Definition: BrokerMetadata.h:71
std::vector< Node::Id > inSyncReplicas
The subset of the replica ids that are in sync, that is, caught-up to the leader and ready to take over as leader if the leader should fail.
Definition: BrokerMetadata.h:76
Node::Id leader
The node id currently acting as a leader for this partition or null if there is no leader.
Definition: BrokerMetadata.h:66
The metadata info for a topic.
Definition: BrokerMetadata.h:19
std::string toString() const
Obtains explanatory string.
Definition: BrokerMetadata.h:170
const std::string & topic() const
The topic name.
Definition: BrokerMetadata.h:98
const std::map< Partition, PartitionInfo > & partitions() const
The partitions' state in the MetadataResponse.
Definition: BrokerMetadata.h:108
std::string getNodeDescription(Node::Id id) const
Obtains explanatory string from Node::Id.
Definition: BrokerMetadata.h:139
BrokerMetadata(Topic topic)
The BrokerMetadata is per-topic constructed.
Definition: BrokerMetadata.h:93
std::vector< std::shared_ptr< Node > > nodes() const
The nodes info in the MetadataResponse.
Definition: BrokerMetadata.h:127