Kafka 101 - Part IV
Kafka Rebalancing Consumer
Kafka đạt được sự song song bằng cách sử dụng partitions để phân phối message trên nhiều hàng đợi, cho phép các consumer khác nhau trong cùng một consumer group xử lý song song. Để đạt được hiệu quả xử lý message đòi hỏi phải cân bằng khối lượng công việc đúng cách vì mất cân bằng tải có thể dẫn đến tắc nghẽn, vấn đề độ trễ và mất ổn định hệ thống tổng thể, dẫn đến nỗ lực bảo trì thêm hoặc phân bổ nguồn lực bổ sung. Khi một consumer được bắt đầu với một hoặc nhiều partition được phân vùng cho từng consumer. Quá trình ban đầu này được gọi là “Phân bổ phân vùng”. Sau khi ứng dụng được bắt đầu và chạy ổn định, vì nhiều lý do, Kafka consumer có thể quyết định cân bằng lại các phân bổ phân vùng này, vì vậy một consumer tích cực được chỉ định với cùng một phân vùng hoặc khác nhau và bắt đầu xử lý lại. Việc phân bổ lại phân vùng này được gọi là “Tái cân bằng lại phân vùng”.
Consumer Groups
Các consumer xử lý các message trong topic kafka hoạt động dưới dạng một consumer group. Các consumer được cấu hình với một group.id. Để bất kỳ trường hợp consumer nào có cùng group.id sẽ thược về cùng một consumer group.
⇒ Việc này giúp tăng khả năng mở rộng quy mô của consumer và cùng với việc tăng số lượng phân vùng trong một topic cung cấp một cơ chế để tăng thông lượng message.
Việc phân chia partition sẽ được Group Coordinator thuộc broker quản lý. Nhiệm vụ chính của nó là quản lý các consumer group và các consumers bên trong mỗi nhóm đó. Khi một consumer group được khởi tạo hoặc có thay đổi trong nhóm (như thêm hoặc bớt consumer), Group Coordinator sẽ chọn một consumer trong nhóm làm "leader consumer" (thường là consumer đầu tiên join group). Leader consumer sẽ nhận được danh sách tất cả các consumer trong group từ GroupCoordinator và có trách nhiệm tính toán cách phân bổ partitions của các topic mà nhóm đó subscribe cho mỗi consumer. Nó sử dụng một triển khai của PartitionAssignor để quyết định phân vùng nào sẽ được xử lý bởi consumer nào. Sau khi lead consumer tính toán xong, kết quả phân bổ partitions sẽ được gửi trở lại Group Coordinator.
Sau khi nhận kết quả phân bổ partitions Group Coordinator sau đó sẽ gán các partitions theo kết quả tính toán này cho các consumers tương ứng.Cuối cùng, Group Coordinator gửi thông tin phân bổ partitions đến từng consumer trong nhóm. Các consumers sẽ nhận biết được partitions nào chúng cần xử lý và bắt đầu quá trình xử lý dữ liệu. Quá trình này sẽ lặp đi lặp lại mỗi khi sự tái cân bằng xảy ra.
Note:
Có sự khác biệt giữa Group Coordinator và Leader Consumer. Group Coordinator là một trong những broker, còn Leader Consumer là một trong những consumer trong một group consumer.
Group Coordinator là một trong những broker nhận heartbeats hoặc polling message từ tất cả các consumer của một consumer group. Mỗi consumer group có một Group Coordinator. Nếu một consumer ngừng gửi heartbeats hoặc hết thời gian chờ polling thì Group Coordinator sẽ kích hoạt rebalancing.
Nếu có hai consumer group với group.id khác nhau cùng listen trên một topic thì hai group sẽ được phân vùng một cách độc lập và tiêu thụ một cách độc lập. Các partition của topic sẽ được phân bổ cho các consumer trong nhóm đó. Kafka đảm bảo rằng mỗi partition chỉ được tiêu thụ bởi một consumer duy nhất trong cùng một group để tránh trùng lặp dữ liệu.
Rebalance Triggers
Việc tái cân bằng có thể xay ra bởi nhiều lý do:
Một hoặc nhiều trường hợp khởi động lại.
Mở rộng hoặc thu hẹp số lượng consumer trong consumer group.
Bất kỳ consumer nào trong trạng thái bị treo không gửi thông tin heartbeats hoặc trong trạng thái không hoàn thành công việc trong khoảng thời gian cấu hình.
Bất kỳ một yêu cầu nào cần phân bổ lại tài nguyên.
Khi một consumer mới tham gia vào group, nó sẽ gửi yêu cầu JoinGroup đến Group Coordinator trên broker. Lúc này, các partition của topic sẽ được tái phân bổ cho tất cả consumer trong group. Tương tự, khi một consumer rời khỏi group, nó sẽ thông báo cho Group Coordinator thông qua yêu cầu LeaveGroup, và các partition sẽ được tái phân bổ cho các consumer còn lại (nếu có).
Nếu Group Coordinator không nhận được tín hiệu từ một consumer trong khoảng thời gian mong đợi, chẳng hạn như heartbeat hoặc lần gọi poll() tiếp theo, nó sẽ loại bỏ consumer đó khỏi group và tin rằng consumer đã gặp sự cố. Lúc này, các partition sẽ được tái phân bổ cho các consumer còn lại trong group.
Nếu một dịch vụ có nhiều consumer đăng ký vào các topic khác nhau nhưng dùng chung một group.id, bất kỳ sự tái cân bằng nào do một consumer gây ra cũng sẽ ảnh hưởng đến các consumer khác trong group. Ví dụ, Consumer A đăng ký vào topic abc, trong khi Consumer B đăng ký vào topic def và cả hai đều thuộc cùng một consumer group foo. Nếu Consumer A mất quá nhiều thời gian để xử lý một lô dữ liệu và bị loại khỏi group, thì việc tái cân bằng sẽ xảy ra. Tất cả các phân bổ partition trong group, bao gồm cả những phân bổ của Consumer B, sẽ bị thu hồi và tái phân bổ lại.
Khi Consumer A cuối cùng hoàn thành việc poll và tham gia lại vào consumer group, một lần tái cân bằng nữa sẽ được kích hoạt, và tất cả việc xử lý lại dừng lại khi các partition bị thu hồi và tái phân bổ. Do đó, việc định nghĩa các consumer group riêng biệt cho các consumer lắng nghe các topic khác nhau là một biện pháp khôn ngoan. Ví dụ, đặt tên theo cấu trúc [service]-[topic]-consumer-group.
Rebalance Configuration
Việc tái cân bằng trong kafka để đảm bảo tất cả các partition đều được giám sát và tiêu thụ bởi một consumer. Ngoài ra cần đảm bảo các consumer được cân bằng tải như nhau. Tuy nhiên việc tái cân bằng phải trả giá bằng việc tác động đến thông lượng. Việc tái cân bàng có thể mất vài giây đến vài phút. Trong khi việc tái cân bằng diễn ra các consumer trong group sẽ tạm dừng quá trình xử lý. Nó tuơng tự như cách hoạt động của garbage collection trong JVM. Việc dọn dẹp bộ bó là quan trọng nhưng quá nhiều tiến trình dọn dẹp xảy ra sẽ làm chậm toàn bộ quá trình xử lý JVM.
Chính vì vậy cần các giải pháp, cấu hình để giảm thiểu tối đa việc rebalancing.
Heartbeat and Session Timeout
Trong consumer group các consumer gửi heartbeat định kỳ cho Group Coordinator. Điều này cho phép Group Coordinator theo dõi các consumer trong group. Mỗi heartbeat phải được nhận trong session.timeout.ms và heartbeat được gửi dựa trên heartneat.interval.ms
Nên cấu hình **heartbeat.interval.ms** không quá một phần ba session.timeout.ms. Điều này đảm bảo rằng nếu một hoặc hai nhịp tim bị mất do vấn đề mạng thoáng qua, thì người tiêu dùng không được coi là đã thất bại.
Nếu người tiêu dùng thất bại và không gửi heartbeats thì nó sẽ bị đuổi khỏi consumer group sau khi hết thời gian chờ phiên, dẫn đến việc tái cân bằng consumer group.
Poll Interval
Heartbeats được thực hiện trên một luồng riêng biệt với luồng xử lý chính. Consumer poll các partition của topic trên luồng xử lý chính và mỗi lần gọi poll() phải diễn ra trong khoảng thời gian tối đa được cấu hình là max.poll.interval.ms. Sơ đồ dưới đây sẽ bổ sung luồng xử lý của consumer, thể hiện trách nhiệm của luồng này cùng với luồng heartbeat.
Lần đầu tiên gọi poll() và bất kỳ lần gọi poll() nào bao gồm các thay đổi như phân bổ lại partition, sẽ kích hoạt luồng heartbeat. Mỗi lần gọi poll() tiếp theo sẽ khởi động lại thời gian poll, đảm bảo rằng nó có toàn bộ khoảng thời gian max.poll.interval.ms để hoàn thành. Luồng heartbeat kiểm tra trạng thái của việc xử lý consumer, và nếu khoảng thời gian giữa các lần poll vượt quá max.poll.interval.ms, thay vì gửi một heartbeat, nó sẽ gửi yêu cầu LeaveGroup. Group Coordinator sau đó sẽ loại bỏ consumer khỏi consumer group và kích hoạt quá trình tái cân bằng.
Việc cấu hình max.poll.interval.ms cần được cân nhắc kỹ lưỡng. Nếu đặt quá thấp, nguy cơ là lô tin nhắn được tiêu thụ trong một lần poll sẽ không được xử lý kịp thời, dẫn đến tái cân bằng và việc gửi lại tin nhắn trùng lặp. Nếu đặt khoảng thời gian quá cao, khi một consumer gặp sự cố, broker sẽ mất nhiều thời gian hơn để nhận ra và phân bổ lại các partition của consumer đó. Trong thời gian này, các tin nhắn trên các partition được phân bổ cho consumer bị lỗi sẽ bị tắc nghẽn. Hoặc có thể điều chỉnh lại số lượng message trong một lô qua thông số max.poll.record
max.poll.records = (max.poll.interval.ms / 95th RT in ms (thời gian phản hồi trung bình của 95% request)) * ( Minimum of (số phân vùng, số consumer đang chạy)) * 0.8.
Việc rebalancing consumer có các chiến thuật nhất định, để biết rõ hơn các về các chiến thuật rebalancing hãy đón chờ phần tiếp theo!!!











