您的位置:

Python Dask(第 2 部分)

在上一个教程中,我们已经理解了分布式计算的概念和 Dask 简介。除了 Dask 接口的介绍,我们还了解了什么是 Dask 集群以及如何安装 Dask。

桌面界面

正如我们已经讨论过的,Dask 接口有多种用于分布式计算的并行算法集。数据科学从业者很少使用基本的用户界面来扩展 NumPy、Pandas 和 scikit-learn:

  1. 阵列:并行 NumPy
  2. 数据帧:平行 Pandas
  3. 机器学习:并行 Scikit-Learn

我们已经在前面的教程中介绍了 Dask Array 让我们直接进入 Dask 数据帧。

DAX 数据帧

我们已经观察到,为了形成 Dask 阵列,需要对多个 NumPy 阵列进行分组。同样,一个 Dask 数据帧包含许多较小的 Pandas 数据帧。Pandas 的大数据帧按行分开,以便形成多个较小的数据帧。这些较小的数据帧在单个系统或多个系统上可用(因此,允许我们存储比内存更大的数据集)。Dask 数据帧的每一次计算都会并行化 Pandas 数据帧上的功能。

下图显示了 Dask 数据帧结构:

Dask 数据帧还提供了与 Pandas 数据帧完全相同的 API。

现在,让我们考虑一些用 Dask 数据帧执行基本功能的例子。

例 1:读取 CSV 文件

借助 Pandas读取文件


# reading the file using pandas
import pandas as pd

my_pdfile = pd.read_csv("covid_19_india.csv")
print(my_pdfile)

输出:

       Sno      Date     Time State/UnionTerritory ConfirmedIndianNational ConfirmedForeignNational   Cured  Deaths  Confirmed
0        1  30/01/20  6:00 PM               Kerala                       1                        0       0       0          1
1        2  31/01/20  6:00 PM               Kerala                       1                        0       0       0          1
2        3  01/02/20  6:00 PM               Kerala                       2                        0       0       0          2
3        4  02/02/20  6:00 PM               Kerala                       3                        0       0       0          3
4        5  03/02/20  6:00 PM               Kerala                       3                        0       0       0          3
...    ...       ...      ...                  ...                     ...                      ...     ...     ...        ...
9286  9287  09/12/20  8:00 AM            Telengana                       -                        -  266120    1480     275261
9287  9288  09/12/20  8:00 AM              Tripura                       -                        -   32169     373      32945
9288  9289  09/12/20  8:00 AM          Uttarakhand                       -                        -   72435    1307      79141
9289  9290  09/12/20  8:00 AM        Uttar Pradesh                       -                        -  528832    7967     558173
9290  9291  09/12/20  8:00 AM          West Bengal                       -                        -  475425    8820     507995

[9291 rows x 9 columns]

借助 Pandas读取文件


# reading the file using dask
import dask.dataframe as ddf

my_ddfile = ddf.read_csv("covid_19_india.csv")
print(my_ddfile.compute())    

输出:

Sno      Date     Time State/UnionTerritory ConfirmedIndianNational ConfirmedForeignNational   Cured  Deaths  Confirmed
0        1  30/01/20  6:00 PM               Kerala                       1                        0       0       0          1
1        2  31/01/20  6:00 PM               Kerala                       1                        0       0       0          1
2        3  01/02/20  6:00 PM               Kerala                       2                        0       0       0          2
3        4  02/02/20  6:00 PM               Kerala                       3                        0       0       0          3
4        5  03/02/20  6:00 PM               Kerala                       3                        0       0       0          3
...    ...       ...      ...                  ...                     ...                      ...     ...     ...        ...
9286  9287  09/12/20  8:00 AM            Telengana                       -                        -  266120    1480     275261
9287  9288  09/12/20  8:00 AM              Tripura                       -                        -   32169     373      32945
9288  9289  09/12/20  8:00 AM          Uttarakhand                       -                        -   72435    1307      79141
9289  9290  09/12/20  8:00 AM        Uttar Pradesh                       -                        -  528832    7967     558173
9290  9291  09/12/20  8:00 AM          West Bengal                       -                        -  475425    8820     507995

[9291 rows x 9 columns]

说明:

在上面的例子中,我们创建了两个不同的程序。在第一个程序中,我们导入了Pandas库,使用 read_csv() 函数读取 csv 文件。相比之下,我们导入了 dask 库的数据帧模块,使用 read_csv() 函数读取 csv 文件。

两个程序的结果相同,但处理时间不同。与 Pandas 相比,Dask 数据帧执行功能的速度更快。一旦实际使用,同样的效果会很明显。

示例 2:查找特定列的值计数


import dask.dataframe as ddf

my_ddfile = ddf.read_csv("covid_19_india.csv")
print(my_ddfile.State.value_counts().compute())

输出:

Kerala                                      315
Delhi                                       283
Rajasthan                                   282
Haryana                                     281
Uttar Pradesh                               281
Tamil Nadu                                  278
Ladakh                                      278
Jammu and Kashmir                           276
Karnataka                                   276
Punjab                                      275
Maharashtra                                 275
Andhra Pradesh                              273
Uttarakhand                                 270
Odisha                                      269
West Bengal                                 267
Puducherry                                  267
Chhattisgarh                                266
Gujarat                                     265
Chandigarh                                  265
Madhya Pradesh                              264
Himachal Pradesh                            264
Bihar                                       263
Manipur                                     261
Mizoram                                     260
Andaman and Nicobar Islands                 259
Goa                                         259
Assam                                       253
Jharkhand                                   253
Arunachal Pradesh                           251
Tripura                                     247
Meghalaya                                   240
Telengana                                   236
Nagaland                                    207
Sikkim                                      200
Dadra and Nagar Haveli and Daman and Diu    181
Cases being reassigned to states             60
Telangana                                    45
Dadar Nagar Haveli                           37
Unassigned                                    3
Telangana***                                  1
Maharashtra***                                1
Telengana***                                  1
Chandigarh***                                 1
Daman & Diu                                   1
Punjab***                                     1
Name: State, dtype: int64

说明:

在上例中,我们导入了 dask 库的数据帧模块,并使用 read_csv() 函数从 csv 文件中读取内容。然后,我们使用列名“States”后跟 value_counts() 方法来计算该特定列中每个值的总数。因此,我们得到了该列中出现的所有州名及其出现的总数。

示例 3:在 Dask 数据帧上使用 groupby 函数


import dask.dataframe as ddf

my_ddfile = ddf.read_csv("covid_19_india.csv")
print(my_ddfile.groupby(my_ddfile.State).Cured.max().compute())

输出:

State
Andaman and Nicobar Islands                    4647
Andhra Pradesh                               860368
Arunachal Pradesh                             15690
Assam                                        209447
Bihar                                        232563
Cases being reassigned to states                  0
Chandigarh                                    16981
Chandigarh***                                 14381
Chhattisgarh                                 227158
Dadar Nagar Haveli                                2
Dadra and Nagar Haveli and Daman and Diu       3330
Daman & Diu                                       0
Delhi                                        565039
Goa                                           46924
Gujarat                                      203111
Haryana                                      232108
Himachal Pradesh                              37871
Jammu and Kashmir                            107282
Jharkhand                                    107898
Karnataka                                    858370
Kerala                                       582351
Ladakh                                         8056
Madhya Pradesh                               200664
Maharashtra                                 1737080
Maharashtra***                              1581373
Manipur                                       23166
Meghalaya                                     11686
Mizoram                                        3772
Nagaland                                      10781
Odisha                                       316970
Puducherry                                    36308
Punjab                                       145093
Punjab***                                    130406
Rajasthan                                    260773
Sikkim                                         4735
Tamil Nadu                                   770378
Telangana                                     41332
Telangana***                                  40334
Telengana                                    266120
Telengana***                                  42909
Tripura                                       32169
Unassigned                                        0
Uttar Pradesh                                528832
Uttarakhand                                   72435
West Bengal                                  475425
Name: Cured, dtype: int64

说明:

在上面的程序中,我们再次导入了 dask 库的数据帧模块,并使用了 read_csv 来读取指定的 csv 文件。然后,我们使用 dask 数据帧的 groupby 功能和 max() 功能来查找每个状态下的最大治愈人数。

现在,让我们了解另一个 Dask 接口,那就是 Dask 机器学习。

Dask 机器学习

Dask 机器学习提供了可扩展的 Python 机器学习算法,与 scikit-learn 兼容。让我们从理解使用 scikit-learn 处理计算的方式开始,然后仔细研究 Dask 如何以不同的方式执行这些功能。

用户可以通过放置参数 njobs = -1,在 scikit-learn 的帮助下(在单独的系统上)执行并行计算。Scikit-learn 利用 Joblib 来执行这些并行计算。 Joblib 是一个支持并行化的 Python 库。当我们调用 fit() 函数时,基于要执行的任务(无论是超参数搜索还是拟合模型),Joblib 将任务分布在可用的内核中。

然而,我们可以在 scikit-learn 库的帮助下将并行计算扩展到多台机器。然而,Dask 在一个单独的系统上表现良好,并且可以很容易地扩展到一个集群系统。

Dask 提供了一个中央任务调度器和一组工作人员。调度程序将任务分配给每个工作人员。然后,这些工作人员被分配了一些可以执行计算的内核。工人提供两种功能:

  1. 计算分配的任务
  2. 根据要求将结果提供给其他员工。

让我们考虑一个演示调度程序和工作人员之间对话方式的例子(这个例子是由 Dask 的一个开发人员提供的,即 Matthew Rocklin ):

中央任务调度器将工作以 python 函数的形式发送给工作人员,以便在同一系统或集群系统上执行。

  1. 工人 A 请计算 x = f(1),工人 B 请计算 y = g(2)
  2. 工人 A,一旦 g(2)功能完成,请从工人 B 获取 y,并执行 z = h(x,y)

上面的例子应该给我们提供了一个关于 Dask 工作的清晰演示。现在让我们了解机器学习和 Dask-search CV 的模型。

机器学习模型

Dask 机器学习(也称为 Dask-ML)提供了 Python 中可扩展的机器学习。但是在开始之前,让我们遵循下面给出的 Dask-ML 安装步骤:

安装使用 conda


conda install -c conda-forge dask-ml

使用管道安装


$ pip install dask-ml

让我们继续了解并行化 Scikit-直接学习并使用 Dask 数组重新实现算法。

1。并行化 Scikit-直接学习

正如我们已经讨论过的,Scikit-Learn(也称为 sklearn )在 Joblib 的帮助下提供并行计算(在单个 CPU 上)。我们可以直接利用 Dask,通过插入几行代码(甚至不需要对当前代码进行任何修改)来并行化一个以上的 sklearn 估计器。

主要步骤是从 dask 库的分布式模块导入客户端。此命令将在系统上生成本地计划和工作人员。


from dask.distributed import Client 
# starting a local Dask client
my_client = Client()

下一步是在后端实例化 dask 的 joblib。我们必须从 sklearn 库的 joblib 导入parallel _ 后端,如下语法所示:


import dask_ml.joblib
from sklearn.externals.joblib import parallel_backend
with parallel_backend('dask'):
    # Normal scikit-learn code goes here
     from sklearn.ensemble import RandomForestClassifier 
     my_model = RandomForestClassifier()

2。使用 Dask 阵列重新实现算法

为了使用 NumPy 数组,Dask-ML 重新实现了简单的机器学习算法。为了实现可扩展算法,使用 Dask 阵列将 NumPy 阵列替换为 Dask 阵列。这种替换有助于实现:

  1. 线性模型(例如,线性回归、泊松回归、逻辑回归等。)
  2. 预处理(例如,缩放器、变换等。)
  3. 聚类(例如,K 均值、谱聚类等。)

A .线性模型示例


from dask_ml.linear_model import LogisticRegression

mymodel = LogisticRegression()
mymodel.fit(data, labels)

B .预处理示例


from dask_ml.preprocessing import OneHotEncoder

myencoder = OneHotEncoder(sparse=True)
myresult = myencoder.fit(data)

C .聚类示例


from dask_ml.cluster import KMeans

mymodel = KMeans()
mymodel.fit(data)

搜索简历

【超参数调整】 被认为是建立模型的重要一步,可以极大地改变模型的实现。机器学习模型有各种各样的超参数,很难理解哪个参数在特定情况下表现更好。手动执行这项任务是相当令人厌烦的工作。然而,Scikit-Learn 库提供了网格搜索,以简化超参数调整的任务。用户必须提供参数, Gridsearch 将提供这些参数的最佳组合。

让我们考虑一个例子,在这个例子中,我们需要选择一种随机森林技术来适应数据集。该模型有三个重要的可调参数——第一参数、第二参数和第三参数。

现在,让我们在下面设置这些参数的值:

第一个参数-自举=真

第二参数-最大深度- [8,9]

第三参数-n _ 估计量:[50,100,200]

1。sklearn Gridsearch: 对于每一个参数组合,Scikit-learn Gridsearch 都会执行任务,有时最终会多次迭代单个任务。下图表明,这并不是最有效的方法:

2。Dask-Search CV: 与 sklearn 的grid Search CV不同,Dask 提供了一个名为 Dask-Search CV 的库。为了减少重复,Dask-Search CV 合并了这些步骤。我们可以通过以下步骤安装 Dask-search :

使用 conda 安装 Dask-搜索 CV


conda install dask-searchcv -c conda-forge

使用 pip 安装 Dask-搜索 CV


$ pip install dask-searchcv

下图展示了 Dask-Search CV 的工作原理:

Spark 和 Dask 的区别

以下是 Spark 和 Dask 的一个关键区别:

| 南号码 | 火花 | 达斯克 | | one | Spark 是用 Scala 编程语言编写的。 | Dask 是用 Python 编程语言编写的。 | | Two | Spark 提供对 R 和 Python 的支持。 | Dask 只支持 Python。 | | three | Spark 提供了自己的生态系统。 | Dask 是 Python 生态系统的组成部分之一。 | | four | Spark 提供了自己的 API。 | Dask 重新利用 Pandas 的 API | | five | 对于 Scala 和 SQL 用户来说,Spark 很容易理解和实现。 | Python 从业者通常更喜欢 Dask。 | | six | Spark 本身不支持多维数组。 | Dask 为可扩展多维阵列的 NumPy 模型提供全面支持。 |