📄 response.cpp
字号:
#include "config.h"
#include <cstring>
#include <string>
#include <vector>
#include <set>
#include <map>
#include <iostream>
#include <boost/thread/detail/config.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/recursive_mutex.hpp>
#include <boost/date_time/gregorian/gregorian.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
// #include <boost/smart_assert/assert.hpp>
#include <boost/static_assert.hpp>
#include <loki/Factory.h>
#include "../zlib/zlib.h"
#include "request.h"
#include "response.h"
#include "stock.h"
#include "util.hpp"
namespace StockMarket{
// Presumably guards the response factory; it is not referenced anywhere in
// the visible part of this file -- TODO confirm where it is used.
recursive_mutex res_fact_mutex;
// Locked for the whole body of ProcessResponsePacket(), which keeps its
// packet-reassembly state in function-local statics.
recursive_mutex process_packet_mutex;
// Creates an empty response: no packet attached, nothing to parse and no
// owned buffer to release.
StockRes::StockRes()
    : raw_data(0),
      raw_len(0),
      data(0),
      len(0),
      current(0),
      left(0),
      dataNew(false)
{
}
// Creates a response and immediately attaches it to the packet at pData.
// Every member is initialised before read() runs, because read() inspects
// dataNew/data to decide whether a previously inflated buffer must be freed.
// Throws StockResWrongData if the packet is malformed (see read()).
StockRes::StockRes(uchar* pData, ulong data_len)
    : raw_data(pData),
      raw_len(data_len),
      data(0),
      len(0),
      current(0),
      left(0),
      dataNew(false)
{
    this->read(pData, data_len);
}

// Releases the payload buffer only when this object allocated it itself
// (i.e. the packet was compressed and has been inflated).
StockRes::~StockRes()
{
    if(dataNew) delete[] data;
}

// Attaches this object to one packet stored at pData and positions the read
// cursor at the start of its (possibly inflated) payload.
//
// pData    : start of the packet (16-byte header followed by the payload).
// data_len : number of readable bytes at pData.
// Returns  : the number of bytes of pData consumed by this packet, i.e. the
//            packet length from the header clamped to data_len.
// Throws   : StockResWrongData(1) when fewer than 16 header bytes are
//            available, StockResWrongData(8) when the payload length does not
//            match the header, plus whatever umcompress_if() throws.
uint StockRes::read(const uchar* pData, ulong data_len)
{
    const ulong header_len = 16;

    // read() may be called repeatedly on the same object (one call per packet
    // in a batch); free the buffer inflated for the previous packet so it is
    // not leaked when the pointers are re-targeted below.
    if(dataNew)
    {
        delete[] data;
        dataNew = false;
    }
    raw_data = current = data = pData;
    raw_len = left = len = 0;

    // The header fields at offsets 12 and 14 are read below; refuse buffers
    // that are too short to contain them instead of reading out of bounds.
    if(0 == pData || data_len < header_len)
    {
        throw StockResWrongData(1);
    }

    ulong len1 = get_packet_len();
    len1 = len1 < data_len ? len1 : data_len;
    raw_len = left = len = len1;

    // Payload length announced at header offset 14 (after decompression).
    ushort announced_len = 0;
    std::memcpy(&announced_len, pData + 14, sizeof(announced_len));
    const ulong len2 = announced_len;

    // Skip up to the two length fields; umcompress_if() consumes those four
    // bytes itself and re-adjusts data/len/current/left.
    skip_byte(16 - 4);
    umcompress_if();
    if(len != len2)
    {
        throw StockResWrongData(8);
    }
    return len1;
}
// Number of payload bytes that have not been consumed yet.
ulong StockRes::buff_left()
{
    return this->left;
}
// True once every payload byte has been consumed.
bool StockRes::end()
{
    return left == 0;
}
// Rewinds the read cursor to the beginning of the payload.
void StockRes::reset()
{
    left = len;
    current = data;
}
// Advances the read cursor by count bytes.
// Throws StockResWrongData(1) when fewer than count bytes remain; the cursor
// is left untouched in that case.
void StockRes::skip_byte(ulong count)
{
    if(left < count)
    {
        throw StockResWrongData(1);
    }
    left -= count;
    current += count;
}
// Skips count variable-length encoded values without decoding them.
// A value is taken to be 1 byte when its first byte is below 0x80, 2 bytes
// when its second byte is below 0x80, and 3 bytes otherwise.
// NOTE(review): parse_data2() accepts values of up to 6 bytes, while this
// routine never skips more than 3 -- confirm that is intended.
// Throws StockResWrongData(1) when the buffer ends inside a value. The bytes
// are inspected only after checking that they are inside the buffer.
void StockRes::skip_data(ulong count)
{
    for(; count > 0; --count)
    {
        if(0 == left)
            throw StockResWrongData(1);
        if(current[0] < 0x80)
            skip_byte(1);
        else if(left < 2)
            throw StockResWrongData(1);
        else if(current[1] < 0x80)
            skip_byte(2);
        else
            skip_byte(3);
    }
}
char StockRes::get_char()
{
char v = (*current);
skip_byte(1);
return v;
}
// Consumes one byte and returns it as uchar.
// Throws StockResWrongData(1) when the buffer is exhausted.
uchar StockRes::get_uchar()
{
    const uchar* p = current; // remembered only; dereferenced after the check
    skip_byte(1);             // bounds check happens here, before the read
    return (uchar)(*p);
}
short StockRes::get_short()
{
short v = *(short*)current;
skip_byte(2);
return v;
}
// Consumes two bytes and returns them as a ushort in host byte order.
// Throws StockResWrongData(1) when fewer than two bytes remain.
ushort StockRes::get_ushort()
{
    const uchar* p = current; // remembered only; read after the bounds check
    skip_byte(2);
    ushort v = 0;
    // memcpy instead of a casted dereference: the cursor may be misaligned.
    std::memcpy(&v, p, sizeof(v));
    return v;
}
int StockRes::get_int()
{
int v = *(int*)current;
skip_byte(4);
return v;
}
// Consumes four bytes and returns them as a uint in host byte order.
// Throws StockResWrongData(1) when fewer than four bytes remain.
uint StockRes::get_uint()
{
    const uchar* p = current; // remembered only; read after the bounds check
    skip_byte(4);
    uint v = 0;
    // memcpy instead of a casted dereference: the cursor may be misaligned.
    std::memcpy(&v, p, sizeof(v));
    return v;
}
float StockRes::get_float()
{
float v = *(float*)current;
BOOST_STATIC_ASSERT(sizeof(float) == 4);
skip_byte(4);
return v;
}
int StockRes::parse_data()
{
if((current[0] >= 0x40 && current[0] < 0x80) || current[0] >= 0xc0) // negative
{
return 0x40 - parse_data2();
}
else
return parse_data2();
}
int StockRes::parse_data2() // we don't know this is correct yet
{
// 8f ff ff ff 1f == -49
// bd ff ff ff 1f == -3
// b0 fe ff ff 1f == -80
// 8c 01 == 76
// a8 fb b6 01 == 1017 万
// a3 8e 11 == 14.02 万
// 82 27 == 2498
int v;
int nBytes = 0;
while(current[nBytes] >= 0x80)
{
++nBytes;
}
++nBytes;
switch(nBytes)
{
case 1:
v = current[0];
break;
case 2:
v = current[1] * 0x40 + current[0] - 0x80;
break;
case 3:
v = (current[2] * 0x80 + current[1] - 0x80) * 0x40 + current[0] - 0x80;
break;
case 4:
v = ((current[3] * 0x80 + current[2] - 0x80) * 0x80 + current[1] - 0x80) * 0x40 + current[0] - 0x80;
break;
case 5:
// over flow, positive to negative
v = (((current[4] * 0x80 + current[3] - 0x80) * 0x80 + current[2] - 0x80) * 0x80 + current[1] - 0x80)* 0x40 + current[0] - 0x80;
break;
case 6:
// over flow, positive to negative
v = ((((current[5] * 0x80 + current[4] -0x80) * 0x80 + current[3] - 0x80) * 0x80 + current[2] - 0x80) * 0x80 + current[1] - 0x80) * 0x40 + current[0] - 0x80;
break;
default:
throw StockResWrongData(10);
}
skip_byte(nBytes);
return v;
}
// if the data is compressed, then uncompressed them; Or else do nothing
void StockRes::umcompress_if()
{
len = *(ushort*)(current);
ulong new_len = *(ushort*)(current + 2);
ulong new_len2 = new_len;
skip_byte(4); // now the current moved to the real or compressed data
if(0x78 == current[0] && 0x9c == (uchar)current[1])
{
uchar* new_data = new uchar[new_len];
if(0 == uncompress((Byte*)new_data, &new_len, (Byte*)current, len) && new_len == new_len2)
{
dataNew = true;
current = data = new_data;
left = len = new_len;
}
else
{
delete[] new_data;
throw StockResWrongData(2);
}
}
else
{
data += 16;
current = data;
}
}
// Processing a heartbeat response is a no-op.
void StockHeartBeatRes::operator()()
{
}
// Total length (header included) of the packet this object is attached to.
ushort StockRes::get_packet_len()
{
    return StockRes::get_packet_len(raw_data);
}
// Total length of the packet starting at pData: the 16-bit payload length
// stored at header offset 12 plus the 16-byte header.
// pData must point at 14 or more readable bytes.
// The result is truncated to ushort, so it wraps for payloads above 65519.
ushort StockRes::get_packet_len(const uchar* pData)
{
    ushort payload_len = 0;
    // memcpy instead of a casted dereference: pData may be misaligned.
    std::memcpy(&payload_len, pData + 12, sizeof(payload_len));
    return payload_len + 16;
}
// Sequence id of the packet this object is attached to.
uint StockRes::get_seq_id()
{
    return StockRes::get_seq_id(raw_data);
}
// Sequence id of the packet starting at pData, stored at header offset 5 in
// host byte order. pData must point at 9 or more readable bytes.
uint StockRes::get_seq_id(const uchar* pData)
{
    uint seq_id = 0;
    // Offset 5 is never suitably aligned for a direct uint load, so copy the
    // bytes instead of dereferencing a casted pointer.
    std::memcpy(&seq_id, pData + 5, sizeof(seq_id));
    return seq_id;
}
// Command id of the packet this object is attached to.
ushort StockRes::get_cmd_id()
{
    return StockRes::get_cmd_id(raw_data);
}
// Command id of the packet starting at pData, stored at header offset 10 in
// host byte order. pData must point at 12 or more readable bytes.
ushort StockRes::get_cmd_id(const uchar* pData)
{
    ushort cmd_id = 0;
    // memcpy instead of a casted dereference: pData may be misaligned.
    std::memcpy(&cmd_id, pData + 10, sizeof(cmd_id));
    return cmd_id;
}
// Creates a stock-list response that is not attached to any packet yet.
StockListRes::StockListRes()
    : StockRes()
{
}
// Creates a stock-list response attached to the packet at pData; the base
// class constructor does all the work.
StockListRes::StockListRes(uchar* pData, ulong data_len)
    : StockRes(pData, data_len)
{
    // assert(CMD_STOCK_LIST == get_cmd_id());
}
// Parses a stock-list payload: registers the stock codes of the market the
// packet belongs to and appends one Bid snapshot per accepted stock to
// instant_price_list, keyed by stock code and today's date.
//
// Throws StockResWrongData(9) when the market cannot be determined,
// StockResWrongData(3) when the next record cannot be located, and
// StockResWrongData(1)/(10) when the payload is truncated or malformed.
void StockListRes::operator()()
{
    posix_time::ptime now(posix_time::second_clock::local_time());
    posix_time::time_duration td(now.time_of_day());
    // uchar market = *(data + 4); // not market, but market location that is no use

    // The market is derived from the code of the first record, which sits at
    // payload offset 5. Make sure those bytes exist before reading them; a
    // payload without them cannot identify a market.
    if(len < static_cast<ulong>(5 + MarketInfo::StocksCodeLen))
    {
        throw StockResWrongData(9);
    }
    string code1((char*)(data + 5), MarketInfo::StocksCodeLen);
    short market = MarketInfo::get_market_type(code1);
    if(market >= MarketInfo::MARKET_UNKNOWN)
    {
        throw StockResWrongData(9);
    }
    MarketInfo::stocks_count[market] = get_ushort();
    StockBid::Bid bid; // zero it.
    ushort count = get_ushort();
    while(count--)
    {
        // A record starts with one byte followed by the stock code.
        if(left < static_cast<ulong>(MarketInfo::StocksCodeLen + 1))
        {
            throw StockResWrongData(1);
        }
        std::string code((char*)(current + 1), MarketInfo::StocksCodeLen);
        // Set up the lists of all stocks.
        if(MarketInfo::stocks_set[market].size() < MarketInfo::stocks_count[market]
            && market == MarketInfo::get_market_type(code)
            && (!(market == MarketInfo::MARKET_INDEX && code[0] < '3')))
        {
            MarketInfo::stocks_set[market].insert(code);
        }
        else
        {
            // NOTE(review): the cursor is not advanced here, so once one
            // record is rejected every remaining iteration sees the same
            // record and is rejected too -- confirm this is intended.
            continue;
        }
        skip_byte(MarketInfo::StocksCodeLen + 1);

        // Minute of the day, clamped to 15:00 and, between 11:30 and 13:00,
        // to 11:30.
        bid.minute = (uint)(td.hours() * 60 + td.minutes());
        if(bid.minute > 15 * 60)
            bid.minute = 15 * 60;
        else if(11 * 60 + 30 < bid.minute && bid.minute < 13 * 60)
            bid.minute = 11 * 60 + 30;

        // All prices after the current price are encoded as offsets from it.
        bid.act = get_ushort();
        bid.price = parse_data();
        bid.y_close = parse_data() + bid.price;
        bid.open = parse_data() + bid.price;
        bid.high = parse_data() + bid.price;
        bid.low = parse_data() + bid.price;
        bid.buy = parse_data() + bid.price;
        bid.sell = parse_data() + bid.price;
        bid.total_vol = parse_data2();
        bid.avail_vol = parse_data2();
        skip_byte(4);
        bid.inner_vol = parse_data2();
        bid.outer_vol = parse_data2();
        bid.updown = parse_data();
        skip_data(1);
        bid.buy_price1 = parse_data() + bid.price;
        bid.sell_price1 = parse_data() + bid.price;
        bid.buy_vol1 = parse_data2();
        bid.sell_vol1 = parse_data2();
        bid.buy_price2 = parse_data() + bid.price;
        bid.sell_price2 = parse_data() + bid.price;
        bid.buy_vol2 = parse_data2();
        bid.sell_vol2 = parse_data2();
        bid.buy_price3 = parse_data() + bid.price;
        bid.sell_price3 = parse_data() + bid.price;
        bid.buy_vol3 = parse_data2();
        bid.sell_vol3 = parse_data2();
        bid.buy_price4 = parse_data() + bid.price;
        bid.sell_price4 = parse_data() + bid.price;
        bid.buy_vol4 = parse_data2();
        bid.sell_vol4 = parse_data2();
        bid.buy_price5 = parse_data() + bid.price;
        bid.sell_price5 = parse_data() + bid.price;
        bid.buy_vol5 = parse_data2();
        bid.sell_vol5 = parse_data2();

        recursive_mutex::scoped_lock guard(bid_mutex);
        // Index codes below '3' never reach this point (they are rejected by
        // the acceptance test above), so no further filtering is needed.
        instant_price_list[code][date_to_uint(gregorian::day_clock::local_day())].push_back(bid);

        // find next item.
        if(0 != count)
        {
            skip_byte(8);
            const uchar* p = 0;
            if(0 != (p = stock_code_search(current, 15)))
            {
                // Stop one byte before the code, where the record starts.
                skip_byte(p - current - 1);
            }
            else
            {
                throw StockResWrongData(3);
            }
        }
    }
    return;
}
// Creates a holding-change response that is not attached to any packet yet.
StockHoldChgRes::StockHoldChgRes()
    : StockRes()
{
}
// Creates a holding-change response attached to the packet at pData; the
// base class constructor does all the work.
StockHoldChgRes::StockHoldChgRes(uchar* pData, ulong data_len)
    : StockRes(pData, data_len)
{
    // assert(CMD_STOCKHOLD_CHANGE == get_cmd_id());
}
// Parses a holding-change payload and hands every record to
// stock_basic_info::add_stock_gbbq().
//
// Layout as consumed below: a 16-bit stock count; per stock one byte, the
// stock code and a 16-bit record count; per record (code length + 2) skipped
// bytes, a 32-bit date, a one-byte change type and four floats.
//
// Throws StockResWrongData(4) for an invalid stock code and
// StockResWrongData(1) when the payload is truncated.
void StockHoldChgRes::operator()()
{
    GBBQ gbbq;
    ushort stock_count = get_ushort();
    while(stock_count -- > 0)
    {
        skip_byte(1);
        // Make sure the whole code is inside the buffer before inspecting it.
        if(left < static_cast<ulong>(MarketInfo::StocksCodeLen))
        {
            throw StockResWrongData(1);
        }
        if(!stockcode_is_valid(current))
        {
            throw StockResWrongData(4);
        }
        string s((char*)current, MarketInfo::StocksCodeLen);
        skip_byte(MarketInfo::StocksCodeLen);
        ushort record_count = get_ushort();
        while( record_count -- > 0)
        {
            skip_byte(MarketInfo::StocksCodeLen + 2);
            uint dt = get_uint();
            gbbq.code = s;
            gbbq.chg_type = get_uchar();
            gbbq.data.gb.old_cir = get_float();
            gbbq.data.gb.old_ttl = get_float();
            gbbq.data.gb.new_cir = get_float();
            gbbq.data.gb.new_ttl = get_float();
            stock_basic_info::Instance().add_stock_gbbq(dt, gbbq);
        }
    }
}
// Stores the numeric id identifying where parsing failed.
StockResWrongData::StockResWrongData(int i)
    : n_(i)
{
}
// Fixed description; the failure id itself is available through n_.
const char * StockResWrongData::what() const throw()
{
    const char* const description = "StockResWrongData";
    return description;
}
void ProcessResponsePacket(const uchar * pData , size_t len, bool test)
{
static uint packet_len = 0;
static uint packet_curr_pos = 0;
static uchar packet_buff[TCP_BUFSIZE_READ];
recursive_mutex::scoped_lock guard(process_packet_mutex);
// cout << hex<< (int)pData[0] << (int)pData[1] << (int)pData[2] << (int)pData[3];
// cout << " len= " << len <<endl;
if(test)
{
packet_len = len;
}
else
{
// application layer assembly 应用层组包
if(pData[0] == 0xb1 && pData[1] == 0xcb && pData[2] == 0x74 && pData[3] == 0x00)
{
packet_curr_pos = 0;
packet_len = StockRes::get_packet_len(pData);
// cout << "packet_len= " << packet_len << endl;
}
}
if(packet_curr_pos + len <= packet_len)
{
memcpy(&packet_buff[0] + packet_curr_pos, pData, len);
packet_curr_pos += len;
}
else
{
return;
}
if(packet_curr_pos == packet_len)
{
// 组包完成, 调整指针
pData = &packet_buff[0];
len = packet_len;
packet_len = packet_curr_pos = 0;
}
else
{
return;
}
const uchar *orig_ptr = pData; uint orig_len = len;
int cmd_id = StockRes::get_cmd_id(pData);
try{
boost::scoped_ptr <StockRes> pCmd(ResponseFactory::Instance().CreateObject(cmd_id));
uint processed;
while(len > 0)
{
processed = pCmd->read(pData, len);
(*pCmd)();
Request::res_seq_id(StockRes::get_seq_id(pData));
len -= processed;
pData += processed;
}
}
catch(StockResWrongData& e)
{
if(!test)
{
log_packet((const char*)orig_ptr, orig_len);
}
cout << "Exception Id = " << e.n_ << endl;
}
catch(...)
{
cout << "Command" << cmd_id << " not implemented";
}
packet_len = packet_curr_pos = 0;
}
// Factory function: returns a new, unattached StockListRes owned by the
// caller.
StockRes* CreateStockListRes()
{
    StockRes* const response = new StockListRes;
    return response;
}
// Factory function: returns a new, unattached StockHoldChgRes owned by the
// caller.
StockRes* CreateStockHoldChgRes()
{
    StockRes* const response = new StockHoldChgRes;
    return response;
}
// Factory function: returns a new StockHeartBeatRes owned by the caller.
StockRes* CreateStockHeartBeatRes()
{
    StockRes* const response = new StockHeartBeatRes;
    return response;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -