Bạn có chắc chắn muốn xóa bài viết này không ?
Bạn có chắc chắn muốn xóa bình luận này không ?
ank - python Microservice: Giới thiệu và hướng dẫn
ank
Microservice framework
Source: https://github.com/sunary/ank
Giới thiệu
Thực sự thì mình khá thích ý tưởng chia bài toán ra các processor nhỏ hơn, và có thể hoạt động độc lập mà không ảnh hưởng/phụ thuộc vào môi trường. Các processor này sẽ được ghép nối với nhau dựa trên yêu cầu của bài toán, và có thể dễ dàng thay thế chỉnh sửa từng proccesor mà không ảnh hường đến các processor khác.
ank ra đời là vì vậy, ank giúp các bạn viết các app độc lập và hổ trợ lắp ghép với nhau (pipeline và braching) mà không làm mất đi sự trong sáng của chương trình.
Xin nói qua 1 chút ưu/nhược điểm để các bạn có thể hình dung:
Ưu nhược điểm
-
Ưu điểm
- Python: Dành cho những ai viết và yêu thích python
- Nhẹ: Tất nhiên
-
Trong sáng: Tất cả
App
đều là của bạn, ank chỉ hổ trợ phần kết nối với nhau và với queue, scheduler và API-interface - Docker
- Luôn luôn lắng nghe :D
-
Nhược điểm
- Chưa tập trung vào bài toán tối ưu
- Ít tài liệu: Ngoại trừ blog này và docs trong github thì mình chưa publish gì cả, hy vọng sẽ có tài liệu chính thức trên readthedoc.
- Và dev còn thiếu kinh nghiệm cover các case còn lại.
Cấu trúc
Các khái niệm:
Trước khi đi vào chi tiết, mình đưa ra 1 số định nghĩa mình dùng trong bài viết và chương trình:
-
App: Là 1 tác vụ nhỏ nhất có thể hoạt độngtheo định nghĩa higher-order function, có thể là 1 hàm để log message, post message lên queue hay cũng là 1 hàm để control branching. Nôm na là các hàm thành phần mà các bạn tạo ra tùy theo style code của 1 Worker. ank có các
App
hổ trợ trong việc quản lý luồng:- LogApp: Đơn giản là log lại mỗi message qua nó
- Consumer: nhận message từ queue (support: rabbitMQ, zmq và kafka)
- Producer: post message lên queue
- JoinApp: Ví dụ mỗi lần bạn xử lý mỗi message, và sẽ tối ưu hơn nếu App phía trước xử lý 100 message 1 lần thay vì 100 lần xử lý, JoinProcessor sẽ đứng ở phía trước và gom đủ số message cần thiết rồi mới đi xử lý
- SplitApp: Tất nhiên, bạn đã gom lại thì phải split ra chứ.
-
Worker: Là tập hợp các
App
để 1Worker
có thể ghép nối với các Worker khác thông qua queue message, tức là nó sẽ có cácApp
input/output và cácApp
xử lý, có thể 1 hoặc nhiều và việc của bạn là khai báo chúng vào 1 chain. -
Chain: Là chất kết dính các
App
lại với nhau, và kiêm luôn quản lý luồng. Tức là sao? Chain process sẽ quản lý luôn việc chọn hướng đi và số lượng message qua mỗiApp-process
, còn việc quản lý luồng củaWorker
thì bạn phải cấu hình bên queue message -
Branching: Là cấu trúc rẻ nhánh trong mỗi
App
, để thực hiện việc này thì bạn phải khai báo trongChain
vàApp-braching
kiểu trả về là (message, list flags củaApp
tiếp theo sẽ đi) -
Message: Là message, là thông tin mỗi
App
nhận vào để xử lý và xuất ra sau khi xử lý xong. Định dạng{'content': 'Nội dung message', 'flags': 'là list boolean, chỉ dùng trong model 1 - n, tức sẽ xử lý processor kế tiếp nếu index của
flagsis True'}
Framework hổ trợ 3 loại App
phổ biến hiện nay đó là:
-
PipeApp: Là
App
cơ bản nhất trong Pipeline, bạn phải khai báo input và output cho nó. Thường được dùng để đọc message từ 1 queue và output là push lên 1 queue khác, và cái này đã được ank hổ trợ (rabbitMQ, zmq và kafka). - ScheduleApp: Được xem như là 1 crond-job, nó ko có input bởi khi đến thời gian bạn khai báo (theo format của contab) nó sẽ tự chạy 1 process được khai báo từ trước và bạn chỉ thiết lập output thôi.
- APIApp: cung cấp interface REST-API giùm bạn, được chạy trên Flask. Những gì bạn cần làm là khai báo 1 hàm và ank sẽ tự route request của bạn đến đúng hàm của bạn đã khai báo dựa trên tên hàm.
Chain model hổ trợ:
- 1 - 1:
processor - processor
- 1 - n:
/ processor processor - processor \ processor
- n - 1:
processor \ processor - processor # get message at the first processor only processor /
- join message:
message \ message - [message, message, message] message /
- split message:
/ message [message, message, message] - message \ message
Config
Để viết 1 chương trình hòan chỉnh, ngoài các app class, bạn cần phải khai báo thêm 2 file services.yml
và settings.yml
, format của 2 file này là yaml
.
services.yml
Chưa thông tin về luồng của chương trình. Chương trình cần tham số gì, các tham số đó được phân bố như thế nào đều được tổ chức ở đây:
- ví dụ
services:
App:
class: demo.App
arguments: ['%app_var%', '$Ojbect$']
Ojbect:
class: demo.Object
arguments: '%object_var%'
NextApp:
class: demo.NextApp
arguments: ~
chains:
- App
- NextApp
Theo đó, chương trình sẽ chạy từ App
với các tham số là giá trị app_var
và instance của Object(object_var)
, rồi NextApp
không có tham số kèm theo
settings.yml
Ở đây sẽ chứa thông tin các giá trị của các biến trên services.yml
, cụ thể là:
parameters:
- app_var: 15
- object_var: 'text'
Demo đi
Nãy h toàn là chữ, nên mình xin demo bài toán đơn giản sử dụng hầu hết các App
có sắn của ank
Bài toán:
Cho dãy các chữ số nguyên, nếu là số chẵn thì chia cho 2, nếu là số lẽ thì -1
khai báo services.yml
trước:
services:
- InitApp:
class: processor.InitApp
arguments: '%len_list%'
- JoinApp:
class: apps.join_app.JoinApp
arguments: '%batch_size%'
- LogApp:
class: apps.log_app.LogApp
arguments: ~
- SplitApp:
class: apps.split_app.SplitApp
arguments: ~
- BeforeBranching:
class: processor.BeforeBranching
arguments: ~
- EvenNumber:
class: processor.EvenNumber
arguments: ~
- OddNumber:
class: processor.OddNumber
arguments: ~
- PrintResult:
class: processor.PrintResult
arguments: ~
chains:
- InitApp
- JoinApp
- SplitApp
- LogApp
- BeforeBranching
- [EvenNumber, OddNumber]
- PrintResult
Nhìn vào chains của services.yml
, bạn cũng đoán được chương trình sẽ chạy như thế nào rồi, bây h bắt đầu code nè, đặt tên là processor.py
vì trong services.yml
bạn lấy các class từ đây:
from base_apps.pipe_app import PipeApp
class InitApp(PipeApp):
def init_app(self, len_list=None):
self.len_list = len_list
def start(self):
# Your self.process become self.chain_process
print('Start chain')
for i in range(len_list):
self.chain_process({'content': i})
def process(self, message=None):
return message
class BeforeBranching(PipeApp):
def init_app(self):
pass
def process(self, message=None):
if message['content'] % 2:
# route this flow to 2nd app list apps after branching
message.update({'flags': [False, True]})
else:
# route this flow to 1st app list apps after branching
message.update({'flags': [True, False]})
return message
class EvenNumber(PipeApp):
def init_app(self):
pass
def process(self, message=None):
# process with even number
message['content'] /= 2
return message
class OddNumber(PipeApp):
def init_app(self):
pass
def process(self, message=None):
# process with odd number
message['content'] -= 1
return message
class PrintResult(PipeApp):
def init_app(self, *args):
pass
def process(self, message=None):
print('Result {}'.format)
return message
Ở trên các bạn sẽ thấy biến len_list
và batch_size
và nó vào bằng settings.yml
:
parameters:
- len_list: 1000
- batch_size: 10
Vậy là xong, chương trình sẽ tự tìm các file và ghép lại thành 1 file hoàn chỉnh, tức là nó sẽ đọc file services.yml
để tìm các class, đọc file settings.yml
để lấy giá trị đầu vào rồi dựa vào DI tạo ra 1 chương trình hoàn chỉnh.
Trong repo của project có các ví dụ khác, để hiểu rỏ hơn các bạn có thể vào đó để xem thêm. Hoặc có thể liên hệ trực tiếp mình :D
chạy chương trình:
$ ank -r
các bạn cũng có thể generate ra file _processor.py
để hiểu rỏ hơn DI chạy như thế nào, bằng cách:
$ ank -p
và đương nhiên: python _processor.py
cũng sẽ cho kết quả y hệt
CLI ank
Ở trên các bạn đã thấy 1 số lệnh của ank, Hãy xem chi tiết hơn về nó.
Hiện tại ank support Docker để generate, build, deploy và test.
Build Docker:
$ ank -b
$ docker run --entrypoint /bin/sh docker_image_id
Test with parameters from test-settings.yml
ank -t -f test-settings.yml
Những tính năng hy vọng sớm có mặt:
- GUI
- Monitor
- Và sự đóng góp của tất cả các bạn.
@sunary 22-03-2017


