Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions stream_alert/shared/lookup_tables/driver_dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ def commit(self):
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services
# /dynamodb.html#DynamoDB.Table.put_item
self._table.put_item(**put_item_args)
self._cache.set(key, value, self._cache_refresh_minutes)

except (ClientError, ConnectTimeoutError, ReadTimeoutError):
raise LookupTablesInitializationError(
Expand Down
26 changes: 14 additions & 12 deletions stream_alert/shared/lookup_tables/driver_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ def __init__(self, configuration):

self._cache = DriverCache(maximum_key_count=0)

# S3 cannot support a per-key TTL so I use a separate DriverCache that stores
# the global cache invalidation timer.
self._cache_clock = DriverCache()
self._dirty = False

# Explicitly set timeout for S3 connection. The boto default timeout is 60 seconds.
Expand Down Expand Up @@ -114,8 +117,9 @@ def commit(self):
# Upload to S3
self._s3_adapter.upload(data)

# Invalidate the cache key
self._cache.set(self._MAGIC_CACHE_TTL_KEY, False, 9999)
# Invalidate the cache key by setting a "False" value into the magic key
self._cache_clock.set(self._MAGIC_CACHE_TTL_KEY, False, 9999)
self._dirty = False

LOGGER.info('LookupTable (%s): Successfully uploaded new data', self.id)

Expand All @@ -124,12 +128,8 @@ def get(self, key, default=None):
return self._cache.get(key, default)

def set(self, key, value):
"""
Under construction
FIXME (derek.wang)
"""
self._reload_if_necessary()
self._cache.set(key, value, self._cache_refresh_minutes)
self._cache.set(key, value, 9999) # We don't do per-key cache TTLs
self._dirty = True

def _reload_if_necessary(self):
Expand All @@ -139,12 +139,12 @@ def _reload_if_necessary(self):

If it needs a reload, this method will appropriately call reload.
"""
if self._cache.has(self._MAGIC_CACHE_TTL_KEY) and \
self._cache.get(self._MAGIC_CACHE_TTL_KEY):
if self._cache_clock.has(self._MAGIC_CACHE_TTL_KEY) and \
self._cache_clock.get(self._MAGIC_CACHE_TTL_KEY, False):
LOGGER.debug(
'LookupTable (%s): Does not need refresh. TTL: %s',
self.id,
self._cache.ttl(self._MAGIC_CACHE_TTL_KEY)
self._cache_clock.ttl(self._MAGIC_CACHE_TTL_KEY)
)

else:
Expand Down Expand Up @@ -173,8 +173,10 @@ def _reload(self):
# as JSON.
data = Encoding.json_decode(self, bytes_data)

self._cache.setall(data, self._cache_refresh_minutes)
self._cache.set(self._MAGIC_CACHE_TTL_KEY, True, self._cache_refresh_minutes)
# We don't do per-key cache TTLs; instead, we use a single global cache TTL that's set
# as a "True" value in the magic key
self._cache.setall(data, 9999)
self._cache_clock.set(self._MAGIC_CACHE_TTL_KEY, True, self._cache_refresh_minutes)
LOGGER.info('LookupTable (%s): Successfully loaded', self.id)


Expand Down
32 changes: 27 additions & 5 deletions stream_alert_cli/test/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,6 @@ def _run_classification(record):
def _run_rules_engine(self, record):
"""Create a fresh rules engine and process the record, returning the result"""
with patch.object(rules_engine.ThreatIntel, '_query') as ti_mock, \
patch.object(rules_engine.LookupTables, 'get_instance') as lt_mock, \
patch.object(rules_engine, 'AlertForwarder'), \
patch.object(rules_engine, 'RuleTable') as rule_table, \
patch('rules.helpers.base.random_bool', return_value=True):
Expand All @@ -256,16 +255,39 @@ def _run_rules_engine(self, record):
# non-required outputs to get properly populated on the Alerts that are generated
# when running the Rules Engine.
rule_table.return_value = False

ti_mock.side_effect = self._threat_intel_mock

# pylint: disable=protected-access
rules_engine.LookupTables._tables = self._lookup_tables_mock
lt_mock.return_value = rules_engine.LookupTables
_rules_engine = rules_engine.RulesEngine()

self._install_lookup_tables_mocks(_rules_engine)

return _rules_engine.run(records=record)

# pylint: disable=protected-access
def _install_lookup_tables_mocks(self, rules_engine_instance):
"""
Extremely gnarly, extremely questionable manner to install mocking data into our tables.
The reason this exists at all is to support the secret features of table scanning S3-backed
tables, which isn't a "normally" available feature but is required for some pre-existing
StreamAlert users.
"""
from stream_alert.shared.lookup_tables.driver_dynamodb import DynamoDBDriver
from stream_alert.shared.lookup_tables.driver_s3 import S3Driver

mock_data = self._lookup_tables_mock
for table_name, table in rules_engine_instance._lookup_tables._tables.items():
table._initialized = True
driver = table._driver
data = mock_data.get(table_name, {})
if isinstance(driver, S3Driver):
driver._cache.setall(data, 999999)
driver._cache_clock.set(S3Driver._MAGIC_CACHE_TTL_KEY, True, 999999)
continue

if isinstance(driver, DynamoDBDriver):
driver._cache.setall(data, 999999)
continue

@staticmethod
def _run_alerting(record):
"""Create a fresh alerts processor and send the alert(s), returning the result"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,15 @@ def test_no_need_refresh(self, mock_logger):
def test_barely_does_not_need_refresh(self, mock_logger):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOL at this test name test_barely_does_not_need_refresh

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

barely

"""LookupTables - Drivers - S3 Driver - Refresh - Barely Does not need refresh"""
# Wind the clock back; note this is before initialize.
self._foo_driver._cache._clock.time_machine(datetime(year=3000, month=1, day=1))
self._foo_driver._cache_clock._clock.time_machine(datetime(year=3000, month=1, day=1))

self._foo_driver.initialize()

# Mess up some of the data so we fake that it's "stale"
self._foo_driver._cache.set('key_1', 'stale', 10) # 10-minute ttl

# Wind the "clock" forward JUST BEFORE it needs a refresh
self._foo_driver._cache._clock.time_machine(
self._foo_driver._cache_clock._clock.time_machine(
datetime(year=3000, month=1, day=1, minute=9, second=59)
)

Expand All @@ -207,11 +207,11 @@ def test_needs_refresh(self, mock_logger):
self._foo_driver.initialize()

# Mess up some of the data so we fake that it's "stale"
self._foo_driver._cache._clock.time_machine(datetime(year=3000, month=1, day=1))
self._foo_driver._cache_clock._clock.time_machine(datetime(year=3000, month=1, day=1))
self._foo_driver._cache.set('key_1', 'stale', 10) # 10-minute ttl

# Wind the "clock" forward JUST AFTER it needs a refresh
self._foo_driver._cache._clock.time_machine(
self._foo_driver._cache_clock._clock.time_machine(
datetime(year=3000, month=1, day=1, minute=10, second=1)
)

Expand Down