Commit ed3def01 authored by mbelshe@chromium.org's avatar mbelshe@chromium.org

Memory leak fix, log file reopen, remote ip fix, connection idle timeout,

pidfile.

SPDY frames were leaking when used with DataFrame and OutputList.
HUP signal will now reopen the log file.
Remote IP obtained for 'forward-ip-header' was incorrect.
Added connection idle timeout.
Fixed up pidfile handling.
Flip server now creates and locks a pid file.

Patch from Kevin Lindsay: kelindsay@gmail.com

BUG=none
TEST=self

Review URL: http://codereview.chromium.org/6332010

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@72046 0039d316-1c4b-4281-b951-d872f2087c98
parent 105a1bee
...@@ -136,6 +136,7 @@ public: ...@@ -136,6 +136,7 @@ public:
bool wait_for_iface_; bool wait_for_iface_;
int ssl_session_expiry_; int ssl_session_expiry_;
bool ssl_disable_compression_; bool ssl_disable_compression_;
int idle_timeout_s_;
FlipConfig() : FlipConfig() :
server_think_time_in_s_(0), server_think_time_in_s_(0),
...@@ -143,7 +144,8 @@ public: ...@@ -143,7 +144,8 @@ public:
forward_ip_header_enabled_(false), forward_ip_header_enabled_(false),
wait_for_iface_(false), wait_for_iface_(false),
ssl_session_expiry_(300), ssl_session_expiry_(300),
ssl_disable_compression_(false) ssl_disable_compression_(false),
idle_timeout_s_(300)
{} {}
~FlipConfig() {} ~FlipConfig() {}
......
...@@ -6,6 +6,8 @@ ...@@ -6,6 +6,8 @@
#include <netinet/tcp.h> // For TCP_NODELAY #include <netinet/tcp.h> // For TCP_NODELAY
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/file.h>
#include <sys/stat.h>
#include <unistd.h> #include <unistd.h>
#include <openssl/err.h> #include <openssl/err.h>
#include <openssl/ssl.h> #include <openssl/ssl.h>
...@@ -48,6 +50,7 @@ using std::pair; ...@@ -48,6 +50,7 @@ using std::pair;
using std::string; using std::string;
using std::vector; using std::vector;
using std::cout; using std::cout;
using std::cerr;
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
...@@ -61,6 +64,8 @@ using std::cout; ...@@ -61,6 +64,8 @@ using std::cout;
#define SSL_CTX_DEFAULT_CIPHER_LIST "!aNULL:!ADH:!eNull:!LOW:!EXP:RC4+RSA:MEDIUM:HIGH" #define SSL_CTX_DEFAULT_CIPHER_LIST "!aNULL:!ADH:!eNull:!LOW:!EXP:RC4+RSA:MEDIUM:HIGH"
#define PIDFILE "/var/run/flip-server.pid"
// If true, then disables the nagle algorithm); // If true, then disables the nagle algorithm);
bool FLAGS_disable_nagle = true; bool FLAGS_disable_nagle = true;
...@@ -289,11 +294,24 @@ class DataFrame { ...@@ -289,11 +294,24 @@ class DataFrame {
bool delete_when_done; bool delete_when_done;
size_t index; size_t index;
DataFrame() : data(NULL), size(0), delete_when_done(false), index(0) {} DataFrame() : data(NULL), size(0), delete_when_done(false), index(0) {}
void MaybeDelete() { virtual void MaybeDelete() {
if (delete_when_done) { if (delete_when_done) {
delete[] data; delete[] data;
} }
} }
virtual ~DataFrame() {
MaybeDelete();
}
};
class SpdyFrameDataFrame : public DataFrame {
public:
SpdyFrame* frame;
virtual void MaybeDelete() {
if (delete_when_done) {
delete frame;
}
}
}; };
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
...@@ -698,6 +716,7 @@ class SMInterface { ...@@ -698,6 +716,7 @@ class SMInterface {
int fd, int fd,
string server_ip, string server_ip,
string server_port, string server_port,
string remote_ip,
bool use_ssl) = 0; bool use_ssl) = 0;
virtual size_t ProcessReadInput(const char* data, size_t len) = 0; virtual size_t ProcessReadInput(const char* data, size_t len) = 0;
virtual size_t ProcessWriteInput(const char* data, size_t len) = 0; virtual size_t ProcessWriteInput(const char* data, size_t len) = 0;
...@@ -741,7 +760,7 @@ class SMConnectionInterface { ...@@ -741,7 +760,7 @@ class SMConnectionInterface {
class HttpSM; class HttpSM;
class SMConnection; class SMConnection;
typedef list<DataFrame> OutputList; typedef list<DataFrame*> OutputList;
class SMConnectionPoolInterface { class SMConnectionPoolInterface {
public: public:
...@@ -796,7 +815,8 @@ class SMConnection: public SMConnectionInterface, ...@@ -796,7 +815,8 @@ class SMConnection: public SMConnectionInterface,
sm_interface_(NULL), sm_interface_(NULL),
log_prefix_(log_prefix), log_prefix_(log_prefix),
max_bytes_sent_per_dowrite_(4096), max_bytes_sent_per_dowrite_(4096),
ssl_(NULL) ssl_(NULL),
last_read_time_(0)
{} {}
int fd_; int fd_;
...@@ -828,6 +848,7 @@ class SMConnection: public SMConnectionInterface, ...@@ -828,6 +848,7 @@ class SMConnection: public SMConnectionInterface,
SSL* ssl_; SSL* ssl_;
public: public:
time_t last_read_time_;
string server_ip_; string server_ip_;
string server_port_; string server_port_;
...@@ -839,10 +860,10 @@ class SMConnection: public SMConnectionInterface, ...@@ -839,10 +860,10 @@ class SMConnection: public SMConnectionInterface,
<< "Setting ready to send: EPOLLIN | EPOLLOUT"; << "Setting ready to send: EPOLLIN | EPOLLOUT";
epoll_server_->SetFDReady(fd_, EPOLLIN | EPOLLOUT); epoll_server_->SetFDReady(fd_, EPOLLIN | EPOLLOUT);
} }
void EnqueueDataFrame(const DataFrame& df) { void EnqueueDataFrame(DataFrame* df) {
output_list_.push_back(df); output_list_.push_back(df);
VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "EnqueueDataFrame: " VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "EnqueueDataFrame: "
<< "size = " << df.size << ": Setting FD ready."; << "size = " << df->size << ": Setting FD ready.";
ReadyToSend(); ReadyToSend();
} }
int fd() { return fd_; } int fd() { return fd_; }
...@@ -871,12 +892,15 @@ class SMConnection: public SMConnectionInterface, ...@@ -871,12 +892,15 @@ class SMConnection: public SMConnectionInterface,
int fd, int fd,
string server_ip, string server_ip,
string server_port, string server_port,
string remote_ip,
bool use_ssl) { bool use_ssl) {
if (initialized_) { if (initialized_) {
LOG(FATAL) << "Attempted to initialize already initialized server"; LOG(FATAL) << "Attempted to initialize already initialized server";
return; return;
} }
client_ip_ = remote_ip;
if (fd == -1) { if (fd == -1) {
// If fd == -1, then we are initializing a new connection that will // If fd == -1, then we are initializing a new connection that will
// connect to the backend. // connect to the backend.
...@@ -921,23 +945,12 @@ class SMConnection: public SMConnectionInterface, ...@@ -921,23 +945,12 @@ class SMConnection: public SMConnectionInterface,
} }
fd_ = fd; fd_ = fd;
struct sockaddr sock_addr;
socklen_t addr_size = sizeof(sock_addr);
addr_size = sizeof(sock_addr);
int res = getsockname(fd_, &sock_addr, &addr_size);
if (res < 0) {
LOG(ERROR) << "Could not get socket address for fd " << fd_
<< ": getsockname: " << strerror(errno);
} else {
struct sockaddr_in *sock_addr_in = (struct sockaddr_in *)&sock_addr;
char ip[16];
snprintf(ip, sizeof(ip), "%d.%d.%d.%d",
IPV4_PRINTABLE_FORMAT(sock_addr_in->sin_addr.s_addr));
client_ip_ = ip;
}
} }
registered_in_epoll_server_ = false; registered_in_epoll_server_ = false;
// Set the last read time here as the idle checker will start from
// now.
last_read_time_ = time(NULL);
initialized_ = true; initialized_ = true;
connection_pool_ = connection_pool; connection_pool_ = connection_pool;
...@@ -991,6 +1004,12 @@ class SMConnection: public SMConnectionInterface, ...@@ -991,6 +1004,12 @@ class SMConnection: public SMConnectionInterface,
if (rv != chunksize) if (rv != chunksize)
break; // If we couldn't write everything, we're implicitly stalled break; // If we couldn't write everything, we're implicitly stalled
} }
if (!(flags & MSG_MORE)) {
int state = 0;
setsockopt( fd_, IPPROTO_TCP, TCP_CORK, &state, sizeof( state ) );
state = 1;
setsockopt( fd_, IPPROTO_TCP, TCP_CORK, &state, sizeof( state ) );
}
} else { } else {
bytes_written = send(fd_, data, len, flags); bytes_written = send(fd_, data, len, flags);
} }
...@@ -1019,7 +1038,7 @@ class SMConnection: public SMConnectionInterface, ...@@ -1019,7 +1038,7 @@ class SMConnection: public SMConnectionInterface,
} }
void Cleanup(const char* cleanup) { void Cleanup(const char* cleanup) {
VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Cleanup"; VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Cleanup: " << cleanup;
if (!initialized_) { if (!initialized_) {
return; return;
} }
...@@ -1030,6 +1049,7 @@ class SMConnection: public SMConnectionInterface, ...@@ -1030,6 +1049,7 @@ class SMConnection: public SMConnectionInterface,
if (sm_interface_) { if (sm_interface_) {
sm_interface_->ResetForNewConnection(); sm_interface_->ResetForNewConnection();
} }
last_read_time_ = 0;
} }
private: private:
...@@ -1132,6 +1152,7 @@ class SMConnection: public SMConnectionInterface, ...@@ -1132,6 +1152,7 @@ class SMConnection: public SMConnectionInterface,
} else if (bytes_read > 0) { } else if (bytes_read > 0) {
VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "read " << bytes_read VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "read " << bytes_read
<< " bytes"; << " bytes";
last_read_time_ = time(NULL);
if (!protocol_detected_) { if (!protocol_detected_) {
if (acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER) { if (acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER) {
// Http Server // Http Server
...@@ -1303,11 +1324,11 @@ class SMConnection: public SMConnectionInterface, ...@@ -1303,11 +1324,11 @@ class SMConnection: public SMConnectionInterface,
if (sm_interface_ && output_list_.size() < 2) { if (sm_interface_ && output_list_.size() < 2) {
sm_interface_->GetOutput(); sm_interface_->GetOutput();
} }
DataFrame& data_frame = output_list_.front(); DataFrame* data_frame = output_list_.front();
const char* bytes = data_frame.data; const char* bytes = data_frame->data;
int size = data_frame.size; int size = data_frame->size;
bytes += data_frame.index; bytes += data_frame->index;
size -= data_frame.index; size -= data_frame->index;
DCHECK_GE(size, 0); DCHECK_GE(size, 0);
if (size <= 0) { if (size <= 0) {
// Empty data frame. Indicates end of data from client. // Empty data frame. Indicates end of data from client.
...@@ -1315,8 +1336,8 @@ class SMConnection: public SMConnectionInterface, ...@@ -1315,8 +1336,8 @@ class SMConnection: public SMConnectionInterface,
int state = 0; int state = 0;
VLOG(2) << log_prefix_ << "Empty data frame, uncorking socket."; VLOG(2) << log_prefix_ << "Empty data frame, uncorking socket.";
setsockopt( fd_, IPPROTO_TCP, TCP_CORK, &state, sizeof( state ) ); setsockopt( fd_, IPPROTO_TCP, TCP_CORK, &state, sizeof( state ) );
data_frame.MaybeDelete();
output_list_.pop_front(); output_list_.pop_front();
delete data_frame;
continue; continue;
} }
...@@ -1349,7 +1370,7 @@ class SMConnection: public SMConnectionInterface, ...@@ -1349,7 +1370,7 @@ class SMConnection: public SMConnectionInterface,
} else if (bytes_written > 0) { } else if (bytes_written > 0) {
VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Wrote: " VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Wrote: "
<< bytes_written << " bytes"; << bytes_written << " bytes";
data_frame.index += bytes_written; data_frame->index += bytes_written;
bytes_sent += bytes_written; bytes_sent += bytes_written;
continue; continue;
} else if (bytes_written == -2) { } else if (bytes_written == -2) {
...@@ -1384,6 +1405,7 @@ class SMConnection: public SMConnectionInterface, ...@@ -1384,6 +1405,7 @@ class SMConnection: public SMConnectionInterface,
PrintSslError(); PrintSslError();
SSL_free(ssl_); SSL_free(ssl_);
PrintSslError(); PrintSslError();
ssl_ = NULL;
} }
if (registered_in_epoll_server_) { if (registered_in_epoll_server_) {
epoll_server_->UnregisterFD(fd_); epoll_server_->UnregisterFD(fd_);
...@@ -1398,6 +1420,12 @@ class SMConnection: public SMConnectionInterface, ...@@ -1398,6 +1420,12 @@ class SMConnection: public SMConnectionInterface,
initialized_ = false; initialized_ = false;
protocol_detected_ = false; protocol_detected_ = false;
events_ = 0; events_ = 0;
for (list<DataFrame*>::iterator i =
output_list_.begin();
i != output_list_.end();
++i) {
delete *i;
}
output_list_.clear(); output_list_.clear();
} }
...@@ -1639,12 +1667,13 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface { ...@@ -1639,12 +1667,13 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface {
int fd, int fd,
string server_ip, string server_ip,
string server_port, string server_port,
string remote_ip,
bool use_ssl) { bool use_ssl) {
VLOG(2) << ACCEPTOR_CLIENT_IDENT VLOG(2) << ACCEPTOR_CLIENT_IDENT
<< "SpdySM: Initializing server connection."; << "SpdySM: Initializing server connection.";
connection_->InitSMConnection(connection_pool, sm_interface, connection_->InitSMConnection(connection_pool, sm_interface,
epoll_server, fd, server_ip, server_port, epoll_server, fd, server_ip, server_port,
use_ssl); remote_ip, use_ssl);
} }
private: private:
...@@ -1690,13 +1719,13 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface { ...@@ -1690,13 +1719,13 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface {
sm_http_interface->InitSMInterface(this, server_idx); sm_http_interface->InitSMInterface(this, server_idx);
sm_http_interface->InitSMConnection(NULL, sm_http_interface, sm_http_interface->InitSMConnection(NULL, sm_http_interface,
epoll_server_, -1, epoll_server_, -1,
server_ip, server_port, false); server_ip, server_port, "", false);
return sm_http_interface; return sm_http_interface;
} }
int SpdyHandleNewStream(const SpdyControlFrame* frame, int SpdyHandleNewStream(const SpdyControlFrame* frame,
string *http_data, string &http_data,
bool *is_https_scheme) bool *is_https_scheme)
{ {
bool parsed_headers = false; bool parsed_headers = false;
...@@ -1743,21 +1772,21 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface { ...@@ -1743,21 +1772,21 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface {
filename); filename);
} else { } else {
SpdyHeaderBlock::iterator version = headers.find("version"); SpdyHeaderBlock::iterator version = headers.find("version");
*http_data += method->second + " " + uri + " " + version->second + "\r\n"; http_data += method->second + " " + uri + " " + version->second + "\r\n";
VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Request: " << method->second << " " VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Request: " << method->second << " "
<< uri << " " << version->second; << uri << " " << version->second;
for (SpdyHeaderBlock::iterator i = headers.begin(); for (SpdyHeaderBlock::iterator i = headers.begin();
i != headers.end(); ++i) { i != headers.end(); ++i) {
*http_data += i->first + ": " + i->second + "\r\n"; http_data += i->first + ": " + i->second + "\r\n";
VLOG(2) << ACCEPTOR_CLIENT_IDENT << i->first.c_str() << ":" VLOG(2) << ACCEPTOR_CLIENT_IDENT << i->first.c_str() << ":"
<< i->second.c_str(); << i->second.c_str();
} }
if (g_proxy_config.forward_ip_header_enabled_) { if (g_proxy_config.forward_ip_header_enabled_) {
// X-Client-Cluster-IP header // X-Client-Cluster-IP header
*http_data += g_proxy_config.forward_ip_header_ + ": " + http_data += g_proxy_config.forward_ip_header_ + ": " +
connection_->client_ip() + "\r\n"; connection_->client_ip() + "\r\n";
} }
*http_data += "\r\n"; http_data += "\r\n";
} }
VLOG(3) << ACCEPTOR_CLIENT_IDENT << "SpdySM: HTTP Request:\n" << http_data; VLOG(3) << ACCEPTOR_CLIENT_IDENT << "SpdySM: HTTP Request:\n" << http_data;
...@@ -1775,7 +1804,7 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface { ...@@ -1775,7 +1804,7 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface {
string http_data; string http_data;
bool is_https_scheme; bool is_https_scheme;
int ret = SpdyHandleNewStream(frame, &http_data, &is_https_scheme); int ret = SpdyHandleNewStream(frame, http_data, &is_https_scheme);
if (!ret) { if (!ret) {
LOG(ERROR) << "SpdySM: Could not convert spdy into http."; LOG(ERROR) << "SpdySM: Could not convert spdy into http.";
break; break;
...@@ -1879,8 +1908,7 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface { ...@@ -1879,8 +1908,7 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface {
// some logic review and method renaming is probably in order. // some logic review and method renaming is probably in order.
void Cleanup() {} void Cleanup() {}
// Send a settings frame and possibly some NOOP packets to force // Send a settings frame
// opening of cwnd
int PostAcceptHook() { int PostAcceptHook() {
ssize_t bytes_written; ssize_t bytes_written;
spdy::SpdySettings settings; spdy::SpdySettings settings;
...@@ -2042,11 +2070,12 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface { ...@@ -2042,11 +2070,12 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface {
SpdySynStreamControlFrame* fsrcf = SpdySynStreamControlFrame* fsrcf =
spdy_framer_->CreateSynStream(stream_id, 0, 0, CONTROL_FLAG_NONE, true, spdy_framer_->CreateSynStream(stream_id, 0, 0, CONTROL_FLAG_NONE, true,
&block); &block);
DataFrame df; SpdyFrameDataFrame* df = new SpdyFrameDataFrame;
df.size = fsrcf->length() + SpdyFrame::size(); df->size = fsrcf->length() + SpdyFrame::size();
size_t df_size = df.size; size_t df_size = df->size;
df.data = fsrcf->data(); df->data = fsrcf->data();
df.delete_when_done = true; df->frame = fsrcf;
df->delete_when_done = true;
EnqueueDataFrame(df); EnqueueDataFrame(df);
VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending SynStreamheader " VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending SynStreamheader "
...@@ -2063,11 +2092,12 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface { ...@@ -2063,11 +2092,12 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface {
SpdySynReplyControlFrame* fsrcf = SpdySynReplyControlFrame* fsrcf =
spdy_framer_->CreateSynReply(stream_id, CONTROL_FLAG_NONE, true, &block); spdy_framer_->CreateSynReply(stream_id, CONTROL_FLAG_NONE, true, &block);
DataFrame df; SpdyFrameDataFrame* df = new SpdyFrameDataFrame;
df.size = fsrcf->length() + SpdyFrame::size(); df->size = fsrcf->length() + SpdyFrame::size();
size_t df_size = df.size; size_t df_size = df->size;
df.data = fsrcf->data(); df->data = fsrcf->data();
df.delete_when_done = true; df->frame = fsrcf;
df->delete_when_done = true;
EnqueueDataFrame(df); EnqueueDataFrame(df);
VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending SynReplyheader " VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending SynReplyheader "
...@@ -2084,14 +2114,13 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface { ...@@ -2084,14 +2114,13 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface {
// TODO(mbelshe): We can't compress here - before going into the // TODO(mbelshe): We can't compress here - before going into the
// priority queue. Compression needs to be done // priority queue. Compression needs to be done
// with late binding. // with late binding.
if (len == 0) { if (len == 0) {
SpdyDataFrame* fdf = spdy_framer_->CreateDataFrame(stream_id, data, len, SpdyDataFrame* fdf = spdy_framer_->CreateDataFrame(stream_id, data, len,
flags); flags);
DataFrame df; SpdyFrameDataFrame* df = new SpdyFrameDataFrame;
df.size = fdf->length() + SpdyFrame::size(); df->size = fdf->length() + SpdyFrame::size();
df.data = fdf->data(); df->data = fdf->data();
df.delete_when_done = true; df->delete_when_done = true;
EnqueueDataFrame(df); EnqueueDataFrame(df);
return; return;
} }
...@@ -2109,10 +2138,10 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface { ...@@ -2109,10 +2138,10 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface {
SpdyDataFrame* fdf = spdy_framer_->CreateDataFrame(stream_id, data, size, SpdyDataFrame* fdf = spdy_framer_->CreateDataFrame(stream_id, data, size,
chunk_flags); chunk_flags);
DataFrame df; SpdyFrameDataFrame* df = new SpdyFrameDataFrame;
df.size = fdf->length() + SpdyFrame::size(); df->size = fdf->length() + SpdyFrame::size();
df.data = fdf->data(); df->data = fdf->data();
df.delete_when_done = true; df->delete_when_done = true;
EnqueueDataFrame(df); EnqueueDataFrame(df);
VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending data frame " VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending data frame "
...@@ -2124,7 +2153,7 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface { ...@@ -2124,7 +2153,7 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface {
} }
} }
void EnqueueDataFrame(const DataFrame& df) { void EnqueueDataFrame(DataFrame* df) {
connection_->EnqueueDataFrame(df); connection_->EnqueueDataFrame(df);
} }
...@@ -2323,13 +2352,14 @@ class HttpSM : public BalsaVisitorInterface, public SMInterface { ...@@ -2323,13 +2352,14 @@ class HttpSM : public BalsaVisitorInterface, public SMInterface {
int fd, int fd,
string server_ip, string server_ip,
string server_port, string server_port,
string remote_ip,
bool use_ssl) bool use_ssl)
{ {
VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Initializing server " VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Initializing server "
<< "connection."; << "connection.";
connection_->InitSMConnection(connection_pool, sm_interface, connection_->InitSMConnection(connection_pool, sm_interface,
epoll_server, fd, server_ip, server_port, epoll_server, fd, server_ip, server_port,
use_ssl); remote_ip, use_ssl);
} }
size_t ProcessReadInput(const char* data, size_t len) { size_t ProcessReadInput(const char* data, size_t len) {
...@@ -2343,10 +2373,10 @@ class HttpSM : public BalsaVisitorInterface, public SMInterface { ...@@ -2343,10 +2373,10 @@ class HttpSM : public BalsaVisitorInterface, public SMInterface {
<< len << ": stream " << stream_id_; << len << ": stream " << stream_id_;
char * dataPtr = new char[len]; char * dataPtr = new char[len];
memcpy(dataPtr, data, len); memcpy(dataPtr, data, len);
DataFrame data_frame; DataFrame* data_frame = new DataFrame;
data_frame.data = (const char *)dataPtr; data_frame->data = (const char *)dataPtr;
data_frame.size = len; data_frame->size = len;
data_frame.delete_when_done = true; data_frame->delete_when_done = true;
connection_->EnqueueDataFrame(data_frame); connection_->EnqueueDataFrame(data_frame);
return len; return len;
} }
...@@ -2450,10 +2480,10 @@ class HttpSM : public BalsaVisitorInterface, public SMInterface { ...@@ -2450,10 +2480,10 @@ class HttpSM : public BalsaVisitorInterface, public SMInterface {
private: private:
void SendEOFImpl(uint32 stream_id) { void SendEOFImpl(uint32 stream_id) {
DataFrame df; DataFrame* df = new DataFrame;
df.data = "0\r\n\r\n"; df->data = "0\r\n\r\n";
df.size = 5; df->size = 5;
df.delete_when_done = false; df->delete_when_done = false;
EnqueueDataFrame(df); EnqueueDataFrame(df);
if (acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER) { if (acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER) {
Reset(); Reset();
...@@ -2485,15 +2515,15 @@ class HttpSM : public BalsaVisitorInterface, public SMInterface { ...@@ -2485,15 +2515,15 @@ class HttpSM : public BalsaVisitorInterface, public SMInterface {
size_t SendSynReplyImpl(uint32 stream_id, const BalsaHeaders& headers) { size_t SendSynReplyImpl(uint32 stream_id, const BalsaHeaders& headers) {
SimpleBuffer sb; SimpleBuffer sb;
headers.WriteHeaderAndEndingToBuffer(&sb); headers.WriteHeaderAndEndingToBuffer(&sb);
DataFrame df; DataFrame* df = new DataFrame;
df.size = sb.ReadableBytes(); df->size = sb.ReadableBytes();
char* buffer = new char[df.size]; char* buffer = new char[df->size];
df.data = buffer; df->data = buffer;
df.delete_when_done = true; df->delete_when_done = true;
sb.Read(buffer, df.size); sb.Read(buffer, df->size);
VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Sending HTTP Reply header " VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Sending HTTP Reply header "
<< stream_id_; << stream_id_;
size_t df_size = df.size; size_t df_size = df->size;
EnqueueDataFrame(df); EnqueueDataFrame(df);
return df_size; return df_size;
} }
...@@ -2501,15 +2531,15 @@ class HttpSM : public BalsaVisitorInterface, public SMInterface { ...@@ -2501,15 +2531,15 @@ class HttpSM : public BalsaVisitorInterface, public SMInterface {
size_t SendSynStreamImpl(uint32 stream_id, const BalsaHeaders& headers) { size_t SendSynStreamImpl(uint32 stream_id, const BalsaHeaders& headers) {
SimpleBuffer sb; SimpleBuffer sb;
headers.WriteHeaderAndEndingToBuffer(&sb); headers.WriteHeaderAndEndingToBuffer(&sb);
DataFrame df; DataFrame* df = new DataFrame;
df.size = sb.ReadableBytes(); df->size = sb.ReadableBytes();
char* buffer = new char[df.size]; char* buffer = new char[df->size];
df.data = buffer; df->data = buffer;
df.delete_when_done = true; df->delete_when_done = true;
sb.Read(buffer, df.size); sb.Read(buffer, df->size);
VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Sending HTTP Reply header " VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Sending HTTP Reply header "
<< stream_id_; << stream_id_;
size_t df_size = df.size; size_t df_size = df->size;
EnqueueDataFrame(df); EnqueueDataFrame(df);
return df_size; return df_size;
} }
...@@ -2519,18 +2549,18 @@ class HttpSM : public BalsaVisitorInterface, public SMInterface { ...@@ -2519,18 +2549,18 @@ class HttpSM : public BalsaVisitorInterface, public SMInterface {
char chunk_buf[128]; char chunk_buf[128];
snprintf(chunk_buf, sizeof(chunk_buf), "%x\r\n", (unsigned int)len); snprintf(chunk_buf, sizeof(chunk_buf), "%x\r\n", (unsigned int)len);
string chunk_description(chunk_buf); string chunk_description(chunk_buf);
DataFrame df; DataFrame* df = new DataFrame;
df.size = chunk_description.size() + len + 2; df->size = chunk_description.size() + len + 2;
char* buffer = new char[df.size]; char* buffer = new char[df->size];
df.data = buffer; df->data = buffer;
df.delete_when_done = true; df->delete_when_done = true;
memcpy(buffer, chunk_description.data(), chunk_description.size()); memcpy(buffer, chunk_description.data(), chunk_description.size());
memcpy(buffer + chunk_description.size(), data, len); memcpy(buffer + chunk_description.size(), data, len);
memcpy(buffer + chunk_description.size() + len, "\r\n", 2); memcpy(buffer + chunk_description.size() + len, "\r\n", 2);
EnqueueDataFrame(df); EnqueueDataFrame(df);
} }
void EnqueueDataFrame(const DataFrame& df) { void EnqueueDataFrame(DataFrame* df) {
VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Enqueue data frame: stream " VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Enqueue data frame: stream "
<< stream_id_; << stream_id_;
connection_->EnqueueDataFrame(df); connection_->EnqueueDataFrame(df);
...@@ -2610,13 +2640,14 @@ class StreamerSM : public SMInterface { ...@@ -2610,13 +2640,14 @@ class StreamerSM : public SMInterface {
int fd, int fd,
string server_ip, string server_ip,
string server_port, string server_port,
string remote_ip,
bool use_ssl) bool use_ssl)
{ {
VLOG(2) << ACCEPTOR_CLIENT_IDENT << "StreamerSM: Initializing server " VLOG(2) << ACCEPTOR_CLIENT_IDENT << "StreamerSM: Initializing server "
<< "connection."; << "connection.";
connection_->InitSMConnection(connection_pool, sm_interface, connection_->InitSMConnection(connection_pool, sm_interface,
epoll_server, fd, server_ip, epoll_server, fd, server_ip,
server_port, use_ssl); server_port, remote_ip, use_ssl);
} }
size_t ProcessReadInput(const char* data, size_t len) { size_t ProcessReadInput(const char* data, size_t len) {
...@@ -2626,10 +2657,10 @@ class StreamerSM : public SMInterface { ...@@ -2626,10 +2657,10 @@ class StreamerSM : public SMInterface {
size_t ProcessWriteInput(const char* data, size_t len) { size_t ProcessWriteInput(const char* data, size_t len) {
char * dataPtr = new char[len]; char * dataPtr = new char[len];
memcpy(dataPtr, data, len); memcpy(dataPtr, data, len);
DataFrame df; DataFrame* df = new DataFrame;
df.data = (const char *)dataPtr; df->data = (const char *)dataPtr;
df.size = len; df->size = len;
df.delete_when_done = true; df->delete_when_done = true;
connection_->EnqueueDataFrame(df); connection_->EnqueueDataFrame(df);
return len; return len;
} }
...@@ -2684,6 +2715,7 @@ class StreamerSM : public SMInterface { ...@@ -2684,6 +2715,7 @@ class StreamerSM : public SMInterface {
epoll_server_, -1, epoll_server_, -1,
acceptor_->https_server_ip_, acceptor_->https_server_ip_,
acceptor_->https_server_port_, acceptor_->https_server_port_,
"",
false); false);
return 1; return 1;
...@@ -2773,6 +2805,7 @@ class SMAcceptorThread : public SimpleThread, ...@@ -2773,6 +2805,7 @@ class SMAcceptorThread : public SimpleThread,
vector<SMConnection*> unused_server_connections_; vector<SMConnection*> unused_server_connections_;
vector<SMConnection*> tmp_unused_server_connections_; vector<SMConnection*> tmp_unused_server_connections_;
vector<SMConnection*> allocated_server_connections_; vector<SMConnection*> allocated_server_connections_;
list<SMConnection*> active_server_connections_;
Notification quitting_; Notification quitting_;
MemoryCache* memory_cache_; MemoryCache* memory_cache_;
public: public:
...@@ -2806,6 +2839,7 @@ class SMAcceptorThread : public SimpleThread, ...@@ -2806,6 +2839,7 @@ class SMAcceptorThread : public SimpleThread,
++i) { ++i) {
delete *i; delete *i;
} }
delete ssl_state_;
} }
SMConnection* NewConnection() { SMConnection* NewConnection() {
...@@ -2832,7 +2866,7 @@ class SMAcceptorThread : public SimpleThread, ...@@ -2832,7 +2866,7 @@ class SMAcceptorThread : public SimpleThread,
epoll_server_.RegisterFD(acceptor_->listen_fd_, this, EPOLLIN | EPOLLET); epoll_server_.RegisterFD(acceptor_->listen_fd_, this, EPOLLIN | EPOLLET);
} }
void HandleConnection(int server_fd) { void HandleConnection(int server_fd, struct sockaddr_in *remote_addr) {
int on = 1; int on = 1;
int rc; int rc;
if (acceptor_->disable_nagle_) { if (acceptor_->disable_nagle_) {
...@@ -2851,12 +2885,15 @@ class SMAcceptorThread : public SimpleThread, ...@@ -2851,12 +2885,15 @@ class SMAcceptorThread : public SimpleThread,
close(server_fd); close(server_fd);
return; return;
} }
string remote_ip = inet_ntoa(remote_addr->sin_addr);
server_connection->InitSMConnection(this, server_connection->InitSMConnection(this,
NULL, NULL,
&epoll_server_, &epoll_server_,
server_fd, server_fd,
"", "", "", "", remote_ip,
use_ssl_); use_ssl_);
if (server_connection->initialized())
active_server_connections_.push_back(server_connection);
} }
void AcceptFromListenFD() { void AcceptFromListenFD() {
...@@ -2874,7 +2911,7 @@ class SMAcceptorThread : public SimpleThread, ...@@ -2874,7 +2911,7 @@ class SMAcceptorThread : public SimpleThread,
break; break;
} }
VLOG(1) << ACCEPTOR_CLIENT_IDENT << " Accepted connection"; VLOG(1) << ACCEPTOR_CLIENT_IDENT << " Accepted connection";
HandleConnection(fd); HandleConnection(fd, (struct sockaddr_in *)&address);
} }
} else { } else {
while (true) { while (true) {
...@@ -2890,7 +2927,7 @@ class SMAcceptorThread : public SimpleThread, ...@@ -2890,7 +2927,7 @@ class SMAcceptorThread : public SimpleThread,
break; break;
} }
VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Accepted connection"; VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Accepted connection";
HandleConnection(fd); HandleConnection(fd, (struct sockaddr_in *)&address);
} }
} }
} }
...@@ -2912,6 +2949,32 @@ class SMAcceptorThread : public SimpleThread, ...@@ -2912,6 +2949,32 @@ class SMAcceptorThread : public SimpleThread,
quitting_.Notify(); quitting_.Notify();
} }
// Iterates through a list of active connections expiring any that have been
// idle longer than the configured timeout.
void HandleConnectionIdleTimeout() {
int cur_time = time(NULL);
static time_t oldest_time = cur_time;
// Only iterate the list if we speculate that a connection is ready to be
// expired
if ((cur_time - oldest_time) < g_proxy_config.idle_timeout_s_)
return;
list<SMConnection*>::iterator iter = active_server_connections_.begin();
while (iter != active_server_connections_.end()) {
SMConnection *conn = *iter;
int elapsed_time = (cur_time - conn->last_read_time_);
if (elapsed_time > g_proxy_config.idle_timeout_s_) {
conn->Cleanup("Connection idle timeout reached.");
iter = active_server_connections_.erase(iter);
continue;
}
if (conn->last_read_time_ < oldest_time)
oldest_time = conn->last_read_time_;
iter++;
}
if ((cur_time - oldest_time) >= g_proxy_config.idle_timeout_s_)
oldest_time = cur_time;
}
void Run() { void Run() {
while (!quitting_.HasBeenNotified()) { while (!quitting_.HasBeenNotified()) {
epoll_server_.set_timeout_in_us(10 * 1000); // 10 ms epoll_server_.set_timeout_in_us(10 * 1000); // 10 ms
...@@ -2920,6 +2983,7 @@ class SMAcceptorThread : public SimpleThread, ...@@ -2920,6 +2983,7 @@ class SMAcceptorThread : public SimpleThread,
tmp_unused_server_connections_.begin(), tmp_unused_server_connections_.begin(),
tmp_unused_server_connections_.end()); tmp_unused_server_connections_.end());
tmp_unused_server_connections_.clear(); tmp_unused_server_connections_.clear();
HandleConnectionIdleTimeout();
} }
} }
...@@ -3000,12 +3064,76 @@ const char* BoolToStr(bool b) { ...@@ -3000,12 +3064,76 @@ const char* BoolToStr(bool b) {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
static bool wantExit = false;
static bool wantLogClose = false;
void SignalHandler(int signum)
{
switch(signum) {
case SIGTERM:
case SIGINT:
wantExit = true;
break;
case SIGHUP:
wantLogClose = true;
break;
}
}
static int OpenPidFile(const char *pidfile)
{
int fd;
struct stat pid_stat;
int ret;
fd = open(pidfile, O_RDWR | O_CREAT, 0600);
if (fd == -1) {
cerr << "Could not open pid file '" << pidfile << "' for reading.\n";
exit(1);
}
ret = flock(fd, LOCK_EX | LOCK_NB);
if (ret == -1) {
if (errno == EWOULDBLOCK) {
cerr << "Flip server is already running.\n";
} else {
cerr << "Error getting lock on pid file: " << strerror(errno) << "\n";
}
exit(1);
}
if (fstat(fd, &pid_stat) == -1) {
cerr << "Could not stat pid file '" << pidfile << "': " << strerror(errno)
<< "\n";
}
if (pid_stat.st_size != 0) {
if (ftruncate(fd, pid_stat.st_size) == -1) {
cerr << "Could not truncate pid file '" << pidfile << "': "
<< strerror(errno) << "\n";
}
}
char pid_str[8];
snprintf(pid_str, sizeof(pid_str), "%d", getpid());
int bytes = static_cast<int>(strlen(pid_str));
if (write(fd, pid_str, strlen(pid_str)) != bytes) {
cerr << "Could not write pid file: " << strerror(errno) << "\n";
close(fd);
exit(1);
}
return fd;
}
int main (int argc, char**argv) int main (int argc, char**argv)
{ {
unsigned int i = 0; unsigned int i = 0;
bool wait_for_iface = false; bool wait_for_iface = false;
int pidfile_fd;
signal(SIGPIPE, SIG_IGN); signal(SIGPIPE, SIG_IGN);
signal(SIGTERM, SignalHandler);
signal(SIGINT, SignalHandler);
signal(SIGHUP, SignalHandler);
CommandLine::Init(argc, argv); CommandLine::Init(argc, argv);
CommandLine cl(argc, argv); CommandLine cl(argc, argv);
...@@ -3027,10 +3155,10 @@ int main (int argc, char**argv) ...@@ -3027,10 +3155,10 @@ int main (int argc, char**argv)
<< "\t through the proxy listen ip:port.\n"; << "\t through the proxy listen ip:port.\n";
cout << "\t--forward-ip-header=<header name>\n"; cout << "\t--forward-ip-header=<header name>\n";
cout << "\n Server options:\n"; cout << "\n Server options:\n";
cout << "\t--spdy-server=\"<listen ip>,<listen port>,[ssl cert filename],\n" cout << "\t--spdy-server=\"<listen ip>,<listen port>,[ssl cert filename],"
<< "\t [ssl key filename]\"\n"; << "\n\t [ssl key filename]\"\n";
cout << "\t--http-server=\"<listen ip>,<listen port>,[ssl cert filename],\n" cout << "\t--http-server=\"<listen ip>,<listen port>,[ssl cert filename],"
<< "\t [ssl key filename]\"\n"; << "\n\t [ssl key filename]\"\n";
cout << "\t * Leaving the ssl cert and key fields empty will disable ssl" cout << "\t * Leaving the ssl cert and key fields empty will disable ssl"
<< " for the\n" << " for the\n"
<< "\t http and spdy flip servers\n"; << "\t http and spdy flip servers\n";
...@@ -3042,10 +3170,18 @@ int main (int argc, char**argv) ...@@ -3042,10 +3170,18 @@ int main (int argc, char**argv)
<< " raised.\n"; << " raised.\n";
cout << "\t--ssl-session-expiry=<seconds> (default is 300)\n"; cout << "\t--ssl-session-expiry=<seconds> (default is 300)\n";
cout << "\t--ssl-disable-compression\n"; cout << "\t--ssl-disable-compression\n";
cout << "\t--idle-timeout=<seconds> (default is 300)\n";
cout << "\t--pidfile=<filepath> (default /var/run/flip-server.pid)\n";
cout << "\t--help\n"; cout << "\t--help\n";
exit(0); exit(0);
} }
if (cl.HasSwitch("pidfile")) {
pidfile_fd = OpenPidFile(cl.GetSwitchValueASCII("pidfile").c_str());
} else {
pidfile_fd = OpenPidFile(PIDFILE);
}
g_proxy_config.server_think_time_in_s_ = FLAGS_server_think_time_in_s; g_proxy_config.server_think_time_in_s_ = FLAGS_server_think_time_in_s;
if (cl.HasSwitch("forward-ip-header")) { if (cl.HasSwitch("forward-ip-header")) {
...@@ -3089,13 +3225,18 @@ int main (int argc, char**argv) ...@@ -3089,13 +3225,18 @@ int main (int argc, char**argv)
if (cl.HasSwitch("ssl-session-expiry")) { if (cl.HasSwitch("ssl-session-expiry")) {
string session_expiry = cl.GetSwitchValueASCII("ssl-session-expiry"); string session_expiry = cl.GetSwitchValueASCII("ssl-session-expiry");
g_proxy_config.ssl_session_expiry_ = atoi( session_expiry.c_str() ); g_proxy_config.ssl_session_expiry_ = atoi(session_expiry.c_str());
} }
if (cl.HasSwitch("ssl-disable-compression")) { if (cl.HasSwitch("ssl-disable-compression")) {
g_proxy_config.ssl_disable_compression_ = true; g_proxy_config.ssl_disable_compression_ = true;
} }
if (cl.HasSwitch("idle-timeout")) {
g_proxy_config.idle_timeout_s_ =
atoi(cl.GetSwitchValueASCII("idle-timeout").c_str());
}
if (cl.HasSwitch("force_spdy")) if (cl.HasSwitch("force_spdy"))
FLAGS_force_spdy = true; FLAGS_force_spdy = true;
...@@ -3116,13 +3257,15 @@ int main (int argc, char**argv) ...@@ -3116,13 +3257,15 @@ int main (int argc, char**argv)
LOG(INFO) << "Accepts per wake : " << FLAGS_accepts_per_wake; LOG(INFO) << "Accepts per wake : " << FLAGS_accepts_per_wake;
LOG(INFO) << "Disable nagle : " LOG(INFO) << "Disable nagle : "
<< (FLAGS_disable_nagle?"true":"false"); << (FLAGS_disable_nagle?"true":"false");
LOG(INFO) << "Reuseport : " << (FLAGS_reuseport?"true":"false"); LOG(INFO) << "Reuseport : "
<< (FLAGS_reuseport?"true":"false");
LOG(INFO) << "Force SPDY : " LOG(INFO) << "Force SPDY : "
<< (FLAGS_force_spdy?"true":"false"); << (FLAGS_force_spdy?"true":"false");
LOG(INFO) << "SSL session expiry : " LOG(INFO) << "SSL session expiry : "
<< g_proxy_config.ssl_session_expiry_; << g_proxy_config.ssl_session_expiry_;
LOG(INFO) << "SSL disable compression : " LOG(INFO) << "SSL disable compression : "
<< g_proxy_config.ssl_disable_compression_; << g_proxy_config.ssl_disable_compression_;
LOG(INFO) << "Connection idle timeout : " << g_proxy_config.idle_timeout_s_;
// Proxy Acceptors // Proxy Acceptors
while (true) { while (true) {
...@@ -3209,7 +3352,14 @@ int main (int argc, char**argv) ...@@ -3209,7 +3352,14 @@ int main (int argc, char**argv)
sm_worker_threads_.back()->Start(); sm_worker_threads_.back()->Start();
} }
while (true) { while (!wantExit) {
// Close logfile when HUP signal is received. Logging system will
// automatically reopen on next log message.
if ( wantLogClose ) {
wantLogClose = false;
VLOG(1) << "HUP received, reopening log file.";
logging::CloseLogFile();
}
if (GotQuitFromStdin()) { if (GotQuitFromStdin()) {
for (unsigned int i = 0; i < sm_worker_threads_.size(); ++i) { for (unsigned int i = 0; i < sm_worker_threads_.size(); ++i) {
sm_worker_threads_[i]->Quit(); sm_worker_threads_[i]->Quit();
...@@ -3217,10 +3367,12 @@ int main (int argc, char**argv) ...@@ -3217,10 +3367,12 @@ int main (int argc, char**argv)
for (unsigned int i = 0; i < sm_worker_threads_.size(); ++i) { for (unsigned int i = 0; i < sm_worker_threads_.size(); ++i) {
sm_worker_threads_[i]->Join(); sm_worker_threads_[i]->Join();
} }
return 0; break;
} }
usleep(1000*10); // 10 ms usleep(1000*10); // 10 ms
} }
unlink(PIDFILE);
close(pidfile_fd);
return 0; return 0;
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment