文章目录
-
- 全流程实现博客链接
- 前引
- (二十)---- C++ High-Performance WebServer源码实现(Util核心代码部分)
-
- 1、acceptor.h
- 1、acceptor.cc
- 2、buffer.h
- 2、buffer.cc
- 3、channel.h
- 3、channel.cc
- 4、epoller.h
- 4、epoller.cc
- 5、eventloop.h
- 5、eventloop.cc
- 6、eventloopthread.h
- 6、eventloopthread.cc
- 7、eventloopthreadpool.h
- 7、eventloopthreadpool.cc
- 8、tcpconnection.h
- 8、tcpconnection.cc
- 9、tcpserver.h
- 9、tcpserver.cc
- 10、thread.h
- 10、thread.cc
全流程实现博客链接
从零开始自制实现C++ High-Performance WebServer 全流程记录(基于muduo网络库)
前引
这部分还是挺单纯的
就是单纯把源码贴出来 没有多一点的意思
主要是为了方便各位查看代码:毕竟所有的代码都以网页的形式呈现出来,想定位某个源文件的话,按 Ctrl+F
一下子就能定位得到,所以我还是多做点功夫吧。
那就开始了 去喝口水把代码贴出来了
(二十)---- C++ High-Performance WebServer源码实现(Util核心代码部分)
事件驱动最核心的代码块,例如 Acceptor、Channel、TcpServer、Epoller、EventLoop 等等。
1、acceptor.h
#ifndef TINY_MUDUO_ACCEPTOR_H_
#define TINY_MUDUO_ACCEPTOR_H_#include <sys/socket.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>#include <memory>
#include <functional>#include "noncopyable.h"
#include "address.h"namespace tiny_muduo {
class EventLoop;
class Address;
class Channel;class Acceptor : public NonCopyAble {
public:typedef std::function<void (int, const Address&)> NewConnectionCallback;Acceptor(EventLoop* loop, const Address& address);~Acceptor();void BindListenFd(const Address& address); void Listen();void NewConnection();int SetSockoptKeepAlive(int fd) {
int option_val = 1;return setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE,&option_val, static_cast<socklen_t>(sizeof(option_val)));}int SetSockoptReuseAddr(int fd) {
int option_val = 1;return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,&option_val, static_cast<socklen_t>(sizeof(option_val)));}int SetSockoptTcpNoDelay(int fd) {
int option_val = 1;return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,&option_val, static_cast<socklen_t>(sizeof(option_val)));}void SetNewConnectionCallback(const NewConnectionCallback& callback) {
new_connection_callback_ = callback;}void SetNewConnectionCallback(NewConnectionCallback&& callback) {
new_connection_callback_ = std::move(callback);}private:EventLoop* loop_;int listenfd_;int idlefd_;std::unique_ptr<Channel> channel_;NewConnectionCallback new_connection_callback_;
};}
#endif
1、acceptor.cc
#include "acceptor.h"

#include <assert.h>
#include <errno.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <bits/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/tcp.h>

#include <functional>
#include <cstring>

#include "address.h"
#include "channel.h"
#include "logging.h"

using namespace tiny_muduo;

// Creates the non-blocking listening socket, reserves a spare descriptor on
// /dev/null, binds to the requested port and installs NewConnection() as the
// read callback. Listening only starts once Listen() is called.
Acceptor::Acceptor(EventLoop* loop, const Address& address)
    : loop_(loop),
      listenfd_(::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_TCP)),
      idlefd_(::open("/dev/null", O_RDONLY | O_CLOEXEC)),
      channel_(new Channel(loop_, listenfd_)) {
  SetSockoptReuseAddr(listenfd_);
  SetSockoptKeepAlive(listenfd_);
  BindListenFd(address);
  channel_->SetReadCallback(std::bind(&Acceptor::NewConnection, this));
}

// Unregisters the listening channel and releases both descriptors.
Acceptor::~Acceptor() {
  channel_->DisableAll();
  loop_->Remove(channel_.get());
  ::close(listenfd_);
  // The spare descriptor was previously leaked here.
  ::close(idlefd_);
}

// Binds the listening socket to INADDR_ANY on the port taken from `addr`.
void Acceptor::BindListenFd(const Address& addr) {
  struct sockaddr_in address;
  std::memset(&address, 0, sizeof(address));
  address.sin_family = AF_INET;
  // FIXME : it should be addr.ip() and need some conversion
  address.sin_addr.s_addr = htonl(INADDR_ANY);
  address.sin_port = htons(static_cast<uint16_t>(addr.port()));
  int ret = bind(listenfd_, reinterpret_cast<struct sockaddr*>(&address), sizeof(address));
  assert(ret != -1);
  (void)ret;
}

// Puts the socket into listening state and starts watching it for readability.
void Acceptor::Listen() {
  int ret = listen(listenfd_, SOMAXCONN);
  assert(ret != -1);
  (void)ret;
  channel_->EnableReading();
}

// Accepts one pending connection, enables keep-alive and TCP_NODELAY on it and
// hands it to the new-connection callback.
void Acceptor::NewConnection() {
  struct sockaddr_in client;
  std::memset(&client, 0, sizeof(client));
  socklen_t client_addrlength = sizeof(client);
  int connfd = ::accept4(listenfd_, reinterpret_cast<struct sockaddr*>(&client),
                         &client_addrlength, SOCK_NONBLOCK | SOCK_CLOEXEC);
  if (connfd < 0) {
    if (errno == EMFILE) {
      // Out of descriptors: release the spare one, accept and immediately
      // close the pending connection, then re-acquire the spare.
      ::close(idlefd_);
      idlefd_ = ::accept(listenfd_, NULL, NULL);
      ::close(idlefd_);
      idlefd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
    }
    return;
  }

  if (SetSockoptKeepAlive(connfd) == -1) {
    LOG_ERROR << "Acceptor::NewConnection SetSockoptKeepAlive failed";
    ::close(connfd);
    return;
  }

  if (SetSockoptTcpNoDelay(connfd) == -1) {
    LOG_ERROR << "Acceptor::NewConnection SetSockoptTcpNoDelay failed";
    ::close(connfd);
    return;
  }

  if (!new_connection_callback_) {
    // Without a consumer the descriptor would leak (and calling an empty
    // std::function throws), so drop the connection instead.
    ::close(connfd);
    return;
  }

  // accept4() already stored the peer address in `client`; the previous
  // unchecked getpeername() call could leave its output uninitialised.
  new_connection_callback_(connfd, Address(inet_ntoa(client.sin_addr), ntohs(client.sin_port)));
}
2、buffer.h
#ifndef TINY_MUDUO_BUFFER_H_
#define TINY_MUDUO_BUFFER_H_#include <vector>
#include <algorithm>
#include <string>#include <assert.h>
#include <cstring>#include "noncopyable.h"using std::string;namespace tiny_muduo {
static const int kPrePendIndex = 8;
static const int kInitialSize = 1024;
static const char* kCRLF = "\r\n";class Buffer : public NonCopyAble {
public:Buffer() : buffer_(kInitialSize), read_index_(kPrePendIndex), write_index_(kPrePendIndex) {
}~Buffer() {
}int ReadFd(int fd);char* begin() {
return &*buffer_.begin(); };const char* begin() const {
return &*buffer_.begin(); };char* beginread() {
return begin() + read_index_; } const char* beginread() const {
return begin() + read_index_; }char* beginwrite() {
return begin() + write_index_; }const char* beginwrite() const {
return begin() + write_index_; }const char* FindCRLF() const {
const char* find = std::search(Peek(), beginwrite(), kCRLF, kCRLF + 2); return find == beginwrite() ? nullptr : find;}void Append(const char* message) {
Append(message, static_cast<int>(strlen(message)));}void Append(const char* message, int len) {
MakeSureEnoughStorage(len);std::copy(message, message + len, beginwrite());write_index_ += len;}void Append(const string& message) {
Append(message.data(), static_cast<int>(message.size())); }void Retrieve(int len) {
assert(readablebytes() >= len);if (len + read_index_ < write_index_) {
read_index_ += len;} else {
RetrieveAll();}}void RetrieveUntilIndex(const char* index) {
assert(beginwrite() >= index);read_index_ += static_cast<int>(index - beginread());}void RetrieveAll() {
write_index_ = kPrePendIndex;read_index_ = write_index_;}string RetrieveAsString(int len) {
assert(read_index_ + len <= write_index_);string ret = std::move(PeekAsString(len));Retrieve(len); return ret;}string RetrieveAllAsString() {
string ret = std::move(PeekAllAsString());RetrieveAll();return ret;}const char* Peek() const {
return beginread();}char* Peek() {
return beginread();}string PeekAsString(int len) {
return string(beginread(), beginread() + len);}string PeekAllAsString() {
return string(beginread(), beginwrite()); }int readablebytes() const {
return write_index_ - read_index_; }int writablebytes() const {
return static_cast<int>(buffer_.size()) - write_index_; } int prependablebytes() const {
return read_index_; }void MakeSureEnoughStorage(int len) {
if (writablebytes() >= len) return;if (writablebytes() + prependablebytes() >= kPrePendIndex + len) {
std::copy(beginread(), beginwrite(), begin() + kPrePendIndex);write_index_ = kPrePendIndex + readablebytes();read_index_ = kPrePendIndex;} else {
buffer_.resize(write_index_ + len);}}private:std::vector<char> buffer_;int read_index_;int write_index_;
};}
#endif
2、buffer.cc
#include "buffer.h"

#include <sys/uio.h>

#include "logging.h"

using namespace tiny_muduo;

// Reads from `fd` with a single readv() into the writable space of the buffer
// plus a 64 KiB stack area; whatever lands in the stack area is appended
// afterwards. Returns the readv() result (bytes read, 0, or a negative value
// on error).
int Buffer::ReadFd(int fd) {
  // Not zero-filled: only the bytes readv() reports as written are ever read.
  char extrabuf[65536];
  struct iovec iv[2];
  const int writable = writablebytes();
  iv[0].iov_base = beginwrite();
  iv[0].iov_len = writable;
  iv[1].iov_base = extrabuf;
  iv[1].iov_len = sizeof(extrabuf);

  // Skip the stack area when the buffer alone is already at least as large.
  const int iovcnt = (writable < static_cast<int>(sizeof(extrabuf)) ? 2 : 1);
  int readn = static_cast<int>(::readv(fd, iv, iovcnt));

  if (readn < 0) {
    // LOG_ERRNO << "Buffer::ReadFd readv failed";
  } else if (readn <= writable) {
    write_index_ += readn;
  } else {
    // The buffer is full; the overflow sits in extrabuf.
    write_index_ = static_cast<int>(buffer_.size());
    Append(extrabuf, readn - writable);
  }

  return readn;
}
3、channel.h
#ifndef TINY_MUDUO_CHANNEL_H_
#define TINY_MUDUO_CHANNEL_H_

#include <sys/epoll.h>

#include <functional>
#include <utility>
#include <memory>

#include "eventloop.h"
#include "callback.h"
#include "noncopyable.h"

namespace tiny_muduo {

// Registration state of a channel as tracked by the Epoller.
enum ChannelState {
  kNew,
  kAdded,
  kDeleted
};

// Binds one file descriptor to the events it is interested in and to the
// callbacks that handle them. The channel does not close the descriptor.
class Channel : public NonCopyAble {
 public:
  typedef std::function<void()> ErrorCallback;

  Channel(EventLoop* loop, const int& fd);
  ~Channel();

  // Dispatches the events stored by SetReceivedEvents() to the callbacks.
  void HandleEvent();
  void HandleEventWithGuard();

  void SetReadCallback(ReadCallback&& callback) {
    read_callback_ = std::move(callback);
  }

  void SetReadCallback(const ReadCallback& callback) {
    read_callback_ = callback;
  }

  void SetWriteCallback(WriteCallback&& callback) {
    write_callback_ = std::move(callback);
  }

  void SetWriteCallback(const WriteCallback& callback) {
    write_callback_ = callback;
  }

  void SetErrorCallback(ErrorCallback&& callback) {
    error_callback_ = std::move(callback);
  }

  void SetErrorCallback(const ErrorCallback& callback) {
    error_callback_ = callback;
  }

  // Remembers a weak reference to `ptr`; HandleEvent() locks it for the
  // duration of the dispatch.
  void Tie(const std::shared_ptr<void>& ptr) {
    tied_ = true;
    tie_ = ptr;
  }

  // Each of the following changes the interest set and pushes it to the loop.
  void EnableReading() {
    events_ |= (EPOLLIN | EPOLLPRI);
    Update();
  }

  void EnableWriting() {
    events_ |= EPOLLOUT;
    Update();
  }

  void DisableAll() {
    events_ = 0;
    Update();
  }

  void DisableWriting() {
    events_ &= ~EPOLLOUT;
    Update();
  }

  void Update() {
    loop_->Update(this);
  }

  void SetReceivedEvents(int events__) {
    recv_events_ = events__;
  }

  void SetChannelState(ChannelState state__) {
    state_ = state__;
  }

  int fd() const { return fd_; }
  int events() const { return events_; }
  int recv_events() const { return recv_events_; }
  ChannelState state() const { return state_; }

  bool IsWriting() const { return (events_ & EPOLLOUT) != 0; }
  bool IsReading() const { return (events_ & (EPOLLIN | EPOLLPRI)) != 0; }

 private:
  EventLoop* loop_;
  int fd_;
  int events_;       // update events
  int recv_events_;  // epoll received events
  std::weak_ptr<void> tie_;
  bool tied_;
  int errno_ = 0;    // not set by the constructor, so default it here
  ChannelState state_;
  ReadCallback read_callback_;
  WriteCallback write_callback_;
  ErrorCallback error_callback_;
};

}  // namespace tiny_muduo

#endif
3、channel.cc
#include "channel.h"

#include <sys/poll.h>
#include <sys/epoll.h>

#include "logging.h"

using namespace tiny_muduo;

Channel::Channel(EventLoop* loop, const int& fd__)
    : loop_(loop),
      fd_(fd__),
      events_(0),
      recv_events_(0),
      tied_(false),
      state_(kNew) {
}

Channel::~Channel() {
}

// Dispatches the received events. When the channel is tied to an owner, the
// owner is kept alive for the whole dispatch, and nothing is dispatched if the
// owner is already gone.
void Channel::HandleEvent() {
  if (tied_) {
    std::shared_ptr<void> guard = tie_.lock();
    if (guard) {
      HandleEventWithGuard();
    }
  } else {
    HandleEventWithGuard();
  }
}

// Calls the error, read and write callbacks (in that order) for the event bits
// stored in recv_events_; callbacks that were never set are skipped.
void Channel::HandleEventWithGuard() {
  if (recv_events_ & POLLNVAL) {
    LOG_ERROR << "Channel::HandleEventWithGuard POLLNVAL";
  }

  if (recv_events_ & (EPOLLERR | POLLNVAL)) {
    if (error_callback_) {
      error_callback_();
    }
  }

  if (recv_events_ & (EPOLLIN | EPOLLPRI | EPOLLRDHUP)) {
    if (read_callback_) {
      read_callback_();
    }
  }

  if (recv_events_ & EPOLLOUT) {
    if (write_callback_) {
      write_callback_();
    }
  }
}
4、epoller.h
#ifndef TINY_MUDUO_EPOLLER_H_
#define TINY_MUDUO_EPOLLER_H_#include <sys/epoll.h>
#include <vector>
#include <map>#include "noncopyable.h"static const int kDefaultEvents = 16;namespace tiny_muduo {
class Channel;class Epoller : public NonCopyAble {
public: typedef std::vector<epoll_event> Events;typedef std::vector<Channel*> Channels;Epoller();~Epoller();void Remove(Channel* channel_);void Poll(Channels& channels);int EpollWait() {
return epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), -1); }void FillActiveChannels(int eventnums, Channels& channels); void Update(Channel* channel);void UpdateChannel(int operation, Channel* channel);private: typedef std::map<int, Channel*> ChannelMap;int epollfd_;Events events_;ChannelMap channels_;
};}
#endif
4、epoller.cc
#include "epoller.h"

#include <assert.h>
#include <errno.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

#include <vector>

#include "channel.h"
#include "logging.h"

using namespace tiny_muduo;

Epoller::Epoller()
    : epollfd_(::epoll_create1(EPOLL_CLOEXEC)),
      events_(kDefaultEvents),
      channels_() {
}

Epoller::~Epoller() {
  ::close(epollfd_);
}

// Waits for events and appends the ready channels to `channels`.
void Epoller::Poll(Channels& channels) {
  int eventnums = EpollWait();
  if (eventnums < 0) {
    // An interrupted wait is not an error; anything else is reported.
    if (errno != EINTR) {
      LOG_ERROR << "Epoller::Poll epoll_wait failed";
    }
    return;
  }
  FillActiveChannels(eventnums, channels);
}

// Stores the received event mask in each ready channel and collects it.
// Doubles the event array when it was completely filled.
void Epoller::FillActiveChannels(int eventnums, Channels& channels) {
  for (int i = 0; i < eventnums; ++i) {
    Channel* ptr = static_cast<Channel*>(events_[i].data.ptr);
    ptr->SetReceivedEvents(events_[i].events);
    channels.emplace_back(ptr);
  }
  if (eventnums == static_cast<int>(events_.size())) {
    events_.resize(eventnums * 2);
  }
}

// Removes the channel from epoll (if still registered) and from the map, and
// resets its state to kNew.
void Epoller::Remove(Channel* channel) {
  int fd = channel->fd();
  ChannelState state = channel->state();
  assert(state == kDeleted || state == kAdded);
  if (state == kAdded) {
    UpdateChannel(EPOLL_CTL_DEL, channel);
  }
  channel->SetChannelState(kNew);
  channels_.erase(fd);
}

// kNew / kDeleted channels are (re-)added; kAdded channels are modified, or
// deleted from epoll when their interest set is empty.
void Epoller::Update(Channel* channel) {
  int op = 0;
  int events = channel->events();
  ChannelState state = channel->state();
  int fd = channel->fd();

  if (state == kNew || state == kDeleted) {
    if (state == kNew) {
      assert(channels_.find(fd) == channels_.end());
      channels_[fd] = channel;
    } else {
      assert(channels_.find(fd) != channels_.end());
      assert(channels_[fd] == channel);
    }
    op = EPOLL_CTL_ADD;
    channel->SetChannelState(kAdded);
  } else {
    assert(channels_.find(fd) != channels_.end());
    assert(channels_[fd] == channel);
    if (events == 0) {
      op = EPOLL_CTL_DEL;
      channel->SetChannelState(kDeleted);
    } else {
      op = EPOLL_CTL_MOD;
    }
  }

  UpdateChannel(op, channel);
}

// Issues the epoll_ctl() call, storing the channel pointer as user data.
void Epoller::UpdateChannel(int operation, Channel* channel) {
  struct epoll_event event;
  // data is a union; clear it so no indeterminate bytes reach the kernel.
  memset(&event, 0, sizeof(event));
  event.events = channel->events();
  event.data.ptr = static_cast<void*>(channel);

  if (epoll_ctl(epollfd_, operation, channel->fd(), &event) < 0) {
    LOG_ERROR << "Epoller::UpdataChannel epoll_ctl failed";
  }
}
5、eventloop.h
#ifndef TINY_MUDUO_EVENTLOOP_H_
#define TINY_MUDUO_EVENTLOOP_H_

#include <stdint.h>
#include <unistd.h>
#include <assert.h>
#include <sys/eventfd.h>
#include <pthread.h>

#include <vector>
#include <functional>
#include <memory>
#include <utility>

#include "mutex.h"
#include "epoller.h"
#include "currentthread.h"
#include "timestamp.h"
#include "timerqueue.h"
#include "noncopyable.h"

namespace tiny_muduo {

class Epoller;
class Channel;

// Polls for events, dispatches them to channels and then runs the functors
// that were queued for this loop.
class EventLoop : public NonCopyAble {
 public:
  typedef std::vector<Channel*> Channels;
  typedef std::function<void()> BasicFunc;
  typedef std::vector<BasicFunc> ToDoList;

  EventLoop();
  ~EventLoop();

  // Timer helpers; the last AddTimer argument is the repeat interval
  // (0.0 is passed for one-shot timers).
  void RunAt(Timestamp timestamp, BasicFunc&& cb) {
    timer_queue_->AddTimer(timestamp, std::move(cb), 0.0);
  }

  void RunAfter(double wait_time, BasicFunc&& cb) {
    Timestamp timestamp(Timestamp::AddTime(Timestamp::Now(), wait_time));
    timer_queue_->AddTimer(timestamp, std::move(cb), 0.0);
  }

  void RunEvery(double interval, BasicFunc&& cb) {
    Timestamp timestamp(Timestamp::AddTime(Timestamp::Now(), interval));
    timer_queue_->AddTimer(timestamp, std::move(cb), interval);
  }

  // True when called from the thread that constructed this loop.
  bool IsInThreadLoop() const { return CurrentThread::tid() == tid_; }

  void Update(Channel* channel) { epoller_->Update(channel); }
  void Remove(Channel* channel) { epoller_->Remove(channel); }

  void Loop();
  // Drains the wakeup eventfd.
  void HandleRead();
  // Always queues `func`; wakes the loop when necessary.
  void QueueOneFunc(BasicFunc func);
  // Runs `func` immediately in the loop thread, otherwise queues it.
  void RunOneFunc(BasicFunc func);
  void DoToDoList();

 private:
  bool running_;
  pid_t tid_;
  std::unique_ptr<Epoller> epoller_;
  int wakeup_fd_;
  std::unique_ptr<Channel> wakeup_channel_;
  std::unique_ptr<TimerQueue> timer_queue_;
  bool calling_functors_;
  Channels active_channels_;
  ToDoList to_do_list_;
  MutexLock mutex_;  // guards to_do_list_
};

}  // namespace tiny_muduo

#endif
5、eventloop.cc
#include "eventloop.h"

#include <unistd.h>
#include <sys/eventfd.h>
#include <pthread.h>
#include <signal.h>

#include <utility>

#include "mutex.h"
#include "channel.h"

using namespace tiny_muduo;

namespace {

// Ignores SIGPIPE for the whole process during static initialisation.
class IgnoreSigPipe {
 public:
  IgnoreSigPipe() {
    ::signal(SIGPIPE, SIG_IGN);
  }
};

IgnoreSigPipe initObj;

}  // namespace

// Records the constructing thread as the loop thread and registers an eventfd
// channel that other threads use to wake the loop.
EventLoop::EventLoop()
    : running_(false),
      tid_(CurrentThread::tid()),
      epoller_(new Epoller()),
      wakeup_fd_(::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC)),
      wakeup_channel_(new Channel(this, wakeup_fd_)),
      timer_queue_(new TimerQueue(this)),
      calling_functors_(false) {
  wakeup_channel_->SetReadCallback(std::bind(&EventLoop::HandleRead, this));
  wakeup_channel_->EnableReading();
}

EventLoop::~EventLoop() {
  if (running_) running_ = false;
  wakeup_channel_->DisableAll();
  Remove(wakeup_channel_.get());
  close(wakeup_fd_);
}

// Must be called from the loop thread. Each iteration polls, dispatches the
// active channels and then runs the queued functors.
void EventLoop::Loop() {
  assert(IsInThreadLoop());
  running_ = true;
  while (running_) {
    active_channels_.clear();
    epoller_->Poll(active_channels_);
    for (const auto& channel : active_channels_) {
      channel->HandleEvent();
    }
    DoToDoList();
  }
  running_ = false;
}

// Consumes the eventfd counter so the wakeup channel stops being readable.
void EventLoop::HandleRead() {
  uint64_t read_one_byte = 1;
  ssize_t read_size = ::read(wakeup_fd_, &read_one_byte, sizeof(read_one_byte));
  (void) read_size;
  assert(read_size == sizeof(read_one_byte));
}

// Appends `func` to the to-do list. The loop is woken when the caller is
// another thread, or when the loop is currently running functors (the new
// functor would otherwise wait for the next unrelated event).
void EventLoop::QueueOneFunc(BasicFunc func) {
  {
    MutexLockGuard lock(mutex_);
    to_do_list_.emplace_back(std::move(func));
  }

  if (!IsInThreadLoop() || calling_functors_) {
    uint64_t write_one_byte = 1;
    ssize_t write_size = ::write(wakeup_fd_, &write_one_byte, sizeof(write_one_byte));
    (void) write_size;
    assert(write_size == sizeof(write_one_byte));
  }
}

void EventLoop::RunOneFunc(BasicFunc func) {
  if (IsInThreadLoop()) {
    func();
  } else {
    QueueOneFunc(std::move(func));
  }
}

// Swaps the pending list out under the lock and runs the functors without
// holding it, so functors may queue further work.
void EventLoop::DoToDoList() {
  ToDoList functors;
  calling_functors_ = true;
  {
    MutexLockGuard lock(mutex_);
    functors.swap(to_do_list_);
  }

  for (const auto& func : functors) {
    func();
  }
  calling_functors_ = false;
}
6、eventloopthread.h
#ifndef TINY_MUDUO_EVENTLOOPTHREAD_H_
#define TINY_MUDUO_EVENTLOOPTHREAD_H_

#include "thread.h"
#include "mutex.h"
#include "condition.h"
#include "noncopyable.h"

namespace tiny_muduo {

class EventLoop;

// A thread whose body constructs an EventLoop and runs it.
class EventLoopThread : public NonCopyAble {
 public:
  EventLoopThread();
  ~EventLoopThread();

  // Thread body: publishes the loop through loop_ and runs it.
  void StartFunc();
  // Starts the thread and blocks until its loop has been published.
  EventLoop* StartLoop();

 private:
  EventLoop* loop_;  // written under mutex_; nullptr while no loop is running
  Thread thread_;
  MutexLock mutex_;
  Condition cond_;
};

}  // namespace tiny_muduo

#endif
6、eventloopthread.cc
#include "eventloopthread.h"

#include <pthread.h>
#include <functional>

#include "mutex.h"
#include "condition.h"
#include "eventloop.h"

using namespace tiny_muduo;

EventLoopThread::EventLoopThread()
    : loop_(nullptr),
      thread_(std::bind(&EventLoopThread::StartFunc, this)),
      mutex_(),
      cond_(mutex_) {
}

EventLoopThread::~EventLoopThread() {
}

// Starts the thread and waits on the condition until StartFunc() has stored
// the address of its loop, then returns that address.
EventLoop* EventLoopThread::StartLoop() {
  thread_.StartThread();
  EventLoop* loop = nullptr;
  {
    MutexLockGuard lock(mutex_);
    while (loop_ == nullptr) {
      cond_.Wait();
    }
    loop = loop_;
  }
  return loop;
}

// Runs in the new thread. The loop lives on this thread's stack, so the
// pointer handed out by StartLoop() is only valid while Loop() is running.
void EventLoopThread::StartFunc() {
  EventLoop loop;
  {
    MutexLockGuard lock(mutex_);
    loop_ = &loop;
    cond_.Notify();
  }

  // Use the local object so loop_ is not read outside the lock.
  loop.Loop();

  {
    MutexLockGuard lock(mutex_);
    loop_ = nullptr;
  }
}
7、eventloopthreadpool.h
#ifndef TINY_MUDUO_EVENTLOOPTHREADPOOL_H_
#define TINY_MUDUO_EVENTLOOPTHREADPOOL_H_#include <vector>
#include <memory>#include "noncopyable.h"namespace tiny_muduo {
class EventLoopThread;
class EventLoop;class EventLoopThreadPool : public NonCopyAble {
public:typedef std::vector<std::unique_ptr<EventLoopThread>> Thread;typedef std::vector<EventLoop*> Loop;EventLoopThreadPool(EventLoop* loop);~EventLoopThreadPool();void SetThreadNums(int thread_nums) {
thread_nums_ = thread_nums;}void StartLoop();EventLoop* NextLoop();private:EventLoop* base_loop_; Thread threads_;Loop loops_;int thread_nums_;int next_;
};}
#endif
7、eventloopthreadpool.cc
#include "eventloopthreadpool.h"

#include <memory>
#include <utility>

#include "eventloopthread.h"

using namespace tiny_muduo;

EventLoopThreadPool::EventLoopThreadPool(EventLoop* loop)
    : base_loop_(loop),
      thread_nums_(0),
      next_(0) {
}

EventLoopThreadPool::~EventLoopThreadPool() {
}

// Creates thread_nums_ loop threads and records the loop of each one.
void EventLoopThreadPool::StartLoop() {
  for (int i = 0; i < thread_nums_; ++i) {
    // Owned from the moment it is allocated, so a throwing emplace_back
    // cannot leak the thread object.
    std::unique_ptr<EventLoopThread> thread(new EventLoopThread());
    EventLoopThread* raw = thread.get();
    threads_.emplace_back(std::move(thread));
    loops_.emplace_back(raw->StartLoop());
  }
}

// Returns the worker loops in round-robin order, or the base loop when no
// worker has been started.
EventLoop* EventLoopThreadPool::NextLoop() {
  EventLoop* ret = base_loop_;
  if (!loops_.empty()) {
    ret = loops_[next_++];
    if (next_ == static_cast<int>(loops_.size())) next_ = 0;
  }
  return ret;
}
8、tcpconnection.h
#ifndef TINY_MUDUO_TCPCONNECTION_H_
#define TINY_MUDUO_TCPCONNECTION_H_

#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>

#include <string>
#include <memory>
#include <utility>

#include "callback.h"
#include "channel.h"
#include "buffer.h"
#include "httpcontent.h"
#include "noncopyable.h"
#include "timestamp.h"

using std::string;

namespace tiny_muduo {

class EventLoop;

// One accepted TCP connection: owns the descriptor, its channel and the
// input/output buffers, and forwards events to the user callbacks.
class TcpConnection : public std::enable_shared_from_this<TcpConnection>, NonCopyAble {
 public:
  enum ConnectionState {
    kConnecting,
    kConnected,
    kDisconnecting,
    kDisconnected
  };

  TcpConnection(EventLoop* loop, int connfd, int id);
  ~TcpConnection();

  void SetConnectionCallback(const ConnectionCallback& callback) {
    connection_callback_ = callback;
  }

  void SetConnectionCallback(ConnectionCallback&& callback) {
    connection_callback_ = std::move(callback);
  }

  void SetMessageCallback(const MessageCallback& callback) {
    message_callback_ = callback;
  }

  void SetMessageCallback(MessageCallback&& callback) {
    message_callback_ = std::move(callback);
  }

  void SetCloseCallback(const CloseCallback& callback) {
    close_callback_ = callback;
  }

  void SetCloseCallback(CloseCallback&& callback) {
    close_callback_ = std::move(callback);
  }

  // Marks the connection as connected, ties the channel to this object,
  // starts reading and notifies the connection callback if one is set.
  void ConnectionEstablished() {
    state_ = kConnected;
    channel_->Tie(shared_from_this());
    channel_->EnableReading();
    // Calling an empty std::function would throw std::bad_function_call.
    if (connection_callback_) {
      connection_callback_(shared_from_this(), &input_buffer_);
    }
  }

  void Shutdown();
  bool IsShutdown() const { return state_ == kDisconnecting; }
  int GetErrno() const;
  void ConnectionDestructor();

  void HandleClose();
  void HandleError();
  void HandleMessage();
  void HandleWrite();

  void Send(Buffer* buffer);
  void Send(const string& str);
  void Send(const char* message, int len);
  void Send(const char* message) {
    Send(message, static_cast<int>(strlen(message)));
  }

  void UpdateTimestamp(Timestamp now) { timestamp_ = now; }

  Timestamp timestamp() const { return timestamp_; }
  int fd() const { return fd_; }
  int id() const { return connection_id_; }
  EventLoop* loop() const { return loop_; }
  HttpContent* GetHttpContent() { return &content_; }

 private:
  EventLoop* loop_;
  int fd_;
  int connection_id_;
  ConnectionState state_;
  std::unique_ptr<Channel> channel_;
  Buffer input_buffer_;
  Buffer output_buffer_;
  HttpContent content_;
  bool shutdown_state_;
  Timestamp timestamp_;

  ConnectionCallback connection_callback_;
  MessageCallback message_callback_;
  CloseCallback close_callback_;
};

// NOTE(review): spelled with a lower-case 'c'; kept because other headers
// refer to this exact name.
typedef std::shared_ptr<TcpConnection> TcpconnectionPtr;

}  // namespace tiny_muduo

#endif
8、tcpconnection.cc
#include "tcpconnection.h"

#include <assert.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>

#include <functional>

#include "channel.h"
#include "buffer.h"
#include "logging.h"

using namespace tiny_muduo;

// Wraps `connfd` in a channel and wires the channel's read/write/error events
// to this object. No event is enabled until ConnectionEstablished() runs.
TcpConnection::TcpConnection(EventLoop* loop__, int connfd, int id__)
    : loop_(loop__),
      fd_(connfd),
      connection_id_(id__),
      state_(kConnecting),
      channel_(new Channel(loop_, fd_)),
      shutdown_state_(false) {
  channel_->SetReadCallback(std::bind(&TcpConnection::HandleMessage, this));
  channel_->SetWriteCallback(std::bind(&TcpConnection::HandleWrite, this));
  channel_->SetErrorCallback(std::bind(&TcpConnection::HandleError, this));
}

TcpConnection::~TcpConnection() {
  ::close(fd_);
}

// Final teardown step: disables the channel if that has not happened yet and
// removes it from the loop.
void TcpConnection::ConnectionDestructor() {
  if (state_ == kDisconnecting || state_ == kConnected) {
    state_ = kDisconnected;
    channel_->DisableAll();
  }
  loop_->Remove(channel_.get());
}

// Returns the pending socket error (SO_ERROR), or errno if it cannot be read.
int TcpConnection::GetErrno() const {
  int optval;
  socklen_t optlen = static_cast<socklen_t>(sizeof optval);
  if (::getsockopt(fd_, SOL_SOCKET, SO_ERROR, &optval, &optlen) < 0) {
    return errno;
  } else {
    return optval;
  }
}

// Stops all events and reports the close; `guard` keeps this object alive
// while the close callback runs.
void TcpConnection::HandleClose() {
  state_ = kDisconnected;
  channel_->DisableAll();

  TcpConnectionPtr guard(shared_from_this());
  close_callback_(guard);
}

void TcpConnection::HandleError() {
  LOG_ERROR << "TcpConnection::HandleError" << " : " << ErrorToString(GetErrno());
}

// Read event: pulls data into the input buffer; a read of 0 bytes means the
// peer closed the connection.
void TcpConnection::HandleMessage() {
  int read_size = input_buffer_.ReadFd(fd_);
  if (read_size > 0) {
    message_callback_(shared_from_this(), &input_buffer_);
  } else if (read_size == 0) {
    HandleClose();
  } else {
    LOG_ERROR << "TcpConnection::HandleMessage read failed";
  }
}

// Write event: flushes as much buffered output as the socket accepts. Once
// the buffer is empty, write interest is dropped and a pending Shutdown() is
// completed.
void TcpConnection::HandleWrite() {
  if (!channel_->IsWriting()) return;

  int len = output_buffer_.readablebytes();
  int send_size = static_cast<int>(::write(fd_, output_buffer_.Peek(), len));
  if (send_size < 0) {
    // A failed write is an expected runtime condition (e.g. EWOULDBLOCK), so
    // it must not trip an assertion; the data stays buffered.
    if (errno != EWOULDBLOCK && errno != EAGAIN) {
      LOG_ERROR << "TcpConnection::HandleWrite write failed";
    }
    return;
  }

  output_buffer_.Retrieve(send_size);
  if (!output_buffer_.readablebytes()) {
    channel_->DisableWriting();
    if (state_ == kDisconnecting) {
      Shutdown();
    }
  }
}

// Only Can Send In Same Thread, Can't Cross-Thread
// Writes directly when nothing is queued; whatever could not be written is
// appended to the output buffer and flushed by HandleWrite().
void TcpConnection::Send(const char* message, int len) {
  int remaining = len;
  int send_size = 0;
  if (!channel_->IsWriting() && output_buffer_.readablebytes() == 0) {
    send_size = static_cast<int>(::write(fd_, message, len));
    if (send_size >= 0) {
      remaining -= send_size;
    } else {
      if (errno != EWOULDBLOCK && errno != EAGAIN) {
        LOG_ERROR << "TcpConnection::Send write failed";
        return;
      }
      // The socket is merely full: nothing was written, so queue the whole
      // message instead of silently dropping it.
      send_size = 0;
    }
  }

  assert(remaining <= len);
  if (remaining > 0) {
    output_buffer_.Append(message + send_size, remaining);
    if (!channel_->IsWriting()) {
      channel_->EnableWriting();
    }
  }
}

// Half-closes the connection for writing. While output is still pending only
// the state changes; HandleWrite() calls back here once the buffer drains.
void TcpConnection::Shutdown() {
  state_ = kDisconnecting;
  if (!channel_->IsWriting()) {
    int ret = ::shutdown(fd_, SHUT_WR);
    if (ret < 0) {
      LOG_ERROR << "TcpConnection::Shutdown shutdown failed";
    }
  }
}

// Only Can Send In Same Thread, Can't Cross-Thread
// Sends all readable bytes of `buffer` and then empties it.
void TcpConnection::Send(Buffer* buffer) {
  if (state_ == kDisconnected) return;
  Send(buffer->Peek(), buffer->readablebytes());
  buffer->RetrieveAll();
}

// Only Can Send In Same Thread, Can't Cross-Thread
void TcpConnection::Send(const string& message) {
  if (state_ == kDisconnected) return;
  Send(message.data(), static_cast<int>(message.size()));
}
9、tcpserver.h
#ifndef TINY_MUDUO_TCPSERVER_H_
#define TINY_MUDUO_TCPSERVER_H_

#include <functional>
#include <memory>
#include <map>
#include <utility>
#include <string>

#include "callback.h"
#include "eventloop.h"
#include "acceptor.h"
#include "eventloopthreadpool.h"
#include "tcpconnection.h"
#include "logging.h"
#include "noncopyable.h"
#include "address.h"

namespace tiny_muduo {

class Address;

// Accepts connections on the base loop and distributes them over the loops of
// an EventLoopThreadPool.
class TcpServer : public NonCopyAble {
 public:
  TcpServer(EventLoop* loop, const Address& address);
  ~TcpServer();

  // Starts the worker threads, then starts listening in the base loop.
  void Start() {
    threads_->StartLoop();
    loop_->RunOneFunc(std::bind(&Acceptor::Listen, acceptor_.get()));
  }

  void SetConnectionCallback(ConnectionCallback&& callback) {
    connection_callback_ = std::move(callback);
  }

  void SetConnectionCallback(const ConnectionCallback& callback) {
    connection_callback_ = callback;
  }

  void SetMessageCallback(MessageCallback&& callback) {
    message_callback_ = std::move(callback);
  }

  void SetMessageCallback(const MessageCallback& callback) {
    message_callback_ = callback;
  }

  // Takes effect only when called before Start().
  void SetThreadNums(int thread_nums) {
    threads_->SetThreadNums(thread_nums);
  }

  // Defined in tcpserver.cc.
  void HandleClose(const TcpConnectionPtr& conn);
  void HandleCloseInLoop(const TcpConnectionPtr& ptr);
  void HandleNewConnection(int connfd, const Address& address);

 private:
  // Live connections keyed by their descriptor.
  typedef std::map<int, TcpConnectionPtr> ConnectionMap;

  EventLoop* loop_;
  int next_connection_id_;
  std::unique_ptr<EventLoopThreadPool> threads_;
  std::unique_ptr<Acceptor> acceptor_;
  const std::string ipport_;

  ConnectionCallback connection_callback_;
  MessageCallback message_callback_;
  ConnectionMap connections_;
};

}  // namespace tiny_muduo

#endif
9、tcpserver.cc
#include "tcpserver.h"

#include <assert.h>

#include <climits>
#include <functional>
#include <utility>

#include "eventloopthreadpool.h"
#include "acceptor.h"
#include "tcpconnection.h"

using namespace tiny_muduo;

TcpServer::TcpServer(EventLoop* loop, const Address& address)
    : loop_(loop),
      next_connection_id_(1),
      threads_(new EventLoopThreadPool(loop_)),
      acceptor_(new Acceptor(loop_, address)),
      ipport_(address.IpPortToString()) {
  acceptor_->SetNewConnectionCallback(std::bind(&TcpServer::HandleNewConnection, this, _1, _2));
}

// Drops the server's reference to every connection and schedules each
// connection's teardown on the loop that owns it.
TcpServer::~TcpServer() {
  for (auto& pair : connections_) {
    TcpConnectionPtr ptr(pair.second);
    pair.second.reset();
    ptr->loop()->RunOneFunc(std::bind(&TcpConnection::ConnectionDestructor, ptr));
  }
}

// Close callback of every connection; forwards the work to the base loop,
// which is where connections_ is modified.
void TcpServer::HandleClose(const TcpConnectionPtr& ptr) {
  loop_->RunOneFunc(std::bind(&TcpServer::HandleCloseInLoop, this, ptr));
}

// Removes the connection from the map and queues its final teardown on the
// connection's own loop.
void TcpServer::HandleCloseInLoop(const TcpConnectionPtr& ptr) {
  auto it = connections_.find(ptr->fd());
  assert(it != connections_.end());
  // With assertions compiled out, erasing end() would be undefined behaviour.
  if (it == connections_.end()) return;
  connections_.erase(it);

  LOG_INFO << "TcpServer::HandleCloseInLoop - remove connection " << "["
           << ipport_ << '#' << ptr->id() << ']';

  EventLoop* loop = ptr->loop();
  loop->QueueOneFunc(std::bind(&TcpConnection::ConnectionDestructor, ptr));
}

// New-connection callback of the acceptor: creates the TcpConnection on the
// next loop of the pool, registers it and finishes the setup on that loop.
void TcpServer::HandleNewConnection(int connfd, const Address& address) {
  EventLoop* loop = threads_->NextLoop();
  TcpConnectionPtr ptr(new TcpConnection(loop, connfd, next_connection_id_));
  connections_[connfd] = ptr;

  ptr->SetConnectionCallback(connection_callback_);
  ptr->SetMessageCallback(message_callback_);
  ptr->SetCloseCallback(std::bind(&TcpServer::HandleClose, this, _1));

  LOG_INFO << "TcpServer::HandleNewConnection - new connection " << "["
           << ipport_ << '#' << next_connection_id_ << ']'
           << " from " << address.IpPortToString();

  // Connection ids wrap back to 1 before reaching INT_MAX.
  ++next_connection_id_;
  if (next_connection_id_ == INT_MAX) next_connection_id_ = 1;

  loop->RunOneFunc(std::bind(&TcpConnection::ConnectionEstablished, ptr));
}
10、thread.h
#ifndef TINY_MUDUO_THREAD_H_
#define TINY_MUDUO_THREAD_H_#include <stdio.h>
#include <sys/prctl.h>
#include <pthread.h>#include <functional>
#include <string>#include "latch.h"
#include "noncopyable.h"using std::string;namespace tiny_muduo {
static int thread_id_ = 1;class Thread : public NonCopyAble {
public:typedef std::function<void ()> ThreadFunc;Thread(const ThreadFunc& func, const string& name = string());~Thread(); void StartThread();void Join() {
::pthread_join(pthread_id_, nullptr); }private:pthread_t pthread_id_;ThreadFunc func_;Latch latch_;string name_;
}; struct ThreadData {
typedef tiny_muduo::Thread::ThreadFunc ThreadFunc;ThreadFunc func_;
Latch* latch_;
string name_;ThreadData(ThreadFunc& func, Latch* latch, const string& name): func_(func),latch_(latch),name_(name) {
}void RunOneFunc() {
latch_->CountDown();latch_ = nullptr; char buf[32] = {
0};snprintf(buf, sizeof(buf), "%d", (thread_id_++));::prctl(PR_SET_NAME, name_.size() == 0 ? ("WorkThread " + string(buf)).data() : name_.data());func_();
}};}
#endif
10、thread.cc
#include "thread.h"

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/syscall.h>

#include "latch.h"

using namespace tiny_muduo;

namespace CurrentThread {

// Per-thread cache of the kernel thread id and its formatted form.
__thread int t_cachedTid = 0;
__thread char t_formattedTid[32];
__thread int t_formattedTidLength;

pid_t gettid() {
  return static_cast<int>(syscall(SYS_gettid));
}

// Fills the per-thread cache on first use.
void CacheTid() {
  if (t_cachedTid == 0) {
    t_cachedTid = gettid();
    t_formattedTidLength = snprintf(t_formattedTid, sizeof(t_formattedTid), "%5d ", t_cachedTid);
  }
}

}  // namespace CurrentThread

// pthread entry point: runs the ThreadData and frees it afterwards.
static void* ThreadRun(void* arg) {
  ThreadData* ptr = static_cast<ThreadData*>(arg);
  ptr->RunOneFunc();
  delete ptr;
  return nullptr;
}

// pthread_id_ starts as -1, which this file treats as "no thread created".
Thread::Thread(const ThreadFunc& func, const string& name)
    : pthread_id_(-1),
      func_(func),
      latch_(1),
      name_(name) {
}

Thread::~Thread() {
  // Detaching an id that was never produced by pthread_create is invalid.
  if (pthread_id_ != static_cast<pthread_t>(-1)) {
    ::pthread_detach(pthread_id_);
  }
}

// Creates the thread and blocks until it has counted the latch down.
void Thread::StartThread() {
  ThreadData* ptr = new ThreadData(func_, &latch_, name_);
  int err = ::pthread_create(&pthread_id_, nullptr, ThreadRun, ptr);
  if (err != 0) {
    // No thread will ever free the data or release the latch, so clean up
    // and return instead of leaking and blocking forever.
    delete ptr;
    pthread_id_ = static_cast<pthread_t>(-1);
    fprintf(stderr, "Thread::StartThread pthread_create failed: %s\n", strerror(err));
    return;
  }
  latch_.Wait();
}