Modern C++ Kafka API
Error.h
1 #pragma once
2 
3 #include <kafka/Project.h>
4 
5 #include <kafka/RdKafkaHelper.h>
6 
7 #include <librdkafka/rdkafka.h>
8 
9 #include <string>
10 #include <system_error>
11 
12 
13 namespace KAFKA_API {
14 
15 struct ErrorCategory: public std::error_category
16 {
17  const char* name() const noexcept override { return "KafkaError"; }
18  std::string message(int ev) const override { return rd_kafka_err2str(static_cast<rd_kafka_resp_err_t>(ev)); }
19 
20  template <typename T = void>
21  struct Global { static ErrorCategory category; };
22 };
23 
24 template <typename T>
25 ErrorCategory ErrorCategory::Global<T>::category;
26 
27 
31 class Error
32 {
33 public:
34  // The error with rich info
35  explicit Error(rd_kafka_error_t* error = nullptr): _rkError(error, RkErrorDeleter) {}
36  // The error with brief info
37  explicit Error(rd_kafka_resp_err_t respErr): _respErr(respErr) {}
38  // The error with detailed message
39  Error(rd_kafka_resp_err_t respErr, std::string message, bool fatal = false)
40  : _respErr(respErr), _message(std::move(message)), _isFatal(fatal) {}
41  // Copy constructor
42  Error(const Error& error) { *this = error; }
43 
44  // Assignment operator
45  Error& operator=(const Error& error)
46  {
47  if (this == &error) return *this;
48 
49  _rkError.reset();
50 
51  _respErr = static_cast<rd_kafka_resp_err_t>(error.value());
52  _message = error._message;
53  _isFatal = error.isFatal();
54  _txnRequiresAbort = error.transactionRequiresAbort();
55  _isRetriable = error.isRetriable();
56 
57  return *this;
58  }
59 
63  explicit operator bool() const { return static_cast<bool>(value()); }
64 
68  explicit operator std::error_code() const
69  {
70  return {value(), ErrorCategory::Global<>::category};
71  }
72 
81  int value() const
82  {
83  return static_cast<int>(_rkError ? rd_kafka_error_code(_rkError.get()) : _respErr);
84  }
85 
89  std::string message() const
90  {
91  return _message ? *_message :
92  (_rkError ? rd_kafka_error_string(_rkError.get()) : rd_kafka_err2str(_respErr));
93  }
94 
98  std::string toString() const
99  {
100  std::ostringstream oss;
101 
102  oss << rd_kafka_err2str(static_cast<rd_kafka_resp_err_t>(value())) << " [" << value() << "]" << (isFatal() ? " fatal" : "");
103  if (transactionRequiresAbort()) oss << " | transaction-requires-abort";
104  if (auto retriable = isRetriable()) oss << " | " << (*retriable ? "retriable" : "non-retriable");
105  if (_message) oss << " | " << *_message;
106 
107  return oss.str();
108  }
109 
113  bool isFatal() const
114  {
115  return _rkError ? rd_kafka_error_is_fatal(_rkError.get()) : _isFatal;
116  }
117 
121  Optional<bool> isRetriable() const
122  {
123  return _rkError ? rd_kafka_error_is_retriable(_rkError.get()) : _isRetriable;
124  }
125 
134  {
135  return _rkError ? rd_kafka_error_txn_requires_abort(_rkError.get()) : false;
136  }
137 
138 private:
139  rd_kafka_error_shared_ptr _rkError; // For error with rich info
140  rd_kafka_resp_err_t _respErr{}; // For error with a simple response code
141  Optional<std::string> _message; // Additional detailed message (if any)
142  bool _isFatal = false;
143  bool _txnRequiresAbort = false;
144  Optional<bool> _isRetriable; // Retriable flag (if any)
145 };
146 
147 } // end of KAFKA_API
148 
Unified error type.
Definition: Error.h:32
bool isFatal() const
Fatal error indicates that the client instance is no longer usable.
Definition: Error.h:113
bool transactionRequiresAbort() const
Show whether the error is an abortable transaction error.
Definition: Error.h:133
std::string message() const
Readable error string.
Definition: Error.h:89
int value() const
Obtains the underlying error code value.
Definition: Error.h:81
Optional< bool > isRetriable() const
Show whether the operation may be retried.
Definition: Error.h:121
std::string toString() const
Detailed error string.
Definition: Error.h:98