00001
00002
00003
00004
00005 #include <streams.h>
00006 #include <asyncio.h>
00007
00008 #pragma warning(disable: 4706)
00009
00010 HRESULT
00011 CAsyncRequest::Request(
00012 CAsyncIo *pIo,
00013 CAsyncStream *pStream,
00014 LONGLONG llPos,
00015 LONG lLength,
00016 BOOL bAligned,
00017 BYTE* pBuffer,
00018 LPVOID pContext,
00019 DWORD dwUser)
00020 {
00021 m_pIo = pIo;
00022 m_pStream = pStream;
00023 m_llPos = llPos;
00024 m_lLength = lLength;
00025 m_bAligned = bAligned;
00026 m_pBuffer = pBuffer;
00027 m_pContext = pContext;
00028 m_dwUser = dwUser;
00029 m_hr = VFW_E_TIMEOUT;
00030
00031 return S_OK;
00032 }
00033
00034 HRESULT
00035 CAsyncRequest::Complete()
00036 {
00037
00038 m_pStream->Lock();
00039
00040 m_hr = m_pStream->SetPointer(m_llPos);
00041 if (S_OK == m_hr) {
00042
00043 DWORD dwActual;
00044 m_hr = m_pStream->Read(m_pBuffer, m_lLength, m_bAligned, &dwActual);
00045 if (m_hr == OLE_S_FIRST) {
00046 if (m_pContext) {
00047 IMediaSample *pSample = reinterpret_cast<IMediaSample *>(m_pContext);
00048 pSample->SetDiscontinuity(TRUE);
00049 m_hr = S_OK;
00050 }
00051 }
00052 if (FAILED(m_hr)) {
00053 } else if (dwActual != (DWORD)m_lLength) {
00054
00055 m_lLength = (LONG) dwActual;
00056 m_hr = S_FALSE;
00057 } else {
00058 m_hr = S_OK;
00059 }
00060 }
00061
00062 m_pStream->Unlock();
00063 return m_hr;
00064 }
00065
00066 CAsyncIo::CAsyncIo(CAsyncStream *pStream)
00067 : m_hThread(NULL),
00068 m_evWork(TRUE),
00069 m_evDone(TRUE),
00070 m_evStop(TRUE),
00071 m_listWork(NAME("Work list")),
00072 m_listDone(NAME("Done list")),
00073 m_bFlushing(FALSE),
00074 m_cItemsOut(0),
00075 m_bWaiting(FALSE),
00076 m_pStream(pStream)
00077 {
00078
00079 }
00080
00081 CAsyncIo::~CAsyncIo()
00082 {
00083
00084 BeginFlush();
00085
00086
00087 CloseThread();
00088
00089
00090 POSITION pos = m_listDone.GetHeadPosition();
00091 while (pos) {
00092 CAsyncRequest* pRequest = m_listDone.GetNext(pos);
00093 delete pRequest;
00094 }
00095 m_listDone.RemoveAll();
00096 }
00097
00098 HRESULT
00099 CAsyncIo::AsyncActive(void)
00100 {
00101 return StartThread();
00102 }
00103
00104 HRESULT
00105 CAsyncIo::AsyncInactive(void)
00106 {
00107 return CloseThread();
00108 }
00109
00110 HRESULT
00111 CAsyncIo::Request(
00112 LONGLONG llPos,
00113 LONG lLength,
00114 BOOL bAligned,
00115 BYTE* pBuffer,
00116 LPVOID pContext,
00117 DWORD dwUser)
00118 {
00119 if (bAligned) {
00120 if (!IsAligned(llPos) ||
00121 !IsAligned(lLength) ||
00122 !IsAligned((LONG) pBuffer)) {
00123 return VFW_E_BADALIGN;
00124 }
00125 }
00126
00127 CAsyncRequest* pRequest = new CAsyncRequest;
00128
00129 HRESULT hr = pRequest->Request(
00130 this,
00131 m_pStream,
00132 llPos,
00133 lLength,
00134 bAligned,
00135 pBuffer,
00136 pContext,
00137 dwUser);
00138 if (SUCCEEDED(hr)) {
00139
00140 hr = PutWorkItem(pRequest);
00141 }
00142
00143 if (FAILED(hr)) {
00144 delete pRequest;
00145 }
00146 return hr;
00147 }
00148
00149 HRESULT
00150 CAsyncIo::WaitForNext(
00151 DWORD dwTimeout,
00152 LPVOID *ppContext,
00153 DWORD * pdwUser,
00154 LONG* pcbActual)
00155 {
00156
00157
00158 *ppContext = NULL;
00159
00160
00161
00162 for (;;) {
00163
00164 if (!m_evDone.Wait(dwTimeout)) {
00165
00166 return VFW_E_TIMEOUT;
00167 }
00168
00169
00170 CAsyncRequest* pRequest = GetDoneItem();
00171 if (pRequest) {
00172
00173
00174
00175 HRESULT hr = pRequest->GetHResult();
00176 if (hr == S_FALSE) {
00177
00178
00179
00180 if ((pRequest->GetActualLength() +
00181 pRequest->GetStart()) == Size()) {
00182 hr = S_OK;
00183 } else {
00184
00185 hr = E_FAIL;
00186 }
00187 }
00188
00189
00190 *pcbActual = pRequest->GetActualLength();
00191
00192
00193 *ppContext = pRequest->GetContext();
00194 *pdwUser = pRequest->GetUser();
00195 delete pRequest;
00196 return hr;
00197 } else {
00198
00199
00200 CAutoLock lck(&m_csLists);
00201 if (m_bFlushing && !m_bWaiting) {
00202
00203
00204
00205
00206
00207
00208 return VFW_E_WRONG_STATE;
00209 }
00210 }
00211
00212
00213
00214 }
00215 }
00216
00217 HRESULT
00218 CAsyncIo::SyncReadAligned(
00219 LONGLONG llPos,
00220 LONG lLength,
00221 BYTE* pBuffer,
00222 LONG* pcbActual,
00223 PVOID pvContext
00224 )
00225 {
00226 if (!IsAligned(llPos) ||
00227 !IsAligned(lLength) ||
00228 !IsAligned((LONG) pBuffer)) {
00229 return VFW_E_BADALIGN;
00230 }
00231
00232 CAsyncRequest request;
00233
00234 HRESULT hr = request.Request(
00235 this,
00236 m_pStream,
00237 llPos,
00238 lLength,
00239 TRUE,
00240 pBuffer,
00241 pvContext,
00242 0);
00243
00244 if (FAILED(hr)) {
00245 return hr;
00246 }
00247
00248 hr = request.Complete();
00249
00250
00251 *pcbActual = request.GetActualLength();
00252 return hr;
00253 }
00254
00255 HRESULT
00256 CAsyncIo::Length(LONGLONG *pllTotal, LONGLONG* pllAvailable)
00257 {
00258 return m_pStream->Size(pllTotal, pllAvailable);
00259 }
00260
00261 HRESULT
00262 CAsyncIo::BeginFlush()
00263 {
00264
00265 {
00266 CAutoLock lock(&m_csLists);
00267
00268
00269
00270
00271
00272 m_bFlushing = TRUE;
00273
00274 CAsyncRequest * preq;
00275 while(preq = GetWorkItem()) {
00276 preq->Cancel();
00277 PutDoneItem(preq);
00278 }
00279
00280
00281 if (m_cItemsOut > 0) {
00282
00283
00284 ASSERT(!m_bWaiting);
00285
00286
00287
00288
00289 m_bWaiting = TRUE;
00290 } else {
00291
00292
00293
00294
00295
00296
00297 m_evDone.Set();
00298 return S_OK;
00299 }
00300 }
00301
00302 ASSERT(m_bWaiting);
00303
00304
00305 for (;;) {
00306 m_evAllDone.Wait();
00307 {
00308
00309 CAutoLock lock(&m_csLists);
00310
00311 if (m_cItemsOut == 0) {
00312
00313
00314
00315 m_bWaiting = FALSE;
00316
00317
00318
00319
00320
00321 m_evDone.Set();
00322
00323 return S_OK;
00324 }
00325 }
00326 }
00327 }
00328
00329 HRESULT
00330 CAsyncIo::EndFlush()
00331 {
00332 CAutoLock lock(&m_csLists);
00333
00334 m_bFlushing = FALSE;
00335
00336 ASSERT(!m_bWaiting);
00337
00338
00339
00340 if (m_listDone.GetCount() > 0) {
00341 m_evDone.Set();
00342 } else {
00343 m_evDone.Reset();
00344 }
00345
00346 return S_OK;
00347 }
00348
00349 HRESULT
00350 CAsyncIo::StartThread(void)
00351 {
00352 if (m_hThread) {
00353 return S_OK;
00354 }
00355
00356
00357 m_evStop.Reset();
00358
00359 DWORD dwThreadID;
00360 m_hThread = CreateThread(
00361 NULL,
00362 0,
00363 InitialThreadProc,
00364 this,
00365 0,
00366 &dwThreadID);
00367 if (!m_hThread) {
00368 DWORD dwErr = GetLastError();
00369 return HRESULT_FROM_WIN32(dwErr);
00370 }
00371 return S_OK;
00372 }
00373
00374 HRESULT
00375 CAsyncIo::CloseThread(void)
00376 {
00377
00378 m_evStop.Set();
00379
00380 if (m_hThread) {
00381
00382 WaitForSingleObject(m_hThread, INFINITE);
00383 CloseHandle(m_hThread);
00384 m_hThread = NULL;
00385 }
00386 return S_OK;
00387 }
00388
00389 CAsyncRequest*
00390 CAsyncIo::GetWorkItem()
00391 {
00392 CAutoLock lck(&m_csLists);
00393
00394 CAsyncRequest * preq = m_listWork.RemoveHead();
00395
00396
00397 if (m_listWork.GetCount() == 0) {
00398 m_evWork.Reset();
00399 }
00400 return preq;
00401 }
00402
00403 CAsyncRequest*
00404 CAsyncIo::GetDoneItem()
00405 {
00406 CAutoLock lock(&m_csLists);
00407
00408 CAsyncRequest * preq = m_listDone.RemoveHead();
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420 if (m_listDone.GetCount() == 0 &&
00421 (!m_bFlushing || m_bWaiting)) {
00422 m_evDone.Reset();
00423 }
00424
00425 return preq;
00426 }
00427
00428 HRESULT
00429 CAsyncIo::PutWorkItem(CAsyncRequest* pRequest)
00430 {
00431 CAutoLock lock(&m_csLists);
00432 HRESULT hr;
00433
00434 if (m_bFlushing) {
00435 hr = VFW_E_WRONG_STATE;
00436 }
00437 else if (m_listWork.AddTail(pRequest)) {
00438
00439
00440 m_evWork.Set();
00441
00442
00443 hr = StartThread();
00444
00445 } else {
00446 hr = E_OUTOFMEMORY;
00447 }
00448 return(hr);
00449 }
00450
00451 HRESULT
00452 CAsyncIo::PutDoneItem(CAsyncRequest* pRequest)
00453 {
00454 ASSERT(CritCheckIn(&m_csLists));
00455
00456 if (m_listDone.AddTail(pRequest)) {
00457
00458
00459 m_evDone.Set();
00460 return S_OK;
00461 } else {
00462 return E_OUTOFMEMORY;
00463 }
00464 }
00465
00466 void
00467 CAsyncIo::ProcessRequests(void)
00468 {
00469
00470 CAsyncRequest * preq = NULL;
00471 for (;;) {
00472 {
00473 CAutoLock lock(&m_csLists);
00474
00475 preq = GetWorkItem();
00476 if (preq == NULL) {
00477
00478 return;
00479 }
00480
00481
00482 m_cItemsOut++;
00483
00484
00485 }
00486
00487 preq->Complete();
00488
00489
00490 {
00491 CAutoLock l(&m_csLists);
00492
00493 PutDoneItem(preq);
00494
00495 if (--m_cItemsOut == 0) {
00496 if (m_bWaiting) {
00497 m_evAllDone.Set();
00498 }
00499 }
00500 }
00501 }
00502 }
00503
00504 DWORD
00505 CAsyncIo::ThreadProc(void)
00506 {
00507 HANDLE ahev[] = {m_evStop, m_evWork};
00508
00509 for (;;) {
00510 DWORD dw = WaitForMultipleObjects(
00511 2,
00512 ahev,
00513 FALSE,
00514 INFINITE);
00515 if (dw == WAIT_OBJECT_0+1) {
00516
00517
00518 ProcessRequests();
00519 } else {
00520
00521 return 0;
00522 }
00523 }
00524 }
00525
00526 HRESULT
00527 CAsyncIo::SyncRead(
00528 LONGLONG llPos,
00529 LONG lLength,
00530 BYTE* pBuffer)
00531 {
00532 if (IsAligned(llPos) &&
00533 IsAligned(lLength) &&
00534 IsAligned((LONG) pBuffer)) {
00535 LONG cbUnused;
00536 return SyncReadAligned(llPos, lLength, pBuffer, &cbUnused, NULL);
00537 }
00538
00539
00540
00541
00542 CAsyncRequest request;
00543
00544 HRESULT hr = request.Request(
00545 this,
00546 m_pStream,
00547 llPos,
00548 lLength,
00549 FALSE,
00550 pBuffer,
00551 NULL,
00552 0);
00553
00554 if (FAILED(hr)) {
00555 return hr;
00556 }
00557
00558 return request.Complete();
00559 }
00560
00561 HRESULT
00562 CAsyncIo::Alignment(LONG *pl)
00563 {
00564 *pl = Alignment();
00565 return S_OK;
00566 }
00567