当前位置: 代码迷 >> 综合 >> 从零开始自制实现WebServer(二十)---- C++ High-Performance WebServer源码实现(Util核心代码部分)
  详细解决方案

从零开始自制实现WebServer(二十)---- C++ High-Performance WebServer源码实现(Util核心代码部分)

热度:5   发布时间:2023-11-17 16:14:37.0

文章目录

    • 全流程实现博客链接
    • 前引
    • (二十)---- C++ High-Performance WebServer源码实现(Util核心代码部分)
      • 1、acceptor.h
      • 1、acceptor.cc
      • 2、buffer.h
      • 2、buffer.cc
      • 3、channel.h
      • 3、channel.cc
      • 4、epoller.h
      • 4、epoller.cc
      • 5、eventloop.h
      • 5、eventloop.cc
      • 6、eventloopthread.h
      • 6、eventloopthread.cc
      • 7、eventloopthreadpool.h
      • 7、eventloopthreadpool.cc
      • 8、tcpconnection.h
      • 8、tcpconnection.cc
      • 9、tcpserver.h
      • 9、tcpserver.cc
      • 10、thread.h
      • 10、thread.cc


全流程实现博客链接


从零开始自制实现C++ High-Performance WebServer 全流程记录(基于muduo网络库)


前引


这部分还是挺单纯的
就是单纯把源码贴出来 没有多一点的意思

一是为了方便各位查看代码:毕竟所有代码都以网页的形式呈现,想定位某个源文件时,Ctrl+F 一下就能找到,所以我还是多下点功夫吧。

那就开始了 去喝口水把代码贴出来了


(二十)---- C++ High-Performance WebServer源码实现(Util核心代码部分)


事件驱动最核心的代码块,例如 Acceptor、Channel、TcpServer、Epoller、EventLoop 等等。


1、acceptor.h


#ifndef TINY_MUDUO_ACCEPTOR_H_
#define TINY_MUDUO_ACCEPTOR_H_#include <sys/socket.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>#include <memory>
#include <functional>#include "noncopyable.h"
#include "address.h"namespace tiny_muduo {
    class EventLoop;
class Address;
class Channel;class Acceptor : public NonCopyAble {
    public:typedef std::function<void (int, const Address&)> NewConnectionCallback;Acceptor(EventLoop* loop, const Address& address);~Acceptor();void BindListenFd(const Address& address); void Listen();void NewConnection();int SetSockoptKeepAlive(int fd) {
    int option_val = 1;return setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE,&option_val, static_cast<socklen_t>(sizeof(option_val)));}int SetSockoptReuseAddr(int fd) {
    int option_val = 1;return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,&option_val, static_cast<socklen_t>(sizeof(option_val)));}int SetSockoptTcpNoDelay(int fd) {
    int option_val = 1;return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,&option_val, static_cast<socklen_t>(sizeof(option_val)));}void SetNewConnectionCallback(const NewConnectionCallback& callback) {
    new_connection_callback_ = callback;}void SetNewConnectionCallback(NewConnectionCallback&& callback) {
    new_connection_callback_ = std::move(callback);}private:EventLoop* loop_;int listenfd_;int idlefd_;std::unique_ptr<Channel> channel_;NewConnectionCallback new_connection_callback_; 
};}
#endif

1、acceptor.cc


#include "acceptor.h"

#include <assert.h>
#include <errno.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <bits/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/tcp.h>

#include <functional>
#include <cstring>

#include "address.h"
#include "channel.h"
#include "logging.h"

using namespace tiny_muduo;

// Creates a non-blocking, close-on-exec TCP listening socket, opens the spare
// /dev/null descriptor used for EMFILE recovery, binds the socket and wires
// NewConnection() as the read callback of the listening channel.
Acceptor::Acceptor(EventLoop* loop, const Address& address)
    : loop_(loop),
      listenfd_(::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_TCP)),
      idlefd_(::open("/dev/null", O_RDONLY | O_CLOEXEC)),
      channel_(new Channel(loop_, listenfd_)) {
  SetSockoptReuseAddr(listenfd_);
  SetSockoptKeepAlive(listenfd_);
  BindListenFd(address);
  channel_->SetReadCallback(std::bind(&Acceptor::NewConnection, this));
}

// Unregisters the listening channel and releases both descriptors.
Acceptor::~Acceptor() {
  channel_->DisableAll();
  loop_->Remove(channel_.get());
  ::close(listenfd_);
  // The spare descriptor was previously leaked; release it as well.
  if (idlefd_ >= 0) {
    ::close(idlefd_);
  }
}

// Binds the listening socket to INADDR_ANY on the port taken from `addr`.
void Acceptor::BindListenFd(const Address& addr) {
  struct sockaddr_in address;
  memset(&address, 0, sizeof(address));
  address.sin_family = AF_INET;
  // FIXME : it should be addr.ip() and need some conversion
  address.sin_addr.s_addr = htonl(INADDR_ANY);
  address.sin_port = htons(static_cast<uint16_t>(addr.port()));
  int ret = bind(listenfd_, reinterpret_cast<struct sockaddr*>(&address), sizeof(address));
  assert(ret != -1);
  (void)ret;
}

// Puts the socket into listening state and starts watching it for reads.
void Acceptor::Listen() {
  int ret = listen(listenfd_, SOMAXCONN);
  assert(ret != -1);
  (void)ret;
  channel_->EnableReading();
}

// Accepts one pending connection, applies keep-alive and TCP_NODELAY to it and
// passes the descriptor plus peer address to the new-connection callback.
void Acceptor::NewConnection() {
  struct sockaddr_in client;
  memset(&client, 0, sizeof(client));
  socklen_t client_addrlength = sizeof(client);
  int connfd = ::accept4(listenfd_, reinterpret_cast<struct sockaddr*>(&client),
                         &client_addrlength, SOCK_NONBLOCK | SOCK_CLOEXEC);
  if (connfd < 0) {
    if (errno == EMFILE) {
      // Out of descriptors: free the spare one, accept and immediately close
      // the pending connection, then re-acquire the spare.
      ::close(idlefd_);
      idlefd_ = ::accept(listenfd_, NULL, NULL);
      ::close(idlefd_);
      idlefd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
    }
    return;
  }

  if (SetSockoptKeepAlive(connfd) == -1) {
    LOG_ERROR << "Acceptor::NewConnection SetSockoptKeepAlive failed";
    close(connfd);
    return;
  }

  if (SetSockoptTcpNoDelay(connfd) == -1) {
    LOG_ERROR << "Acceptor::NewConnection SetSockoptTcpNoDelay failed";
    close(connfd);
    return;
  }

  // Without a consumer nobody would ever own the descriptor; close it instead
  // of invoking an empty std::function.
  if (!new_connection_callback_) {
    close(connfd);
    return;
  }

  // accept4() already filled `client` with the peer address, so no separate
  // getpeername() call (whose failure used to go unchecked) is needed.
  new_connection_callback_(connfd, Address(inet_ntoa(client.sin_addr), ntohs(client.sin_port)));
}

2、buffer.h


#ifndef TINY_MUDUO_BUFFER_H_
#define TINY_MUDUO_BUFFER_H_#include <vector>
#include <algorithm>
#include <string>#include <assert.h>
#include <cstring>#include "noncopyable.h"using std::string;namespace tiny_muduo {
    static const int kPrePendIndex = 8;
static const int kInitialSize = 1024;
static const char* kCRLF = "\r\n";class Buffer : public NonCopyAble {
    public:Buffer() : buffer_(kInitialSize), read_index_(kPrePendIndex), write_index_(kPrePendIndex) {
    }~Buffer() {
    }int ReadFd(int fd);char* begin() {
     return &*buffer_.begin(); };const char* begin() const {
     return &*buffer_.begin(); };char* beginread() {
     return begin() + read_index_; } const char* beginread() const {
     return begin() + read_index_; }char* beginwrite() {
     return begin() + write_index_; }const char* beginwrite() const {
     return begin() + write_index_; }const char* FindCRLF() const {
     const char* find = std::search(Peek(), beginwrite(), kCRLF, kCRLF + 2); return find == beginwrite() ? nullptr : find;}void Append(const char* message) {
    Append(message, static_cast<int>(strlen(message)));}void Append(const char* message, int len) {
    MakeSureEnoughStorage(len);std::copy(message, message + len, beginwrite());write_index_ += len;}void Append(const string& message) {
    Append(message.data(), static_cast<int>(message.size())); }void Retrieve(int len) {
    assert(readablebytes() >= len);if (len + read_index_ < write_index_) {
    read_index_ += len;} else {
    RetrieveAll();}}void RetrieveUntilIndex(const char* index) {
    assert(beginwrite() >= index);read_index_ += static_cast<int>(index - beginread());}void RetrieveAll() {
    write_index_ = kPrePendIndex;read_index_ = write_index_;}string RetrieveAsString(int len) {
    assert(read_index_ + len <= write_index_);string ret = std::move(PeekAsString(len));Retrieve(len); return ret;}string RetrieveAllAsString() {
    string ret = std::move(PeekAllAsString());RetrieveAll();return ret;}const char* Peek() const {
    return beginread();}char* Peek() {
    return beginread();}string PeekAsString(int len) {
    return string(beginread(), beginread() + len);}string PeekAllAsString() {
    return string(beginread(), beginwrite()); }int readablebytes() const {
     return write_index_ - read_index_; }int writablebytes() const {
     return static_cast<int>(buffer_.size()) - write_index_; } int prependablebytes() const {
     return read_index_; }void MakeSureEnoughStorage(int len) {
    if (writablebytes() >= len) return;if (writablebytes() + prependablebytes() >= kPrePendIndex + len) {
    std::copy(beginread(), beginwrite(), begin() + kPrePendIndex);write_index_ = kPrePendIndex + readablebytes();read_index_ = kPrePendIndex;} else {
    buffer_.resize(write_index_ + len);}}private:std::vector<char> buffer_;int read_index_;int write_index_;
};}
#endif

2、buffer.cc


#include "buffer.h"

#include <sys/uio.h>

#include "logging.h"

using namespace tiny_muduo;

// Reads from `fd` with a single readv() call, scattering into the buffer's
// writable space first and into a 64 KiB stack buffer second; any spill-over
// is appended afterwards. Returns the readv() result (bytes read, 0 on EOF,
// negative on error).
int Buffer::ReadFd(int fd) {
  // Not zero-initialised: only the bytes readv() reports are ever consumed.
  char extrabuf[65536];
  struct iovec iv[2];
  const int writable = writablebytes();
  iv[0].iov_base = beginwrite();
  iv[0].iov_len = writable;
  iv[1].iov_base = extrabuf;
  iv[1].iov_len = sizeof(extrabuf);

  // Skip the stack buffer when the internal space is already at least as big.
  const int iovcnt = (writable < static_cast<int>(sizeof(extrabuf)) ? 2 : 1);
  int readn = static_cast<int>(::readv(fd, iv, iovcnt));

  if (readn < 0) {
    // LOG_ERRNO << "Buffer::ReadFd readv failed";
  } else if (readn <= writable) {
    write_index_ += readn;
  } else {
    // Internal space is full; the remainder landed in extrabuf.
    write_index_ = static_cast<int>(buffer_.size());
    Append(extrabuf, readn - writable);
  }

  return readn;
}

3、channel.h


#ifndef TINY_MUDUO_CHANNEL_H_
#define TINY_MUDUO_CHANNEL_H_

#include <sys/epoll.h>

#include <functional>
#include <utility>
#include <memory>

#include "eventloop.h"
#include "callback.h"
#include "noncopyable.h"

namespace tiny_muduo {

// Registration state of a Channel as tracked by Epoller.
enum ChannelState {
  kNew,
  kAdded,
  kDeleted
};

// Associates one file descriptor with the events it is interested in and the
// callbacks to run when those events are reported. The descriptor itself is
// not closed by this class.
class Channel : public NonCopyAble {
 public:
  typedef std::function<void()> ErrorCallback;

  Channel(EventLoop* loop, const int& fd);
  ~Channel();

  // Dispatches the events stored by SetReceivedEvents() to the callbacks.
  void HandleEvent();
  void HandleEventWithGuard();

  void SetReadCallback(ReadCallback&& callback) {
    read_callback_ = std::move(callback);
  }

  void SetReadCallback(const ReadCallback& callback) {
    read_callback_ = callback;
  }

  void SetWriteCallback(WriteCallback&& callback) {
    write_callback_ = std::move(callback);
  }

  void SetWriteCallback(const WriteCallback& callback) {
    write_callback_ = callback;
  }

  void SetErrorCallback(ErrorCallback&& callback) {
    error_callback_ = std::move(callback);
  }

  void SetErrorCallback(const ErrorCallback& callback) {
    error_callback_ = callback;
  }

  // Stores a weak reference to `ptr`; HandleEvent() locks it while running.
  void Tie(const std::shared_ptr<void>& ptr) {
    tied_ = true;
    tie_ = ptr;
  }

  void EnableReading() {
    events_ |= (EPOLLIN | EPOLLPRI);
    Update();
  }

  void EnableWriting() {
    events_ |= EPOLLOUT;
    Update();
  }

  void DisableAll() {
    events_ = 0;
    Update();
  }

  void DisableWriting() {
    events_ &= ~EPOLLOUT;
    Update();
  }

  // Pushes the current interest set to the owning loop.
  void Update() {
    loop_->Update(this);
  }

  void SetReceivedEvents(int events__) {
    recv_events_ = events__;
  }

  void SetChannelState(ChannelState state__) {
    state_ = state__;
  }

  int fd() const { return fd_; }
  int events() const { return events_; }
  int recv_events() const { return recv_events_; }
  ChannelState state() const { return state_; }

  bool IsWriting() const { return (events_ & EPOLLOUT) != 0; }
  bool IsReading() const { return (events_ & (EPOLLIN | EPOLLPRI)) != 0; }

 private:
  EventLoop* loop_;
  int fd_;
  int events_;       // update events
  int recv_events_;  // epoll received events

  std::weak_ptr<void> tie_;
  bool tied_;
  int errno_;

  ChannelState state_;
  ReadCallback read_callback_;
  WriteCallback write_callback_;
  ErrorCallback error_callback_;
};

}  // namespace tiny_muduo
#endif

3、channel.cc


#include "channel.h"

#include <sys/poll.h>
#include <sys/epoll.h>

using namespace tiny_muduo;

// Starts with an empty interest set, untied, in state kNew.
Channel::Channel(EventLoop* loop, const int& fd__)
    : loop_(loop),
      fd_(fd__),
      events_(0),
      recv_events_(0),
      tied_(false),
      errno_(0),
      state_(kNew) {
}

Channel::~Channel() {
}

// Runs the callbacks for the received events. For a tied channel the owner is
// pinned for the duration of the dispatch; if the owner is already gone the
// events are dropped, because the callbacks would otherwise run against an
// object that is being or has been destroyed.
void Channel::HandleEvent() {
  if (tied_) {
    std::shared_ptr<void> guard = tie_.lock();
    if (guard) {
      HandleEventWithGuard();
    }
  } else {
    HandleEventWithGuard();
  }
}

// Dispatch order: error, then read, then write.
void Channel::HandleEventWithGuard() {
  if (recv_events_ & POLLNVAL) {
    LOG_ERROR << "Channel::HandleEventWithGuard POLLNVAL";
  }

  if (recv_events_ & (EPOLLERR | POLLNVAL)) {
    if (error_callback_) {
      error_callback_();
    }
  }

  if (recv_events_ & (EPOLLIN | EPOLLPRI | EPOLLRDHUP)) {
    if (read_callback_) {
      read_callback_();
    }
  }

  if (recv_events_ & EPOLLOUT) {
    if (write_callback_) {
      write_callback_();
    }
  }
}

4、epoller.h


#ifndef TINY_MUDUO_EPOLLER_H_
#define TINY_MUDUO_EPOLLER_H_#include <sys/epoll.h>
#include <vector>
#include <map>#include "noncopyable.h"static const int kDefaultEvents = 16;namespace tiny_muduo {
    class Channel;class Epoller : public NonCopyAble {
    public: typedef std::vector<epoll_event> Events;typedef std::vector<Channel*> Channels;Epoller();~Epoller();void Remove(Channel* channel_);void Poll(Channels& channels);int EpollWait() {
     return epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), -1); }void FillActiveChannels(int eventnums, Channels& channels); void Update(Channel* channel);void UpdateChannel(int operation, Channel* channel);private: typedef std::map<int, Channel*> ChannelMap;int epollfd_;Events events_;ChannelMap channels_;
};}
#endif

4、epoller.cc


#include "epoller.h"

#include <assert.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

#include <vector>

#include "channel.h"
#include "logging.h"

using namespace tiny_muduo;

Epoller::Epoller()
    : epollfd_(::epoll_create1(EPOLL_CLOEXEC)),
      events_(kDefaultEvents),
      channels_() {
}

Epoller::~Epoller() {
  ::close(epollfd_);
}

// Blocks in epoll_wait() and collects the ready channels. A negative result
// leaves `channels` untouched because the fill loop does not execute.
void Epoller::Poll(Channels& channels) {
  int eventnums = EpollWait();
  FillActiveChannels(eventnums, channels);
}

// Copies the first `eventnums` results into `channels` and doubles the event
// array when it was completely filled.
void Epoller::FillActiveChannels(int eventnums, Channels& channels) {
  for (int i = 0; i < eventnums; ++i) {
    Channel* ptr = static_cast<Channel*>(events_[i].data.ptr);
    ptr->SetReceivedEvents(events_[i].events);
    channels.emplace_back(ptr);
  }

  if (eventnums == static_cast<int>(events_.size())) {
    events_.resize(eventnums * 2);
  }
}

// Drops the channel from the map; a channel that is still registered with
// epoll is deregistered first. The channel ends up in state kNew.
void Epoller::Remove(Channel* channel) {
  int fd = channel->fd();
  ChannelState state = channel->state();
  assert(state == kDeleted || state == kAdded);

  if (state == kAdded) {
    UpdateChannel(EPOLL_CTL_DEL, channel);
  }
  channel->SetChannelState(kNew);
  channels_.erase(fd);
}

// kNew / kDeleted channels are (re-)added; kAdded channels are modified, or
// deleted from epoll when their interest set is empty.
void Epoller::Update(Channel* channel) {
  int op = 0;
  int events = channel->events();
  ChannelState state = channel->state();
  int fd = channel->fd();

  if (state == kNew || state == kDeleted) {
    if (state == kNew) {
      assert(channels_.find(fd) == channels_.end());
      channels_[fd] = channel;
    } else {
      assert(channels_.find(fd) != channels_.end());
      assert(channels_[fd] == channel);
    }
    op = EPOLL_CTL_ADD;
    channel->SetChannelState(kAdded);
  } else {
    assert(channels_.find(fd) != channels_.end());
    assert(channels_[fd] == channel);
    if (events == 0) {
      op = EPOLL_CTL_DEL;
      channel->SetChannelState(kDeleted);
    } else {
      op = EPOLL_CTL_MOD;
    }
  }

  UpdateChannel(op, channel);
}

// Issues the epoll_ctl() call; failures are logged, not propagated.
void Epoller::UpdateChannel(int operation, Channel* channel) {
  struct epoll_event event;
  // Zero the whole struct so no uninitialised bytes of the data union are
  // handed to the kernel.
  memset(&event, 0, sizeof(event));
  event.events = channel->events();
  event.data.ptr = static_cast<void*>(channel);

  if (epoll_ctl(epollfd_, operation, channel->fd(), &event) < 0) {
    LOG_ERROR << "Epoller::UpdataChannel epoll_ctl failed";
  }
}

5、eventloop.h


#ifndef TINY_MUDUO_EVENTLOOP_H_
#define TINY_MUDUO_EVENTLOOP_H_

#include <stdint.h>
#include <unistd.h>
#include <assert.h>
#include <sys/eventfd.h>
#include <pthread.h>

#include <vector>
#include <functional>
#include <memory>
#include <utility>

#include "mutex.h"
#include "epoller.h"
#include "currentthread.h"
#include "timestamp.h"
#include "timerqueue.h"
#include "noncopyable.h"

namespace tiny_muduo {

class Epoller;
class Channel;

// Event loop bound to the thread that constructed it. Each iteration polls
// the Epoller, dispatches the active channels and then runs queued functors.
class EventLoop : public NonCopyAble {
 public:
  typedef std::vector<Channel*> Channels;
  typedef std::function<void()> BasicFunc;
  typedef std::vector<BasicFunc> ToDoList;

  EventLoop();
  ~EventLoop();

  // Schedules `cb` once at `timestamp`.
  void RunAt(Timestamp timestamp, BasicFunc&& cb) {
    timer_queue_->AddTimer(timestamp, std::move(cb), 0.0);
  }

  // Schedules `cb` once, `wait_time` after now.
  void RunAfter(double wait_time, BasicFunc&& cb) {
    Timestamp timestamp(Timestamp::AddTime(Timestamp::Now(), wait_time));
    timer_queue_->AddTimer(timestamp, std::move(cb), 0.0);
  }

  // Schedules `cb` repeatedly with period `interval`, first run after one
  // interval.
  void RunEvery(double interval, BasicFunc&& cb) {
    Timestamp timestamp(Timestamp::AddTime(Timestamp::Now(), interval));
    timer_queue_->AddTimer(timestamp, std::move(cb), interval);
  }

  // True when called from the thread that constructed this loop.
  bool IsInThreadLoop() { return CurrentThread::tid() == tid_; }

  void Update(Channel* channel) { epoller_->Update(channel); }
  void Remove(Channel* channel) { epoller_->Remove(channel); }

  void Loop();
  // Read callback of the wakeup channel: drains the eventfd counter.
  void HandleRead();
  // Queues `func` for the next DoToDoList() pass, waking the loop if needed.
  void QueueOneFunc(BasicFunc func);
  // Runs `func` immediately on the loop thread, otherwise queues it.
  void RunOneFunc(BasicFunc func);
  void DoToDoList();

 private:
  bool running_;
  pid_t tid_;
  std::unique_ptr<Epoller> epoller_;
  int wakeup_fd_;
  std::unique_ptr<Channel> wakeup_channel_;
  std::unique_ptr<TimerQueue> timer_queue_;
  bool calling_functors_;
  Channels active_channels_;
  ToDoList to_do_list_;

  MutexLock mutex_;
};

}  // namespace tiny_muduo
#endif

5、eventloop.cc


#include "eventloop.h"

#include <unistd.h>
#include <sys/eventfd.h>
#include <pthread.h>
#include <signal.h>

#include <utility>

#include "mutex.h"
#include "channel.h"

using namespace tiny_muduo;

namespace {

// Static-initialisation hook that makes the process ignore SIGPIPE.
class IgnoreSigPipe {
 public:
  IgnoreSigPipe() {
    ::signal(SIGPIPE, SIG_IGN);
  }
};

IgnoreSigPipe initObj;

}  // namespace

// Records the constructing thread's id, creates the epoller, the eventfd used
// for wake-ups and the timer queue, and starts watching the eventfd.
EventLoop::EventLoop()
    : running_(false),
      tid_(CurrentThread::tid()),
      epoller_(new Epoller()),
      wakeup_fd_(::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC)),
      wakeup_channel_(new Channel(this, wakeup_fd_)),
      timer_queue_(new TimerQueue(this)),
      calling_functors_(false) {
  wakeup_channel_->SetReadCallback(std::bind(&EventLoop::HandleRead, this));
  wakeup_channel_->EnableReading();
}

EventLoop::~EventLoop() {
  if (running_) running_ = false;
  wakeup_channel_->DisableAll();
  Remove(wakeup_channel_.get());
  close(wakeup_fd_);
}

// Must run on the owning thread. Repeats poll -> dispatch -> queued functors
// until running_ is cleared.
void EventLoop::Loop() {
  assert(IsInThreadLoop());
  running_ = true;
  while (running_) {
    active_channels_.clear();
    epoller_->Poll(active_channels_);
    for (const auto& channel : active_channels_) {
      channel->HandleEvent();
    }
    DoToDoList();
  }
  running_ = false;
}

// Drains the eventfd so the wakeup channel stops reporting readable.
void EventLoop::HandleRead() {
  uint64_t read_one_byte = 1;
  ssize_t read_size = ::read(wakeup_fd_, &read_one_byte, sizeof(read_one_byte));
  (void) read_size;
  assert(read_size == sizeof(read_one_byte));
}

// Appends `func` under the mutex. The loop is woken when the caller is another
// thread, or when the loop is currently running functors (so a functor queued
// from within a functor is not delayed until unrelated I/O arrives).
void EventLoop::QueueOneFunc(BasicFunc func) {
  {
    MutexLockGuard lock(mutex_);
    to_do_list_.emplace_back(std::move(func));
  }

  if (!IsInThreadLoop() || calling_functors_) {
    uint64_t write_one_byte = 1;
    ssize_t write_size = ::write(wakeup_fd_, &write_one_byte, sizeof(write_one_byte));
    (void) write_size;
    assert(write_size == sizeof(write_one_byte));
  }
}

void EventLoop::RunOneFunc(BasicFunc func) {
  if (IsInThreadLoop()) {
    func();
  } else {
    QueueOneFunc(std::move(func));
  }
}

// Swaps the queue out under the lock and runs the functors without holding
// it, so functors may queue further work.
void EventLoop::DoToDoList() {
  ToDoList functors;
  calling_functors_ = true;
  {
    MutexLockGuard lock(mutex_);
    functors.swap(to_do_list_);
  }

  for (const auto& func : functors) {
    func();
  }
  calling_functors_ = false;
}

6、eventloopthread.h


#ifndef TINY_MUDUO_EVENTLOOPTHREAD_H_
#define TINY_MUDUO_EVENTLOOPTHREAD_H_

#include "thread.h"
#include "mutex.h"
#include "condition.h"
#include "noncopyable.h"

namespace tiny_muduo {

class EventLoop;

// Runs one EventLoop on a dedicated thread. The loop object lives on that
// thread's stack inside StartFunc().
class EventLoopThread : public NonCopyAble {
 public:
  EventLoopThread();
  ~EventLoopThread();

  // Thread body: constructs the loop, publishes it and runs it.
  void StartFunc();
  // Starts the thread and blocks until the loop pointer has been published.
  EventLoop* StartLoop();

 private:
  EventLoop* loop_;  // written by StartFunc(), read by StartLoop() under mutex_
  Thread thread_;
  MutexLock mutex_;
  Condition cond_;
};

}  // namespace tiny_muduo
#endif

6、eventloopthread.cc


#include "eventloopthread.h"

#include <pthread.h>
#include <functional>

#include "mutex.h"
#include "condition.h"
#include "eventloop.h"

using namespace tiny_muduo;

EventLoopThread::EventLoopThread()
    : loop_(nullptr),
      thread_(std::bind(&EventLoopThread::StartFunc, this)),
      mutex_(),
      cond_(mutex_) {
}

EventLoopThread::~EventLoopThread() {
}

// Starts the worker thread and waits on the condition until StartFunc() has
// published its stack-allocated loop; returns that pointer.
EventLoop* EventLoopThread::StartLoop() {
  thread_.StartThread();
  EventLoop* loop = nullptr;
  {
    MutexLockGuard lock(mutex_);
    while (loop_ == nullptr) {
      cond_.Wait();
    }
    loop = loop_;
  }
  return loop;
}

// Worker thread body. The loop is created here so that it records this
// thread's id; the pointer is cleared again once Loop() returns.
void EventLoopThread::StartFunc() {
  EventLoop loop;
  {
    MutexLockGuard lock(mutex_);
    loop_ = &loop;
    cond_.Notify();
  }

  // Use the local object directly rather than re-reading the shared pointer
  // outside the lock.
  loop.Loop();

  {
    MutexLockGuard lock(mutex_);
    loop_ = nullptr;
  }
}

7、eventloopthreadpool.h


#ifndef TINY_MUDUO_EVENTLOOPTHREADPOOL_H_
#define TINY_MUDUO_EVENTLOOPTHREADPOOL_H_#include <vector>
#include <memory>#include "noncopyable.h"namespace tiny_muduo {
    class EventLoopThread;
class EventLoop;class EventLoopThreadPool : public NonCopyAble {
    public:typedef std::vector<std::unique_ptr<EventLoopThread>> Thread;typedef std::vector<EventLoop*> Loop;EventLoopThreadPool(EventLoop* loop);~EventLoopThreadPool();void SetThreadNums(int thread_nums) {
    thread_nums_ = thread_nums;}void StartLoop();EventLoop* NextLoop();private:EventLoop* base_loop_;   Thread threads_;Loop loops_;int thread_nums_;int next_;
};}
#endif

7、eventloopthreadpool.cc


#include "eventloopthreadpool.h"

#include <memory>

#include "eventloopthread.h"

using namespace tiny_muduo;

EventLoopThreadPool::EventLoopThreadPool(EventLoop* loop)
    : base_loop_(loop),
      thread_nums_(0),
      next_(0) {
}

// Defined here, where EventLoopThread is a complete type, so the unique_ptr
// elements can be destroyed.
EventLoopThreadPool::~EventLoopThreadPool() {
}

// Spawns thread_nums_ worker threads and records the loop each one publishes.
void EventLoopThreadPool::StartLoop() {
  for (int i = 0; i < thread_nums_; ++i) {
    // Hand the allocation to a smart pointer before doing anything else with
    // it, so it is released if a later step throws.
    std::unique_ptr<EventLoopThread> thread(new EventLoopThread());
    EventLoopThread* raw = thread.get();
    threads_.emplace_back(std::move(thread));
    loops_.emplace_back(raw->StartLoop());
  }
}

// Round-robin selection over the worker loops; falls back to the base loop
// when no worker was started.
EventLoop* EventLoopThreadPool::NextLoop() {
  EventLoop* ret = base_loop_;
  if (!loops_.empty()) {
    ret = loops_[next_++];
    if (next_ == static_cast<int>(loops_.size())) next_ = 0;
  }

  return ret;
}

8、tcpconnection.h


#ifndef TINY_MUDUO_TCPCONNECTION_H_
#define TINY_MUDUO_TCPCONNECTION_H_

#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>

#include <string>
#include <memory>
#include <utility>

#include "callback.h"
#include "channel.h"
#include "buffer.h"
#include "httpcontent.h"
#include "noncopyable.h"
#include "timestamp.h"

using std::string;

namespace tiny_muduo {

class EventLoop;

// One accepted TCP connection: owns the descriptor (closed in the destructor),
// its Channel, and the input/output buffers.
class TcpConnection : public std::enable_shared_from_this<TcpConnection>, NonCopyAble {
 public:
  enum ConnectionState {
    kConnecting,
    kConnected,
    kDisconnecting,
    kDisconnected
  };

  TcpConnection(EventLoop* loop, int connfd, int id);
  ~TcpConnection();

  void SetConnectionCallback(const ConnectionCallback& callback) {
    connection_callback_ = callback;
  }

  void SetConnectionCallback(ConnectionCallback&& callback) {
    connection_callback_ = std::move(callback);
  }

  void SetMessageCallback(const MessageCallback& callback) {
    message_callback_ = callback;
  }

  void SetMessageCallback(MessageCallback&& callback) {
    message_callback_ = std::move(callback);
  }

  void SetCloseCallback(const CloseCallback& callback) {
    close_callback_ = callback;
  }

  void SetCloseCallback(CloseCallback&& callback) {
    close_callback_ = std::move(callback);
  }

  // Marks the connection connected, ties the channel to this object, starts
  // reading and notifies the connection callback when one is installed.
  void ConnectionEstablished() {
    state_ = kConnected;
    channel_->Tie(shared_from_this());
    channel_->EnableReading();
    // The callback is optional; invoking an empty std::function would throw.
    if (connection_callback_) {
      connection_callback_(shared_from_this(), &input_buffer_);
    }
  }

  void Shutdown();
  bool IsShutdown() const { return state_ == kDisconnecting; }
  // Returns the socket's pending SO_ERROR value (or errno if the query fails).
  int GetErrno() const;
  void ConnectionDestructor();
  void HandleClose();
  void HandleError();
  void HandleMessage();
  void HandleWrite();

  // All Send overloads must be called on the connection's loop thread.
  void Send(Buffer* buffer);
  void Send(const string& str);
  void Send(const char* message, int len);
  void Send(const char* message) { Send(message, static_cast<int>(strlen(message))); }

  void UpdateTimestamp(Timestamp now) { timestamp_ = now; }

  Timestamp timestamp() const { return timestamp_; }
  int fd() const { return fd_; }
  int id() const { return connection_id_; }
  EventLoop* loop() const { return loop_; }
  HttpContent* GetHttpContent() { return &content_; }

 private:
  EventLoop* loop_;
  int fd_;
  int connection_id_;
  ConnectionState state_;
  std::unique_ptr<Channel> channel_;
  Buffer input_buffer_;
  Buffer output_buffer_;
  HttpContent content_;
  bool shutdown_state_;
  Timestamp timestamp_;

  ConnectionCallback connection_callback_;
  MessageCallback message_callback_;
  CloseCallback close_callback_;
};

typedef std::shared_ptr<TcpConnection> TcpconnectionPtr;

}  // namespace tiny_muduo
#endif

8、tcpconnection.cc


#include "tcpconnection.h"

#include <assert.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>

#include <functional>

#include "channel.h"
#include "buffer.h"
#include "logging.h"

using namespace tiny_muduo;

// Wraps an already-connected descriptor and wires the read / write / error
// handlers into its channel. Reading is not enabled until
// ConnectionEstablished() runs.
TcpConnection::TcpConnection(EventLoop* loop__, int connfd, int id__)
    : loop_(loop__),
      fd_(connfd),
      connection_id_(id__),
      state_(kConnecting),
      channel_(new Channel(loop_, fd_)),
      shutdown_state_(false) {
  channel_->SetReadCallback(std::bind(&TcpConnection::HandleMessage, this));
  channel_->SetWriteCallback(std::bind(&TcpConnection::HandleWrite, this));
  channel_->SetErrorCallback(std::bind(&TcpConnection::HandleError, this));
}

TcpConnection::~TcpConnection() {
  ::close(fd_);
}

// Final teardown step: disables the channel if the connection was still
// active and removes it from the loop.
void TcpConnection::ConnectionDestructor() {
  if (state_ == kDisconnecting || state_ == kConnected) {
    state_ = kDisconnected;
    channel_->DisableAll();
  }
  loop_->Remove(channel_.get());
}

int TcpConnection::GetErrno() const {
  int optval;
  socklen_t optlen = static_cast<socklen_t>(sizeof optval);
  if (::getsockopt(fd_, SOL_SOCKET, SO_ERROR, &optval, &optlen) < 0) {
    return errno;
  } else {
    return optval;
  }
}

// Called when the peer closed the connection (read returned 0).
void TcpConnection::HandleClose() {
  state_ = kDisconnected;
  channel_->DisableAll();

  // Keep the object alive for the duration of the close callback.
  TcpConnectionPtr guard(shared_from_this());
  if (close_callback_) {
    close_callback_(guard);
  }
}

void TcpConnection::HandleError() {
  LOG_ERROR << "TcpConnection::HandleError" << " : " << ErrorToString(GetErrno());
}

// Read handler: pulls available bytes into the input buffer and forwards them
// to the message callback; a zero-length read is treated as peer close.
void TcpConnection::HandleMessage() {
  int read_size = input_buffer_.ReadFd(fd_);
  if (read_size > 0) {
    if (message_callback_) {
      message_callback_(shared_from_this(), &input_buffer_);
    } else {
      // Nobody consumes the data; discard it instead of calling an empty
      // std::function or letting the buffer grow without bound.
      input_buffer_.RetrieveAll();
    }
  } else if (read_size == 0) {
    HandleClose();
  } else {
    LOG_ERROR << "TcpConnection::HandleMessage read failed";
  }
}

// Write handler: flushes as much of the output buffer as the socket accepts.
// Once the buffer is empty, write interest is dropped and a pending
// Shutdown() is completed.
void TcpConnection::HandleWrite() {
  if (channel_->IsWriting()) {
    int len = output_buffer_.readablebytes();
    int remaining = len;
    int send_size = static_cast<int>(::write(fd_, output_buffer_.Peek(), remaining));
    if (send_size < 0) {
      // A failed write is a runtime condition, not a programming error: the
      // former assert here aborted debug builds on every write failure.
      if (errno != EWOULDBLOCK) {
        LOG_ERROR << "TcpConnection::HandleWrite write failed";
      }
      return;
    }
    remaining -= send_size;
    output_buffer_.Retrieve(send_size);

    assert(remaining <= len);
    if (!output_buffer_.readablebytes()) {
      channel_->DisableWriting();
      if (state_ == kDisconnecting) {
        Shutdown();
      }
    }
  }
}

// Only Can Send In Same Thread, Can't Cross-Thread
// Sends `len` bytes starting at `message`. Tries a direct write when nothing
// is queued; whatever could not be written is appended to the output buffer
// and flushed later by HandleWrite().
void TcpConnection::Send(const char* message, int len) {
  int remaining = len;
  int send_size = 0;
  if (!channel_->IsWriting() && output_buffer_.readablebytes() == 0) {
    send_size = static_cast<int>(::write(fd_, message, len));
    if (send_size >= 0) {
      remaining -= send_size;
    } else {
      const int saved_errno = errno;
      send_size = 0;
      if (saved_errno != EWOULDBLOCK && saved_errno != EAGAIN) {
        LOG_ERROR << "TcpConnection::Send write failed";
        return;
      }
      // The socket is merely full: fall through and queue the whole payload.
      // Returning here would silently drop the data.
    }
  }

  assert(remaining <= len);
  if (remaining > 0) {
    output_buffer_.Append(message + send_size, remaining);
    if (!channel_->IsWriting()) {
      channel_->EnableWriting();
    }
  }
}

// Half-closes the write side. If output is still queued the actual shutdown
// is deferred until HandleWrite() has drained the buffer.
void TcpConnection::Shutdown() {
  state_ = kDisconnecting;
  if (!channel_->IsWriting()) {
    int ret = ::shutdown(fd_, SHUT_WR);
    if (ret < 0) {
      LOG_ERROR << "TcpConnection::Shutdown shutdown failed";
    }
  }
}

// Only Can Send In Same Thread, Can't Cross-Thread
// Sends everything readable in `buffer` and then empties it. Nothing happens
// once the connection is disconnected.
void TcpConnection::Send(Buffer* buffer) {
  if (state_ != kDisconnected) {
    const char* payload = buffer->Peek();
    const int payload_len = buffer->readablebytes();
    Send(payload, payload_len);
    buffer->RetrieveAll();
  }
}

// Only Can Send In Same Thread, Can't Cross-Thread
// Sends the bytes of `message`. Nothing happens once the connection is
// disconnected.
void TcpConnection::Send(const string& message) {
  if (state_ != kDisconnected) {
    const int message_len = static_cast<int>(message.size());
    Send(message.data(), message_len);
  }
}

9、tcpserver.h


#ifndef TINY_MUDUO_TCPSERVER_H_
#define TINY_MUDUO_TCPSERVER_H_

#include <memory>
#include <map>
#include <utility>
#include <memory>
#include <string>

#include "callback.h"
#include "eventloop.h"
#include "acceptor.h"
#include "eventloopthreadpool.h"
#include "tcpconnection.h"
#include "logging.h"
#include "noncopyable.h"
#include "address.h"

namespace tiny_muduo {

class Address;

// Accepts connections on the base loop and distributes them over the loops of
// an EventLoopThreadPool. Live connections are tracked in a map keyed by fd.
class TcpServer : public NonCopyAble {
 public:
  TcpServer(EventLoop* loop, const Address& address);
  ~TcpServer();

  // Starts the worker loops, then begins listening on the base loop.
  void Start() {
    threads_->StartLoop();
    loop_->RunOneFunc(std::bind(&Acceptor::Listen, acceptor_.get()));
  }

  void SetConnectionCallback(ConnectionCallback&& callback) {
    connection_callback_ = std::move(callback);
  }

  void SetConnectionCallback(const ConnectionCallback& callback) {
    connection_callback_ = callback;
  }

  void SetMessageCallback(MessageCallback&& callback) {
    message_callback_ = std::move(callback);
  }

  void SetMessageCallback(const MessageCallback& callback) {
    message_callback_ = callback;
  }

  // Number of worker loops; must be set before Start() to have any effect.
  void SetThreadNums(int thread_nums) {
    threads_->SetThreadNums(thread_nums);
  }

  // NOTE(review): these are declared inline but defined in tcpserver.cc, so
  // they are only callable from that translation unit.
  inline void HandleClose(const TcpConnectionPtr& conn);
  inline void HandleCloseInLoop(const TcpConnectionPtr& ptr);
  inline void HandleNewConnection(int connfd, const Address& address);

 private:
  typedef std::map<int, TcpconnectionPtr> ConnectionMap;

  EventLoop* loop_;
  int next_connection_id_;
  std::unique_ptr<EventLoopThreadPool> threads_;
  std::unique_ptr<Acceptor> acceptor_;
  const std::string ipport_;

  ConnectionCallback connection_callback_;
  MessageCallback message_callback_;
  ConnectionMap connections_;
};

}  // namespace tiny_muduo
#endif

9、tcpserver.cc


#include "tcpserver.h"

#include <assert.h>

#include <climits>
#include <utility>

#include "eventloopthreadpool.h"
#include "acceptor.h"
#include "tcpconnection.h"

using namespace tiny_muduo;

// Builds the thread pool and acceptor on `loop` and routes accepted
// connections to HandleNewConnection().
TcpServer::TcpServer(EventLoop* loop, const Address& address)
    : loop_(loop),
      next_connection_id_(1),
      threads_(new EventLoopThreadPool(loop_)),
      acceptor_(new Acceptor(loop_, address)),
      // Initialised straight from the returned temporary; wrapping it in
      // std::move would only inhibit copy elision.
      ipport_(address.IpPortToString()) {
  acceptor_->SetNewConnectionCallback(std::bind(&TcpServer::HandleNewConnection, this, _1, _2));
}

// Releases the server's reference to every connection and schedules each
// connection's teardown on its own loop.
TcpServer::~TcpServer() {
  for (auto& pair : connections_) {
    TcpConnectionPtr ptr(pair.second);
    pair.second.reset();
    ptr->loop()->RunOneFunc(std::bind(&TcpConnection::ConnectionDestructor, ptr));
  }
}

// Close callback installed on every connection; hops to the base loop, which
// owns the connection map.
inline void TcpServer::HandleClose(const TcpConnectionPtr& ptr) {
  loop_->RunOneFunc(std::bind(&TcpServer::HandleCloseInLoop, this, ptr));
}

// Runs on the base loop: forgets the connection and queues its teardown on
// the connection's own loop.
inline void TcpServer::HandleCloseInLoop(const TcpConnectionPtr& ptr) {
  assert(connections_.find(ptr->fd()) != connections_.end());
  // Erase by key: unlike erase(find(...)) this stays well-defined in NDEBUG
  // builds when the entry is unexpectedly missing.
  connections_.erase(ptr->fd());
  LOG_INFO << "TcpServer::HandleCloseInLoop - remove connection " << "["
           << ipport_ << '#' << ptr->id() << ']';
  EventLoop* loop = ptr->loop();
  loop->QueueOneFunc(std::bind(&TcpConnection::ConnectionDestructor, ptr));
}

// Acceptor callback: wraps `connfd` in a TcpConnection assigned to the next
// loop of the pool, installs the callbacks and completes the setup on that
// loop.
inline void TcpServer::HandleNewConnection(int connfd, const Address& address) {
  EventLoop* loop = threads_->NextLoop();
  TcpConnectionPtr ptr(new TcpConnection(loop, connfd, next_connection_id_));
  connections_[connfd] = ptr;
  ptr->SetConnectionCallback(connection_callback_);
  ptr->SetMessageCallback(message_callback_);
  ptr->SetCloseCallback(std::bind(&TcpServer::HandleClose, this, _1));
  LOG_INFO << "TcpServer::HandleNewConnection - new connection " << "["
           << ipport_ << '#' << next_connection_id_ << ']' << " from "
           << address.IpPortToString();
  ++next_connection_id_;
  // Wrap the id counter before it can overflow.
  if (next_connection_id_ == INT_MAX) next_connection_id_ = 1;
  loop->RunOneFunc(std::bind(&TcpConnection::ConnectionEstablished, ptr));
}

10、thread.h


#ifndef TINY_MUDUO_THREAD_H_
#define TINY_MUDUO_THREAD_H_#include <stdio.h>
#include <sys/prctl.h>
#include <pthread.h>#include <functional>
#include <string>#include "latch.h"
#include "noncopyable.h"using std::string;namespace tiny_muduo {
    static int thread_id_ = 1;class Thread : public NonCopyAble {
    public:typedef std::function<void ()> ThreadFunc;Thread(const ThreadFunc& func, const string& name = string());~Thread(); void StartThread();void Join() {
     ::pthread_join(pthread_id_, nullptr); }private:pthread_t pthread_id_;ThreadFunc func_;Latch latch_;string name_;
};       struct ThreadData {
    typedef tiny_muduo::Thread::ThreadFunc ThreadFunc;ThreadFunc func_;
Latch* latch_;
string name_;ThreadData(ThreadFunc& func, Latch* latch, const string& name): func_(func),latch_(latch),name_(name) {
    
}void RunOneFunc() {
    latch_->CountDown();latch_ = nullptr;  char buf[32] = {
    0};snprintf(buf, sizeof(buf), "%d", (thread_id_++));::prctl(PR_SET_NAME, name_.size() == 0 ? ("WorkThread " + string(buf)).data() : name_.data());func_();
}};}
#endif

10、thread.cc


#include "thread.h"

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/syscall.h>

#include "latch.h"

using namespace tiny_muduo;

namespace CurrentThread {

__thread int t_cachedTid = 0;
__thread char t_formattedTid[32];
__thread int t_formattedTidLength;

// Kernel thread id of the caller.
pid_t gettid() {
  return static_cast<int>(syscall(SYS_gettid));
}

// Fills the thread-local tid cache (numeric and pre-formatted) on first use.
void CacheTid() {
  if (t_cachedTid == 0) {
    t_cachedTid = gettid();
    t_formattedTidLength = snprintf(t_formattedTid, sizeof(t_formattedTid), "%5d ", t_cachedTid);
  }
}

}  // namespace CurrentThread

// pthread entry point: runs the payload and frees it.
static void* ThreadRun(void* arg) {
  ThreadData* ptr = static_cast<ThreadData*>(arg);
  ptr->RunOneFunc();
  delete ptr;
  return nullptr;
}

Thread::Thread(const ThreadFunc& func, const string& name)
    : pthread_id_(-1),
      func_(func),
      latch_(1),
      // `name` is a const reference, so std::move could never move from it.
      name_(name) {
}

// NOTE(review): detaches unconditionally, including when StartThread() was
// never called or Join() already ran.
Thread::~Thread() {
  ::pthread_detach(pthread_id_);
}

// Spawns the thread and blocks until it has counted the latch down. When the
// thread cannot be created the payload is freed and the call returns without
// waiting, since nobody would ever release the latch.
void Thread::StartThread() {
  ThreadData* ptr = new ThreadData(func_, &latch_, name_);
  const int err = ::pthread_create(&pthread_id_, nullptr, ThreadRun, ptr);
  if (err != 0) {
    delete ptr;
    fprintf(stderr, "Thread::StartThread pthread_create failed: %d\n", err);
    return;
  }
  latch_.Wait();
}
  相关解决方案