内网调用腾讯云其它资源
概述
如果您希望您的云函数可以安全地访问您的腾讯云账号下的其它资源,如 MySQL、Redis、Kafka,甚至其它 CVM 上部署的服务等,您可以使用云函数的「内网互联」功能。开启内网互联功能之后,即可在云函数中通过内网 IP 访问该 VPC 下的相关资源。
内网互联的优势
- 安全性高:数据传输不经过公网,降低安全风险
- 性能优异:内网延迟低,带宽大,访问速度快
- 成本节约:避免公网流量费用
- 稳定可靠:内网环境更加稳定,减少网络波动影响
配置内网互联
学习如何在云函数中配置和启用内网互联功能
访问数据库服务
通过内网连接各种腾讯云数据库服务的详细示例
访问其他服务
连接 CVM、容器服务等其他腾讯云资源
最佳实践
内网互联的安全配置、性能优化和故障排查
配置内网互联
前提条件
- 您的腾讯云账号下已有 VPC 网络
- 目标资源(如数据库实例)已部署在该 VPC 中
- 云函数和目标资源在同一地域
配置步骤
1. 开启内网互联
在云函数控制台中配置内网互联:
- 登录 云开发平台/云函数
- 选择对应的函数,进入函数配置页面
- 在网络配置部分,点击编辑
- 开启内网互联开关
- 选择目标 VPC 和子网
- 保存配置
2. 配置安全组
确保安全组规则允许云函数访问目标资源:
# 示例:允许云函数访问 MySQL(端口 3306)
入站规则:
- 协议:TCP
- 端口:3306
- 来源:云函数所在子网的 CIDR(如 10.0.1.0/24)
# 示例:允许云函数访问 Redis(端口 6379)
入站规则:
- 协议:TCP
- 端口:6379
- 来源:云函数所在子网的 CIDR
3. 获取内网地址
在对应的云服务控制台获取资源的内网访问地址:
| 服务类型 | 控制台位置 | 内网地址示例 |
|---|---|---|
| MySQL | 数据库 MySQL > 实例详情 | 10.0.1.100:3306 |
| Redis | 数据库 Redis > 实例详情 | 10.0.1.101:6379 |
| Kafka | 消息队列 CKafka > 实例详情 | 10.0.1.102:9092 |
| CVM | 云服务器 CVM > 实例详情 | 10.0.1.103:80 |
访问数据库服务
MySQL 数据库
- Node.js
- Python
const mysql = require('mysql2/promise');
// 使用内网地址连接 MySQL
const pool = mysql.createPool({
host: '10.0.1.100', // MySQL 内网地址
port: 3306,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
database: process.env.DB_NAME,
connectionLimit: 10,
acquireTimeout: 60000,
timeout: 60000
});
exports.main = async (event, context) => {
try {
const connection = await pool.getConnection();
try {
// 执行查询
const [rows] = await connection.query('SELECT * FROM users LIMIT 10');
return {
statusCode: 200,
body: {
success: true,
data: rows,
message: '查询成功'
}
};
} finally {
connection.release();
}
} catch (error) {
console.error('MySQL 连接失败:', error);
return {
statusCode: 500,
body: {
success: false,
error: error.message
}
};
}
};
import pymysql
import json
import os
def main_handler(event, context):
try:
# 使用内网地址连接 MySQL
connection = pymysql.connect(
host='10.0.1.100', # MySQL 内网地址
port=3306,
user=os.environ['DB_USER'],
password=os.environ['DB_PASSWORD'],
database=os.environ['DB_NAME'],
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
with connection:
with connection.cursor() as cursor:
# 执行查询
cursor.execute("SELECT * FROM users LIMIT 10")
result = cursor.fetchall()
return {
'statusCode': 200,
'body': json.dumps({
'success': True,
'data': result,
'message': '查询成功'
}, ensure_ascii=False)
}
except Exception as e:
print(f'MySQL 连接失败: {str(e)}')
return {
'statusCode': 500,
'body': json.dumps({
'success': False,
'error': str(e)
}, ensure_ascii=False)
}
Redis 缓存
- Node.js
- Python
const redis = require('redis');
// 创建 Redis 客户端(使用内网地址)
const client = redis.createClient({
host: '10.0.1.101', // Redis 内网地址
port: 6379,
password: process.env.REDIS_PASSWORD,
db: 0,
retry_strategy: (options) => {
if (options.error && options.error.code === 'ECONNREFUSED') {
return new Error('Redis 服务器拒绝连接');
}
if (options.total_retry_time > 1000 * 60 * 60) {
return new Error('重试时间超过 1 小时');
}
if (options.attempt > 10) {
return undefined;
}
return Math.min(options.attempt * 100, 3000);
}
});
exports.main = async (event, context) => {
try {
// 连接 Redis
await client.connect();
const { action, key, value } = event;
let result;
switch (action) {
case 'get':
result = await client.get(key);
break;
case 'set':
await client.set(key, value, 'EX', 3600); // 设置 1 小时过期
result = 'OK';
break;
case 'del':
result = await client.del(key);
break;
case 'exists':
result = await client.exists(key);
break;
default:
throw new Error('不支持的操作');
}
return {
statusCode: 200,
body: {
success: true,
data: result,
message: '操作成功'
}
};
} catch (error) {
console.error('Redis 操作失败:', error);
return {
statusCode: 500,
body: {
success: false,
error: error.message
}
};
} finally {
await client.quit();
}
};
import redis
import json
import os
def main_handler(event, context):
try:
# 使用内网地址连接 Redis
r = redis.Redis(
host='10.0.1.101', # Redis 内网地址
port=6379,
password=os.environ.get('REDIS_PASSWORD'),
db=0,
decode_responses=True,
socket_timeout=5,
socket_connect_timeout=5
)
action = event.get('action')
key = event.get('key')
value = event.get('value')
if action == 'get':
result = r.get(key)
elif action == 'set':
r.setex(key, 3600, value) # 设置 1 小时过期
result = 'OK'
elif action == 'del':
result = r.delete(key)
elif action == 'exists':
result = r.exists(key)
else:
raise ValueError('不支持的操作')
return {
'statusCode': 200,
'body': json.dumps({
'success': True,
'data': result,
'message': '操作成功'
}, ensure_ascii=False)
}
except Exception as e:
print(f'Redis 操作失败: {str(e)}')
return {
'statusCode': 500,
'body': json.dumps({
'success': False,
'error': str(e)
}, ensure_ascii=False)
}
Kafka 消息队列
- Node.js
- Python
const { Kafka } = require('kafkajs');
// 创建 Kafka 客户端(使用内网地址)
const kafka = Kafka({
clientId: 'scf-kafka-client',
brokers: ['10.0.1.102:9092'], // Kafka 内网地址
sasl: {
mechanism: 'plain',
username: process.env.KAFKA_USERNAME,
password: process.env.KAFKA_PASSWORD
}
});
exports.main = async (event, context) => {
const { action, topic, message, groupId } = event;
try {
if (action === 'produce') {
// 生产消息
const producer = kafka.producer();
await producer.connect();
await producer.send({
topic: topic,
messages: [{
key: Date.now().toString(),
value: JSON.stringify(message),
timestamp: Date.now()
}]
});
await producer.disconnect();
return {
statusCode: 200,
body: {
success: true,
message: '消息发送成功'
}
};
} else if (action === 'consume') {
// 消费消息
const consumer = kafka.consumer({ groupId: groupId || 'scf-group' });
await consumer.connect();
await consumer.subscribe({ topic: topic });
const messages = [];
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
messages.push({
topic,
partition,
offset: message.offset,
key: message.key?.toString(),
value: message.value?.toString(),
timestamp: message.timestamp
});
// 限制消息数量,避免超时
if (messages.length >= 10) {
await consumer.stop();
}
}
});
// 等待一段时间收集消息
await new Promise(resolve => setTimeout(resolve, 5000));
await consumer.disconnect();
return {
statusCode: 200,
body: {
success: true,
data: messages,
message: '消息消费成功'
}
};
}
} catch (error) {
console.error('Kafka 操作失败:', error);
return {
statusCode: 500,
body: {
success: false,
error: error.message
}
};
}
};
from kafka import KafkaProducer, KafkaConsumer
import json
import os
from datetime import datetime
def main_handler(event, context):
action = event.get('action')
topic = event.get('topic')
try:
if action == 'produce':
# 生产消息
producer = KafkaProducer(
bootstrap_servers=['10.0.1.102:9092'], # Kafka 内网地址
security_protocol='SASL_PLAINTEXT',
sasl_mechanism='PLAIN',
sasl_plain_username=os.environ['KAFKA_USERNAME'],
sasl_plain_password=os.environ['KAFKA_PASSWORD'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
message = event.get('message', {})
message['timestamp'] = datetime.now().isoformat()
future = producer.send(topic, message)
producer.flush()
producer.close()
return {
'statusCode': 200,
'body': json.dumps({
'success': True,
'message': '消息发送成功'
}, ensure_ascii=False)
}
elif action == 'consume':
# 消费消息
consumer = KafkaConsumer(
topic,
bootstrap_servers=['10.0.1.102:9092'],
security_protocol='SASL_PLAINTEXT',
sasl_mechanism='PLAIN',
sasl_plain_username=os.environ['KAFKA_USERNAME'],
sasl_plain_password=os.environ['KAFKA_PASSWORD'],
group_id=event.get('groupId', 'scf-group'),
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
consumer_timeout_ms=5000 # 5 秒超时
)
messages = []
for message in consumer:
messages.append({
'topic': message.topic,
'partition': message.partition,
'offset': message.offset,
'key': message.key.decode('utf-8') if message.key else None,
'value': message.value,
'timestamp': message.timestamp
})
# 限制消息数量
if len(messages) >= 10:
break
consumer.close()
return {
'statusCode': 200,
'body': json.dumps({
'success': True,
'data': messages,
'message': '消息消费成功'
}, ensure_ascii=False)
}
except Exception as e:
print(f'Kafka 操作失败: {str(e)}')
return {
'statusCode': 500,
'body': json.dumps({
'success': False,
'error': str(e)
}, ensure_ascii=False)
}
访问其他服务
CVM 服务器
const axios = require('axios');
exports.main = async (event, context) => {
try {
// 访问 CVM 上的 HTTP 服务(使用内网地址)
const response = await axios({
method: 'GET',
url: 'http://10.0.1.103:8080/api/data', // CVM 内网地址
timeout: 10000,
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${process.env.API_TOKEN}`
}
});
return {
statusCode: 200,
body: {
success: true,
data: response.data,
message: 'CVM 服务调用成功'
}
};
} catch (error) {
console.error('CVM 服务调用失败:', error);
return {
statusCode: 500,
body: {
success: false,
error: error.message
}
};
}
};
最佳实践
连接池管理
// 全局连接池,避免每次调用都创建新连接
let mysqlPool;
let redisClient;
function getMysqlPool() {
if (!mysqlPool) {
mysqlPool = mysql.createPool({
host: process.env.MYSQL_HOST,
port: 3306,
user: process.env.MYSQL_USER,
password: process.env.MYSQL_PASSWORD,
database: process.env.MYSQL_DATABASE,
connectionLimit: 5, // 云函数环境建议较小的连接数
acquireTimeout: 60000,
timeout: 60000,
reconnect: true
});
}
return mysqlPool;
}
function getRedisClient() {
if (!redisClient) {
redisClient = redis.createClient({
host: process.env.REDIS_HOST,
port: 6379,
password: process.env.REDIS_PASSWORD,
retry_strategy: (options) => {
if (options.attempt > 3) return undefined;
return Math.min(options.attempt * 100, 3000);
}
});
}
return redisClient;
}
错误处理和重试
// 带重试机制的数据库操作
async function executeWithRetry(operation, maxRetries = 3) {
let lastError;
for (let i = 0; i < maxRetries; i++) {
try {
return await operation();
} catch (error) {
lastError = error;
// 判断是否为可重试的错误
if (isRetryableError(error) && i < maxRetries - 1) {
const delay = Math.pow(2, i) * 1000; // 指数退避
await new Promise(resolve => setTimeout(resolve, delay));
continue;
}
throw error;
}
}
throw lastError;
}
function isRetryableError(error) {
const retryableCodes = [
'ECONNRESET',
'ETIMEDOUT',
'ENOTFOUND',
'ECONNREFUSED'
];
return retryableCodes.includes(error.code) ||
error.message.includes('Connection lost');
}
安全配置
// 环境变量配置示例
const config = {
mysql: {
host: process.env.MYSQL_HOST,
port: parseInt(process.env.MYSQL_PORT) || 3306,
user: process.env.MYSQL_USER,
password: process.env.MYSQL_PASSWORD,
database: process.env.MYSQL_DATABASE
},
redis: {
host: process.env.REDIS_HOST,
port: parseInt(process.env.REDIS_PORT) || 6379,
password: process.env.REDIS_PASSWORD
},
kafka: {
brokers: process.env.KAFKA_BROKERS?.split(',') || [],
username: process.env.KAFKA_USERNAME,
password: process.env.KAFKA_PASSWORD
}
};
// 配置验证
function validateConfig() {
const required = [
'MYSQL_HOST', 'MYSQL_USER', 'MYSQL_PASSWORD', 'MYSQL_DATABASE',
'REDIS_HOST', 'REDIS_PASSWORD',
'KAFKA_BROKERS', 'KAFKA_USERNAME', 'KAFKA_PASSWORD'
];
const missing = required.filter(key => !process.env[key]);
if (missing.length > 0) {
throw new Error(`缺少必要的环境变量: ${missing.join(', ')}`);
}
}