allow streams to be marked as exhausted in more cases

At stream boundaries, we can mark a stream as exhausted if no groups
are active and there is no other way for it to report further matches.
This allows us to stop maintaining the history buffer on subsequent
stream writes. Previously, streams were only marked as exhausted when a
pure highlander case had reported all of its patterns, or when the
outfix in a sole-outfix case had died.
Alex Coyte
2017-01-31 09:29:41 +11:00
committed by Matthew Barr
parent fbaa0a1b25
commit bbd64f98ae
11 changed files with 171 additions and 30 deletions
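
As a rough illustration of the mechanism the commit message describes (a
minimal sketch with hypothetical names and state layout, not Hyperscan's
actual runtime structures): at a stream boundary, if no literal groups
remain active and nothing else can still report, the stream can be
flagged exhausted so that later writes skip history maintenance.

    #include <cstddef>
    #include <cstdint>

    // Hypothetical stream state, for illustration only.
    struct StreamSketch {
        std::uint64_t active_groups; // literal groups that may still match
        bool live_outfixes;          // any other way matches could be reported
        bool exhausted;              // once set, history upkeep is skipped
    };

    // At a stream boundary, exhaust the stream if nothing can fire again.
    void onStreamBoundary(StreamSketch &s) {
        if (s.active_groups == 0 && !s.live_outfixes) {
            s.exhausted = true;
        }
    }

    // Subsequent writes to an exhausted stream skip the history buffer.
    void onStreamWrite(StreamSketch &s, const char *data, std::size_t len) {
        if (s.exhausted) {
            return;
        }
        // ... scan the block, then copy its tail into the history buffer ...
        (void)data;
        (void)len;
    }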

src/rose/rose_build_groups.cpp

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016, Intel Corporation
+ * Copyright (c) 2016-2017, Intel Corporation
  *
  * Redistribution and use in source and binary forms, with or without
  * modification, are permitted provided that the following conditions are met:
@@ -33,6 +33,9 @@
 #include "rose_build_groups.h"
 
+#include "util/boundary_reports.h"
+#include "util/compile_context.h"
+
 #include <queue>
 #include <vector>
@@ -71,24 +74,18 @@ bool superStrong(const rose_literal_id &lit) {
 static
 bool eligibleForAlwaysOnGroup(const RoseBuildImpl &build, u32 id) {
-    /* returns true if it or any of its delay versions have root role */
-    for (auto v : build.literal_info[id].vertices) {
-        if (build.isRootSuccessor(v)) {
-            NGHolder *h = build.g[v].left.graph.get();
-            if (!h || proper_out_degree(h->startDs, *h)) {
-                return true;
-            }
-        }
-    }
+    auto eligble = [&](RoseVertex v) {
+        return build.isRootSuccessor(v)
+            && (!build.g[v].left || !isAnchored(build.g[v].left));
+    };
+
+    if (any_of_in(build.literal_info[id].vertices, eligble)) {
+        return true;
+    }
 
     for (u32 delayed_id : build.literal_info[id].delayed_ids) {
-        for (auto v : build.literal_info[delayed_id].vertices) {
-            if (build.isRootSuccessor(v)) {
-                NGHolder *h = build.g[v].left.graph.get();
-                if (!h || proper_out_degree(h->startDs, *h)) {
-                    return true;
-                }
-            }
-        }
+        if (any_of_in(build.literal_info[delayed_id].vertices, eligble)) {
+            return true;
+        }
     }
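
The rewrite folds both hand-rolled vertex loops into one predicate
applied via any_of_in, a Hyperscan container utility. A plausible
reading of that helper (a sketch, assuming it simply wraps std::any_of
over a whole container; the real one lives in Hyperscan's util headers):

    #include <algorithm>
    #include <iterator>
    #include <utility>

    // Assumed shape: apply std::any_of across the container's full range.
    template <typename Container, typename Pred>
    bool any_of_in(const Container &c, Pred &&p) {
        return std::any_of(std::begin(c), std::end(c), std::forward<Pred>(p));
    }

Note that the predicate is also broader than the code it replaces:
rather than inspecting only graph prefixes via proper_out_degree, it now
accepts any root successor whose left prefix is absent or unanchored.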
@@ -170,6 +167,64 @@ u32 next_available_group(u32 counter, u32 min_start_group) {
     return counter;
 }
 
+static
+void allocateGroupForBoundary(RoseBuildImpl &build, u32 group_always_on,
+                              map<u8, u32> &groupCount) {
+    /* Boundary reports at offset zero will always be fired and forgotten,
+     * so there is no need to prevent the stream from being marked as
+     * exhausted on their account. */
+    if (build.boundary.report_at_eod.empty()) {
+        return;
+    }
+
+    /* Group based stream exhaustion is only done at stream boundaries */
+    if (!build.cc.streaming) {
+        return;
+    }
+
+    DEBUG_PRINTF("allocating %u as boundary group id\n", group_always_on);
+
+    build.boundary_group_mask = 1ULL << group_always_on;
+    groupCount[group_always_on]++;
+}
+
+static
+void allocateGroupForEvent(RoseBuildImpl &build, u32 group_always_on,
+                           map<u8, u32> &groupCount, u32 *counter) {
+    if (build.eod_event_literal_id == MO_INVALID_IDX) {
+        return;
+    }
+
+    /* Group based stream exhaustion is only done at stream boundaries */
+    if (!build.cc.streaming) {
+        return;
+    }
+
+    rose_literal_info &info = build.literal_info[build.eod_event_literal_id];
+
+    if (info.vertices.empty()) {
+        return;
+    }
+
+    bool new_group = !groupCount[group_always_on];
+    for (RoseVertex v : info.vertices) {
+        if (build.g[v].left && !isAnchored(build.g[v].left)) {
+            new_group = false;
+        }
+    }
+
+    u32 group;
+    if (!new_group) {
+        group = group_always_on;
+    } else {
+        group = *counter;
+        *counter += 1;
+    }
+
+    DEBUG_PRINTF("allocating %u as eod event group id\n", group);
+    info.group_mask = 1ULL << group;
+    groupCount[group]++;
+}
+
 void assignGroupsToLiterals(RoseBuildImpl &build) {
     auto &literals = build.literals;
     auto &literal_info = build.literal_info;
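
The two new allocators serve the same end from different directions.
allocateGroupForBoundary pins boundary EOD reports to the always-on
group, which by construction is never switched off, so group-based
exhaustion cannot fire while such a report is still possible.
allocateGroupForEvent gives the EOD event literal a fresh group only
when the always-on group is otherwise unused and none of the literal's
vertices carries an unanchored left prefix; otherwise the literal shares
the always-on group. A worked example (numbers invented):

    // With group_always_on == 0 and counter == 1:
    //  - always-on group unused, all left prefixes anchored:
    //      group = 1; counter -> 2; info.group_mask == 0b10
    //  - otherwise (shared with the always-on group):
    //      group = 0; info.group_mask == 0b01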
@@ -211,6 +266,9 @@ void assignGroupsToLiterals(RoseBuildImpl &build) {
         counter++;
     }
 
+    allocateGroupForBoundary(build, group_always_on, groupCount);
+    allocateGroupForEvent(build, group_always_on, groupCount, &counter);
+
     u32 min_start_group = counter;
 
     priority_queue<tuple<s32, s32, u32>> pq;
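
The placement of these calls matters: both run before min_start_group is
snapshotted from counter. So if allocateGroupForEvent consumes a fresh
group id (say it takes group 10 and bumps counter to 11), ordinary
literals are assigned groups starting from 11 and cannot collide with
the event group.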
@@ -453,6 +511,7 @@ rose_group getSquashableGroups(const RoseBuildImpl &build) {
     }
 
     DEBUG_PRINTF("squashable groups=0x%llx\n", squashable_groups);
+    assert(!(squashable_groups & build.boundary_group_mask));
 
     return squashable_groups;
 }
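
The new assert pins down the invariant that the boundary group is never
a squash candidate: if it could be squashed off, exhaustion might be
declared while a boundary EOD report was still owed. A tiny worked check
(masks invented):

    // boundary_group_mask == 0b0001, squashable_groups == 0b0110:
    // (0b0110 & 0b0001) == 0, so the assert holds; any overlap would
    // mean a group squasher was allowed to clear the boundary group.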
@@ -505,7 +564,7 @@ bool isGroupSquasher(const RoseBuildImpl &build, const u32 id /* literal id */,
                  lit_info.group_mask);
 
     if (build.literals.right.at(id).table == ROSE_EVENT) {
-        DEBUG_PRINTF("event literal, has no groups to squash\n");
+        DEBUG_PRINTF("event literal\n");
         return false;
     }
@@ -628,7 +687,7 @@ bool isGroupSquasher(const RoseBuildImpl &build, const u32 id /* literal id */,
 }
 
 void findGroupSquashers(RoseBuildImpl &build) {
-    rose_group forbidden_squash_group = 0;
+    rose_group forbidden_squash_group = build.boundary_group_mask;
     for (const auto &e : build.literals.right) {
        if (e.second.delay) {
            forbidden_squash_group |= build.literal_info[e.first].group_mask;
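
Seeding forbidden_squash_group with boundary_group_mask (rather than the
old 0) is what upholds the assert in getSquashableGroups: the boundary
group is excluded from squashing from the outset, just as the groups of
delayed literals are excluded as they are discovered. The accumulation
pattern, with invented masks:

    // forbidden starts at the boundary group:            0b0001
    // a delayed literal with group_mask 0b0100 widens it: 0b0101
    // squashable candidates are then: candidates & ~0b0101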