// // Miguel de Icaza (miguel@novell.com) // using System; using System.Collections; using System.Threading; using System.Web; // // This is the iterators-based application pipeline // class ApplicationPipeline { public event EventHandler Hook; public event EventHandler Hook2; public bool stop_processing = false; IEnumerator pipeline; string prefix; // Can be a ManualResetEvent or an AsyncRequestState. ManualResetEvent done; AsyncRequestState begin_iar; public ApplicationPipeline (string p) { done = new ManualResetEvent (false); prefix = p; } public void AddAsync (BeginEventHandler b, EndEventHandler e) { AsyncInvoker invoker = new AsyncInvoker (b, e); Hook += new EventHandler (invoker.Invoke); } // // Ticks the clock: next step on the pipeline. // void Tick () { if (pipeline.MoveNext ()){ if ((bool)pipeline.Current){ Console.WriteLine (prefix + "Stop requested"); Done (); } } } void Done () { if (begin_iar != null) begin_iar.Complete (); done.Set (); } // // Invoked when our async callback called from RunHooks completes, // we restart the pipeline here. // void async_callback_completed_cb (IAsyncResult ar) { AsyncInvoker ai = (AsyncInvoker) ar.AsyncState; if (ai.end != null) ai.end (ar); Console.WriteLine (prefix + "Completed async operation: {0}", ar.GetType ()); Tick (); } IEnumerable RunHooks (string stage, Delegate list) { Delegate [] delegates = list.GetInvocationList (); foreach (EventHandler d in delegates){ if (d.Target != null && d.Target.GetType () == typeof (AsyncInvoker)){ AsyncInvoker ai = (AsyncInvoker) d.Target; Console.WriteLine (prefix + "Queuing an async method"); ai.begin (this, EventArgs.Empty, async_callback_completed_cb, ai); yield return false; } else d (this, EventArgs.Empty); if (stop_processing) yield return true; } Console.WriteLine (prefix + "{0}: All hooks invoked", stage); } IEnumerator Pipeline () { Console.WriteLine (prefix + "starting"); if (Hook != null) foreach (bool stop in RunHooks ("....", Hook)) yield return stop; if (Hook2 != null) foreach (bool stop in RunHooks ("----", Hook2)) yield return stop; Done (); } void Start (object x) { done.Reset (); pipeline = Pipeline (); Tick (); } public IAsyncResult BeginProcessRequest (AsyncCallback cb, object extraData) { begin_iar = new AsyncRequestState (null, cb, extraData); ThreadPool.QueueUserWorkItem (new WaitCallback (Start), null); return begin_iar; } public void ProcessRequest () { Start (null); done.WaitOne (); Console.WriteLine (prefix + "terminating"); } } // // This is the test driver code. // class Y { static ApplicationPipeline x; // // My test app, exercises various modes of operation: // continuous // stop by async module // stop by sync module // full async execution // static void Main () { x = new ApplicationPipeline ("[full] "); x.Hook += MyHook; x.Hook2 += MyHook2; x.AddAsync (b, e); x.AddAsync (b, e); x.ProcessRequest (); x = new ApplicationPipeline ("[asyncstop] "); x.Hook += MyHook; x.AddAsync (b, e); x.Hook2 += MyHook2; stop = true; x.ProcessRequest (); stop = false; x = new ApplicationPipeline ("[syncstop] "); x.Hook += Stop; x.AddAsync (b, e); x.Hook2 += MyHook2; x.ProcessRequest (); x = new ApplicationPipeline ("[asyncwait] "); x.Hook += MyHook; x.Hook2 += MyHook2; x.AddAsync (b, e); x.AddAsync (b, e); IAsyncResult iar = x.BeginProcessRequest (complete, null); iar.AsyncWaitHandle.WaitOne (); if (called != 1923) throw new Exception ("EndPR was never called"); x = new ApplicationPipeline ("[beginpr] "); x.Hook += MyHook; x.Hook2 += MyHook2; x.AddAsync (b, e); x.AddAsync (b, e); x.BeginProcessRequest (null, null); Console.WriteLine ("BeingProcessRequest invoked, press return to stop"); Console.ReadLine (); Console.WriteLine ("The End"); } static int called = 3193; static void complete (IAsyncResult ar) { if (called != 3193) throw new Exception ("fuck"); called = 1923; Console.WriteLine ("BeginPR callback invoked"); } static void MyHook (object sender, EventArgs a) { Console.WriteLine ("My Hook"); } static void Stop (object sender, EventArgs a) { Console.WriteLine ("Stopping pipeline" + Environment.StackTrace); x.stop_processing = true; } static void MyHook2 (object sender, EventArgs a) { Console.WriteLine ("My Second hook"); } static void done (IAsyncResult a) { Console.WriteLine ("Done"); } static bool stop = false; static void do_stuff (object data) { Console.WriteLine ("SLOW: Starting slow process"); Thread.Sleep (2000); Console.WriteLine ("SLOW: signaling completion"); AsyncRequestState a = (AsyncRequestState) data; x.stop_processing = stop; a.Complete (); } static IAsyncResult b (object sender, EventArgs e, AsyncCallback cb, object extraData) { IAsyncResult o = new AsyncRequestState (null, cb, extraData); ThreadPool.QueueUserWorkItem (do_stuff, o); return o; } static void e (IAsyncResult iar) { Console.WriteLine ("SLOW: COMPLETED"); } } // // Based on Fritz' Onion's AsyncRequestState class for asynchronous IHttpAsyncHandlers // class AsyncRequestState : IAsyncResult { public object Context; AsyncCallback cb; object cb_data; bool completed; ManualResetEvent complete_event = null; internal AsyncRequestState (object context, AsyncCallback cb, object cb_data) { this.Context = context; this.cb = cb; this.cb_data = cb_data; } internal void Complete () { completed = true; if (cb != null) cb (this); lock (this){ if (complete_event != null) complete_event.Set (); } } public object AsyncState { get { return cb_data; } } public bool CompletedSynchronously { get { return false; } } public bool IsCompleted { get { return completed; } } public WaitHandle AsyncWaitHandle { get { lock (this){ if (complete_event == null) complete_event = new ManualResetEvent (completed); return complete_event; } } } } // // Helper class to track begin/end invoker async pairs // public class AsyncInvoker { public BeginEventHandler begin; public EndEventHandler end; public AsyncInvoker (BeginEventHandler bh, EndEventHandler eh) { begin = bh; end = eh; } public void Invoke (object sender, EventArgs e) { // Just a stub so we can "detect" the async handlers on the normal list of handlers. } }