Problem Statement

MXNet leverages multithreading on both CPUs and GPUs in several major areas: the dependency engine, which runs operators in parallel; the implementation logic inside individual operators; and data loading through iterators. This design helps MXNet achieve great performance, but it adds some challenges with respect to usability. Below I demonstrate two scenarios where MXNet doesn't handle exceptions gracefully and instead crashes the main thread.

Example 1

import mxnet as mx
mx.nd.random_normal(0, -1, (2,3))
mx.nd.waitall()

The above code crashes in the following way:

 

terminate called after throwing an instance of 'dmlc::Error'
  what():  [02:32:04] ../src/engine/./threaded_engine.h:359: [02:32:04] ../src/operator/random/./sample_op.h:301: Check failed: param.scale > 0 (-1 vs. 0) scale parameter in gaussian has to be positive
Stack trace returned 10 entries:
[bt] (0) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(_ZN4dmlc10StackTraceB5cxx11Ev+0x54) [0x7eff0140bf5b]
[bt] (1) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(_ZN4dmlc15LogMessageFatalD1Ev+0x2a) [0x7eff0140c242]
[bt] (2) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(_ZN5mxnet2op12SampleMasterIN7mshadow3cpuENS0_13NormalSamplerIS3_EEE2opERKN4nnvm9NodeAttrsERKNS_9OpContextERKNS_9OpReqTypeEPNS_5TBlobE+0x120) [0x7eff01a56c8a]
[bt] (3) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(_ZN5mxnet2op7Sample_IN7mshadow3cpuENS0_13NormalSamplerIS3_EEEEvRKN4nnvm9NodeAttrsERKNS_9OpContextERKSt6vectorINS_5TBlobESaISE_EERKSD_INS_9OpReqTypeESaISJ_EESI_+0xa1) [0x7eff01a4e8ca]
[bt] (4) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(_ZNSt17_Function_handlerIFvRKN4nnvm9NodeAttrsERKN5mxnet9OpContextERKSt6vectorINS4_5TBlobESaIS9_EERKS8_INS4_9OpReqTypeESaISE_EESD_EPSJ_E9_M_invokeERKSt9_Any_dataS3_S7_SD_SI_SD_+0x91) [0x7eff01606165]
[bt] (5) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(_ZNKSt8functionIFvRKN4nnvm9NodeAttrsERKN5mxnet9OpContextERKSt6vectorINS4_5TBlobESaIS9_EERKS8_INS4_9OpReqTypeESaISE_EESD_EEclES3_S7_SD_SI_SD_+0xa6) [0x7eff03d1732c]
[bt] (6) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(_ZZN5mxnet10imperative12PushFComputeERKSt8functionIFvRKN4nnvm9NodeAttrsERKNS_9OpContextERKSt6vectorINS_5TBlobESaISA_EERKS9_INS_9OpReqTypeESaISF_EESE_EEPKNS2_2OpES5_RKNS_7ContextERKS9_IPNS_6engine3VarESaISW_EES10_RKS9_INS_8ResourceESaIS11_EERKS9_IPNS_7NDArrayESaIS17_EES1B_RKS9_IjSaIjEESJ_ENKUlNS_10RunContextEE_clES1G_+0x1f2) [0x7eff03e691f6]
[bt] (7) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(_ZNSt17_Function_handlerIFvN5mxnet10RunContextEEZNS0_10imperative12PushFComputeERKSt8functionIFvRKN4nnvm9NodeAttrsERKNS0_9OpContextERKSt6vectorINS0_5TBlobESaISD_EERKSC_INS0_9OpReqTypeESaISI_EESH_EEPKNS5_2OpES8_RKNS0_7ContextERKSC_IPNS0_6engine3VarESaISZ_EES13_RKSC_INS0_8ResourceESaIS14_EERKSC_IPNS0_7NDArrayESaIS1A_EES1E_RKSC_IjSaIjEESM_EUlS1_E_E9_M_invokeERKSt9_Any_dataOS1_+0x44) [0x7eff03e712f5]
[bt] (8) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(_ZNKSt8functionIFvN5mxnet10RunContextEEEclES1_+0x56) [0x7eff03c731fc]
[bt] (9) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(+0x39b9f33) [0x7eff03c90f33]
 
A fatal error occurred in asynchronous engine operation. If you do not know what caused this error, you can try set environment variable MXNET_ENGINE_TYPE to NaiveEngine and run with debugger (i.e. gdb). This will force all operations to be synchronous and backtrace will give you the series of calls that lead to this error. Remember to set MXNET_ENGINE_TYPE back to empty after debugging.
Stack trace returned 10 entries:
[bt] (0) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(_ZN4dmlc10StackTraceB5cxx11Ev+0x54) [0x7eff0140bf5b]
[bt] (1) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(_ZN4dmlc15LogMessageFatalD1Ev+0x2a) [0x7eff0140c242]
[bt] (2) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(_ZN5mxnet6engine14ThreadedEngine15ExecuteOprBlockENS_10RunContextEPNS0_8OprBlockE+0x4f6) [0x7eff03c7cb44]
[bt] (3) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(_ZN5mxnet6engine23ThreadedEnginePerDevice9CPUWorkerILN4dmlc19ConcurrentQueueTypeE0EEEvNS_7ContextEPNS1_17ThreadWorkerBlockIXT_EEESt10shared_ptrINS0_10ThreadPool11SimpleEventEE+0x9d) [0x7eff03c878c9]
[bt] (4) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(_ZZZN5mxnet6engine23ThreadedEnginePerDevice13PushToExecuteEPNS0_8OprBlockEbENKUlvE_clEvENKUlSt10shared_ptrINS0_10ThreadPool11SimpleEventEEE_clES8_+0x56) [0x7eff03c85774]
[bt] (5) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(_ZNSt17_Function_handlerIFvSt10shared_ptrIN5mxnet6engine10ThreadPool11SimpleEventEEEZZNS2_23ThreadedEnginePerDevice13PushToExecuteEPNS2_8OprBlockEbENKUlvE_clEvEUlS5_E_E9_M_invokeERKSt9_Any_dataOS5_+0x5c) [0x7eff03c8a424]
[bt] (6) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(_ZNKSt8functionIFvSt10shared_ptrIN5mxnet6engine10ThreadPool11SimpleEventEEEEclES5_+0x49) [0x7eff03c900f3]
[bt] (7) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(_ZNSt12_Bind_simpleIFSt8functionIFvSt10shared_ptrIN5mxnet6engine10ThreadPool11SimpleEventEEEES6_EE9_M_invokeIILm0EEEEvSt12_Index_tupleIIXspT_EEE+0x68) [0x7eff03c90066]
[bt] (8) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(_ZNSt12_Bind_simpleIFSt8functionIFvSt10shared_ptrIN5mxnet6engine10ThreadPool11SimpleEventEEEES6_EEclEv+0x2c) [0x7eff03c8fefa]
[bt] (9) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(_ZNSt6thread5_ImplISt12_Bind_simpleIFSt8functionIFvSt10shared_ptrIN5mxnet6engine10ThreadPool11SimpleEventEEEES8_EEE6_M_runEv+0x1c) [0x7eff03c8fe4a]

 

Example 2

import mxnet as mx
data_path = 'manual_2.csv'
data_train = None
try:
    data_train = mx.io.CSVIter(data_csv=data_path, data_shape=(4,10),
            batch_size=1)
    for batch in iter(data_train):
        print(data_train.getdata().asnumpy())
except mx.base.MXNetError:
    print('Exception handled')

Here is what my manual_2.csv looks like:

 

1,2,3,4
2,3,4,5
3,4,5,6
4,5,6,7
5,6,7,8
6,7,8,9
9,10,11,12
10,11,12,13

 

As you can clearly see, the data_shape doesn't match the input shape. When I try to run this, the Python process crashes with the following error and stack trace, and there is no way to fail gracefully by catching the exception.

 

terminate called after throwing an instance of 'dmlc::Error'
  what():  [02:08:14] ../src/io/iter_csv.cc:125: Check failed: row.length == shape.Size() (4 vs. 40) The data size in CSV do not match size of shape: specified shape=[4,10], the csv row-length=4
Stack trace returned 10 entries:
[bt] (0) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(_ZN4dmlc10StackTraceB5cxx11Ev+0x54) [0x7febfb693f5b]
[bt] (1) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(_ZN4dmlc15LogMessageFatalD1Ev+0x2a) [0x7febfb694242]
[bt] (2) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(_ZN5mxnet2io7CSVIter7AsTBlobERKN4dmlc3RowIjEERKN4nnvm6TShapeE+0x14a) [0x7febfe0d9832]
[bt] (3) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(_ZN5mxnet2io7CSVIter4NextEv+0x25e) [0x7febfe0d9312]
[bt] (4) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(_ZN5mxnet2io11BatchLoader4NextEv+0xa1) [0x7febfe0653f3]
[bt] (5) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(_ZZN5mxnet2io14PrefetcherIter4InitERKSt6vectorISt4pairINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEES9_ESaISA_EEENKUlPPNS_9DataBatchEE_clESH_+0x50) [0x7febfe04bf98]
[bt] (6) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(_ZNSt17_Function_handlerIFbPPN5mxnet9DataBatchEEZNS0_2io14PrefetcherIter4InitERKSt6vectorISt4pairINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESE_ESaISF_EEEUlS3_E_E9_M_invokeERKSt9_Any_dataOS3_+0x37) [0x7febfe053473]
[bt] (7) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(_ZNKSt8functionIFbPPN5mxnet9DataBatchEEEclES3_+0x49) [0x7febfe053797]
[bt] (8) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(_ZZN4dmlc12ThreadedIterIN5mxnet9DataBatchEE4InitESt8functionIFbPPS2_EES4_IFvvEEENKUlvE_clEv+0x311) [0x7febfe0512eb]
[bt] (9) /home/ubuntu/sparse_support/mxnet/python/mxnet/../../build/libmxnet.so(_ZNSt12_Bind_simpleIFZN4dmlc12ThreadedIterIN5mxnet9DataBatchEE4InitESt8functionIFbPPS3_EES5_IFvvEEEUlvE_vEE9_M_invokeIJEEEvSt12_Index_tupleIJXspT_EEE+0x28) [0x7febfe05a188]

 

Why is this a problem?

Proper exception handling and propagation in MXNet is important for two types of use case. The first is MXNet users who are using one of our APIs to build or test a model; the second is MXNet service owners who are using MXNet in production for DL-enabled services.

From the perspective of an MXNet user (especially a casual or new user), the above makes for a poor user experience. The experience is even worse in a non-terminal environment such as a Jupyter notebook or a Docker container. Even if the user understands the error, they are unable to respond to it in a high-level language like Python, because MXNet currently doesn't allow users to handle exceptions and exit gracefully (or retry, or perform some other action).

From a service owner's perspective, when we don't properly propagate errors through our language bindings it becomes extremely difficult to debug and support our service. As an example, crashing instead of propagating errors obviously has a negative effect on highly available services; many services would page on-call staff when this occurs. Additionally, although MXNet has a largely asynchronous API, it currently does not allow services to handle exceptions on a per-request basis. If we have one mis-shaped request in our queue to be processed, and 50 other in-flight requests, the single mis-shaped request will crash the entire process. This currently requires services to implement logic to monitor and retry the other in-flight requests (which negatively affects latencies, and may in turn violate SLAs).

We have had multiple customer requests to fix this. See the community requests here: https://github.com/apache/incubator-mxnet/issues/7335

Exception Handling for Iterators

Approach

MXNet uses a general IO processing pipeline based on the ThreadedIter class in dmlc-core.

The design of ThreadedIter is based on the producer-consumer model: it launches a stand-alone thread that acts as the data producer, while the main thread acts as the data consumer.

One commonly used training scheme is prefetching: the PrefetchingIter fetches the next batch on the producer thread while the consumer thread performs training on the current batch. All iterators in MXNet are based on the PrefetchingIter or the SparsePrefetchingIter.

The PrefetchingIter/SparsePrefetchingIter provide an interface for users to write custom data loaders that read their own binary formats and automatically get multi-threaded prefetcher support.

The above diagram is based on https://mxnet.incubator.apache.org/architecture/note_data_loading.html#hide-io-cost-using-threadediter in the MXNet IO docs. Changes have been made to the existing diagram to add information about exception handling.

As mentioned above, ThreadedIter supports a producer-consumer model in which the producer is a stand-alone thread. The PrefetchingIter uses ThreadedIter and provides an interface for parsing and loading data from a custom format or the RecordIO format. This parsing and loading logic may in turn spawn multiple threads. Any exception thrown in one of these threads should be caught and transported to the producer (child) thread, where it is rethrown; the rethrown exception is then caught and transported to the main thread.

Another scenario is that the exception is thrown in the child thread itself. In this case too, the exception is caught and transported to the main thread.

The main thread maintains a queue of exception_ptrs and checks whether the queue is non-empty. If it is, the main thread pulls out the exception_ptrs and rethrows the exceptions.
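As a rough illustration of this transport mechanism, here is a minimal Python sketch. This is not the actual dmlc-core C++ implementation (which uses std::exception_ptr and a separate exception queue); for determinism the sketch transports the exception in-band through the data queue, and all names here are hypothetical.

```python
import queue
import threading

class ToyThreadedIter:
    """Minimal producer/consumer iterator that forwards producer exceptions."""

    def __init__(self, source):
        self._out = queue.Queue()
        t = threading.Thread(target=self._produce, args=(source,))
        t.daemon = True
        t.start()

    def _produce(self, source):
        try:
            for item in source:
                self._out.put(("ok", item))
            self._out.put(("done", None))
        except Exception as e:            # caught on the producer thread
            self._out.put(("exc", e))     # transported to the consumer

    def __iter__(self):
        return self

    def __next__(self):
        kind, payload = self._out.get()
        if kind == "exc":
            raise payload                 # rethrown on the main thread
        if kind == "done":
            raise StopIteration
        return payload

def bad_source():
    """Stand-in for a parser that fails partway, e.g. on a malformed CSV row."""
    yield 1
    raise ValueError("row does not match data_shape")

results, caught = [], None
try:
    for x in ToyThreadedIter(bad_source()):
        results.append(x)
except ValueError as e:
    caught = e
# results == [1]; caught is the producer thread's ValueError
```

The key point is that the exception object crosses the thread boundary as data and is only re-raised on the consumer's thread, where the user's try/except can see it.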

Proof of Concept

https://github.com/dmlc/dmlc-core/pull/355

 

Open Questions

1. Should we keep a queue of arbitrary size, or a queue of size 1? One advantage of a queue of arbitrary size is that we can store exceptions from multiple threads when multiple threads throw exceptions. One disadvantage is that it consumes extra memory.
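To make the trade-off concrete, here is a small illustrative sketch of the two options (not the dmlc-core code): an unbounded queue retains every failure, while a size-1 slot follows a first-failure-wins policy.

```python
import queue

# Two failures arriving from two producer threads, in this order:
failures = [ValueError("thread A failed first"), TypeError("thread B failed later")]

unbounded = queue.Queue()              # arbitrary size: stores all exceptions
first_only = queue.Queue(maxsize=1)    # size 1: first failure wins

for exc in failures:
    unbounded.put(exc)
    try:
        first_only.put_nowait(exc)     # non-blocking: later failures are dropped
    except queue.Full:
        pass

# unbounded holds both exceptions; first_only holds only the ValueError
```

Usually the first failure is the actionable one, so the size-1 slot loses little information in exchange for constant memory.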

Exception Handling for Operators

Approach 1

  • Add an exception_ptr member opr_ex to ThreadedOpr and an exception_ptr member var_ex to ThreadedVar.
  • Put a try/catch block in ExecuteOprBlock around the execution of the operator.
  • If an exception is thrown during the execution of the operator, catch it and point the ThreadedOpr's exception_ptr member at the exception object. We explicitly make a call to the callback in this case.
  • In the callback, set the exception_ptr member for all the variables that the current operator mutates.
  • In the callback, also set the exception_ptr member for the current operator to the one held by one of its dependencies. This way we can propagate an old exception_ptr down the dependency chain.
  • Also set global_exc_ptr depending on whether there is an exception associated with a read var.
  • In WaitForVar, check if threaded_var->var_ex is set. If it is, rethrow the exception: since we are waiting for this var, an exception associated with it means an exception was thrown somewhere on the dependency path leading to the var.
  • In WaitForAll, rethrow based on whether global_exc_ptr is set.
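The steps above can be sketched as a toy Python model (for illustration only; the real engine is C++ and uses std::exception_ptr rather than Python exception objects, and the names below are hypothetical):

```python
class Var:
    """Stands in for ThreadedVar; `exc` plays the role of var_ex."""
    def __init__(self):
        self.exc = None

class ToyEngineApproach1:
    def __init__(self):
        self.global_exc = None           # plays the role of global_exc_ptr

    def push(self, fn, read_vars, write_vars):
        exc = None
        try:
            fn()                         # under Approach 1 the operator always runs
        except Exception as e:           # the try/catch around ExecuteOprBlock
            exc = e
        # "callback": inherit an exception held by any read dependency
        for v in read_vars:
            if v.exc is not None:
                exc = exc or v.exc
        # propagate to every variable this operator mutates
        if exc is not None:
            for v in write_vars:
                v.exc = exc
            if self.global_exc is None:
                self.global_exc = exc

    def wait_for_var(self, var):         # WaitForVar
        if var.exc is not None:
            raise var.exc                # rethrown on the calling thread

    def wait_for_all(self):              # WaitForAll
        if self.global_exc is not None:
            raise self.global_exc

eng = ToyEngineApproach1()
a, b = Var(), Var()
eng.push(lambda: 1 / 0, read_vars=[], write_vars=[a])   # this operator fails
eng.push(lambda: None, read_vars=[a], write_vars=[b])   # still runs, inherits a's exception
```

Calling eng.wait_for_var(b) now rethrows the original ZeroDivisionError even though the second operator itself succeeded, which mirrors how an exception travels down the dependency chain to the user's blocking call.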

Proof of Concept for Approach 1

https://github.com/apache/incubator-mxnet/pull/9373

Approach 2

  • Add an exception_ptr member opr_ex to ThreadedOpr and an exception_ptr member var_ex to ThreadedVar.
  • Put a try/catch block in ExecuteOprBlock around the execution of the operator, and don't execute the operator if the threaded_opr already carries an exception.
  • Functions pushed using PushAsync will take three parameters instead of two: on_start, execute, and on_complete.
  • The on_start callback propagates the exception_ptr based on whether any read dependencies have an exception_ptr associated with them.
  • The on_complete callback propagates the exception_ptr to the write vars based on whether the threaded_opr has an exception associated with it.
  • The logic to rethrow the exception in WaitForVar and WaitForAll is the same as in Approach 1.
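For contrast, the same toy model under Approach 2 (again illustrative only; the names are hypothetical): the on_start step inherits exceptions from read dependencies, and a poisoned operator is skipped instead of executed.

```python
class Var2:
    """Stands in for ThreadedVar; `exc` plays the role of var_ex."""
    def __init__(self):
        self.exc = None

class ToyEngineApproach2:
    def push_async(self, fn, read_vars, write_vars):
        opr_exc = None
        # on_start: inherit an exception from any read dependency
        for v in read_vars:
            if v.exc is not None:
                opr_exc = v.exc
                break
        executed = False
        if opr_exc is None:              # skip execution if already poisoned
            executed = True
            try:
                fn()
            except Exception as e:
                opr_exc = e
        # on_complete: propagate the exception to the write vars
        if opr_exc is not None:
            for v in write_vars:
                v.exc = opr_exc
        return executed

eng = ToyEngineApproach2()
a, b = Var2(), Var2()
eng.push_async(lambda: 1 / 0, read_vars=[], write_vars=[a])    # fails
ran = eng.push_async(lambda: None, read_vars=[a], write_vars=[b])
# ran is False: the dependent operator was never executed, yet b.exc
# still carries the original ZeroDivisionError
```

This captures the fail-fast property: once an operator fails, its dependents are short-circuited while the exception still reaches every variable they would have written.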

Comparison of Approach 1 and Approach 2

  • Failed dependencies: Approach 1 forces operators to execute even if previous operators failed, which can be a problem if operators after a failed operator throw exceptions other than dmlc::Error. In Approach 2, once an operator fails, none of the operators that depend on it are executed.
  • API changes: Approach 1 requires minimal API changes. In Approach 2, the lambda closure expected by PushAsync has a different signature after adding the on_start callback.
  • Overhead on failure: With Approach 1, when an exception is thrown there is the overhead of executing the subsequent operators; with Approach 2 there is no such overhead.
  • Performance when no exception is thrown: Approach 1's impact should be minimal. Approach 2's impact needs to be investigated because of the additional overhead of the on_start callback, which runs even when no exception is thrown.

Recommendation

My initial recommendation was to take Approach 1, since it introduces minimal API changes and also minimal performance impact in the case where no exception is thrown.

This would require a thorough examination of all operators to make sure they throw only exceptions wrapped in dmlc::Error.

 

However, the issue with Approach 1 is that it forces execution of the following operators, which may lead to incorrect results for variables that are directly or indirectly mutated. Also, the performance impact will be similar to Approach 2, since we have the additional overhead of propagating exceptions from the callback instead of the on_start callback. The performance impact of adding exception handling with Approach 2 needs to be investigated.

Since the performance impact of both approaches should be similar, and since Approach 2 has the advantage of not executing subsequent operators and addresses the issue with Approach 1, the updated recommendation is to proceed with Approach 2.

Open Questions

1. How do we handle the WaitForAll situation?

2. Is it guaranteed that the exception will only be thrown in the execution of the operator, and not in the callback?


10 Comments

  1. For exception handling for operators, I like approach 2 better. In general it is always better to fail fast when an error condition occurs, instead of continuing the execution with read variables in some undefined state. Do measure the overhead introduced by checking which read vars have exceptions set.

  2. Absolutely for approach 2. Continuing to execute dependent operators will either fail or, worse, produce garbage. So the user will either see tons of subsequent exceptions (and have trouble figuring out what the root cause is) or may not realize that garbage was created.

     

  3. Huge thanks for pushing this forward, this has been a problem that I've seen turn away many potential MXNet users, and it should be addressed.

    I'm also in favour of approach 2.  

    Recommendation: Let's have a mapping between known exception types. For example, a shape error or dtype error that is raised during graph initialization can have its own typed exception thrown in supported languages, with contextual, actionable information describing why the error happened. Users with dtype errors should be able to read the typed exception and realize that 1) they need to cast datatypes and 2) this cast should occur between these two operations. This is something that we can support incrementally: by default we can project all exception types as RuntimeErrors, and then provide more descriptive errors for common problems.

    Comment: I think we're under-emphasizing an important point in the 'why is this a problem?' section.  There's a reasonable expectation that MXNet will be / is being used as a core library for a wide variety of machine learning services.  When we don't properly propagate errors through our language bindings it becomes extremely difficult to debug these services.  Crashing instead of propagating errors is the worst thing we could possibly do.  Imagine trying to support an SLA on a service where the core library is crashing every time it encounters a non-fatal error.  To me the usability for researchers and jupyter notebook issues should be noted, but crashing a production service for a non-fatal error should be the primary reason to develop a comprehensive fix.

    Question: Would you be willing to take a contribution for the diagram?  To me it doesn't clearly demonstrate what's changing and where exception handling is being taken care of.  I wanted to get a good sense of the threading model of MXNet and helping with a diagram like this could be a good exercise.

    1. Kellen, Thanks a lot for your feedback.

      Typed exceptions for frontends really go a long way toward improving usability. I will add a section for Phase 2 of the project and welcome contributions from the community for the same.

      You make a really good point with respect to the Why is this a problem? section. I will update the section and add your point.

      'Would you be willing to take a contribution for the diagram?' -> Do you want to contribute a diagram for the exception handling in operators or iterators? I would really appreciate your contribution. I have a PPT with an example depicting how the exception handling works, which I didn't include earlier. You can use that as a reference. I have attached it here.

    2. Do you have edit permissions to this Wiki? If so, please feel free to modify the Why is this a problem section with your comment.

      1. Thanks Anirudh.  Tweaked the wording such that we capture the service owner's perspective.

  4. Question: From a design perspective we might consider keeping things simple and just having a single global exception that gets set rather than a queue of exceptions.  Generally the first-failure is the important failure, especially if we follow a 'fast-fail' mentality as in approach 2. 

    1. For the iterators case, I think it's enough to have a single exception, and I will modify the open questions section of the wiki.

      For the operators case, one exception by itself won't suffice and we will need exceptions corresponding to the operator and the vars.

      For example, consider two independent execution graphs; an exception thrown in each of the execution graphs should be actionable separately.

      1. I may be missing something, but just to clarify here's my thought process on this: 

        If we have two completely independent graphs then we'll have two separate high-level blocking calls where we'd like to raise an exception, right?  So in both these cases what we'd want to do is fail fast and raise the exception at the first blocking call.  In reality, because we're single-threaded/asynchronous, we would actually throw the exceptions one after another, but each throw is still independent.  If the graphs are grouped, I would throw the first exception only at the group's first blocking call.

        To me what makes sense on the operator side is that we have a single exception per block of executed operators.  If we have two graphs executed concurrently this would mean two exceptions.  If one of the operators in the block fails we'd immediately set that exception and inform the corresponding blocking call in the high-level language (i.e. asnumpy, wait_to_read).

         

  5. Kellen Sunderland: You are right that if there are two completely independent graphs and we have two high-level blocking calls, one for each of the two graphs, each of them would raise an exception. They will be raised one after the other, since the thread we are calling WaitToRead and PushAsync from is single-threaded.

    By "per block of executed operators", do you mean the Bulk mode where multiple small operators are executed together? In Bulk mode, all operators are combined into one operator and then pushed to the engine, so we will still have exceptions corresponding to the final write vars.