#include "AsyncHttpClient.h"

namespace hv {

int AsyncHttpClient::send(const HttpRequestPtr& req, HttpResponseCallback resp_cb) {
    hloop_t* loop = EventLoopThread::hloop();
    if (loop == NULL) return -1;
    auto task = std::make_shared<HttpClientTask>();
    task->req = req;
    task->cb = std::move(resp_cb);
    task->start_time = hloop_now_hrtime(loop);
    if (req->retry_count > 0 && req->retry_delay > 0) {
        req->retry_count = MIN(req->retry_count, req->timeout * 1000 / req->retry_delay - 1);
    }
    return send(task);
}

// createsocket => startConnect =>
// onconnect => sendRequest => startRead =>
// onread => HttpParser => resp_cb
int AsyncHttpClient::doTask(const HttpClientTaskPtr& task) {
    const HttpRequestPtr& req = task->req;
    if (req->cancel) {
        return -1;
    }

    // queueInLoop timeout?
    uint64_t now_hrtime = hloop_now_hrtime(EventLoopThread::hloop());
    int elapsed_ms = (now_hrtime - task->start_time) / 1000;
    int timeout_ms = req->timeout * 1000;
    if (timeout_ms > 0 && elapsed_ms >= timeout_ms) {
        hlogw("%s queueInLoop timeout!", req->url.c_str());
        return -10;
    }

    req->ParseUrl();
    sockaddr_u peeraddr;
    memset(&peeraddr, 0, sizeof(peeraddr));
    const char* host = req->host.c_str();
    int ret = sockaddr_set_ipport(&peeraddr, host, req->port);
    if (ret != 0) {
        hloge("unknown host %s", host);
        return -20;
    }

    int connfd = -1;
    // first get from conn_pools
    char strAddr[SOCKADDR_STRLEN] = {0};
    SOCKADDR_STR(&peeraddr, strAddr);
    auto iter = conn_pools.find(strAddr);
    if (iter != conn_pools.end()) {
        // hlogd("get from conn_pools");
        iter->second.get(connfd);
    }

    if (connfd < 0) {
        // create socket
        connfd = socket(peeraddr.sa.sa_family, SOCK_STREAM, 0);
        if (connfd < 0) {
            perror("socket");
            return -30;
        }
        hio_t* connio = hio_get(EventLoopThread::hloop(), connfd);
        assert(connio != NULL);
        hio_set_peeraddr(connio, &peeraddr.sa, sockaddr_len(&peeraddr));
        addChannel(connio);
        // https
        if (req->IsHttps() && !req->IsProxy()) {
            hio_enable_ssl(connio);
            if (!is_ipaddr(host)) {
                hio_set_hostname(connio, host);
            }
        }
    }

    const SocketChannelPtr& channel = getChannel(connfd);
    assert(channel != NULL);
    HttpClientContext* ctx = channel->getContext<HttpClientContext>();
    ctx->task = task;
    channel->onconnect = [&channel]() {
        sendRequest(channel);
    };
    channel->onread = [this, &channel](Buffer* buf) {
        HttpClientContext* ctx = channel->getContext<HttpClientContext>();
        if (ctx->task == NULL) return;
        if (ctx->task->req->cancel) {
            channel->close();
            return;
        }
        const char* data = (const char*)buf->data();
        int len = buf->size();
        int nparse = ctx->parser->FeedRecvData(data, len);
        if (nparse != len) {
            ctx->errorCallback();
            channel->close();
            return;
        }
        if (ctx->parser->IsComplete()) {
            auto& req = ctx->task->req;
            auto& resp = ctx->resp;
            bool keepalive = req->IsKeepAlive() && resp->IsKeepAlive();
            if (req->redirect && HTTP_STATUS_IS_REDIRECT(resp->status_code)) {
                std::string location = resp->headers["Location"];
                if (!location.empty()) {
                    hlogi("redirect %s => %s", req->url.c_str(), location.c_str());
                    req->url = location;
                    req->ParseUrl();
                    req->headers["Host"] = req->host;
                    resp->Reset();
                    send(ctx->task);
                    // NOTE: detatch from original channel->context
                    ctx->cancelTask();
                }
            } else {
                ctx->successCallback();
            }
            if (keepalive) {
                // NOTE: add into conn_pools to reuse
                // hlogd("add into conn_pools");
                conn_pools[channel->peeraddr()].add(channel->fd());
            } else {
                channel->close();
            }
        }
    };
    channel->onclose = [this, &channel]() {
        HttpClientContext* ctx = channel->getContext<HttpClientContext>();
        // NOTE: remove from conn_pools
        // hlogd("remove from conn_pools");
        auto iter = conn_pools.find(channel->peeraddr());
        if (iter != conn_pools.end()) {
            iter->second.remove(channel->fd());
        }

        const HttpClientTaskPtr& task = ctx->task;
        if (task) {
            if (ctx->parser &&
                ctx->parser->IsEof()) {
                ctx->successCallback();
            }
            else if (task->req &&
                     task->req->cancel == 0 &&
                     task->req->retry_count-- > 0) {
                if (task->req->retry_delay > 0) {
                    // try again after delay
                    setTimeout(task->req->retry_delay, [this, task](TimerID timerID){
                        hlogi("retry %s %s", http_method_str(task->req->method), task->req->url.c_str());
                        sendInLoop(task);
                    });
                } else {
                    send(task);
                }
            }
            else {
                ctx->errorCallback();
            }
        }

        removeChannel(channel);
    };

    // timer
    if (timeout_ms > 0) {
        ctx->timerID = setTimeout(timeout_ms - elapsed_ms, [&channel](TimerID timerID){
            HttpClientContext* ctx = channel->getContext<HttpClientContext>();
            if (ctx && ctx->task) {
                hlogw("%s timeout!", ctx->task->req->url.c_str());
            }
            if (channel) {
                channel->close();
            }
        });
    }

    if (channel->isConnected()) {
        // sendRequest
        sendRequest(channel);
    } else {
        // startConnect
        if (req->connect_timeout > 0) {
            channel->setConnectTimeout(req->connect_timeout * 1000);
        }
        channel->startConnect();
    }

    return 0;
}

// InitResponse => SubmitRequest => while(GetSendData) write => startRead
int AsyncHttpClient::sendRequest(const SocketChannelPtr& channel) {
    HttpClientContext* ctx = (HttpClientContext*)channel->context();
    assert(ctx != NULL && ctx->task != NULL);
    if (ctx->resp == NULL) {
        ctx->resp = std::make_shared<HttpResponse>();
    }
    HttpRequest* req = ctx->task->req.get();
    HttpResponse* resp = ctx->resp.get();
    assert(req != NULL && resp != NULL);
    if (req->http_cb) resp->http_cb = std::move(req->http_cb);

    if (ctx->parser == NULL) {
        ctx->parser.reset(HttpParser::New(HTTP_CLIENT, (http_version)req->http_major));
    }
    ctx->parser->InitResponse(resp);
    ctx->parser->SubmitRequest(req);

    char* data = NULL;
    size_t len = 0;
    while (ctx->parser->GetSendData(&data, &len)) {
        if (req->cancel) {
            channel->close();
            return -1;
        }
        // NOTE: ensure write buffer size is enough
        if (len > (1 << 22) /* 4M */) {
            channel->setMaxWriteBufsize(len);
        }
        channel->write(data, len);
    }
    channel->startRead();

    return 0;
}

}