enc-vfw: Fix several issues

- Encoding Loop no longer checks queue size, fixes an infinite loop issue.
- A lot of debug only code has been made to compile in debug only.
This commit is contained in:
Xaymar
2017-10-27 07:06:18 +02:00
parent 3ff6faeb23
commit 658063ea41
+127 -63
View File
@@ -553,20 +553,18 @@ bool VFW::Encoder::encode(struct encoder_frame *frame, struct encoder_packet *pa
namespace sc = std::chrono; namespace sc = std::chrono;
using schrc = std::chrono::high_resolution_clock; using schrc = std::chrono::high_resolution_clock;
size_t queueSize = 0;
{
std::unique_lock<std::mutex> ulock(m_finalPacketsLock);
queueSize = m_finalPackets.size();
}
bool submittedFrame = false; bool submittedFrame = false;
long long maxTime = size_t((double_t(m_fpsNum) / double_t(m_fpsDen)) * 1000000000); long long maxTime = size_t((double_t(m_fpsNum) / double_t(m_fpsDen)) * 1000000000);
while (((*received_packet == false && queueSize >= m_latency) || (submittedFrame == false)) while (((*received_packet == false) || (submittedFrame == false))
&& (sc::nanoseconds((schrc::now() - tbegin)).count() < maxTime)) { && (sc::nanoseconds((schrc::now() - tbegin)).count() < maxTime)) {
// Submit frame to PreProcessor // Submit frame to PreProcessor
if (!submittedFrame) { if (!submittedFrame) {
std::unique_lock<std::mutex> ulock(m_preProcessData.lock); std::unique_lock<std::mutex> ulock(m_preProcessData.lock);
if (m_preProcessData.data.size() < m_maxQueueSize) { std::unique_lock<std::mutex> elock(m_encodeData.lock);
std::unique_lock<std::mutex> plock(m_postProcessData.lock);
if ((m_preProcessData.data.size() < m_maxQueueSize)
&& (m_encodeData.data.size() < m_maxQueueSize)
&& (m_postProcessData.data.size() < m_maxQueueSize)) {
m_preProcessData.data.push(std::make_tuple( m_preProcessData.data.push(std::make_tuple(
std::make_shared<std::vector<char>>(frame->data[0], frame->data[0] + (frame->linesize[0] * this->m_height)), std::make_shared<std::vector<char>>(frame->data[0], frame->data[0] + (frame->linesize[0] * this->m_height)),
frame->pts, frame->pts,
@@ -588,8 +586,10 @@ bool VFW::Encoder::encode(struct encoder_frame *frame, struct encoder_packet *pa
packet->keyframe = std::get<2>(front); packet->keyframe = std::get<2>(front);
*received_packet = true; *received_packet = true;
m_finalPackets.pop(); m_finalPackets.pop();
#ifdef _DEBUG
PLOG_DEBUG("<%s> PTS: %" PRIu32 ", DTS: %" PRIu32 ", Keyframe: %s, Size: %" PRIu32, PLOG_DEBUG("<%s> PTS: %" PRIu32 ", DTS: %" PRIu32 ", Keyframe: %s, Size: %" PRIu32,
myInfo->Name.c_str(), packet->pts, packet->dts, packet->keyframe ? "Yes" : "No", packet->size); myInfo->Name.c_str(), packet->pts, packet->dts, packet->keyframe ? "Yes" : "No", packet->size);
#endif
} }
} }
@@ -673,12 +673,16 @@ void VFW::Encoder::threadLocal(int32_t flag) {
} }
void VFW::Encoder::preProcessLocal(std::unique_lock<std::mutex>& ul) { void VFW::Encoder::preProcessLocal(std::unique_lock<std::mutex>& ul) {
#ifdef _DEBUG
auto total_start = std::chrono::high_resolution_clock::now(); auto total_start = std::chrono::high_resolution_clock::now();
#endif
auto kv = m_preProcessData.data.front(); auto kv = m_preProcessData.data.front();
ul.unlock(); ul.unlock();
#ifdef _DEBUG
auto invert_start = std::chrono::high_resolution_clock::now(); auto invert_start = std::chrono::high_resolution_clock::now();
#endif
std::shared_ptr<std::vector<char>> inbuf = std::get<0>(kv); std::shared_ptr<std::vector<char>> inbuf = std::get<0>(kv);
std::shared_ptr<std::vector<char>> outbuf = inbuf;// std::make_shared<std::vector<char>>(inbuf->size()); std::shared_ptr<std::vector<char>> outbuf = inbuf;// std::make_shared<std::vector<char>>(inbuf->size());
@@ -693,23 +697,31 @@ void VFW::Encoder::preProcessLocal(std::unique_lock<std::mutex>& ul) {
std::memcpy(outbuf->data() + front, inbuf->data() + back, lineSize); std::memcpy(outbuf->data() + front, inbuf->data() + back, lineSize);
std::memcpy(outbuf->data() + back, tempBuf.data(), lineSize); std::memcpy(outbuf->data() + back, tempBuf.data(), lineSize);
} }
#ifdef _DEBUG
auto invert_end = std::chrono::high_resolution_clock::now(); auto invert_end = std::chrono::high_resolution_clock::now();
#endif
auto wait_start = std::chrono::high_resolution_clock::now(); //#ifdef _DEBUG
// Do not fill queue if it is > latency. // auto wait_start = std::chrono::high_resolution_clock::now();
size_t queueSize = m_maxQueueSize; //#endif
while (queueSize >= m_maxQueueSize) { // // Do not fill queue if it is > latency.
{ // size_t queueSize = m_maxQueueSize;
std::unique_lock<std::mutex> elock(m_encodeData.lock); // while (queueSize >= m_maxQueueSize) {
queueSize = m_encodeData.data.size(); // {
} // std::unique_lock<std::mutex> elock(m_encodeData.lock);
// queueSize = m_encodeData.data.size();
if (queueSize >= m_maxQueueSize) // }
std::this_thread::sleep_for(std::chrono::milliseconds(1)); //
} // if (queueSize >= m_maxQueueSize)
auto wait_end = std::chrono::high_resolution_clock::now(); // std::this_thread::sleep_for(std::chrono::milliseconds(1));
// }
//#ifdef _DEBUG
// auto wait_end = std::chrono::high_resolution_clock::now();
//#endif
#ifdef _DEBUG
auto queue_start = std::chrono::high_resolution_clock::now(); auto queue_start = std::chrono::high_resolution_clock::now();
#endif
{ {
std::unique_lock<std::mutex> plock(m_preProcessData.lock); std::unique_lock<std::mutex> plock(m_preProcessData.lock);
std::unique_lock<std::mutex> elock(m_encodeData.lock); std::unique_lock<std::mutex> elock(m_encodeData.lock);
@@ -717,41 +729,55 @@ void VFW::Encoder::preProcessLocal(std::unique_lock<std::mutex>& ul) {
m_encodeData.cv.notify_all(); m_encodeData.cv.notify_all();
m_preProcessData.data.pop(); m_preProcessData.data.pop();
} }
#ifdef _DEBUG
auto queue_end = std::chrono::high_resolution_clock::now(); auto queue_end = std::chrono::high_resolution_clock::now();
#endif
ul.lock(); ul.lock();
#ifdef _DEBUG
auto total_end = std::chrono::high_resolution_clock::now(); auto total_end = std::chrono::high_resolution_clock::now();
#endif
#ifdef _DEBUG
auto time_total = std::chrono::duration_cast<std::chrono::nanoseconds>(total_end - total_start); auto time_total = std::chrono::duration_cast<std::chrono::nanoseconds>(total_end - total_start);
auto time_invert = std::chrono::duration_cast<std::chrono::nanoseconds>(invert_end - invert_start); auto time_invert = std::chrono::duration_cast<std::chrono::nanoseconds>(invert_end - invert_start);
auto time_wait = std::chrono::duration_cast<std::chrono::nanoseconds>(wait_end - wait_start); auto time_wait = std::chrono::duration_cast<std::chrono::nanoseconds>(wait_end - wait_start);
auto time_queue = std::chrono::duration_cast<std::chrono::nanoseconds>(queue_end - queue_start); auto time_queue = std::chrono::duration_cast<std::chrono::nanoseconds>(queue_end - queue_start);
//PLOG_INFO("[Thread PrePro] Frame %" PRId64 ": " #endif
// "Total: %" PRId64 "ns, " #ifdef _DEBUG
// "Invert: %" PRId64 "ns, " PLOG_INFO("[Thread PrePro] Frame %" PRId64 ": "
// "Wait: %" PRId64 "ns, " "Total: %" PRId64 "ns, "
// "Queue: %" PRId64 "ns", "Invert: %" PRId64 "ns, "
// std::get<1>(kv), "Wait: %" PRId64 "ns, "
// time_total.count(), "Queue: %" PRId64 "ns",
// time_invert.count(), std::get<1>(kv),
// time_wait.count(), time_total.count(),
// time_queue.count()); time_invert.count(),
time_wait.count(),
time_queue.count());
#endif
} }
void VFW::Encoder::encodeLocal(std::unique_lock<std::mutex>& ul) { void VFW::Encoder::encodeLocal(std::unique_lock<std::mutex>& ul) {
#ifdef _DEBUG
auto total_start = std::chrono::high_resolution_clock::now(); auto total_start = std::chrono::high_resolution_clock::now();
#endif
auto kv = m_encodeData.data.front(); auto kv = m_encodeData.data.front();
ul.unlock(); ul.unlock();
#ifdef _DEBUG
auto encode_start = std::chrono::high_resolution_clock::now(); auto encode_start = std::chrono::high_resolution_clock::now();
#endif
bool isKeyframe = false; bool isKeyframe = false;
bool makeKeyframe = (m_keyframeInterval > 0) && ((std::get<1>(kv) % m_keyframeInterval) == 0); bool makeKeyframe = (m_keyframeInterval > 0) && ((std::get<1>(kv) % m_keyframeInterval) == 0);
std::shared_ptr<std::vector<char>> inbuf = std::get<0>(kv); std::shared_ptr<std::vector<char>> inbuf = std::get<0>(kv);
std::shared_ptr<std::vector<char>> outbuf = std::make_shared<std::vector<char>>(m_bufferOutput.size()); std::shared_ptr<std::vector<char>> outbuf = std::make_shared<std::vector<char>>(m_bufferOutput.size());
if (m_useNormalCompress) { if (m_useNormalCompress) {
DWORD dwFlags = 0, cwCompFlags = 0; DWORD dwFlags = 0, cwCompFlags = 0;
#ifdef _DEBUG
PLOG_DEBUG("<%s:Normal> PTS: %" PRIu32 ", Keyframe: %s", myInfo->Name.c_str(), std::get<1>(kv), makeKeyframe ? "Yes" : "No"); PLOG_DEBUG("<%s:Normal> PTS: %" PRIu32 ", Keyframe: %s", myInfo->Name.c_str(), std::get<1>(kv), makeKeyframe ? "Yes" : "No");
#endif
LRESULT err = ICCompress(hIC, LRESULT err = ICCompress(hIC,
makeKeyframe ? ICCOMPRESS_KEYFRAME : 0, makeKeyframe ? ICCOMPRESS_KEYFRAME : 0,
&(m_outputBitmapInfo->bmiHeader), outbuf->data(), &(m_outputBitmapInfo->bmiHeader), outbuf->data(),
@@ -771,15 +797,19 @@ void VFW::Encoder::encodeLocal(std::unique_lock<std::mutex>& ul) {
// Swap Buffers // Swap Buffers
m_bufferPrevInput.swap(m_bufferInput); m_bufferPrevInput.swap(m_bufferInput);
#ifdef _DEBUG
PLOG_DEBUG("<%s:Normal> PTS: %" PRIu32 ", Keyframe: %s, Size: %" PRIu32, PLOG_DEBUG("<%s:Normal> PTS: %" PRIu32 ", Keyframe: %s, Size: %" PRIu32,
myInfo->Name.c_str(), std::get<1>(kv), isKeyframe ? "Yes" : "No", outbuf->size()); myInfo->Name.c_str(), std::get<1>(kv), isKeyframe ? "Yes" : "No", outbuf->size());
#endif
} else { } else {
PLOG_ERROR("Unable to encode: %s.", FormattedICCError(err).c_str()); PLOG_ERROR("Unable to encode: %s.", FormattedICCError(err).c_str());
} }
} else { } else {
BOOL keyframe; LONG plSize = (LONG)inbuf->size(); BOOL keyframe; LONG plSize = (LONG)inbuf->size();
#ifdef _DEBUG
PLOG_DEBUG("<%s:Sequential> PTS: %" PRIu32 ", Keyframe: %s", PLOG_DEBUG("<%s:Sequential> PTS: %" PRIu32 ", Keyframe: %s",
myInfo->Name.c_str(), std::get<1>(kv), makeKeyframe ? "Yes" : "No"); myInfo->Name.c_str(), std::get<1>(kv), makeKeyframe ? "Yes" : "No");
#endif
LPVOID fptr = ICSeqCompressFrame( LPVOID fptr = ICSeqCompressFrame(
&cv, &cv,
makeKeyframe ? 1 : 0, makeKeyframe ? 1 : 0,
@@ -793,27 +823,37 @@ void VFW::Encoder::encodeLocal(std::unique_lock<std::mutex>& ul) {
std::memcpy(outbuf->data(), fptr, outbuf->size()); std::memcpy(outbuf->data(), fptr, outbuf->size());
isKeyframe = keyframe != 0; isKeyframe = keyframe != 0;
#ifdef _DEBUG
PLOG_DEBUG("<%s:Sequential> PTS: %" PRIu32 ", Keyframe: %s, Size: %" PRIu32, PLOG_DEBUG("<%s:Sequential> PTS: %" PRIu32 ", Keyframe: %s, Size: %" PRIu32,
myInfo->Name.c_str(), std::get<1>(kv), isKeyframe ? "Yes" : "No", outbuf->size()); myInfo->Name.c_str(), std::get<1>(kv), isKeyframe ? "Yes" : "No", outbuf->size());
#endif
} }
} }
isKeyframe = m_forceKeyframes ? makeKeyframe || isKeyframe : isKeyframe; isKeyframe = m_forceKeyframes ? makeKeyframe || isKeyframe : isKeyframe;
#ifdef _DEBUG
auto encode_end = std::chrono::high_resolution_clock::now(); auto encode_end = std::chrono::high_resolution_clock::now();
#endif
auto wait_start = std::chrono::high_resolution_clock::now(); //#ifdef _DEBUG
// Do not fill queue if it is > latency. // auto wait_start = std::chrono::high_resolution_clock::now();
size_t queueSize = m_maxQueueSize; //#endif
while (queueSize >= m_maxQueueSize) { // // Do not fill queue if it is > latency.
{ // size_t queueSize = m_maxQueueSize;
std::unique_lock<std::mutex> elock(m_postProcessData.lock); // while (queueSize >= m_maxQueueSize) {
queueSize = m_postProcessData.data.size(); // {
} // std::unique_lock<std::mutex> elock(m_postProcessData.lock);
std::this_thread::sleep_for(std::chrono::milliseconds(1)); // queueSize = m_postProcessData.data.size();
} // }
auto wait_end = std::chrono::high_resolution_clock::now(); // std::this_thread::sleep_for(std::chrono::milliseconds(1));
// }
//#ifdef _DEBUG
// auto wait_end = std::chrono::high_resolution_clock::now();
//#endif
#ifdef _DEBUG
auto queue_start = std::chrono::high_resolution_clock::now(); auto queue_start = std::chrono::high_resolution_clock::now();
#endif
{ {
std::unique_lock<std::mutex> elock(m_encodeData.lock); std::unique_lock<std::mutex> elock(m_encodeData.lock);
std::unique_lock<std::mutex> plock(m_postProcessData.lock); std::unique_lock<std::mutex> plock(m_postProcessData.lock);
@@ -821,25 +861,31 @@ void VFW::Encoder::encodeLocal(std::unique_lock<std::mutex>& ul) {
m_postProcessData.cv.notify_all(); m_postProcessData.cv.notify_all();
m_encodeData.data.pop(); m_encodeData.data.pop();
} }
#ifdef _DEBUG
auto queue_end = std::chrono::high_resolution_clock::now(); auto queue_end = std::chrono::high_resolution_clock::now();
#endif
ul.lock(); ul.lock();
#ifdef _DEBUG
auto total_end = std::chrono::high_resolution_clock::now(); auto total_end = std::chrono::high_resolution_clock::now();
#endif
#ifdef _DEBUG
auto time_total = std::chrono::duration_cast<std::chrono::nanoseconds>(total_end - total_start); auto time_total = std::chrono::duration_cast<std::chrono::nanoseconds>(total_end - total_start);
auto time_encode = std::chrono::duration_cast<std::chrono::nanoseconds>(encode_end - encode_start); auto time_encode = std::chrono::duration_cast<std::chrono::nanoseconds>(encode_end - encode_start);
auto time_wait = std::chrono::duration_cast<std::chrono::nanoseconds>(wait_end - wait_start); auto time_wait = std::chrono::duration_cast<std::chrono::nanoseconds>(wait_end - wait_start);
auto time_queue = std::chrono::duration_cast<std::chrono::nanoseconds>(queue_end - queue_start); auto time_queue = std::chrono::duration_cast<std::chrono::nanoseconds>(queue_end - queue_start);
//PLOG_INFO("[Thread Encode] Frame %" PRId64 ": " PLOG_DEBUG("[Thread Encode] Frame %" PRId64 ": "
// "Total: %" PRId64 "ns, " "Total: %" PRId64 "ns, "
// "Encode: %" PRId64 "ns, " "Encode: %" PRId64 "ns, "
// "Wait: %" PRId64 "ns, " "Wait: %" PRId64 "ns, "
// "Queue: %" PRId64 "ns", "Queue: %" PRId64 "ns",
// std::get<1>(kv), std::get<1>(kv),
// time_total.count(), time_total.count(),
// time_encode.count(), time_encode.count(),
// time_wait.count(), time_wait.count(),
// time_queue.count()); time_queue.count());
#endif
} }
void MatroxM2VBitstreamFixer(std::shared_ptr<std::vector<char>>& ptr, std::pair<uint32_t, uint32_t> framerate) { void MatroxM2VBitstreamFixer(std::shared_ptr<std::vector<char>>& ptr, std::pair<uint32_t, uint32_t> framerate) {
@@ -963,44 +1009,61 @@ void MatroxM2VBitstreamFixer(std::shared_ptr<std::vector<char>>& ptr, std::pair<
} }
void VFW::Encoder::postProcessLocal(std::unique_lock<std::mutex>& ul) { void VFW::Encoder::postProcessLocal(std::unique_lock<std::mutex>& ul) {
#ifdef _DEBUG
auto total_start = std::chrono::high_resolution_clock::now(); auto total_start = std::chrono::high_resolution_clock::now();
#endif
auto kv = m_postProcessData.data.front(); auto kv = m_postProcessData.data.front();
ul.unlock(); ul.unlock();
#ifdef _DEBUG
auto bitstream_start = std::chrono::high_resolution_clock::now(); auto bitstream_start = std::chrono::high_resolution_clock::now();
#endif
if ((myInfo->Id == "mvcVfwMpeg2-mmes") if ((myInfo->Id == "mvcVfwMpeg2-mmes")
|| (myInfo->Id == "mvcVfwMpeg2Alpha-m704") || (myInfo->Id == "mvcVfwMpeg2Alpha-m704")
|| (myInfo->Id == "mvcVfwMpeg2HD-m701") || (myInfo->Id == "mvcVfwMpeg2HD-m701")
|| (myInfo->Id == "mvcVfwMpeg2Alpha-m705")) { || (myInfo->Id == "mvcVfwMpeg2Alpha-m705")) {
MatroxM2VBitstreamFixer(std::get<0>(kv), std::make_pair(m_fpsNum, m_fpsDen)); MatroxM2VBitstreamFixer(std::get<0>(kv), std::make_pair(m_fpsNum, m_fpsDen));
} }
#ifdef _DEBUG
auto bitstream_end = std::chrono::high_resolution_clock::now(); auto bitstream_end = std::chrono::high_resolution_clock::now();
#endif
auto wait_start = std::chrono::high_resolution_clock::now(); //#ifdef _DEBUG
// Do not fill queue if it is > latency. // auto wait_start = std::chrono::high_resolution_clock::now();
size_t queueSize = m_maxQueueSize; //#endif
while (queueSize >= m_maxQueueSize) { // // Do not fill queue if it is > latency.
{ // size_t queueSize = m_maxQueueSize;
std::unique_lock<std::mutex> flock(m_finalPacketsLock); // while (queueSize >= m_maxQueueSize) {
queueSize = m_finalPackets.size(); // {
} // std::unique_lock<std::mutex> flock(m_finalPacketsLock);
std::this_thread::sleep_for(std::chrono::milliseconds(1)); // queueSize = m_finalPackets.size();
} // }
auto wait_end = std::chrono::high_resolution_clock::now(); // std::this_thread::sleep_for(std::chrono::milliseconds(1));
// }
//#ifdef _DEBUG
// auto wait_end = std::chrono::high_resolution_clock::now();
//#endif
#ifdef _DEBUG
auto queue_start = std::chrono::high_resolution_clock::now(); auto queue_start = std::chrono::high_resolution_clock::now();
#endif
{ {
std::unique_lock<std::mutex> plock(m_postProcessData.lock); std::unique_lock<std::mutex> plock(m_postProcessData.lock);
std::unique_lock<std::mutex> flock(m_finalPacketsLock); std::unique_lock<std::mutex> flock(m_finalPacketsLock);
m_finalPackets.push(kv); m_finalPackets.push(kv);
m_postProcessData.data.pop(); m_postProcessData.data.pop();
} }
#ifdef _DEBUG
auto queue_end = std::chrono::high_resolution_clock::now(); auto queue_end = std::chrono::high_resolution_clock::now();
#endif
ul.lock(); ul.lock();
#ifdef _DEBUG
auto total_end = std::chrono::high_resolution_clock::now(); auto total_end = std::chrono::high_resolution_clock::now();
#endif
#ifdef _DEBUG
auto time_total = std::chrono::duration_cast<std::chrono::nanoseconds>(total_end - total_start); auto time_total = std::chrono::duration_cast<std::chrono::nanoseconds>(total_end - total_start);
auto time_bitstream = std::chrono::duration_cast<std::chrono::nanoseconds>(bitstream_end - bitstream_start); auto time_bitstream = std::chrono::duration_cast<std::chrono::nanoseconds>(bitstream_end - bitstream_start);
auto time_wait = std::chrono::duration_cast<std::chrono::nanoseconds>(wait_end - wait_start); auto time_wait = std::chrono::duration_cast<std::chrono::nanoseconds>(wait_end - wait_start);
@@ -1015,4 +1078,5 @@ void VFW::Encoder::postProcessLocal(std::unique_lock<std::mutex>& ul) {
time_bitstream.count(), time_bitstream.count(),
time_wait.count(), time_wait.count(),
time_queue.count()); time_queue.count());
#endif
} }