内网调用腾讯云其它数据库资源
概述
如果您希望您的云函数可以安全地访问您的腾讯云账号下的其它资源,如 MySQL、Redis、Kafka,甚至其它 CVM 上部署的服务等,您可以使用云函数的「内网互联」功能。开启内网互联功能之后,即可在云函数中通过内网 IP 访问该 VPC 下的相关资源。
内网互联的优势
- 安全性高:数据传输不经过公网,降低安全风险
- 性能优异:内网延迟低,带宽大,访问速度快
- 成本节约:避免公网流量费用
- 稳定可靠:内网环境更加稳定,减少网络波动影响
📄️ 配置内网互联
学习如何在云函数中配置和启用内网互联功能
📄️ 访问数据库服务
通过内网连接各种腾讯云数据库服务的详细示例
📄️ 访问其他服务
连接 CVM、容器服务等其他腾讯云资源
📄️ 最佳实践
内网互联的安全配置、性能优化和故障排查
配置内网互联
前提条件
- 您的腾讯云账号下已有 VPC 网络
- 目标资源(如数据库实例)已部署在该 VPC 中
- 云函数和目标资源在同一地域
配置步骤
1. 开启内网互联
在云函数控制台中配置内网互联:
- 登录 云函数控制台
- 选择对应的函数,进入函数配置页面
- 在网络配置部分,点击编辑
- 开启内网互联开关
- 选择目标 VPC 和子网
- 保存配置
2. 配置安全组
确保安全组规则允许云函数访问目标资源:
# 示例:允许云函数访问 MySQL(端口 3306)
入站规则:
- 协议:TCP
- 端口:3306
- 来源:云函数所在子网的 CIDR(如 10.0.1.0/24)
# 示例:允许云函数访问 Redis(端口 6379)
入站规则:
- 协议:TCP
- 端口:6379
- 来源:云函数所在子网的 CIDR
3. 获取内网地址
在对应的云服务控制台获取资源的内网访问地址:
| 服务类型 | 控制台位置 | 内网地址示例 |
|---|---|---|
| MySQL | 数据库 MySQL > 实例详情 | 10.0.1.100:3306 |
| Redis | 数据库 Redis > 实例详情 | 10.0.1.101:6379 |
| Kafka | 消息队列 CKafka > 实例详情 | 10.0.1.102:9092 |
| CVM | 云服务器 CVM > 实例详情 | 10.0.1.103:80 |
访问数据库服务
MySQL 数据库
- Node.js
- Python
const mysql = require('mysql2/promise');
// 使用内网地址连接 MySQL
const pool = mysql.createPool({
host: '10.0.1.100', // MySQL 内网地址
port: 3306,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
database: process.env.DB_NAME,
connectionLimit: 10,
acquireTimeout: 60000,
timeout: 60000
});
exports.main = async (event, context) => {
try {
const connection = await pool.getConnection();
try {
// 执行查询
const [rows] = await connection.query('SELECT * FROM users LIMIT 10');
return {
statusCode: 200,
body: {
success: true,
data: rows,
message: '查询成功'
}
};
} finally {
connection.release();
}
} catch (error) {
console.error('MySQL 连接失败:', error);
return {
statusCode: 500,
body: {
success: false,
error: error.message
}
};
}
};
import pymysql
import json
import os
def main_handler(event, context):
try:
# 使用内网地址连接 MySQL
connection = pymysql.connect(
host='10.0.1.100', # MySQL 内网地址
port=3306,
user=os.environ['DB_USER'],
password=os.environ['DB_PASSWORD'],
database=os.environ['DB_NAME'],
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
with connection:
with connection.cursor() as cursor:
# 执行查询
cursor.execute("SELECT * FROM users LIMIT 10")
result = cursor.fetchall()
return {
'statusCode': 200,
'body': json.dumps({
'success': True,
'data': result,
'message': '查询成功'
}, ensure_ascii=False)
}
except Exception as e:
print(f'MySQL 连接失败: {str(e)}')
return {
'statusCode': 500,
'body': json.dumps({
'success': False,
'error': str(e)
}, ensure_ascii=False)
}
Redis 缓存
- Node.js
- Python
const redis = require('redis');
// 创建 Redis 客户端(使用内网地址)
const client = redis.createClient({
host: '10.0.1.101', // Redis 内网地址
port: 6379,
password: process.env.REDIS_PASSWORD,
db: 0,
retry_strategy: (options) => {
if (options.error && options.error.code === 'ECONNREFUSED') {
return new Error('Redis 服务器拒绝连接');
}
if (options.total_retry_time > 1000 * 60 * 60) {
return new Error('重试时间超过 1 小时');
}
if (options.attempt > 10) {
return undefined;
}
return Math.min(options.attempt * 100, 3000);
}
});
exports.main = async (event, context) => {
try {
// 连接 Redis
await client.connect();
const { action, key, value } = event;
let result;
switch (action) {
case 'get':
result = await client.get(key);
break;
case 'set':
await client.set(key, value, 'EX', 3600); // 设置 1 小时过期
result = 'OK';
break;
case 'del':
result = await client.del(key);
break;
case 'exists':
result = await client.exists(key);
break;
default:
throw new Error('不支持的操作');
}
return {
statusCode: 200,
body: {
success: true,
data: result,
message: '操作成功'
}
};
} catch (error) {
console.error('Redis 操作失败:', error);
return {
statusCode: 500,
body: {
success: false,
error: error.message
}
};
} finally {
await client.quit();
}
};
import redis
import json
import os
def main_handler(event, context):
try:
# 使用内网地址连接 Redis
r = redis.Redis(
host='10.0.1.101', # Redis 内网地址
port=6379,
password=os.environ.get('REDIS_PASSWORD'),
db=0,
decode_responses=True,
socket_timeout=5,
socket_connect_timeout=5
)
action = event.get('action')
key = event.get('key')
value = event.get('value')
if action == 'get':
result = r.get(key)
elif action == 'set':
r.setex(key, 3600, value) # 设置 1 小时过期
result = 'OK'
elif action == 'del':
result = r.delete(key)
elif action == 'exists':
result = r.exists(key)
else:
raise ValueError('不支持的操作')
return {
'statusCode': 200,
'body': json.dumps({
'success': True,
'data': result,
'message': '操作成功'
}, ensure_ascii=False)
}
except Exception as e:
print(f'Redis 操作失败: {str(e)}')
return {
'statusCode': 500,
'body': json.dumps({
'success': False,
'error': str(e)
}, ensure_ascii=False)
}
Kafka 消息队列
- Node.js
- Python
const { Kafka } = require('kafkajs');
// 创建 Kafka 客户端(使用内网地址)
const kafka = Kafka({
clientId: 'scf-kafka-client',
brokers: ['10.0.1.102:9092'], // Kafka 内网地址
sasl: {
mechanism: 'plain',
username: process.env.KAFKA_USERNAME,
password: process.env.KAFKA_PASSWORD
}
});
exports.main = async (event, context) => {
const { action, topic, message, groupId } = event;
try {
if (action === 'produce') {
// 生产消息
const producer = kafka.producer();
await producer.connect();
await producer.send({
topic: topic,
messages: [{
key: Date.now().toString(),
value: JSON.stringify(message),
timestamp: Date.now()
}]
});
await producer.disconnect();
return {
statusCode: 200,
body: {
success: true,
message: '消息发送成功'
}
};
} else if (action === 'consume') {
// 消费消息
const consumer = kafka.consumer({ groupId: groupId || 'scf-group' });
await consumer.connect();
await consumer.subscribe({ topic: topic });
const messages = [];
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
messages.push({
topic,
partition,
offset: message.offset,
key: message.key?.toString(),
value: message.value?.toString(),
timestamp: message.timestamp
});
// 限制消息数量,避免超时
if (messages.length >= 10) {
await consumer.stop();
}
}
});
// 等待一段时间收集消息
await new Promise(resolve => setTimeout(resolve, 5000));
await consumer.disconnect();
return {
statusCode: 200,
body: {
success: true,
data: messages,
message: '消息消费成功'
}
};
}
} catch (error) {
console.error('Kafka 操作失败:', error);
return {
statusCode: 500,
body: {
success: false,
error: error.message
}
};
}
};
from kafka import KafkaProducer, KafkaConsumer
import json
import os
from datetime import datetime
def main_handler(event, context):
action = event.get('action')
topic = event.get('topic')
try:
if action == 'produce':
# 生产消息
producer = KafkaProducer(
bootstrap_servers=['10.0.1.102:9092'], # Kafka 内网地址
security_protocol='SASL_PLAINTEXT',
sasl_mechanism='PLAIN',
sasl_plain_username=os.environ['KAFKA_USERNAME'],
sasl_plain_password=os.environ['KAFKA_PASSWORD'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
message = event.get('message', {})
message['timestamp'] = datetime.now().isoformat()
future = producer.send(topic, message)
producer.flush()
producer.close()
return {
'statusCode': 200,
'body': json.dumps({
'success': True,
'message': '消息发送成功'
}, ensure_ascii=False)
}
elif action == 'consume':
# 消费消息
consumer = KafkaConsumer(
topic,
bootstrap_servers=['10.0.1.102:9092'],
security_protocol='SASL_PLAINTEXT',
sasl_mechanism='PLAIN',
sasl_plain_username=os.environ['KAFKA_USERNAME'],
sasl_plain_password=os.environ['KAFKA_PASSWORD'],
group_id=event.get('groupId', 'scf-group'),
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
consumer_timeout_ms=5000 # 5 秒超时
)
messages = []
for message in consumer:
messages.append({
'topic': message.topic,
'partition': message.partition,
'offset': message.offset,
'key': message.key.decode('utf-8') if message.key else None,
'value': message.value,
'timestamp': message.timestamp
})
# 限制消息数量
if len(messages) >= 10:
break
consumer.close()
return {
'statusCode': 200,
'body': json.dumps({
'success': True,
'data': messages,
'message': '消息消费成功'
}, ensure_ascii=False)
}
except Exception as e:
print(f'Kafka 操作失败: {str(e)}')
return {
'statusCode': 500,
'body': json.dumps({
'success': False,
'error': str(e)
}, ensure_ascii=False)
}
访问其他服务
CVM 服务器
const axios = require('axios');
exports.main = async (event, context) => {
try {
// 访问 CVM 上的 HTTP 服务(使用内网地址)
const response = await axios({
method: 'GET',
url: 'http://10.0.1.103:8080/api/data', // CVM 内网地址
timeout: 10000,
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${process.env.API_TOKEN}`
}
});
return {
statusCode: 200,
body: {
success: true,
data: response.data,
message: 'CVM 服务调用成功'
}
};
} catch (error) {
console.error('CVM 服务调用失败:', error);
return {
statusCode: 500,
body: {
success: false,
error: error.message
}
};
}
};
最佳实践
连接池管理
// 全局连接池,避免每次调用都创建新连接
let mysqlPool;
let redisClient;
function getMysqlPool() {
if (!mysqlPool) {
mysqlPool = mysql.createPool({
host: process.env.MYSQL_HOST,
port: 3306,
user: process.env.MYSQL_USER,
password: process.env.MYSQL_PASSWORD,
database: process.env.MYSQL_DATABASE,
connectionLimit: 5, // 云函数环境建议较小的连接数
acquireTimeout: 60000,
timeout: 60000,
reconnect: true
});
}
return mysqlPool;
}
function getRedisClient() {
if (!redisClient) {
redisClient = redis.createClient({
host: process.env.REDIS_HOST,
port: 6379,
password: process.env.REDIS_PASSWORD,
retry_strategy: (options) => {
if (options.attempt > 3) return undefined;
return Math.min(options.attempt * 100, 3000);
}
});
}
return redisClient;
}
错误处理和重试
// 带重试机制的数据库操作
async function executeWithRetry(operation, maxRetries = 3) {
let lastError;
for (let i = 0; i < maxRetries; i++) {
try {
return await operation();
} catch (error) {
lastError = error;
// 判断是否为可重试的错误
if (isRetryableError(error) && i < maxRetries - 1) {
const delay = Math.pow(2, i) * 1000; // 指数退避
await new Promise(resolve => setTimeout(resolve, delay));
continue;
}
throw error;
}
}
throw lastError;
}
function isRetryableError(error) {
const retryableCodes = [
'ECONNRESET',
'ETIMEDOUT',
'ENOTFOUND',
'ECONNREFUSED'
];
return retryableCodes.includes(error.code) ||
error.message.includes('Connection lost');
}
安全配置
// 环境变量配置示例
const config = {
mysql: {
host: process.env.MYSQL_HOST,
port: parseInt(process.env.MYSQL_PORT) || 3306,
user: process.env.MYSQL_USER,
password: process.env.MYSQL_PASSWORD,
database: process.env.MYSQL_DATABASE
},
redis: {
host: process.env.REDIS_HOST,
port: parseInt(process.env.REDIS_PORT) || 6379,
password: process.env.REDIS_PASSWORD
},
kafka: {
brokers: process.env.KAFKA_BROKERS?.split(',') || [],
username: process.env.KAFKA_USERNAME,
password: process.env.KAFKA_PASSWORD
}
};
// 配置验证
function validateConfig() {
const required = [
'MYSQL_HOST', 'MYSQL_USER', 'MYSQL_PASSWORD', 'MYSQL_DATABASE',
'REDIS_HOST', 'REDIS_PASSWORD',
'KAFKA_BROKERS', 'KAFKA_USERNAME', 'KAFKA_PASSWORD'
];
const missing = required.filter(key => !process.env[key]);
if (missing.length > 0) {
throw new Error(`缺少必要的环境变量: ${missing.join(', ')}`);
}
}
性能监控
// 性能监控装饰器
function withMonitoring(operation, operationName) {
return async function(...args) {
const startTime = Date.now();
const requestId = Math.random().toString(36).substr(2, 9);
console.log(`[${requestId}] ${operationName} 开始`);
try {
const result = await operation.apply(this, args);
const duration = Date.now() - startTime;
console.log(`[${requestId}] ${operationName} 完成: ${duration}ms`);
// 记录慢操作
if (duration > 5000) {
console.warn(`[${requestId}] 慢操作检测: ${operationName} ${duration}ms`);
}
return result;
} catch (error) {
const duration = Date.now() - startTime;
console.error(`[${requestId}] ${operationName} 失败: ${duration}ms`, error.message);
throw error;
}
};
}
// 使用示例
const monitoredMysqlQuery = withMonitoring(
async (sql, params) => {
const pool = getMysqlPool();
const connection = await pool.getConnection();
try {
return await connection.query(sql, params);
} finally {
connection.release();
}
},
'MySQL查询'
);
故障排查
常见问题
无法连接到内网资源
可能原因:
- 内网互联未正确配置
- 安全组规则阻止访问
- 目标服务未启动或地址错误
排查步骤:
- 检查云函数的 VPC 配置是否正确
- 验证安全组规则是否允许相应端口
- 确认目标资源的内网地址和端口
- 在云函数中使用 ping 或 telnet 测试连通性
// 连通性测试代码
const net = require('net');
function testConnection(host, port, timeout = 5000) {
return new Promise((resolve, reject) => {
const socket = new net.Socket();
socket.setTimeout(timeout);
socket.on('connect', () => {
socket.destroy();
resolve(true);
});
socket.on('timeout', () => {
socket.destroy();
reject(new Error('连接超时'));
});
socket.on('error', (error) => {
reject(error);
});
socket.connect(port, host);
});
}
连接频繁断开
解决方案:
- 配置连接池和重连机制
- 适当设置超时时间
- 实现健康检查
// 健康检查示例
async function healthCheck() {
try {
await testConnection(process.env.MYSQL_HOST, 3306);
await testConnection(process.env.REDIS_HOST, 6379);
return { status: 'healthy' };
} catch (error) {
return { status: 'unhealthy', error: error.message };
}
}
监控和日志
// 统一日志记录
function logger(level, message, extra = {}) {
const logEntry = {
timestamp: new Date().toISOString(),
level,
message,
requestId: context.requestId,
...extra
};
console.log(JSON.stringify(logEntry));
}
// 使用示例
exports.main = async (event, context) => {
logger('INFO', '函数开始执行', { event });
try {
// 业务逻辑
const result = await processRequest(event);
logger('INFO', '函数执行成功', { result });
return result;
} catch (error) {
logger('ERROR', '函数执行失败', { error: error.message, stack: error.stack });
throw error;
}
};
相关文档
📄️ 内网互联配置
详细的内网互联功能配置指南
提示
- 内网互联功能仅支持同一地域的资源访问
- 建议使用连接池来提高性能和资源利用率
- 重要操作务必实现重试机制和错误处理
- 定期监控连接状态和性能指标
注意
- 确保安全组规则配置正确,避免安全风险
- 内网地址可能会发生变化,建议使用域名或配置中心
- 注意云函数的执行时间限制,避免长时间连接
- 生产环境中务必配置适当的超时时间和重试策略