python codis集群客户端(二) - 基于zookeeper对实例创建与摘除
时间:2022-05-03
本文章向大家介绍python codis集群客户端(二) - 基于zookeeper对实例创建与摘除,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
在这一篇中我们实现了不通过zk来编写codis集群proxys的api,
如果codis集群暴露zk给你的话,那么就方便了,探活和故障摘除与恢复codis集群都给你搞定了,你只需要监听zookeeper中实例的状态就好了。
下面看我的实现。
1、CodisByZKPool.py
这里通过zk读取并初始化pool_shards,简单说一下如何故障摘除和恢复
1)我们监听zk中节点状态改变,当发现某个实例对应的节点状态变化了,比如DELETE了,那么我们认为这个实例挂了,我们就会重新_create_pool刷新shards列表,摘除故障实例。
2)同样,当我们发现节点CREATE,就是新增了实例,或者实例从崩溃中恢复了,我们也会重新_create_pool刷新shards列表,新增实例。
# -*- coding:utf-8 -*-
import redis
import logging
from kazoo.client import KazooClient
from Podis import Podis
from PickUp import RandomPickUp, PickUp
logger = logging.getLogger(__name__)
class CodisByZKPool(object):
def __init__(self, zk_config):
self._pool_shards = []
self.zk_config = zk_config
self.zk = self._init_zk()
def _init_zk(self):
return KazooClient(hosts=self.zk_config.get('hosts'), timeout=self.zk_config.get('timeout'))
def _create_pool(self):
try:
if not self.zk.connected:
self.zk.start()
address_list = self.zk.get_children(self.zk_config.get('path'), watch=self._watch_codis_instances)
for address in address_list:
host = address.split(':')[0]
port = address.split(':')[1]
self._pool_shards.append(
Podis(
redis.ConnectionPool(
host=host, port=port, db=0,
password=None,
max_connections=None
)
)
)
if len(self._pool_shards) == 0:
raise Exception('create pool failure!')
except Exception, ex:
raise
finally:
self.zk.stop()
def _watch_codis_instances(self, event):
if event.type == "CREATED" and event.state == "CONNECTED":
self._create_pool()
elif event.type == "DELETED" and event.state == "CONNECTED":
self._create_pool()
elif event.type == "CHANGED" and event.state == "CONNECTED":
self._create_pool()
elif event.type == "CHILD" and event.state == "CONNECTED":
self._create_pool()
else:
logger.error('failure: not cover this event - %s'.format(event.type))
def get_connection(self, pick_up=None):
if isinstance(pick_up, PickUp):
codisPool = pick_up.pick_up(self._pool_shards)
else:
pick_up = RandomPickUp()
codisPool = pick_up.pick_up(self._pool_shards)
return codisPool
def get_availables(self):
return self._pool_shards
2、负载均衡PickUp.py
跟上一篇一样,这里就不多说了。
# -*- coding:utf-8 -*-
import abc
import uuid
import threading
class PickUp(object):
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def __init__(self):
pass
@abc.abstractmethod
def pick_up(self, pool_list):
return
class RandomPickUp(PickUp):
def __init__(self):
PickUp.__init__(self)
def pick_up(self, pool_list):
pool_size = len(pool_list)
index = abs(hash(uuid.uuid4())) % pool_size
pool = pool_list[index]
print "RandomPickUp, 拿到第", index, "个pool"
return pool
class RoundRobinPickUp(PickUp):
def __init__(self):
PickUp.__init__(self)
self.index = 0
self.round_robin_lock = threading.Lock()
def pick_up(self, pool_list):
with self.round_robin_lock:
pool_size = len(pool_list)
self.index += 1
index = abs(self.index) % pool_size
pool = pool_list[index]
print "RoundRobinPickUp, 拿到第", index, "个pool"
return pool
3、配置文件
这里就只用zk_config就可以了,我们认为在zk中已经有所有的codisproxy实例的address了。
codis_config = {
'addrs': '100.90.186.47:3000,100.90.187.33:3000'
}
zk_config = {
'hosts': '10.93.21.21:2181,10.93.18.34:2181,10.93.18.35:2181',
'timeout': 10,
'path': '/codis/instances'
}
4、链接类Podis.py
# -*- coding:utf-8 -*-
import redis
import logging
import traceback
logger = logging.getLogger(__name__)
def redis_getter(func):
def wrapper(*args, **kwargs):
try:
result = func(*args, **kwargs)
return result or None
except Exception, ex:
logger.error(traceback.format_exc())
raise
return wrapper
def redis_setter(func):
def wrapper(*args, **kwargs):
try:
func(*args, **kwargs)
return True
except Exception, ex:
logger.error(traceback.format_exc())
raise
return wrapper
class Podis(object):
def __init__(self, pool):
self._connection = redis.StrictRedis(connection_pool=pool)
@redis_getter
def ping(self):
return self._connection.ping()
@redis_getter
def get(self, key):
return self._connection.get(key)
@redis_setter
def set(self, key, value):
self._connection.set(key, value)
@redis_setter
def lpush(self, key, *value):
self._connection.lpush(key, *value)
@redis_getter
def lpop(self, key):
return self._connection.lpop(key)
@redis_getter
def lrange(self, key, start, end):
return self._connection.lrange(key, start, end)
@redis_setter
def sadd(self, key, *value):
self._connection.sadd(key, *value)
@redis_setter
def srem(self, key, *value):
self._connection.srem(key, *value)
@redis_getter
def zrange(self,key,start,end):
return self._connection.zrange(key,start,end)
@redis_getter
def zrevrange(self,key,start,end):
return self._connection.zrevrange(key,start,end,withscores=True)
@redis_getter
def zscore(self,key,*value):
return self._connection.zscore(key,value)
@redis_setter
def zadd(self,key,score,*value):
self._connection.zadd(key,score,value)
@redis_getter
def smembers(self, key):
return self._connection.smembers(key)
@redis_getter
def hgetall(self, key):
return self._connection.hgetall(key)
@redis_getter
def hget(self, key, name):
return self._connection.hget(key, name)
@redis_getter
def hkeys(self, key):
return self._connection.hkeys(key)
@redis_setter
def hset(self, key, name, value):
self._connection.hset(key, name, value)
@redis_setter
def hmset(self, name, mapping):
self._connection.hmset(name, mapping)
@redis_setter
def hdel(self, key, name):
self._connection.hdel(key, name)
@redis_setter
def delete(self, *key):
self._connection.delete(*key)
# codis不支持
@redis_getter
def keys(self, pattern):
return self._connection.keys(pattern)
@redis_setter
def expire(self, key, time):
return self._connection.expire(key, time)
@redis_getter
def ttl(self, key):
return self._connection.ttl(key)
5、例子
import sys
sys.path.append('../')
import time
import threading
from pycodis.CodisConfig import zk_config
from pycodis.CodisByZKPool import CodisByZKPool
from pycodis.PickUp import RoundRobinPickUp
codis_pool1 = CodisByZKPool(zk_config)
print '------1-------'
pick_up1 = RoundRobinPickUp()
print '------2-------'
codis_pool2 = CodisByZKPool(zk_config)
print '------3-------'
pick_up2 = RoundRobinPickUp()
print '------4-------'
def func(i):
for i in range(10):
podis1 = codis_pool1.get_connection(pick_up=pick_up1)
podis2 = codis_pool2.get_connection(pick_up=pick_up2)
podis1.delete(i)
podis2.delete(i)
time.sleep(1)
thread_list = []
for i in range(100):
thread_list.append(threading.Thread(target=func, args=[i]))
for thread in thread_list:
thread.setDaemon(True)
thread.start()
time.sleep(10)
- Python 工匠:编写条件分支代码的技巧
- python3 下 Zabbix监控调用graph.get并且下载监控图
- 用R语言复盘美国总统大选结果~
- 最新Apache Spark平台的NLP库,助你轻松搞定自然语言处理任务
- 使用Seq2Seq+attention实现简单的Chatbot
- R语言可视化——用ggplot构造期待已久的雷达图
- 【实战】最新Deep Learning with Keras图书加代码,教你从零开发一个复杂深度学习模型(附下载)
- 基于zabbix 自动抓取每天监控数据!/usr/local/python/bin/python3.5
- 大过年的,一起来用Seq2Seq来作对联吧!
- python3 下调用zabbix api 获取多个机房的IP
- TensorFlow从0到1 - 14 - 交叉熵损失函数——防止学习缓慢
- 用优雅的配色来缔造图表专业主义~
- python文件名与包名冲突
- python3 Zabbix监控-api的使用-python
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- leetcode: explore-array-21 从排序数组中删除重复项
- leetcode: explore-array-22 买卖股票的最佳时机 II
- leetcode: explore-strings-32 反转字符串
- leetcode: explore-strings-33 反转字符串
- 02-leetcode: explore-strings-34 字符串中的第一个唯一字符
- 《操作系统》:理解一些基础概念
- Json与对象相互转换 - alibaba fastjson
- 2020-08-21
- Kubernetes Dashboard 与 LDAP 的集成
- Grafana Loki 简明教程
- 深度学习中7种最优化算法的可视化与理解
- 【基础详解】手磕实现 CNN卷积神经网络!
- 直播间源码android音视频开发
- 批量转化字符编码
- JAR项目部署