// Source: emsApplication/sdk/include/hv/Channel.h (snapshot 2024-09-10 16:12:38 +08:00)
#ifndef HV_CHANNEL_HPP_
#define HV_CHANNEL_HPP_
#include <string>
#include <functional>
#include <memory>
#include <atomic>
#include "hloop.h"
#include "hsocket.h"
#include "Buffer.h"
namespace hv {
class Channel {
public:
Channel(hio_t* io = NULL) {
io_ = io;
fd_ = -1;
id_ = 0;
ctx_ = NULL;
status = CLOSED;
if (io) {
fd_ = hio_fd(io);
id_ = hio_id(io);
ctx_ = hio_context(io);
hio_set_context(io, this);
if (hio_is_opened(io)) {
status = OPENED;
}
if (hio_getcb_read(io) == NULL) {
hio_setcb_read(io_, on_read);
}
if (hio_getcb_write(io) == NULL) {
hio_setcb_write(io_, on_write);
}
if (hio_getcb_close(io) == NULL) {
hio_setcb_close(io_, on_close);
}
}
}
virtual ~Channel() {
if (isOpened()) {
close();
// NOTE: Detach after destructor to avoid triggering onclose
if (io_ && id_ == hio_id(io_)) {
hio_set_context(io_, NULL);
}
}
}
hio_t* io() { return io_; }
int fd() { return fd_; }
uint32_t id() { return id_; }
int error() { return hio_error(io_); }
// context
void* context() {
return ctx_;
}
void setContext(void* ctx) {
ctx_ = ctx;
}
template<class T>
T* newContext() {
ctx_ = new T;
return (T*)ctx_;
}
template<class T>
T* getContext() {
return (T*)ctx_;
}
template<class T>
void deleteContext() {
if (ctx_) {
delete (T*)ctx_;
ctx_ = NULL;
}
}
// contextPtr
std::shared_ptr<void> contextPtr() {
return contextPtr_;
}
void setContextPtr(const std::shared_ptr<void>& ctx) {
contextPtr_ = ctx;
}
void setContextPtr(std::shared_ptr<void>&& ctx) {
contextPtr_ = std::move(ctx);
}
template<class T>
std::shared_ptr<T> newContextPtr() {
contextPtr_ = std::make_shared<T>();
return std::static_pointer_cast<T>(contextPtr_);
}
template<class T>
std::shared_ptr<T> getContextPtr() {
return std::static_pointer_cast<T>(contextPtr_);
}
void deleteContextPtr() {
contextPtr_.reset();
}
bool isOpened() {
if (io_ == NULL || status >= DISCONNECTED) return false;
return id_ == hio_id(io_) && hio_is_opened(io_);
}
bool isClosed() {
return !isOpened();
}
int startRead() {
if (!isOpened()) return -1;
return hio_read_start(io_);
}
int stopRead() {
if (!isOpened()) return -1;
return hio_read_stop(io_);
}
int readOnce() {
if (!isOpened()) return -1;
return hio_read_once(io_);
}
int readString() {
if (!isOpened()) return -1;
return hio_readstring(io_);
}
int readLine() {
if (!isOpened()) return -1;
return hio_readline(io_);
}
int readBytes(int len) {
if (!isOpened() || len <= 0) return -1;
return hio_readbytes(io_, len);
}
// write thread-safe
int write(const void* data, int size) {
if (!isOpened()) return -1;
return hio_write(io_, data, size);
}
int write(Buffer* buf) {
return write(buf->data(), buf->size());
}
int write(const std::string& str) {
return write(str.data(), str.size());
}
// iobuf setting
void setReadBuf(void* buf, size_t len) {
if (io_ == NULL) return;
hio_set_readbuf(io_, buf, len);
}
void setMaxReadBufsize(uint32_t size) {
if (io_ == NULL) return;
hio_set_max_read_bufsize(io_, size);
}
void setMaxWriteBufsize(uint32_t size) {
if (io_ == NULL) return;
hio_set_max_write_bufsize(io_, size);
}
size_t writeBufsize() {
if (io_ == NULL) return 0;
return hio_write_bufsize(io_);
}
bool isWriteComplete() {
return writeBufsize() == 0;
}
// close thread-safe
int close(bool async = false) {
if (isClosed()) return -1;
status = CLOSED;
return async ? hio_close_async(io_) : hio_close(io_);
}
public:
hio_t* io_;
int fd_;
uint32_t id_;
void* ctx_;
enum Status {
OPENED,
CONNECTING,
CONNECTED,
DISCONNECTED,
CLOSED,
};
std::atomic<Status> status;
std::function<void(Buffer*)> onread;
// NOTE: Use Channel::isWriteComplete in onwrite callback to determine whether all data has been written.
std::function<void(Buffer*)> onwrite;
std::function<void()> onclose;
std::shared_ptr<void> contextPtr_;
private:
static void on_read(hio_t* io, void* data, int readbytes) {
Channel* channel = (Channel*)hio_context(io);
if (channel && channel->onread) {
Buffer buf(data, readbytes);
channel->onread(&buf);
}
}
static void on_write(hio_t* io, const void* data, int writebytes) {
Channel* channel = (Channel*)hio_context(io);
if (channel && channel->onwrite) {
Buffer buf((void*)data, writebytes);
channel->onwrite(&buf);
}
}
static void on_close(hio_t* io) {
Channel* channel = (Channel*)hio_context(io);
if (channel) {
channel->status = CLOSED;
if (channel->onclose) {
channel->onclose();
}
}
}
};
class SocketChannel : public Channel {
public:
std::function<void()> onconnect; // only for TcpClient
std::function<void()> heartbeat;
SocketChannel(hio_t* io) : Channel(io) {
}
virtual ~SocketChannel() {}
// SSL/TLS
int enableSSL() {
if (io_ == NULL) return -1;
return hio_enable_ssl(io_);
}
bool isSSL() {
if (io_ == NULL) return false;
return hio_is_ssl(io_);
}
int setSSL(hssl_t ssl) {
if (io_ == NULL) return -1;
return hio_set_ssl(io_, ssl);
}
int setSslCtx(hssl_ctx_t ssl_ctx) {
if (io_ == NULL) return -1;
return hio_set_ssl_ctx(io_, ssl_ctx);
}
int newSslCtx(hssl_ctx_opt_t* opt) {
if (io_ == NULL) return -1;
return hio_new_ssl_ctx(io_, opt);
}
// for hssl_set_sni_hostname
int setHostname(const std::string& hostname) {
if (io_ == NULL) return -1;
return hio_set_hostname(io_, hostname.c_str());
}
// timeout
void setConnectTimeout(int timeout_ms) {
if (io_ == NULL) return;
hio_set_connect_timeout(io_, timeout_ms);
}
void setCloseTimeout(int timeout_ms) {
if (io_ == NULL) return;
hio_set_close_timeout(io_, timeout_ms);
}
void setReadTimeout(int timeout_ms) {
if (io_ == NULL) return;
hio_set_read_timeout(io_, timeout_ms);
}
void setWriteTimeout(int timeout_ms) {
if (io_ == NULL) return;
hio_set_write_timeout(io_, timeout_ms);
}
void setKeepaliveTimeout(int timeout_ms) {
if (io_ == NULL) return;
hio_set_keepalive_timeout(io_, timeout_ms);
}
// heartbeat
// NOTE: Beware of circular reference problems caused by passing SocketChannelPtr by value.
void setHeartbeat(int interval_ms, std::function<void()> fn) {
if (io_ == NULL) return;
heartbeat = std::move(fn);
hio_set_heartbeat(io_, interval_ms, send_heartbeat);
}
/*
* unpack
*
* NOTE: unpack_setting_t of multiple IOs of the same function also are same,
* so only the pointer of unpack_setting_t is stored in hio_t,
* the life time of unpack_setting_t shoud be guaranteed by caller.
*/
void setUnpack(unpack_setting_t* setting) {
if (io_ == NULL) return;
hio_set_unpack(io_, setting);
}
int startConnect(int port, const char* host = "127.0.0.1") {
sockaddr_u peeraddr;
memset(&peeraddr, 0, sizeof(peeraddr));
int ret = sockaddr_set_ipport(&peeraddr, host, port);
if (ret != 0) {
// hloge("unknown host %s", host);
return ret;
}
return startConnect(&peeraddr.sa);
}
int startConnect(struct sockaddr* peeraddr) {
if (io_ == NULL) return -1;
hio_set_peeraddr(io_, peeraddr, SOCKADDR_LEN(peeraddr));
return startConnect();
}
int startConnect() {
if (io_ == NULL) return -1;
status = CONNECTING;
hio_setcb_connect(io_, on_connect);
return hio_connect(io_);
}
bool isConnected() {
return status == CONNECTED && isOpened();
}
std::string localaddr() {
if (io_ == NULL) return "";
struct sockaddr* addr = hio_localaddr(io_);
char buf[SOCKADDR_STRLEN] = {0};
return SOCKADDR_STR(addr, buf);
}
std::string peeraddr() {
if (io_ == NULL) return "";
struct sockaddr* addr = hio_peeraddr(io_);
char buf[SOCKADDR_STRLEN] = {0};
return SOCKADDR_STR(addr, buf);
}
private:
static void on_connect(hio_t* io) {
SocketChannel* channel = (SocketChannel*)hio_context(io);
if (channel) {
channel->status = CONNECTED;
if (channel->onconnect) {
channel->onconnect();
}
}
}
static void send_heartbeat(hio_t* io) {
SocketChannel* channel = (SocketChannel*)hio_context(io);
if (channel && channel->heartbeat) {
channel->heartbeat();
}
}
};
// Shared-ownership handles for channels.
using ChannelPtr       = std::shared_ptr<Channel>;
using SocketChannelPtr = std::shared_ptr<SocketChannel>;
}
#endif // HV_CHANNEL_HPP_