Thursday, April 22, 2010

Speed-up database insert operation using Reactive extensions

Application inserts a lot of data to mongo databse. Insertion of small records one-by-one takes a lot of time. So, obvious solution to buffer this records. Buffer feature of Rx extensions fits well for such buffering.

Code notes:

  • Line 17-25 – Aggregates messages for 100ms period into an array and pass to saving function
  • Line 56 – Pushes measures to Rx buffer.

This buffering gave me roughly 30% performance gain.

The code itself:

public class MeasureGateway : IMeasureGateway, IDisposable
{
private readonly AddObserver _addObserver = new AddObserver();
private static readonly TimeSpan BufferTimeout = TimeSpan.FromMilliseconds(100);
private DateTime _lastMeasureAddedAt = DateTime.MinValue;
private readonly IDisposable _bufferSubscription;

[InjectionConstructor]
public MeasureGateway() : this(true)
{

}
public MeasureGateway(bool useBuffering)
{
if (useBuffering)
{
_bufferSubscription =
_addObserver
.Buffer(BufferTimeout).Subscribe((q) =>
{
if (q.Any())
{
AddMeasureInner(q);
}
});
}
}

private void AddMeasureInner(Document q)
{
using (MongoScope mongo = MongoScope())
{
mongo.DefaultCollection.Insert(q);
}
}
private void AddMeasureInner(IEnumerable<Document> q)
{
using (MongoScope mongo = MongoScope())
{
mongo.DefaultCollection.Insert(q);
}
}

public void Add(DeviceMeasure measure)
{
Document measureDoc = GetMeasureDoc(measure);

if (_bufferSubscription != null)
{
if (DateTime.Now.Subtract(_lastMeasureAddedAt) > BufferTimeout)
{
AddMeasureInner(measureDoc);
}
else
{
_addObserver.Add(measureDoc);
}

_lastMeasureAddedAt = DateTime.Now;
}
else
{
AddMeasureInner(measureDoc);
}
}

private class AddObserver : IObservable<Document>, IDisposable
{
private IObserver<Document> _observer;

#region IDisposable Members

public void Dispose()
{
}

#endregion

#region IObservable<Document> Members

public IDisposable Subscribe(IObserver<Document> observer)
{
_observer = observer;
return this;
}

#endregion

public void Add(Document doc)
{
_observer.OnNext(doc);
}
}

public void Dispose()
{
if (_bufferSubscription != null)
{
_bufferSubscription.Dispose();
}
}