async_workflow.cppΒΆ

Example for asynchronous operations workflow.

Example for asynchronous operations workflow. See corresponding tutorial.

// Copyright (c) 2018, Smart Projects Holdings Ltd
// All rights reserved.
// See LICENSE file for license details.

#include <ugcs/vsm/vsm.h>

/* Target function for callback. */
int
Sample_callback_function(int arg)
{
    LOG("Sample callback function, arg: %d", arg);
    return arg + 10;
}
class Sample_class {
public:
    int
    Sample_callback_method(int arg)
    {
        LOG("Sample callback method, arg: %d", arg);
        return arg + 10;
    }
};
class Callable_class {
public:
    int x;

    Callable_class(int x):
        x(x)
    {}

    int
    operator ()(int y)
    {
        LOG("Sample callback method, arg: %d", y);
        return x + y;
    }
};
/* Callbacks creation example. */
void
Simple_callbacks()
{
    auto func_cbk = ugcs::vsm::Make_callback(Sample_callback_function, 10);
    LOG("Callback call result: %d", func_cbk());
    Sample_class class_instance;
    auto method_cbk = ugcs::vsm::Make_callback(&Sample_class::Sample_callback_method,
                                         &class_instance, 10);
    LOG("Callback call result: %d", method_cbk());
    Callable_class callable_instance(10);
    auto callable_cbk = ugcs::vsm::Make_callback(callable_instance, 10);
    LOG("Callback call result: %d", callable_cbk());
    auto lambda_cbk = ugcs::vsm::Make_callback(
        [](int arg)
        {
            LOG("Sample lambda callback, arg: %d", arg);
            return arg + 10;
        },
        10);
    LOG("Callback call result: %d", lambda_cbk());
}

typedef ugcs::vsm::Callback_proxy<int, double> Sample_handler_type;
void
Some_api_method(Sample_handler_type handler)
{
    LOG("Callback result: %d", handler(20));
}
DEFINE_CALLBACK_BUILDER(Sample_handler_builder, (double), (3.14))
/* Illustration for using ugcs::vsm::Callback_proxy class */
void
Callback_proxies()
{
    auto My_callback_target = [](double enforced_arg, int user_arg)
        {
            LOG("Callback called, enforced argument %f user argument %d",
                enforced_arg, user_arg);
            return 30;
        };
    Some_api_method(Sample_handler_builder(My_callback_target, 10));
}

void
Requests_and_contexts()
{
    /* Create waiter object for our contexts. */
    ugcs::vsm::Request_waiter::Ptr waiter = ugcs::vsm::Request_waiter::Create();
    /* Create processor for requests. */
    ugcs::vsm::Request_processor::Ptr processor = ugcs::vsm::Request_processor::Create("Processor", waiter);
    /* Create completion context for notifications processing. */
    ugcs::vsm::Request_completion_context::Ptr comp_ctx =
            ugcs::vsm::Request_completion_context::Create("Completion context", waiter);
    processor->Enable();
    comp_ctx->Enable();
    ugcs::vsm::Request::Ptr req = ugcs::vsm::Request::Create();
    req->Set_processing_handler(
        ugcs::vsm::Make_callback(
            [](int arg, ugcs::vsm::Request::Ptr req)
            {
                LOG("Processing handler called, arg %d", arg);
                req->Complete();
            }, 10, req));
    req->Set_completion_handler(comp_ctx,
        ugcs::vsm::Make_callback(
            [](int arg)
            {
                LOG("Completion notification handler called, arg %d", arg);
            }, 20));
    processor->Submit_request(req);
    LOG("Request submitted");
    LOG("Before processor running");
    waiter->Wait_and_process({processor});
    LOG("After processor running");
    LOG("Before completion context running");
    waiter->Wait_and_process({comp_ctx});
    LOG("After completion context running");
    processor->Disable();
    comp_ctx->Disable();
    /* Demonstrate contexts serving in a separated thread. */
    ugcs::vsm::Request_worker::Ptr worker = ugcs::vsm::Request_worker::Create("Another thread",
        std::initializer_list<ugcs::vsm::Request_container::Ptr>{processor, comp_ctx});
    /* Enable all attached containers. */
    worker->Enable_containers();
    /* Enable the worker. This will launch its thread. */
    worker->Enable();
    req = ugcs::vsm::Request::Create();
    req->Set_processing_handler(
        ugcs::vsm::Make_callback(
            [](int arg, ugcs::vsm::Request::Ptr req)
            {
                LOG("Processing handler called, arg %d", arg);
                req->Complete();
            }, 10, req));
    req->Set_completion_handler(comp_ctx,
        ugcs::vsm::Make_callback(
            [](int arg)
            {
                LOG("Completion notification handler called, arg %d", arg);
            }, 20));
    LOG("Before request submission");
    processor->Submit_request(req);
    /* Give a chance to process the request in a parallel thread. */
    std::this_thread::sleep_for(std::chrono::seconds(1));
    LOG("After request submission");

    /* Disable attached containers and the worker itself. */
    worker->Disable_containers();
    worker->Disable();
}

class Sample_processor: public ugcs::vsm::Request_processor {
    DEFINE_COMMON_CLASS(Sample_processor, ugcs::vsm::Request_processor)
public:
    /* Type for result handler. */
    typedef ugcs::vsm::Callback_proxy<void, double> Handler;
    /* Builder for handler. */
    DEFINE_CALLBACK_BUILDER(Make_handler, (double), (3.14));

    Sample_processor() : ugcs::vsm::Request_processor("Sample processor") {}

    /* The method for accessing processor provided services. */
    ugcs::vsm::Operation_waiter
    Sample_api_method(/* Some request parameter. */
                      int param,
                      /* Result handler. Default one does nothing thus discarding
                       * the result.
                       */
                      Handler handler = ugcs::vsm::Make_dummy_callback<void, double>(),
                      /* Completion context for result handler invocation. Default
                       * value will use processor context.
                       */
                      ugcs::vsm::Request_completion_context::Ptr comp_ctx = nullptr);

private:
    /* Default completion context if the caller does not provide own one. */
    ugcs::vsm::Request_completion_context::Ptr def_comp_ctx;
    /* Worker with dedicated thread for this processor. */
    ugcs::vsm::Request_worker::Ptr worker;

    /* Request processing handler. It is always invoked in the processor dedicated thread. */
    void
    Process_api_call(int param, ugcs::vsm::Request::Ptr request, Handler handler);

    /* Called when the processor is enabled. */
    virtual void
    On_enable() override;

    /* Called when the processor is enabled. */
    virtual void
    On_disable() override;
};
void
Sample_processor::On_enable()
{
    ugcs::vsm::Request_processor::On_enable();
    def_comp_ctx = ugcs::vsm::Request_completion_context::Create("Completion context");
    def_comp_ctx->Enable();
    worker = ugcs::vsm::Request_worker::Create("Worker",
        std::initializer_list<ugcs::vsm::Request_container::Ptr>{Shared_from_this(), def_comp_ctx});
    worker->Enable();
}

void
Sample_processor::On_disable()
{
    Set_disabled();
    worker->Disable();
    def_comp_ctx->Disable();
    def_comp_ctx = nullptr;
    worker = nullptr;
}
ugcs::vsm::Operation_waiter
Sample_processor::Sample_api_method(int param,
                                    Handler handler,
                                    ugcs::vsm::Request_completion_context::Ptr comp_ctx)
{
    ugcs::vsm::Request::Ptr req = ugcs::vsm::Request::Create();
    req->Set_processing_handler(
        ugcs::vsm::Make_callback(&Sample_processor::Process_api_call, Shared_from_this(),
                           param, req, handler));
    req->Set_completion_handler(comp_ctx ? comp_ctx : def_comp_ctx,
                                handler);
    Submit_request(req);
    return req;
}
void
Sample_processor::Process_api_call(int param, ugcs::vsm::Request::Ptr request,
                                   Handler handler)
{
    auto lock = request->Lock();
    if (!request->Is_processing()) {
        /* It might be canceled. */
        return;
    }
    handler.Set_args(param * 2);
    request->Complete(ugcs::vsm::Request::Status::OK, std::move(lock));
}
void
Custom_processor()
{
    Sample_processor::Ptr processor = Sample_processor::Create();
    processor->Enable();

    auto handler = Sample_processor::Make_handler(
        [](double result, int user_param)
        {
            LOG("Request completed, result %f, user param %d", result, user_param);
        }, 30);
    processor->Sample_api_method(10, handler);
    /* Give a chance to process the request in a parallel thread. */
    std::this_thread::sleep_for(std::chrono::seconds(1));

    processor->Disable();
}

int
main (int argc, char *argv[])
{
    ugcs::vsm::Initialize(argc, argv);
    Simple_callbacks();

    Callback_proxies();

    Requests_and_contexts();

    Custom_processor();

    ugcs::vsm::Terminate();
    return 0;
}