boost::asio http client 응용 노트
1.1 io_service
boost::asio::io_service::run()
작업 이 없 을 때 까지 계속 실행 되 며,중간 호출stop()
하면 모든 대기 중인 작업 이 즉시 실 행 됩 니 다.멈 출 때 모든 임 무 를 버 려 야 한다.해결 방안 은run_one()
,즉while (keep_running)
io_service_.run_one();
keep_running 은 bool 값 입 니 다.stop io서비스 할 때 false 를 직접 설치 하면 됩 니 다.
1.2 deadline_timer
호출
async_wait()
후 호출deadline_timer::cancel()
이 든 이 deadlinetimer 가 모두 분석 되 고handler
모두 촉발 된다.물론 이것 은 문서 에 쓰 여 있 습 니 다.야생 지침 을 피 하 는 방법 은 두 가지 가 있 는데,하 나 는 들 어 오 는 것handler
은 shared 이다.ptr,둘 째 는 한 층 더 포장 하 는 것 입 니 다.후 자 는handler
의 생명주기 가 스스로 제어 할 수 없 는 경우 에 적용 되 며,예시 코드 는 http client 1 절의 Timer Holder 클래스 를 보십시오.1.3 async*
이것 은 사실
deadline_timer::asyn::wait()
과 차이 가 많 지 않 습 니 다.async_read
,async_read_until
등 async접두사 함수,중간 에 정지 되면(예 를 들 어 호출ip::tcp::socket::close()
Handler
aborted 를 대표 하 는 boost::system:errorcode。 1.4 ip::tcp::socket
async_connect
에 게 건 네 주면 오류 가 발생 할 수 있 습 니 다.IP 에 불합리한 주소 가 있 을 수 있 습 니 다.예 를 들 어 0 의 주 소 를 되 돌 릴 수 있 습 니 다.해결 방법 은 http client 코드 의DoResolveAndConnect()
함 수 를 참고 합 니 다.2.http client 의 응용
C++클래스 로 봉인 되 었 습 니 다.이것 은 단일 스 레 드 의 실현(ioservice 는 같은 스 레 드 에서 run 입 니 다)socket 함 수 를 동기 화하 고 deadline 을 사용 합 니 다.timer 가 데 이 터 를 비동기 로 되 돌려 주 는 것 은 더욱 쉽게 제어 할 수 있 습 니 다.자세히 말 하지 않 겠 습 니 다.코드 를 보 세 요.
// header
#include <assert.h>
#include <string>
#include "boost/asio.hpp"
#include "boost/make_shared.hpp"
#include "boost/thread.hpp"
class HttpTransaction {
public:
// The Delegate implementation MAY delete HttpTransaction during Delegate
// method is invoked.
// Possible flow: ("*" means zero or more times.)
// 1. OnResponseReceived() -> OnDataReceived()* -> OnFinished().
// 2. OnError().
// 3. OnResponseReceived() -> OnError().
// 4. OnResponseReceived() -> OnDataReceived()* -> OnError().
class Delegate {
public:
virtual ~Delegate() {}
virtual void OnResponseReceived(HttpTransaction* transaction,
const HttpResponse& response) = 0;
virtual void OnDataReceived(HttpTransaction* transaction,
const char* data,
size_t length) = 0;
virtual void OnFinished(HttpTransaction* transaction) = 0;
virtual void OnError(HttpTransaction* transaction, int error_code) = 0;
};
explicit HttpTransaction(boost::asio::io_service& io);
~HttpTransaction();
void Start();
void Cancel();
const HttpRequest* request() const {
return http_request_;
}
void set_request(const HttpRequest* request) {
http_request_ = request;
}
Delegate* delegate() const { return delegate_; }
void set_delegate(Delegate* delegate) {
delegate_ = delegate;
}
private:
enum State {
STATE_NONE,
STATE_CONNECT,
STATE_SEND_REQUEST,
STATE_READ_HEADER,
STATE_READ_BODY,
STATE_READ_CHUNK_SIZE,
STATE_READ_CHUNK_DATA,
STATE_READ_UNTIL_EOF,
STATE_CALL_ON_FINISHED,
};
void DoLoop();
bool DoResolveAndConnect();
bool DoSendRequest();
bool DoReadHeader();
bool DoReadChunkSize();
bool DoReadChunkData();
bool DoReadBody();
bool DoReadUntilEof();
bool DoCallOnFinished();
void CallOnDataReceived(size_t size, size_t additional_consume_size = 0);
const HttpRequest* http_request_;
Delegate* delegate_;
HttpResponse* http_response_;
boost::asio::ip::tcp::resolver resolver_;
boost::asio::ip::tcp::socket socket_;
boost::shared_ptr<boost::asio::streambuf> stream_buf_;
size_t pending_read_size_;
State next_state_;
bool started_;
class TimerHolder;
boost::shared_ptr<TimerHolder> timer_holder_;
};
#####################################################################
// implementation
#include <string.h>
#include <assert.h>
#include <algorithm>
#include "boost/algorithm/hex.hpp"
#include "boost/bind.hpp"
#include "http_transaction_impl.h"
#include "http_request.h"
#include "http_response.h"
#include "util/url.h"
#include "util/logging.h"
#include "util/http_util.h"
// TimerHolder is needed because the callback is invoked even the timer
// cancelled or deleted.
class HttpTransaction::TimerHolder {
public:
TimerHolder(HttpTransaction* trans,
boost::asio::io_service& io) // NOLINT
: trans_(trans),
timer_(io) {}
void Schedule() {
timer_.expires_from_now(boost::posix_time::microseconds(0));
timer_.async_wait(boost::bind(&TimerHolder::OnTimer, trans_->timer_holder_,
boost::asio::placeholders::error));
}
void OnTransactionCancelled() {
trans_ = NULL;
timer_.cancel();
}
private:
void OnTimer(const boost::system::error_code& err) {
if (!err && trans_) {
trans_->DoLoop();
}
}
HttpTransaction* trans_;
boost::asio::deadline_timer timer_;
};
HttpTransaction::HttpTransaction(boost::asio::io_service& io)
: http_request_(NULL),
delegate_(NULL),
http_response_(NULL),
resolver_(io),
socket_(io),
stream_buf_(new boost::asio::streambuf),
pending_read_size_(0),
next_state_(STATE_NONE),
started_(false),
timer_holder_(new TimerHolder(this, io)) {
}
HttpTransaction::~HttpTransaction() {
Cancel();
}
void HttpTransaction::Start() {
assert(!started_);
started_ = true;
next_state_ = STATE_CONNECT;
timer_holder_->Schedule();
}
void HttpTransaction::Cancel() {
next_state_ = STATE_NONE;
timer_holder_->OnTransactionCancelled();
socket_.close();
if (http_response_) {
delete http_response_;
http_response_ = NULL;
}
}
void HttpTransaction::DoLoop() {
bool rv = false;
do {
State state = next_state_;
next_state_ = STATE_NONE;
switch (state) {
case STATE_CONNECT:
rv = DoResolveAndConnect();
break;
case STATE_SEND_REQUEST:
rv = DoSendRequest();
break;
case STATE_READ_HEADER:
rv = DoReadHeader();
break;
case STATE_READ_BODY:
rv = DoReadBody();
break;
case STATE_READ_UNTIL_EOF:
rv = DoReadUntilEof();
break;
case STATE_READ_CHUNK_SIZE:
rv = DoReadChunkSize();
break;
case STATE_READ_CHUNK_DATA:
rv = DoReadChunkData();
break;
case STATE_CALL_ON_FINISHED:
rv = DoCallOnFinished();
break;
default:
assert(0);
break;
}
} while (rv);
}
bool HttpTransaction::DoResolveAndConnect() {
URL url(http_request_->url());
// TODO(liuhx): if url is ip address.
boost::asio::ip::tcp::resolver::query query(
url.host(), url.port() == 0 ? url.protocol() : url.port_string());
boost::system::error_code err;
boost::asio::ip::tcp::resolver::iterator it = resolver_.resolve(query, err);
boost::asio::ip::tcp::resolver::iterator end; // Default is end.
if (err || it == end) {
LOG_DEBUG(kLogTagHttpTrans, "resolve error:%s",
err.message().c_str());
delegate_->OnError(this, err.value());
return false;
}
do {
LOG_INFO(kLogTagHttpTrans, "dns result:%s",
it->endpoint().address().to_string().c_str());
// "unspecified" means address is "0.0.0.0". It may appear on some machines
// running Apache. Please google it for more detail.
if (!it->endpoint().address().is_unspecified()) {
socket_.close();
LOG_INFO(kLogTagHttpTrans, "connecting:%s",
it->endpoint().address().to_string().c_str());
socket_.connect(*it, err);
if (!err)
break;
}
++it;
} while (it != end);
if (err) {
LOG_DEBUG(kLogTagHttpTrans, "connect error:%s",
err.message().c_str());
delegate_->OnError(this, err.value());
return false;
}
next_state_ = STATE_SEND_REQUEST;
return true;
}
bool HttpTransaction::DoSendRequest() {
URL url(http_request_->url());
std::ostream request_stream(stream_buf_.get());
request_stream << http_request_->method() << " " << url.path()
<< " HTTP/1.1\r
";
if (!http_request_->HasHeader("Host")) {
request_stream << "Host: " << url.host() << "\r
";
}
if (!http_request_->HasHeader("Connection")) {
request_stream << "Connection: keep-alive\r
";
}
const char* name;
const char* value;
void* iter = NULL;
while (http_request_->EnumerateHeaderLines(&iter, &name, &value))
request_stream << name << ": " << value << "\r
";
size_t size = 0;
if (http_request_->body(&value, &size)) {
if (!http_request_->HasHeader("Content-Length")) {
request_stream << "Content-Length: " << size << "\r
";
}
request_stream << "\r
";
request_stream.write(value, size);
} else {
request_stream << "\r
";
}
boost::system::error_code err;
// boost::asio::write() consumes |stream_buf_|, no need to do it ourselves.
boost::asio::write(socket_, *stream_buf_, err);
if (err) {
LOG_DEBUG(kLogTagHttpTrans, "send request error:%s",
err.message().c_str());
delegate_->OnError(this, err.value());
} else {
next_state_ = STATE_READ_HEADER;
timer_holder_->Schedule();
}
return false;
}
bool HttpTransaction::DoReadHeader() {
boost::system::error_code err;
boost::asio::read_until(socket_, *stream_buf_, "\r
\r
", err);
if (err) {
LOG_DEBUG(kLogTagHttpTrans, "read header error:%s",
err.message().c_str());
delegate_->OnError(this, err.value());
return false;
}
size_t size = stream_buf_->size();
const char* data = boost::asio::buffer_cast<const char*>(stream_buf_->data());
size_t pos = std::string(data, size).find("\r
\r
");
if (pos == std::string::npos) {
LOG_DEBUG(kLogTagHttpTrans,
"Can not find header end. Maybe TCP data Out-of-Order");
delegate_->OnError(this, 1234);
return false;
}
http_response_ = new HttpResponseImpl(http_request_->url(), data, pos);
stream_buf_->consume(pos + 4); // Skip 4 = "\r
\r
".
if (http_response_->status_code() < 100) {
LOG_DEBUG(kLogTagHttpTrans, "Header invalid");
delegate_->OnError(this, 2345);
return false;
}
// TODO(liuhx): Body may be available even status code is not 200.For
// example, some website may display a web page while replying with 404.
if (http_response_->status_code() != 200) {
next_state_ = STATE_CALL_ON_FINISHED;
} else {
const char* content_length = http_response_->GetHeader("content-length");
if (content_length) {
pending_read_size_ = atoi(content_length);
next_state_ = STATE_READ_BODY;
} else {
const char* encoding = http_response_->GetHeader("Transfer-Encoding");
bool is_chunk = encoding && (std::string(encoding).find("chunked") !=
std::string::npos);
if (is_chunk) {
next_state_ = STATE_READ_CHUNK_SIZE;
} else {
next_state_ = STATE_READ_UNTIL_EOF;
}
}
}
timer_holder_->Schedule();
delegate_->OnResponseReceived(this, *http_response_);
return false;
}
bool HttpTransaction::DoReadBody() {
// If content-length exists, the connection may be keep-alive. We MUST keep
// counting |pending_read_size_| instead of reading until EOF.
if (pending_read_size_ == 0) {
delegate_->OnFinished(this);
return false;
}
while (true) {
// boost may read addtional data beyond the condition in STATE_READ_HEADER,
// pass left data first if exists.
size_t size = stream_buf_->size();
if (size) {
next_state_ = STATE_READ_BODY;
timer_holder_->Schedule();
// TODO(liuhx): assert -> OnError
assert(pending_read_size_ >= size);
pending_read_size_ -= size;
CallOnDataReceived(size);
break;
} else {
boost::system::error_code err;
boost::asio::read(socket_, *stream_buf_,
boost::asio::transfer_at_least(1), err);
if (err) {
LOG_DEBUG(kLogTagHttpTrans, "read body error:%s",
err.message().c_str());
delegate_->OnError(this, err.value());
break;
}
}
}
return false;
}
// About chunked encoding, refer to http://tools.ietf.org/html/rfc2616#page-25
bool HttpTransaction::DoReadChunkSize() {
while (true) {
// boost may read addtional data beyond the condition, find "\r
" first.
size_t size = stream_buf_->size();
if (size) {
const char* data =
boost::asio::buffer_cast<const char*>(stream_buf_->data());
size_t index = std::string(data, size).find("\r
");
if (index != std::string::npos) {
pending_read_size_ = static_cast<size_t>(strtol(data, NULL, 16));
stream_buf_->consume(index + 2); // Skip +2 = "\r
"
if (pending_read_size_ == 0) {
delegate_->OnFinished(this);
return false;
}
break;
}
}
boost::system::error_code err;
boost::asio::read_until(socket_, *stream_buf_, "\r
", err);
if (err) {
LOG_DEBUG(kLogTagHttpTrans, "read chunk size error:%s",
err.message().c_str());
delegate_->OnError(this, err.value());
return false;
}
}
next_state_ = STATE_READ_CHUNK_DATA;
return true;
}
bool HttpTransaction::DoReadChunkData() {
while (true) {
size_t size = stream_buf_->size();
if (size) {
bool reach_end = size >= pending_read_size_;
size_t data_size = reach_end ? pending_read_size_ : size;
pending_read_size_ -= data_size;
next_state_ = reach_end ? STATE_READ_CHUNK_SIZE : STATE_READ_CHUNK_DATA;
timer_holder_->Schedule();
CallOnDataReceived(data_size, reach_end ? 2 : 0); // Skip 2 = "\r
".
break;
} else {
boost::system::error_code err;
boost::asio::read_until(socket_, *stream_buf_, "\r
", err);
if (err) {
LOG_DEBUG(kLogTagHttpTrans, "read chunk data error:%s",
err.message().c_str());
delegate_->OnError(this, err.value());
break;
}
}
}
return false;
}
bool HttpTransaction::DoReadUntilEof() {
while (true) {
size_t size = stream_buf_->size();
if (size) {
next_state_ = STATE_READ_UNTIL_EOF;
timer_holder_->Schedule();
const char* data =
boost::asio::buffer_cast<const char*>(stream_buf_->data());
boost::shared_ptr<boost::asio::streambuf> buf = stream_buf_;
delegate_->OnDataReceived(this, data, size);
buf->consume(size);
break;
} else {
boost::system::error_code err;
boost::asio::read(socket_, *stream_buf_,
boost::asio::transfer_at_least(1), err);
if (err) {
if (err == boost::asio::error::eof) {
delegate_->OnFinished(this);
} else {
LOG_DEBUG(kLogTagHttpTrans, "%s", err.message().c_str());
delegate_->OnError(this, err.value());
}
break;
}
}
}
return false;
}
bool HttpTransaction::DoCallOnFinished() {
delegate_->OnFinished(this);
return false;
}
void HttpTransaction::CallOnDataReceived(size_t size,
size_t additional_consume_size) {
const char* data = boost::asio::buffer_cast<const char*>(stream_buf_->data());
// Because Delegate may delete HttpTransaction during callback, we MUST NOT
// access member variable after callback method, instead, use the |buf|.
boost::shared_ptr<boost::asio::streambuf> buf = stream_buf_;
delegate_->OnDataReceived(this, data, size);
buf->consume(size + additional_consume_size);
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
빠른 팁: SingleStoreDB의 데이터 API 사용SingleStoreDB는 HTTP 연결을 통해 SQL 문을 실행하는 데 사용할 수 있는 을 제공합니다. 이 짧은 문서에서는 이 데이터 API를 사용하는 방법에 대한 예를 보여줍니다. A는 무료 SingleStore...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.