29 changed files with 3343 additions and 1465 deletions
@ -0,0 +1,309 @@
|
||||
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ |
||||
/*
|
||||
* This program is free software; you can redistribute it and/or modify |
||||
* it under the terms of the GNU General Public License version 2 as |
||||
* published by the Free Software Foundation; |
||||
* |
||||
* This program is distributed in the hope that it will be useful, |
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
* GNU General Public License for more details. |
||||
* |
||||
* You should have received a copy of the GNU General Public License |
||||
* along with this program; if not, write to the Free Software |
||||
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA |
||||
* |
||||
* |
||||
* This test is equivalent to simple-distributed but tests boundary cases |
||||
* when one of the ranks has no Nodes on it. When run on two tasks |
||||
* rank 0 will have all the Nodes and rank 1 will be empty: |
||||
*
|
||||
* ------- ------- |
||||
* RANK 0 RANK 0 |
||||
* ------- | ------- |
||||
* | |
||||
* n0 ---------| | |---------- n6 |
||||
* | | | |
||||
* n1 -------\ | | | /------- n7 |
||||
* n4 ----------|---------- n5 |
||||
* n2 -------/ | | | \------- n8 |
||||
* | | | |
||||
* n3 ---------| | |---------- n9 |
||||
* |
||||
* |
||||
* When run on three tasks rank 1 has the left half of the Nodes and rank 2 |
||||
* will be empty. |
||||
* |
||||
* ------- ------- |
||||
* RANK 0 RANK 1 |
||||
* ------- | ------- |
||||
* | |
||||
* n0 ---------| | |---------- n6 |
||||
* | | | |
||||
* n1 -------\ | | | /------- n7 |
||||
* n4 ----------|---------- n5 |
||||
* n2 -------/ | | | \------- n8 |
||||
* | | | |
||||
* n3 ---------| | |---------- n9 |
||||
* |
||||
* OnOff clients are placed on each left leaf node. Each right leaf node |
||||
* is a packet sink for a left leaf node. As a packet travels from one |
||||
* logical processor to another (the link between n4 and n5), MPI messages |
||||
* are passed containing the serialized packet. The message is then |
||||
* deserialized into a new packet and sent on as normal. |
||||
* |
||||
* One packet is sent from each left leaf node. The packet sinks on the |
||||
* right leaf nodes output logging information when they receive the packet. |
||||
*/ |
||||
|
||||
#include "ns3/core-module.h" |
||||
#include "ns3/network-module.h" |
||||
#include "ns3/mpi-interface.h" |
||||
#include "ns3/ipv4-global-routing-helper.h" |
||||
#include "ns3/ipv4-static-routing-helper.h" |
||||
#include "ns3/ipv4-list-routing-helper.h" |
||||
#include "ns3/point-to-point-helper.h" |
||||
#include "ns3/internet-stack-helper.h" |
||||
#include "ns3/ipv4-nix-vector-helper.h" |
||||
#include "ns3/ipv4-address-helper.h" |
||||
#include "ns3/on-off-helper.h" |
||||
#include "ns3/packet-sink-helper.h" |
||||
|
||||
#ifdef NS3_MPI |
||||
#include <mpi.h> |
||||
#endif |
||||
|
||||
using namespace ns3; |
||||
|
||||
NS_LOG_COMPONENT_DEFINE ("SimpleDistributed"); |
||||
|
||||
int |
||||
main (int argc, char *argv[]) |
||||
{ |
||||
#ifdef NS3_MPI |
||||
|
||||
bool nix = true; |
||||
bool nullmsg = false; |
||||
bool tracing = false; |
||||
|
||||
// Parse command line
|
||||
CommandLine cmd; |
||||
cmd.AddValue ("nix", "Enable the use of nix-vector or global routing", nix); |
||||
cmd.AddValue ("nullmsg", "Enable the use of null-message synchronization", nullmsg); |
||||
cmd.AddValue ("tracing", "Enable pcap tracing", tracing); |
||||
cmd.Parse (argc, argv); |
||||
|
||||
// Distributed simulation setup; by default use granted time window algorithm.
|
||||
if(nullmsg)
|
||||
{ |
||||
GlobalValue::Bind ("SimulatorImplementationType", |
||||
StringValue ("ns3::NullMessageSimulatorImpl")); |
||||
}
|
||||
else
|
||||
{ |
||||
GlobalValue::Bind ("SimulatorImplementationType", |
||||
StringValue ("ns3::DistributedSimulatorImpl")); |
||||
} |
||||
|
||||
MpiInterface::Enable (&argc, &argv); |
||||
|
||||
LogComponentEnable ("PacketSink", LOG_LEVEL_INFO); |
||||
|
||||
uint32_t systemId = MpiInterface::GetSystemId (); |
||||
uint32_t systemCount = MpiInterface::GetSize (); |
||||
|
||||
uint32_t rightHalfSystemId = 777; |
||||
|
||||
// Check for valid distributed parameters.
|
||||
// Must have 2 or 3 tasks.
|
||||
if (systemCount == 2) |
||||
{ |
||||
rightHalfSystemId = 0; |
||||
} |
||||
else if (systemCount == 3) |
||||
{ |
||||
rightHalfSystemId = 1; |
||||
} |
||||
else |
||||
{ |
||||
std::cout << "This simulation requires 2 or 3 logical processors." << std::endl; |
||||
return 1; |
||||
} |
||||
|
||||
// Some default values
|
||||
Config::SetDefault ("ns3::OnOffApplication::PacketSize", UintegerValue (512)); |
||||
Config::SetDefault ("ns3::OnOffApplication::DataRate", StringValue ("1Mbps")); |
||||
Config::SetDefault ("ns3::OnOffApplication::MaxBytes", UintegerValue (512)); |
||||
|
||||
// Create leaf nodes on left with system id 0
|
||||
NodeContainer leftLeafNodes; |
||||
leftLeafNodes.Create (4, 0); |
||||
|
||||
// Create router nodes. Left router with system id 0, right router
|
||||
// with system id dependent on number of processors using
|
||||
// rightHalfSystemId
|
||||
NodeContainer routerNodes; |
||||
Ptr<Node> routerNode1 = CreateObject<Node> (0); |
||||
Ptr<Node> routerNode2 = CreateObject<Node> (rightHalfSystemId); |
||||
routerNodes.Add (routerNode1); |
||||
routerNodes.Add (routerNode2); |
||||
|
||||
// Create leaf nodes on left with system id rightHalfSystemId
|
||||
NodeContainer rightLeafNodes; |
||||
rightLeafNodes.Create (4, rightHalfSystemId); |
||||
|
||||
PointToPointHelper routerLink; |
||||
routerLink.SetDeviceAttribute ("DataRate", StringValue ("5Mbps")); |
||||
routerLink.SetChannelAttribute ("Delay", StringValue ("5ms")); |
||||
|
||||
PointToPointHelper leafLink; |
||||
leafLink.SetDeviceAttribute ("DataRate", StringValue ("1Mbps")); |
||||
leafLink.SetChannelAttribute ("Delay", StringValue ("2ms")); |
||||
|
||||
// Add link connecting routers
|
||||
NetDeviceContainer routerDevices; |
||||
routerDevices = routerLink.Install (routerNodes); |
||||
|
||||
// Add links for left side leaf nodes to left router
|
||||
NetDeviceContainer leftRouterDevices; |
||||
NetDeviceContainer leftLeafDevices; |
||||
for (uint32_t i = 0; i < 4; ++i) |
||||
{ |
||||
NetDeviceContainer temp = leafLink.Install (leftLeafNodes.Get (i), routerNodes.Get (0)); |
||||
leftLeafDevices.Add (temp.Get (0)); |
||||
leftRouterDevices.Add (temp.Get (1)); |
||||
} |
||||
|
||||
// Add links for right side leaf nodes to right router
|
||||
NetDeviceContainer rightRouterDevices; |
||||
NetDeviceContainer rightLeafDevices; |
||||
for (uint32_t i = 0; i < 4; ++i) |
||||
{ |
||||
NetDeviceContainer temp = leafLink.Install (rightLeafNodes.Get (i), routerNodes.Get (1)); |
||||
rightLeafDevices.Add (temp.Get (0)); |
||||
rightRouterDevices.Add (temp.Get (1)); |
||||
} |
||||
|
||||
InternetStackHelper stack; |
||||
Ipv4NixVectorHelper nixRouting; |
||||
Ipv4StaticRoutingHelper staticRouting; |
||||
|
||||
Ipv4ListRoutingHelper list; |
||||
list.Add (staticRouting, 0); |
||||
list.Add (nixRouting, 10); |
||||
|
||||
if (nix) |
||||
{ |
||||
stack.SetRoutingHelper (list); // has effect on the next Install ()
|
||||
} |
||||
|
||||
stack.InstallAll (); |
||||
|
||||
Ipv4InterfaceContainer routerInterfaces; |
||||
Ipv4InterfaceContainer leftLeafInterfaces; |
||||
Ipv4InterfaceContainer leftRouterInterfaces; |
||||
Ipv4InterfaceContainer rightLeafInterfaces; |
||||
Ipv4InterfaceContainer rightRouterInterfaces; |
||||
|
||||
Ipv4AddressHelper leftAddress; |
||||
leftAddress.SetBase ("10.1.1.0", "255.255.255.0"); |
||||
|
||||
Ipv4AddressHelper routerAddress; |
||||
routerAddress.SetBase ("10.2.1.0", "255.255.255.0"); |
||||
|
||||
Ipv4AddressHelper rightAddress; |
||||
rightAddress.SetBase ("10.3.1.0", "255.255.255.0"); |
||||
|
||||
// Router-to-Router interfaces
|
||||
routerInterfaces = routerAddress.Assign (routerDevices); |
||||
|
||||
// Left interfaces
|
||||
for (uint32_t i = 0; i < 4; ++i) |
||||
{ |
||||
NetDeviceContainer ndc; |
||||
ndc.Add (leftLeafDevices.Get (i)); |
||||
ndc.Add (leftRouterDevices.Get (i)); |
||||
Ipv4InterfaceContainer ifc = leftAddress.Assign (ndc); |
||||
leftLeafInterfaces.Add (ifc.Get (0)); |
||||
leftRouterInterfaces.Add (ifc.Get (1)); |
||||
leftAddress.NewNetwork (); |
||||
} |
||||
|
||||
// Right interfaces
|
||||
for (uint32_t i = 0; i < 4; ++i) |
||||
{ |
||||
NetDeviceContainer ndc; |
||||
ndc.Add (rightLeafDevices.Get (i)); |
||||
ndc.Add (rightRouterDevices.Get (i)); |
||||
Ipv4InterfaceContainer ifc = rightAddress.Assign (ndc); |
||||
rightLeafInterfaces.Add (ifc.Get (0)); |
||||
rightRouterInterfaces.Add (ifc.Get (1)); |
||||
rightAddress.NewNetwork (); |
||||
} |
||||
|
||||
if (!nix) |
||||
{ |
||||
Ipv4GlobalRoutingHelper::PopulateRoutingTables (); |
||||
} |
||||
|
||||
if (tracing == true) |
||||
{ |
||||
if (systemId == 0) |
||||
{ |
||||
routerLink.EnablePcap("router-left", routerDevices, true); |
||||
leafLink.EnablePcap("leaf-left", leftLeafDevices, true); |
||||
} |
||||
|
||||
if (systemId == rightHalfSystemId) |
||||
{ |
||||
routerLink.EnablePcap("router-right", routerDevices, true); |
||||
leafLink.EnablePcap("leaf-right", rightLeafDevices, true); |
||||
} |
||||
} |
||||
|
||||
// Create a packet sink on the right leafs to receive packets from left leafs
|
||||
uint16_t port = 50000; |
||||
if (systemId == rightHalfSystemId) |
||||
{ |
||||
Address sinkLocalAddress (InetSocketAddress (Ipv4Address::GetAny (), port)); |
||||
PacketSinkHelper sinkHelper ("ns3::UdpSocketFactory", sinkLocalAddress); |
||||
ApplicationContainer sinkApp; |
||||
for (uint32_t i = 0; i < 4; ++i) |
||||
{ |
||||
sinkApp.Add (sinkHelper.Install (rightLeafNodes.Get (i))); |
||||
} |
||||
sinkApp.Start (Seconds (1.0)); |
||||
sinkApp.Stop (Seconds (5)); |
||||
} |
||||
|
||||
// Create the OnOff applications to send
|
||||
if (systemId == 0) |
||||
{ |
||||
OnOffHelper clientHelper ("ns3::UdpSocketFactory", Address ()); |
||||
clientHelper.SetAttribute |
||||
("OnTime", StringValue ("ns3::ConstantRandomVariable[Constant=1]")); |
||||
clientHelper.SetAttribute |
||||
("OffTime", StringValue ("ns3::ConstantRandomVariable[Constant=0]")); |
||||
|
||||
ApplicationContainer clientApps; |
||||
for (uint32_t i = 0; i < 4; ++i) |
||||
{ |
||||
AddressValue remoteAddress |
||||
(InetSocketAddress (rightLeafInterfaces.GetAddress (i), port)); |
||||
clientHelper.SetAttribute ("Remote", remoteAddress); |
||||
clientApps.Add (clientHelper.Install (leftLeafNodes.Get (i))); |
||||
} |
||||
clientApps.Start (Seconds (1.0)); |
||||
clientApps.Stop (Seconds (5)); |
||||
} |
||||
|
||||
Simulator::Stop (Seconds (5)); |
||||
Simulator::Run (); |
||||
Simulator::Destroy (); |
||||
// Exit the MPI execution environment
|
||||
MpiInterface::Disable (); |
||||
return 0; |
||||
#else |
||||
NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in"); |
||||
#endif |
||||
} |
@ -0,0 +1,337 @@
|
||||
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ |
||||
/*
|
||||
* This program is free software; you can redistribute it and/or modify |
||||
* it under the terms of the GNU General Public License version 2 as |
||||
* published by the Free Software Foundation; |
||||
* |
||||
* This program is distributed in the hope that it will be useful, |
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
* GNU General Public License for more details. |
||||
* |
||||
* You should have received a copy of the GNU General Public License |
||||
* along with this program; if not, write to the Free Software |
||||
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA |
||||
* |
||||
* Author: George Riley <riley@ece.gatech.edu> |
||||
* |
||||
*/ |
||||
|
||||
// This object contains static methods that provide an easy interface
|
||||
// to the necessary MPI information.
|
||||
|
||||
#include <iostream> |
||||
#include <iomanip> |
||||
#include <list> |
||||
|
||||
#include "granted-time-window-mpi-interface.h" |
||||
#include "mpi-receiver.h" |
||||
#include "mpi-interface.h" |
||||
|
||||
#include "ns3/node.h" |
||||
#include "ns3/node-list.h" |
||||
#include "ns3/net-device.h" |
||||
#include "ns3/simulator.h" |
||||
#include "ns3/simulator-impl.h" |
||||
#include "ns3/nstime.h" |
||||
#include "ns3/log.h" |
||||
|
||||
#ifdef NS3_MPI |
||||
#include <mpi.h> |
||||
#endif |
||||
|
||||
namespace ns3 { |
||||
|
||||
NS_LOG_COMPONENT_DEFINE ("GrantedTimeWindowMpiInterface"); |
||||
|
||||
|
||||
SentBuffer::SentBuffer () |
||||
{ |
||||
m_buffer = 0; |
||||
m_request = 0; |
||||
} |
||||
|
||||
SentBuffer::~SentBuffer () |
||||
{ |
||||
delete [] m_buffer; |
||||
} |
||||
|
||||
uint8_t* |
||||
SentBuffer::GetBuffer () |
||||
{ |
||||
return m_buffer; |
||||
} |
||||
|
||||
void |
||||
SentBuffer::SetBuffer (uint8_t* buffer) |
||||
{ |
||||
m_buffer = buffer; |
||||
} |
||||
|
||||
#ifdef NS3_MPI |
||||
MPI_Request* |
||||
SentBuffer::GetRequest () |
||||
{ |
||||
return &m_request; |
||||
} |
||||
#endif |
||||
|
||||
uint32_t GrantedTimeWindowMpiInterface::m_sid = 0; |
||||
uint32_t GrantedTimeWindowMpiInterface::m_size = 1; |
||||
bool GrantedTimeWindowMpiInterface::m_initialized = false; |
||||
bool GrantedTimeWindowMpiInterface::m_enabled = false; |
||||
uint32_t GrantedTimeWindowMpiInterface::m_rxCount = 0; |
||||
uint32_t GrantedTimeWindowMpiInterface::m_txCount = 0; |
||||
std::list<SentBuffer> GrantedTimeWindowMpiInterface::m_pendingTx; |
||||
|
||||
#ifdef NS3_MPI |
||||
MPI_Request* GrantedTimeWindowMpiInterface::m_requests; |
||||
char** GrantedTimeWindowMpiInterface::m_pRxBuffers; |
||||
#endif |
||||
|
||||
TypeId
|
||||
GrantedTimeWindowMpiInterface::GetTypeId (void) |
||||
{ |
||||
static TypeId tid = TypeId ("ns3::GrantedTimeWindowMpiInterface") |
||||
.SetParent<Object> () |
||||
; |
||||
return tid; |
||||
} |
||||
|
||||
void |
||||
GrantedTimeWindowMpiInterface::Destroy () |
||||
{ |
||||
NS_LOG_FUNCTION (this); |
||||
|
||||
#ifdef NS3_MPI |
||||
for (uint32_t i = 0; i < GetSize (); ++i) |
||||
{ |
||||
delete [] m_pRxBuffers[i]; |
||||
} |
||||
delete [] m_pRxBuffers; |
||||
delete [] m_requests; |
||||
|
||||
m_pendingTx.clear (); |
||||
#endif |
||||
} |
||||
|
||||
uint32_t |
||||
GrantedTimeWindowMpiInterface::GetRxCount () |
||||
{ |
||||
return m_rxCount; |
||||
} |
||||
|
||||
uint32_t |
||||
GrantedTimeWindowMpiInterface::GetTxCount () |
||||
{ |
||||
return m_txCount; |
||||
} |
||||
|
||||
uint32_t |
||||
GrantedTimeWindowMpiInterface::GetSystemId () |
||||
{ |
||||
if (!m_initialized) |
||||
{ |
||||
Simulator::GetImplementation (); |
||||
m_initialized = true; |
||||
} |
||||
return m_sid; |
||||
} |
||||
|
||||
uint32_t |
||||
GrantedTimeWindowMpiInterface::GetSize () |
||||
{ |
||||
if (!m_initialized) |
||||
{ |
||||
Simulator::GetImplementation (); |
||||
m_initialized = true; |
||||
} |
||||
return m_size; |
||||
} |
||||
|
||||
bool |
||||
GrantedTimeWindowMpiInterface::IsEnabled () |
||||
{ |
||||
if (!m_initialized) |
||||
{ |
||||
Simulator::GetImplementation (); |
||||
m_initialized = true; |
||||
} |
||||
return m_enabled; |
||||
} |
||||
|
||||
void |
||||
GrantedTimeWindowMpiInterface::Enable (int* pargc, char*** pargv) |
||||
{ |
||||
NS_LOG_FUNCTION (this << pargc << pargv);
|
||||
|
||||
#ifdef NS3_MPI |
||||
// Initialize the MPI interface
|
||||
MPI_Init (pargc, pargv); |
||||
MPI_Barrier (MPI_COMM_WORLD); |
||||
MPI_Comm_rank (MPI_COMM_WORLD, reinterpret_cast <int *> (&m_sid)); |
||||
MPI_Comm_size (MPI_COMM_WORLD, reinterpret_cast <int *> (&m_size)); |
||||
m_enabled = true; |
||||
m_initialized = true; |
||||
// Post a non-blocking receive for all peers
|
||||
m_pRxBuffers = new char*[m_size]; |
||||
m_requests = new MPI_Request[m_size]; |
||||
for (uint32_t i = 0; i < GetSize (); ++i) |
||||
{ |
||||
m_pRxBuffers[i] = new char[MAX_MPI_MSG_SIZE]; |
||||
MPI_Irecv (m_pRxBuffers[i], MAX_MPI_MSG_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 0, |
||||
MPI_COMM_WORLD, &m_requests[i]); |
||||
} |
||||
#else |
||||
NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in"); |
||||
#endif |
||||
} |
||||
|
||||
void |
||||
GrantedTimeWindowMpiInterface::SendPacket (Ptr<Packet> p, const Time& rxTime, uint32_t node, uint32_t dev) |
||||
{ |
||||
NS_LOG_FUNCTION (this << p << rxTime.GetTimeStep () << node << dev); |
||||
|
||||
#ifdef NS3_MPI |
||||
SentBuffer sendBuf; |
||||
m_pendingTx.push_back (sendBuf); |
||||
std::list<SentBuffer>::reverse_iterator i = m_pendingTx.rbegin (); // Points to the last element
|
||||
|
||||
uint32_t serializedSize = p->GetSerializedSize (); |
||||
uint8_t* buffer = new uint8_t[serializedSize + 16]; |
||||
i->SetBuffer (buffer); |
||||
// Add the time, dest node and dest device
|
||||
uint64_t t = rxTime.GetInteger (); |
||||
uint64_t* pTime = reinterpret_cast <uint64_t *> (buffer); |
||||
*pTime++ = t; |
||||
uint32_t* pData = reinterpret_cast<uint32_t *> (pTime); |
||||
*pData++ = node; |
||||
*pData++ = dev; |
||||
// Serialize the packet
|
||||
p->Serialize (reinterpret_cast<uint8_t *> (pData), serializedSize); |
||||
|
||||
// Find the system id for the destination node
|
||||
Ptr<Node> destNode = NodeList::GetNode (node); |
||||
uint32_t nodeSysId = destNode->GetSystemId (); |
||||
|
||||
MPI_Isend (reinterpret_cast<void *> (i->GetBuffer ()), serializedSize + 16, MPI_CHAR, nodeSysId, |
||||
0, MPI_COMM_WORLD, (i->GetRequest ())); |
||||
m_txCount++; |
||||
#else |
||||
NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in"); |
||||
#endif |
||||
} |
||||
|
||||
void |
||||
GrantedTimeWindowMpiInterface::ReceiveMessages () |
||||
{
|
||||
NS_LOG_FUNCTION_NOARGS (); |
||||
|
||||
#ifdef NS3_MPI |
||||
// Poll the non-block reads to see if data arrived
|
||||
while (true) |
||||
{ |
||||
int flag = 0; |
||||
int index = 0; |
||||
MPI_Status status; |
||||
|
||||
MPI_Testany (MpiInterface::GetSize (), m_requests, &index, &flag, &status); |
||||
if (!flag) |
||||
{ |
||||
break; // No more messages
|
||||
} |
||||
int count; |
||||
MPI_Get_count (&status, MPI_CHAR, &count); |
||||
m_rxCount++; // Count this receive
|
||||
|
||||
// Get the meta data first
|
||||
uint64_t* pTime = reinterpret_cast<uint64_t *> (m_pRxBuffers[index]); |
||||
uint64_t time = *pTime++; |
||||
uint32_t* pData = reinterpret_cast<uint32_t *> (pTime); |
||||
uint32_t node = *pData++; |
||||
uint32_t dev = *pData++; |
||||
|
||||
Time rxTime (time); |
||||
|
||||
count -= sizeof (time) + sizeof (node) + sizeof (dev); |
||||
|
||||
Ptr<Packet> p = Create<Packet> (reinterpret_cast<uint8_t *> (pData), count, true); |
||||
|
||||
// Find the correct node/device to schedule receive event
|
||||
Ptr<Node> pNode = NodeList::GetNode (node); |
||||
Ptr<MpiReceiver> pMpiRec = 0; |
||||
uint32_t nDevices = pNode->GetNDevices (); |
||||
for (uint32_t i = 0; i < nDevices; ++i) |
||||
{ |
||||
Ptr<NetDevice> pThisDev = pNode->GetDevice (i); |
||||
if (pThisDev->GetIfIndex () == dev) |
||||
{ |
||||
pMpiRec = pThisDev->GetObject<MpiReceiver> (); |
||||
break; |
||||
} |
||||
} |
||||
|
||||
NS_ASSERT (pNode && pMpiRec); |
||||
|
||||
// Schedule the rx event
|
||||
Simulator::ScheduleWithContext (pNode->GetId (), rxTime - Simulator::Now (), |
||||
&MpiReceiver::Receive, pMpiRec, p); |
||||
|
||||
// Re-queue the next read
|
||||
MPI_Irecv (m_pRxBuffers[index], MAX_MPI_MSG_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 0, |
||||
MPI_COMM_WORLD, &m_requests[index]); |
||||
} |
||||
#else |
||||
NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in"); |
||||
#endif |
||||
} |
||||
|
||||
void |
||||
GrantedTimeWindowMpiInterface::TestSendComplete () |
||||
{ |
||||
NS_LOG_FUNCTION_NOARGS (); |
||||
|
||||
#ifdef NS3_MPI |
||||
std::list<SentBuffer>::iterator i = m_pendingTx.begin (); |
||||
while (i != m_pendingTx.end ()) |
||||
{ |
||||
MPI_Status status; |
||||
int flag = 0; |
||||
MPI_Test (i->GetRequest (), &flag, &status); |
||||
std::list<SentBuffer>::iterator current = i; // Save current for erasing
|
||||
i++; // Advance to next
|
||||
if (flag) |
||||
{ // This message is complete
|
||||
m_pendingTx.erase (current); |
||||
} |
||||
} |
||||
#else |
||||
NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in"); |
||||
#endif |
||||
} |
||||
|
||||
void |
||||
GrantedTimeWindowMpiInterface::Disable () |
||||
{ |
||||
NS_LOG_FUNCTION_NOARGS (); |
||||
|
||||
#ifdef NS3_MPI |
||||
int flag = 0; |
||||
MPI_Initialized (&flag); |
||||
if (flag) |
||||
{ |
||||
MPI_Finalize (); |
||||
m_enabled = false; |
||||
m_initialized = false; |
||||
} |
||||
else |
||||
{ |
||||
NS_FATAL_ERROR ("Cannot disable MPI environment without Initializing it first"); |
||||
} |
||||
#else |
||||
NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in"); |
||||
#endif |
||||
} |
||||
|
||||
|
||||
} // namespace ns3
|
@ -0,0 +1,175 @@
|
||||
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ |
||||
/*
|
||||
* This program is free software; you can redistribute it and/or modify |
||||
* it under the terms of the GNU General Public License version 2 as |
||||
* published by the Free Software Foundation; |
||||
* |
||||
* This program is distributed in the hope that it will be useful, |
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
* GNU General Public License for more details. |
||||
* |
||||
* You should have received a copy of the GNU General Public License |
||||
* along with this program; if not, write to the Free Software |
||||
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA |
||||
* |
||||
* Author: George Riley <riley@ece.gatech.edu> |
||||
* |
||||
*/ |
||||
|
||||
// This object contains static methods that provide an easy interface
|
||||
// to the necessary MPI information.
|
||||
|
||||
#ifndef NS3_GRANTED_TIME_WINDOW_MPI_INTERFACE_H |
||||
#define NS3_GRANTED_TIME_WINDOW_MPI_INTERFACE_H |
||||
|
||||
#include <stdint.h> |
||||
#include <list> |
||||
|
||||
#include "ns3/nstime.h" |
||||
#include "ns3/buffer.h" |
||||
|
||||
#include "parallel-communication-interface.h" |
||||
|
||||
#ifdef NS3_MPI |
||||
#include "mpi.h" |
||||
#else |
||||
typedef void* MPI_Request; |
||||
#endif |
||||
|
||||
namespace ns3 { |
||||
|
||||
/**
|
||||
* maximum MPI message size for easy |
||||
* buffer creation |
||||
*/ |
||||
const uint32_t MAX_MPI_MSG_SIZE = 2000; |
||||
|
||||
/**
|
||||
* \ingroup mpi |
||||
* |
||||
* \brief Tracks non-blocking sends |
||||
* |
||||
* This class is used to keep track of the asynchronous non-blocking |
||||
* sends that have been posted. |
||||
*/ |
||||
class SentBuffer |
||||
{ |
||||
public: |
||||
SentBuffer (); |
||||
~SentBuffer (); |
||||
|
||||
/**
|
||||
* \return pointer to sent buffer |
||||
*/ |
||||
uint8_t* GetBuffer (); |
||||
/**
|
||||
* \param buffer pointer to sent buffer |
||||
*/ |
||||
void SetBuffer (uint8_t* buffer); |
||||
/**
|
||||
* \return MPI request |
||||
*/ |
||||
MPI_Request* GetRequest (); |
||||
|
||||
private: |
||||
uint8_t* m_buffer; |
||||
MPI_Request m_request; |
||||
}; |
||||
|
||||
class Packet; |
||||
|
||||
/**
|
||||
* \ingroup mpi |
||||
* |
||||
* \brief Interface between ns-3 and MPI |
||||
* |
||||
* Implements the interface used by the singleton parallel controller |
||||
* to interface between NS3 and the communications layer being |
||||
* used for inter-task packet transfers. |
||||
*/ |
||||
class GrantedTimeWindowMpiInterface : public ParallelCommunicationInterface, Object |
||||
{ |
||||
public: |
||||
static TypeId GetTypeId (void); |
||||
|
||||
/**
|
||||
* Delete all buffers |
||||
*/ |
||||
virtual void Destroy (); |
||||
/**
|
||||
* \return MPI rank |
||||
*/ |
||||
virtual uint32_t GetSystemId (); |
||||
/**
|
||||
* \return MPI size (number of systems) |
||||
*/ |
||||
virtual uint32_t GetSize (); |
||||
/**
|
||||
* \return true if using MPI |
||||
*/ |
||||
virtual bool IsEnabled (); |
||||
/**
|
||||
* \param pargc number of command line arguments |
||||
* \param pargv command line arguments |
||||
* |
||||
* Sets up MPI interface |
||||
*/ |
||||
virtual void Enable (int* pargc, char*** pargv); |
||||
/**
|
||||
* Terminates the MPI environment by calling MPI_Finalize |
||||
* This function must be called after Destroy () |
||||
* It also resets m_initialized, m_enabled |
||||
*/ |
||||
virtual void Disable (); |
||||
/**
|
||||
* \param p packet to send |
||||
* \param rxTime received time at destination node |
||||
* \param node destination node |
||||
* \param dev destination device |
||||
* |
||||
* Serialize and send a packet to the specified node and net device |
||||
*/ |
||||
virtual void SendPacket (Ptr<Packet> p, const Time &rxTime, uint32_t node, uint32_t dev); |
||||
/**
|
||||
* Check for received messages complete |
||||
*/ |
||||
static void ReceiveMessages (); |
||||
/**
|
||||
* Check for completed sends |
||||
*/ |
||||
static void TestSendComplete (); |
||||
/**
|
||||
* \return received count in packets |
||||
*/ |
||||
static uint32_t GetRxCount (); |
||||
/**
|
||||
* \return transmitted count in packets |
||||
*/ |
||||
static uint32_t GetTxCount (); |
||||
|
||||
private: |
||||
static uint32_t m_sid; |
||||
static uint32_t m_size; |
||||
|
||||
// Total packets received
|
||||
static uint32_t m_rxCount; |
||||
|
||||
// Total packets sent
|
||||
static uint32_t m_txCount; |
||||
static bool m_initialized; |
||||
static bool m_enabled; |
||||
|
||||
// Pending non-blocking receives
|
||||
static MPI_Request* m_requests; |
||||
|
||||
// Data buffers for non-blocking reads
|
||||
static char** m_pRxBuffers; |
||||
|
||||
// List of pending non-blocking sends
|
||||
static std::list<SentBuffer> m_pendingTx; |
||||
}; |
||||
|
||||
} // namespace ns3
|
||||
|
||||
#endif /* NS3_GRANTED_TIME_WINDOW_MPI_INTERFACE_H */ |
@ -0,0 +1,460 @@
|
||||
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ |
||||
/*
|
||||
* Copyright 2013. Lawrence Livermore National Security, LLC. |
||||
* |
||||
* This program is free software; you can redistribute it and/or modify |
||||
* it under the terms of the GNU General Public License version 2 as |
||||
* published by the Free Software Foundation; |
||||
* |
||||
* This program is distributed in the hope that it will be useful, |
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
* GNU General Public License for more details. |
||||
* |
||||
* You should have received a copy of the GNU General Public License |
||||
* along with this program; if not, write to the Free Software |
||||
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA |
||||
* |
||||
* Author: Steven Smith <smith84@llnl.gov> |
||||
* |
||||
*/ |
||||
|
||||
#include "null-message-mpi-interface.h" |
||||
|
||||
#include "null-message-simulator-impl.h" |
||||
#include "remote-channel-bundle-manager.h" |
||||
#include "remote-channel-bundle.h" |
||||
|
||||
#include "ns3/mpi-receiver.h" |
||||
#include "ns3/node.h" |
||||
#include "ns3/node-list.h" |
||||
#include "ns3/net-device.h" |
||||
#include "ns3/nstime.h" |
||||
#include "ns3/simulator.h" |
||||
#include "ns3/log.h" |
||||
|
||||
#ifdef NS3_MPI |
||||
#include <mpi.h> |
||||
#endif |
||||
|
||||
#include <iostream> |
||||
#include <iomanip> |
||||
#include <list> |
||||
|
||||
NS_LOG_COMPONENT_DEFINE ("NullMessageMpiInterface"); |
||||
|
||||
namespace ns3 { |
||||
|
||||
/**
|
||||
* maximum MPI message size for easy |
||||
* buffer creation |
||||
*/ |
||||
const uint32_t NULL_MESSAGE_MAX_MPI_MSG_SIZE = 2000; |
||||
|
||||
|
||||
NullMessageSentBuffer::NullMessageSentBuffer () |
||||
{ |
||||
m_buffer = 0; |
||||
m_request = 0; |
||||
} |
||||
|
||||
NullMessageSentBuffer::~NullMessageSentBuffer () |
||||
{ |
||||
delete [] m_buffer; |
||||
} |
||||
|
||||
uint8_t* |
||||
NullMessageSentBuffer::GetBuffer () |
||||
{ |
||||
return m_buffer; |
||||
} |
||||
|
||||
void |
||||
NullMessageSentBuffer::SetBuffer (uint8_t* buffer) |
||||
{ |
||||
m_buffer = buffer; |
||||
} |
||||
|
||||
MPI_Request* |
||||
NullMessageSentBuffer::GetRequest () |
||||
{ |
||||
return &m_request; |
||||
} |
||||
|
||||
uint32_t NullMessageMpiInterface::g_sid = 0; |
||||
uint32_t NullMessageMpiInterface::g_size = 1; |
||||
uint32_t NullMessageMpiInterface::g_numNeighbors = 0; |
||||
bool NullMessageMpiInterface::g_initialized = false; |
||||
bool NullMessageMpiInterface::g_enabled = false; |
||||
std::list<NullMessageSentBuffer> NullMessageMpiInterface::g_pendingTx; |
||||
|
||||
MPI_Request* NullMessageMpiInterface::g_requests; |
||||
char** NullMessageMpiInterface::g_pRxBuffers; |
||||
|
||||
NullMessageMpiInterface::NullMessageMpiInterface () |
||||
{ |
||||
NS_LOG_FUNCTION (this); |
||||
|
||||
#ifndef NS3_MPI |
||||
/*
|
||||
* This class can only be constructed if MPI is available. Fail if an
|
||||
* attempt is made to instantiate this class without MPI. |
||||
*/ |
||||
NS_FATAL_ERROR ("Must compile with MPI if Null Message simulator is used, see --enable-mpi option for waf"); |
||||
#endif |
||||
} |
||||
|
||||
NullMessageMpiInterface::~NullMessageMpiInterface () |
||||
{ |
||||
NS_LOG_FUNCTION (this); |
||||
} |
||||
|
||||
void |
||||
NullMessageMpiInterface::Destroy () |
||||
{ |
||||
NS_LOG_FUNCTION (this); |
||||
} |
||||
|
||||
uint32_t |
||||
NullMessageMpiInterface::GetSystemId () |
||||
{ |
||||
NS_ASSERT (g_enabled); |
||||
return g_sid; |
||||
} |
||||
|
||||
uint32_t |
||||
NullMessageMpiInterface::GetSize () |
||||
{ |
||||
NS_ASSERT (g_enabled); |
||||
return g_size; |
||||
} |
||||
|
||||
bool |
||||
NullMessageMpiInterface::IsEnabled () |
||||
{ |
||||
if (!g_initialized) |
||||
{ |
||||
Simulator::GetImplementation (); |
||||
g_initialized = true; |
||||
} |
||||
return g_enabled; |
||||
} |
||||
|
||||
void |
||||
NullMessageMpiInterface::Enable (int* pargc, char*** pargv) |
||||
{ |
||||
NS_LOG_FUNCTION (this << *pargc); |
||||
#ifdef NS3_MPI |
||||
|
||||
// Initialize the MPI interface
|
||||
MPI_Init (pargc, pargv); |
||||
MPI_Barrier (MPI_COMM_WORLD); |
||||
|
||||
// SystemId and Size are unit32_t in interface but MPI uses int so convert.
|
||||
int mpiSystemId; |
||||
int mpiSize; |
||||
MPI_Comm_rank (MPI_COMM_WORLD, &mpiSystemId); |
||||
MPI_Comm_size (MPI_COMM_WORLD, &mpiSize); |
||||
|
||||
g_sid = mpiSystemId; |
||||
g_size = mpiSize; |
||||
|
||||
g_enabled = true; |
||||
g_initialized = true; |
||||
|
||||
#endif |
||||
} |
||||
|
||||
void
|
||||
NullMessageMpiInterface::InitializeSendReceiveBuffers(void) |
||||
{ |
||||
NS_LOG_FUNCTION_NOARGS (); |
||||
#ifdef NS3_MPI |
||||
NS_ASSERT (g_enabled); |
||||
|
||||
g_numNeighbors = RemoteChannelBundleManager::Size(); |
||||
|
||||
// Post a non-blocking receive for all peers
|
||||
g_requests = new MPI_Request[g_numNeighbors]; |
||||
g_pRxBuffers = new char*[g_numNeighbors]; |
||||
int index = 0; |
||||
for (uint32_t rank = 0; rank < g_size; ++rank) |
||||
{ |
||||
Ptr<RemoteChannelBundle> bundle = RemoteChannelBundleManager::Find(rank); |
||||
if (bundle)
|
||||
{ |
||||
g_pRxBuffers[index] = new char[NULL_MESSAGE_MAX_MPI_MSG_SIZE]; |
||||
MPI_Irecv (g_pRxBuffers[index], NULL_MESSAGE_MAX_MPI_MSG_SIZE, MPI_CHAR, rank, 0, |
||||
MPI_COMM_WORLD, &g_requests[index]); |
||||
++index; |
||||
} |
||||
} |
||||
#endif |
||||
} |
||||
|
||||
void |
||||
NullMessageMpiInterface::SendPacket (Ptr<Packet> p, const Time& rxTime, uint32_t node, uint32_t dev) |
||||
{ |
||||
NS_LOG_FUNCTION (this << p << rxTime.GetTimeStep () << node << dev); |
||||
|
||||
NS_ASSERT (g_enabled); |
||||
|
||||
#ifdef NS3_MPI |
||||
|
||||
// Find the system id for the destination node
|
||||
Ptr<Node> destNode = NodeList::GetNode (node); |
||||
uint32_t nodeSysId = destNode->GetSystemId (); |
||||
|
||||
NullMessageSentBuffer sendBuf; |
||||
g_pendingTx.push_back (sendBuf); |
||||
std::list<NullMessageSentBuffer>::reverse_iterator iter = g_pendingTx.rbegin (); // Points to the last element
|
||||
|
||||
uint32_t serializedSize = p->GetSerializedSize (); |
||||
uint32_t bufferSize = serializedSize + ( 2 * sizeof (uint64_t) ) + ( 2 * sizeof (uint32_t) ); |
||||
uint8_t* buffer = new uint8_t[bufferSize]; |
||||
iter->SetBuffer (buffer); |
||||
// Add the time, dest node and dest device
|
||||
uint64_t t = rxTime.GetInteger (); |
||||
uint64_t* pTime = reinterpret_cast <uint64_t *> (buffer); |
||||
*pTime++ = t; |
||||
|
||||
Time guarantee_update = NullMessageSimulatorImpl::GetInstance ()->CalculateGuaranteeTime (nodeSysId); |
||||
*pTime++ = guarantee_update.GetTimeStep (); |
||||
|
||||
uint32_t* pData = reinterpret_cast<uint32_t *> (pTime); |
||||
*pData++ = node; |
||||
*pData++ = dev; |
||||
// Serialize the packet
|
||||
p->Serialize (reinterpret_cast<uint8_t *> (pData), serializedSize); |
||||
|
||||
MPI_Isend (reinterpret_cast<void *> (iter->GetBuffer ()), bufferSize, MPI_CHAR, nodeSysId, |
||||
0, MPI_COMM_WORLD, (iter->GetRequest ())); |
||||
|
||||
NullMessageSimulatorImpl::GetInstance ()->RescheduleNullMessageEvent (nodeSysId); |
||||
|
||||
#endif |
||||
} |
||||
|
||||
void |
||||
NullMessageMpiInterface::SendNullMessage (const Time& guarantee_update, Ptr<RemoteChannelBundle> bundle) |
||||
{ |
||||
NS_LOG_FUNCTION (guarantee_update.GetTimeStep () << bundle); |
||||
|
||||
NS_ASSERT (g_enabled); |
||||
|
||||
#ifdef NS3_MPI |
||||
|
||||
NullMessageSentBuffer sendBuf; |
||||
g_pendingTx.push_back (sendBuf); |
||||
std::list<NullMessageSentBuffer>::reverse_iterator iter = g_pendingTx.rbegin (); // Points to the last element
|
||||
|
||||
uint32_t bufferSize = 2 * sizeof (uint64_t) + 2 * sizeof (uint32_t); |
||||
uint8_t* buffer = new uint8_t[bufferSize]; |
||||
iter->SetBuffer (buffer); |
||||
// Add the time, dest node and dest device
|
||||
uint64_t* pTime = reinterpret_cast <uint64_t *> (buffer); |
||||
*pTime++ = 0; |
||||
*pTime++ = guarantee_update.GetInteger (); |
||||
uint32_t* pData = reinterpret_cast<uint32_t *> (pTime); |
||||
*pData++ = 0; |
||||
*pData++ = 0; |
||||
|
||||
// Find the system id for the destination MPI rank
|
||||
uint32_t nodeSysId = bundle->GetSystemId (); |
||||
|
||||
MPI_Isend (reinterpret_cast<void *> (iter->GetBuffer ()), bufferSize, MPI_CHAR, nodeSysId, |
||||
0, MPI_COMM_WORLD, (iter->GetRequest ())); |
||||
#endif |
||||
} |
||||
|
||||
void |
||||
NullMessageMpiInterface::ReceiveMessagesBlocking () |
||||
{ |
||||
NS_LOG_FUNCTION_NOARGS (); |
||||
|
||||
ReceiveMessages(true); |
||||
} |
||||
|
||||
|
||||
void |
||||
NullMessageMpiInterface::ReceiveMessagesNonBlocking () |
||||
{ |
||||
NS_LOG_FUNCTION_NOARGS (); |
||||
|
||||
ReceiveMessages(false); |
||||
} |
||||
|
||||
|
||||
void |
||||
NullMessageMpiInterface::ReceiveMessages (bool blocking) |
||||
{ |
||||
NS_LOG_FUNCTION (blocking); |
||||
|
||||
NS_ASSERT (g_enabled); |
||||
|
||||
#ifdef NS3_MPI |
||||
|
||||
// stop flag set to true when no more messages are found to
|
||||
// process.
|
||||
bool stop = false; |
||||
|
||||
|
||||
if (!g_numNeighbors) { |
||||
// Not communicating with anyone.
|
||||
return; |
||||
} |
||||
|
||||
do |
||||
{ |
||||
int messageReceived = 0; |
||||
int index = 0; |
||||
MPI_Status status; |
||||
|
||||
if (blocking) |
||||
{ |
||||
MPI_Waitany (g_numNeighbors, g_requests, &index, &status); |
||||
messageReceived = 1; /* Wait always implies message was received */ |
||||
stop = true; |
||||
} |
||||
else |
||||
{ |
||||
MPI_Testany (g_numNeighbors, g_requests, &index, &messageReceived, &status); |
||||
} |
||||
|
||||
if (messageReceived) |
||||
{ |
||||
int count; |
||||
MPI_Get_count (&status, MPI_CHAR, &count); |
||||
|
||||
// Get the meta data first
|
||||
uint64_t* pTime = reinterpret_cast<uint64_t *> (g_pRxBuffers[index]); |
||||
uint64_t time = *pTime++; |
||||
uint64_t guaranteeUpdate = *pTime++; |
||||
|
||||
uint32_t* pData = reinterpret_cast<uint32_t *> (pTime); |
||||
uint32_t node = *pData++; |
||||
uint32_t dev = *pData++; |
||||
|
||||
Time rxTime (time); |
||||
|
||||
// rxtime == 0 means this is a Null Message
|
||||
if (rxTime > 0) |
||||
{ |
||||
count -= sizeof (time) + sizeof (guaranteeUpdate) + sizeof (node) + sizeof (dev); |
||||
|
||||
Ptr<Packet> p = Create<Packet> (reinterpret_cast<uint8_t *> (pData), count, true); |
||||
|
||||
// Find the correct node/device to schedule receive event
|
||||
Ptr<Node> pNode = NodeList::GetNode (node); |
||||
Ptr<MpiReceiver> pMpiRec = 0; |
||||
uint32_t nDevices = pNode->GetNDevices (); |
||||
for (uint32_t i = 0; i < nDevices; ++i) |
||||
{ |
||||
Ptr<NetDevice> pThisDev = pNode->GetDevice (i); |
||||
if (pThisDev->GetIfIndex () == dev) |
||||
{ |
||||
pMpiRec = pThisDev->GetObject<MpiReceiver> (); |
||||
break; |
||||
} |
||||
} |
||||
NS_ASSERT (pNode && pMpiRec); |
||||
|
||||
// Schedule the rx event
|
||||
Simulator::ScheduleWithContext (pNode->GetId (), rxTime - Simulator::Now (), |
||||
&MpiReceiver::Receive, pMpiRec, p); |
||||
|
||||
} |
||||
|
||||
// Update guarantee time for both packet receives and Null Messages.
|
||||
Ptr<RemoteChannelBundle> bundle = RemoteChannelBundleManager::Find (status.MPI_SOURCE); |
||||
NS_ASSERT (bundle); |
||||
|
||||
bundle->SetGuaranteeTime (Time (guaranteeUpdate)); |
||||
|
||||
// Re-queue the next read
|
||||
MPI_Irecv (g_pRxBuffers[index], NULL_MESSAGE_MAX_MPI_MSG_SIZE, MPI_CHAR, status.MPI_SOURCE, 0, |
||||
MPI_COMM_WORLD, &g_requests[index]); |
||||
|
||||
} |
||||
else |
||||
{ |
||||
// if non-blocking and no message received in testany then stop message loop
|
||||
stop = true; |
||||
} |
||||
} |
||||
while (!stop); |
||||
#endif |
||||
} |
||||
|
||||
void |
||||
NullMessageMpiInterface::TestSendComplete () |
||||
{ |
||||
NS_LOG_FUNCTION_NOARGS (); |
||||
|
||||
NS_ASSERT (g_enabled); |
||||
|
||||
#ifdef NS3_MPI |
||||
std::list<NullMessageSentBuffer>::iterator iter = g_pendingTx.begin (); |
||||
while (iter != g_pendingTx.end ()) |
||||
{ |
||||
MPI_Status status; |
||||
int flag = 0; |
||||
MPI_Test (iter->GetRequest (), &flag, &status); |
||||
std::list<NullMessageSentBuffer>::iterator current = iter; // Save current for erasing
|
||||
++iter; // Advance to next
|
||||
if (flag) |
||||
{ // This message is complete
|
||||
g_pendingTx.erase (current); |
||||
} |
||||
} |
||||
#endif |
||||
} |
||||
|
||||
void |
||||
NullMessageMpiInterface::Disable () |
||||
{ |
||||
NS_LOG_FUNCTION (this); |
||||
|
||||
#ifdef NS3_MPI |
||||
int flag = 0; |
||||
MPI_Initialized (&flag); |
||||
if (flag) |
||||
{ |
||||
|
||||
for (std::list<NullMessageSentBuffer>::iterator iter = g_pendingTx.begin (); |
||||
iter != g_pendingTx.end (); |
||||
++iter) |
||||
{ |
||||
MPI_Cancel (iter->GetRequest ()); |
||||
MPI_Request_free (iter->GetRequest ()); |
||||
} |
||||
|
||||
for (uint32_t i = 0; i < g_numNeighbors; ++i) |
||||
{ |
||||
MPI_Cancel (&g_requests[i]); |
||||
MPI_Request_free (&g_requests[i]); |
||||
} |
||||
|
||||
MPI_Finalize (); |
||||
|
||||
for (uint32_t i = 0; i < g_numNeighbors; ++i) |
||||
{ |
||||
delete [] g_pRxBuffers[i]; |
||||
} |
||||
delete [] g_pRxBuffers; |
||||
delete [] g_requests; |
||||
|
||||
g_pendingTx.clear (); |
||||
|
||||
g_enabled = false; |
||||
g_initialized = false; |
||||
|
||||
} |
||||
else |
||||
{ |
||||
NS_FATAL_ERROR ("Cannot disable MPI environment without Initializing it first"); |
||||
} |
||||
#endif |
||||
} |
||||
|
||||
} // namespace ns3
|
@ -0,0 +1,228 @@
|
||||
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ |
||||
/*
|
||||
* Copyright 2013. Lawrence Livermore National Security, LLC. |
||||
* |
||||
* This program is free software; you can redistribute it and/or modify |
||||
* it under the terms of the GNU General Public License version 2 as |
||||
* published by the Free Software Foundation; |
||||
* |
||||
* This program is distributed in the hope that it will be useful, |
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
* GNU General Public License for more details. |
||||
* |
||||
* You should have received a copy of the GNU General Public License |
||||
* along with this program; if not, write to the Free Software |
||||
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA |
||||
* |
||||
* Author: Steven Smith <smith84@llnl.gov> |
||||
* |
||||
*/ |
||||
|
||||
#ifndef NS3_NULLMESSAGE_MPI_INTERFACE_H |
||||
#define NS3_NULLMESSAGE_MPI_INTERFACE_H |
||||
|
||||
#include "parallel-communication-interface.h" |
||||
|
||||
#include <ns3/nstime.h> |
||||
#include <ns3/buffer.h> |
||||
|
||||
#ifdef NS3_MPI |
||||
#include "mpi.h" |
||||
#else |
||||
typedef void* MPI_Request; |
||||
#endif |
||||
|
||||
#include <list> |
||||
|
||||
namespace ns3 { |
||||
|
||||
class RemoteChannelBundle; |
||||
class Packet; |
||||
|
||||
/**
|
||||
* \ingroup mpi |
||||
* |
||||
* \brief Non-blocking send buffers for Null Message implementation. |
||||
*
|
||||
* One buffer is allocated for each non-blocking send. |
||||
*/ |
||||
class NullMessageSentBuffer |
||||
{ |
||||
public: |
||||
NullMessageSentBuffer (); |
||||
~NullMessageSentBuffer (); |
||||
|
||||
/**
|
||||
* \return pointer to sent buffer |
||||
*/ |
||||
uint8_t* GetBuffer (); |
||||
/**
|
||||
* \param buffer pointer to sent buffer |
||||
*/ |
||||
void SetBuffer (uint8_t* buffer); |
||||
/**
|
||||
* \return MPI request |
||||
*/ |
||||
MPI_Request* GetRequest (); |
||||
|
||||
private: |
||||
|
||||
/**
|
||||
* Buffer for send. |
||||
*/ |
||||
uint8_t* m_buffer; |
||||
|
||||
/**
|
||||
* MPI request posted for the send. |
||||
*/ |
||||
MPI_Request m_request; |
||||
}; |
||||
|
||||
/**
|
||||
* \ingroup mpi |
||||
* |
||||
* \brief Interface between ns-3 and MPI for the Null Message |
||||
* distributed simulation implementation. |
||||
*/ |
||||
class NullMessageMpiInterface : public ParallelCommunicationInterface |
||||
{ |
||||
public: |
||||
|
||||
NullMessageMpiInterface (); |
||||
~NullMessageMpiInterface (); |
||||
|
||||
/**
|
||||
* Delete all buffers |
||||
*/ |
||||
virtual void Destroy (); |
||||
/**
|
||||
* \return system id (MPI rank) |
||||
*/ |
||||
virtual uint32_t GetSystemId (); |
||||
/**
|
||||
* \return number of systems (MPI size) |
||||
*/ |
||||
virtual uint32_t GetSize (); |
||||
/**
|
||||
* \return true if interface is enabled |
||||
*/ |
||||
virtual bool IsEnabled (); |
||||
/**
|
||||
* \param pargc number of command line arguments |
||||
* \param pargv command line arguments |
||||
* |
||||
* Sets up interface. Calls MPI Init and
|
||||
* posts receives. |
||||
*/ |
||||
virtual void Enable (int* pargc, char*** pargv); |
||||
/**
|
||||
* Terminates the MPI environment by calling MPI_Finalize This |
||||
* function must be called after Destroy (). Resets m_initialized |
||||
* and m_enabled. |
||||
*/ |
||||
virtual void Disable (); |
||||
/**
|
||||
* \param p packet to send |
||||
* \param rxTime received time at destination node |
||||
* \param node destination node |
||||
* \param dev destination device |
||||
* |
||||
* Serialize and send a packet to the specified node and net device. |
||||
* |
||||
* \internal |
||||
* The MPI buffer format packs a delivery information and the serialized packet. |
||||
* |
||||
* uint64_t time the packed should be delivered |
||||
* uint64_t guarantee time for the Null Message algorithm. |
||||
* uint32_t node id of destination |
||||
* unit32_t dev id on destination |
||||
* uint8_t[] serialized packet |
||||
* \endinternal |
||||
*/ |
||||
virtual void SendPacket (Ptr<Packet> p, const Time &rxTime, uint32_t node, uint32_t dev); |
||||
/**
|
||||
* \param guaranteeUpdate guarantee update time for the Null Message |
||||
* \bundle the destination bundle for the Null Message. |
||||
* |
||||
* \brief Send a Null Message to across the specified bundle.
|
||||
* |
||||
* Guarantee update time is the lower bound time on the next |
||||
* possible event from this MPI task to the remote MPI task across |
||||
* the bundle. Remote task may execute events up to time. |
||||
* |
||||
* Null Messages are sent when a packet has not been sent across |
||||
* this bundle in order to allow time advancement on the remote |
||||
* MPI task. |
||||
* |
||||
* \internal |
||||
* The Null Message MPI buffer format is based on the format for sending a packet with |
||||
* several fields set to 0 to signal that it is a Null Message. Overloading the normal packet |
||||
* format simplifies receive logic. |
||||
* |
||||
* uint64_t 0 must be zero for Null Message |
||||
* uint64_t guarantee time |
||||
* uint32_t 0 must be zero for Null Message |
||||
* uint32_t 0 must be zero for Null Message |
||||
* \endinternal |
||||
*/ |
||||
static void SendNullMessage (const Time& guaranteeUpdate, Ptr<RemoteChannelBundle> bundle); |
||||
/**
|
||||
* Non-blocking check for received messages complete. Will |
||||
* receive all messages that are queued up locally. |
||||
*/ |
||||
static void ReceiveMessagesNonBlocking (); |
||||
/**
|
||||
* Blocking message receive. Will block until at least one message |
||||
* has been received. |
||||
*/ |
||||
static void ReceiveMessagesBlocking (); |
||||
/**
|
||||
* Check for completed sends |
||||
*/ |
||||
static void TestSendComplete (); |
||||
|
||||
/**
|
||||
* \brief Initialize send and receive buffers. |
||||
* |
||||
* This method should be called after all links have been added to the RemoteChannelBundle |
||||
* manager to setup any required send and receive buffers. |
||||
*/ |
||||
static void InitializeSendReceiveBuffers (void); |
||||
|
||||
private: |
||||
|
||||
/**
|
||||
* Check for received messages complete. Will block until message |
||||
* has been received if blocking flag is true. When blocking will |
||||
* return after the first message is received. Non-blocking mode will |
||||
* Non-blocking check for received messages complete. Will |
||||
* receive all messages that are queued up locally. |
||||
*/ |
||||
static void ReceiveMessages (bool blocking = false); |
||||
|
||||
// System ID (rank) for this task
|
||||
static uint32_t g_sid; |
||||
|
||||
// Size of the MPI COM_WORLD group.
|
||||
static uint32_t g_size; |
||||
|
||||
// Number of neighbor tasks, tasks that this task shares a link with.
|
||||
static uint32_t g_numNeighbors; |
||||
|
||||
static bool g_initialized; |
||||
static bool g_enabled; |
||||
|
||||
// Pending non-blocking receives
|
||||
static MPI_Request* g_requests; |
||||
|
||||
// Data buffers for non-blocking receives
|
||||
static char** g_pRxBuffers; |
||||
|
||||
// List of pending non-blocking sends
|
||||
static std::list<NullMessageSentBuffer> g_pendingTx; |
||||
}; |
||||
|
||||
} // namespace ns3
|
||||
|
||||
#endif /* NS3_NULL_MESSAGE_MPI_INTERFACE_H */ |
@ -0,0 +1,603 @@
|
||||
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ |
||||
/*
|
||||
* Copyright 2013. Lawrence Livermore National Security, LLC. |
||||
* |
||||
* This program is free software; you can redistribute it and/or modify |
||||
* it under the terms of the GNU General Public License version 2 as |
||||
* published by the Free Software Foundation; |
||||
* |
||||
* This program is distributed in the hope that it will be useful, |
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
* GNU General Public License for more details. |
||||
* |
||||
* You should have received a copy of the GNU General Public License |
||||
* along with this program; if not, write to the Free Software |
||||
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA |
||||
* |
||||
* Author: Steven Smith <smith84@llnl.gov> |
||||
* |
||||
*/ |
||||
|
||||
#include "null-message-simulator-impl.h" |
||||
|
||||
#include "null-message-mpi-interface.h" |
||||
#include "remote-channel-bundle-manager.h" |
||||
#include "remote-channel-bundle.h" |
||||
#include "mpi-interface.h" |
||||
|
||||
#include <ns3/simulator.h> |
||||
#include <ns3/scheduler.h> |
||||
#include <ns3/event-impl.h> |
||||
#include <ns3/channel.h> |
||||
#include <ns3/node-container.h> |
||||
#include <ns3/double.h> |
||||
#include <ns3/ptr.h> |
||||
#include <ns3/pointer.h> |
||||
#include <ns3/assert.h> |
||||
#include <ns3/log.h> |
||||
|
||||
#include <cmath> |
||||
#include <iostream> |
||||
#include <fstream> |
||||
#include <iomanip> |
||||
|
||||
NS_LOG_COMPONENT_DEFINE ("NullMessageSimulatorImpl"); |
||||
|
||||
namespace ns3 { |
||||
|
||||
NS_OBJECT_ENSURE_REGISTERED (NullMessageSimulatorImpl); |
||||
|
||||
NullMessageSimulatorImpl* NullMessageSimulatorImpl::g_instance = 0; |
||||
|
||||
TypeId |
||||
NullMessageSimulatorImpl::GetTypeId (void) |
||||
{ |
||||
static TypeId tid = TypeId ("ns3::NullMessageSimulatorImpl") |
||||
.SetParent<Object> () |
||||
.AddConstructor<NullMessageSimulatorImpl> () |
||||
.AddAttribute ("SchedulerTune", "Null Message scheduler tuning parameter", |
||||
DoubleValue (1.0), |
||||
MakeDoubleAccessor (&NullMessageSimulatorImpl::m_schedulerTune), |
||||
MakeDoubleChecker<double> (0.01,1.0)) |
||||
; |
||||
return tid; |
||||
} |
||||
|
||||
NullMessageSimulatorImpl::NullMessageSimulatorImpl () |
||||
{ |
||||
#ifdef NS3_MPI |
||||
NS_LOG_FUNCTION (this); |
||||
|
||||
m_myId = MpiInterface::GetSystemId (); |
||||
m_systemCount = MpiInterface::GetSize (); |
||||
|
||||
m_stop = false; |
||||
// uids are allocated from 4.
|
||||
// uid 0 is "invalid" events
|
||||
// uid 1 is "now" events
|
||||
// uid 2 is "destroy" events
|
||||
m_uid = 4; |
||||
// before ::Run is entered, the m_currentUid will be zero
|
||||
m_currentUid = 0; |
||||
m_currentTs = 0; |
||||
m_currentContext = 0xffffffff; |
||||
m_unscheduledEvents = 0; |
||||
m_events = 0; |
||||
|
||||
m_safeTime = Seconds (0); |
||||
|
||||
NS_ASSERT (g_instance == 0); |
||||
g_instance = this; |
||||
|
||||
#else |
||||
NS_FATAL_ERROR ("Can't use Null Message simulator without MPI compiled in"); |
||||
#endif |
||||
} |
||||
|
||||
NullMessageSimulatorImpl::~NullMessageSimulatorImpl () |
||||
{ |
||||
NS_LOG_FUNCTION (this); |
||||
} |
||||
|
||||
void |
||||
NullMessageSimulatorImpl::DoDispose (void) |
||||
{ |
||||
NS_LOG_FUNCTION (this); |
||||
|
||||
while (!m_events->IsEmpty ()) |
||||
{ |
||||
Scheduler::Event next = m_events->RemoveNext (); |
||||
next.impl->Unref (); |
||||
} |
||||
m_events = 0; |
||||
SimulatorImpl::DoDispose (); |
||||
} |
||||
|
||||
void |
||||
NullMessageSimulatorImpl::Destroy () |
||||
{ |
||||
NS_LOG_FUNCTION (this); |
||||
|
||||
while (!m_destroyEvents.empty ()) |
||||
{ |
||||
Ptr<EventImpl> ev = m_destroyEvents.front ().PeekEventImpl (); |
||||
m_destroyEvents.pop_front (); |
||||
NS_LOG_LOGIC ("handle destroy " << ev); |
||||
if (!ev->IsCancelled ()) |
||||
{ |
||||
ev->Invoke (); |
||||
} |
||||
} |
||||
|
||||
RemoteChannelBundleManager::Destroy(); |
||||
MpiInterface::Destroy (); |
||||
} |
||||
|
||||
void |
||||
NullMessageSimulatorImpl::CalculateLookAhead (void) |
||||
{ |
||||
NS_LOG_FUNCTION (this); |
||||
|
||||
int num_local_nodes = 0; |
||||
|
||||
if (MpiInterface::GetSize () > 1) |
||||
{ |
||||
NodeContainer c = NodeContainer::GetGlobal (); |
||||
for (NodeContainer::Iterator iter = c.Begin (); iter != c.End (); ++iter) |
||||
{ |
||||
if ((*iter)->GetSystemId () != MpiInterface::GetSystemId ()) |
||||
{ |
||||
continue; |
||||
} |
||||
|
||||
num_local_nodes++; |
||||
|
||||
for (uint32_t i = 0; i < (*iter)->GetNDevices (); ++i) |
||||
{ |
||||
Ptr<NetDevice> localNetDevice = (*iter)->GetDevice (i); |
||||
// only works for p2p links currently
|
||||
if (!localNetDevice->IsPointToPoint ()) |
||||
{ |
||||
continue; |
||||
} |
||||
Ptr<Channel> channel = localNetDevice->GetChannel (); |
||||
if (channel == 0) |
||||
{ |
||||
continue; |
||||
} |
||||
|
||||
// grab the adjacent node
|
||||
Ptr<Node> remoteNode; |
||||
if (channel->GetDevice (0) == localNetDevice) |
||||
{ |
||||
remoteNode = (channel->GetDevice (1))->GetNode (); |
||||
} |
||||
else |
||||
{ |
||||
remoteNode = (channel->GetDevice (0))->GetNode (); |
||||
} |
||||
|
||||
// if it's not remote, don't consider it
|
||||
if (remoteNode->GetSystemId () == MpiInterface::GetSystemId ()) |
||||
{ |
||||
continue; |
||||
} |
||||
|
||||
/**
|
||||
* Add this channel to the remote channel bundle from this task to MPI task on other side of the channel. |
||||
*/ |
||||
Ptr<RemoteChannelBundle> remoteChannelBundle = RemoteChannelBundleManager::Find (remoteNode->GetSystemId ()); |
||||
if (!remoteChannelBundle) |
||||
{ |
||||
remoteChannelBundle = RemoteChannelBundleManager::Add (remoteNode->GetSystemId ()); |
||||
} |
||||
|
||||
TimeValue delay; |
||||
channel->GetAttribute ("Delay", delay); |
||||
remoteChannelBundle->AddChannel (channel, delay.Get () ); |
||||
} |
||||
} |
||||
} |
||||
|
||||
// Completed setup of remote channel bundles. Setup send and receive buffers.
|
||||
NullMessageMpiInterface::InitializeSendReceiveBuffers (); |
||||
|
||||
// Initialized to 0 as we don't have a simulation start time.
|
||||
m_safeTime = Time (0); |
||||
} |
||||
|
||||
void |
||||
NullMessageSimulatorImpl::SetScheduler (ObjectFactory schedulerFactory) |
||||
{ |
||||
NS_LOG_FUNCTION (this << schedulerFactory); |
||||
|
||||
Ptr<Scheduler> scheduler = schedulerFactory.Create<Scheduler> (); |
||||
|
||||
if (m_events != 0) |
||||
{ |
||||
while (!m_events->IsEmpty ()) |
||||
{ |
||||
Scheduler::Event next = m_events->RemoveNext (); |
||||
scheduler->Insert (next); |
||||
} |
||||
} |
||||
m_events = scheduler; |
||||
} |
||||
|
||||
void |
||||
NullMessageSimulatorImpl::ProcessOneEvent (void) |
||||
{ |
||||
NS_LOG_FUNCTION (this); |
||||
|
||||
Scheduler::Event next = m_events->RemoveNext (); |
||||
|
||||
NS_ASSERT (next.key.m_ts >= m_currentTs); |
||||
m_unscheduledEvents--; |
||||
|
||||
NS_LOG_LOGIC ("handle " << next.key.m_ts); |
||||
m_currentTs = next.key.m_ts; |
||||
m_currentContext = next.key.m_context; |
||||
m_currentUid = next.key.m_uid; |
||||
next.impl->Invoke (); |
||||
next.impl->Unref (); |
||||
} |
||||
|
||||
bool |
||||
NullMessageSimulatorImpl::IsFinished (void) const |
||||
{ |
||||
return m_events->IsEmpty () || m_stop; |
||||
} |
||||
|
||||
Time |
||||
NullMessageSimulatorImpl::Next (void) const |
||||
{ |
||||
NS_LOG_FUNCTION (this); |
||||
|
||||
NS_ASSERT (!m_events->IsEmpty ()); |
||||
|
||||
Scheduler::Event ev = m_events->PeekNext (); |
||||
return TimeStep (ev.key.m_ts); |
||||
} |
||||
|
||||
void |
||||
NullMessageSimulatorImpl::ScheduleNullMessageEvent (Ptr<RemoteChannelBundle> bundle) |
||||
{ |
||||
NS_LOG_FUNCTION (this << bundle); |
||||
|
||||
Time time (m_schedulerTune * bundle->GetDelay ().GetTimeStep ()); |
||||
|
||||
bundle->SetEventId (Simulator::Schedule (time, &NullMessageSimulatorImpl::NullMessageEventHandler,
|
||||
this, PeekPointer(bundle))); |
||||
} |
||||
|
||||
void |
||||
NullMessageSimulatorImpl::RescheduleNullMessageEvent (Ptr<RemoteChannelBundle> bundle) |
||||
{ |
||||
NS_LOG_FUNCTION (this << bundle); |
||||
|
||||
Simulator::Cancel (bundle->GetEventId ()); |
||||
|
||||
Time time (m_schedulerTune * bundle->GetDelay ().GetTimeStep ()); |
||||
|
||||
bundle->SetEventId (Simulator::Schedule (time, &NullMessageSimulatorImpl::NullMessageEventHandler,
|
||||
this, PeekPointer(bundle))); |
||||
} |
||||
|
||||
void |
||||
NullMessageSimulatorImpl::RescheduleNullMessageEvent (uint32_t nodeSysId) |
||||
{ |
||||
NS_LOG_FUNCTION (this << nodeSysId); |
||||
|
||||
Ptr<RemoteChannelBundle> bundle = RemoteChannelBundleManager::Find (nodeSysId); |
||||
NS_ASSERT (bundle); |
||||
|
||||
RescheduleNullMessageEvent (bundle); |
||||
} |
||||
|
||||
void |
||||
NullMessageSimulatorImpl::Run (void) |
||||
{ |
||||
NS_LOG_FUNCTION (this); |
||||
|
||||
CalculateLookAhead (); |
||||
|
||||
RemoteChannelBundleManager::InitializeNullMessageEvents (); |
||||
|
||||
// Stop will be set if stop is called by simulation.
|
||||
m_stop = false; |
||||
while (!IsFinished ()) |
||||
{ |
||||
Time nextTime = Next (); |
||||
|
||||
if ( nextTime <= GetSafeTime () ) |
||||
{ |
||||
ProcessOneEvent (); |
||||
HandleArrivingMessagesNonBlocking (); |
||||
} |
||||
else |
||||
{ |
||||
// Block until packet or Null Message has been received.
|
||||
HandleArrivingMessagesBlocking (); |
||||
} |
||||
} |
||||
} |
||||
|
||||
void |
||||
NullMessageSimulatorImpl::HandleArrivingMessagesNonBlocking (void) |
||||
{ |
||||
NS_LOG_FUNCTION (this); |
||||
|
||||
NullMessageMpiInterface::ReceiveMessagesNonBlocking (); |
||||
|
||||
CalculateSafeTime (); |
||||
|
||||
// Check for send completes
|
||||
NullMessageMpiInterface::TestSendComplete (); |
||||
} |
||||
|
||||
void |
||||
NullMessageSimulatorImpl::HandleArrivingMessagesBlocking (void) |
||||
{ |
||||
NS_LOG_FUNCTION (this); |
||||
|
||||
NullMessageMpiInterface::ReceiveMessagesBlocking (); |
||||
|
||||
CalculateSafeTime (); |
||||
|
||||
// Check for send completes
|
||||
NullMessageMpiInterface::TestSendComplete (); |
||||
} |
||||
|
||||
void |
||||
NullMessageSimulatorImpl::CalculateSafeTime () |
||||
{ |
||||
NS_LOG_FUNCTION (this); |
||||
|
||||
m_safeTime = RemoteChannelBundleManager::GetSafeTime (); |
||||
NS_ASSERT (m_safeTime >= m_currentTs); |
||||
} |
||||
|
||||
Time |
||||
NullMessageSimulatorImpl::GetSafeTime () |
||||
{ |
||||
return m_safeTime; |
||||
} |
||||
|
||||
|
||||
uint32_t |
||||
NullMessageSimulatorImpl::GetSystemId () const |
||||
{ |
||||
return m_myId; |
||||
} |
||||
|
||||
void |
||||
NullMessageSimulatorImpl::RunOneEvent (void) |
||||
{ |
||||
NS_LOG_FUNCTION (this); |
||||
|
||||
ProcessOneEvent (); |
||||
} |
||||
|
||||
void |
||||
NullMessageSimulatorImpl::Stop (void) |
||||
{ |
||||
NS_LOG_FUNCTION (this); |
||||
|
||||
m_stop = true; |
||||
} |
||||
|
||||
void |
||||
NullMessageSimulatorImpl::Stop (Time const &time) |
||||
{ |
||||
NS_LOG_FUNCTION (this << time.GetTimeStep ()); |
||||
|
||||
Simulator::Schedule (time, &Simulator::Stop); |
||||
} |
||||
|
||||
//
|
||||
// Schedule an event for a _relative_ time in the future.
|
||||
//
|
||||
EventId |
||||
NullMessageSimulatorImpl::Schedule (Time const &time, EventImpl *event) |
||||
{ |
||||
NS_LOG_FUNCTION (this << time.GetTimeStep () << event); |
||||
|
||||
Time tAbsolute = time + TimeStep (m_currentTs); |
||||
|
||||
NS_ASSERT (tAbsolute.IsPositive ()); |
||||
NS_ASSERT (tAbsolute >= TimeStep (m_currentTs)); |
||||
Scheduler::Event ev; |
||||
ev.impl = event; |
||||
ev.key.m_ts = static_cast<uint64_t> (tAbsolute.GetTimeStep ()); |
||||
ev.key.m_context = GetContext (); |
||||
ev.key.m_uid = m_uid; |
||||
m_uid++; |
||||
m_unscheduledEvents++; |
||||
m_events->Insert (ev); |
||||
return EventId (event, ev.key.m_ts, ev.key.m_context, ev.key.m_uid); |
||||
} |
||||
|
||||
void |
||||
NullMessageSimulatorImpl::ScheduleWithContext (uint32_t context, Time const &time, EventImpl *event) |
||||
{ |
||||
NS_LOG_FUNCTION (this << context << time.GetTimeStep () << m_currentTs << event); |
||||
|
||||
Time tAbsolute(m_currentTs + time.GetTimeStep ()); |
||||
|
||||
NS_ASSERT (tAbsolute.IsPositive ()); |
||||
NS_ASSERT (tAbsolute >= TimeStep (m_currentTs)); |
||||
|
||||
Scheduler::Event ev; |
||||
ev.impl = event; |
||||
ev.key.m_ts = tAbsolute.GetTimeStep (); |
||||
ev.key.m_context = context; |
||||
ev.key.m_uid = m_uid; |
||||
m_uid++; |
||||
m_unscheduledEvents++; |
||||
m_events->Insert (ev); |
||||
} |
||||
|
||||
EventId |
||||
NullMessageSimulatorImpl::ScheduleNow (EventImpl *event) |
||||
{ |
||||
NS_LOG_FUNCTION (this << event); |
||||
|
||||
Scheduler::Event ev; |
||||
ev.impl = event; |
||||
ev.key.m_ts = m_currentTs; |
||||
ev.key.m_context = GetContext (); |
||||
ev.key.m_uid = m_uid; |
||||
m_uid++; |
||||
m_unscheduledEvents++; |
||||
m_events->Insert (ev); |
||||
return EventId (event, ev.key.m_ts, ev.key.m_context, ev.key.m_uid); |
||||
} |
||||
|
||||
EventId |
||||
NullMessageSimulatorImpl::ScheduleDestroy (EventImpl *event) |
||||
{ |
||||
NS_LOG_FUNCTION (this << event); |
||||
|
||||
EventId id (Ptr<EventImpl> (event, false), m_currentTs, 0xffffffff, 2); |
||||
m_destroyEvents.push_back (id); |
||||
m_uid++; |
||||
return id; |
||||
} |
||||
|
||||
Time |
||||
NullMessageSimulatorImpl::Now (void) const |
||||
{ |
||||
return TimeStep (m_currentTs); |
||||
} |
||||
|
||||
Time |
||||
NullMessageSimulatorImpl::GetDelayLeft (const EventId &id) const |
||||
{ |
||||
if (IsExpired (id)) |
||||
{ |
||||
return TimeStep (0); |
||||
} |
||||
else |
||||
{ |
||||
return TimeStep (id.GetTs () - m_currentTs); |
||||
} |
||||
} |
||||
|
||||
void |
||||
NullMessageSimulatorImpl::Remove (const EventId &id) |
||||
{ |
||||
if (id.GetUid () == 2) |
||||
{ |
||||
// destroy events.
|
||||
for (DestroyEvents::iterator i = m_destroyEvents.begin (); i != m_destroyEvents.end (); i++) |
||||
{ |
||||
if (*i == id) |
||||
{ |
||||
m_destroyEvents.erase (i); |
||||
break; |
||||
} |
||||
} |
||||
return; |
||||
} |
||||
if (IsExpired (id)) |
||||
{ |
||||
return; |
||||
} |
||||
Scheduler::Event event; |
||||
event.impl = id.PeekEventImpl (); |
||||
event.key.m_ts = id.GetTs (); |
||||
event.key.m_context = id.GetContext (); |
||||
event.key.m_uid = id.GetUid (); |
||||
m_events->Remove (event); |
||||
event.impl->Cancel (); |
||||
// whenever we remove an event from the event list, we have to unref it.
|
||||
event.impl->Unref (); |
||||
|
||||
m_unscheduledEvents--; |
||||
} |
||||
|
||||
void |
||||
NullMessageSimulatorImpl::Cancel (const EventId &id) |
||||
{ |
||||
if (!IsExpired (id)) |
||||
{ |
||||
id.PeekEventImpl ()->Cancel (); |
||||
} |
||||
} |
||||
|
||||
bool |
||||
NullMessageSimulatorImpl::IsExpired (const EventId &ev) const |
||||
{ |
||||
if (ev.GetUid () == 2) |
||||
{ |
||||
if (ev.PeekEventImpl () == 0 |
||||
|| ev.PeekEventImpl ()->IsCancelled ()) |
||||
{ |
||||
return true; |
||||
} |
||||
// destroy events.
|
||||
for (DestroyEvents::const_iterator i = m_destroyEvents.begin (); i != m_destroyEvents.end (); i++) |
||||
{ |
||||
if (*i == ev) |
||||
{ |
||||
return false; |
||||
} |
||||
} |
||||
return true; |
||||
} |
||||
if (ev.PeekEventImpl () == 0 |
||||
|| ev.GetTs () < m_currentTs |
||||
|| (ev.GetTs () == m_currentTs |
||||
&& ev.GetUid () <= m_currentUid) |
||||
|| ev.PeekEventImpl ()->IsCancelled ()) |
||||
{ |
||||
return true; |
||||
} |
||||
else |
||||
{ |
||||
return false; |
||||
} |
||||
} |
||||
|
||||
Time |
||||
NullMessageSimulatorImpl::GetMaximumSimulationTime (void) const |
||||
{ |
||||
// XXX: I am fairly certain other compilers use other non-standard
|
||||
// post-fixes to indicate 64 bit constants.
|
||||
return TimeStep (0x7fffffffffffffffLL); |
||||
} |
||||
|
||||
uint32_t |
||||
NullMessageSimulatorImpl::GetContext (void) const |
||||
{ |
||||
return m_currentContext; |
||||
} |
||||
|
||||
Time NullMessageSimulatorImpl::CalculateGuaranteeTime (uint32_t nodeSysId) |
||||
{ |
||||
Ptr<RemoteChannelBundle> bundle = RemoteChannelBundleManager::Find (nodeSysId); |
||||
NS_ASSERT (bundle); |
||||
|
||||
return Min (NullMessageSimulatorImpl::GetInstance ()->Next (), GetSafeTime ()) + bundle->GetDelay (); |
||||
} |
||||
|
||||
void NullMessageSimulatorImpl::NullMessageEventHandler(RemoteChannelBundle* bundle) |
||||
{ |
||||
NS_LOG_FUNCTION (this << bundle); |
||||
|
||||
Time time = Min (Next (), GetSafeTime ()) + bundle->GetDelay (); |
||||
NullMessageMpiInterface::SendNullMessage (time, bundle); |
||||
|
||||
ScheduleNullMessageEvent (bundle); |
||||
} |
||||
|
||||
|
||||
NullMessageSimulatorImpl* |
||||
NullMessageSimulatorImpl::GetInstance (void) |
||||
{ |
||||
NS_ASSERT (g_instance != 0); |
||||
return g_instance; |
||||
} |
||||
} // namespace ns3
|
||||
|
@ -0,0 +1,212 @@
|
||||
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ |
||||
/*
|
||||
* Copyright 2013. Lawrence Livermore National Security, LLC. |
||||
* |
||||
* This program is free software; you can redistribute it and/or modify |
||||
* it under the terms of the GNU General Public License version 2 as |
||||
* published by the Free Software Foundation; |
||||
* |
||||
* This program is distributed in the hope that it will be useful, |
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
* GNU General Public License for more details. |
||||
* |
||||
* You should have received a copy of the GNU General Public License |
||||
* along with this program; if not, write to the Free Software |
||||
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA |
||||
* |
||||
* Author: Steven Smith <smith84@llnl.gov> |
||||
* |
||||
*/ |
||||
|
||||
#ifndef NULLMESSAGE_SIMULATOR_IMPL_H |
||||
#define NULLMESSAGE_SIMULATOR_IMPL_H |
||||
|
||||
#include <ns3/simulator-impl.h> |
||||
#include <ns3/scheduler.h> |
||||
#include <ns3/event-impl.h> |
||||
#include <ns3/ptr.h> |
||||
|
||||
#include <list> |
||||
#include <iostream> |
||||
#include <fstream> |
||||
|
||||
namespace ns3 { |
||||
|
||||
class NullMessageEvent; |
||||
class NullMessageMpiInterface; |
||||
class RemoteChannelBundle; |
||||
|
||||
/**
|
||||
* \ingroup mpi |
||||
* |
||||
* \brief Simulator implementation using MPI and a Null Message algorithm. |
||||
*/ |
||||
class NullMessageSimulatorImpl : public SimulatorImpl |
||||
{ |
||||
public: |
||||
static TypeId GetTypeId (void); |
||||
|
||||
NullMessageSimulatorImpl (); |
||||
|
||||
~NullMessageSimulatorImpl (); |
||||
|
||||
// virtual from SimulatorImpl
|
||||
virtual void Destroy (); |
||||
virtual bool IsFinished (void) const; |
||||
virtual void Stop (void); |
||||
virtual void Stop (Time const &time); |
||||
virtual EventId Schedule (Time const &time, EventImpl *event); |
||||
virtual void ScheduleWithContext (uint32_t context, Time const &time, EventImpl *event); |
||||
virtual EventId ScheduleNow (EventImpl *event); |
||||
virtual EventId ScheduleDestroy (EventImpl *event); |
||||
virtual void Remove (const EventId &ev); |
||||
virtual void Cancel (const EventId &ev); |
||||
virtual bool IsExpired (const EventId &ev) const; |
||||
virtual void Run (void); |
||||
virtual void RunOneEvent (void); |
||||
virtual Time Now (void) const; |
||||
virtual Time GetDelayLeft (const EventId &id) const; |
||||
virtual Time GetMaximumSimulationTime (void) const; |
||||
virtual void SetScheduler (ObjectFactory schedulerFactory); |
||||
virtual uint32_t GetSystemId (void) const; |
||||
virtual uint32_t GetContext (void) const; |
||||
|
||||
/**
|
||||
* \return singleton instance |
||||
* |
||||
* Singleton accessor. |
||||
*/ |
||||
static NullMessageSimulatorImpl * GetInstance (void); |
||||
|
||||
private: |
||||
friend class NullMessageEvent; |
||||
friend class NullMessageMpiInterface; |
||||
friend class RemoteChannelBundleManager; |
||||
|
||||
/**
|
||||
* Non blocking receive of pending messages.
|
||||
*/ |
||||
void HandleArrivingMessagesNonBlocking (void); |
||||
|
||||
/**
|
||||
* Blocking receive of arriving messages. |
||||
*/ |
||||
void HandleArrivingMessagesBlocking (void); |
||||
|
||||
virtual void DoDispose (void); |
||||
|
||||
/**
|
||||
* Calculate the look ahead allowable for this MPI task. Basically |
||||
* the minimum latency on links to neighbor MPI tasks. |
||||
*/ |
||||
void CalculateLookAhead (void); |
||||
|
||||
/**
|
||||
* Process the next event on the queue. |
||||
*/ |
||||
void ProcessOneEvent (void); |
||||
|
||||
/**
|
||||
* \return next local event time. |
||||
*/ |
||||
Time Next (void) const; |
||||
|
||||
/**
|
||||
* Calculate the SafeTime. Should be called after message receives. |
||||
*/ |
||||
void CalculateSafeTime (void); |
||||
|
||||
/**
|
||||
* Get the current SafeTime; the maximum time that events can |
||||
* be processed based on information received from neighboring |
||||
* MPI tasks. |
||||
*/ |
||||
Time GetSafeTime (void); |
||||
|
||||
/**
|
||||
* \param bundle Bundle to schedule Null Message event for |
||||
* |
||||
* Schedule Null Message event for the specified RemoteChannelBundle. |
||||
*/ |
||||
void ScheduleNullMessageEvent (Ptr<RemoteChannelBundle> bundle); |
||||
|
||||
/**
|
||||
* \param bundle Bundle to reschedule Null Message event for |
||||
* |
||||
* Reschedule Null Message event for the specified |
||||
* RemoteChannelBundel. Existing event will be canceled. |
||||
*/ |
||||
void RescheduleNullMessageEvent (Ptr<RemoteChannelBundle> bundle); |
||||
|
||||
/**
|
||||
* \param nodeSysId SystemID to reschedule null event for |
||||
* |
||||
* Reschedule Null Message event for the RemoteChannelBundel to the |
||||
* task nodeSysId. Existing event will be canceled. |
||||
*/ |
||||
void RescheduleNullMessageEvent (uint32_t nodeSysId); |
||||
|
||||
/**
|
||||
* \param systemId SystemID to compute guarentee time for |
||||
* |
||||
* \return Guarentee time |
||||
* |
||||
* Calculate the guarantee time for incoming RemoteChannelBundel |
||||
* from task nodeSysId. No message should arrive from task |
||||
* nodeSysId with a receive time less than the guarantee time. |
||||
*/ |
||||
Time CalculateGuaranteeTime (uint32_t systemId); |
||||
|
||||
/**
|
||||
* \param bundle remote channel bundle to schedule an event for. |
||||
* |
||||
* Null message event handler. Scheduled to send a null message |
||||
* for the specified bundle at regular intervals. Will canceled |
||||
* and rescheduled when packets are sent. |
||||
*/ |
||||
void NullMessageEventHandler(RemoteChannelBundle* bundle); |
||||
|
||||
typedef std::list<EventId> DestroyEvents; |
||||
|
||||
DestroyEvents m_destroyEvents; |
||||
bool m_stop; |
||||
Ptr<Scheduler> m_events; |
||||
uint32_t m_uid; |
||||
uint32_t m_currentUid; |
||||
uint64_t m_currentTs; |
||||
uint32_t m_currentContext; |
||||
// number of events that have been inserted but not yet scheduled,
|
||||
// not counting the "destroy" events; this is used for validation
|
||||
int m_unscheduledEvents; |
||||
|
||||
uint32_t m_myId; // MPI Rank
|
||||
uint32_t m_systemCount; // MPI Size
|
||||
|
||||
/*
|
||||
* The time for which it is safe for this task to execute events |
||||
* without danger of out-of-order events. |
||||
*/ |
||||
Time m_safeTime; |
||||
|
||||
/*
|
||||
* Null Message performance tuning parameter. Controls when Null |
||||
* messages are sent. When value is 1 the minimum number of Null |
||||
* messages are sent conserving bandwidth. The delay in arrival of |
||||
* lookahead information is the greatest resulting in maximum |
||||
* unnecessary blocking of the receiver. When the value is near 0 |
||||
* Null Messages are sent with high frequency, costing more |
||||
* bandwidth and Null Message processing time, but there is minimum |
||||
* unnecessary block of the receiver. |
||||
*/ |
||||
double m_schedulerTune; |
||||
|
||||
/*
|
||||
* Singleton instance. |
||||
*/ |
||||
static NullMessageSimulatorImpl* g_instance; |
||||
}; |
||||
|
||||
} // namespace ns3
|
||||
|
||||
#endif /* NULLMESSAGE_SIMULATOR_IMPL_H */ |
@ -0,0 +1,99 @@
|
||||
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ |
||||
/*
|
||||
* Copyright 2013. Lawrence Livermore National Security, LLC. |
||||
* |
||||
* This program is free software; you can redistribute it and/or modify |
||||
* it under the terms of the GNU General Public License version 2 as |
||||
* published by the Free Software Foundation; |
||||
* |
||||
* This program is distributed in the hope that it will be useful, |
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
* GNU General Public License for more details. |
||||
* |
||||
* You should have received a copy of the GNU General Public License |
||||
* along with this program; if not, write to the Free Software |
||||
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA |
||||
* |
||||
* Author: Steven Smith <smith84@llnl.gov> |
||||
* |
||||
*/ |
||||
|
||||
#ifndef NS3_PARALLEL_COMMUNICATION_INTERFACE_H |
||||
#define NS3_PARALLEL_COMMUNICATION_INTERFACE_H |
||||
|
||||
#include <stdint.h> |
||||
#include <list> |
||||
|
||||
#include <ns3/object.h> |
||||
#include <ns3/nstime.h> |
||||
#include <ns3/buffer.h> |
||||
#include <ns3/packet.h> |
||||
|
||||
#if defined(NS3_MPI) |
||||
#include "mpi.h" |
||||
#endif |
||||
|
||||
namespace ns3 { |
||||
|
||||
/**
|
||||
* \ingroup mpi |
||||
* |
||||
* \brief Pure virtual base class for the interface between ns-3 and |
||||
* the parallel communication layer being used. |
||||
* |
||||
* Each type of parallel communication layer is required to implement |
||||
* this interface. This interface is called through the |
||||
* MpiInterface. |
||||
*/ |
||||
class ParallelCommunicationInterface |
||||
{ |
||||
public: |
||||
/**
|
||||
* Destructor |
||||
*/ |
||||
virtual ~ParallelCommunicationInterface() {} |
||||
/**
|
||||
* Deletes storage used by the parallel environment. |
||||
*/ |
||||
virtual void Destroy () = 0; |
||||
/**
|
||||
* \return system identification |
||||
*/ |
||||
virtual uint32_t GetSystemId () = 0; |
||||
/**
|
||||
* \return number of parallel tasks |
||||
*/ |
||||
virtual uint32_t GetSize () = 0; |
||||
/**
|
||||
* \return true if parallel communication is enabled |
||||
*/ |
||||
virtual bool IsEnabled () = 0; |
||||
/**
|
||||
* \param pargc number of command line arguments |
||||
* \param pargv command line arguments |
||||
* |
||||
* Sets up parallel communication interface |
||||
*/ |
||||
virtual void Enable (int* pargc, char*** pargv) = 0; |
||||
/**
|
||||
* Terminates the parallel environment. |
||||
* This function must be called after Destroy () |
||||
*/ |
||||
virtual void Disable () = 0; |
||||
/**
|
||||
* \param p packet to send |
||||
* \param rxTime received time at destination node |
||||
* \param node destination node |
||||
* \param dev destination device |
||||
* |
||||
* Serialize and send a packet to the specified node and net device |
||||
*/ |
||||
virtual void SendPacket (Ptr<Packet> p, const Time &rxTime, uint32_t node, uint32_t dev) = 0; |
||||
|
||||
private: |
||||
}; |
||||
|
||||
} // namespace ns3
|
||||
|
||||
#endif /* NS3_PARALLEL_COMMUNICATION_INTERFACE_H */ |
@ -0,0 +1,112 @@
|
||||
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ |
||||
/*
|
||||
* Copyright 2013. Lawrence Livermore National Security, LLC. |
||||
* |
||||
* This program is free software; you can redistribute it and/or modify |
||||
* it under the terms of the GNU General Public License version 2 as |
||||
* published by the Free Software Foundation; |
||||
* |
||||
* This program is distributed in the hope that it will be useful, |
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
* GNU General Public License for more details. |
||||
* |
||||
* You should have received a copy of the GNU General Public License |
||||
* along with this program; if not, write to the Free Software |
||||
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA |
||||
* |
||||
* Author: Steven Smith <smith84@llnl.gov> |
||||
* |
||||
*/ |
||||
|
||||
#include "remote-channel-bundle-manager.h" |
||||
|
||||
#include "remote-channel-bundle.h" |
||||
#include "null-message-simulator-impl.h" |
||||
|
||||
#include "ns3/simulator.h" |
||||
|
||||
namespace ns3 { |
||||
|
||||
bool ns3::RemoteChannelBundleManager::g_initialized = false; |
||||
ns3::RemoteChannelBundleManager::RemoteChannelMap ns3::RemoteChannelBundleManager::g_remoteChannelBundles; |
||||
|
||||
Ptr<RemoteChannelBundle> |
||||
RemoteChannelBundleManager::Find (uint32_t systemId) |
||||
{ |
||||
ns3::RemoteChannelBundleManager::RemoteChannelMap::iterator kv = g_remoteChannelBundles.find (systemId); |
||||
|
||||
if ( kv == g_remoteChannelBundles.end ()) |
||||
{ |
||||
return 0; |
||||
} |
||||
else |
||||
{ |
||||
return kv->second; |
||||
} |
||||
} |
||||
|
||||
Ptr<RemoteChannelBundle> |
||||
RemoteChannelBundleManager::Add (uint32_t systemId) |
||||
{ |
||||
NS_ASSERT (!g_initialized); |
||||
NS_ASSERT (g_remoteChannelBundles.find (systemId) == g_remoteChannelBundles.end ()); |
||||
|
||||
Ptr<RemoteChannelBundle> remoteChannelBundle = Create<RemoteChannelBundle> (systemId); |
||||
|
||||
g_remoteChannelBundles[systemId] = remoteChannelBundle; |
||||
|
||||
return remoteChannelBundle; |
||||
} |
||||
|
||||
uint32_t
|
||||
RemoteChannelBundleManager::Size (void)
|
||||
{ |
||||
return g_remoteChannelBundles.size(); |
||||
} |
||||
|
||||
void |
||||
RemoteChannelBundleManager::InitializeNullMessageEvents (void) |
||||
{ |
||||
NS_ASSERT (!g_initialized); |
||||
|
||||
for ( RemoteChannelMap::const_iterator iter = g_remoteChannelBundles.begin (); |
||||
iter != g_remoteChannelBundles.end (); |
||||
++iter ) |
||||
{ |
||||
Ptr<RemoteChannelBundle> bundle = iter->second; |
||||
bundle->Send (bundle->GetDelay ()); |
||||
|
||||
NullMessageSimulatorImpl::GetInstance ()->ScheduleNullMessageEvent (bundle); |
||||
} |
||||
|
||||
g_initialized = true; |
||||
} |
||||
|
||||
Time |
||||
RemoteChannelBundleManager::GetSafeTime (void) |
||||
{ |
||||
NS_ASSERT (g_initialized); |
||||
|
||||
Time safeTime = Simulator::GetMaximumSimulationTime (); |
||||
|
||||
for (RemoteChannelMap::const_iterator kv = g_remoteChannelBundles.begin (); |
||||
kv != g_remoteChannelBundles.end (); |
||||
++kv) |
||||
{ |
||||
safeTime = Min (safeTime, kv->second->GetGuaranteeTime ()); |
||||
} |
||||
|
||||
return safeTime; |
||||
} |
||||
|
||||
void |
||||
RemoteChannelBundleManager::Destroy (void) |
||||
{ |
||||
NS_ASSERT (g_initialized); |
||||
|
||||
g_remoteChannelBundles.clear(); |
||||
g_initialized = false; |
||||
} |
||||
|
||||
} // namespace ns3
|
@ -0,0 +1,107 @@
|
||||
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ |
||||
/*
|
||||
* Copyright 2013. Lawrence Livermore National Security, LLC. |
||||
* |
||||
* This program is free software; you can redistribute it and/or modify |
||||
* it under the terms of the GNU General Public License version 2 as |
||||
* published by the Free Software Foundation; |
||||
* |
||||
* This program is distributed in the hope that it will be useful, |
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
* GNU General Public License for more details. |
||||
* |
||||
* You should have received a copy of the GNU General Public License |
||||
* along with this program; if not, write to the Free Software |
||||
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA |
||||
* |
||||
* Author: Steven Smith <smith84@llnl.gov> |
||||
* |
||||
*/ |
||||
|
||||
#ifndef NS3_REMOTE_CHANNEL_BUNDLE_MANAGER |
||||
#define NS3_REMOTE_CHANNEL_BUNDLE_MANAGER |
||||
|
||||
#include <ns3/nstime.h> |
||||
#include <ns3/ptr.h> |
||||
#include <map> |
||||
|
||||
namespace ns3 { |
||||
|
||||
class RemoteChannelBundle; |
||||
|
||||
/*
|
||||
* \ingroup mpi |
||||
*
|
||||
* \brief Singleton for managing the RemoteChannelBundles for each process. |
||||
* |
||||
* Manages collective tasks associated with the bundle collection. |
||||
*/ |
||||
class RemoteChannelBundleManager |
||||
{ |
||||
|
||||
public: |
||||
/**
|
||||
* \return remote channel bundle for specified SystemId. |
||||
*/ |
||||
static Ptr<RemoteChannelBundle> Find (uint32_t systemId); |
||||
|
||||
/**
|
||||
* Add RemoteChannelBundle from this task to MPI task on other side of the link. |
||||
* Can not be invoked after InitializeNullMessageEvents has been invoked. |
||||
*/ |
||||
static Ptr<RemoteChannelBundle> Add (uint32_t systemId); |
||||
|
||||
/**
|
||||
* \return number of remote channel bundles |
||||
*
|
||||
*/ |
||||
static uint32_t Size (void); |
||||
|
||||
/**
|
||||
* Setup initial Null Message events for every RemoteChannelBundle. |
||||
* All RemoteChannelBundles should be added before this method is invoked. |
||||
*/ |
||||
static void InitializeNullMessageEvents (void); |
||||
|
||||
/**
|
||||
* \return safe time across all remote channels. |
||||
*/ |
||||
static Time GetSafeTime (void); |
||||
|
||||
/**
|
||||
* Destroy the singleton. |
||||
*/ |
||||
static void Destroy (void); |
||||
|
||||
private: |
||||
|
||||
/**
|
||||
* Private ctor to prevent creation outside of singleton pattern. |
||||
*/ |
||||
RemoteChannelBundleManager () |
||||
{ |
||||
} |
||||
|
||||
~RemoteChannelBundleManager () |
||||
{ |
||||
} |
||||
|
||||
/*
|
||||
* Container for all remote channel bundles for this task. |
||||
* |
||||
* Would be more efficient to use unordered_map when C++11 is adopted for NS3. |
||||
*/ |
||||
typedef std::map<uint32_t, Ptr<RemoteChannelBundle> > RemoteChannelMap; |
||||
static RemoteChannelMap g_remoteChannelBundles; |
||||
|
||||
/*
|
||||
* Protect manager class from being initialized twice or incorrect |
||||
* ordering of method calls. |
||||
*/ |
||||
static bool g_initialized; |
||||
}; |
||||
|
||||
} // namespace ns3
|
||||
|
||||
#endif |
@ -0,0 +1,130 @@
|
||||
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ |
||||
/*
|
||||
* Copyright 2013. Lawrence Livermore National Security, LLC. |
||||
* |
||||
* This program is free software; you can redistribute it and/or modify |
||||
* it under the terms of the GNU General Public License version 2 as |
||||
* published by the Free Software Foundation; |
||||
* |
||||
* This program is distributed in the hope that it will be useful, |
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
* GNU General Public License for more details. |
||||
* |
||||
* You should have received a copy of the GNU General Public License |
||||
* along with this program; if not, write to the Free Software |
||||
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA |
||||
* |
||||
* Author: Steven Smith <smith84@llnl.gov> |
||||
* |
||||
*/ |
||||
|
||||
#include "remote-channel-bundle.h" |
||||
|
||||
#include "null-message-mpi-interface.h" |
||||
#include "null-message-simulator-impl.h" |
||||
|
||||
#include <ns3/simulator.h> |
||||
|
||||
namespace ns3 { |
||||
|
||||
#define NS_TIME_INFINITY ns3::Time (0x7fffffffffffffffLL) |
||||
|
||||
TypeId RemoteChannelBundle::GetTypeId (void) |
||||
{ |
||||
static TypeId tid = TypeId ("ns3::RemoteChannelBundle") |
||||
.SetParent<Object> () |
||||
.AddConstructor <RemoteChannelBundle> (); |
||||
return tid; |
||||
} |
||||
|
||||
RemoteChannelBundle::RemoteChannelBundle () |
||||
: m_remoteSystemId (-1), |
||||
m_guaranteeTime (0), |
||||
m_delay (NS_TIME_INFINITY) |
||||
{ |
||||
} |
||||
|
||||
RemoteChannelBundle::RemoteChannelBundle (const uint32_t remoteSystemId) |
||||
: m_remoteSystemId (remoteSystemId), |
||||
m_guaranteeTime (0), |
||||
m_delay (NS_TIME_INFINITY) |
||||
{ |
||||
} |
||||
|
||||
void |
||||
RemoteChannelBundle::AddChannel (Ptr<Channel> channel, Time delay) |
||||
{ |
||||
m_channels[channel->GetId ()] = channel; |
||||
m_delay = ns3::Min (m_delay, delay); |
||||
} |
||||
|
||||
uint32_t |
||||
RemoteChannelBundle::GetSystemId () const |
||||
{ |
||||
return m_remoteSystemId; |
||||
} |
||||
|
||||
Time |
||||
RemoteChannelBundle::GetGuaranteeTime (void) const |
||||
{ |
||||
return m_guaranteeTime; |
||||
} |
||||
|
||||
void |
||||
RemoteChannelBundle::SetGuaranteeTime (Time time) |
||||
{ |
||||
NS_ASSERT (time >= Simulator::Now ()); |
||||
|
||||
m_guaranteeTime = time; |
||||
} |
||||
|
||||
Time |
||||
RemoteChannelBundle::GetDelay (void) const |
||||
{ |
||||
return m_delay; |
||||
} |
||||
|
||||
void |
||||
RemoteChannelBundle::SetEventId (EventId id) |
||||
{ |
||||
m_nullEventId = id; |
||||
} |
||||
|
||||
EventId |
||||
RemoteChannelBundle::GetEventId (void) const |
||||
{ |
||||
return m_nullEventId; |
||||
} |
||||
|
||||
int |
||||
RemoteChannelBundle::GetSize (void) const |
||||
{ |
||||
return m_channels.size (); |
||||
} |
||||
|
||||
void
|
||||
RemoteChannelBundle::Send(Time time) |
||||
{ |
||||
NullMessageMpiInterface::SendNullMessage (time, this);
|
||||
} |
||||
|
||||
std::ostream& operator<< (std::ostream& out, ns3::RemoteChannelBundle& bundle ) |
||||
{ |
||||
out << "RemoteChannelBundle Rank = " << bundle.m_remoteSystemId |
||||
<< ", GuaranteeTime = " << bundle.m_guaranteeTime |
||||
<< ", Delay = " << bundle.m_delay << std::endl; |
||||
|
||||
for (std::map < uint32_t, Ptr < Channel > > ::const_iterator pair = bundle.m_channels.begin (); |
||||
pair != bundle.m_channels.end (); |
||||
++pair) |
||||
{ |
||||
out << "\t" << (*pair).second << std::endl; |
||||
} |
||||
|
||||
return out; |
||||
} |
||||
|
||||
|
||||
} // namespace ns3
|
||||
|
@ -0,0 +1,151 @@
|
||||
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ |
||||
/*
|
||||
* Copyright 2013. Lawrence Livermore National Security, LLC. |
||||
* |
||||
* This program is free software; you can redistribute it and/or modify |
||||
* it under the terms of the GNU General Public License version 2 as |
||||
* published by the Free Software Foundation; |
||||
* |
||||
* This program is distributed in the hope that it will be useful, |
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
* GNU General Public License for more details. |
||||
* |
||||
* You should have received a copy of the GNU General Public License |
||||
* along with this program; if not, write to the Free Software |
||||
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA |
||||
* |
||||
* Author: Steven Smith <smith84@llnl.gov> |
||||
* |
||||
*/ |
||||
|
||||
#ifndef NS3_REMOTE_CHANNEL_BUNDLE |
||||
#define NS3_REMOTE_CHANNEL_BUNDLE |
||||
|
||||
#include "null-message-simulator-impl.h" |
||||
|
||||
#include <ns3/channel.h> |
||||
#include <ns3/ptr.h> |
||||
#include <ns3/pointer.h> |
||||
|
||||
#include <map> |
||||
|
||||
namespace ns3 { |
||||
|
||||
/**
|
||||
* \ingroup mpi |
||||
*
|
||||
* \brief Collection of NS3 channels between local and remote nodes. |
||||
*
|
||||
* An instance exists for each remote system that the local system is |
||||
* in communication with. These are created and managed by the |
||||
* RemoteChannelBundleManager class. Stores time information for each |
||||
* bundle. |
||||
*/ |
||||
class RemoteChannelBundle : public Object |
||||
{ |
||||
public: |
||||
static TypeId GetTypeId (void); |
||||
|
||||
RemoteChannelBundle (); |
||||
|
||||
RemoteChannelBundle (const uint32_t remoteSystemId); |
||||
|
||||
~RemoteChannelBundle () |
||||
{ |
||||
} |
||||
|
||||
/**
|
||||
* \param channel to add to the bundle |
||||
* \param delay time for the channel (usually the latency) |
||||
*/ |
||||
void AddChannel (Ptr<Channel> channel, Time delay); |
||||
|
||||
/**
|
||||
* \return SystemID for remote side of this bundle |
||||
*/ |
||||
uint32_t GetSystemId () const; |
||||
|
||||
/**
|
||||
* \return guarantee time |
||||
*/ |
||||
Time GetGuaranteeTime (void) const; |
||||
|
||||
/**
|
||||
* \param guarantee time |
||||
* |
||||
* Set the guarantee time for the bundle. This should be called |
||||
* after a packet or Null Message received. |
||||
*/ |
||||
void SetGuaranteeTime (Time time); |
||||
|
||||
/**
|
||||
* \return the minimum delay along any channel in this bundle |
||||
*/ |
||||
Time GetDelay (void) const; |
||||
|
||||
/**
|
||||
* Set the event ID of the Null Message send event current scheduled |
||||
* for this channel. |
||||
*/ |
||||
void SetEventId (EventId id); |
||||
|
||||
/**
|
||||
* \return the event ID of the Null Message send event for this bundle |
||||
*/ |
||||
EventId GetEventId (void) const; |
||||
|
||||
/**
|
||||
* \return number of NS3 channels in this bundle |
||||
*/ |
||||
int GetSize (void) const; |
||||
|
||||
/**
|
||||
* \param time
|
||||
* |
||||
* Send Null Message to the remote task associated with this bundle. |
||||
* Message will be delivered at current simulation time + the time |
||||
* passed in. |
||||
*/ |
||||
void Send(Time time); |
||||
|
||||
/**
|
||||
* Output for debugging purposes. |
||||
*/ |
||||
friend std::ostream& operator<< (std::ostream& out, ns3::RemoteChannelBundle& bundle ); |
||||
|
||||
private: |
||||
/*
|
||||
* Remote rank. |
||||
*/ |
||||
uint32_t m_remoteSystemId; |
||||
|
||||
/*
|
||||
* NS3 Channels that are connected from nodes in this MPI task to remote_rank. |
||||
* |
||||
* Would be more efficient to use unordered_map when C++11 is adopted by NS3. |
||||
*/ |
||||
std::map < uint32_t, Ptr < Channel > > m_channels; |
||||
|
||||
/*
|
||||
* Guarentee time for the incoming Channels from MPI task remote_rank. |
||||
* No PacketMessage will ever arrive on any incoming channel in this bundle with a |
||||
* ReceiveTime less than this. Intialized to StartTime. |
||||
*/ |
||||
Time m_guaranteeTime; |
||||
|
||||
/*
|
||||
* Delay for this Channel bundle. min link delay over all incoming channels; |
||||
*/ |
||||
Time m_delay; |
||||
|
||||
/*
|
||||
* Event scheduled to send Null Message for this bundle. |
||||
*/ |
||||
EventId m_nullEventId; |
||||
|
||||
}; |
||||
|
||||
} |
||||
|
||||
#endif |
Loading…
Reference in new issue