Jun 16th update

This commit is contained in:
noam
2023-01-17 11:34:09 +02:00
parent 90bcc544a2
commit ad04b8d063
168 changed files with 64034 additions and 932 deletions

View File

@@ -20,9 +20,13 @@
#include "cache.h"
#include "config.h"
#include "tenant_profile_pair.h"
#include "hash_combine.h"
using namespace std;
USE_DEBUG_FLAG(D_TENANT_MANAGER);
class TenantManager::Impl
:
Singleton::Provide<I_TenantManager>::From<TenantManager>
@@ -32,25 +36,26 @@ public:
void fini();
void uponNewTenants(const newTenantCB &cb) override;
bool isTenantActive(const string &tenant_id) const override;
bool areTenantAndProfileActive(const string &tenant_id, const string &profile_id) const override;
vector<string> fetchAllActiveTenants() const override;
vector<string> fetchActiveTenants() const override;
vector<string> getInstances(const string &tenant_id) const override;
vector<string> getInstances(const string &tenant_id, const string &profile_id) const override;
vector<string> fetchProfileIds(const string &tenant_id) const override;
void addActiveTenant(const string &tenant_id) override;
void addActiveTenants(const vector<string> &tenants_id) override;
void addActiveTenantAndProfile(const string &tenant_id, const string &profile_id) override;
void deactivateTenant(const string &tenant_id) override;
void deactivateTenants(const vector<string> &tenants_id) override;
void deactivateTenant(const string &tenant_id, const string &profile_id) override;
chrono::microseconds getTimeoutVal() const override;
void
addInstance(const string &tenant_id, const string &instace_id)
addInstance(const string &tenant_id, const string &profile_id, const string &instace_id)
{
auto tenant_cache = mapper.find(tenant_id);
auto tenant_profile_pair = TenantProfilePair(tenant_id, profile_id);
auto tenant_cache = mapper.find(tenant_profile_pair);
if (tenant_cache == mapper.end()) {
tenant_cache = mapper.insert(make_pair(tenant_id, TemporaryCache<string, void>())).first;
tenant_cache = mapper.insert(make_pair(tenant_profile_pair, TemporaryCache<string, void>())).first;
tenant_cache->second.startExpiration(
getTimeoutVal(),
Singleton::Consume<I_MainLoop>::by<TenantManager>(),
@@ -63,11 +68,14 @@ public:
private:
void runUponNewTenants(const vector<string> &new_tenants);
void sendTenant(const vector<string> &tenant_id);
bool sendWithCustomPort(const vector<string> &tenant_id, const uint16_t port);
void sendTenantAndProfile(const string &tenant_id, const string &profile_id);
vector<string> getAllTenants() const;
vector<string> fetchAllProfileIds(const string &tenant_id) const;
vector<string> getProfileIds(const string &profile_id) const;
bool sendWithCustomPort(const string &tenant_id, const string &profile_id, const uint16_t port);
TemporaryCache<string, void> active_tenants;
map<string, TemporaryCache<string, void>> mapper;
TemporaryCache<TenantProfilePair, void> active_tenants;
map<TenantProfilePair, TemporaryCache<string, void>> mapper;
vector<I_TenantManager::newTenantCB> upon_cb;
I_Messaging *i_messaging = nullptr;
@@ -82,31 +90,76 @@ public:
doCall() override
{
auto i_tenant_manager = Singleton::Consume<I_TenantManager>::from<TenantManager>();
i_tenant_manager->addActiveTenants(tenant_ids.get());
for (const auto &tenant_id: tenant_ids.get()) {
i_tenant_manager->addInstance(tenant_id, instance_id.get());
}
i_tenant_manager->addActiveTenantAndProfile(tenant_id.get(), profile_id.get());
i_tenant_manager->addInstance(tenant_id.get(), profile_id.get(), instance_id.get());
}
private:
C2S_LABEL_PARAM(vector<string>, tenant_ids, "tenantIds");
C2S_LABEL_PARAM(string, instance_id, "instanceId");
C2S_LABEL_PARAM(string, tenant_id, "tenantId");
C2S_LABEL_PARAM(string, profile_id, "profileId");
C2S_LABEL_PARAM(string, instance_id, "instanceId");
};
class SendNewTenants : public ClientRest
{
public:
SendNewTenants(const vector<string> &_tenant_ids)
SendNewTenants(const string &_tenant_id, const string &_profile_id)
:
tenant_ids(_tenant_ids)
tenant_id(_tenant_id),
profile_id(_profile_id)
{
auto _instance_id = Singleton::Consume<I_InstanceAwareness>::by<TenantManager>()->getUniqueID();
instance_id = _instance_id.ok() ? *_instance_id : "default";
}
private:
C2S_LABEL_PARAM(vector<string>, tenant_ids, "tenantIds");
C2S_LABEL_PARAM(string, instance_id, "instanceId");
C2S_LABEL_PARAM(string, tenant_id, "tenantId");
C2S_LABEL_PARAM(string, profile_id, "profileId");
C2S_LABEL_PARAM(string, instance_id, "instanceId");
};
class FetchActiveTenants : public ServerRest
{
public:
void
doCall() override
{
active_tenants = Singleton::Consume<I_TenantManager>::from<TenantManager>()->fetchAllActiveTenants();
}
S2C_PARAM(std::vector<std::string>, active_tenants);
};
class GetActiveTenants : public ClientRest
{
public:
GetActiveTenants() : active_tenants() {};
Maybe<string> genJson() const { return string("{}"); };
S2C_PARAM(vector<string>, active_tenants);
};
class FetchProfileIds : public ServerRest
{
public:
void
doCall() override
{
profile_ids = Singleton::Consume<I_TenantManager>::from<TenantManager>()->fetchProfileIds(tenant_id);
}
S2C_PARAM(vector<string>, profile_ids);
C2S_PARAM(string, tenant_id);
};
class GetProfileIds : public ClientRest
{
public:
GetProfileIds(const string &_tenant_id) : profile_ids(), tenant_id(_tenant_id) {};
S2C_PARAM(vector<string>, profile_ids);
C2S_PARAM(string, tenant_id);
};
void
@@ -129,6 +182,8 @@ TenantManager::Impl::init()
if (type == TenantManagerType::SERVER) {
auto rest = Singleton::Consume<I_RestApi>::by<TenantManager>();
rest->addRestCall<LoadNewTenants>(RestAction::SET, "tenant-id");
rest->addRestCall<FetchActiveTenants>(RestAction::SHOW, "active-tenants");
rest->addRestCall<FetchActiveTenants>(RestAction::SHOW, "profile-ids");
}
if (type == TenantManagerType::CLIENT) {
@@ -144,8 +199,13 @@ TenantManager::Impl::init()
interval,
[this] ()
{
auto tenants_id = fetchActiveTenants();
sendTenant(tenants_id);
auto tenants_ids = fetchActiveTenants();
for (auto tenant_id : tenants_ids) {
auto profile_ids = fetchAllProfileIds(tenant_id);
for (auto profile_id : profile_ids) {
sendTenantAndProfile(tenant_id, profile_id);
}
}
},
"Tenant manager client reporter"
);
@@ -160,21 +220,21 @@ TenantManager::Impl::fini()
}
bool
TenantManager::Impl::sendWithCustomPort(const vector<string> &tenants_id, const uint16_t port)
TenantManager::Impl::sendWithCustomPort(const string &tenant_id, const string &profile_id, const uint16_t port)
{
if (i_messaging == nullptr) {
i_messaging = Singleton::Consume<I_Messaging>::by<TenantManager>();
}
SendNewTenants new_tenants(tenants_id);
SendNewTenants new_tenant_and_profile(tenant_id, profile_id);
return i_messaging->sendNoReplyObject(
new_tenants,
new_tenant_and_profile,
I_Messaging::Method::POST,
"127.0.0.1",
port,
conn_flags,
"set-tenant-id"
"/set-tenant-id"
);
}
@@ -191,10 +251,11 @@ TenantManager::Impl::runUponNewTenants(const vector<string> &new_tenants)
}
void
TenantManager::Impl::sendTenant(const vector<string> &tenants_id)
TenantManager::Impl::sendTenantAndProfile(const string &tenant_id, const string &profile_id)
{
auto res = sendWithCustomPort(
tenants_id,
tenant_id,
profile_id,
getConfigurationWithDefault<uint16_t>(
7777,
"Tenant Manager",
@@ -204,7 +265,8 @@ TenantManager::Impl::sendTenant(const vector<string> &tenants_id)
if (!res) {
sendWithCustomPort(
tenants_id,
tenant_id,
profile_id,
getConfigurationWithDefault<uint16_t>(
7778,
"Tenant Manager",
@@ -214,6 +276,66 @@ TenantManager::Impl::sendTenant(const vector<string> &tenants_id)
}
}
vector<string>
TenantManager::Impl::getAllTenants() const
{
dbgFlow(D_TENANT_MANAGER) << "Tenant Manager is a client. Requesting the active tenants";
GetActiveTenants active_tenant;
auto res = i_messaging->sendObject(
active_tenant,
I_Messaging::Method::POST,
"127.0.0.1",
7777,
conn_flags,
"/show-active-tenants"
);
if (!res) {
i_messaging->sendObject(
active_tenant,
I_Messaging::Method::POST,
"127.0.0.1",
7778,
conn_flags,
"/show-active-tenants"
);
}
return active_tenant.active_tenants.get();
}
vector<string>
TenantManager::Impl::getProfileIds(const string &tenant_id) const
{
dbgFlow(D_TENANT_MANAGER) << "Tenant Manager is a client. Requesting the active tenants";
GetProfileIds profile_id(tenant_id);
auto res = i_messaging->sendObject(
profile_id,
I_Messaging::Method::POST,
"127.0.0.1",
7777,
conn_flags,
"/show-profile-ids"
);
if (!res) {
i_messaging->sendObject(
profile_id,
I_Messaging::Method::POST,
"127.0.0.1",
7778,
conn_flags,
"/show-profile-ids"
);
}
return profile_id.profile_ids.get();
}
void
TenantManager::Impl::uponNewTenants(const newTenantCB &cb)
{
@@ -221,69 +343,84 @@ TenantManager::Impl::uponNewTenants(const newTenantCB &cb)
}
bool
TenantManager::Impl::isTenantActive(const string &tenant_id) const
TenantManager::Impl::areTenantAndProfileActive(const string &tenant_id, const string &profile_id) const
{
return active_tenants.doesKeyExists(tenant_id);
return active_tenants.doesKeyExists(TenantProfilePair(tenant_id, profile_id));
}
void
TenantManager::Impl::addActiveTenant(const string &tenant_id)
TenantManager::Impl::addActiveTenantAndProfile(const string &tenant_id, const string &profile_id)
{
active_tenants.createEntry(tenant_id);
auto tenant_profile = TenantProfilePair(tenant_id, profile_id);
active_tenants.createEntry(tenant_profile);
if (type == TenantManagerType::CLIENT) {
sendTenant({tenant_id});
sendTenantAndProfile(tenant_id, profile_id);
} else {
runUponNewTenants({tenant_id});
}
}
void
TenantManager::Impl::addActiveTenants(const vector<string> &tenants_id)
TenantManager::Impl::deactivateTenant(const string &tenant_id, const string &profile_id)
{
for (const auto &tenant_id: tenants_id) active_tenants.createEntry(tenant_id);
if (type == TenantManagerType::CLIENT) {
sendTenant(tenants_id);
} else {
runUponNewTenants(tenants_id);
}
active_tenants.deleteEntry(TenantProfilePair(tenant_id, profile_id));
}
void
TenantManager::Impl::deactivateTenant(const string &tenant_id)
vector<string>
TenantManager::Impl::fetchAllActiveTenants() const
{
active_tenants.deleteEntry(tenant_id);
}
void
TenantManager::Impl::deactivateTenants(const vector<string> &tenants_id)
{
for (const auto &tenant_id: tenants_id) deactivateTenant(tenant_id);
dbgFlow(D_TENANT_MANAGER) << "Fetching all active tenants";
return (type == TenantManagerType::CLIENT) ? getAllTenants() : fetchActiveTenants();
}
vector<string>
TenantManager::Impl::fetchActiveTenants() const
{
dbgFlow(D_TENANT_MANAGER) << "Tenant Manager is a server. Fetching active tenants";
vector<string> tenants;
tenants.reserve(active_tenants.size());
for (auto iter = begin(active_tenants); iter != end(active_tenants); iter++) {
tenants.push_back(iter->first);
dbgDebug(D_TENANT_MANAGER) << "Found a tenant to return. Tenant ID: " << iter->first.getTenantId();
tenants.push_back(iter->first.getTenantId());
}
return tenants;
}
vector<string>
TenantManager::Impl::getInstances(const string &tenant_id) const
TenantManager::Impl::getInstances(const string &tenant_id, const string &profile_id) const
{
vector<string> tenants;
vector<string> instances;
auto tenant_profile_pair = TenantProfilePair(tenant_id, profile_id);
auto tenant_instance_cache = mapper.find(tenant_profile_pair);
auto tenant_instance_cache = mapper.find(tenant_id);
if (tenant_instance_cache == mapper.end()) return tenants;
if (tenant_instance_cache == mapper.end()) return instances;
tenants.reserve(tenant_instance_cache->second.size());
instances.reserve(tenant_instance_cache->second.size());
for (auto iter = begin(tenant_instance_cache->second); iter != end(tenant_instance_cache->second); iter++) {
tenants.push_back(iter->first);
instances.push_back(iter->first);
}
return tenants;
return instances;
}
vector<string>
TenantManager::Impl::fetchAllProfileIds(const string &tenant_id) const
{
vector<string> tenant_profile_ids;
for (auto iter = begin(active_tenants); iter != end(active_tenants); iter++) {
if (iter->first.getTenantId() == tenant_id) {
tenant_profile_ids.push_back(iter->first.getPfofileId());
}
}
return tenant_profile_ids;
}
vector<string>
TenantManager::Impl::fetchProfileIds(const string &tenant_id) const
{
dbgFlow(D_TENANT_MANAGER) << "Fetching all profile ids for tenant " << tenant_id;
return (type == TenantManagerType::CLIENT) ? getProfileIds(tenant_id) : fetchAllProfileIds(tenant_id);
}
chrono::microseconds