飙血推荐
  • HTML教程
  • MySQL教程
  • JavaScript基础教程
  • php入门教程
  • JavaScript正则表达式运用
  • Excel函数教程
  • UEditor使用文档
  • AngularJS教程
  • ThinkPHP5.0教程

使用 Apache Spark 进行 Azure 网络安全组管理

时间:2023-06-09  作者:电脑狂魔  

随着 IT 环境继续快速发展,网络安全对于保护敏感数据和维护网络完整性变得越来越重要。实施强大的访问控制措施,例如网络安全组(NSGs),对于确保网络安全至关重要。NSG 充当虚拟防火墙,根据预定义的规则允许或拒绝进出 Azure 资源的入站和出站流量。但是,管理和监视 NSG 可能会变得复杂,尤其是在处理大量规则和资源时。通过确保定义的规则既不过分宽松也不过分限制,在允许合法流量和维持高级别安全性之间保持平衡势在必行。本文介绍了一种方法,该方法利用 Apache Spark 和 Python 代码通过分析网络观察程序流事件日志来识别最佳用户规则集。所提出的方法旨在提高管理 NSG 的效率和有效性,同时确保 Azure 环境中的强大网络安全性。

介绍

Azure 网络安全组 (NSG) 通过激活规则或访问控制列表 (ACL)允许或拒绝流向虚拟网络中虚拟机实例的网络流量。NSG 可以与子网或该子网中的单个虚拟机实例相关联。通过 NSG 连接到子网的所有虚拟机实例均受其 ACL 规则约束。您还可以通过将 NSG 直接与其相关联来限制流向单个虚拟机的流量。每个网络安全组都包含一组默认规则。每个 NSG 中的默认规则包括三个入站规则和三个出站规则。默认规则无法删除,但由于它们被分配了最低优先级,您可以用自己的规则替换它们。

入站与出站

有两种类型的 NSG:入站和出站。

从 VM 的角度来看,评估 NSG 规则集方向。例如,入站规则会影响从外部源(例如 Internet 或其他虚拟机)发起到虚拟机的流量。从 VM 发送的流量受出站安全规则的影响。自动允许会话的返回流量,并且不会根据反向规则进行验证。我们的重点应该是允许(或拒绝)会话的客户端到服务器方向。

默认网络安全规则图 1- 默认网络安全规则

用户安全规则

在网络安全组 (NSG) 中,我们可以将具有高优先级编号的正确规则应用于网络接口或子网,以保护 Azure 资源。每个安全规则中包含以下字段:

  • 规则名称和描述

  • 优先级数,定义规则在规则集中的位置。最上面的规则首先被处理;因此,较小的数字具有较高的偏好

  • 带有端口号的源和目标(对于 TCP 和 UDP)

  • IP 协议类型,例如 TCP、UDP 和 ICMP。这 3 个协议几乎涵盖了所有的应用需求。“Any”关键字允许所有 IP 协议

用户规则在默认规则之上应用,并根据网络安全组中的 IP 地址、端口号和协议限制访问。NSG 的入站安全规则如图 2 所示。我们还可以在默认规则之上定义出站安全规则。


图 2 - 入站用户安全规则

有时用户规则可能过于宽松,即使源/目标受 IP 地址或 IP 网络范围限制。图 2 显示 151 优先级有一个“Any”端口,而所有其他规则都有“Any”作为目标,它对所有网络范围开放。如果网络团队不确定虚拟机网络之间可以使用哪些端口/协议/目的地/源和目的地,则必须实施微分段和许可规则。

微分段

公共云中的分段是指划分和隔离网络或基础设施的不同组件以增强安全性和控制的做法。它涉及实施各种措施来防止未经授权的访问并限制云环境中不同资源之间的通信。网络安全组 (NSG) 是在Azure 虚拟网络中实现分段的基本工具。

以下是 NSG 在公共云分段环境中的工作方式:

过滤入站流量: NSG 允许你定义入站安全规则,为传入网络流量指定允许的源、协议、端口和目标。通过配置这些规则,您可以限制对虚拟网络中特定资源的访问,例如虚拟机或应用程序。这有助于保护敏感数据并防止未经授权的访问。

过滤出站流量: 同样,NSG 使你能够定义出站安全规则,以控制离开虚拟网络的流量。这允许您限制来自特定资源的传出连接或将它们限制到特定的目的地、端口或协议。通过实施出站规则,您可以防止数据泄露并控制您的资源使用的通信渠道。

流量隔离: NSG 可以在子网级别应用,允许您对虚拟网络的不同部分进行分段。通过将 NSG 与子网相关联,您可以为每个子网实施特定的安全策略,从而控制它们之间的流量。这使您能够在网络中创建安全区域,隔离不同的应用程序或基础设施层。

网络级监视和日志记录: NSG 提供监视和记录网络流量的能力。Azure 为 NSG 提供诊断日志记录功能,允许您捕获和分析网络流日志。通过检查这些日志,您可以深入了解网络活动、识别潜在的安全威胁并解决连接问题。

通过利用 NSG 进行网络流量过滤,Azure 用户可以为在其公共云环境中进行分段打下坚实的基础。NSG 为实施安全策略、控制网络流量和实现资源之间的精细隔离提供了一种灵活且可扩展的解决方案。但是,需要注意的是,NSG 只是综合安全策略的一个组成部分,根据具体要求和合规性标准,可能需要额外的安全措施,例如网络虚拟设备、入侵检测系统或安全网关。

微分段和许可规则检查的好处

通过正确配置 NSG 规则实现的微分段在网络安全方面提供了几个好处:

  1. 增强的安全性: 微分段允许对网络流量进行细粒度控制,使组织能够根据特定规则限制资源之间的通信。这有助于防止网络内的横向移动并限制安全漏洞的潜在影响。

  2. 提高合规性: 通过实施宽松的规则检查,组织可以确保其 NSG 符合安全最佳实践和监管要求。这有助于维护安全且合规的网络基础架构。

  3. 最小化攻击面:微分段通过限制资源之间的通信路径来减少攻击面。它可以防止未经授权的访问并限制恶意行为者在网络中的移动。

  4. 简化网络管理:使用 Apache Spark 进行权限检查使组织能够自动分析和监视 NSG 规则,减少安全审计所需的手动工作。Apache Spark 的分布式计算能力可以高效处理大型数据集,适合网络基础设施复杂的组织。

  5. 快速检测和响应:微分段与宽松的规则检查相结合,使组织能够快速识别和响应任何未经授权或可疑的网络流量。通过分析 NSG 日志和验证规则,可以及时发现潜在的安全事件。

网络安全流事件和日志记录

网络安全组流日志记录是 Azure 网络观察程序的一项功能,可让你记录有关流经网络安全组的IP 流量的信息。流数据被发送到 Azure 存储,您可以从那里访问它并将其导出到您选择的任何可视化工具、安全信息和事件管理 (SIEM) 解决方案或入侵检测系统 (IDS)。

监控、管理和了解您的网络至关重要,这样您才能保护和优化它。您需要知道网络的当前状态、谁在连接以及用户从哪里连接。您还需要知道哪些端口对 Internet 开放,预期的网络行为是什么,哪些网络行为是不规则的,以及流量突然增加的时间。流日志是云环境中所有网络活动的真实来源。无论您是在尝试优化资源的初创公司,还是在尝试检测入侵的大型企业,流日志都可以提供帮助。您可以使用它们来优化网络流量、监控吞吐量、验证合规性、检测入侵等。

NSG 日志架构

我们可以通过定义 NSG 日志的架构来启动 PySpark 代码,其中包括类别、macAddress、operationName、属性、resourceId、systemId 和时间等字段。这种模式结构有助于有效地组织和处理日志数据。

insightsNeworkFlowEventsSchema = StructType() \
.add("records", ArrayType(StructType() \
.add("category", StringType(), True) \
.add("macAddress", StringType(), True) \
.add("operationName", StringType(), True) \
.add("properties", StructType()\
.add("Version", LongType(), True) \
.add("flows", ArrayType(StructType() \
.add("flows", ArrayType(StructType() \
.add("flowTuples", ArrayType(StringType(), True)) \
.add("mac", StringType(), True)\
, True))\
.add("rule", StringType(), True) \
), True)) \
.add("resourceId", StringType(), True) \
.add("systemId", StringType(), True) \
.add("time", StringType(), True) \
, True))

有关 Azure NSG Flow Events 的更多详细信息,请参阅此网站。

为了创建正确的许可规则,我从网络安全组中选择了以下参数。

  1. 资源 ID 或 SystemID 作为主键之一

  2. 规则名称,位于属性→流→规则下

  3. FlowTuples,在 Properties → Flows → Flows → FlowTuples 下

NSG Flow 将流事件存储为 JSON 文件,如下所示。

{"records":[{"time":"2023-01-26T17:30:域名900Z","systemId":"57785417-608e-4bba-80d6-25c3a0ebf423","macAddress":"6045BDA85225","category":"NetworkSecurityGroupFlowEvent","resourceId":"/SUBSCRIPTIONS/DA35404A-2612-4419-BAEF-45FCDCE6045E/RESOURCEGROUPS/ROHNU-RESOURCES/PROVIDERS/域名ORK/NETWORKSECURITYGROUPS/CVS-NSGLOGS-NSG","operationName":"NetworkSecurityGroupFlowEvents","properties":{"Version":2,"flows":[{"rule":"DefaultRule_DenyAllInBound","flows":[{"mac":"6045BDA85225","flowTuples":["1674754192,域名.107,域名.0.4,54227,46988,T,I,D,B,,,,","1674754209,域名.150,域名.0.4,43146,62839,T,I,D,B,,,,","1674754210,域名.91,域名.0.4,58965,63896,T,I,D,B,,,,","1674754212,域名.30,域名.0.4,52429,41973,T,I,D,B,,,,","1674754223,87.域名,域名.0.4,43000,8443,T,I,D,B,,,,","1674754236,域名.15,域名.0.4,41014,8022,T,I,D,B,,,,"]}]}]}},{"time":"2023-01-26T17:31:域名108Z","systemId":"57785417-608e-4bba-80d6-25c3a0ebf423","macAddress":"6045BDA85225","category":"NetworkSecurityGroupFlowEvent","resourceId":"/SUBSCRIPTIONS/DA35404A-2612-4419-BAEF-45FCDCE6045E/RESOURCEGROUPS/ROHNU-RESOURCES/PROVIDERS/域名ORK/NETWORKSECURITYGROUPS/CVS-NSGLOGS-NSG","operationName":"NetworkSecurityGroupFlowEvents","properties":{"Version":2,"flows":[{"rule":"DefaultRule_AllowInternetOutBound","flows":[{"mac":"6045BDA85225","flowTuples":["1674754265,域名.0.4,域名.123,49909,443,T,O,A,B,,,,","1674754265,域名.0.4,域名.96,49910,443,T,O,A,B,,,,","1674754267,域名.0.4,域名.96,49911,443,T,O,A,B,,,,","1674754267,域名.0.4,域名.123,49912,443,T,O,A,B,,,,","1674754268,域名.0.4,域名.133,49913,443,T,O,A,B,,,,","1674754268,域名.0.4,域名.123,49914,443,T,O,A,B,,,,","1674754271,域名.0.4,域名.123,49909,443,T,O,A,E,1,66,1,66","1674754271,域名.0.4,域名.96,49910,443,T,O,A,E,24,12446,1,66","1674754273,域名.0.4,域名.123,49912,443,T,O,A,E,15,3542,12,5567","1674754274,域名.0.4,域名.133,49913,443,T,O,A,E,12,1326,10,4979","1674754277,域名.0.4,域名.123,49914,443,T,O,A,E,13,2922,14,5722","1674754278,域名.0.4,域名.228,49916,443,T,O,A,B,,,,","1674754279,域名.0.4,域名.78,49918,443,T,O,A,B,,,,","1674754279,域名.0.4,域名.78,49917,443,T,O,A,B,,,,","1674754280,域名.0.4,13.域名,49919,80,T,O,A,B,,,,","1674754280,域名.0.4,13.域名,49920,80,T,O,A,B,,,,","1674754280,域名.0.4,13.域名,49921,80,T,O,A,B,,,,","1674754280,域名.0.4,13.域名,49922,80,T,O,A,B,,,,","1674754281,域名.0.4,域名.96,49911,443,T,O,A,E,87,11226,1093,1613130","1674754284,域名.0.4,域名.88,49923,443,T,O,A,B,,,,","1674754284,域名.0.4,域名.209,49924,443,T,O,A,B,,,,","1674754289,域名.0.4,13.域名,49925,80,T,O,A,B,,,,","1674754290,域名.0.4,域名.88,49923,443,T,O,A,E,14,2877,13,5627","1674754291,域名.0.4,域名.209,49924,443,T,O,A,E,12,1452,10,4692","1674754300,域名.0.4,域名.209,49927,443,T,O,A,B,,,,","1674754306,域名.0.4,域名.209,49927,443,T,O,A,E,10,3220,9,5415"]}]},{"rule":"DefaultRule_DenyAllInBound","flows":[{"mac":"6045BDA85225","flowTuples":["1674754254,域名.197,域名.0.4,46050,41834,T,I,D,B,,,,","1674754255,域名.102,域名.0.4,44049,49361,T,I,D,B,,,,","1674754263,域名.152,域名.0.4,53162,5985,T,I,D,B,,,,","1674754297,122.域名,域名.0.4,58757,23,T,I,D,B,,,,"]}]}]}}]}

启用网络安全组日志记录

Azure 中的网络观察器使用诊断设置功能将 NSG 流事件存储在 Azure 存储帐户中。启用后,诊断设置允许网络观察程序将 NSG 流事件发送到指定的存储帐户以进行保留和分析。NSG Flow 事件以日志文件的形式存储在存储帐户中。每个日志文件都包含有关网络观察器捕获的网络流的信息,包括源和目标 IP 地址、端口、协议、时间戳和操作(允许/拒绝)。可以将存储帐户配置为将 NSG 流事件存储在存储帐户内的特定容器中。日志文件通常存储在 Azure Blob 存储服务中,该服务为非结构化数据提供可扩展且持久的存储。通过利用诊断设置和 Azure 存储帐户,组织可以有效地收集和保留 NSG Flow 事件,用于分析、监控和合规性目的。这些数据随后可用于各种安全和网络分析场景,以深入了解网络流量模式并识别潜在的安全威胁或异常情况。

https://域名/en-us/azure/network-watcher/network-watcher-nsg-flow-logging-overview

图 3 - 使用网络观察器启用 NSG 流日志记录

存储在 Azure 存储帐户中的 NSG 流事件的目录结构通常遵循分层组织。这是一个可能的目录结构的示例:

域名.域名/resourceId=/SUBSCRIPTIONS//RESOURCEGROUPS//PROVIDERS/域名ORK/NETWORKSECURITYGROUPS//y={year}/m={month}/d={day}/h={hours}/m=00/mac={macID}/域名" data-lang="text/x-python" style="box-sizing: border-box;">1个

abfs://见解-日志-networksecuritygroupflowevent @ <storageaccount>。_ _ dfs。核心。窗户。net / resourceId = / SUBSCRIPTIONS /< Subscriptions >/ RESOURCEGROUPS /< ResourceGroups >/ PROVIDERS / MICROSOFT . NETWORK / NETWORKSECURITYGROUPS /< NetworkSecurityGroup >/ y ={ year } / m={月} / d ={日} / h ={小时} / m = 0 0 / mac ={ macID } / PT1H。JSON


NSG 流事件的目录结构

图 4 - NSG 流事件的目录结构

存储帐户充当用于存储不同类型数据的顶级容器。在存储帐户中,创建了一个名为“”的容器,专门用于保存 NSG Flow 事件日志。NSG Flow 事件随后根据捕获时的订阅、资源组和网络安全组进行组织。日志通常以分层方式组织,从年份开始,然后是捕获的月、日和小时。在每个小时目录下,NSG Flow 事件日志文件存储为 MAC 地址下的 域名。这些日志文件包含实际捕获的网络流量数据,通常采用 JSON 等结构化格式。此目录结构允许根据事件发生的特定时间段轻松组织和检索 NSG Flow 事件。

如何使用 Pyspark 读取 NSG 流文件

下面提供的代码是用 Python 编写的,并利用 Apache Spark 读取和处理存储在 Azure Blob 存储中的 NSG(网络安全组)流日志。该代码利用 Apache Spark 的分布式计算功能来高效处理大型数据集并以并行方式执行所需的计算。它利用 Spark SQL 函数和操作来有效地操作和分析数据。让我们逐步分解代码:

火花配置

  • SparkConf()创建一个 Spark 配置对象。

  • . setAppName(appName)设置 Spark 应用程序的名称。

  • .setAll([...]) 为 Spark 设置额外的配置属性,例如在 SQL 查询中启用区分大小写、设置随机分区的数量以及指定用于访问 Azure Blob 存储的 SAS(共享访问签名)令牌。

Spark 会话和 Spark 上下文

  • 域名der 创建一个SparkSession,这是在 Spark 中处理结构化数据的入口点。

  • .config(conf=sparkConf)应用先前定义的 Spark 配置。

  • .getOrCreate()检索现有的 SparkSession 或创建一个新的(如果不存在)。

  • 域名kContext从 SparkSession 获取 Spark 上下文 (sc)。

Hadoop 文件系统配置

  • 域名提供对运行 Spark 的 Java 虚拟机 (JVM) 的访问。

  • 域名, 域名域名、域名域名System和域名域名iguration是用于处理 Hadoop 分布式文件系统 (HDFS) 的 Java 类。

  • 域名opConfiguration().set(...) 在 Hadoop 配置中设置用于访问 Azure Blob 存储的 SAS 令牌。

在订阅级别访问 NSG 流日志的选项之一,即选项 1 

  • fs = 域名(...)创建一个FileSystem与 Azure Blob 存储交互的对象。

  •  域名Status(Path("/resourceId=/SUBSCRIPTIONS/"))检索 的状态files/directories in the specified path ("/resourceId=/SUBSCRIPTIONS/")。

  • 然后,代码遍历文件和目录,以根据订阅、资源组和日期构建 NSG 流日志的路径。

  • NSG 流日志的路径存储在字典 (NSGDict) 中以供进一步处理。

  • print(NSGStatus)在循环之外,表示 NSGStatus 的最后一个值。它将打印最近订阅、资源组和日期的 NSG 流日志路径。

另一种选择是完全访问 NSG 流日志

  • 选项 2 提供了使用带“*”的正则表达式一次读取 NSG 日志的替代选项。它使用订阅、资源组、日期和小时的占位符构建路径模式。

from 域名 import SparkSession
from 域名 import SparkConf
# Create Spark configuration
spark_conf = SparkConf() \
.setAppName(app_name) \
.setAll([
('域名Sensitive', 'true'),
('域名.%s.%域名.域名' % (blob_container, blob_account), blob_sas_token),
("域名域名itions", "300"),
("域名llelism", "300")
])
# Create Spark session
spark = 域名ig(conf=spark_conf).getOrCreate()
sc = 域名kContext
# Set Hadoop configuration for Azure Blob Storage
域名opConfiguration().set('域名.%s.%域名.域名' % (blob_container, blob_account), blob_sas_token)
# OPTION 1 - Read the NSG Flow Logs at subscription level and create dictionary
URI = 域名.域名
Path = 域名.域名域名
FileSystem = 域名.域名域名System
Configuration = 域名.域名域名iguration
fs = 域名(域名te("abfs://insights-logs-networksecuritygroupflowevent@域名.域名"), Configuration())
status = 域名Status(Path("/resourceId=/SUBSCRIPTIONS/"))
nsg_dict = dict()
for file_status in status:
  subscription_name = str(域名ath().getName())
  resource_group_path = "/resourceId=/SUBSCRIPTIONS/"+subscription_name+"/RESOURCEGROUPS/*/PROVIDERS/域名ORK/NETWORKSECURITYGROUPS/"
  resource_group_status = 域名Status(Path(resource_group_path+"*/"))
  for resource_group_file in resource_group_status:
    nsg_path = str(域名ath())
    nsg_status = nsg_path + f"/y={year}/m={month}/d={day}"
    if 域名ts(Path(nsg_status)):
      if subscription_name in nsg_dict:
        nsg_dict[subscription_name].extend([nsg_status])
      else:
         nsg_dict[subscription_name] = [nsg_status]
print(nsg_status)
# OPTION 2 - Read all the NSG logs at once with "*" Regular expression like below
value = "abfs://insights-logs-networksecuritygroupflowevent@<storage_account>.域名域名/resourceId=/SUBSCRIPTIONS/*/RESOURCEGROUPS/*/PROVIDERS/域名ORK/NETWORKSECURITYGROUPS/*/y={year}/m={month}/d={day}/h={hours}"
key = "All_Subscriptions"

要确定何时使用选项 1 或选项 2 来读取 NSG 流日志,您可以考虑以下因素:

订阅数量:

  1. 当 Azure 中的订阅数量相当多时,建议使用选项 1。

  2. 在这种情况下,选项 1 允许您使用多线程并行处理 NSG 日志,从而利用系统的全部处理能力。

  3. 您提供的代码片段利用域名adPool创建线程池(在本例中为 75 个)并并行处理每个订阅。

  4. 这种方法有助于跨多个线程分配工作负载并提高整体处理效率。

NSG 流事件的总规模:

  1. 当每天的 NSG 流事件的总大小相当高时,选项 1 也是可取的。

  2. 通过利用多线程,您可以同时处理多个 NSG 日志,从而减少整体处理时间。

  3. 这在处理大量数据时特别有用,因为它允许并行处理和有效利用系统资源。

简单性和资源限制:

  1. 当订阅数量可管理且每天 NSG 流事件的总大小相对较小时,建议使用选项 2。

  2. 选项 2 涉及通过单个读取操作一次读取整个 NSG 日志文件,使其成为一种直接且更简单的方法。

  3. 当资源限制或处理时间不是主要问题时,此方法适用。

总之,如果您在 Azure 中有大量订阅或每天 NSG 流事件的总大小很大,建议使用选项 1,使用多线程并行处理。这允许有效利用系统资源和更快的处理。另一方面,如果订阅数量可控且 NSG 流事件的总大小相对较小,则选项 2 提供了一种更简单直接的方法。

加载 NSG 流文件

一旦从存储帐户中检索到文件和目录,下一步就是将它们作为数据帧加载到 Spark 中。在提供的代码片段中,recursiveFileLookup使用了该选项,这意味着即使到达“day”文件夹,Spark 也会遍历文件路径中名为“hour”、“minute”和“macAddress”的目录。

将 JSON 文件加载到 Spark 时,该inferSchema选项默认启用。这允许 Spark 分析 JSON 文件并在将它们加载到数据帧中时自动推断模式。但是,使用inferSchema. 此过程会对 Spark 的性能产生重大影响,尤其是在处理大量文件或大文件时。单独读取和分析每个文件的架构可能既耗时又耗费资源。

为了克服这个问题,强烈建议在将 JSON 文件加载到 Spark 时提供预定义的模式。通过提供模式,Spark 可以绕过模式推理步骤,直接根据提供的模式加载文件。这种方法消除了对每个文件进行模式分析的需要,从而提高了 Spark 的性能。可以通过定义 JSON 字段的结构和数据类型手动为 JSON 文件创建模式。StructType这可以使用 Spark 中的和类来实现StructField。一旦定义了模式,它就可以域名()作为参数传递给方法,确保 Spark 使用预定义的模式来加载文件。请参阅NSG 日志架构部分。 通过提供预定义的模式,Spark 可以有效地将 JSON 文件加载为数据帧,而无需模式推理的开销。这种方法提高了 Spark 的性能,尤其是在处理大量数据时。此外,它还可以更好地控制模式并确保数据结构的一致性,从而提高后续数据处理和分析任务的可靠性。

解析 NSG 流事件 JSON 文件

提供的代码定义了一个名为 NSGruleDef 的函数,该函数使用 Spark DataFrames 处理 NSG(网络安全组)流日志。让我们逐步分解代码:

加载 NSG 流日志:

  1. 域名on("recursiveFileLookup", "true")设置递归文件查找选项以启用遍历嵌套目录。

  2. .format("json")指定正在加载的文件为 JSON 格式。

  3. .schema(insightsNeworkFlowEventsSchema)(insightsNeworkFlowEventsSchema)指定加载 JSON 文件时要使用的预定义架构。

  4. .load(filepath)将提供的 JSON 文件加载filepath到名为loadNSGDF.

分解嵌套结构:

  1. explodeNSGDF = 域名ct(explode("records").alias("record"))使用 explode 函数来展平loadNSGDFDataFrame 中的嵌套记录结构。每条记录在explodeNSGDF.

  2. parsedNSGDF = 域名ct(col("域名urceId").alias("resource_id"),col("域名erties").getField("flows").alias("flows"))从 中提取特定列explodeNSGDF,包括 resourceId 和流(表示流数据)。

爆炸流元组:

  1. explodeFlowsDF = 域名Column("flow", explode("flows")).select("resource_id", col("域名").alias("rule_name"), col("域名Tuples").alias("flow_tuples"))再次使用 explode 函数将 flows 列扩展为多行,创建一个名为 flow 的新列。它还会选择resource_id, rule_name, 和flow_tuples列。

过滤 NSG 允许规则:

  1. filterNSGAllowDF = 域名e(~col('rule_name').contains('Deny'))过滤掉列中rule_name不包含字符串的行'Deny'。此步骤仅保留表示允许(非拒绝)规则的行。

纯文本1个

请注意,为了有效管理网络安全组的许可规则,在我们的例子中,我们建议只为网络安全组内的允许入站和出站规则设置规则。因此,在这种情况下我们忽略被拒绝的规则。


爆炸流元组:

  1. explodeFlowTuplesDF = 域名ct("resource_id", "rule_name", explode(col("flow_tuples")).alias("flow_rules"))flow_tuples使用 explode 函数进一步将列扩展为单独的行。它创建了一个名为explodeFlowTuplesDFcolumns resource_id、rule_name和的新 DataFrame flow_rules。

分组流元组:

  1. groupFlowTuplesDF = 域名pBy("resource_id", "rule_name").agg(collect_set("flow_rules").alias("collect_flow_tuples")

选择列并应用 UDF:

  1. 域名ct("system_id", "rule_name", collect_arrays_udf("collect_flow_tuples").alias("flow_rules")) 从DataFrame 中选择列system_id、rule_name和。collect_flow_tuplesgroupFlowTuplesDF

  2. collect_arrays_udf 指的是一种用户定义函数(UDF),它将collect_flow_tuples列作为输入并将多个数组合并为一个数组。UDF 将collect_flow_tuples列的元素聚合到一个数组或列表结构中。然后将生成的列别名为flow_rules.

选择最终列:

  1. .select("resource_id", "rule_name", "flow_rules")从中间 DataFrame 中选择列resource_id、rule_name和。flow_rules

  2. 此步骤确保最终的名为 的 DataFramecollectFlowTuplesDF包含用于进一步处理或分析的所需列。

Spark 中的 NSG 数据转换

网络安全组数据加载为具有多个记录的 Spark DataFrame。DataFrame 中的每条记录都包含有关网络安全组及其各自规则和的信息flowtuples。为了有效地处理此数据,记录分为三列:resourceID、ruleName和flowTuples,遵循 NSG 流事件的解析格式。此步骤可以更轻松地分析和操作数据。为了将flowTuples属于同一行的resourceID数据合并ruleName到一行中,该groupBy操作用于 DataFrame,根据这两列对数据进行分组。这个过程看似简单,但在 Spark 转换中,它实际上是根据resourceID和创建分区ruleName。从本质上讲,groupByresourceID操作执行与 Spark 中的重新分区 ( , )操作类似的功能ruleName。

使用groupBy有其优点和缺点。让我们从积极的方面开始。如果 NSG 用户规则和默认规则是均匀分布的,并且从一开始就不太宽松地创建,那么就不会有任何重大问题。该groupBy操作将成功组合flowTuplesfor eachresourceID和ruleName,从而产生所需的输出。

但是,在使用groupBy. 主要问题之一是它会创建不均匀的分区,从而导致数据倾斜。当分区之间的数据分布不平衡时会发生数据倾斜,导致某些分区包含的数据明显多于其他分区。这会对 Spark 作业的性能产生负面影响,因为某些分区可能需要更长时间才能处理,从而导致瓶颈。在某些情况下,数据倾斜可能非常严重,以至于 Spark 执行程序超过了 Kyro 序列化程序允许的最大缓冲区大小,通常设置为 2GB。发生这种情况时,作业将无法成功执行。groupBy为避免此类失败,仔细分析数据并确定使用是否是最佳方法至关重要。

数据倾斜 

Spark 数据倾斜是指在 Apache Spark 中执行并行处理时数据跨分区分布的不平衡。当数据分布不均匀或某些键或值比其他键或值不成比例地更常见时,就会发生这种倾斜。这导致某些分区处理的数据量远远大于其他分区,从而造成性能瓶颈并阻碍 Spark 计算的效率。

为了直观地解释这个概念,让我们考虑一个简单的例子。假设我们有一个客户交易数据集,其中每笔交易都与客户 ID 和货币价值相关联。我们想对这个数据集进行一些聚合,比如计算每个客户的总交易金额。

在分布式环境中,数据被分成多个分区,每个分区由一个工作节点独立处理。在理想情况下,数据会均匀分布在各个分区中


标签:网络安全
湘ICP备14001474号-3  投诉建议:234161800@qq.com   部分内容来源于网络,如有侵权,请联系删除。