{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "31782ed7269c0351",
   "metadata": {},
   "source": [
    "# 🫁 Thread Pools with ThreadPoolExecutor\n",
    "\n",
    "The standard library module `concurrent.futures` provides two executor classes: `ThreadPoolExecutor` (a thread pool) and `ProcessPoolExecutor` (a process pool)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "5ab193bcb6b6059f",
   "metadata": {
    "ExecuteTime": {
     "end_time": "2023-11-18T03:13:04.722965400Z",
     "start_time": "2023-11-18T03:13:04.705962Z"
    }
   },
   "outputs": [],
   "source": [
    "from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor\n",
    "import time"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "2de747c5",
   "metadata": {
    "ExecuteTime": {
     "end_time": "2023-11-18T03:13:07.198205700Z",
     "start_time": "2023-11-18T03:13:07.171195700Z"
    }
   },
   "outputs": [],
   "source": [
    "def run(delay):\n",
    "    time.sleep(delay)\n",
    "    print(f'Delay Task {delay} Finished')\n",
    "    return delay"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f0deb32e6298c705",
   "metadata": {
    "collapsed": false,
    "jupyter": {
     "outputs_hidden": false
    }
   },
   "source": [
    "## Part 1: Basic Usage"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "e13b3b33e05cf64c",
   "metadata": {
    "ExecuteTime": {
     "end_time": "2023-11-18T03:13:12.666150800Z",
     "start_time": "2023-11-18T03:13:09.656136500Z"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Task 1 Finished: False\n",
      "Task 2 Finished: False\n",
      "Task 3 Finished: False\n",
      "Delay Task 1 Finished\n",
      "Delay Task 2 Finished\n",
      "Task 1 Finished: True\n",
      "Task 2 Finished: True\n",
      "Task 3 Finished: False\n",
      "Task 1 Result:  1\n",
      "Delay Task 3 Finished\n"
     ]
    }
   ],
   "source": [
    "# Create a thread pool with at most 5 worker threads\n",
    "with ThreadPoolExecutor(max_workers=5) as t:\n",
    "    task1 = t.submit(run, 1)\n",
    "    task2 = t.submit(run, 2)\n",
    "    task3 = t.submit(run, 3)\n",
    "\n",
    "    # Use done() to check whether each task has finished\n",
    "    print(f\"Task 1 Finished: {task1.done()}\")\n",
    "    print(f\"Task 2 Finished: {task2.done()}\")\n",
    "    print(f\"Task 3 Finished: {task3.done()}\")\n",
    "\n",
    "    time.sleep(2.5)\n",
    "    print(f\"Task 1 Finished: {task1.done()}\")\n",
    "    print(f\"Task 2 Finished: {task2.done()}\")\n",
    "    print(f\"Task 3 Finished: {task3.done()}\")\n",
    "\n",
    "    # Use result() to get the task's return value\n",
    "    print('Task 1 Result: ', task1.result())\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "74f7f7354ffe28bc",
   "metadata": {
    "collapsed": false,
    "jupyter": {
     "outputs_hidden": false
    }
   },
   "source": [
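    "1. `submit()` schedules a task on the pool and immediately returns a `Future` object, without waiting for the task to run.\n",
    "2. `Future.done()` reports whether the task has finished. It never blocks.\n",
    "3. `Future.result()` returns the task's return value, blocking until the task has finished if necessary.\n",
    "\n",
    "When the `with` block exits, the executor waits for all pending tasks, which is why `Delay Task 3 Finished` is printed last."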
   ]
  },
  {
   "cell_type": "markdown",
   "id": "5f0c603e29502351",
   "metadata": {
    "collapsed": false,
    "jupyter": {
     "outputs_hidden": false
    }
   },
   "source": [
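    "## Part 2: Main Methods\n",
    "\n",
    "### 1. The `wait()` function\n",
    "\n",
    "```python\n",
    "wait(fs, timeout=None, return_when=ALL_COMPLETED)\n",
    "```\n",
    "`fs`: the collection of `Future` objects to wait on.\n",
    "\n",
    "`timeout`: the maximum number of seconds to wait. If the condition has not been met by then, `wait()` returns anyway and does not raise an exception.\n",
    "\n",
    "`return_when`: the condition under which `wait()` returns. The default is `ALL_COMPLETED` (return once every future has finished or been cancelled); the other options are `FIRST_COMPLETED` and `FIRST_EXCEPTION`."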
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "8e7339db58e1cac3",
   "metadata": {
    "ExecuteTime": {
     "end_time": "2023-11-18T03:13:20.483141500Z",
     "start_time": "2023-11-18T03:13:16.472884600Z"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Delay Task 1 Finished\n",
      "Continue Other Tasks\n",
      "Delay Task 2 Finished\n",
      "Delay Task 3 Finished\n",
      "Continue Other Tasks\n",
      "Delay Task 4 Finished\n"
     ]
    }
   ],
   "source": [
    "from concurrent.futures import wait, FIRST_COMPLETED\n",
    "\n",
    "# Create a thread pool with at most 5 worker threads\n",
    "with ThreadPoolExecutor(max_workers=5) as t:\n",
    "    all_task = [t.submit(run, delay) for delay in range(1, 5)]\n",
    "    wait(all_task, return_when=FIRST_COMPLETED)\n",
    "    print('Continue Other Tasks')\n",
    "\n",
    "    wait(all_task, timeout=2.5)\n",
    "    print('Continue Other Tasks')\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "29282015fed2c334",
   "metadata": {
    "collapsed": false,
    "jupyter": {
     "outputs_hidden": false
    }
   },
   "source": [
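    "1. The first `wait()` returns as soon as the first task finishes (after about 1 s), and the main thread carries on.\n",
    "2. The second `wait()` gives up after its 2.5 s timeout, roughly 3.5 s after the tasks were submitted. Tasks 1 to 3 have finished by then; only task 4 is still running, and it completes while the `with` block shuts the pool down."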
   ]
  },
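  {
   "cell_type": "markdown",
   "id": "wait-result-sets-md",
   "metadata": {},
   "source": [
    "The calls above discard the return value of `wait()`. As a minimal sketch, the cell below unpacks it: `wait()` returns a named pair of sets, `done` and `not_done`, so the caller can see which futures had finished when it returned. The helper `sleep_and_return` and the delays are illustrative assumptions, not part of the original example."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "wait-result-sets-code",
   "metadata": {},
   "outputs": [],
   "source": [
    "from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED\n",
    "import time\n",
    "\n",
    "\n",
    "def sleep_and_return(delay):\n",
    "    # Hypothetical helper: sleep, then hand the delay back as the result\n",
    "    time.sleep(delay)\n",
    "    return delay\n",
    "\n",
    "\n",
    "with ThreadPoolExecutor(max_workers=3) as pool:\n",
    "    futures = [pool.submit(sleep_and_return, d) for d in (0.1, 0.5, 0.6)]\n",
    "\n",
    "    # wait() returns two sets of futures: those finished and those still pending\n",
    "    done, not_done = wait(futures, return_when=FIRST_COMPLETED)\n",
    "    print('finished:', sorted(f.result() for f in done))\n",
    "    print('still pending:', len(not_done))"
   ]
  },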
  {
   "cell_type": "markdown",
   "id": "203fb3db425eeaac",
   "metadata": {
    "collapsed": false,
    "jupyter": {
     "outputs_hidden": false
    }
   },
   "source": [
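    "### 2. The `as_completed()` function\n",
    "\n",
    "`as_completed()` yields each `Future` as soon as its task finishes, in completion order rather than submission order. Call `result()` on the yielded future to get the return value."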
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "a2b757bff2a2ae08",
   "metadata": {
    "ExecuteTime": {
     "end_time": "2023-11-18T03:13:26.813506200Z",
     "start_time": "2023-11-18T03:13:22.802587600Z"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Delay Task 1 Finished\n",
      "Task <Future at 0x1d16d055fd0 state=finished returned int> Result: 1\n",
      "Delay Task 2 Finished\n",
      "Task <Future at 0x1d16e5e4f50 state=finished returned int> Result: 2\n",
      "Delay Task 3 Finished\n",
      "Task <Future at 0x1d16e9cd510 state=finished returned int> Result: 3\n",
      "Delay Task 4 Finished\n",
      "Task <Future at 0x1d16ea50390 state=finished returned int> Result: 4\n"
     ]
    }
   ],
   "source": [
    "from concurrent.futures import as_completed\n",
    "\n",
    "# Create a thread pool with at most 5 worker threads\n",
    "with ThreadPoolExecutor(max_workers=5) as t:\n",
    "    obj_list = []\n",
    "    for delay in range(1, 5):\n",
    "        obj = t.submit(run, delay)\n",
    "        obj_list.append(obj)\n",
    "\n",
    "    for future in as_completed(obj_list):\n",
    "        res = future.result()\n",
    "        print(f'Task {future} Result: {res}')"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "951145b7ed9a032d",
   "metadata": {
    "collapsed": false,
    "jupyter": {
     "outputs_hidden": false
    }
   },
   "source": [
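    "`as_completed()` returns an iterator. Each step blocks until another task finishes; if `timeout` is set and the remaining tasks have not finished within that many seconds of the original call, it raises `TimeoutError`."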
   ]
  },
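  {
   "cell_type": "markdown",
   "id": "as-completed-timeout-md",
   "metadata": {},
   "source": [
    "A small sketch of the `timeout` parameter of `as_completed()`, assuming a hypothetical helper `sleep_and_return` and delays chosen so that one task finishes inside the budget and one does not. The exception is imported from `concurrent.futures` so the cell also works on Python versions where it is not yet the built-in `TimeoutError`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "as-completed-timeout-code",
   "metadata": {},
   "outputs": [],
   "source": [
    "from concurrent.futures import ThreadPoolExecutor, as_completed\n",
    "from concurrent.futures import TimeoutError as FuturesTimeoutError\n",
    "import time\n",
    "\n",
    "\n",
    "def sleep_and_return(delay):\n",
    "    # Hypothetical helper: sleep, then hand the delay back as the result\n",
    "    time.sleep(delay)\n",
    "    return delay\n",
    "\n",
    "\n",
    "with ThreadPoolExecutor(max_workers=2) as pool:\n",
    "    futures = [pool.submit(sleep_and_return, d) for d in (0.1, 1.0)]\n",
    "    finished = []\n",
    "    try:\n",
    "        # The 0.5 s budget is counted from this call to as_completed()\n",
    "        for future in as_completed(futures, timeout=0.5):\n",
    "            finished.append(future.result())\n",
    "    except FuturesTimeoutError:\n",
    "        # The slow task is still running; the with block waits for it on exit\n",
    "        print('Timed out; results collected so far:', finished)"
   ]
  },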
  {
   "cell_type": "markdown",
   "id": "af01985b87ef6952",
   "metadata": {
    "collapsed": false,
    "jupyter": {
     "outputs_hidden": false
    }
   },
   "source": [
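    "### 3. The `map()` method\n",
    "\n",
    "```python\n",
    "map(fn, *iterables, timeout=None)\n",
    "```\n",
    "\n",
    "`fn`: the function each task runs.\n",
    "\n",
    "`iterables`: one or more iterables that supply the arguments; `fn` is called once per element (with several iterables, once per tuple of corresponding elements).\n",
    "\n",
    "`timeout`: the maximum number of seconds to wait, measured from the call to `map()`. Unlike `wait()`, which returns quietly when its timeout expires, `map()` hands back results, so asking for a result that is not ready in time raises `TimeoutError`.\n"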
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "ba35e699af00d417",
   "metadata": {
    "ExecuteTime": {
     "end_time": "2023-11-18T03:24:31.038400200Z",
     "start_time": "2023-11-18T03:24:27.489408200Z"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Delay Task 1 Finished\n",
      "Delay Task 2 Finished\n",
      "Delay Task 3 Finished\n"
     ]
    },
    {
     "ename": "TimeoutError",
     "evalue": "",
     "output_type": "error",
     "traceback": [
      "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m",
      "\u001b[1;31mTimeoutError\u001b[0m                              Traceback (most recent call last)",
      "Cell \u001b[1;32mIn[10], line 3\u001b[0m\n\u001b[0;32m      1\u001b[0m executor \u001b[38;5;241m=\u001b[39m ThreadPoolExecutor(max_workers\u001b[38;5;241m=\u001b[39m\u001b[38;5;241m5\u001b[39m)\n\u001b[1;32m----> 3\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m i, result \u001b[38;5;129;01min\u001b[39;00m \u001b[38;5;28menumerate\u001b[39m(executor\u001b[38;5;241m.\u001b[39mmap(run, [\u001b[38;5;241m4\u001b[39m, \u001b[38;5;241m2\u001b[39m, \u001b[38;5;241m3\u001b[39m, \u001b[38;5;241m1\u001b[39m], timeout\u001b[38;5;241m=\u001b[39m\u001b[38;5;241m3.5\u001b[39m)):\n\u001b[0;32m      4\u001b[0m     \u001b[38;5;28mprint\u001b[39m(\u001b[38;5;124mf\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mTask \u001b[39m\u001b[38;5;132;01m{\u001b[39;00mi\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m Result: \u001b[39m\u001b[38;5;132;01m{\u001b[39;00mresult\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m\"\u001b[39m)\n",
      "File \u001b[1;32m~\\anaconda3\\envs\\MasterMaoPy311\\Lib\\concurrent\\futures\\_base.py:621\u001b[0m, in \u001b[0;36mExecutor.map.<locals>.result_iterator\u001b[1;34m()\u001b[0m\n\u001b[0;32m    619\u001b[0m             \u001b[38;5;28;01myield\u001b[39;00m _result_or_cancel(fs\u001b[38;5;241m.\u001b[39mpop())\n\u001b[0;32m    620\u001b[0m         \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m--> 621\u001b[0m             \u001b[38;5;28;01myield\u001b[39;00m _result_or_cancel(fs\u001b[38;5;241m.\u001b[39mpop(), end_time \u001b[38;5;241m-\u001b[39m time\u001b[38;5;241m.\u001b[39mmonotonic())\n\u001b[0;32m    622\u001b[0m \u001b[38;5;28;01mfinally\u001b[39;00m:\n\u001b[0;32m    623\u001b[0m     \u001b[38;5;28;01mfor\u001b[39;00m future \u001b[38;5;129;01min\u001b[39;00m fs:\n",
      "File \u001b[1;32m~\\anaconda3\\envs\\MasterMaoPy311\\Lib\\concurrent\\futures\\_base.py:317\u001b[0m, in \u001b[0;36m_result_or_cancel\u001b[1;34m(***failed resolving arguments***)\u001b[0m\n\u001b[0;32m    315\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m    316\u001b[0m     \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[1;32m--> 317\u001b[0m         \u001b[38;5;28;01mreturn\u001b[39;00m fut\u001b[38;5;241m.\u001b[39mresult(timeout)\n\u001b[0;32m    318\u001b[0m     \u001b[38;5;28;01mfinally\u001b[39;00m:\n\u001b[0;32m    319\u001b[0m         fut\u001b[38;5;241m.\u001b[39mcancel()\n",
      "File \u001b[1;32m~\\anaconda3\\envs\\MasterMaoPy311\\Lib\\concurrent\\futures\\_base.py:458\u001b[0m, in \u001b[0;36mFuture.result\u001b[1;34m(self, timeout)\u001b[0m\n\u001b[0;32m    456\u001b[0m             \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m__get_result()\n\u001b[0;32m    457\u001b[0m         \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m--> 458\u001b[0m             \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mTimeoutError\u001b[39;00m()\n\u001b[0;32m    459\u001b[0m \u001b[38;5;28;01mfinally\u001b[39;00m:\n\u001b[0;32m    460\u001b[0m     \u001b[38;5;66;03m# Break a reference cycle with the exception in self._exception\u001b[39;00m\n\u001b[0;32m    461\u001b[0m     \u001b[38;5;28mself\u001b[39m \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mNone\u001b[39;00m\n",
      "\u001b[1;31mTimeoutError\u001b[0m: "
     ]
    }
   ],
   "source": [
    "executor = ThreadPoolExecutor(max_workers=5)\n",
    "\n",
    "for i, result in enumerate(executor.map(run, [4, 2, 3, 1], timeout=3.5)):\n",
    "    print(f\"Task {i} Result: {result}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "beaf3d68920bbaec",
   "metadata": {
    "collapsed": false,
    "jupyter": {
     "outputs_hidden": false
    }
   },
   "source": [
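    "Here the tasks are created from a list. Results come back in submission order, so the loop first waits for the 4 s task; that exceeds the 3.5 s `timeout`, so `TimeoutError` is raised before any result is printed and the cell is aborted."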
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "id": "ecd8c459082eb6da",
   "metadata": {
    "ExecuteTime": {
     "end_time": "2023-11-18T03:25:41.349741200Z",
     "start_time": "2023-11-18T03:25:37.323653900Z"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Delay Task 1 Finished\n",
      "Delay Task 2 Finished\n",
      "Delay Task 3 Finished\n",
      "Delay Task 4 Finished\n",
      "Task 0 Result: 4\n",
      "Task 1 Result: 2\n",
      "Task 2 Result: 3\n",
      "Task 3 Result: 1\n"
     ]
    }
   ],
   "source": [
    "executor = ThreadPoolExecutor(max_workers=5)\n",
    "\n",
    "for i, result in enumerate(executor.map(run, [4, 2, 3, 1], timeout=5)):\n",
    "    print(f\"Task {i} Result: {result}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d185cbcea6bb4ee",
   "metadata": {
    "collapsed": false,
    "jupyter": {
     "outputs_hidden": false
    }
   },
   "source": [
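    "By contrast, when every task finishes within `timeout`, the loop runs normally.\n",
    "\n",
    "* With `map()` there is no need to call `submit()` yourself.\n",
    "* Unlike `as_completed()`, `map()` yields results in submission order. If tasks finish in a different order, it waits for the earliest-submitted task that is still pending before yielding anything later. Here the 4 s task was submitted first, so all four tasks had finished before the first result was printed.\n",
    "* The results of `map()` are ordered, but the tasks themselves may finish in any order."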
   ]
  },
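  {
   "cell_type": "markdown",
   "id": "map-multi-iterables-md",
   "metadata": {},
   "source": [
    "Because the signature is `map(fn, *iterables)`, more than one iterable can be passed. The sketch below uses the built-in `pow` purely as an illustrative two-argument function; each call receives one element from each list, and the results come back in submission order."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "map-multi-iterables-code",
   "metadata": {},
   "outputs": [],
   "source": [
    "from concurrent.futures import ThreadPoolExecutor\n",
    "\n",
    "with ThreadPoolExecutor(max_workers=3) as pool:\n",
    "    # pow(base, exponent) is called with one element from each iterable\n",
    "    squares = list(pool.map(pow, [2, 3, 4], [2, 2, 2]))\n",
    "\n",
    "print(squares)  # [4, 9, 16]"
   ]
  },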
  {
   "cell_type": "markdown",
   "id": "d60acabf4d1450df",
   "metadata": {
    "collapsed": false,
    "jupyter": {
     "outputs_hidden": false
    }
   },
   "source": [
    "## References\n",
    "\n",
    "[1] Juejin (稀土掘金), @全村de希望, [Python线程池 ThreadPoolExecutor 的用法及实战](https://juejin.cn/post/6844903861245706253)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
