src/stream/ZStreamMUX.h

00001 /*  @(#) $Id: ZStreamMUX.h,v 1.10 2006/07/23 21:58:20 agreen Exp $ */
00002 
00003 /* ------------------------------------------------------------
00004 Copyright (c) 2002 Andrew Green and Learning in Motion, Inc.
00005 http://www.zoolib.org
00006 
00007 Permission is hereby granted, free of charge, to any person obtaining a copy
00008 of this software and associated documentation files (the "Software"), to deal
00009 in the Software without restriction, including without limitation the rights
00010 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
00011 copies of the Software, and to permit persons to whom the Software is
00012 furnished to do so, subject to the following conditions:
00013 
00014 The above copyright notice and this permission notice shall be included in
00015 all copies or substantial portions of the Software.
00016 
00017 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
00018 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
00019 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
00020 COPYRIGHT HOLDER(S) BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
00021 AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
00022 CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
00023 ------------------------------------------------------------ */
00024 
00025 #ifndef __ZStreamMUX__
00026 #define __ZStreamMUX__ 1
00027 
00028 #include "zconfig.h"
00029 #include "ZStream.h"
00030 #include "ZThread.h"
00031 
00032 #include <deque>
00033 #include <string>
00034 
00035 // =================================================================================================
00036 #pragma mark -
00037 #pragma mark * ZStreamMUX
00038 
00039 class ZStreamMUX
00040         {
00041 public:
00042         typedef uint16 SessionID;
00043         static const SessionID kInvalidSessionID = 0;
00044 
00045         enum Error
00046                 {
00048                 errorNone,
00049 
00051                 errorAbortLocal,
00052 
00054                 errorAbortRemote,
00055 
00057                 errorSendClosed,
00058 
00060                 errorReceiveClosed,
00061 
00063                 errorInvalidSession,
00064 
00066                 errorAlreadyListening,
00067 
00069                 errorNotListening,
00070 
00072                 errorBadListenName,
00073 
00075                 errorConnectionDied,
00076 
00078                 errorDisposed,
00079 
00081                 waitPending
00082                 };
00083 
00084         class StreamRW;
00085 
00086         ZStreamMUX(const ZStreamR& iStreamR, const ZStreamW& iStreamW);
00087         ~ZStreamMUX();
00088 
00089         void Startup();
00090         void Shutdown();
00091 
00092         Error Listen(const std::string& iListenName);
00093         Error StopListen(const std::string& iListenName);
00094         Error Accept(const std::string& iListenName, SessionID& oSessionID);
00095 
00096         Error Open(const std::string& iListenName, const void* iData, size_t iCount,
00097                 size_t* oCountSent, SessionID& oSessionID);
00098         Error Close(SessionID iSessionID);
00099         Error Abort(SessionID iSessionID);
00100 
00101         Error Send(SessionID iSessionID, const void* iSource, size_t iCount, size_t* oCountSent);
00102         Error Receive(SessionID iSessionID, void* iDest, size_t iCount, size_t* oCountReceived);
00103         Error CountReceiveable(SessionID iSessionID, size_t& oCountReceivable);
00104 
00105         void SetMaximumFragmentSize(size_t iSize);
00106 
00107 private:
00108         typedef uint16 PrivID;
00109         struct SessionCB;
00110 
00111         void RunSend();
00112         static void sRunSend(ZStreamMUX* iStreamMUX);
00113 
00114         bool HandleSend_Open(ZMutexLocker& iLocker);
00115         bool HandleSend_RST(ZMutexLocker& iLocker);
00116         bool HandleSend_SYNACK(ZMutexLocker& iLocker);
00117         bool HandleSend_Credit(ZMutexLocker& iLocker);
00118         bool HandleSend_Data(ZMutexLocker& iLocker);
00119 
00120         void RunReceive();
00121         static void sRunReceive(ZStreamMUX* iStreamMUX);
00122 
00123         void HandleReceive_Payload(ZMutexLocker& iLocker, SessionCB* iSession, size_t iPayloadLength);
00124         void HandleReceive_Credit(bool iIsNear, PrivID iPrivID, size_t iCredit);
00125         void HandleReceive_SYN(PrivID iPrivID, uint8 iListenerID, size_t iPayloadLength);
00126         void HandleReceive_SYNACK(PrivID iPrivID);
00127         void HandleReceive_FIN(bool iIsNear, PrivID iPrivID);
00128         void HandleReceive_RST(bool iIsNear, PrivID iPrivID);
00129         void HandleReceive_DefineListener(uint8 iListenerID, size_t iListenNameLength);
00130 
00131         void WriteFragment_SYNACK(PrivID iPrivID);
00132         void WriteFragment_FIN(bool iIsNear, PrivID iPrivID);
00133         void WriteFragment_RST(bool iIsNear, PrivID iPrivID);
00134         void WriteFragment_Credit(bool iIsNear, PrivID iPrivID, size_t iSentCredit);
00135         void WriteFragment_Payload(bool iIsNear, PrivID iPrivID, const void* iSource, size_t iLength);
00136         void WriteFragment_DefineListener(uint8 iListenerID, const std::string& iListenName);
00137 
00138         void CopyDataIn(ZMutexLocker& iLocker, SessionCB* iSession,
00139                 const void* iSource, size_t iCount, size_t& oCount);
00140         void CopyDataOut(SessionCB* iSession, void* iDest, size_t iCount, size_t& oCount);
00141 
00142         void AbortSession(SessionCB* iSession);
00143 
00144         SessionCB* UseSession(bool iIsNear, PrivID iPrivID);
00145         SessionCB* UseSession(SessionCB* iSession);
00146         void ReleaseSession(SessionCB* iSession);
00147 
00148         static SessionID sMakeSessionID(bool iIsNear, PrivID iPrivID);
00149         static void sMakeSessionID(PrivID iPrivID, bool iIsNear);
00150         static void sMakePrivID(SessionID iSessionID, bool& oIsNear, PrivID& oPrivID);
00151         static const PrivID kMaxPrivID = 0x1FF;
00152 
00153         const ZStreamR& fStreamR;
00154         const ZStreamW& fStreamW;
00155         size_t fDefaultCredit;
00156         size_t fMaxFragmentSize;
00157 
00158         struct SessionCB;
00159 
00160         struct Waiter_Send
00161                 {
00162                 const uint8* fSource;
00163                 size_t fCount;
00164                 Error fResult;
00165                 Waiter_Send* fNext;
00166                 };
00167 
00168         struct Waiter_Receive
00169                 {
00170                 uint8* fDest;
00171                 size_t fCount;
00172                 Error fResult;
00173                 Waiter_Receive* fNext;
00174                 };
00175 
00176         struct Waiter_Open
00177                 {
00178                 std::string fListenName;
00179                 SessionCB* fSession;
00180                 const uint8* fSource;
00181                 size_t fCount;
00182                 Error fResult;
00183                 Waiter_Open* fNext;
00184                 };
00185 
00186         struct Waiter_RST
00187                 {
00188                 SessionCB* fSession;
00189                 bool fDoneIt;
00190                 Waiter_RST* fNext;
00191                 };
00192 
00193         struct Waiter_Accept
00194                 {
00195                 SessionCB* fSession;
00196                 Error fResult;
00197                 Waiter_Accept* fNext;
00198                 };
00199 
00200         struct SYNReceived
00201                 {
00202                 std::string fListenName;
00203                 PrivID fID;
00204                 uint8* fBuffer;
00205                 size_t fBufferSize;
00206                 SYNReceived* fNext;
00207                 };
00208 
00209         struct ActiveListen
00210                 {
00211                 std::string fListenName;
00212                 std::deque<SessionCB*> fPendingSessions;
00213                 Waiter_Accept* fWaiters_Head;
00214                 Waiter_Accept* fWaiters_Tail;
00215                 ActiveListen* fNext;
00216                 };
00217 
00220         struct SessionCB
00221                 {
00222                 SessionCB(bool iIsNear, PrivID iPrivID, size_t iCreditSend, size_t iReceiveBufferSize);
00223                 ~SessionCB();
00224 
00225                 bool fIsNear;
00226                 PrivID fID;
00227                 size_t fUseCount;
00228 
00229                 SessionCB* fPrev;
00230                 SessionCB* fNext;
00231 
00232                 bool fAborted;
00233 
00234                 size_t fCredit_Send;
00235                 bool fOpen_Send;
00236                 bool fSentFIN;
00237                 Waiter_Send* fWaiters_Send_Head;
00238                 Waiter_Send* fWaiters_Send_Tail;
00239                 ZCondition fCondition_Send;
00240 
00241                 size_t fCredit_Receive;
00242                 bool fOpen_Receive;
00243                 Waiter_Receive* fWaiters_Receive_Head;
00244                 Waiter_Receive* fWaiters_Receive_Tail;
00245                 ZCondition fCondition_Receive;
00246 
00247                 uint8* fReceiveBuffer;
00248                 size_t fReceiveBufferSize;
00249                 size_t fReceiveBufferFeedIn;
00250                 size_t fReceiveBufferFeedOut;
00251 
00252                 size_t fTotalSent;
00253                 size_t fTotalReceived;
00254                 };
00255 
00256         struct ListenName
00257                 {
00258                 std::string fName;
00259                 uint8 fID;
00260                 ListenName* fNext;
00261                 };
00262 
00263         ZCondition fCondition_Send;
00264         ZCondition fCondition_Open;
00265         ZCondition fCondition_Accept;
00266 
00267         ZMutex fMutex;
00268 
00269         SessionCB* fSessions_Head;
00270         SessionCB* fSessions_Tail;
00271         SessionCB* fSessionNextSend;
00272 
00273         SYNReceived* fPendingSYN_Head;
00274         SYNReceived* fPendingSYN_Tail;
00275 
00276         Waiter_Open* fWaiters_Open_Head;
00277         Waiter_Open* fWaiters_Open_Tail;
00278         Waiter_Open* fWaiters_OpenSYNSent;
00279 
00280         Waiter_RST* fWaiters_RST_Head;
00281         Waiter_RST* fWaiters_RST_Tail;
00282 
00283         ActiveListen* fActiveListens;
00284 
00285         ActiveListen* fActiveListens_RST;
00286 
00287         Waiter_Accept* fWaiters_Accept_Head;
00288         Waiter_Accept* fWaiters_Accept_Tail;
00289 
00290         ListenName* fListenNames_Send;
00291         ListenName* fListenNames_Receive;
00292 
00293         ZSemaphore* fSem_Disposed;
00294         bool fExitedSend;
00295         bool fExitedReceive;
00296         };
00297 
00298 // =================================================================================================
00299 #pragma mark -
00300 #pragma mark * ZStreamMUX::StreamRW
00301 
00302 class ZStreamMUX::StreamRW  : public ZStreamR, public ZStreamW
00303         {
00304 public:
00305         StreamRW(ZStreamMUX& iStreamMUX, SessionID iSessionID);
00306         ~StreamRW();
00307 
00308 // From ZStreamR
00309         virtual void Imp_Read(void* iDest, size_t iCount, size_t* oCountRead);
00310         virtual size_t Imp_CountReadable();
00311 
00312 // From ZStreamW
00313         virtual void Imp_Write(const void* iSource, size_t iCount, size_t* oCountWritten);
00314 
00315 private:
00316         ZStreamMUX& fStreamMUX;
00317         SessionID fSessionID;
00318         };
00319 
00320 #endif // __ZStreamMUX__

Generated on Thu Jul 26 11:21:55 2007 for ZooLib by  doxygen 1.4.7