00001
00002
00003
00004
00005
00006 #include <streams.h>
00007 #include "pullpin.h"
00008
00009 CPullPin::CPullPin()
00010 : m_pReader(NULL),
00011 m_pAlloc(NULL),
00012 m_State(TM_Exit)
00013 {
00014
00015 }
00016
00017 CPullPin::~CPullPin()
00018 {
00019 Disconnect();
00020
00021 }
00022
00023 HRESULT
00024 CPullPin::Connect(IUnknown* pUnk, IMemAllocator* pAlloc, BOOL bSync)
00025 {
00026 CAutoLock lock(&m_AccessLock);
00027
00028 if (m_pReader) {
00029 return VFW_E_ALREADY_CONNECTED;
00030 }
00031
00032 HRESULT hr = pUnk->QueryInterface(IID_IAsyncReader, (void**)&m_pReader);
00033 if (FAILED(hr)) {
00034
00035 return(hr);
00036 }
00037
00038 hr = DecideAllocator(pAlloc, NULL);
00039 if (FAILED(hr)) {
00040 Disconnect();
00041
00042 return hr;
00043 }
00044
00045 LONGLONG llTotal, llAvail;
00046 hr = m_pReader->Length(&llTotal, &llAvail);
00047 if (FAILED(hr)) {
00048 Disconnect();
00049
00050 return hr;
00051 }
00052
00053
00054 m_tDuration = llTotal * UNITS;
00055 m_tStop = m_tDuration;
00056 m_tStart = 0;
00057
00058 m_bSync = bSync;
00059
00060 return S_OK;
00061 }
00062
00063 HRESULT
00064 CPullPin::Disconnect()
00065 {
00066 CAutoLock lock(&m_AccessLock);
00067
00068 StopThread();
00069
00070 if (m_pReader) {
00071 m_pReader->Release();
00072 m_pReader = NULL;
00073 }
00074
00075 if (m_pAlloc) {
00076 m_pAlloc->Release();
00077 m_pAlloc = NULL;
00078 }
00079
00080 return S_OK;
00081 }
00082
00083 HRESULT
00084 CPullPin::DecideAllocator(
00085 IMemAllocator * pAlloc,
00086 ALLOCATOR_PROPERTIES * pProps)
00087 {
00088 ALLOCATOR_PROPERTIES *pRequest;
00089 ALLOCATOR_PROPERTIES Request;
00090 if (pProps == NULL) {
00091 Request.cBuffers = 3;
00092 Request.cbBuffer = 64*1024;
00093 Request.cbAlign = 0;
00094 Request.cbPrefix = 0;
00095 pRequest = &Request;
00096 } else {
00097 pRequest = pProps;
00098 }
00099 HRESULT hr = m_pReader->RequestAllocator(
00100 pAlloc,
00101 pRequest,
00102 &m_pAlloc);
00103 return hr;
00104 }
00105
00106 HRESULT
00107 CPullPin::Active(void)
00108 {
00109 ASSERT(!ThreadExists());
00110 return StartThread();
00111 }
00112
00113 HRESULT
00114 CPullPin::Inactive(void)
00115 {
00116 StopThread();
00117
00118 return S_OK;
00119 }
00120
00121 HRESULT
00122 CPullPin::Seek(REFERENCE_TIME tStart, REFERENCE_TIME tStop)
00123 {
00124 CAutoLock lock(&m_AccessLock);
00125
00126 ThreadMsg AtStart = m_State;
00127
00128 if (AtStart == TM_Start) {
00129 BeginFlush();
00130 PauseThread();
00131 EndFlush();
00132 }
00133
00134 m_tStart = tStart;
00135 m_tStop = tStop;
00136
00137 HRESULT hr = S_OK;
00138 if (AtStart == TM_Start) {
00139 hr = StartThread();
00140 }
00141
00142 return hr;
00143 }
00144
00145 HRESULT
00146 CPullPin::Duration(REFERENCE_TIME* ptDuration)
00147 {
00148 *ptDuration = m_tDuration;
00149 return S_OK;
00150 }
00151
00152 HRESULT
00153 CPullPin::StartThread()
00154 {
00155 CAutoLock lock(&m_AccessLock);
00156
00157 if (!m_pAlloc || !m_pReader) {
00158 return E_UNEXPECTED;
00159 }
00160
00161 HRESULT hr;
00162 if (!ThreadExists()) {
00163
00164
00165 hr = m_pAlloc->Commit();
00166 if (FAILED(hr)) {
00167 return hr;
00168 }
00169
00170
00171 if (!Create()) {
00172 return E_FAIL;
00173 }
00174 }
00175
00176 m_State = TM_Start;
00177 hr = (HRESULT) CallWorker(m_State);
00178 return hr;
00179 }
00180
00181 HRESULT
00182 CPullPin::PauseThread()
00183 {
00184 CAutoLock lock(&m_AccessLock);
00185
00186 if (!ThreadExists()) {
00187 return E_UNEXPECTED;
00188 }
00189
00190
00191
00192 HRESULT hr = m_pReader->BeginFlush();
00193 if (FAILED(hr)) {
00194 return hr;
00195 }
00196
00197 m_State = TM_Pause;
00198 hr = CallWorker(TM_Pause);
00199
00200 m_pReader->EndFlush();
00201 return hr;
00202 }
00203
00204 HRESULT
00205 CPullPin::StopThread()
00206 {
00207 CAutoLock lock(&m_AccessLock);
00208
00209 if (!ThreadExists()) {
00210 return S_FALSE;
00211 }
00212
00213
00214
00215 HRESULT hr = m_pReader->BeginFlush();
00216 if (FAILED(hr)) {
00217 return hr;
00218 }
00219
00220 m_State = TM_Exit;
00221 hr = CallWorker(TM_Exit);
00222
00223 m_pReader->EndFlush();
00224
00225
00226 Close();
00227
00228
00229 if (m_pAlloc) {
00230 m_pAlloc->Decommit();
00231 }
00232
00233 return S_OK;
00234 }
00235
00236 DWORD
00237 CPullPin::ThreadProc(void)
00238 {
00239 while(1) {
00240 DWORD cmd = GetRequest();
00241 switch(cmd) {
00242 case TM_Exit:
00243 Reply(S_OK);
00244 return 0;
00245
00246 case TM_Pause:
00247
00248 Reply(S_OK);
00249 break;
00250
00251 case TM_Start:
00252 Reply(S_OK);
00253 Process();
00254 break;
00255 }
00256
00257
00258
00259
00260
00261
00262
00263 m_pReader->BeginFlush();
00264 CleanupCancelled();
00265 m_pReader->EndFlush();
00266 }
00267 }
00268
00269 HRESULT
00270 CPullPin::QueueSample(
00271 REFERENCE_TIME& tCurrent,
00272 REFERENCE_TIME tAlignStop,
00273 BOOL bDiscontinuity
00274 )
00275 {
00276 IMediaSample* pSample;
00277
00278 HRESULT hr = m_pAlloc->GetBuffer(&pSample, NULL, NULL, 0);
00279 if (FAILED(hr)) {
00280 return hr;
00281 }
00282
00283 LONGLONG tStopThis = tCurrent + (pSample->GetSize() * UNITS);
00284 if (tStopThis > tAlignStop) {
00285 tStopThis = tAlignStop;
00286 }
00287 pSample->SetTime(&tCurrent, &tStopThis);
00288 tCurrent = tStopThis;
00289
00290 pSample->SetDiscontinuity(bDiscontinuity);
00291
00292 hr = m_pReader->Request(
00293 pSample,
00294 0);
00295 if (FAILED(hr)) {
00296 pSample->Release();
00297
00298 CleanupCancelled();
00299 OnError(hr);
00300 }
00301 return hr;
00302 }
00303
00304 HRESULT
00305 CPullPin::CollectAndDeliver(
00306 REFERENCE_TIME tStart,
00307 REFERENCE_TIME tStop)
00308 {
00309 IMediaSample* pSample = NULL;
00310 DWORD_PTR dwUnused;
00311 HRESULT hr = m_pReader->WaitForNext(
00312 INFINITE,
00313 &pSample,
00314 &dwUnused);
00315 if (FAILED(hr)) {
00316 if (pSample) {
00317 pSample->Release();
00318 }
00319 } else {
00320 hr = DeliverSample(pSample, tStart, tStop);
00321 }
00322 if (FAILED(hr)) {
00323 CleanupCancelled();
00324 OnError(hr);
00325 }
00326 return hr;
00327
00328 }
00329
00330 HRESULT
00331 CPullPin::DeliverSample(
00332 IMediaSample* pSample,
00333 REFERENCE_TIME tStart,
00334 REFERENCE_TIME tStop
00335 )
00336 {
00337
00338 REFERENCE_TIME t1, t2;
00339 pSample->GetTime(&t1, &t2);
00340 if (t2 > tStop) {
00341 t2 = tStop;
00342 }
00343
00344
00345 t1 -= tStart;
00346 t2 -= tStart;
00347 pSample->SetTime(&t1, &t2);
00348
00349 HRESULT hr = Receive(pSample);
00350 pSample->Release();
00351 return hr;
00352 }
00353
00354 void
00355 CPullPin::Process(void)
00356 {
00357
00358 if (m_tStop <= m_tStart) {
00359 EndOfStream();
00360 return;
00361 }
00362
00363 BOOL bDiscontinuity = TRUE;
00364
00365
00366
00367
00368 ALLOCATOR_PROPERTIES Actual;
00369 HRESULT hr = m_pAlloc->GetProperties(&Actual);
00370
00371
00372 REFERENCE_TIME tStart = AlignDown(m_tStart / UNITS, Actual.cbAlign) * UNITS;
00373 REFERENCE_TIME tCurrent = tStart;
00374
00375 REFERENCE_TIME tStop = m_tStop;
00376 if (tStop > m_tDuration) {
00377 tStop = m_tDuration;
00378 }
00379
00380
00381
00382 REFERENCE_TIME tAlignStop = AlignUp(tStop / UNITS, Actual.cbAlign) * UNITS;
00383
00384 DWORD dwRequest;
00385
00386 if (!m_bSync) {
00387
00388
00389
00390 while (tCurrent < tAlignStop) {
00391
00392
00393
00394 if (CheckRequest(&dwRequest)) {
00395 return;
00396 }
00397
00398
00399 if (Actual.cBuffers > 1) {
00400
00401 hr = QueueSample(tCurrent, tAlignStop, TRUE);
00402 bDiscontinuity = FALSE;
00403
00404 if (FAILED(hr)) {
00405 return;
00406 }
00407 }
00408
00409
00410 while (tCurrent < tAlignStop) {
00411
00412 hr = QueueSample(tCurrent, tAlignStop, bDiscontinuity);
00413 bDiscontinuity = FALSE;
00414
00415 if (FAILED(hr)) {
00416 return;
00417 }
00418
00419 hr = CollectAndDeliver(tStart, tStop);
00420 if (S_OK != hr) {
00421
00422
00423
00424 return;
00425 }
00426 }
00427
00428 if (Actual.cBuffers > 1) {
00429 hr = CollectAndDeliver(tStart, tStop);
00430 if (FAILED(hr)) {
00431 return;
00432 }
00433 }
00434 }
00435 } else {
00436
00437
00438 while (tCurrent < tAlignStop) {
00439
00440
00441
00442 if (CheckRequest(&dwRequest)) {
00443 return;
00444 }
00445
00446 IMediaSample* pSample;
00447
00448 hr = m_pAlloc->GetBuffer(&pSample, NULL, NULL, 0);
00449 if (FAILED(hr)) {
00450 OnError(hr);
00451 return;
00452 }
00453
00454 LONGLONG tStopThis = tCurrent + (pSample->GetSize() * UNITS);
00455 if (tStopThis > tAlignStop) {
00456 tStopThis = tAlignStop;
00457 }
00458 pSample->SetTime(&tCurrent, &tStopThis);
00459 tCurrent = tStopThis;
00460
00461 if (bDiscontinuity) {
00462 pSample->SetDiscontinuity(TRUE);
00463 bDiscontinuity = FALSE;
00464 }
00465
00466 hr = m_pReader->SyncReadAligned(pSample);
00467
00468 if (FAILED(hr)) {
00469 pSample->Release();
00470 OnError(hr);
00471 return;
00472 }
00473
00474 hr = DeliverSample(pSample, tStart, tStop);
00475 if (hr != S_OK) {
00476 if (FAILED(hr)) {
00477 OnError(hr);
00478 }
00479 return;
00480 }
00481 }
00482 }
00483
00484 EndOfStream();
00485 }
00486
00487 void
00488 CPullPin::CleanupCancelled(void)
00489 {
00490 while (1) {
00491 IMediaSample * pSample;
00492 DWORD_PTR dwUnused;
00493
00494 HRESULT hr = m_pReader->WaitForNext(
00495 0,
00496 &pSample,
00497 &dwUnused);
00498 if(pSample) {
00499 pSample->Release();
00500 } else {
00501
00502 return;
00503 }
00504 }
00505 }