Python从门到精通(七):网络-03-创建Rpc服务接口
  TEZNKK3IfmPf 2024年03月29日 13 0

本章会用两种方式来实现,原生和grpc框架来实现。

1.1、Server

from multiprocessing.connection import Listener
from threading import Thread
from remote_call import RPCHandler

def rpc_server(handler, address, authkey):
sock = Listener(address, authkey=authkey)
while True:
client = sock.accept()
t = Thread(target=handler.handle_connection, args=(client,))
t.daemon = True
t.start()

# Some remote functions
def add(x, y):
return x + y

def sub(x, y):
return x - y

# Register with a handler
handler = RPCHandler()
handler.register_function(add)
handler.register_function(sub)

# Run the server
rpc_server(handler, ('localhost', 17000), authkey=b'peekaboo')

1.2、Handler

class RPCHandler:
def __init__(self):
self._functions = { }

def register_function(self, func):
self._functions[func.__name__] = func

def handle_connection(self, connection):
try:
while True:
# Receive a message
func_name, args, kwargs = pickle.loads(connection.recv())
# Run the RPC and send a response
try:
r = self._functions[func_name](*args,**kwargs)
connection.send(pickle.dumps(r))
except Exception as e:
connection.send(pickle.dumps(e))
except EOFError:
pass

1.3、Proxy

import pickle

class RPCProxy:
def __init__(self, connection):
self._connection = connection

def __getattr__(self, name):
def do_rpc(*args, **kwargs):
self._connection.send(pickle.dumps((name, args, kwargs)))
result = pickle.loads(self._connection.recv())
if isinstance(result, Exception):
raise result
return result
return do_rpc

1.4、Client

from multiprocessing.connection import Client
from chapter11.rpc_proxy import RPCProxy

c = Client(('localhost', 17000), authkey=b'peekaboo')
proxy = RPCProxy(c)
print(f'add(3, 5) = {proxy.add(3, 5)}')
print(f'sub(5, 12) = {proxy.sub(5, 12)}')
proxy.sub([1, 2], 4)

1.5、JSON实现

1.5.1、Server

import json

class RPCHandler:
def __init__(self):
self._functions = { }

def register_function(self, func):
self._functions[func.__name__] = func

def handle_connection(self, connection):
try:
while True:
# Receive a message
func_name, args, kwargs = json.loads(connection.recv())
# Run the RPC and send a response
try:
r = self._functions[func_name](*args,**kwargs)
connection.send(json.dumps(r))
except Exception as e:
connection.send(json.dumps(str(e)))
except EOFError:
pass

1.5.2、Client

import json

class RPCProxy:
def __init__(self, connection):
self._connection = connection

def __getattr__(self, name):
def do_rpc(*args, **kwargs):
self._connection.send(json.dumps((name, args, kwargs)))
result = json.loads(self._connection.recv())
return result
return do_rpc

1.6、xml

from xmlrpc.server import SimpleXMLRPCServer

class KeyValueServer:
_rpc_methods_ = ['get', 'set', 'delete', 'exists', 'keys']
def __init__(self, address):
self._data = {}
self._serv = SimpleXMLRPCServer(address, allow_none=True)
for name in self._rpc_methods_:
self._serv.register_function(getattr(self, name))

def get(self, name):
return self._data[name]

def set(self, name, value):
self._data[name] = value

def delete(self, name):
del self._data[name]

def exists(self, name):
return name in self._data

def keys(self):
return list(self._data)

def serve_forever(self):
self._serv.serve_forever()

if __name__ == '__main__':
kvserv = KeyValueServer(('', 15000))
kvserv.serve_forever()
from xmlrpc.client import ServerProxy
s = ServerProxy('https://localhost:15000', allow_none=True)
s.set('foo','bar')
s.set('spam', [1, 2, 3])
s.keys()
s.get('foo')
s.get('spam')
s.delete('spam')
s.exists('spam')

二、Grpc框架

2.1、安装

sudo python3 -m pip install grpcio
sudo python3 -m pip install grpcio-tools
python -m grpc_tools.protoc -I../../protos --python_out=. --grpc_python_out=. ../../protos/product.proto

2.2、Server

from concurrent import futures
import logging
import uuid
import grpc
import time

import product_info_pb2
import product_info_pb2_grpc

class ProductInfoServicer(product_info_pb2_grpc.ProductInfoServicer):

def __init__(self):
self.productMap = {}

def addProduct(self, request, context):
id = uuid.uuid1()
request.id = str(id)
print("addProduct:request", request)
self.productMap[str(id)] = request
response = product_info_pb2.ProductID(value = str(id))

print("addProduct:response", response)
return response

def getProduct(self, request, context):
print("getProduct:request", request)
id = request.value
response = self.productMap[str(id)]
print("getProduct:response", response)
return response

# create a gRPC server
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

# use the generated function `add_CalculatorServicer_to_server`
# to add the defined class to the server
product_info_pb2_grpc.add_ProductInfoServicer_to_server(
ProductInfoServicer(), server)

# listen on port 50051
print('Starting server. Listening on port 50051.')
server.add_insecure_port('[::]:50051')
server.start()

# since server.start() will not block,
# a sleep-loop is added to keep alive
try:
while True:
time.sleep(86400)
except KeyboardInterrupt:
server.stop(0)

2.3、Client

import grpc
import product_info_pb2
import product_info_pb2_grpc
import time;

def run():
# open a gRPC channel
channel = grpc.insecure_channel('localhost:50051')
# create a stub (client)
stub = product_info_pb2_grpc.ProductInfoStub(channel)

response = stub.addProduct(product_info_pb2.Product(name = "Apple iPhone 11", description = "Meet Apple iPhone 11. All-new dual-camera system with Ultra Wide and Night mode.", price = 699.0 ))
print("add product: response", response)
productInfo = stub.getProduct(product_info_pb2.ProductID(value = response.value))
print("get product: response", productInfo)

run()
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2024年03月29日 0

暂无评论

推荐阅读
  TEZNKK3IfmPf   24天前   21   0   0 json
TEZNKK3IfmPf