Optimizing Container Synchronization

Efficient data synchronization is crucial in high-performance computing and multi-threaded applications. This article explores an optimization technique for scenarios where frequent writes to a container occur in a multi-threaded environment. We'll examine the challenges of traditional synchronization methods and present an advanced approach that significantly improves performance for write-heavy environments. The approach in question is useful because it is easy to implement and versatile, unlike pre-optimized containers that may be platform-specific, require particular data types, or bring additional library dependencies.

Traditional Approaches and Their Limitations

Consider a scenario where we have a cache of user transactions:

struct TransactionData
{
    long transactionId;
    long userId;
    unsigned long date;
    double amount;
    int type;
    std::string description;
};

std::map<long, std::vector<TransactionData>> transactionCache; // key - userId

In a multi-threaded environment, we need to synchronize access to this cache. The traditional approach might involve using a mutex:

class SimpleSynchronizedCache
{
public:
    void write(const TransactionData& transaction)
    {
        std::lock_guard<std::mutex> lock(_cacheMutex);
        _transactionCache[transaction.userId].push_back(transaction);
    }

    std::vector<TransactionData> read(const long& userId)
    {
        std::lock_guard<std::mutex> lock(_cacheMutex);
        try
        {
            return _transactionCache.at(userId);
        }
        catch (const std::out_of_range& ex)
        {
            return std::vector<TransactionData>();
        }
    }

    std::vector<TransactionData> pop(const long& userId)
    {
        std::lock_guard<std::mutex> lock(_cacheMutex);
        auto userNode = _transactionCache.extract(userId);
        return userNode.empty() ? std::vector<TransactionData>() : std::move(userNode.mapped());
    }

private:
    std::map<long, std::vector<TransactionData>> _transactionCache;
    std::mutex _cacheMutex;
};

As system load increases, especially with frequent reads, we might consider using a shared_mutex:

class CacheWithSharedMutex
{
public:
    void write(const TransactionData& transaction)
    {
        std::lock_guard<std::shared_mutex> lock(_cacheMutex);
        _transactionCache[transaction.userId].push_back(transaction);
    }

    std::vector<TransactionData> read(const long& userId)
    {
        std::shared_lock<std::shared_mutex> lock(_cacheMutex);
        try
        {
            return _transactionCache.at(userId);
        }
        catch (const std::out_of_range& ex)
        {
            return std::vector<TransactionData>();
        }
    }

    std::vector<TransactionData> pop(const long& userId)
    {
        std::lock_guard<std::shared_mutex> lock(_cacheMutex);
        auto userNode = _transactionCache.extract(userId);
        return userNode.empty() ? std::vector<TransactionData>() : std::move(userNode.mapped());
    }

private:
    std::map<long, std::vector<TransactionData>> _transactionCache;
    std::shared_mutex _cacheMutex;
};

However, when the load is primarily generated by writes rather than reads, the advantage of a shared_mutex over a regular mutex becomes minimal. The lock will often be acquired exclusively, negating the benefits of shared access.

Moreover, let's imagine that we don't use read() at all; instead, we frequently write incoming transactions and periodically flush the accumulated transaction vectors using pop(). As pop() involves reading with extraction, both write() and pop() operations would modify the cache, necessitating exclusive access rather than shared access. Thus, the shared_lock becomes completely useless in terms of optimization over a regular mutex, and may even perform worse, since its more intricate implementation is now used for the same exclusive locks that a faster regular mutex provides. Clearly, we need something else.
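To make this workload concrete, here is a minimal sketch of such a write-heavy scenario, assuming the TransactionData and SimpleSynchronizedCache definitions above; the thread counts, field values, and function name are illustrative only.

#include <chrono>
#include <thread>
#include <vector>

// Illustrative only: several producers write incoming transactions, while one
// background thread periodically flushes the accumulated per-user batches.
void runWriteHeavyWorkload(SimpleSynchronizedCache& cache)
{
    std::vector<std::thread> producers;
    for (long userId = 1; userId <= 8; ++userId)
        producers.emplace_back([&cache, userId] {
            for (long i = 0; i < 1000; ++i)
                cache.write(TransactionData{ i, userId, 0, 10.0, 0, "purchase" });
        });

    // pop() modifies the cache, so it needs the same exclusive lock as write();
    // a shared_mutex gives no benefit for this access pattern
    std::thread flusher([&cache] {
        for (int round = 0; round < 10; ++round)
        {
            for (long userId = 1; userId <= 8; ++userId)
                cache.pop(userId); // the extracted batch would be processed here
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    });

    for (auto& producer : producers)
        producer.join();
    flusher.join();
}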

Optimizing Synchronization With the Sharding Approach

Given the following conditions:

  1. A multi-threaded environment with a shared container
  2. Frequent modification of the container from different threads
  3. Objects in the container can be divided for parallel processing by some member variable.

Regarding point 3, in our cache, transactions from different users can be processed independently. While creating a mutex for each user might seem ideal, it would lead to excessive overhead in maintaining so many locks. Instead, we can divide our cache into a fixed number of chunks based on the user ID, in a process known as sharding. This approach reduces the overhead and yet allows parallel processing, thereby optimizing performance in a multi-threaded environment.

class ShardedCache
{
public:
    ShardedCache(size_t shardSize):
        _shardSize(shardSize),
        _transactionCaches(shardSize)
    {
        std::generate(
            _transactionCaches.begin(),
            _transactionCaches.end(),
            []() { return std::make_unique<SimpleSynchronizedCache>(); });
    }

    void write(const TransactionData& transaction)
    {
        _transactionCaches[transaction.userId % _shardSize]->write(transaction);
    }

    std::vector<TransactionData> read(const long& userId)
    {
        return _transactionCaches[userId % _shardSize]->read(userId);
    }

    std::vector<TransactionData> pop(const long& userId)
    {
        return std::move(_transactionCaches[userId % _shardSize]->pop(userId));
    }

private:
    const size_t _shardSize;
    std::vector<std::unique_ptr<SimpleSynchronizedCache>> _transactionCaches;
};

This approach allows for finer-grained locking without the overhead of maintaining an excessive number of mutexes. The division can be adjusted based on system architecture specifics, such as the size of the thread pool that works with the cache, or the hardware concurrency.
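For instance, the shard count can be derived from the machine's concurrency. This is only a sketch: the multiplier of 4 is an assumption rather than a measured optimum, though the tests below suggest using a value several times larger than the concurrency.

#include <algorithm>
#include <thread>

// A sketch of choosing the shard count from the hardware concurrency.
// The multiplier of 4 is an assumed starting point, not a measured optimum.
size_t chooseShardSize()
{
    const size_t concurrency = std::max<size_t>(1, std::thread::hardware_concurrency());
    return concurrency * 4;
}

// Usage:
// ShardedCache cache(chooseShardSize());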

Let's run tests that check how sharding accelerates cache performance by trying different partition sizes.

Performance Comparison

In these tests, we aim to do more than just measure the maximum number of operations the processor can handle. We want to observe how the cache behaves under conditions that closely resemble real-world scenarios, where transactions occur randomly. Our optimization goal is to minimize the processing time for these transactions, which enhances system responsiveness in practical applications.

The implementation and tests are available in the GitHub repository.

#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <random>
#include <chrono>
#include <atomic>

#include "SynchronizedContainers.h"

const auto hardware_concurrency = (size_t)std::thread::hardware_concurrency();

class TaskPool
{
public:
    template <class Callable>
    TaskPool(size_t poolSize, Callable task)
    {
        for (size_t i = 0; i < poolSize; ++i)
            _workers.emplace_back(task);
    }

    // joining in the destructor lets a TaskPool instance act as a scope guard
    // for a single test run
    ~TaskPool()
    {
        for (auto& worker : _workers)
            worker.join();
    }

private:
    std::vector<std::thread> _workers;
};

template <class CacheImpl>
class Test
{
public:
    template <class ... CacheArgs>
    Test(const int testrunsNum, const size_t writeWorkersNum, const size_t popWorkersNum,
        const std::string& resultsFile, CacheArgs&& ... cacheArgs) :
        _cache(std::forward<CacheArgs>(cacheArgs)...),
        _writeWorkersNum(writeWorkersNum), _popWorkersNum(popWorkersNum),
        _resultsFile(resultsFile),
        _testrunsNum(testrunsNum), _testStarted(false)
    {
        std::random_device rd;
        _randomGenerator = std::mt19937(rd());
    }


    void run()
    {
        for (auto i = 0; i < _testrunsNum; ++i)
        {
            runSingleTest(); // the worker pools are joined before this call returns
            logResults();
        }
    }

    // performs one measured run (the helper's name is assumed here)
    void runSingleTest()
    {
        {
            std::lock_guard<std::mutex> lock(_testStartSync);
            _testStarted = false;
        }

        // these pools won't just fire as many operations as they can,
        // but will emulate requests occurring to the cache in real time in a multithreaded environment
        auto writeTestPool = TaskPool(_writeWorkersNum, std::bind(&Test::writeTransactions, this));
        auto popTestPool = TaskPool(_popWorkersNum, std::bind(&Test::popTransactions, this));

        _writeTime = 0;
        _writeOpNum = 0;
        _popTime = 0;
        _popOpNum = 0;

        {
            std::lock_guard<std::mutex> lock(_testStartSync);
            _testStarted = true;
            _testStartCv.notify_all();
        }
    }

    void logResults()
    {
        // average time per operation, in microseconds; the full reporting code
        // (console output plus the CSV at _resultsFile) is in the GitHub repository
        const long long writeOps = _writeOpNum;
        const long long popOps = _popOpNum;
        const long long avgWrite = writeOps ? _writeTime / writeOps : 0;
        const long long avgPop = popOps ? _popTime / popOps : 0;

        std::cout << "avg write: " << avgWrite << " us, avg pop: " << avgPop << " us" << std::endl;

        std::ofstream results(_resultsFile, std::ios_base::app);
        results << avgWrite << "," << avgPop << std::endl;
    }

private:
    void writeTransactions()
    {
        {
            std::unique_lock<std::mutex> lock(_testStartSync);
            _testStartCv.wait(lock, [this] { return _testStarted; });
        }
        std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();

        // hypothetical system has around 100k currently active users
        std::uniform_int_distribution<long> userDistribution(1, 100000);

        // delay up to 5 ms for every thread not to start simultaneously
        std::uniform_int_distribution<int> waitTimeDistribution(0, 5000);
        std::this_thread::sleep_for(std::chrono::microseconds(waitTimeDistribution(_randomGenerator)));

        for (
            auto iterationStart = std::chrono::steady_clock::now();
            iterationStart - start < std::chrono::minutes(1); // run duration (assumed here; the exact value is in the repository)
            iterationStart = std::chrono::steady_clock::now())
        {
            // time a single write of a transaction for a randomly chosen user
            TransactionData transaction{ 0, userDistribution(_randomGenerator), 0, 0.0, 0, "" };

            auto operationStart = std::chrono::steady_clock::now();
            _cache.write(transaction);
            auto operationEnd = std::chrono::steady_clock::now();

            ++_writeOpNum;
            _writeTime += std::chrono::duration_cast<std::chrono::microseconds>(operationEnd - operationStart).count();

            // make span between iterations at least 5 ms
            std::this_thread::sleep_for(iterationStart + std::chrono::milliseconds(5) - std::chrono::steady_clock::now());
        }
    }

    void popTransactions()
    {
        {
            std::unique_lock<std::mutex> lock(_testStartSync);
            _testStartCv.wait(lock, [this] { return _testStarted; });
        }
        std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();

        // hypothetical system has around 100k currently active users
        std::uniform_int_distribution<long> userDistribution(1, 100000);

        // delay up to 100 ms for every thread not to start simultaneously
        std::uniform_int_distribution<int> waitTimeDistribution(0, 100000);
        std::this_thread::sleep_for(std::chrono::microseconds(waitTimeDistribution(_randomGenerator)));

        for (
            auto iterationStart = std::chrono::steady_clock::now();
            iterationStart - start < std::chrono::minutes(1); // run duration (assumed here; the exact value is in the repository)
            iterationStart = std::chrono::steady_clock::now())
        {
            // time a single pop of the accumulated batch for a randomly chosen user
            auto operationStart = std::chrono::steady_clock::now();
            auto userTransactions = _cache.pop(userDistribution(_randomGenerator));
            auto operationEnd = std::chrono::steady_clock::now();

            ++_popOpNum;
            _popTime += std::chrono::duration_cast<std::chrono::microseconds>(operationEnd - operationStart).count();

            // make span between iterations at least 100 ms
            std::this_thread::sleep_for(iterationStart + std::chrono::milliseconds(100) - std::chrono::steady_clock::now());
        }
    }

    CacheImpl _cache;

    std::atomic<long long> _writeTime;
    std::atomic<long long> _writeOpNum;
    std::atomic<long long> _popTime;
    std::atomic<long long> _popOpNum;

    size_t _writeWorkersNum;
    size_t _popWorkersNum;
    std::string _resultsFile;
    int _testrunsNum;
    bool _testStarted;
    std::mutex _testStartSync;
    std::condition_variable _testStartCv;
    std::mt19937 _randomGenerator;
};

void testCaches(const size_t testedShardSize, const size_t workersNum)
{
    if (testedShardSize == 1)
    {
        auto simpleImplTest = Test<SimpleSynchronizedCache>(
            10, workersNum, workersNum, "simple_cache_tests(" + std::to_string(workersNum) + "_workers).csv");

        simpleImplTest.run();
    }
    else
    {
        auto shardedImplTest = Test<ShardedCache>(
            10, workersNum, workersNum, "sharded_cache_" + std::to_string(testedShardSize) + "_tests(" + std::to_string(workersNum) + "_workers).csv",
            testedShardSize);

        shardedImplTest.run();
    }
}

int main()
{
    std::cout << "Hardware concurrency: " << hardware_concurrency << std::endl;

    // shard sizes for the main test series (1 stands for the simple single-mutex cache)
    std::vector<size_t> testPlan = { 1, 4, 8, 32, 128, 4096, 100000 };

    for (size_t i = 0; i < testPlan.size(); ++i)
        testCaches(testPlan[i], hardware_concurrency);

    // an additional, reduced-load series with fewer workers
    // (the exact worker counts used for the published results are in the GitHub repository)
    std::vector<size_t> additionalTestPlan = { 1, 8, 128, 100000 };

    for (size_t i = 0; i < additionalTestPlan.size(); ++i)
        testCaches(additionalTestPlan[i], hardware_concurrency / 4);
}

We observe that with 2,000 writes and 300 pops per second (with a concurrency of 8), which are not very high numbers for a high-load system, optimization using sharding significantly accelerates cache performance, by orders of magnitude. However, evaluating the significance of this difference is left to the reader, as, in both scenarios, operations took less than a millisecond. It is important to note that the tests used a relatively lightweight data structure for transactions, and synchronization was applied only to the container itself. In real-world scenarios, data is often more complex and larger, and synchronized processing may require additional computations and access to other data, which can significantly increase the duration of the operation itself. Therefore, we aim to spend as little time on synchronization as possible.

The tests do not show a significant difference in processing time as the shard size grows further. The bigger the size, the bigger the maintenance overhead, so how low should we go? I suspect that the minimal effective value is tied to the system's concurrency, so for modern server machines with much greater concurrency than my home PC, a shard size that is too small won't yield the most optimal results. I would like to see results from machines with different concurrency that could confirm or disprove this hypothesis, but for now I assume it is optimal to use a shard size several times larger than the concurrency. You can also note that the largest size tested, 100,000, effectively matches the previously mentioned approach of assigning a mutex to each user (in the tests, user IDs were generated within the range of 100,000). As can be seen, this did not provide any advantage in processing speed, and this approach is clearly more demanding in terms of memory.

Limitations and Considerations

So, we determined an optimal shard size, but this is not the only thing that should be considered for the best results.

It is important to remember that such a difference compared to the simple implementation exists only because we are trying to perform a sufficiently large number of transactions at the same time, causing a "queue" to build up. If the system's concurrency and the speed of each operation (within the mutex lock) allow operations to be processed without bottlenecks, the effectiveness of the sharding optimization decreases. To demonstrate this, let's look at the test results with reduced load: at 500 writes and 75 pops (with a concurrency of 8), the difference is still present, but it is no longer as significant. This is yet another reminder that premature optimizations can complicate code without significantly impacting results. It is crucial to understand the application requirements and the expected load.

Also, it is important to note that the effectiveness of sharding heavily depends on the distribution of values of the chosen key (in this case, the user ID). If the distribution becomes heavily skewed, we may revert to performance more similar to that of a single mutex; imagine all the transactions coming from a single user.
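If the key values themselves are clustered (for example, IDs issued in consecutive blocks), hashing the key before taking the remainder spreads them more evenly across shards. The sketch below is an assumption on top of the ShardedCache above, and it cannot help when a single key produces most of the traffic.

#include <functional>

// A sketch of a shard-index function that hashes the key before the modulo.
// It evens out clustered key values, but a single dominant key still maps to a
// single shard, so the skew problem described above remains.
size_t shardIndex(long userId, size_t shardSize)
{
    return std::hash<long>{}(userId) % shardSize;
}

// Inside ShardedCache::write, this would replace the plain modulo:
// _transactionCaches[shardIndex(transaction.userId, _shardSize)]->write(transaction);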

Conclusion

In scenarios with frequent writes to a container in a multi-threaded environment, traditional synchronization methods can become a bottleneck. By leveraging the ability to process data in parallel, along with a predictable distribution by some specific key, and implementing a sharded synchronization approach, we can significantly improve performance without sacrificing thread safety. This technique can prove effective for systems dealing with user-specific data, such as transaction processing systems, user session caches, or any scenario where data can be logically partitioned based on a key attribute.

As with any optimization, it is crucial to profile your specific use case and adjust the implementation accordingly. The approach presented here provides a starting point for tackling synchronization challenges in write-heavy, multi-threaded applications.

Remember, the goal of optimization is not just to make things faster, but to make them more efficient and scalable. By thinking critically about your data access patterns and leveraging the inherent structure of your data, you can often find innovative solutions to performance bottlenecks.
