00001
00002
00003
00004
00005
00006 #include <streams.h>
00007
00008 COutputQueue::COutputQueue(
00009 IPin *pInputPin,
00010 HRESULT *phr,
00011 BOOL bAuto,
00012 BOOL bQueue,
00013 LONG lBatchSize,
00014 BOOL bBatchExact,
00015 LONG lListSize,
00016 DWORD dwPriority,
00017 bool bFlushingOpt
00018 ) : m_lBatchSize(lBatchSize),
00019 m_bBatchExact(bBatchExact && (lBatchSize > 1)),
00020 m_hThread(NULL),
00021 m_hSem(NULL),
00022 m_List(NULL),
00023 m_pPin(pInputPin),
00024 m_ppSamples(NULL),
00025 m_lWaiting(0),
00026 m_pInputPin(NULL),
00027 m_bSendAnyway(FALSE),
00028 m_nBatched(0),
00029 m_bFlushing(FALSE),
00030 m_bFlushed(TRUE),
00031 m_bFlushingOpt(bFlushingOpt),
00032 m_bTerminate(FALSE),
00033 m_hEventPop(NULL),
00034 m_hr(S_OK)
00035 {
00036 ASSERT(m_lBatchSize > 0);
00037
00038 if (FAILED(*phr)) {
00039 return;
00040 }
00041
00042
00043
00044 *phr = pInputPin->QueryInterface(IID_IMemInputPin, (void **)&m_pInputPin);
00045 if (FAILED(*phr)) {
00046 return;
00047 }
00048
00049
00050
00051 if (bAuto) {
00052 HRESULT hr = m_pInputPin->ReceiveCanBlock();
00053 if (SUCCEEDED(hr)) {
00054 bQueue = hr == S_OK;
00055 }
00056 }
00057
00058
00059
00060 m_ppSamples = new PMEDIASAMPLE[m_lBatchSize];
00061 if (m_ppSamples == NULL) {
00062 *phr = E_OUTOFMEMORY;
00063 return;
00064 }
00065
00066
00067
00068 if (bQueue) {
00069 DbgLog((LOG_TRACE, 2, TEXT("Creating thread for output pin")));
00070 m_hSem = CreateSemaphore(NULL, 0, 0x7FFFFFFF, NULL);
00071 if (m_hSem == NULL) {
00072 DWORD dwError = GetLastError();
00073 *phr = AmHresultFromWin32(dwError);
00074 return;
00075 }
00076 m_List = new CSampleList(NAME("Sample Queue List"),
00077 lListSize,
00078 FALSE
00079 );
00080 if (m_List == NULL) {
00081 *phr = E_OUTOFMEMORY;
00082 return;
00083 }
00084
00085 DWORD dwThreadId;
00086 m_hThread = CreateThread(NULL,
00087 0,
00088 InitialThreadProc,
00089 (LPVOID)this,
00090 0,
00091 &dwThreadId);
00092 if (m_hThread == NULL) {
00093 DWORD dwError = GetLastError();
00094 *phr = AmHresultFromWin32(dwError);
00095 return;
00096 }
00097 SetThreadPriority(m_hThread, dwPriority);
00098 } else {
00099 DbgLog((LOG_TRACE, 2, TEXT("Calling input pin directly - no thread")));
00100 }
00101 }
00102
00103 COutputQueue::~COutputQueue()
00104 {
00105 DbgLog((LOG_TRACE, 3, TEXT("COutputQueue::~COutputQueue")));
00106
00107 if (m_pInputPin != NULL) {
00108 m_pInputPin->Release();
00109 }
00110 if (m_hThread != NULL) {
00111 {
00112 CAutoLock lck(this);
00113 m_bTerminate = TRUE;
00114 m_hr = S_FALSE;
00115 NotifyThread();
00116 }
00117 DbgWaitForSingleObject(m_hThread);
00118 EXECUTE_ASSERT(CloseHandle(m_hThread));
00119
00120
00121
00122 ASSERT(m_List->GetCount() == 0);
00123 delete m_List;
00124 } else {
00125 FreeSamples();
00126 }
00127 if (m_hSem != NULL) {
00128 EXECUTE_ASSERT(CloseHandle(m_hSem));
00129 }
00130 delete [] m_ppSamples;
00131 }
00132
00133 DWORD WINAPI COutputQueue::InitialThreadProc(LPVOID pv)
00134 {
00135 HRESULT hrCoInit = CAMThread::CoInitializeHelper();
00136
00137 COutputQueue *pSampleQueue = (COutputQueue *)pv;
00138 DWORD dwReturn = pSampleQueue->ThreadProc();
00139
00140 if(hrCoInit == S_OK) {
00141 CoUninitialize();
00142 }
00143
00144 return dwReturn;
00145 }
00146
00147 DWORD COutputQueue::ThreadProc()
00148 {
00149 while (TRUE) {
00150 BOOL bWait = FALSE;
00151 IMediaSample *pSample;
00152 LONG lNumberToSend;
00153 NewSegmentPacket* ppacket;
00154
00155
00156
00157
00158
00159
00160 {
00161 CAutoLock lck(this);
00162 while (TRUE) {
00163
00164 if (m_bTerminate) {
00165 FreeSamples();
00166 return 0;
00167 }
00168 if (m_bFlushing) {
00169 FreeSamples();
00170 SetEvent(m_evFlushComplete);
00171 }
00172
00173
00174
00175 pSample = m_List->RemoveHead();
00176
00177 if (m_hEventPop) {
00178
00179 SetEvent(m_hEventPop);
00180 }
00181
00182 if (pSample != NULL &&
00183 !IsSpecialSample(pSample)) {
00184
00185
00186
00187
00188 m_ppSamples[m_nBatched++] = pSample;
00189 if (m_nBatched == m_lBatchSize) {
00190 break;
00191 }
00192 } else {
00193
00194
00195
00196
00197
00198 if (pSample == NULL &&
00199 (m_bBatchExact || m_nBatched == 0)) {
00200
00201
00202
00203
00204 ASSERT(m_lWaiting == 0);
00205 m_lWaiting++;
00206 bWait = TRUE;
00207 } else {
00208
00209
00210
00211
00212 if (pSample == SEND_PACKET && m_nBatched == 0) {
00213 continue;
00214 }
00215
00216 if (pSample == NEW_SEGMENT) {
00217
00218
00219 ppacket = (NewSegmentPacket *) m_List->RemoveHead();
00220
00221 if (m_hEventPop) {
00222
00223 SetEvent(m_hEventPop);
00224 }
00225
00226 ASSERT(ppacket);
00227 }
00228
00229
00230 }
00231 break;
00232 }
00233 }
00234 if (!bWait) {
00235
00236
00237 lNumberToSend = m_nBatched;
00238 m_nBatched = 0;
00239 }
00240 }
00241
00242
00243
00244 if (bWait) {
00245 DbgWaitForSingleObject(m_hSem);
00246 continue;
00247 }
00248
00249
00250
00251
00252
00253
00254
00255 if (lNumberToSend != 0) {
00256 long nProcessed;
00257 if (m_hr == S_OK) {
00258 ASSERT(!m_bFlushed);
00259 HRESULT hr = m_pInputPin->ReceiveMultiple(m_ppSamples,
00260 lNumberToSend,
00261 &nProcessed);
00262
00263 CAutoLock lck(this);
00264 if (m_hr == S_OK) {
00265 m_hr = hr;
00266 }
00267 ASSERT(!m_bFlushed);
00268 }
00269 while (lNumberToSend != 0) {
00270 m_ppSamples[--lNumberToSend]->Release();
00271 }
00272 if (m_hr != S_OK) {
00273
00274
00275
00276
00277 DbgLog((LOG_ERROR, 2, TEXT("ReceiveMultiple returned %8.8X"),
00278 m_hr));
00279 }
00280 }
00281
00282
00283
00284 if (pSample == EOS_PACKET) {
00285
00286
00287
00288
00289
00290
00291
00292 if (m_hr == S_OK) {
00293 DbgLog((LOG_TRACE, 2, TEXT("COutputQueue sending EndOfStream()")));
00294 HRESULT hr = m_pPin->EndOfStream();
00295 if (FAILED(hr)) {
00296 DbgLog((LOG_ERROR, 2, TEXT("COutputQueue got code 0x%8.8X from EndOfStream()")));
00297 }
00298 }
00299 }
00300
00301
00302
00303 if (pSample == RESET_PACKET) {
00304 m_hr = S_OK;
00305 SetEvent(m_evFlushComplete);
00306 }
00307
00308 if (pSample == NEW_SEGMENT) {
00309 m_pPin->NewSegment(ppacket->tStart, ppacket->tStop, ppacket->dRate);
00310 delete ppacket;
00311 }
00312 }
00313 }
00314
00315 void COutputQueue::SendAnyway()
00316 {
00317 if (!IsQueued()) {
00318
00319
00320
00321 m_bSendAnyway = TRUE;
00322 LONG nProcessed;
00323 ReceiveMultiple(NULL, 0, &nProcessed);
00324 m_bSendAnyway = FALSE;
00325
00326 } else {
00327 CAutoLock lck(this);
00328 QueueSample(SEND_PACKET);
00329 NotifyThread();
00330 }
00331 }
00332
00333 void
00334 COutputQueue::NewSegment(
00335 REFERENCE_TIME tStart,
00336 REFERENCE_TIME tStop,
00337 double dRate)
00338 {
00339 if (!IsQueued()) {
00340 if (S_OK == m_hr) {
00341 if (m_bBatchExact) {
00342 SendAnyway();
00343 }
00344 m_pPin->NewSegment(tStart, tStop, dRate);
00345 }
00346 } else {
00347 if (m_hr == S_OK) {
00348
00349
00350
00351
00352
00353
00354
00355
00356
00357 NewSegmentPacket * ppack = new NewSegmentPacket;
00358 if (ppack == NULL) {
00359 return;
00360 }
00361 ppack->tStart = tStart;
00362 ppack->tStop = tStop;
00363 ppack->dRate = dRate;
00364
00365 CAutoLock lck(this);
00366 QueueSample(NEW_SEGMENT);
00367 QueueSample( (IMediaSample*) ppack);
00368 NotifyThread();
00369 }
00370 }
00371 }
00372
00373 void COutputQueue::EOS()
00374 {
00375 CAutoLock lck(this);
00376 if (!IsQueued()) {
00377 if (m_bBatchExact) {
00378 SendAnyway();
00379 }
00380 if (m_hr == S_OK) {
00381 DbgLog((LOG_TRACE, 2, TEXT("COutputQueue sending EndOfStream()")));
00382 m_bFlushed = FALSE;
00383 HRESULT hr = m_pPin->EndOfStream();
00384 if (FAILED(hr)) {
00385 DbgLog((LOG_ERROR, 2, TEXT("COutputQueue got code 0x%8.8X from EndOfStream()")));
00386 }
00387 }
00388 } else {
00389 if (m_hr == S_OK) {
00390 m_bFlushed = FALSE;
00391 QueueSample(EOS_PACKET);
00392 NotifyThread();
00393 }
00394 }
00395 }
00396
00397 void COutputQueue::BeginFlush()
00398 {
00399 if (IsQueued()) {
00400 {
00401 CAutoLock lck(this);
00402
00403
00404
00405
00406
00407
00408 m_bFlushing = TRUE;
00409
00410
00411
00412 if (m_hr == S_OK) {
00413 m_hr = S_FALSE;
00414 }
00415
00416
00417
00418 if (m_bFlushed && m_bFlushingOpt) {
00419 return;
00420 }
00421
00422
00423 m_evFlushComplete.Reset();
00424
00425 NotifyThread();
00426 }
00427
00428
00429
00430 m_pPin->BeginFlush();
00431 } else {
00432
00433 m_pPin->BeginFlush();
00434 CAutoLock lck(this);
00435
00436
00437 m_bFlushing = TRUE;
00438
00439
00440
00441 if (m_hr == S_OK) {
00442 m_hr = S_FALSE;
00443 }
00444 }
00445
00446 }
00447
00448 void COutputQueue::EndFlush()
00449 {
00450 {
00451 CAutoLock lck(this);
00452 ASSERT(m_bFlushing);
00453 if (m_bFlushingOpt && m_bFlushed && IsQueued()) {
00454 m_bFlushing = FALSE;
00455 m_hr = S_OK;
00456 return;
00457 }
00458 }
00459
00460
00461
00462
00463
00464
00465
00466 if (IsQueued()) {
00467 m_evFlushComplete.Wait();
00468 } else {
00469 FreeSamples();
00470 }
00471
00472
00473
00474
00475 m_bFlushing = FALSE;
00476 m_bFlushed = TRUE;
00477
00478
00479
00480 m_pPin->EndFlush();
00481
00482 m_hr = S_OK;
00483 }
00484
00485 void COutputQueue::QueueSample(IMediaSample *pSample)
00486 {
00487 if (NULL == m_List->AddTail(pSample)) {
00488 if (!IsSpecialSample(pSample)) {
00489 pSample->Release();
00490 }
00491 }
00492 }
00493
00494 HRESULT COutputQueue::Receive(IMediaSample *pSample)
00495 {
00496 LONG nProcessed;
00497 return ReceiveMultiple(&pSample, 1, &nProcessed);
00498 }
00499
00500 HRESULT COutputQueue::ReceiveMultiple (
00501 IMediaSample **ppSamples,
00502 long nSamples,
00503 long *nSamplesProcessed)
00504 {
00505 CAutoLock lck(this);
00506
00507
00508 if (!IsQueued()) {
00509
00510
00511
00512 if (S_OK != m_hr) {
00513
00514
00515
00516
00517
00518 ASSERT(!m_bFlushed || m_bFlushing);
00519
00520
00521 *nSamplesProcessed = 0;
00522 for (int i = 0; i < nSamples; i++) {
00523 DbgLog((LOG_TRACE, 3, TEXT("COutputQueue (direct) : Discarding %d samples code 0x%8.8X"),
00524 nSamples, m_hr));
00525 ppSamples[i]->Release();
00526 }
00527
00528 return m_hr;
00529 }
00530
00531
00532
00533 ASSERT(!m_bFlushing);
00534 m_bFlushed = FALSE;
00535
00536 ASSERT(m_nBatched < m_lBatchSize);
00537 ASSERT(m_nBatched == 0 || m_bBatchExact);
00538
00539
00540
00541 LONG iLost = 0;
00542 for (long iDone = 0;
00543 iDone < nSamples || (m_nBatched != 0 && m_bSendAnyway);
00544 ) {
00545
00546 ASSERT(m_nBatched < m_lBatchSize);
00547 if (iDone < nSamples) {
00548 m_ppSamples[m_nBatched++] = ppSamples[iDone++];
00549 }
00550 if (m_nBatched == m_lBatchSize ||
00551 nSamples == 0 && (m_bSendAnyway || !m_bBatchExact)) {
00552 LONG nDone;
00553 DbgLog((LOG_TRACE, 4, TEXT("Batching %d samples"),
00554 m_nBatched));
00555
00556 if (m_hr == S_OK) {
00557 m_hr = m_pInputPin->ReceiveMultiple(m_ppSamples,
00558 m_nBatched,
00559 &nDone);
00560 } else {
00561 nDone = 0;
00562 }
00563 iLost += m_nBatched - nDone;
00564 for (LONG i = 0; i < m_nBatched; i++) {
00565 m_ppSamples[i]->Release();
00566 }
00567 m_nBatched = 0;
00568 }
00569 }
00570 *nSamplesProcessed = iDone - iLost;
00571 if (*nSamplesProcessed < 0) {
00572 *nSamplesProcessed = 0;
00573 }
00574 return m_hr;
00575 } else {
00576
00577
00578 if (m_hr != S_OK) {
00579 *nSamplesProcessed = 0;
00580 DbgLog((LOG_TRACE, 3, TEXT("COutputQueue (queued) : Discarding %d samples code 0x%8.8X"),
00581 nSamples, m_hr));
00582 for (int i = 0; i < nSamples; i++) {
00583 ppSamples[i]->Release();
00584 }
00585 return m_hr;
00586 }
00587 m_bFlushed = FALSE;
00588 for (long i = 0; i < nSamples; i++) {
00589 QueueSample(ppSamples[i]);
00590 }
00591 *nSamplesProcessed = nSamples;
00592 if (!m_bBatchExact ||
00593 m_nBatched + m_List->GetCount() >= m_lBatchSize) {
00594 NotifyThread();
00595 }
00596 return S_OK;
00597 }
00598 }
00599
00600 void COutputQueue::Reset()
00601 {
00602 if (!IsQueued()) {
00603 m_hr = S_OK;
00604 } else {
00605 CAutoLock lck(this);
00606 QueueSample(RESET_PACKET);
00607 NotifyThread();
00608 m_evFlushComplete.Wait();
00609 }
00610 }
00611
00612 void COutputQueue::FreeSamples()
00613 {
00614 CAutoLock lck(this);
00615 if (IsQueued()) {
00616 while (TRUE) {
00617 IMediaSample *pSample = m_List->RemoveHead();
00618
00619 if (m_hEventPop) {
00620
00621 SetEvent(m_hEventPop);
00622 }
00623
00624 if (pSample == NULL) {
00625 break;
00626 }
00627 if (!IsSpecialSample(pSample)) {
00628 pSample->Release();
00629 } else {
00630 if (pSample == NEW_SEGMENT) {
00631
00632 NewSegmentPacket *ppacket =
00633 (NewSegmentPacket *) m_List->RemoveHead();
00634
00635 if (m_hEventPop) {
00636
00637 SetEvent(m_hEventPop);
00638 }
00639
00640 ASSERT(ppacket != NULL);
00641 delete ppacket;
00642 }
00643 }
00644 }
00645 }
00646 for (int i = 0; i < m_nBatched; i++) {
00647 m_ppSamples[i]->Release();
00648 }
00649 m_nBatched = 0;
00650 }
00651
00652 void COutputQueue::NotifyThread()
00653 {
00654
00655 ASSERT(IsQueued());
00656 if (m_lWaiting) {
00657 ReleaseSemaphore(m_hSem, m_lWaiting, NULL);
00658 m_lWaiting = 0;
00659 }
00660 }
00661
00662 BOOL COutputQueue::IsIdle()
00663 {
00664 CAutoLock lck(this);
00665
00666
00667
00668
00669
00670
00671
00672 if (IsQueued() && m_lWaiting == 0 || m_nBatched != 0) {
00673 return FALSE;
00674 } else {
00675
00676
00677
00678
00679 ASSERT(!IsQueued() || m_List->GetCount() == 0);
00680 return TRUE;
00681 }
00682 }
00683
00684 void COutputQueue::SetPopEvent(HANDLE hEvent)
00685 {
00686 m_hEventPop = hEvent;
00687 }