async_workflow.cppΒΆ
Example for asynchronous operations workflow.
Example for asynchronous operations workflow. See corresponding tutorial.
// Copyright (c) 2018, Smart Projects Holdings Ltd // All rights reserved. // See LICENSE file for license details. #include <ugcs/vsm/vsm.h> /* Target function for callback. */ int Sample_callback_function(int arg) { LOG("Sample callback function, arg: %d", arg); return arg + 10; } class Sample_class { public: int Sample_callback_method(int arg) { LOG("Sample callback method, arg: %d", arg); return arg + 10; } }; class Callable_class { public: int x; Callable_class(int x): x(x) {} int operator ()(int y) { LOG("Sample callback method, arg: %d", y); return x + y; } }; /* Callbacks creation example. */ void Simple_callbacks() { auto func_cbk = ugcs::vsm::Make_callback(Sample_callback_function, 10); LOG("Callback call result: %d", func_cbk()); Sample_class class_instance; auto method_cbk = ugcs::vsm::Make_callback(&Sample_class::Sample_callback_method, &class_instance, 10); LOG("Callback call result: %d", method_cbk()); Callable_class callable_instance(10); auto callable_cbk = ugcs::vsm::Make_callback(callable_instance, 10); LOG("Callback call result: %d", callable_cbk()); auto lambda_cbk = ugcs::vsm::Make_callback( [](int arg) { LOG("Sample lambda callback, arg: %d", arg); return arg + 10; }, 10); LOG("Callback call result: %d", lambda_cbk()); } typedef ugcs::vsm::Callback_proxy<int, double> Sample_handler_type; void Some_api_method(Sample_handler_type handler) { LOG("Callback result: %d", handler(20)); } DEFINE_CALLBACK_BUILDER(Sample_handler_builder, (double), (3.14)) /* Illustration for using ugcs::vsm::Callback_proxy class */ void Callback_proxies() { auto My_callback_target = [](double enforced_arg, int user_arg) { LOG("Callback called, enforced argument %f user argument %d", enforced_arg, user_arg); return 30; }; Some_api_method(Sample_handler_builder(My_callback_target, 10)); } void Requests_and_contexts() { /* Create waiter object for our contexts. */ ugcs::vsm::Request_waiter::Ptr waiter = ugcs::vsm::Request_waiter::Create(); /* Create processor for requests. */ ugcs::vsm::Request_processor::Ptr processor = ugcs::vsm::Request_processor::Create("Processor", waiter); /* Create completion context for notifications processing. */ ugcs::vsm::Request_completion_context::Ptr comp_ctx = ugcs::vsm::Request_completion_context::Create("Completion context", waiter); processor->Enable(); comp_ctx->Enable(); ugcs::vsm::Request::Ptr req = ugcs::vsm::Request::Create(); req->Set_processing_handler( ugcs::vsm::Make_callback( [](int arg, ugcs::vsm::Request::Ptr req) { LOG("Processing handler called, arg %d", arg); req->Complete(); }, 10, req)); req->Set_completion_handler(comp_ctx, ugcs::vsm::Make_callback( [](int arg) { LOG("Completion notification handler called, arg %d", arg); }, 20)); processor->Submit_request(req); LOG("Request submitted"); LOG("Before processor running"); waiter->Wait_and_process({processor}); LOG("After processor running"); LOG("Before completion context running"); waiter->Wait_and_process({comp_ctx}); LOG("After completion context running"); processor->Disable(); comp_ctx->Disable(); /* Demonstrate contexts serving in a separated thread. */ ugcs::vsm::Request_worker::Ptr worker = ugcs::vsm::Request_worker::Create("Another thread", std::initializer_list<ugcs::vsm::Request_container::Ptr>{processor, comp_ctx}); /* Enable all attached containers. */ worker->Enable_containers(); /* Enable the worker. This will launch its thread. */ worker->Enable(); req = ugcs::vsm::Request::Create(); req->Set_processing_handler( ugcs::vsm::Make_callback( [](int arg, ugcs::vsm::Request::Ptr req) { LOG("Processing handler called, arg %d", arg); req->Complete(); }, 10, req)); req->Set_completion_handler(comp_ctx, ugcs::vsm::Make_callback( [](int arg) { LOG("Completion notification handler called, arg %d", arg); }, 20)); LOG("Before request submission"); processor->Submit_request(req); /* Give a chance to process the request in a parallel thread. */ std::this_thread::sleep_for(std::chrono::seconds(1)); LOG("After request submission"); /* Disable attached containers and the worker itself. */ worker->Disable_containers(); worker->Disable(); } class Sample_processor: public ugcs::vsm::Request_processor { DEFINE_COMMON_CLASS(Sample_processor, ugcs::vsm::Request_processor) public: /* Type for result handler. */ typedef ugcs::vsm::Callback_proxy<void, double> Handler; /* Builder for handler. */ DEFINE_CALLBACK_BUILDER(Make_handler, (double), (3.14)); Sample_processor() : ugcs::vsm::Request_processor("Sample processor") {} /* The method for accessing processor provided services. */ ugcs::vsm::Operation_waiter Sample_api_method(/* Some request parameter. */ int param, /* Result handler. Default one does nothing thus discarding * the result. */ Handler handler = ugcs::vsm::Make_dummy_callback<void, double>(), /* Completion context for result handler invocation. Default * value will use processor context. */ ugcs::vsm::Request_completion_context::Ptr comp_ctx = nullptr); private: /* Default completion context if the caller does not provide own one. */ ugcs::vsm::Request_completion_context::Ptr def_comp_ctx; /* Worker with dedicated thread for this processor. */ ugcs::vsm::Request_worker::Ptr worker; /* Request processing handler. It is always invoked in the processor dedicated thread. */ void Process_api_call(int param, ugcs::vsm::Request::Ptr request, Handler handler); /* Called when the processor is enabled. */ virtual void On_enable() override; /* Called when the processor is enabled. */ virtual void On_disable() override; }; void Sample_processor::On_enable() { ugcs::vsm::Request_processor::On_enable(); def_comp_ctx = ugcs::vsm::Request_completion_context::Create("Completion context"); def_comp_ctx->Enable(); worker = ugcs::vsm::Request_worker::Create("Worker", std::initializer_list<ugcs::vsm::Request_container::Ptr>{Shared_from_this(), def_comp_ctx}); worker->Enable(); } void Sample_processor::On_disable() { Set_disabled(); worker->Disable(); def_comp_ctx->Disable(); def_comp_ctx = nullptr; worker = nullptr; } ugcs::vsm::Operation_waiter Sample_processor::Sample_api_method(int param, Handler handler, ugcs::vsm::Request_completion_context::Ptr comp_ctx) { ugcs::vsm::Request::Ptr req = ugcs::vsm::Request::Create(); req->Set_processing_handler( ugcs::vsm::Make_callback(&Sample_processor::Process_api_call, Shared_from_this(), param, req, handler)); req->Set_completion_handler(comp_ctx ? comp_ctx : def_comp_ctx, handler); Submit_request(req); return req; } void Sample_processor::Process_api_call(int param, ugcs::vsm::Request::Ptr request, Handler handler) { auto lock = request->Lock(); if (!request->Is_processing()) { /* It might be canceled. */ return; } handler.Set_args(param * 2); request->Complete(ugcs::vsm::Request::Status::OK, std::move(lock)); } void Custom_processor() { Sample_processor::Ptr processor = Sample_processor::Create(); processor->Enable(); auto handler = Sample_processor::Make_handler( [](double result, int user_param) { LOG("Request completed, result %f, user param %d", result, user_param); }, 30); processor->Sample_api_method(10, handler); /* Give a chance to process the request in a parallel thread. */ std::this_thread::sleep_for(std::chrono::seconds(1)); processor->Disable(); } int main (int argc, char *argv[]) { ugcs::vsm::Initialize(argc, argv); Simple_callbacks(); Callback_proxies(); Requests_and_contexts(); Custom_processor(); ugcs::vsm::Terminate(); return 0; }