The application inserts a lot of data into a Mongo database. Inserting small records one by one takes a lot of time, so the obvious solution is to buffer these records. The Buffer feature of the Rx extensions fits well for such buffering.
Code notes:
- Line 17-25 – Aggregates the messages received over a 100 ms period into an array and passes it to the saving function
- Line 56 – Pushes measures to the Rx buffer.
This buffering gave me roughly 30% performance gain.
The code itself:
public class MeasureGateway : IMeasureGateway, IDisposable
{
// Source that Add pushes documents into; the buffered subscription created in the constructor consumes from it.
private readonly AddObserver _addObserver = new AddObserver();
// Length of the buffering window; Add also uses it as the idle threshold when choosing between a direct insert and buffering.
private static readonly TimeSpan BufferTimeout = TimeSpan.FromMilliseconds(100);
// Time of the most recent Add call made while buffering is enabled; stays at MinValue until the first such call.
private DateTime _lastMeasureAddedAt = DateTime.MinValue;
// Handle of the buffered subscription; remains null when the gateway is constructed with buffering disabled.
private readonly IDisposable _bufferSubscription;
/// <summary>
/// Parameterless constructor, marked with <c>[InjectionConstructor]</c>.
/// Delegates to the main constructor with buffering switched on.
/// </summary>
[InjectionConstructor]
public MeasureGateway()
    : this(useBuffering: true)
{
}
/// <summary>
/// Creates the gateway and, when requested, wires up the buffered insert pipeline.
/// </summary>
/// <param name="useBuffering">
/// When <c>true</c>, documents pushed into the internal observable are grouped into
/// windows of <c>BufferTimeout</c> and each non-empty window is inserted as a batch.
/// When <c>false</c>, no subscription is created.
/// </param>
public MeasureGateway(bool useBuffering)
{
    if (!useBuffering)
    {
        // No subscription: _bufferSubscription stays null, which Add and Dispose check for.
        return;
    }

    _bufferSubscription = _addObserver
        .Buffer(BufferTimeout)
        .Subscribe(batch =>
        {
            // Windows in which nothing arrived are skipped rather than sent to the database.
            if (!batch.Any())
            {
                return;
            }

            AddMeasureInner(batch);
        });
}
/// <summary>
/// Inserts a single document into the default collection.
/// </summary>
/// <param name="q">The document to insert.</param>
private void AddMeasureInner(Document q)
{
    // The scope is held only for the duration of this insert and disposed afterwards.
    using (MongoScope scope = MongoScope())
    {
        var collection = scope.DefaultCollection;
        collection.Insert(q);
    }
}
/// <summary>
/// Inserts a batch of documents into the default collection in a single call.
/// </summary>
/// <param name="q">The documents to insert.</param>
private void AddMeasureInner(IEnumerable<Document> q)
{
    // The scope is held only for the duration of this insert and disposed afterwards.
    using (MongoScope scope = MongoScope())
    {
        var collection = scope.DefaultCollection;
        collection.Insert(q);
    }
}
/// <summary>
/// Converts the measure to a document and stores it, either directly or through the buffer.
/// </summary>
/// <remarks>
/// With buffering enabled, a measure that arrives after more than <c>BufferTimeout</c> of
/// inactivity is inserted immediately; measures that arrive in quicker succession are pushed
/// to the buffered stream and inserted as a batch. With buffering disabled every measure is
/// inserted immediately.
/// </remarks>
/// <param name="measure">The measure to store.</param>
public void Add(DeviceMeasure measure)
{
    Document measureDoc = GetMeasureDoc(measure);

    if (_bufferSubscription == null)
    {
        // Buffering is disabled: always insert directly.
        AddMeasureInner(measureDoc);
        return;
    }

    // Elapsed time is measured in UTC: local time jumps on DST/time-zone changes, which
    // would make the interval wrong (or negative) and misroute the measure.
    if (DateTime.UtcNow.Subtract(_lastMeasureAddedAt) > BufferTimeout)
    {
        AddMeasureInner(measureDoc);
    }
    else
    {
        _addObserver.Add(measureDoc);
    }

    // Read the clock again so the timestamp is taken after the insert/push has completed.
    _lastMeasureAddedAt = DateTime.UtcNow;
}
/// <summary>
/// Minimal push-style source: every document passed to <see cref="Add"/> is forwarded to
/// the currently subscribed observer. The instance also serves as its own subscription
/// handle, so disposing it detaches that observer.
/// </summary>
private class AddObserver : IObservable<Document>, IDisposable
{
    // The current subscriber, or null when nobody is subscribed. Only one observer is
    // tracked; a later Subscribe call replaces the previous one.
    private IObserver<Document> _observer;

    #region IDisposable Members
    /// <summary>
    /// Ends the subscription returned by <see cref="Subscribe"/> by dropping the observer,
    /// so no further documents are forwarded to it.
    /// </summary>
    public void Dispose()
    {
        _observer = null;
    }
    #endregion

    #region IObservable<Document> Members
    /// <summary>
    /// Registers <paramref name="observer"/> as the receiver of added documents.
    /// </summary>
    /// <param name="observer">The observer to forward documents to.</param>
    /// <returns>This instance; disposing it detaches the observer.</returns>
    public IDisposable Subscribe(IObserver<Document> observer)
    {
        _observer = observer;
        return this;
    }
    #endregion

    /// <summary>
    /// Forwards <paramref name="doc"/> to the subscribed observer. Does nothing when no
    /// observer is subscribed (before Subscribe or after Dispose).
    /// </summary>
    /// <param name="doc">The document to push.</param>
    public void Add(Document doc)
    {
        // Work on a local copy so a Dispose between the check and the call cannot cause a null dereference.
        IObserver<Document> observer = _observer;
        if (observer != null)
        {
            observer.OnNext(doc);
        }
    }
}
/// <summary>
/// Releases the buffered subscription, if one was created by the constructor.
/// </summary>
public void Dispose()
{
    if (_bufferSubscription == null)
    {
        // Buffering was never enabled, so there is nothing to release.
        return;
    }

    _bufferSubscription.Dispose();
}
No comments:
Post a Comment