ank - python Microservice: Giới thiệu và hướng dẫn

ank Microservice framework

Source: https://github.com/sunary/ank

Giới thiệu

Thực sự thì mình khá thích ý tưởng chia bài toán ra các processor nhỏ hơn, và có thể hoạt động độc lập mà không ảnh hưởng/phụ thuộc vào môi trường. Các processor này sẽ được ghép nối với nhau dựa trên yêu cầu của bài toán, và có thể dễ dàng thay thế chỉnh sửa từng proccesor mà không ảnh hường đến các processor khác.

ank ra đời là vì vậy, ank giúp các bạn viết các app độc lập và hổ trợ lắp ghép với nhau (pipeline và braching) mà không làm mất đi sự trong sáng của chương trình.

Xin nói qua 1 chút ưu/nhược điểm để các bạn có thể hình dung:

Ưu nhược điểm

  • Ưu điểm
    • Python: Dành cho những ai viết và yêu thích python
    • Nhẹ: Tất nhiên
    • Trong sáng: Tất cả App đều là của bạn, ank chỉ hổ trợ phần kết nối với nhau và với queue, scheduler và API-interface
    • Docker
    • Luôn luôn lắng nghe :D
  • Nhược điểm
    • Chưa tập trung vào bài toán tối ưu
    • Ít tài liệu: Ngoại trừ blog này và docs trong github thì mình chưa publish gì cả, hy vọng sẽ có tài liệu chính thức trên readthedoc.
    • Và dev còn thiếu kinh nghiệm cover các case còn lại.

Cấu trúc

Các khái niệm:

Trước khi đi vào chi tiết, mình đưa ra 1 số định nghĩa mình dùng trong bài viết và chương trình:

  • App: Là 1 tác vụ nhỏ nhất có thể hoạt độngtheo định nghĩa higher-order function, có thể là 1 hàm để log message, post message lên queue hay cũng là 1 hàm để control branching. Nôm na là các hàm thành phần mà các bạn tạo ra tùy theo style code của 1 Worker. ank có các App hổ trợ trong việc quản lý luồng:
    • LogApp: Đơn giản là log lại mỗi message qua nó
    • Consumer: nhận message từ queue (support: rabbitMQ, zmq và kafka)
    • Producer: post message lên queue
    • JoinApp: Ví dụ mỗi lần bạn xử lý mỗi message, và sẽ tối ưu hơn nếu App phía trước xử lý 100 message 1 lần thay vì 100 lần xử lý, JoinProcessor sẽ đứng ở phía trước và gom đủ số message cần thiết rồi mới đi xử lý
    • SplitApp: Tất nhiên, bạn đã gom lại thì phải split ra chứ.
  • Worker: Là tập hợp các App để 1 Worker có thể ghép nối với các Worker khác thông qua queue message, tức là nó sẽ có các App input/output và các App xử lý, có thể 1 hoặc nhiều và việc của bạn là khai báo chúng vào 1 chain.
  • Chain: Là chất kết dính các App lại với nhau, và kiêm luôn quản lý luồng. Tức là sao? Chain process sẽ quản lý luôn việc chọn hướng đi và số lượng message qua mỗi App-process, còn việc quản lý luồng của Worker thì bạn phải cấu hình bên queue message
  • Branching: Là cấu trúc rẻ nhánh trong mỗi App, để thực hiện việc này thì bạn phải khai báo trong ChainApp-braching kiểu trả về là (message, list flags của App tiếp theo sẽ đi)
  • Message: Là message, là thông tin mỗi App nhận vào để xử lý và xuất ra sau khi xử lý xong. Định dạng {'content': 'Nội dung message', 'flags': 'là list boolean, chỉ dùng trong model 1 - n, tức sẽ xử lý processor kế tiếp nếu index củaflagsis True'}

Framework hổ trợ 3 loại App phổ biến hiện nay đó là:

  • PipeApp:App cơ bản nhất trong Pipeline, bạn phải khai báo input và output cho nó. Thường được dùng để đọc message từ 1 queue và output là push lên 1 queue khác, và cái này đã được ank hổ trợ (rabbitMQ, zmq và kafka).
  • ScheduleApp: Được xem như là 1 crond-job, nó ko có input bởi khi đến thời gian bạn khai báo (theo format của contab) nó sẽ tự chạy 1 process được khai báo từ trước và bạn chỉ thiết lập output thôi.
  • APIApp: cung cấp interface REST-API giùm bạn, được chạy trên Flask. Những gì bạn cần làm là khai báo 1 hàm và ank sẽ tự route request của bạn đến đúng hàm của bạn đã khai báo dựa trên tên hàm.

Chain model hổ trợ:

  • 1 - 1: processor - processor
  • 1 - n: / processor processor - processor \ processor
  • n - 1: processor \ processor - processor # get message at the first processor only processor /
  • join message: message \ message - [message, message, message] message /
  • split message: / message [message, message, message] - message \ message

Config

Để viết 1 chương trình hòan chỉnh, ngoài các app class, bạn cần phải khai báo thêm 2 file services.ymlsettings.yml, format của 2 file này là yaml.

services.yml

Chưa thông tin về luồng của chương trình. Chương trình cần tham số gì, các tham số đó được phân bố như thế nào đều được tổ chức ở đây:

  • ví dụ
services:
    App:
        class: demo.App
        arguments: ['%app_var%', '$Ojbect$']
    Ojbect:
        class: demo.Object
        arguments: '%object_var%'
    NextApp:
        class: demo.NextApp
        arguments: ~
chains:
    - App
    - NextApp

Theo đó, chương trình sẽ chạy từ App với các tham số là giá trị app_var và instance của Object(object_var), rồi NextApp không có tham số kèm theo

settings.yml

Ở đây sẽ chứa thông tin các giá trị của các biến trên services.yml, cụ thể là:

parameters:
    - app_var: 15
    - object_var: 'text'

Demo đi

Nãy h toàn là chữ, nên mình xin demo bài toán đơn giản sử dụng hầu hết các App có sắn của ank

Bài toán:

Cho dãy các chữ số nguyên, nếu là số chẵn thì chia cho 2, nếu là số lẽ thì -1

khai báo services.yml trước:

services:
    - InitApp:
        class: processor.InitApp
            arguments: '%len_list%'
    - JoinApp:
        class: apps.join_app.JoinApp
        arguments: '%batch_size%'
    - LogApp:
        class: apps.log_app.LogApp
        arguments: ~
    - SplitApp:
        class: apps.split_app.SplitApp
        arguments: ~
    - BeforeBranching:
        class: processor.BeforeBranching
        arguments: ~
    - EvenNumber:
        class: processor.EvenNumber
        arguments: ~
    - OddNumber:
        class: processor.OddNumber
        arguments: ~
    - PrintResult:
        class: processor.PrintResult
        arguments: ~

chains:
    - InitApp
    - JoinApp
    - SplitApp
    - LogApp
    - BeforeBranching
    - [EvenNumber, OddNumber]
    - PrintResult

Nhìn vào chains của services.yml, bạn cũng đoán được chương trình sẽ chạy như thế nào rồi, bây h bắt đầu code nè, đặt tên là processor.py vì trong services.yml bạn lấy các class từ đây:

from base_apps.pipe_app import PipeApp


class InitApp(PipeApp):

    def init_app(self, len_list=None):
        self.len_list = len_list

    def start(self):
        # Your self.process become self.chain_process
        print('Start chain')
        for i in range(len_list):
            self.chain_process({'content': i})

    def process(self, message=None):
        return message


class BeforeBranching(PipeApp):

    def init_app(self):
        pass

    def process(self, message=None):
        if message['content'] % 2:
            # route this flow to 2nd app list apps after branching
            message.update({'flags': [False, True]})
        else:
            # route this flow to 1st app list apps after branching
            message.update({'flags': [True, False]})
    return message


class EvenNumber(PipeApp):

    def init_app(self):
        pass

    def process(self, message=None):
    # process with even number
        message['content'] /= 2
    return message


class OddNumber(PipeApp):

    def init_app(self):
        pass

    def process(self, message=None):
    # process with odd number
    message['content'] -= 1
        return message


class PrintResult(PipeApp):

    def init_app(self, *args):
        pass

    def process(self, message=None):
        print('Result {}'.format)
        return message

Ở trên các bạn sẽ thấy biến len_listbatch_size và nó vào bằng settings.yml:

parameters:
    - len_list: 1000
    - batch_size: 10

Vậy là xong, chương trình sẽ tự tìm các file và ghép lại thành 1 file hoàn chỉnh, tức là nó sẽ đọc file services.yml để tìm các class, đọc file settings.yml để lấy giá trị đầu vào rồi dựa vào DI tạo ra 1 chương trình hoàn chỉnh.

Trong repo của project có các ví dụ khác, để hiểu rỏ hơn các bạn có thể vào đó để xem thêm. Hoặc có thể liên hệ trực tiếp mình :D

chạy chương trình:

$ ank -r

các bạn cũng có thể generate ra file _processor.py để hiểu rỏ hơn DI chạy như thế nào, bằng cách:

$ ank -p

và đương nhiên: python _processor.py cũng sẽ cho kết quả y hệt

CLI ank

Ở trên các bạn đã thấy 1 số lệnh của ank, Hãy xem chi tiết hơn về nó.
Hiện tại ank support Docker để generate, build, deploy và test.

Build Docker:

$ ank -b
$ docker run --entrypoint /bin/sh docker_image_id

Test with parameters from test-settings.yml

ank -t -f test-settings.yml

Những tính năng hy vọng sớm có mặt:

  • GUI
  • Monitor
  • Và sự đóng góp của tất cả các bạn.

@sunary 22-03-2017
bài này dc post tại blog của mình: www.aku.vn/blog/ank

Bình luận


White
{{ comment.user.name }}
Bỏ hay Hay
{{comment.like_count}}
Male avatar
{{ comment_error }}
Hủy
   

Hiển thị thử

Chỉnh sửa

White

Nhat

1 bài viết.
4 người follow
Kipalog
{{userFollowed ? 'Following' : 'Follow'}}
Bài viết liên quan
White
5 0
Tranh chấp tài nguyên là vấn đề lớn cần phải giải quyết trong việc xây dựng hệ thống lớn. Việc tranh chấp tài nguyên ảnh hưởng lớn tới hiệu năng và...
Nghia Minh Le viết gần 2 năm trước
5 0
White
2 0
Khi xây dựng hệ thống xử lý bất đồng bộ dựa trên message thì việc monitor có tính chất cốt tử. Đó là một thách thức rất khó khăn, không có một hệ t...
Nghia Minh Le viết gần 2 năm trước
2 0
{{like_count}}

kipalog

{{ comment_count }}

bình luận

{{liked ? "Đã kipalog" : "Kipalog"}}


White
{{userFollowed ? 'Following' : 'Follow'}}
1 bài viết.
4 người follow

 Đầu mục bài viết

Vẫn còn nữa! x

Kipalog vẫn còn rất nhiều bài viết hay và chủ đề thú vị chờ bạn khám phá!