00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #ifndef __ZStreamMUX__
00026 #define __ZStreamMUX__ 1
00027
00028 #include "zconfig.h"
00029 #include "ZStream.h"
00030 #include "ZThread.h"
00031
00032 #include <deque>
00033 #include <string>
00034
00035
00036 #pragma mark -
00037 #pragma mark * ZStreamMUX
00038
// ZStreamMUX is constructed from one read stream and one write stream and
// exposes named listen/accept/open operations plus per-session send/receive
// calls, each session being identified by a SessionID.
// NOTE(review): presumably this multiplexes several logical sessions over the
// single stream pair; only declarations are visible here, so confirm the
// details against the implementation file.
00039 class ZStreamMUX
00040 {
00041 public:
// Public session handle. The value 0 is reserved as kInvalidSessionID.
00042 typedef uint16 SessionID;
00043 static const SessionID kInvalidSessionID = 0;
00044
// Result codes returned by the public operations below. All enumerators except
// the last carry an "error" prefix; waitPending does not.
// NOTE(review): per-enumerator meanings are not documented in this listing and
// can only be guessed from the names — confirm against the implementation
// before relying on any of them.
00045 enum Error
00046 {
00048 errorNone,
00049
00051 errorAbortLocal,
00052
00054 errorAbortRemote,
00055
00057 errorSendClosed,
00058
00060 errorReceiveClosed,
00061
00063 errorInvalidSession,
00064
00066 errorAlreadyListening,
00067
00069 errorNotListening,
00070
00072 errorBadListenName,
00073
00075 errorConnectionDied,
00076
00078 errorDisposed,
00079
00081 waitPending
00082 };
00083
// Stream adapter for a single session; defined after this class.
00084 class StreamRW;
00085
// Takes the underlying read and write streams by const reference.
00086 ZStreamMUX(const ZStreamR& iStreamR, const ZStreamW& iStreamW);
00087 ~ZStreamMUX();
00088
// NOTE(review): presumably start/stop the send and receive workers declared in
// the private section (RunSend/RunReceive) — confirm.
00089 void Startup();
00090 void Shutdown();
00091
// Listener management, keyed by a listen name. Accept reports the accepted
// session through the oSessionID out-parameter.
00092 Error Listen(const std::string& iListenName);
00093 Error StopListen(const std::string& iListenName);
00094 Error Accept(const std::string& iListenName, SessionID& oSessionID);
00095
// Open takes a listen name and an initial data buffer (iData/iCount); it
// reports through oCountSent and oSessionID.
// NOTE(review): oCountSent is a pointer, so it is presumably allowed to be
// null — verify before passing null.
00096 Error Open(const std::string& iListenName, const void* iData, size_t iCount,
00097 size_t* oCountSent, SessionID& oSessionID);
00098 Error Close(SessionID iSessionID);
00099 Error Abort(SessionID iSessionID);
00100
// Per-session data transfer. Counts are reported through the o* parameters.
// NOTE(review): the method name is spelled "CountReceiveable" while its
// parameter is "oCountReceivable"; renaming the method would break callers, so
// the spelling is left as is.
00101 Error Send(SessionID iSessionID, const void* iSource, size_t iCount, size_t* oCountSent);
00102 Error Receive(SessionID iSessionID, void* iDest, size_t iCount, size_t* oCountReceived);
00103 Error CountReceiveable(SessionID iSessionID, size_t& oCountReceivable);
00104
// NOTE(review): presumably sets fMaxFragmentSize — confirm.
00105 void SetMaximumFragmentSize(size_t iSize);
00106
00107 private:
// Internal session identifier; same underlying type as SessionID. Conversion
// between the two is declared below (sMakeSessionID / sMakePrivID).
00108 typedef uint16 PrivID;
00109 struct SessionCB;
00110
// Send side. sRunSend is a static function taking the instance pointer.
// NOTE(review): presumably a thread entry point that forwards to RunSend
// (ZThread.h is included) — confirm.
00111 void RunSend();
00112 static void sRunSend(ZStreamMUX* iStreamMUX);
00113
// Send-side handlers. Each receives the caller's ZMutexLocker and returns bool.
// NOTE(review): the meaning of the bool result is not visible here.
00114 bool HandleSend_Open(ZMutexLocker& iLocker);
00115 bool HandleSend_RST(ZMutexLocker& iLocker);
00116 bool HandleSend_SYNACK(ZMutexLocker& iLocker);
00117 bool HandleSend_Credit(ZMutexLocker& iLocker);
00118 bool HandleSend_Data(ZMutexLocker& iLocker);
00119
// Receive side; mirrors the RunSend/sRunSend pair above.
00120 void RunReceive();
00121 static void sRunReceive(ZStreamMUX* iStreamMUX);
00122
// Receive-side handlers, one per fragment kind named in the suffix.
00123 void HandleReceive_Payload(ZMutexLocker& iLocker, SessionCB* iSession, size_t iPayloadLength);
00124 void HandleReceive_Credit(bool iIsNear, PrivID iPrivID, size_t iCredit);
00125 void HandleReceive_SYN(PrivID iPrivID, uint8 iListenerID, size_t iPayloadLength);
00126 void HandleReceive_SYNACK(PrivID iPrivID);
00127 void HandleReceive_FIN(bool iIsNear, PrivID iPrivID);
00128 void HandleReceive_RST(bool iIsNear, PrivID iPrivID);
00129 void HandleReceive_DefineListener(uint8 iListenerID, size_t iListenNameLength);
00130
// Fragment writers, one per fragment kind named in the suffix.
// NOTE(review): presumably these write to fStreamW — confirm.
00131 void WriteFragment_SYNACK(PrivID iPrivID);
00132 void WriteFragment_FIN(bool iIsNear, PrivID iPrivID);
00133 void WriteFragment_RST(bool iIsNear, PrivID iPrivID);
00134 void WriteFragment_Credit(bool iIsNear, PrivID iPrivID, size_t iSentCredit);
00135 void WriteFragment_Payload(bool iIsNear, PrivID iPrivID, const void* iSource, size_t iLength);
00136 void WriteFragment_DefineListener(uint8 iListenerID, const std::string& iListenName);
00137
// Copy helpers operating on a session; the number of bytes handled is reported
// through oCount.
// NOTE(review): presumably these move data into and out of the session's
// receive buffer (SessionCB::fReceiveBuffer) — confirm.
00138 void CopyDataIn(ZMutexLocker& iLocker, SessionCB* iSession,
00139 const void* iSource, size_t iCount, size_t& oCount);
00140 void CopyDataOut(SessionCB* iSession, void* iDest, size_t iCount, size_t& oCount);
00141
00142 void AbortSession(SessionCB* iSession);
00143
// Session lookup/use/release.
// NOTE(review): presumably paired with SessionCB::fUseCount as a use count —
// confirm.
00144 SessionCB* UseSession(bool iIsNear, PrivID iPrivID);
00145 SessionCB* UseSession(SessionCB* iSession);
00146 void ReleaseSession(SessionCB* iSession);
00147
// Conversions between the public SessionID and the internal (iIsNear, PrivID)
// pair.
// NOTE(review): there are two sMakeSessionID overloads that differ only in
// parameter order, and the second returns void, so it cannot produce a
// SessionID. This looks unintentional — check whether it is defined or used
// anywhere before removing it.
00148 static SessionID sMakeSessionID(bool iIsNear, PrivID iPrivID);
00149 static void sMakeSessionID(PrivID iPrivID, bool iIsNear);
00150 static void sMakePrivID(SessionID iSessionID, bool& oIsNear, PrivID& oPrivID);
// Largest PrivID value: 0x1FF == 511, i.e. nine bits.
00151 static const PrivID kMaxPrivID = 0x1FF;
00152
// The underlying streams are held by reference, so the referenced stream
// objects must outlive this ZStreamMUX.
// NOTE(review): presumably bound to the constructor arguments — confirm.
00153 const ZStreamR& fStreamR;
00154 const ZStreamW& fStreamW;
00155 size_t fDefaultCredit;
00156 size_t fMaxFragmentSize;
00157
// Redundant redeclaration: SessionCB is already forward-declared at the top of
// the private section. Harmless.
00158 struct SessionCB;
00159
// The Waiter_* records below each carry an fNext pointer to their own type, so
// they can be chained into singly linked lists (see the *_Head/*_Tail members).
// NOTE(review): presumably each represents one blocked caller of the matching
// public operation — confirm.

// Pending send: source bytes, byte count and a result slot.
00160 struct Waiter_Send
00161 {
00162 const uint8* fSource;
00163 size_t fCount;
00164 Error fResult;
00165 Waiter_Send* fNext;
00166 };
00167
// Pending receive: destination buffer, byte count and a result slot.
00168 struct Waiter_Receive
00169 {
00170 uint8* fDest;
00171 size_t fCount;
00172 Error fResult;
00173 Waiter_Receive* fNext;
00174 };
00175
// Pending open: target listen name, associated session, initial data and a
// result slot.
00176 struct Waiter_Open
00177 {
00178 std::string fListenName;
00179 SessionCB* fSession;
00180 const uint8* fSource;
00181 size_t fCount;
00182 Error fResult;
00183 Waiter_Open* fNext;
00184 };
00185
// Pending RST for a session, with a completion flag.
00186 struct Waiter_RST
00187 {
00188 SessionCB* fSession;
00189 bool fDoneIt;
00190 Waiter_RST* fNext;
00191 };
00192
// Pending accept: session slot and a result slot.
00193 struct Waiter_Accept
00194 {
00195 SessionCB* fSession;
00196 Error fResult;
00197 Waiter_Accept* fNext;
00198 };
00199
// Record of a received SYN: listen name, PrivID and a raw buffer with its size.
// NOTE(review): ownership of fBuffer is not expressed by the type — check who
// allocates and frees it.
00200 struct SYNReceived
00201 {
00202 std::string fListenName;
00203 PrivID fID;
00204 uint8* fBuffer;
00205 size_t fBufferSize;
00206 SYNReceived* fNext;
00207 };
00208
// One listen name together with its queue of pending sessions and its own list
// of accept waiters.
00209 struct ActiveListen
00210 {
00211 std::string fListenName;
00212 std::deque<SessionCB*> fPendingSessions;
00213 Waiter_Accept* fWaiters_Head;
00214 Waiter_Accept* fWaiters_Tail;
00215 ActiveListen* fNext;
00216 };
00217
// Per-session control block. Carries fPrev/fNext links, so it can be chained
// into a doubly linked list (see fSessions_Head/fSessions_Tail).
00220 struct SessionCB
00221 {
00222 SessionCB(bool iIsNear, PrivID iPrivID, size_t iCreditSend, size_t iReceiveBufferSize);
00223 ~SessionCB();
00224
// Identity: the (fIsNear, fID) pair, plus a use count.
00225 bool fIsNear;
00226 PrivID fID;
00227 size_t fUseCount;
00228
00229 SessionCB* fPrev;
00230 SessionCB* fNext;
00231
00232 bool fAborted;
00233
// Send-side state: credit, open/FIN flags, waiter list and a condition.
00234 size_t fCredit_Send;
00235 bool fOpen_Send;
00236 bool fSentFIN;
00237 Waiter_Send* fWaiters_Send_Head;
00238 Waiter_Send* fWaiters_Send_Tail;
00239 ZCondition fCondition_Send;
00240
// Receive-side state: credit, open flag, waiter list and a condition.
00241 size_t fCredit_Receive;
00242 bool fOpen_Receive;
00243 Waiter_Receive* fWaiters_Receive_Head;
00244 Waiter_Receive* fWaiters_Receive_Tail;
00245 ZCondition fCondition_Receive;
00246
// Receive buffer with separate feed-in and feed-out offsets.
// NOTE(review): presumably used as a circular buffer — confirm.
00247 uint8* fReceiveBuffer;
00248 size_t fReceiveBufferSize;
00249 size_t fReceiveBufferFeedIn;
00250 size_t fReceiveBufferFeedOut;
00251
// Running totals.
00252 size_t fTotalSent;
00253 size_t fTotalReceived;
00254 };
00255
// Association between a listen name string and a uint8 identifier.
00256 struct ListenName
00257 {
00258 std::string fName;
00259 uint8 fID;
00260 ListenName* fNext;
00261 };
00262
// Conditions at the multiplexer level (distinct from the per-session ones).
00263 ZCondition fCondition_Send;
00264 ZCondition fCondition_Open;
00265 ZCondition fCondition_Accept;
00266
// NOTE(review): presumably guards the lists and flags below — confirm.
00267 ZMutex fMutex;
00268
// Session list ends, plus a separate pointer for the send side.
00269 SessionCB* fSessions_Head;
00270 SessionCB* fSessions_Tail;
00271 SessionCB* fSessionNextSend;
00272
00273 SYNReceived* fPendingSYN_Head;
00274 SYNReceived* fPendingSYN_Tail;
00275
00276 Waiter_Open* fWaiters_Open_Head;
00277 Waiter_Open* fWaiters_Open_Tail;
00278 Waiter_Open* fWaiters_OpenSYNSent;
00279
00280 Waiter_RST* fWaiters_RST_Head;
00281 Waiter_RST* fWaiters_RST_Tail;
00282
00283 ActiveListen* fActiveListens;
00284
00285 ActiveListen* fActiveListens_RST;
00286
00287 Waiter_Accept* fWaiters_Accept_Head;
00288 Waiter_Accept* fWaiters_Accept_Tail;
00289
// Separate listen-name lists for the send and receive directions.
00290 ListenName* fListenNames_Send;
00291 ListenName* fListenNames_Receive;
00292
// NOTE(review): presumably shutdown bookkeeping (a semaphore plus one exit flag
// per worker) — confirm.
00293 ZSemaphore* fSem_Disposed;
00294 bool fExitedSend;
00295 bool fExitedReceive;
00296 };
00297
00298
00299 #pragma mark -
00300 #pragma mark * ZStreamMUX::StreamRW
00301
// StreamRW derives from both ZStreamR and ZStreamW and is constructed from a
// ZStreamMUX plus a SessionID, so a single session can be used through the
// ordinary stream interfaces.
// NOTE(review): presumably the Imp_* methods forward to ZStreamMUX::Receive,
// CountReceiveable and Send for fSessionID — the definitions are not visible
// here; confirm.
00302 class ZStreamMUX::StreamRW : public ZStreamR, public ZStreamW
00303 {
00304 public:
00305 StreamRW(ZStreamMUX& iStreamMUX, SessionID iSessionID);
00306 ~StreamRW();
00307
00308
// Read side (NOTE(review): presumably overrides of ZStreamR virtuals).
00309 virtual void Imp_Read(void* iDest, size_t iCount, size_t* oCountRead);
00310 virtual size_t Imp_CountReadable();
00311
00312
// Write side (NOTE(review): presumably an override of a ZStreamW virtual).
00313 virtual void Imp_Write(const void* iSource, size_t iCount, size_t* oCountWritten);
00314
00315 private:
// The multiplexer is held by reference, so it must outlive this StreamRW.
00316 ZStreamMUX& fStreamMUX;
00317 SessionID fSessionID;
00318 };
00319
00320 #endif // __ZStreamMUX__