3 #include <kafka/Project.h>
5 #include <kafka/KafkaException.h>
6 #include <kafka/Types.h>
8 #include <librdkafka/rdkafka.h>
27 using Host = std::string;
50 std::string
toString()
const {
return host +
":" + std::to_string(
port) +
"/" + std::to_string(
id); }
60 void addReplica(Node::Id
id) {
replicas.emplace_back(
id); }
61 void addInSyncReplica(Node::Id
id) {
inSyncReplicas.emplace_back(
id); }
98 const std::string&
topic()
const {
return _topic; }
103 std::vector<std::shared_ptr<Node>>
nodes()
const;
108 const std::map<Partition, PartitionInfo>&
partitions()
const {
return _partitions; }
115 void setOrigNodeName(
const std::string& origNodeName) { _origNodeName = origNodeName; }
116 void addNode(Node::Id nodeId,
const Node::Host& host, Node::Port port) { _nodes[nodeId] = std::make_shared<Node>(nodeId, host, port); }
117 void addPartitionInfo(Partition partition,
const PartitionInfo& partitionInfo) { _partitions.emplace(partition, partitionInfo); }
121 std::string _origNodeName;
122 std::map<Node::Id, std::shared_ptr<Node>> _nodes;
123 std::map<Partition, PartitionInfo> _partitions;
126 inline std::vector<std::shared_ptr<BrokerMetadata::Node>>
129 std::vector<std::shared_ptr<BrokerMetadata::Node>> ret;
130 ret.reserve(_nodes.size());
131 for (
const auto& nodeInfo: _nodes)
133 ret.emplace_back(nodeInfo.second);
141 const auto& found = _nodes.find(
id);
142 if (found == _nodes.cend())
return "-:-/" + std::to_string(
id);
144 auto node = found->second;
145 return node->host +
":" + std::to_string(node->port) +
"/" + std::to_string(
id);
151 std::ostringstream oss;
153 auto streamNodes = [
this](std::ostringstream& ss,
const std::vector<Node::Id>& nodeIds) -> std::ostringstream& {
154 bool isTheFirst =
true;
155 for (
const auto id: nodeIds)
163 streamNodes(oss, partitionInfo.
replicas) <<
"], inSyncReplicas[";
172 std::ostringstream oss;
174 oss <<
"originatingNode[" << _origNodeName <<
"], topic[" << _topic <<
"], partitions{";
175 bool isTheFirst =
true;
176 for (
const auto& partitionInfoPair: _partitions)
178 const Partition partition = partitionInfoPair.first;
179 const PartitionInfo& partitionInfo = partitionInfoPair.second;
180 oss << (isTheFirst ? (isTheFirst =
false,
"") :
"; ") << partition <<
": " <<
toString(partitionInfo);