Browse Source

上传文件至 ''

ytm 5 years ago
parent
commit
b16bcf8328
2 changed files with 341 additions and 0 deletions
  1. 213 0
      v_msg.cpp
  2. 128 0
      v_msg.h

+ 213 - 0
v_msg.cpp

@@ -0,0 +1,213 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include "v_msg.h"
+
+TMsg * TMsg::pInstance = NULL; // 初始化
+
+TMsg::TMsg()
+{
+
+}
+
+TInt sem_post(TSem *pSem)
+{
+	if (NULL == pSem)
+	{
+		return VOS_FALSE;
+	}
+	//pSem->post();
+	ReleaseSemaphore(pSem, 1, NULL);
+	return VOS_TRUE;
+}
+
+TInt sem_wait(TSem *pSem, TULong ulMilliseconds)
+{
+	if (NULL == pSem)
+	{
+		return VOS_FALSE;
+	}
+	//pSem->wait();
+	if (0 != WaitForSingleObject(pSem, ulMilliseconds))   //信号量值-1
+	{
+		return VOS_FALSE;
+	}
+	return VOS_TRUE;
+}
+
+void TMsgInfo::init(int iSize)
+{
+	m_pMsgHead = new  TMsgNode[iSize];
+	m_iMaxSize = iSize;
+
+	m_pSem = CreateSemaphore(NULL          //信号量的安全特性
+		, 0            //设置信号量的初始计数。可设置零到最大值之间的一个值
+		, m_iMaxSize + 1     //设置信号量的最大计数
+		, NULL         //指定信号量对象的名称
+		); // 初始化 信号量
+	if (NULL == m_pSem)
+	{
+		clear();
+		return;
+	}
+
+	m_pSpinMutex = new TSpinMutex();
+	if (NULL == m_pSpinMutex)
+	{
+		clear();
+		return;
+	}
+}
+void TMsgInfo::clear()
+{
+	if (NULL != m_pSpinMutex)
+	{
+		delete m_pSpinMutex;
+	}
+	CloseHandle(m_pSem);
+	if (NULL != m_pMsgHead)
+	{
+		delete m_pMsgHead;
+	}
+}
+
+TInt  TMsgInfo::Push_Msg(void *pMsgContent)
+{
+	{
+		TMutex mutex(m_pSpinMutex);
+		m_pMsgHead[m_iTailPos].pMsg = pMsgContent;
+		m_iTailPos = (++m_iTailPos) % m_iMaxSize;
+		++m_iMsgSize;
+	}
+
+	if (VOS_TRUE != sem_post(m_pSem))
+	{
+		return VOS_FALSE;
+	}
+	return VOS_TRUE;
+}
+
+void *TMsgInfo::Pop_Msg(TULong ulMilliseconds)
+{
+	void *pMsg = NULL;
+
+	if (VOS_TRUE != sem_wait(m_pSem, ulMilliseconds))
+	{
+		return NULL;
+	}
+
+	{
+		TMutex lock(m_pSpinMutex);
+		--m_iMsgSize;
+		pMsg = m_pMsgHead[m_iHeadPos].pMsg;
+		m_iHeadPos = (++m_iHeadPos) % m_iMaxSize;
+	}
+
+	return pMsg;
+}
+
+TMsg *TMsg::GetInstance()
+{
+	if (NULL == pInstance)
+	{
+		pInstance = new TMsg();
+
+		pInstance->InitMsgInfo();
+	}
+	return pInstance;
+}
+TInt TMsg::InitMsgInfo()
+{
+	const int iMsgSize = MAX_MSG_QUE_SIZE;
+	m_pMsgQue = new TMsgInfo(iMsgSize);
+
+	m_pFreeMsgHead = (TMsgBody *)malloc(sizeof(TMsgHead));
+	m_pFreeHead    = (TMsgHead *)m_pFreeMsgHead;
+
+	m_pFreeHead->pPrev = m_pFreeHead->pNext = m_pFreeMsgHead;
+	m_pFreeHead->iReserve = m_pFreeHead->iFreeSize = 0; 
+
+	TMsgBody *pstMsgNode = new TMsgBody[iMsgSize];
+	m_pSpinMutex = new TSpinMutex();
+
+
+	for (int i = 0; i < iMsgSize; ++i)
+	{
+		InsertFree(pstMsgNode + i);
+	}
+
+	return VOS_TRUE;
+}
+
+inline void TMsg::InsertFree(TMsgBody *pMsgNode)
+{
+	{
+		TMutex mutex(m_pSpinMutex);
+		pMsgNode->pNext = m_pFreeMsgHead;
+		pMsgNode->pPrev = m_pFreeMsgHead->pPrev;
+		m_pFreeMsgHead->pPrev->pNext = pMsgNode;
+		m_pFreeMsgHead->pPrev        = pMsgNode;
+		++(m_pFreeHead->iFreeSize);
+	}
+}
+
+// 获取到的默认内存大小为4080
+void *TMsg::ObtainMsg()
+{
+	return PopFreeMsg();
+}
+
+void  TMsg::FreeMsg(void *pvMsg)
+{
+	if (NULL == pvMsg)
+	{
+		return;
+	}
+	TMsgBody *pMsgNode = (TMsgBody *)((char *)pvMsg - sizeof(void *) * 2);
+	InsertFree(pMsgNode);
+
+	pvMsg = NULL;
+	return;
+}
+
+void *TMsg::PopFreeMsg()
+{
+	TMsgBody *pMsgNode = NULL;
+
+	if (IsEmpty()) 
+	{
+		return NULL;
+	}
+
+	{
+		TMutex mutex(m_pSpinMutex);
+
+		if (IsEmpty())  { return NULL; }
+
+		pMsgNode = m_pFreeMsgHead->pNext;
+
+		pMsgNode->pNext->pPrev = pMsgNode->pPrev;
+		m_pFreeMsgHead->pNext = pMsgNode->pNext;
+
+		--(m_pFreeHead->iFreeSize);
+	}
+
+	return pMsgNode->acBody;
+}
+
+inline TBool TMsg::IsEmpty()
+{
+	return  ((m_pFreeMsgHead->pNext == m_pFreeMsgHead) ? VOS_TRUE : VOS_FALSE);
+}
+
+TInt  TMsg::SendMsg(void *pvMsg, TInt iMsgLen)
+{
+	TMsgBody *pMsgNode = (TMsgBody *)((char *)pvMsg - sizeof(void *) * 2);
+	
+	return m_pMsgQue->Push_Msg(pMsgNode);
+}
+
+void *TMsg::RecvMsg(TULong ulMilliseconds)
+{
+	void *pvMsg = m_pMsgQue->Pop_Msg(ulMilliseconds);
+	return ((pvMsg == NULL) ? pvMsg : (char *)(pvMsg)+sizeof(void *) * 2);
+}

+ 128 - 0
v_msg.h

@@ -0,0 +1,128 @@
+#ifndef _MSG_H_
+#define _MSG_H_
+#include <fcntl.h>           /* For O_* constants */
+#include <sys/stat.h>        /* For mode constants */
+
+#include <map>
+#include <vector>
+#include <atomic>
+#include <winsock2.h>
+#include <condition_variable>
+using namespace std;
+
+#define MAX_MSG_QUE_SIZE 1000
+#define MAX_MSG_BUF_LEN  4080
+
+typedef int           TInt;
+typedef unsigned int  TUInt;
+typedef long          TLong;
+typedef unsigned long TULong;
+
+class TSpinMutex {
+	atomic_flag *pLockWrite;// = ATOMIC_FLAG_INIT;
+public:
+	TSpinMutex() { pLockWrite = new atomic_flag{ ATOMIC_FLAG_INIT }; };
+	TSpinMutex(const TSpinMutex&) = delete;
+	TSpinMutex& operator= (const TSpinMutex&) = delete;
+	void lock()   { while ((pLockWrite->test_and_set())); } // 获取自旋锁
+	void unlock() { pLockWrite->clear(); }                  // 释放自旋锁
+};
+
+
+//typedef std::mutex  TMutex_t;
+typedef TSpinMutex  TMutex_t;
+
+class TMutex
+{
+public:
+	TMutex(TMutex_t *pMutex)
+	{
+		m_pMutex = pMutex;
+		pMutex->lock();
+	}
+
+	~TMutex()
+	{
+		m_pMutex->unlock();
+	}
+
+private:
+	TMutex_t *m_pMutex;
+};
+
+
+struct TMsgNode
+{
+	void     *pMsg;
+
+	TMsgNode():pMsg(NULL){}
+	~TMsgNode(){pMsg = NULL;}
+};
+typedef void  TSem;
+
+//typedef std::condition_variable TSem;
+
+class TMsgInfo
+{
+public:
+	TMsgNode *m_pMsgHead;
+	TSem     *m_pSem;
+	TSpinMutex *m_pSpinMutex;
+	TInt      m_iMsgSize;
+	TUInt     m_iTailPos;
+	TUInt     m_iHeadPos;
+	TUInt     m_iMaxSize;
+
+	TMsgInfo() :m_iMsgSize(0), m_iTailPos(0), m_iHeadPos(0){ init(100); }
+	TMsgInfo(TUInt iSize) :m_iMsgSize(0), m_iTailPos(0), m_iHeadPos(0) { init(iSize); }
+
+	~TMsgInfo() { clear(); }
+
+	TInt  Push_Msg(void *pMsgContent);
+	void *Pop_Msg(TULong ulMilliseconds);
+	TBool IsEmpty(){ return (m_iTailPos == m_iHeadPos ? VOS_TRUE : VOS_FALSE); }
+private:
+	void init(int iSize);
+	void clear();
+};
+
+struct TMsgBody
+{
+	TMsgBody *pPrev;
+	TMsgBody *pNext;
+	char acBody[MAX_MSG_BUF_LEN];
+};
+struct TMsgHead
+{
+	TMsgBody *pPrev;
+	TMsgBody *pNext;
+	TInt      iFreeSize;
+	TInt      iReserve;
+};
+class TMsg
+{
+public:
+	static TMsg  *GetInstance();
+	TInt  SendMsg(void *pMsgContent, TInt iMsgLen);  // 发送消息到消息队列
+	void *RecvMsg(TULong ulMilliseconds);                                 // 从消息队列中获取消息,接收线程调用
+	void *ObtainMsg();          // 获取消息buffer
+	void  FreeMsg(void *pvMsg); // 释放获取的消息buffer
+private:
+	TInt         InitMsgInfo();
+	inline void  InsertFree(TMsgBody *pMsgNode);
+	void        *PopFreeMsg();
+	inline TBool IsEmpty();
+
+private:
+	TMsg();
+	static TMsg *pInstance;
+
+	TMsgInfo *m_pMsgQue;
+
+	TMsgBody   *m_pFreeMsgHead;
+	TMsgHead   *m_pFreeHead;    // 与 m_pFreeMsgHead 地址相同
+	TSpinMutex *m_pSpinMutex;   // 保护 m_pFreeMsgHead
+};
+
+#endif
+