protobuf

protobuf是一种序列化数据的方法——它能够在不同平台之间、不同语言编写的程序之间进行数据的通信。 程序也可以很方便地对protobuf数据进行读取、修改和扩展。可以看成是一种效率更高、面向通信和存储的XML数据格式。下面展示了一个message以及一个grpc service的定义:

    message ack {
        optional int32 ack = 1;
    }
    
    // server of the pub/sub system that receive, store and distribute messages.
    service topic {
        // publisher publishes a message to the topic
        rpc publish (pubMessage) returns (ack);
        // subscriber subscribes messages from the topic
        rpc subscribe (subscriberInfo) returns (messageList);
        // subscriber responds to the topic
        rpc sub_ack (ack) returns (empty);
    }

protobuf提供了接口编译器protoc,对于一个定义好了的.proto文件和目标语言,protoc将会生成对应语言的类和方法,提供编程者读写数据的接口。protobuf的定义方式比较类似C语言中的结构体,并且使用关键字段repeated来实现类似C++中的变长数组。在使用方式上,protobuf就像是(嵌套的)结构体,通过成员运算符’.‘来访问各级变量;对于变长数组,在Python上protobuf提供了类似列表list的访问和操作方式。

grpc based on protobuf

grpc基于protobuf实现。grpc使用protobuf来表达远程过程调用中的参数和返回值。调用protoc编译器时,加入参数就可以增加对grpc的支持:

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. filename.proto

会生成protobuf对应的文件filename_pb2.py以及对应grpc两端的文件filename_pb2_grpc.py。用户将要实现一个用于执行服务的类,它继承grpc文件中定义的父类,并且给出rpc函数在服务器端的具体实现。在客户端上,使用grpc前需要生成一个stub,并通过这个stub对远程的rpc函数进行调用。

What is Pub/Sub Messaging?

Google Cloud网站提供了对Pub/Sub服务模式简洁有效的介绍[1]。简单来说,Pub/Sub模式由下面这些组件构成:

其中发布者和订阅者的关系可以是多对多的。Pub/Sub模式的工作流程如图所示,订阅者可以从主题拉取消息,也可以等待主题向自己推送消息。订阅者需要对已经接收了的消息返回一个确认。

Pub/Sub

Implementation Details

Locks

显然,Topic中很可能会同时发生订阅者对消息的拉取和发布者对消息的发布。因此使用互斥锁来保护消息存储的正确性。例如消息的发布如下所示:

    # ensure mutual exclusion
    self.dirLock.acquire()
    
    if request.category in self.messageDir:
        self.dirLock.release()
        return tp.ack(ack=0)
    self.messageDir[request.category] = [request.timeoutLimit, request.content]

    self.dirLock.release()

Multithreading

作业要求实现对消息存储时间的控制,使用Python的threading多线程实现——在初始化的时候spawn一个线程,周期性地扫描存储在Topic的消息,消息过期的时候就将它们删除。

    rmList = []
    for i in messageDir.keys():
        messageDir[i][0] -= 1
        if messageDir[i][0] == 0:
            rmList.append(i)
    # remove them fron the dictionary
    for i in rmList:
        del messageDir[i]