libgrape-lite
A C++ library for parallel graph processing
grape::ParallelMessageManager Class Reference

A kind of parallel message manager.

#include <parallel_message_manager.h>

Inherits grape::MessageManagerBase.

Public Member Functions

void Init (MPI_Comm comm) override
 Inherit.
 
void Start () override
 Inherit.
 
void StartARound () override
 Inherit.
 
void FinishARound () override
 Inherit.
 
bool ToTerminate () override
 Inherit.
 
void Finalize () override
 Inherit.
 
void ForceContinue () override
 Inherit.
 
void ForceTerminate (const std::string &terminate_info) override
 Inherit.
 
const TerminateInfo & GetTerminateInfo () const override
 Inherit.
 
size_t GetMsgSize () const override
 Inherit.
 
void InitChannels (int channel_num=1, size_t block_size=default_msg_send_block_size, size_t block_cap=default_msg_send_block_capacity)
 Initialize a set of channels; each channel is a thread-local message buffer.
 
std::vector< ThreadLocalMessageBuffer< ParallelMessageManager > > & Channels ()
 
void SendRawMsgByFid (fid_t fid, InArchive &&arc)
 Send a buffer to a fragment.
 
template<typename MESSAGE_T >
void SendToFragment (fid_t dst_fid, const MESSAGE_T &msg, int channel_id=0)
 Send message to a fragment.
 
template<typename GRAPH_T , typename MESSAGE_T >
void SyncStateOnOuterVertex (const GRAPH_T &frag, const typename GRAPH_T::vertex_t &v, const MESSAGE_T &msg, int channel_id=0)
 SyncStateOnOuterVertex on a channel.
 
template<typename GRAPH_T >
void SyncStateOnOuterVertex (const GRAPH_T &frag, const typename GRAPH_T::vertex_t &v, int channel_id=0)
 
template<typename GRAPH_T , typename MESSAGE_T >
void SendMsgThroughIEdges (const GRAPH_T &frag, const typename GRAPH_T::vertex_t &v, const MESSAGE_T &msg, int channel_id=0)
 SendMsgThroughIEdges on a channel.
 
template<typename GRAPH_T , typename MESSAGE_T >
void SendMsgThroughOEdges (const GRAPH_T &frag, const typename GRAPH_T::vertex_t &v, const MESSAGE_T &msg, int channel_id=0)
 SendMsgThroughOEdges on a channel.
 
template<typename GRAPH_T , typename MESSAGE_T >
void SendMsgThroughEdges (const GRAPH_T &frag, const typename GRAPH_T::vertex_t &v, const MESSAGE_T &msg, int channel_id=0)
 SendMsgThroughEdges on a channel.
 
bool GetMessageInBuffer (MessageInBuffer &buf)
 Get a bunch of messages, stored in a MessageInBuffer.
 
template<typename GRAPH_T , typename MESSAGE_T , typename FUNC_T >
void ParallelProcess (int thread_num, const GRAPH_T &frag, const FUNC_T &func)
 Process all incoming messages of the last round in parallel with the given function.
 
template<typename MESSAGE_T , typename FUNC_T >
void ParallelProcess (int thread_num, const FUNC_T &func)
 Process all incoming messages of the last round in parallel with the given function.
 

Private Member Functions

void startSendThread ()
 
void probeAllIncomingMessages ()
 
int probeIncomingMessages ()
 
void startRecvThread ()
 
void stopRecvThread ()
 
size_t finishMsgFilling ()
 
void resetRecvQueue ()
 
void waitSend ()
 

Private Attributes

fid_t fid_
 
fid_t fnum_
 
CommSpec comm_spec_
 
MPI_Comm comm_
 
std::vector< InArchive > to_self_
 
std::vector< InArchive > to_others_
 
std::vector< ThreadLocalMessageBuffer< ParallelMessageManager > > channels_
 
int round_
 
BlockingQueue< std::pair< fid_t, InArchive > > sending_queue_
 
std::thread send_thread_
 
std::array< BlockingQueue< OutArchive >, 2 > recv_queues_
 
std::thread recv_thread_
 
bool force_continue_
 
size_t sent_size_
 
bool force_terminate_
 
TerminateInfo terminate_info_
 

Static Private Attributes

static constexpr size_t default_msg_send_block_size = 2 * 1023 * 1024
 
static constexpr size_t default_msg_send_block_capacity = 2 * 1023 * 1024
 

Detailed Description

A kind of parallel message manager.

ParallelMessageManager lets multiple threads send messages concurrently through channels. Each channel contains a thread-local message buffer.

Once a thread-local message buffer has accumulated a given amount of messages, the buffer is sent through MPI.

After a round of evaluation, there is a global barrier to determine whether the fixed point is reached.
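
The buffering described above can be pictured with a small self-contained sketch. It is not the libgrape-lite implementation: `ToySink`, `ToyChannel`, `ToyRoundStats`, and `RunToyRound` are hypothetical names, the sink merely stands in for the MPI send path, and messages are plain 4-byte strings. Each worker thread owns one channel, appends to it without locking, and hands a block to the shared sink once the buffer reaches `block_size` bytes.

```cpp
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for the MPI send path: it only collects blocks.
struct ToySink {
  std::mutex mu;
  std::vector<std::string> blocks;
  void Send(std::string&& block) {
    std::lock_guard<std::mutex> lock(mu);
    blocks.push_back(std::move(block));
  }
};

// Hypothetical channel: a message buffer used by exactly one thread.
class ToyChannel {
 public:
  ToyChannel(ToySink* sink, std::size_t block_size)
      : sink_(sink), block_size_(block_size) {}

  // Append a message; hand the buffer over once it reaches block_size bytes.
  void Append(const std::string& msg) {
    buf_ += msg;
    if (buf_.size() >= block_size_) Flush();
  }

  // Send whatever is buffered (also called at the end of a round).
  void Flush() {
    if (buf_.empty()) return;
    std::string block;
    block.swap(buf_);  // leaves buf_ empty and ready for reuse
    sink_->Send(std::move(block));
  }

 private:
  ToySink* sink_;
  std::size_t block_size_;
  std::string buf_;
};

struct ToyRoundStats {
  std::size_t blocks;  // number of blocks handed to the sink
  std::size_t bytes;   // total payload size over all blocks
};

// Runs one round: thread t appends per_thread messages to channel t.
ToyRoundStats RunToyRound(int threads, int per_thread, std::size_t block_size) {
  ToySink sink;
  std::vector<ToyChannel> channels;
  for (int t = 0; t < threads; ++t) channels.emplace_back(&sink, block_size);

  std::vector<std::thread> workers;
  for (int t = 0; t < threads; ++t) {
    workers.emplace_back([&channels, t, per_thread] {
      for (int i = 0; i < per_thread; ++i) channels[t].Append("abcd");
      channels[t].Flush();
    });
  }
  for (std::thread& w : workers) w.join();

  ToyRoundStats stats{sink.blocks.size(), 0};
  for (const std::string& b : sink.blocks) stats.bytes += b.size();
  return stats;
}
```

With 4 threads, 10 messages per thread, and a 16-byte block size, each channel flushes twice when full and once at the end of the round, so `RunToyRound(4, 10, 16)` reports 12 blocks carrying 160 bytes in total.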

Member Function Documentation

◆ GetMessageInBuffer()

bool grape::ParallelMessageManager::GetMessageInBuffer ( MessageInBuffer &  buf )
inline

Get a bunch of messages, stored in a MessageInBuffer.

Parameters
buf  Message buffer which holds a grape::OutArchive.

◆ InitChannels()

void grape::ParallelMessageManager::InitChannels ( int  channel_num = 1,
size_t  block_size = default_msg_send_block_size,
size_t  block_cap = default_msg_send_block_capacity 
)
inline

Initialize a set of channels; each channel is a thread-local message buffer.

Parameters
channel_num  Number of channels.
block_size  Size of each channel.
block_cap  Capacity of each channel.

◆ ParallelProcess() [1/2]

template<typename MESSAGE_T , typename FUNC_T >
void grape::ParallelMessageManager::ParallelProcess ( int  thread_num,
const FUNC_T &  func 
)
inline

Process all incoming messages of the last round in parallel with the given function.

Template Parameters
MESSAGE_T  Message type.
FUNC_T  Function type.
Parameters
thread_num  Number of threads.
func  Function applied to each incoming message.

◆ ParallelProcess() [2/2]

template<typename GRAPH_T , typename MESSAGE_T , typename FUNC_T >
void grape::ParallelMessageManager::ParallelProcess ( int  thread_num,
const GRAPH_T &  frag,
const FUNC_T &  func 
)
inline

Process all incoming messages of the last round in parallel with the given function.

Template Parameters
GRAPH_T  Graph type.
MESSAGE_T  Message type.
FUNC_T  Function type.
Parameters
thread_num  Number of threads.
frag  Fragment that receives the messages.
func  Function applied to each incoming message.
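
As an illustration of the processing pattern, the following sketch distributes batches of received messages over `thread_num` worker threads and calls `func` on every message. It is a model built on assumptions, not the library's code: `ToyParallelProcess` and `ToySumDemo` are hypothetical names, the batches stand in for received buffers, and the `func(tid, msg)` callback shape is assumed for this example only.

```cpp
#include <atomic>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical helper: workers claim whole batches through an atomic
// counter, then apply func to each message in the claimed batch.
template <typename MESSAGE_T, typename FUNC_T>
void ToyParallelProcess(int thread_num,
                        const std::vector<std::vector<MESSAGE_T>>& batches,
                        const FUNC_T& func) {
  std::atomic<std::size_t> next{0};
  std::vector<std::thread> workers;
  for (int tid = 0; tid < thread_num; ++tid) {
    workers.emplace_back([&batches, &func, &next, tid] {
      while (true) {
        std::size_t i = next.fetch_add(1);
        if (i >= batches.size()) break;  // nothing left to claim
        for (const MESSAGE_T& msg : batches[i]) func(tid, msg);
      }
    });
  }
  for (std::thread& w : workers) w.join();
}

// Sums a fixed set of messages, keeping one partial sum per thread so the
// callback never writes to a slot owned by another thread.
long ToySumDemo(int thread_num) {
  const std::vector<std::vector<int>> batches = {{1, 2, 3}, {4, 5}, {6}};
  std::vector<long> partial(static_cast<std::size_t>(thread_num), 0L);
  ToyParallelProcess<int>(
      thread_num, batches,
      [&partial](int tid, const int& msg) { partial[tid] += msg; });
  long total = 0;
  for (long p : partial) total += p;
  return total;
}
```

Passing the thread index to the callback lets it keep per-thread state, as the partial sums do here, so no lock is needed inside `func`.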

◆ SendMsgThroughEdges()

template<typename GRAPH_T , typename MESSAGE_T >
void grape::ParallelMessageManager::SendMsgThroughEdges ( const GRAPH_T &  frag,
const typename GRAPH_T::vertex_t &  v,
const MESSAGE_T &  msg,
int  channel_id = 0 
)
inline

SendMsgThroughEdges on a channel.

Template Parameters
GRAPH_T  Graph type.
MESSAGE_T  Message type.
Parameters
frag  Source fragment.
v  Source vertex.
msg  Message to send.
channel_id  Index of the channel to use.

◆ SendMsgThroughIEdges()

template<typename GRAPH_T , typename MESSAGE_T >
void grape::ParallelMessageManager::SendMsgThroughIEdges ( const GRAPH_T &  frag,
const typename GRAPH_T::vertex_t &  v,
const MESSAGE_T &  msg,
int  channel_id = 0 
)
inline

SendMsgThroughIEdges on a channel.

Template Parameters
GRAPH_T  Graph type.
MESSAGE_T  Message type.
Parameters
frag  Source fragment.
v  Source vertex.
msg  Message to send.
channel_id  Index of the channel to use.

◆ SendMsgThroughOEdges()

template<typename GRAPH_T , typename MESSAGE_T >
void grape::ParallelMessageManager::SendMsgThroughOEdges ( const GRAPH_T &  frag,
const typename GRAPH_T::vertex_t &  v,
const MESSAGE_T &  msg,
int  channel_id = 0 
)
inline

SendMsgThroughOEdges on a channel.

Template Parameters
GRAPH_T  Graph type.
MESSAGE_T  Message type.
Parameters
frag  Source fragment.
v  Source vertex.
msg  Message to send.
channel_id  Index of the channel to use.
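
One way to picture sending a message through the outgoing edges of a vertex is the sketch below. It assumes, purely for illustration, that the message is queued once per remote fragment that owns at least one out-neighbour of `v`, tagged with the source vertex id. `ToyFragment`, `ToyOutboxes`, `ToySendThroughOEdges`, and `ToyOEdgesDemo` are hypothetical names and do not reflect the library's data structures.

```cpp
#include <map>
#include <set>
#include <utility>
#include <vector>

using ToyFid = unsigned;  // fragment id
using ToyVid = unsigned;  // global vertex id

// Hypothetical, heavily simplified fragment.
struct ToyFragment {
  ToyFid fid;
  std::map<ToyVid, ToyFid> owner;                   // vertex -> owning fragment
  std::map<ToyVid, std::vector<ToyVid>> out_edges;  // out-neighbours per vertex
};

// One outbox per destination fragment holding (source vertex, message) pairs.
using ToyOutboxes = std::map<ToyFid, std::vector<std::pair<ToyVid, int>>>;

// Queue msg once for every remote fragment reachable over an out-edge of v.
void ToySendThroughOEdges(const ToyFragment& frag, ToyVid v, int msg,
                          ToyOutboxes& out) {
  auto it = frag.out_edges.find(v);
  if (it == frag.out_edges.end()) return;
  std::set<ToyFid> dsts;  // deduplicated destination fragments
  for (ToyVid u : it->second) {
    ToyFid f = frag.owner.at(u);
    if (f != frag.fid) dsts.insert(f);  // local neighbours need no message
  }
  for (ToyFid f : dsts) out[f].emplace_back(v, msg);
}

// Fragment 0 owns vertices 0 and 1; vertex 0 points at 1, 2, 3 and 4.
ToyOutboxes ToyOEdgesDemo() {
  ToyFragment frag;
  frag.fid = 0;
  frag.owner = {{0, 0}, {1, 0}, {2, 1}, {3, 1}, {4, 2}};
  frag.out_edges[0] = {1, 2, 3, 4};
  ToyOutboxes out;
  ToySendThroughOEdges(frag, 0, 42, out);
  return out;
}
```

In the demo, vertices 2 and 3 both live on fragment 1, so that fragment receives a single copy of the message, fragment 2 receives one copy, and nothing is queued for the local fragment 0.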

◆ SendRawMsgByFid()

void grape::ParallelMessageManager::SendRawMsgByFid ( fid_t  fid,
InArchive &&  arc 
)
inline

Send a buffer to a fragment.

Parameters
fid  Destination fragment id.
arc  Message buffer.

◆ SendToFragment()

template<typename MESSAGE_T >
void grape::ParallelMessageManager::SendToFragment ( fid_t  dst_fid,
const MESSAGE_T &  msg,
int  channel_id = 0 
)
inline

Send message to a fragment.

Template Parameters
MESSAGE_T  Message type.
Parameters
dst_fid  Destination fragment id.
msg  Message to send.
channel_id  Index of the channel to use.
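
The role of `channel_id` can be sketched as follows, under the assumption that every channel keeps one byte buffer per destination fragment and that messages are trivially copyable. `ToyChannelBuffers`, `ToySendToFragment`, `ToyReadAt`, and `ToySendDemo` are hypothetical names; the real serialization and buffer layout are not shown on this page.

```cpp
#include <cstddef>
#include <cstring>
#include <type_traits>
#include <vector>

// Hypothetical per-channel state: one byte buffer per destination fragment.
struct ToyChannelBuffers {
  std::vector<std::vector<char>> to_send;
};

// Append the raw bytes of msg to the buffer selected by channel and fragment.
template <typename MESSAGE_T>
void ToySendToFragment(std::vector<ToyChannelBuffers>& channels,
                       unsigned dst_fid, const MESSAGE_T& msg,
                       int channel_id = 0) {
  static_assert(std::is_trivially_copyable<MESSAGE_T>::value,
                "this sketch copies raw bytes only");
  std::vector<char>& buf = channels[channel_id].to_send[dst_fid];
  const char* p = reinterpret_cast<const char*>(&msg);
  buf.insert(buf.end(), p, p + sizeof(MESSAGE_T));
}

// Read back the index-th message of type MESSAGE_T from a buffer.
template <typename MESSAGE_T>
MESSAGE_T ToyReadAt(const std::vector<char>& buf, std::size_t index) {
  MESSAGE_T out;
  std::memcpy(&out, buf.data() + index * sizeof(MESSAGE_T), sizeof(MESSAGE_T));
  return out;
}

// Two channels, three fragments: send two doubles to fragment 2 on channel 1.
double ToySendDemo() {
  std::vector<ToyChannelBuffers> channels(2);
  for (ToyChannelBuffers& c : channels) c.to_send.resize(3);
  ToySendToFragment(channels, 2, 1.5, 1);
  ToySendToFragment(channels, 2, 2.5, 1);
  return ToyReadAt<double>(channels[1].to_send[2], 1);
}
```

Because each thread writes only to the channel it was given, two threads using different `channel_id` values never touch the same buffer in this model.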

◆ SyncStateOnOuterVertex()

template<typename GRAPH_T , typename MESSAGE_T >
void grape::ParallelMessageManager::SyncStateOnOuterVertex ( const GRAPH_T &  frag,
const typename GRAPH_T::vertex_t &  v,
const MESSAGE_T &  msg,
int  channel_id = 0 
)
inline

SyncStateOnOuterVertex on a channel.

Template Parameters
GRAPH_T  Graph type.
MESSAGE_T  Message type.
Parameters
frag  Source fragment.
v  Source vertex.
msg  Message to send.
channel_id  Index of the channel to use.
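
The idea of synchronizing state on an outer vertex can be sketched with a toy model. It assumes that a fragment holding a local copy of a vertex owned elsewhere sends its value to the owning fragment, which merges it with its own value (here by taking the minimum, as a shortest-path style algorithm might). `ToyFrag`, `ToyOutbox`, `ToySyncOuter`, `ToyApply`, and `ToySyncDemo` are hypothetical names, and the merge rule is an assumption of this example.

```cpp
#include <cassert>
#include <map>
#include <utility>
#include <vector>

// Hypothetical fragment: knows who owns each vertex and keeps local state.
struct ToyFrag {
  unsigned fid;
  std::map<unsigned, unsigned> owner;  // global vertex id -> owner fragment
  std::map<unsigned, int> dist;        // locally known state per vertex
};

// Outbox per destination fragment holding (vertex id, state) pairs.
using ToyOutbox = std::map<unsigned, std::vector<std::pair<unsigned, int>>>;

// Queue the locally computed state of outer vertex v for its owner.
void ToySyncOuter(const ToyFrag& frag, unsigned v, int state, ToyOutbox& out) {
  unsigned dst = frag.owner.at(v);
  assert(dst != frag.fid);  // v must be owned by another fragment
  out[dst].emplace_back(v, state);
}

// On the owner, keep the smaller of the stored and the received state.
void ToyApply(ToyFrag& frag,
              const std::vector<std::pair<unsigned, int>>& inbox) {
  for (const std::pair<unsigned, int>& m : inbox) {
    auto it = frag.dist.find(m.first);
    if (it == frag.dist.end() || m.second < it->second) {
      frag.dist[m.first] = m.second;
    }
  }
}

// Fragment 0 found distance 7 for vertex 1, which fragment 1 owns at 9.
int ToySyncDemo() {
  ToyFrag f0;
  f0.fid = 0;
  f0.owner = {{0, 0}, {1, 1}};
  f0.dist = {{0, 0}, {1, 7}};
  ToyFrag f1;
  f1.fid = 1;
  f1.owner = {{0, 0}, {1, 1}};
  f1.dist = {{1, 9}};

  ToyOutbox out;
  ToySyncOuter(f0, 1, f0.dist.at(1), out);
  ToyApply(f1, out[1]);
  return f1.dist.at(1);
}
```

After the exchange, the owner holds the improved value 7 for vertex 1, which is what `ToySyncDemo()` returns.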