Modern C++ Kafka API
UnorderedOffsetCommitQueue.h
1 #pragma once
2 
3 #include <kafka/Project.h>
4 
5 #include <kafka/KafkaClient.h>
6 #include <kafka/Types.h>
7 
8 #include <deque>
9 #include <vector>
10 
11 namespace KAFKA_API { namespace clients { namespace consumer {
12 
/**
 * A minimal binary min-heap stored in a std::vector.
 *
 * Ordering relies only on `operator<` of T: the smallest element is always
 * available through front(). For the node at index i, the children live at
 * indices 2*i + 1 and 2*i + 2, and the parent at (i - 1) / 2.
 */
template <typename T>
class Heap
{
public:
    /// Whether the heap holds no elements.
    bool empty() const { return data.empty(); }

    /// Number of elements currently stored.
    std::size_t size() const { return data.size(); }

    /// The smallest element. Precondition: the heap must not be empty.
    const T& front() const { return data[0]; }

    /// Insert a copy of `t`, then sift it up until its parent is not greater than it.
    void push(const T& t)
    {
        data.emplace_back(t);

        for (std::size_t indexCurrent = data.size() - 1; indexCurrent > 0;)
        {
            const std::size_t indexParent = (indexCurrent - 1) / 2;

            // Heap property already holds for this node: nothing more to do.
            if (!(data[indexCurrent] < data[indexParent])) return;

            std::swap(data[indexCurrent], data[indexParent]);
            indexCurrent = indexParent;
        }
    }

    /// Remove the smallest element. Calling it on an empty heap is a no-op.
    void pop_front()
    {
        // Guard: reading data.back() / data[0] of an empty vector is undefined behaviour.
        if (data.empty()) return;

        // Move the last leaf to the root (skipped when the root is the only element,
        // which would otherwise be a pointless self-assignment).
        if (data.size() > 1) data.front() = data.back();
        data.pop_back();

        // Sift the new root down until both children are not smaller than it.
        const std::size_t count = data.size();
        for (std::size_t indexCurrent = 0;;)
        {
            const std::size_t indexLeftChild  = 2 * indexCurrent + 1;
            const std::size_t indexRightChild = indexLeftChild + 1;

            // No children at all (this also covers the heap having become empty).
            if (indexLeftChild >= count) return;

            // Pick the smaller child; the right one may not exist.
            const bool        useLeftChild  = (indexRightChild >= count) || (data[indexLeftChild] < data[indexRightChild]);
            const std::size_t indexMinChild = useLeftChild ? indexLeftChild : indexRightChild;

            if (!(data[indexMinChild] < data[indexCurrent])) return;

            std::swap(data[indexCurrent], data[indexMinChild]);
            indexCurrent = indexMinChild;
        }
    }

private:
    std::vector<T> data;
};
63 
64 
75 {
76 public:
77  UnorderedOffsetCommitQueue(const Topic& topic, Partition partition)
78  : _partitionInfo(std::string("topic[").append(topic).append("], paritition[").append(std::to_string(partition)).append("]"))
79  {
80  }
81  UnorderedOffsetCommitQueue() = default;
82 
86  std::size_t size() const { return _offsetsReceived.size(); }
87 
92  void waitOffset(Offset offset)
93  {
94  if (offset < 0 || (!_offsetsReceived.empty() && offset <= _offsetsReceived.back()))
95  {
96  // Invalid offset (might be fetched from the record which had no valid offset)
97  KAFKA_API_LOG(Log::Level::Err, "Got invalid offset to wait[%lld]! %s", offset, (_partitionInfo.empty() ? "" : _partitionInfo.c_str()));
98  return;
99  }
100 
101  _offsetsReceived.emplace_back(offset);
102  }
103 
108  void ackOffset(Offset offset)
109  {
110  const Offset maxOffsetReceived = _offsetsReceived.back();
111  if (offset > maxOffsetReceived)
112  {
113  // Runtime error
114  KAFKA_API_LOG(Log::Level::Err, "Got invalid ack offset[%lld]! Even larger than all offsets received[%lld]! %s", offset, maxOffsetReceived, (_partitionInfo.empty() ? "" : _partitionInfo.c_str()));
115  }
116 
117  _offsetsToCommit.push(offset);
118  do
119  {
120  const Offset minOffsetToCommit = _offsetsToCommit.front();
121  const Offset expectedOffset = _offsetsReceived.front();
122  if (minOffsetToCommit == expectedOffset)
123  {
124  _toCommit = expectedOffset + 1;
125  _offsetsToCommit.pop_front();
126  _offsetsReceived.pop_front();
127  }
128  else if (minOffsetToCommit < expectedOffset)
129  {
130  // Inconsist error (might be caused by duplicated ack)
131  KAFKA_API_LOG(Log::Level::Err, "Got invalid ack offset[%lld]! Even smaller than expected[%lld]! %s", minOffsetToCommit, expectedOffset, (_partitionInfo.empty() ? "" : _partitionInfo.c_str()));
132  _offsetsToCommit.pop_front();
133  }
134  else
135  {
136  break;
137  }
138  } while (!_offsetsToCommit.empty());
139  }
140 
144  Optional<Offset> popOffsetToCommit()
145  {
146  Optional<Offset> ret;
147  if (_committed != _toCommit)
148  {
149  ret = _committed = _toCommit;
150  }
151  return ret;
152  }
153 
157  Optional<Offset> lastPoppedOffset()
158  {
159  Optional<Offset> ret;
160  if (_committed != INVALID_OFFSET)
161  {
162  ret = _committed;
163  }
164  return ret;
165  }
166 
167 private:
168  std::deque<Offset> _offsetsReceived;
169  Heap<Offset> _offsetsToCommit;
170  Offset _toCommit = {INVALID_OFFSET};
171  Offset _committed = {INVALID_OFFSET};
172  std::string _partitionInfo;
173 
174  static constexpr Offset INVALID_OFFSET = -1;
175 };
176 
177 } } } // end of KAFKA_API::clients::consumer
178 
The queue can be used to determine the right offset to commit.
Definition: UnorderedOffsetCommitQueue.h:75
void ackOffset(Offset offset)
Acknowledge that the record has been handled and is ready to be committed.
Definition: UnorderedOffsetCommitQueue.h:108
void waitOffset(Offset offset)
Add an offset (for a ConsumerRecord) to the waiting list, where it stays until it is acked (with ackOffset).
Definition: UnorderedOffsetCommitQueue.h:92
std::size_t size() const
Return how many received offsets have not been popped to commit (with popOffsetToCommit()).
Definition: UnorderedOffsetCommitQueue.h:86
Optional< Offset > popOffsetToCommit()
Pop the offset which is ready for the consumer (if any).
Definition: UnorderedOffsetCommitQueue.h:144
Optional< Offset > lastPoppedOffset()
Return the offset last popped.
Definition: UnorderedOffsetCommitQueue.h:157