diff --git a/lib/src/Client.cc b/lib/src/Client.cc index 7da175d..cc04a55 100644 --- a/lib/src/Client.cc +++ b/lib/src/Client.cc @@ -112,7 +112,11 @@ int Client::sendRequest(Request *request) unsigned char *buffer = request->toWireFormat(); int numBytesSent = this->connection->write(request->size(), buffer); - if (numBytesSent == Connection::WRITE_ERROR) { E("Client::sendRequest():write error:" << strerror(errno) << "\n"); return numBytesSent; } + if (numBytesSent == Connection::WRITE_ERROR) { + E("Client::sendRequest():write error:" << strerror(errno) << "\n"); + delete this->connection; this->connection = NULL; + return numBytesSent; + } D(cout.flush() << "Client::sendRequest():request sent:numBytes:" << numBytesSent << "\n";) return numBytesSent; } @@ -126,13 +130,23 @@ ResponseClass *Client::receiveResponse() int netValueSize = -1; int numBytesReceived = this->connection->read(sizeof(int), (unsigned char *)(&netValueSize)); - if (numBytesReceived == Connection::READ_ERROR) { E("Client::receiveResponse():read error on size:" << strerror(errno) << "\n"); return NULL; } + if (numBytesReceived == Connection::READ_ERROR || numBytesReceived == Connection::END_OF_CONNECTION_ERROR) + { + E("Client::receiveResponse():read error on size:" << strerror(errno) << "\n"); + delete this->connection; this->connection = NULL; + return NULL; + } int hostValueSize = ntohl(netValueSize); D(cout.flush() << "Client::receiveResponse():incoming response:size:" << hostValueSize << "\n";) unsigned char *buffer = new unsigned char[hostValueSize+sizeof(int)]; // add space for int32 size memcpy(buffer, &netValueSize, sizeof(int)); numBytesReceived = this->connection->read(hostValueSize, buffer + sizeof(int)); - if (numBytesReceived == Connection::READ_ERROR) { E("Client::receiveResponse():read error on body:" << strerror(errno) << "\n"); return NULL; } + if (numBytesReceived == Connection::READ_ERROR || numBytesReceived == Connection::END_OF_CONNECTION_ERROR) + { + E("Client::receiveResponse():read error on body:" << strerror(errno) << "\n"); + delete this->connection; this->connection = NULL; + return NULL; + } return new ResponseClass(buffer, true); // true specfies delete buffer on ~Response() } diff --git a/lib/src/Connection.cc b/lib/src/Connection.cc index 598e479..4cc730e 100644 --- a/lib/src/Connection.cc +++ b/lib/src/Connection.cc @@ -43,6 +43,7 @@ const int Connection::DEFAULT_BUFFER_SIZE; const int Connection::SOCKET_UNINITIALIZED; const int Connection::OPEN_CONNECTION_ERROR; const int Connection::READ_ERROR; +const int Connection::END_OF_CONNECTION_ERROR; const int Connection::WRITE_ERROR; Connection::Connection(string host, int port) @@ -141,7 +142,7 @@ int Connection::read(int numBytes, unsigned char* buffer) while (numBytesReceived < numBytes) { int rcvd = (int)::recv(this->socketFd, p, (size_t)(numBytes-numBytesReceived), flags); - if (rcvd == READ_ERROR) { E("Connection::read():error:" << strerror(errno) << "\n"); break; } + if (rcvd == READ_ERROR || rcvd == END_OF_CONNECTION_ERROR) {E("Connection::read():error:" << strerror(errno) << "\n"); break; } p += rcvd; numBytesReceived += rcvd; D(cout.flush() << "--------------Connection::read(" << numBytes << "):read " << rcvd << " bytes\n";) @@ -155,7 +156,10 @@ int Connection::write(int numBytes, unsigned char* buffer) { D(cout.flush() << "--------------Connection::write(" << numBytes << ")\n";) - int flags = 0; + // MSG_NOSIGNAL (since Linux 2.2) + // The local end has been "shut down" on a connection oriented socket. In this case the process will also receive a SIGPIPE unless MSG_NOSIGNAL is set. + + int flags = MSG_NOSIGNAL; int numBytesSent = (int)::send(this->socketFd, (const void*)buffer, (ssize_t)numBytes, flags); if (numBytesSent == WRITE_ERROR) { E("Connection::write():error:" << strerror(errno) << "\n"); } D(cout.flush() << "--------------Connection::write(" << numBytes << "):wrote " << numBytesSent << "bytes\n";) diff --git a/lib/src/Connection.h b/lib/src/Connection.h index 8c7d26a..e9237f0 100644 --- a/lib/src/Connection.h +++ b/lib/src/Connection.h @@ -42,6 +42,7 @@ class Connection static const int SOCKET_UNINITIALIZED = -1; static const int OPEN_CONNECTION_ERROR = -1; static const int READ_ERROR = -1; + static const int END_OF_CONNECTION_ERROR = 0; static const int WRITE_ERROR = -1; Connection(std::string host, int port); diff --git a/lib/src/Request.cc b/lib/src/Request.cc index cc114f9..6648fcb 100644 --- a/lib/src/Request.cc +++ b/lib/src/Request.cc @@ -58,6 +58,17 @@ Request::Request(short int apiKey, short int apiVersion, int correlationId, stri this->clientId = clientId; } +Request::Request(short int apiKey, short int apiVersion, int correlationId, string clientId, long bufferSize) : RequestOrResponse(bufferSize) +{ + D(cout.flush() << "--------------Request(params)\n";) + + this->apiKey = apiKey; + this->apiVersion = apiVersion; + this->correlationId = correlationId; + this->clientId = clientId; +} + + unsigned char* Request::toWireFormat(bool updatePacketSize) { unsigned char* buffer = this->RequestOrResponse::toWireFormat(false); diff --git a/lib/src/Request.h b/lib/src/Request.h index e6efba8..e1d2fb9 100644 --- a/lib/src/Request.h +++ b/lib/src/Request.h @@ -42,6 +42,7 @@ class Request : public RequestOrResponse Request(unsigned char *buffer, bool releaseBuffer = false); Request(short int apiKey, short int apiVersion, int correlationId, std::string clientId); + Request(short int apiKey, short int apiVersion, int correlationId, std::string clientId, long bufferSize); unsigned char* toWireFormat(bool updatePacketSize = true); int getWireFormatSize(bool includePacketSize = true); diff --git a/lib/src/RequestOrResponse.cc b/lib/src/RequestOrResponse.cc index f5d6aaf..fb00d49 100644 --- a/lib/src/RequestOrResponse.cc +++ b/lib/src/RequestOrResponse.cc @@ -48,6 +48,13 @@ RequestOrResponse::RequestOrResponse() : WireFormatter() D(cout.flush() << "--------------RequestOrResponse(params)\n";) } +RequestOrResponse::RequestOrResponse(long bufferSize) : WireFormatter() +{ + this->packet = new Packet(bufferSize); + + D(cout.flush() << "--------------RequestOrResponse(params)\n";) +} + RequestOrResponse::~RequestOrResponse() { delete this->packet; diff --git a/lib/src/RequestOrResponse.h b/lib/src/RequestOrResponse.h index 54221ef..a661197 100644 --- a/lib/src/RequestOrResponse.h +++ b/lib/src/RequestOrResponse.h @@ -39,6 +39,7 @@ class RequestOrResponse : public WireFormatter, public PacketWriter public: RequestOrResponse(); + RequestOrResponse(long bufferSize); RequestOrResponse(unsigned char *buffer, bool releaseBuffer = false); ~RequestOrResponse(); diff --git a/lib/src/produce/ProduceRequest.cc b/lib/src/produce/ProduceRequest.cc index 99c6f12..524981b 100644 --- a/lib/src/produce/ProduceRequest.cc +++ b/lib/src/produce/ProduceRequest.cc @@ -61,6 +61,18 @@ namespace LibKafka { this->produceTopicArray = produceTopicArray; this->releaseArrays = releaseArrays; } + + ProduceRequest::ProduceRequest(int correlationId, std::string clientId, short int requiredAcks, int timeout, int produceTopicArraySize, TopicNameBlock** produceTopicArray, long bufferSize, bool releaseArrays) : Request(ApiConstants::PRODUCE_REQUEST_KEY, ApiConstants::API_VERSION, correlationId, clientId, bufferSize) + { + D(cout.flush() << "--------------ProduceRequest(params)\n";) + + this->requiredAcks = requiredAcks; + this->timeout = timeout; + this->produceTopicArraySize = produceTopicArraySize; + this->produceTopicArray = produceTopicArray; + this->releaseArrays = releaseArrays; + } + ProduceRequest::~ProduceRequest() { diff --git a/lib/src/produce/ProduceRequest.h b/lib/src/produce/ProduceRequest.h index b0d8403..876808f 100644 --- a/lib/src/produce/ProduceRequest.h +++ b/lib/src/produce/ProduceRequest.h @@ -48,6 +48,7 @@ class ProduceRequest : public Request ProduceRequest(unsigned char *buffer, bool releaseBuffer = false); ProduceRequest(int correlationId, std::string clientId, short int requiredAcks, int timeout, int produceTopicArraySize, TopicNameBlock **produceTopicArray, bool releaseArrays = false); + ProduceRequest(int correlationId, std::string clientId, short int requiredAcks, int timeout, int produceTopicArraySize, TopicNameBlock **produceTopicArray, long bufferSize, bool releaseArrays = false); ~ProduceRequest(); unsigned char* toWireFormat(bool updatePacketSize = true);