在Python中并行调用函数

使用python3.6.9,我想实现以下目标。

有一个客户机ips的列表,我想ping每个客户机以达到监控的目的。 使用for-loop导致的问题是,它需要相当长的时间才能完成,特别是如果一些ping需要等待超时。

我怎么做呢?目前看起来是这样的。

for i in vpnClients:
    response_list=pythonping.ping(vpnClients[i]['virtualAddress'], count=2)
    singleStat={ "measurement": "ping", "tags": { "virtual ip": vpnClients[i]['virtualAddress']}, "time": datetime.datetime.utcnow(), "fields": { "min": float(response_list.rtt_min_ms), "max": float(response_list.rtt_max_ms), "avg": float(response_list.rtt_avg_ms) }}
    json_body.append(singleStat)

如果我不想执行pythonping,而是想对一个函数进行异步调用(我可以用这样的方式来包装:

 def doPing(ip):
    response_list=pythonping.ping(vpnClients[ip]['virtualAddress'], count=2)
    singleStat={ "measurement": "ping", "tags": { "virtual ip": vpnClients[ip]['virtualAddress']}, "time": datetime.datetime.utcnow(), "fields": { "min": float(response_list.rtt_min_ms), "max": float(response_list.rtt_max_ms), "avg": float(response_list.rtt_avg_ms) }}
    json_body.append(singleStat)


 for i in vpnClients:
    call_function(doPing(i))

这可能吗?

谢谢,安德烈

解决方案:

这个问题并不像看起来那么简单。理想情况下,pythonping包确实提供了一些多目标的API,但它没有(2020年4月)。不幸的是,pythonping包也不是线程安全的,因此你不能采用线程池的方式(也不能像建议的那样使用async,因为pythonping不支持async)。如果你查看 本源,你可以看到,它只是在检查ICMP数据包,而不能按目标区分。

while time_left > 0:
        # Keep listening until a packet arrives
        raw_packet, source_socket, time_left = self.socket.receive(time_left)
        # If we actually received something
        if raw_packet != b'':
            response.unpack(raw_packet)
            # Ensure we have not unpacked the packet we sent (RHEL will also listen to outgoing packets)
            if response.id == packet_id and response.message_type !=  icmp.Types.EchoRequest.type_id:
return Response(Message('', response, source_socket[0]), timeout-time_left)

如果我们发出10个数据包,第一个回复会确认所有目标。不爽。

剩下的,就是进程池的方式了。如果我们真的要使用async api这样的东西。

import datetime
import asyncio
import concurrent.futures
import pythonping

# python 3.6 we had had to do this:
loop = asyncio.get_event_loop()

vpnClients = [
        {'virtualAddress': '192.168.111.113'},
        {'virtualAddress': '192.168.111.1'},
        {'virtualAddress': '192.168.111.100'},
        ]


def myping(client):
    target = client['virtualAddress']
    res = pythonping.ping(target, count=2, timeout=2)
    return { "measurement": "ping", "tags": { "virtual ip": target}, "time": datetime.datetime.utcnow(), "fields": { "min": float(res.rtt_min_ms), "max": float(res.rtt_max_ms), "avg": float(res.rtt_avg_ms) }}


async def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as pool:
        executors = [loop.run_in_executor(pool, myping, c) for c in vpnClients]
        result = await asyncio.gather(*executors)

    print(result)


# You could do this if you would have python 3.7
# asyncio.run(main())
# but for 3.6, we must:
loop.run_until_complete(main())

我们必须使用自定义的 myping 功能,因为进程池执行器功能 必须 返回picklable对象,因此pythonping的ResponseList无法使用。你使用的 dict 就很好(它是可拾取的)。

当然,你可以简单地使用多处理API。

但是如果你的网络稳定且速度快(virtualhosts?),将超时时间降低到较小的程度可能会更好。

 pythonping.ping(target, count=2, timeout=0.1)

给TA打赏
共{{data.count}}人
人已打赏
未分类

LINK:致命错误 LNK1104:无法打开文件'OLDNAMES.lib'。

2022-9-9 4:57:18

未分类

Webpack。如何在客户端(浏览器)注入process.env运行时,使构建独立于环境。

2022-9-9 4:57:20

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索