Files
http_vpn/http_vpn_20191209/ccn_p2p/ndn_socket.cpp
T
2020-02-23 20:44:48 +08:00

347 lines
11 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#include <iostream>
#include "ndn_socket.h"
using namespace std;
using namespace ndn;
// Constructor.
// Resets the sequence counter, sets placeholder peer/destination addresses,
// records the current wall-clock second as the start_ts name component, and
// starts the thread that runs the face (see run()).
Ndn_socket::Ndn_socket() {
    seq = 0;
    faddr = "ini_faddr";
    daddr = "ini_daddr";

    // Timestamp: whole seconds from gettimeofday(), stored as text.
    struct timeval now;
    gettimeofday(&now, NULL);
    start_ts = to_string(now.tv_sec);

    // The thread receives a pointer to this object's face as its argument.
    pthread_create(&m_tid, NULL, run, static_cast<void*>(&m_face));
    state = true;
}
// Destructor. The body is empty: it neither calls close() nor joins the
// thread created in the constructor.
// NOTE(review): presumably callers are expected to call close() themselves
// before destruction — confirm.
Ndn_socket::~Ndn_socket(){
}
// 设置某特定prefix的兴趣监听
int Ndn_socket::listen(const char *prefix){
this->maddr = prefix ;
//cout << "set filter " << prefix << endl ;
m_face.setInterestFilter(prefix,
bind(&Ndn_socket::onInterest, this, _1, _2) ,
RegisterPrefixSuccessCallback() ,
bind(&Ndn_socket::onRegisterFailed, this, _1, _2)) ;
return 0 ;
}
// 设置要将数据推送到的目的prefix
int Ndn_socket::set_daddr(const char * prefix){
if(prefix!=this->daddr){
// 向新的ndn_socket发送数据
// 重置seq
this->seq=0;
// 重置start_ts => 保证后续数据名的唯一性
// 长度无限增长是个bug,后期可优化
this->start_ts = (this->start_ts+'0' );
}
this->daddr = prefix ;
return 0 ;
}
// Returns a copy of the destination prefix that data is pushed to.
string Ndn_socket::get_daddr() {
    return daddr;
}
// Convenience overload: writes the bytes of a string, using this socket's
// own prefix (maddr) as the data-name base.
// Returns the result of the (data, len, dname_base) overload.
int Ndn_socket::write(const string& data) {
    const char *bytes = data.data();
    return this->write(bytes, data.length(), this->maddr);
}
// Convenience overload: writes len bytes from data, using this socket's own
// prefix (maddr) as the data-name base.
// Returns the result of the (data, len, dname_base) overload.
int Ndn_socket::write(const char * data , int len ) {
    const int written = this->write(data, len, this->maddr);
    return written;
}
// Convenience overload for byte buffers: reinterprets the bytes as chars and
// writes them with this socket's own prefix (maddr) as the data-name base.
// Returns the result of the (const char*, len, dname_base) overload.
int Ndn_socket::write(const uint8_t * data , int len) {
    const char *bytes = reinterpret_cast<const char*>(data);
    return this->write(bytes, len, this->maddr);
}
// Convenience overload for byte buffers with an explicit data-name base:
// reinterprets the bytes as chars and forwards to the const char* overload.
// Returns the result of that overload.
int Ndn_socket::write(const uint8_t * data , int len , string dname_base) {
    const char *bytes = reinterpret_cast<const char*>(data);
    return this->write(bytes, len, dname_base);
}
// Prepends the segment header to a chunk of payload.
// Layout written to resultData (values copied in native byte order):
//   bytes 0-3   seq        sequence number of this segment
//   bytes 4-7   seq_first  sequence number of the message's first segment
//   bytes 8-11  len        length value supplied by the caller
//   bytes 12-   data       datalen bytes of payload
// resultData must have room for at least 12 + datalen bytes.
// Returns 0 unconditionally.
int Ndn_socket::makeData(unsigned int seq, unsigned int seq_first,
                         unsigned int len, const char *data, int datalen,
                         char *resultData) {
    char *cursor = resultData;

    memcpy(cursor, &seq, 4);
    cursor += 4;
    memcpy(cursor, &seq_first, 4);
    cursor += 4;
    memcpy(cursor, &len, 4);
    cursor += 4;
    memcpy(cursor, data, datalen);

    return 0;
}
// Parses a data segment and prints its fields to stdout (testing aid).
// dataSeg: segment as produced by makeData (12-byte header + payload).
// datalen: total length of dataSeg in bytes.
// rawdata: output buffer receiving the payload; must hold at least
//          datalen - HEAD_LEN bytes. It is NOT NUL-terminated.
// Returns 0 on success, -1 when a pointer is null or the segment is too
// short to contain a header.
int Ndn_socket::parseData(char* dataSeg, int datalen, char* rawdata) {
    // A short segment would make the payload length negative, which memcpy
    // would interpret as a huge size.
    if (dataSeg == nullptr || rawdata == nullptr ||
        datalen < 12 || datalen < static_cast<int>(HEAD_LEN)) {
        return -1;
    }

    unsigned int seq;
    unsigned int seq_first;
    unsigned int len;
    const int payload_len = datalen - static_cast<int>(HEAD_LEN);

    cout<<"**********the parse result is: ***********"<<endl;
    memcpy(&seq,&dataSeg[0],4);
    cout<<"seq: "<<seq<<endl;
    memcpy(&seq_first,&dataSeg[4],4);
    cout<<"seq_first: "<<seq_first<<endl;
    memcpy(&len,&dataSeg[8],4);
    cout<<"len: "<<len<<endl;
    memcpy(rawdata,&dataSeg[12],payload_len);
    // The payload carries no terminator, so print exactly payload_len bytes
    // instead of streaming it as a C string.
    cout<<"rawdata: ";
    cout.write(rawdata, payload_len);
    cout<<endl;
    cout<<"***************************************"<<endl;
    return 0;
}
// brief : 往ndn_socket中写入数据
// param : data 数据
// len 数据长度
// dname_base 数据包的前缀名称
int Ndn_socket::write(const char * data , int len , string dname_base ) {
// 给本机前缀加一个start_ts命名段
if(dname_base[dname_base.length()-1] != '/') {
dname_base += ("/"+this->start_ts+"/") ;
}
// 计算拆分成的数据段的数目
int pkt_n = len/DATASEG_SZ;
if(len%DATASEG_SZ != 0) {
pkt_n ++ ;
}
// 给本机前缀加一个start_ts命名段、一个seq、一个"-seq+pkt_n""
// 用以附加在Block中传递给目的主机
string pre_payload = dname_base + to_string(seq) + "-" + to_string(seq+pkt_n) ;
// 记录该数据包的第一个数据段的seq
unsigned int seq_first=seq;
// 依次将分割出的每段数据放入face中
for (int i = 0; i < pkt_n; i++) {
Data data_pkt ;
// 设置数据名字(“本机字段+start_ts+seq”)
data_pkt.setName(dname_base + to_string(seq)) ;
// cout<<"data_pkt_name: "<<dname_base + to_string(seq)<<endl;
// c_len,当前数据段的长度
int c_len = DATASEG_SZ ;
if(len - i*DATASEG_SZ < DATASEG_SZ) {
c_len = len - i*DATASEG_SZ ;
}
// 加段头
// cout<<"c_len: "<<c_len<<endl;
char resultData[c_len+HEAD_LEN];
this->makeData(seq,seq_first,(unsigned int)len,data+i*DATASEG_SZ,c_len,resultData);
char rawdata[c_len];
// this->parseData(resultData,c_len+HEAD_LEN,rawdata);
// 设置数据内容:这里需要修改数据格式,加一个头部
data_pkt.setContent(reinterpret_cast<const uint8_t*>(resultData), c_len+HEAD_LEN) ;
// data_pkt.setContent(reinterpret_cast<const uint8_t*>(data+i*DATASEG_SZ), c_len) ;
//...
this->m_keyChain.sign(data_pkt) ;
//...
this->m_face.put(data_pkt) ;
// seq顺序递增
seq ++ ;
}
// 给目的地前缀加一个start_ts命名段、一个seq,用以设置目的兴趣包的包名
string pre_iname = daddr +"/"+ this->start_ts + "/" + to_string(seq) ;
// 设置interest
Interest pre_int(Name(pre_iname.data())) ;
// cout<<"pre_interest name: "<<pre_iname<<endl;
pre_int.setInterestLifetime(1_s) ;
pre_int.setMustBeFresh(true) ;
Block app_param = makeBinaryBlock(tlv::AppPrivateBlock1+1,
pre_payload.data(), pre_payload.length());
pre_int.setParameters(app_param) ;
// 发送预请求兴趣包
this->m_face.expressInterest(pre_int ,
bind(&Ndn_socket::onData_pre,this,_1,_2),
bind(&Ndn_socket::onNack_pre,this,_1,_2),
bind(&Ndn_socket::onTimeout_pre,this,_1));
//cout << "pre I>> : " << pre_int.getName() << endl;
return len ;
}
// Receives data: takes bytes out of the receive queue and copies them into
// the caller's buffer.
// data:   destination buffer.
// buf_sz: capacity of data; at most this many bytes are taken.
// Waits on the queue first (wait4data). Returns -1 if the socket has been
// closed, otherwise the number of bytes actually copied.
int Ndn_socket::read(char *data , int buf_sz) {
    qp.r_queue.wait4data() ;
    if (!this->state) {
        return -1 ;
    }

    // If the queue holds more than the caller asked for, hand out only
    // buf_sz bytes.
    const int queued = qp.r_queue.get_data_len() ;
    const int take = (queued > buf_sz) ? buf_sz : queued ;

    // Copy the bytes out, then drop them from the queue.
    qp.r_queue.get_ndata(data , take) ;
    qp.r_queue.rmv_n(take) ;

    return take ;
}
// Closes the face.
// Notes carried over from the original author:
//  1. Is it unnecessary to stop thread m_tid? => Terminating the process
//     that opened the ndn_socket has the same effect.
//  2. Why push one byte into r_queue? => So that the queue's wait in read()
//     stops blocking and read() returns -1 instead of waiting forever.
// Calling close() on an already closed socket is a no-op.
// Returns 0 unconditionally.
int Ndn_socket::close() {
    if (!this->state) {
        return 0 ;
    }
    this->state = false ;

    // Wake up a reader blocked on the receive queue.
    char wake_byte = 'c' ;
    qp.r_queue.push_ndata(&wake_byte, 1) ;

    // Poll every 10 ms until no Interests are pending, then shut the face
    // down.
    for ( ; m_face.getNPendingInterests() > 0 ; ) {
        usleep(10000) ;
    }
    this->m_face.shutdown() ;
    return 0 ;
}
// 开启face
void *Ndn_socket::run(void *param){
Face *face_p = (Face*)param ;
face_p->processEvents(time::milliseconds::zero(), true) ;
}
// -----------------------NDN中的常见函数-----------------------
void Ndn_socket::onInterest(const InterestFilter& filter,
const Interest& interest) {
//cout << "onInterest : " << interest.getName() << endl ;
if(interest.hasParameters()){
uint8_t p_type = 0 ;
// 为何不是“memcpy(&p_type , interest.getParameters().type() , 1) ;”?
// 不是要取Block的Type吗??这里可以进一步探究尝试。
memcpy(&p_type , interest.getParameters().value() , 1) ;
// memcpy(&p_type , interest.getParameters().type() , 1) ;
if(p_type == tlv::AppPrivateBlock1 + 1) { // 预请求包
// 取Block
Block dname_block(interest.getParameters().value() ,
interest.getParameters().value_size()) ;
// 取Block的数值
string datas_name((char*)dname_block.value() ,
dname_block.value_size()) ;
// cout << "datas_name : " <<datas_name << endl ;
// format : /ndn/edu/pkusz/node11/vpn/5-9
// 拿到seq以及seq+pkt_n
int idx1 = datas_name.rfind('/')+1 ;
int first, last ;
sscanf(datas_name.data()+idx1 , "%d-%d" , &first , &last) ;
string iname_base = datas_name.substr(0, idx1) ;
// 判断是否更换了新的ndn_sockt
if(iname_base!=this->faddr){
qp.initSelf();
this->faddr=iname_base;
}
// 依次请求该条信息的每个数据段
//(数据产生者的前缀隐藏在Block中)
for (int i = first; i < last; i++) {
string interest_name = iname_base+ std::to_string(i) ;
// cout<<"data interest name: "<<interest_name<<endl;
Interest request_int(Name(interest_name.data())) ;
request_int.setInterestLifetime(1_s) ;
this->m_face.expressInterest(request_int ,
bind(&Ndn_socket::onData,this,_1,_2),
bind(&Ndn_socket::onNack,this,_1,_2),
bind(&Ndn_socket::onTimeout,this,_1));
//cout << "I>> : " << request_int.getName() << endl ;
}
}
}
}
// 发送兴趣包请求到的数据立即放入缓存队列
void Ndn_socket::onData(const Interest& interest , const Data& data){
int data_sz = data.getContent().value_size() ;
// cout<<"data_sz: "<<data_sz<<endl;
// r_queue.push_ndata(reinterpret_cast<const char*>(data.getContent().value()),
// data_sz) ;
qp.pushDataSegment(reinterpret_cast<const char*>(data.getContent().value()),
data_sz);
//cout << "D<< :" << data.getName() << " sz = " << data_sz << endl ;
}
// Data callback for the pre-request Interest. Does nothing; the lines below
// are disabled debug output.
void Ndn_socket::onData_pre(const Interest& interest , const Data& data){
//cout << "pre D<< :" << data.getName() << " sz = " <<
//data.getContent().value_size() << endl ;
}
void Ndn_socket::onNack(const Interest& interest, const Nack& nack){
cout << "Nack : "<< interest.getName() << endl ;
long lifetime = interest.getInterestLifetime().count() ;
if(lifetime > 3000) return ;
if(this->state == false) return ;
sleep(1) ;
Interest interest_new(interest.getName());
interest_new.setMustBeFresh(true) ;
boost::chrono::milliseconds new_lifetime(lifetime+200) ;
interest_new.setInterestLifetime(new_lifetime);
this->m_face.expressInterest(interest_new,
bind(&Ndn_socket::onData,this,_1,_2),
bind(&Ndn_socket::onNack,this,_1,_2),
bind(&Ndn_socket::onTimeout,this,_1));
cout << "I>> : " << interest_new.getName() << endl ;
//this->m_face.shutdown() ;
}
// 预请求兴趣包的onNack
void Ndn_socket::onNack_pre(const Interest& interest, const Nack& nack){
cout << "pre Nack : "<< interest.getName() << endl ;
cout << "listen : " << this->maddr << endl ;
long lifetime = interest.getInterestLifetime().count() ;
if(lifetime > 3000 || this->state == false ) return ;
sleep(1) ;
Interest interest_new(interest.getName());
interest_new.setMustBeFresh(true) ;
boost::chrono::milliseconds new_lifetime(lifetime+200) ;
interest_new.setInterestLifetime(new_lifetime);
interest_new.setParameters(interest.getParameters());
this->m_face.expressInterest(interest_new,
bind(&Ndn_socket::onData_pre,this,_1,_2),
bind(&Ndn_socket::onNack_pre,this,_1,_2),
bind(&Ndn_socket::onTimeout_pre,this,_1));
cout << "pre I>> : " << interest_new.getName() << endl ;
}
void Ndn_socket::onTimeout(const Interest& interest) {
cout << "Time out " << interest.getName() << endl ;
long lifetime = interest.getInterestLifetime().count() ;
if(lifetime > 3000) return ;
if(this->state == false) return ;
Interest interest_new(interest.getName());
boost::chrono::milliseconds new_lifetime(lifetime+200) ;
interest_new.setInterestLifetime(new_lifetime);
this->m_face.expressInterest(interest_new,
bind(&Ndn_socket::onData,this,_1,_2),
bind(&Ndn_socket::onNack,this,_1,_2),
bind(&Ndn_socket::onTimeout,this,_1));
}
// Timeout callback for the pre-request Interest. Does nothing: a timed-out
// pre-request is not retried here.
void Ndn_socket::onTimeout_pre(const Interest& interest) {
}
// Prefix-registration failure callback for the face.
// Prints the prefix and the failure reason to stderr, then shuts the face
// down.
void Ndn_socket::onRegisterFailed(const Name& prefix, const std::string& reason)
{
    std::cerr << "ERROR: Failed to register prefix \"" << prefix;
    std::cerr << "\" in local hub's daemon (" << reason << ")";
    std::cerr << std::endl;
    m_face.shutdown();
}